From 2e327070a1703ef7adabc785c741631b0fc5ea96 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 29 Jul 2025 18:22:51 +0200 Subject: [PATCH 01/32] This should fix the concurrencyerrors --- async_substrate_interface/async_substrate.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index fc4f034..efe1985 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -613,9 +613,10 @@ async def connect(self, force=False): self.ws = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) - self._receiving_task = asyncio.get_running_loop().create_task( - self._start_receiving() - ) + if self._receiving_task is None or self._receiving_task.done(): + self._receiving_task = asyncio.get_running_loop().create_task( + self._start_receiving() + ) self._initialized = True finally: self._is_connecting = False From b2cf95bdafb9cee73e1c3f2b9f8bc85d030ff82a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 30 Jul 2025 20:58:44 +0200 Subject: [PATCH 02/32] Added better typing --- async_substrate_interface/async_substrate.py | 56 +++++++++++--------- async_substrate_interface/sync_substrate.py | 45 +++++++++------- async_substrate_interface/types.py | 2 +- async_substrate_interface/utils/cache.py | 4 +- async_substrate_interface/utils/decoding.py | 2 +- async_substrate_interface/utils/storage.py | 6 +-- 6 files changed, 64 insertions(+), 51 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index efe1985..ec6703a 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -689,7 +689,7 @@ async def _start_receiving(self): except ConnectionClosed: await self.connect(force=True) - async def send(self, payload: dict) -> int: + async def send(self, payload: dict) -> str: """ Sends a payload to the websocket connection. @@ -714,6 +714,7 @@ async def send(self, payload: dict) -> int: return original_id except (ConnectionClosed, ssl.SSLError, EOFError): await self.connect(force=True) + return await self.send(payload) async def retrieve(self, item_id: int) -> Optional[dict]: """ @@ -911,7 +912,7 @@ async def name(self): return self._name async def get_storage_item( - self, module: str, storage_function: str, block_hash: str = None + self, module: str, storage_function: str, block_hash: Optional[str] = None ): runtime = await self.init_runtime(block_hash=block_hash) metadata_pallet = runtime.metadata.get_metadata_pallet(module) @@ -1154,7 +1155,7 @@ async def create_storage_key( pallet: str, storage_function: str, params: Optional[list] = None, - block_hash: str = None, + block_hash: Optional[str] = None, ) -> StorageKey: """ Create a `StorageKey` instance providing storage function details. See `subscribe_storage()`. 
@@ -1169,7 +1170,7 @@ async def create_storage_key( StorageKey """ runtime = await self.init_runtime(block_hash=block_hash) - + params = params or [] return StorageKey.create_from_storage_function( pallet, storage_function, @@ -1424,7 +1425,7 @@ async def get_metadata_error( return error async def get_metadata_runtime_call_functions( - self, block_hash: str = None, runtime: Optional[Runtime] = None + self, block_hash: Optional[str] = None, runtime: Optional[Runtime] = None ) -> list[GenericRuntimeCallDefinition]: """ Get a list of available runtime API calls @@ -1763,7 +1764,7 @@ async def get_block_header( ignore_decoding_errors: bool = False, include_author: bool = False, finalized_only: bool = False, - ) -> dict: + ) -> Optional[dict]: """ Retrieves a block header and decodes its containing log digest items. If `block_hash` and `block_number` is omitted the chain tip will be retrieved, or the finalized head if `finalized_only` is set to true. @@ -1790,7 +1791,7 @@ async def get_block_header( block_hash = await self.get_block_hash(block_number) if block_hash is None: - return + return None if block_hash and finalized_only: raise ValueError( @@ -1820,7 +1821,7 @@ async def get_block_header( async def subscribe_block_headers( self, - subscription_handler: callable, + subscription_handler: Callable, ignore_decoding_errors: bool = False, include_author: bool = False, finalized_only=False, @@ -1902,7 +1903,7 @@ def retrieve_extrinsic_by_hash( ) async def get_extrinsics( - self, block_hash: str = None, block_number: int = None + self, block_hash: Optional[str] = None, block_number: Optional[int] = None ) -> Optional[list["AsyncExtrinsicReceipt"]]: """ Return all extrinsics for given block_hash or block_number @@ -2780,7 +2781,7 @@ async def create_signed_extrinsic( self, call: GenericCall, keypair: Keypair, - era: Optional[dict] = None, + era: Optional[Union[dict, str]] = None, nonce: Optional[int] = None, tip: int = 0, tip_asset_id: Optional[int] = None, @@ -2932,12 +2933,12 @@ async def _do_runtime_call_old( params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, runtime: Optional[Runtime] = None, - ) -> ScaleType: + ) -> ScaleObj: logger.debug( f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}" ) runtime_call_def = _TYPE_REGISTRY["runtime_api"][api]["methods"][method] - + params = params or [] # Encode params param_data = b"" @@ -3245,7 +3246,7 @@ async def get_payment_info( return result.value async def get_type_registry( - self, block_hash: str = None, max_recursion: int = 4 + self, block_hash: Optional[str] = None, max_recursion: int = 4 ) -> dict: """ Generates an exhaustive list of which RUST types exist in the runtime specified at given block_hash (or @@ -3284,7 +3285,7 @@ async def get_type_registry( return type_registry async def get_type_definition( - self, type_string: str, block_hash: str = None + self, type_string: str, block_hash: Optional[str] = None ) -> str: """ Retrieves SCALE encoding specifications of given type_string @@ -3589,11 +3590,11 @@ async def create_multisig_extrinsic( keypair: Keypair, multisig_account: MultiAccountId, max_weight: Optional[Union[dict, int]] = None, - era: dict = None, - nonce: int = None, + era: Optional[dict] = None, + nonce: Optional[int] = None, tip: int = 0, - tip_asset_id: int = None, - signature: Union[bytes, str] = None, + tip_asset_id: Optional[int] = None, + signature: Optional[Union[bytes, str]] = None, ) -> GenericExtrinsic: """ Create a Multisig extrinsic that will be 
signed by one of the signatories. Checks on-chain if the threshold @@ -3878,6 +3879,9 @@ async def get_block_number(self, block_hash: Optional[str] = None) -> int: elif "result" in response: if response["result"]: return int(response["result"]["number"], 16) + raise SubstrateRequestException( + f"Unable to retrieve block number for {block_hash}" + ) async def close(self): """ @@ -3973,14 +3977,14 @@ async def get_async_substrate_interface( """ substrate = AsyncSubstrateInterface( url, - use_remote_preset, - auto_discover, - ss58_format, - type_registry, - chain_name, - max_retries, - retry_timeout, - _mock, + use_remote_preset=use_remote_preset, + auto_discover=auto_discover, + ss58_format=ss58_format, + type_registry=type_registry, + chain_name=chain_name, + max_retries=max_retries, + retry_timeout=retry_timeout, + _mock=_mock, ) await substrate.initialize() return substrate diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index f6d1876..927caf6 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -641,7 +641,7 @@ def connect(self, init=False): raise ConnectionError(e) def get_storage_item( - self, module: str, storage_function: str, block_hash: str = None + self, module: str, storage_function: str, block_hash: Optional[str] = None ): self.init_runtime(block_hash=block_hash) metadata_pallet = self.runtime.metadata.get_metadata_pallet(module) @@ -659,7 +659,9 @@ def _get_current_block_hash( return self.last_block_hash return block_hash - def _load_registry_at_block(self, block_hash: Optional[str]) -> MetadataV15: + def _load_registry_at_block( + self, block_hash: Optional[str] + ) -> tuple[Optional[MetadataV15], Optional[PortableRegistry]]: # Should be called for any block that fails decoding. # Possibly the metadata was different. try: @@ -864,7 +866,7 @@ def create_storage_key( pallet: str, storage_function: str, params: Optional[list] = None, - block_hash: str = None, + block_hash: Optional[str] = None, ) -> StorageKey: """ Create a `StorageKey` instance providing storage function details. See `subscribe_storage()`. @@ -883,7 +885,7 @@ def create_storage_key( return StorageKey.create_from_storage_function( pallet, storage_function, - params, + params or [], runtime_config=self.runtime_config, metadata=self.runtime.metadata, ) @@ -1104,7 +1106,7 @@ def get_metadata_error(self, module_name, error_name, block_hash=None): return error def get_metadata_runtime_call_functions( - self, block_hash: str = None + self, block_hash: Optional[str] = None ) -> list[GenericRuntimeCallDefinition]: """ Get a list of available runtime API calls @@ -1124,7 +1126,7 @@ def get_metadata_runtime_call_functions( return call_functions def get_metadata_runtime_call_function( - self, api: str, method: str, block_hash: str = None + self, api: str, method: str, block_hash: Optional[str] = None ) -> GenericRuntimeCallDefinition: """ Get details of a runtime API call @@ -1416,7 +1418,7 @@ def get_block_header( ignore_decoding_errors: bool = False, include_author: bool = False, finalized_only: bool = False, - ) -> dict: + ) -> Optional[dict]: """ Retrieves a block header and decodes its containing log digest items. If `block_hash` and `block_number` is omitted the chain tip will be retrieved, or the finalized head if `finalized_only` is set to true. 
@@ -1473,7 +1475,7 @@ def get_block_header( def subscribe_block_headers( self, - subscription_handler: callable, + subscription_handler: Callable, ignore_decoding_errors: bool = False, include_author: bool = False, finalized_only=False, @@ -1555,7 +1557,7 @@ def retrieve_extrinsic_by_hash( ) def get_extrinsics( - self, block_hash: str = None, block_number: int = None + self, block_hash: str = None, block_number: Optional[int] = None ) -> Optional[list["ExtrinsicReceipt"]]: """ Return all extrinsics for given block_hash or block_number @@ -2349,7 +2351,7 @@ def create_signed_extrinsic( self, call: GenericCall, keypair: Keypair, - era: Optional[dict] = None, + era: Optional[Union[dict, str]] = None, nonce: Optional[int] = None, tip: int = 0, tip_asset_id: Optional[int] = None, @@ -2496,7 +2498,7 @@ def _do_runtime_call_old( method: str, params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, - ) -> ScaleType: + ) -> ScaleObj: logger.debug( f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}" ) @@ -2544,7 +2546,7 @@ def runtime_call( method: str, params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, - ) -> ScaleType: + ) -> ScaleObj: """ Calls a runtime API method @@ -2770,7 +2772,9 @@ def get_payment_info(self, call: GenericCall, keypair: Keypair) -> dict[str, Any return result.value - def get_type_registry(self, block_hash: str = None, max_recursion: int = 4) -> dict: + def get_type_registry( + self, block_hash: Optional[str] = None, max_recursion: int = 4 + ) -> dict: """ Generates an exhaustive list of which RUST types exist in the runtime specified at given block_hash (or chaintip if block_hash is omitted) @@ -2807,7 +2811,9 @@ def get_type_registry(self, block_hash: str = None, max_recursion: int = 4) -> d return type_registry - def get_type_definition(self, type_string: str, block_hash: str = None) -> str: + def get_type_definition( + self, type_string: str, block_hash: Optional[str] = None + ) -> str: """ Retrieves SCALE encoding specifications of given type_string @@ -3052,11 +3058,11 @@ def create_multisig_extrinsic( keypair: Keypair, multisig_account: MultiAccountId, max_weight: Optional[Union[dict, int]] = None, - era: dict = None, - nonce: int = None, + era: Optional[dict] = None, + nonce: Optional[int] = None, tip: int = 0, - tip_asset_id: int = None, - signature: Union[bytes, str] = None, + tip_asset_id: Optional[int] = None, + signature: Optional[Union[bytes, str]] = None, ) -> GenericExtrinsic: """ Create a Multisig extrinsic that will be signed by one of the signatories. 
Checks on-chain if the threshold @@ -3333,6 +3339,9 @@ def get_block_number(self, block_hash: Optional[str] = None) -> int: elif "result" in response: if response["result"]: return int(response["result"]["number"], 16) + raise SubstrateRequestException( + f"Unable to determine block number for {block_hash}" + ) def close(self): """ diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 95575bf..bcf2fe1 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -372,7 +372,7 @@ def __init__(self, payloads): self.responses = defaultdict(lambda: {"complete": False, "results": []}) self.payloads_count = len(payloads) - def add_request(self, item_id: int, request_id: Any): + def add_request(self, item_id: Union[int, str], request_id: Any): """ Adds an outgoing request to the responses map for later retrieval """ diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py index 23bbf9f..cf40539 100644 --- a/async_substrate_interface/utils/cache.py +++ b/async_substrate_interface/utils/cache.py @@ -127,7 +127,7 @@ def inner(self, *args, **kwargs): return decorator -def async_sql_lru_cache(maxsize=None): +def async_sql_lru_cache(maxsize: Optional[int] = None): def decorator(func): @cached_fetcher(max_size=maxsize) async def inner(self, *args, **kwargs): @@ -283,7 +283,7 @@ def __get__(self, instance, owner): return self._instances[instance] -def cached_fetcher(max_size: int, cache_key_index: int = 0): +def cached_fetcher(max_size: Optional[int] = None, cache_key_index: int = 0): """Wrapper for CachedFetcher. See example in CachedFetcher docstring.""" def wrapper(method): diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py index af8d969..1dc494a 100644 --- a/async_substrate_interface/utils/decoding.py +++ b/async_substrate_interface/utils/decoding.py @@ -160,7 +160,7 @@ def concat_hash_len(key_hasher: str) -> int: def legacy_scale_decode( - type_string: str, scale_bytes: Union[str, ScaleBytes], runtime: "Runtime" + type_string: str, scale_bytes: Union[str, bytes, ScaleBytes], runtime: "Runtime" ): if isinstance(scale_bytes, (str, bytes)): scale_bytes = ScaleBytes(scale_bytes) diff --git a/async_substrate_interface/utils/storage.py b/async_substrate_interface/utils/storage.py index 5778887..f697c8a 100644 --- a/async_substrate_interface/utils/storage.py +++ b/async_substrate_interface/utils/storage.py @@ -48,9 +48,9 @@ def create_from_data( data: bytes, runtime_config: RuntimeConfigurationObject, metadata: GenericMetadataVersioned, - value_scale_type: str = None, - pallet: str = None, - storage_function: str = None, + value_scale_type: Optional[str] = None, + pallet: Optional[str] = None, + storage_function: Optional[str] = None, ) -> "StorageKey": """ Create a StorageKey instance providing raw storage key bytes From eeabb539d2de26736e30c4ab4f5215387dad3270 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 30 Jul 2025 21:03:23 +0200 Subject: [PATCH 03/32] Typing --- async_substrate_interface/async_substrate.py | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index ec6703a..f609bfe 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -1015,7 +1015,7 @@ async def decode_scale( # Decode AccountId bytes to SS58 address return ss58_encode(scale_bytes, 
self.ss58_format) else: - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) if runtime.metadata_v15 is not None and force_legacy is False: obj = decode_by_type_string(type_string, runtime.registry, scale_bytes) @@ -1318,7 +1318,7 @@ async def get_metadata_storage_functions( Returns: list of storage functions """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) storage_list = [] @@ -1356,7 +1356,7 @@ async def get_metadata_storage_function( Returns: Metadata storage function """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) pallet = runtime.metadata.get_metadata_pallet(module_name) @@ -1377,7 +1377,7 @@ async def get_metadata_errors( Returns: list of errors in the metadata """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) error_list = [] @@ -1415,7 +1415,7 @@ async def get_metadata_error( error """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) for module_idx, module in enumerate(runtime.metadata.pallets): @@ -1433,7 +1433,7 @@ async def get_metadata_runtime_call_functions( Returns: list of runtime call functions """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) call_functions = [] @@ -1467,7 +1467,7 @@ async def get_metadata_runtime_call_function( Returns: GenericRuntimeCallDefinition """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) try: @@ -2142,7 +2142,7 @@ async def _preprocess( """ params = query_for if query_for else [] # Search storage call in metadata - if not runtime: + if runtime is None: runtime = self.runtime metadata_pallet = runtime.metadata.get_metadata_pallet(module) @@ -2504,7 +2504,7 @@ async def query_multiple( block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) if block_hash: self.last_block_hash = block_hash - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) preprocessed: tuple[Preprocessed] = await asyncio.gather( *[ @@ -2562,7 +2562,7 @@ async def query_multi( Returns: list of `(storage_key, scale_obj)` tuples """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) # Retrieve corresponding value @@ -2617,7 +2617,7 @@ async def create_scale_object( Returns: The created Scale Type object """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) if "metadata" not in kwargs: kwargs["metadata"] = runtime.metadata @@ -3160,7 +3160,7 @@ async def get_metadata_constant( Returns: MetadataModuleConstants """ - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) for module in runtime.metadata.pallets: @@ -3361,7 +3361,7 @@ async def query( block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) if block_hash: self.last_block_hash = block_hash - if not runtime: + if runtime is None: runtime = await self.init_runtime(block_hash=block_hash) preprocessed: Preprocessed = await self._preprocess( params, From 12676fa8aad1c76ad2f267c805a55a5c1f35a2c8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 12:30:21 +0200 Subject: [PATCH 04/32] WIP check-in --- async_substrate_interface/async_substrate.py | 160 ++++++++++--------- 1 file changed, 83 insertions(+), 77 deletions(-) diff --git 
a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index f609bfe..92bf5b7 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -31,8 +31,9 @@ ss58_encode, MultiAccountId, ) -from websockets.asyncio.client import connect +from websockets.asyncio.client import connect, ClientConnection from websockets.exceptions import ConnectionClosed, WebSocketException +from websockets.protocol import State from async_substrate_interface.errors import ( SubstrateRequestException, @@ -72,9 +73,6 @@ decode_query_map, ) -if TYPE_CHECKING: - from websockets.asyncio.client import ClientConnection - ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] logger = logging.getLogger("async_substrate_interface") @@ -516,6 +514,7 @@ def __getitem__(self, item): class Websocket: + ws: def __init__( self, ws_url: str, @@ -538,22 +537,19 @@ def __init__( # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic self.ws_url = ws_url - self.ws: Optional["ClientConnection"] = None + self.ws: Optional[ClientConnection] = None self.max_subscriptions = asyncio.Semaphore(max_subscriptions) self.max_connections = max_connections self.shutdown_timer = shutdown_timer self._received = {} - self._in_use = 0 - self._receiving_task = None + self._sending = asyncio.Queue() + self._receiving_task = None # TODO rename, as this now does send/recv self._attempts = 0 - self._initialized = False + self._initialized = False # TODO remove self._lock = asyncio.Lock() self._exit_task = None - self._open_subscriptions = 0 self._options = options if options else {} self._log_raw_websockets = _log_raw_websockets - self._is_connecting = False - self._is_closing = False try: now = asyncio.get_running_loop().time() @@ -570,9 +566,16 @@ def __init__( self.last_sent = now self._in_use_ids = set() + @property + def state(self): + if self.ws is None: + return State.CLOSED + else: + return self.ws.state + async def __aenter__(self): - self._in_use += 1 - await self.connect() + if self.state not in (State.CONNECTING, State.OPEN): + await self.connect() return self @staticmethod @@ -596,47 +599,47 @@ async def _cancel(self): ) async def connect(self, force=False): - self._is_connecting = True - try: - now = await self.loop_time() - self.last_received = now - self.last_sent = now - if self._exit_task: - self._exit_task.cancel() - if not self._is_closing: - if not self._initialized or force: - try: - await asyncio.wait_for(self._cancel(), timeout=10.0) - except asyncio.TimeoutError: - pass - - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10.0 + now = await self.loop_time() + self.last_received = now + self.last_sent = now + if self._exit_task: + self._exit_task.cancel() + if self.state != State.CLOSING: + if not self._initialized or force: + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10.0 + ) + if self._receiving_task is None or self._receiving_task.done(): + self._receiving_task = asyncio.get_running_loop().create_task( + self._handler(self.ws) ) - if self._receiving_task is None or self._receiving_task.done(): - self._receiving_task = asyncio.get_running_loop().create_task( - self._start_receiving() - ) - self._initialized = True - finally: - self._is_connecting = False + self._initialized = True + + async def 
_handler(self, ws: ClientConnection): + consumer_task = asyncio.create_task(self._start_receiving(ws)) + producer_task = asyncio.create_task(self._start_sending(ws)) + # TODO should attach futures and add exceptions to them + done, pending = await asyncio.wait( + [consumer_task, producer_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() async def __aexit__(self, exc_type, exc_val, exc_tb): - self._is_closing = True - try: - if not self._is_connecting: - self._in_use -= 1 - if self._exit_task is not None: - self._exit_task.cancel() - try: - await self._exit_task - except asyncio.CancelledError: - pass - if self._in_use == 0 and self.ws is not None: - self._open_subscriptions = 0 - self._exit_task = asyncio.create_task(self._exit_with_timer()) - finally: - self._is_closing = False + if not self.state != State.CONNECTING: + if self._exit_task is not None: + self._exit_task.cancel() + try: + await self._exit_task + except asyncio.CancelledError: + pass + if self.ws is not None: + self._exit_task = asyncio.create_task(self._exit_with_timer()) async def _exit_with_timer(self): """ @@ -660,12 +663,10 @@ async def shutdown(self): self._receiving_task = None self._is_closing = False - async def _recv(self) -> None: + async def _recv(self, recd) -> None: try: - # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic - recd = await self.ws.recv(decode=False) if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}") response = json.loads(recd) self.last_received = await self.loop_time() if "id" in response: @@ -680,14 +681,24 @@ async def _recv(self) -> None: except (ConnectionClosed, KeyError): raise - async def _start_receiving(self): + async def _start_receiving(self, ws: ClientConnection) -> Exception: + try: + async for recd in ws: + await self._recv(recd) + except Exception as e: + return e + + async def _start_sending(self, ws) -> Exception: try: while True: - await self._recv() - except asyncio.CancelledError: - pass - except ConnectionClosed: - await self.connect(force=True) + logger.info("699 Not Empty") + to_send = await self._sending.get() + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + await ws.send(json.dumps(to_send)) + self.last_sent = await self.loop_time() + except Exception as e: + return e async def send(self, payload: dict) -> str: """ @@ -699,22 +710,16 @@ async def send(self, payload: dict) -> str: Returns: id: the internal ID of the request (incremented int) """ - original_id = get_next_id() - while original_id in self._in_use_ids: - original_id = get_next_id() - self._in_use_ids.add(original_id) - # self._open_subscriptions += 1 await self.max_subscriptions.acquire() - try: - to_send = {**payload, **{"id": original_id}} - if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") - await self.ws.send(json.dumps(to_send)) - self.last_sent = await self.loop_time() - return original_id - except (ConnectionClosed, ssl.SSLError, EOFError): - await self.connect(force=True) - return await self.send(payload) + async with self._lock: + original_id = get_next_id() + while original_id in self._in_use_ids: + original_id = get_next_id() + self._in_use_ids.add(original_id) + to_send = {**payload, **{"id": original_id}} + logger.info(f"Sending {to_send}") + await self._sending.put(to_send) + return original_id async def retrieve(self, item_id: int) -> 
Optional[dict]: """ @@ -827,6 +832,7 @@ async def initialize(self): """ self._initializing = True if not self.initialized: + await self.ws.connect() if not self._chain: chain = await self.rpc_request("system_chain", []) self._chain = chain.get("result") @@ -845,7 +851,7 @@ async def initialize(self): self._initializing = False async def __aexit__(self, exc_type, exc_val, exc_tb): - pass + await self.ws.shutdown() @property def metadata(self): From 54026f654ecad7a4f42b30d0e767dbf3674169c7 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 12:30:48 +0200 Subject: [PATCH 05/32] WIP check-in --- async_substrate_interface/async_substrate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 92bf5b7..86901d5 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -514,7 +514,6 @@ def __getitem__(self, item): class Websocket: - ws: def __init__( self, ws_url: str, From 14faed1d73bfbff14363b6dd23fcbae8e9b53aa8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 14:03:47 +0200 Subject: [PATCH 06/32] WIP check-in --- async_substrate_interface/async_substrate.py | 82 ++++++++++---------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 86901d5..6f40805 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -540,7 +540,7 @@ def __init__( self.max_subscriptions = asyncio.Semaphore(max_subscriptions) self.max_connections = max_connections self.shutdown_timer = shutdown_timer - self._received = {} + self._received: dict[str, asyncio.Future] = {} self._sending = asyncio.Queue() self._receiving_task = None # TODO rename, as this now does send/recv self._attempts = 0 @@ -601,22 +601,23 @@ async def connect(self, force=False): now = await self.loop_time() self.last_received = now self.last_sent = now - if self._exit_task: - self._exit_task.cancel() - if self.state != State.CLOSING: - if not self._initialized or force: - try: - await asyncio.wait_for(self._cancel(), timeout=10.0) - except asyncio.TimeoutError: - pass - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10.0 - ) - if self._receiving_task is None or self._receiving_task.done(): - self._receiving_task = asyncio.get_running_loop().create_task( - self._handler(self.ws) + async with self._lock: + if self._exit_task: + self._exit_task.cancel() + if self.state not in (State.OPEN, State.CONNECTING): + if not self._initialized or force: + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10.0 ) - self._initialized = True + if self._receiving_task is None or self._receiving_task.done(): + self._receiving_task = asyncio.get_running_loop().create_task( + self._handler(self.ws) + ) + self._initialized = True async def _handler(self, ws: ClientConnection): consumer_task = asyncio.create_task(self._start_receiving(ws)) @@ -669,10 +670,10 @@ async def _recv(self, recd) -> None: response = json.loads(recd) self.last_received = await self.loop_time() if "id" in response: - self._received[response["id"]] = response + self._received[response["id"]].set_result(response) self._in_use_ids.remove(response["id"]) elif "params" in response: - 
self._received[response["params"]["subscription"]] = response + self._received[response["params"]["subscription"]].set_result(response) else: raise KeyError(response) except ssl.SSLError: @@ -685,19 +686,26 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: async for recd in ws: await self._recv(recd) except Exception as e: - return e + for i in self._received.keys(): + self._received[i].set_exception(e) + return async def _start_sending(self, ws) -> Exception: + to_send = None try: while True: - logger.info("699 Not Empty") to_send = await self._sending.get() if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}}") await ws.send(json.dumps(to_send)) self.last_sent = await self.loop_time() except Exception as e: - return e + if to_send is not None: + self._received[to_send["id"]].set_exception(e) + else: + for i in self._received.keys(): + self._received[i].set_exception(e) + return async def send(self, payload: dict) -> str: """ @@ -715,8 +723,8 @@ async def send(self, payload: dict) -> str: while original_id in self._in_use_ids: original_id = get_next_id() self._in_use_ids.add(original_id) + self._received[original_id] = asyncio.get_running_loop().create_future() to_send = {**payload, **{"id": original_id}} - logger.info(f"Sending {to_send}") await self._sending.put(to_send) return original_id @@ -730,11 +738,12 @@ async def retrieve(self, item_id: int) -> Optional[dict]: Returns: retrieved item """ - try: - item = self._received.pop(item_id) + item: asyncio.Future = self._received.get(item_id) + if item.done(): self.max_subscriptions.release() - return item - except KeyError: + del self._received[item_id] + return item.result() + else: await asyncio.sleep(0.1) return None @@ -2263,16 +2272,9 @@ async def _make_rpc_request( subscription_added = False async with self.ws as ws: - if len(payloads) > 1: - send_coroutines = await asyncio.gather( - *[ws.send(item["payload"]) for item in payloads] - ) - for item_id, item in zip(send_coroutines, payloads): - request_manager.add_request(item_id, item["id"]) - else: - item = payloads[0] - item_id = await ws.send(item["payload"]) - request_manager.add_request(item_id, item["id"]) + for payload in payloads: + item_id = await ws.send(payload) + request_manager.add_request(item_id, payload["id"]) while True: for item_id in list(request_manager.response_map.keys()): @@ -2311,9 +2313,9 @@ async def _make_rpc_request( if request_manager.is_complete: break if ( - (current_time := await self.ws.loop_time()) - self.ws.last_received + (current_time := await ws.loop_time()) - ws.last_received >= self.retry_timeout - and current_time - self.ws.last_sent >= self.retry_timeout + and current_time - ws.last_sent >= self.retry_timeout ): if attempt >= self.max_retries: logger.error( @@ -2321,7 +2323,7 @@ async def _make_rpc_request( ) raise MaxRetriesExceeded("Max retries reached.") else: - self.ws.last_received = time.time() + self.ws.last_received = await ws.loop_time() await self.ws.connect(force=True) logger.warning( f"Timed out waiting for RPC requests. 
" From 7d8c5b477dee15fc0c26170fdf815111a35d5d05 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 14:10:51 +0200 Subject: [PATCH 07/32] WIP check-in --- async_substrate_interface/async_substrate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 6f40805..7f0cca8 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -696,7 +696,7 @@ async def _start_sending(self, ws) -> Exception: while True: to_send = await self._sending.get() if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}}") + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") await ws.send(json.dumps(to_send)) self.last_sent = await self.loop_time() except Exception as e: @@ -2273,7 +2273,7 @@ async def _make_rpc_request( async with self.ws as ws: for payload in payloads: - item_id = await ws.send(payload) + item_id = await ws.send(payload["payload"]) request_manager.add_request(item_id, payload["id"]) while True: From 6cb12cf16cd176fcb500a535d3947b7fd04ad10f Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 14:41:43 +0200 Subject: [PATCH 08/32] Okay. Seems to work well. --- async_substrate_interface/async_substrate.py | 93 +++++++++++--------- 1 file changed, 50 insertions(+), 43 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 7f0cca8..bd3c16d 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -664,30 +664,28 @@ async def shutdown(self): self._is_closing = False async def _recv(self, recd) -> None: - try: - if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}") - response = json.loads(recd) - self.last_received = await self.loop_time() - if "id" in response: - self._received[response["id"]].set_result(response) - self._in_use_ids.remove(response["id"]) - elif "params" in response: - self._received[response["params"]["subscription"]].set_result(response) - else: - raise KeyError(response) - except ssl.SSLError: - raise ConnectionClosed - except (ConnectionClosed, KeyError): - raise + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}") + response = json.loads(recd) + self.last_received = await self.loop_time() + if "id" in response: + self._received[response["id"]].set_result(response) + self._in_use_ids.remove(response["id"]) + elif "params" in response: + self._received[response["params"]["subscription"]].set_result(response) + else: + raise KeyError(response) async def _start_receiving(self, ws: ClientConnection) -> Exception: try: async for recd in ws: await self._recv(recd) except Exception as e: + if isinstance(e, ssl.SSLError): + e = ConnectionClosed for i in self._received.keys(): self._received[i].set_exception(e) + self._received[i].cancel() return async def _start_sending(self, ws) -> Exception: @@ -702,9 +700,11 @@ async def _start_sending(self, ws) -> Exception: except Exception as e: if to_send is not None: self._received[to_send["id"]].set_exception(e) + self._received[to_send["id"]].cancel() else: for i in self._received.keys(): self._received[i].set_exception(e) + self._received[i].cancel() return async def send(self, payload: dict) -> str: @@ -2270,6 +2270,7 @@ async def _make_rpc_request( request_manager = RequestManager(payloads) subscription_added = False + 
should_retry = False async with self.ws as ws: for payload in payloads: @@ -2282,37 +2283,43 @@ async def _make_rpc_request( item_id not in request_manager.responses or asyncio.iscoroutinefunction(result_handler) ): - if response := await ws.retrieve(item_id): - if ( - asyncio.iscoroutinefunction(result_handler) - and not subscription_added - ): - # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} - # with {subscription_id : payload_id} - try: - item_id = request_manager.overwrite_request( - item_id, response["result"] - ) - subscription_added = True - except KeyError: - raise SubstrateRequestException(str(response)) - decoded_response, complete = await self._process_response( - response, - item_id, - value_scale_type, - storage_item, - result_handler, - runtime=runtime, - force_legacy_decode=force_legacy_decode, - ) + try: + if response := await ws.retrieve(item_id): + if ( + asyncio.iscoroutinefunction(result_handler) + and not subscription_added + ): + # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} + # with {subscription_id : payload_id} + try: + item_id = request_manager.overwrite_request( + item_id, response["result"] + ) + subscription_added = True + except KeyError: + raise SubstrateRequestException(str(response)) + ( + decoded_response, + complete, + ) = await self._process_response( + response, + item_id, + value_scale_type, + storage_item, + result_handler, + runtime=runtime, + force_legacy_decode=force_legacy_decode, + ) - request_manager.add_response( - item_id, decoded_response, complete - ) + request_manager.add_response( + item_id, decoded_response, complete + ) + except ConnectionClosed: + should_retry = True if request_manager.is_complete: break - if ( + if should_retry or ( (current_time := await ws.loop_time()) - ws.last_received >= self.retry_timeout and current_time - ws.last_sent >= self.retry_timeout From ad83f6524639dc4d13f40a873227f5bc7c2439a2 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 14:42:41 +0200 Subject: [PATCH 09/32] TODO --- async_substrate_interface/async_substrate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index bd3c16d..2ccb2cd 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2324,6 +2324,7 @@ async def _make_rpc_request( >= self.retry_timeout and current_time - ws.last_sent >= self.retry_timeout ): + # TODO this retry logic should really live inside the Websocket if attempt >= self.max_retries: logger.error( f"Timed out waiting for RPC requests {attempt} times. Exiting." 
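The websocket refactor in the preceding patches converges on a standard asyncio request/response pattern: one producer task drains an outgoing queue, one consumer task reads frames, and each request id is mapped to an asyncio.Future that the consumer resolves, or fails, so callers never hang on a dead connection. The sketch below shows that pattern in isolation; it assumes the third-party websockets package, and the TinyRpc name and its methods are illustrative only, not part of async_substrate_interface.

    # Minimal sketch of the sender/receiver/future pattern (illustrative names).
    import asyncio
    import itertools
    import json

    import websockets  # assumed dependency, as used by the patches above


    class TinyRpc:
        def __init__(self, url: str):
            self.url = url
            self._ids = itertools.count()
            self._pending: dict[int, asyncio.Future] = {}
            self._outgoing: asyncio.Queue = asyncio.Queue()

        async def run(self) -> None:
            # One shared connection; sender and receiver run concurrently and
            # whichever finishes (or fails) first tears the other down.
            async with websockets.connect(self.url) as ws:
                send_task = asyncio.create_task(self._sender(ws))
                recv_task = asyncio.create_task(self._receiver(ws))
                done, pending = await asyncio.wait(
                    {send_task, recv_task}, return_when=asyncio.FIRST_COMPLETED
                )
                for task in pending:
                    task.cancel()

        async def _sender(self, ws) -> None:
            # Producer: drain the outgoing queue and write frames.
            while True:
                payload = await self._outgoing.get()
                await ws.send(json.dumps(payload))

        async def _receiver(self, ws) -> None:
            # Consumer: match each response to its caller's future by id.
            try:
                async for raw in ws:
                    msg = json.loads(raw)
                    fut = self._pending.pop(msg.get("id"), None)
                    if fut is not None and not fut.done():
                        fut.set_result(msg)
            except Exception as exc:
                # Fail every waiter instead of leaving futures unresolved.
                for fut in self._pending.values():
                    if not fut.done():
                        fut.set_exception(exc)

        async def request(self, method: str, params: list) -> dict:
            req_id = next(self._ids)
            fut = asyncio.get_running_loop().create_future()
            self._pending[req_id] = fut
            await self._outgoing.put(
                {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
            )
            return await fut

A caller would start run() in the background (for example with asyncio.create_task) and then await request(...). The Websocket class in these patches does the same job with additional concerns layered on: ids are reserved and recycled through _in_use_ids, subscription messages (which carry "params" rather than "id") are routed separately, and reconnection, retry, and shutdown-timer handling wrap the loop.
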
From 0144ee01c8450ab5fcb66a4f4dbf7012c19f40ca Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 15:02:18 +0200 Subject: [PATCH 10/32] TODONE --- async_substrate_interface/async_substrate.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 2ccb2cd..9c50ebb 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -622,7 +622,6 @@ async def connect(self, force=False): async def _handler(self, ws: ClientConnection): consumer_task = asyncio.create_task(self._start_receiving(ws)) producer_task = asyncio.create_task(self._start_sending(ws)) - # TODO should attach futures and add exceptions to them done, pending = await asyncio.wait( [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED, @@ -663,9 +662,9 @@ async def shutdown(self): self._receiving_task = None self._is_closing = False - async def _recv(self, recd) -> None: + async def _recv(self, recd: bytes) -> None: if self._log_raw_websockets: - raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}") + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") response = json.loads(recd) self.last_received = await self.loop_time() if "id" in response: @@ -678,8 +677,8 @@ async def _recv(self, recd) -> None: async def _start_receiving(self, ws: ClientConnection) -> Exception: try: - async for recd in ws: - await self._recv(recd) + while True: + await self._recv(await ws.recv(decode=False)) except Exception as e: if isinstance(e, ssl.SSLError): e = ConnectionClosed From a953c015f8e295d274ee401eb96edab2005411da Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 15:45:25 +0200 Subject: [PATCH 11/32] True runtime independence --- async_substrate_interface/async_substrate.py | 11 ++++----- async_substrate_interface/sync_substrate.py | 21 +++++++++++++---- async_substrate_interface/types.py | 24 +++++++++++--------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 9c50ebb..7aeb0e4 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -8,7 +8,6 @@ import inspect import logging import ssl -import time import warnings from unittest.mock import AsyncMock from hashlib import blake2b @@ -19,7 +18,6 @@ Callable, Awaitable, cast, - TYPE_CHECKING, ) from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string @@ -32,7 +30,10 @@ MultiAccountId, ) from websockets.asyncio.client import connect, ClientConnection -from websockets.exceptions import ConnectionClosed, WebSocketException +from websockets.exceptions import ( + ConnectionClosed, + WebSocketException, +) from websockets.protocol import State from async_substrate_interface.errors import ( @@ -1142,12 +1143,8 @@ async def _get_runtime_for_version( f"Exported method Metadata_metadata_at_version is not found for {runtime_version}. This indicates the " f"block is quite old, decoding for this block will use legacy Python decoding." 
) - implements_scale_info = metadata.portable_registry is not None runtime = Runtime( chain=self.chain, - runtime_config=self._runtime_config_copy( - implements_scale_info=implements_scale_info - ), metadata=metadata, type_registry=self.type_registry, metadata_v15=metadata_v15, diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 927caf6..44fd158 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -774,6 +774,9 @@ def init_runtime( if block_id is not None: if runtime := self.runtime_cache.retrieve(block=block_id): + runtime.load_runtime() + if runtime.registry: + runtime.load_registry_type_map() self.runtime = runtime return self.runtime block_hash = self.get_block_hash(block_id) @@ -783,6 +786,9 @@ def init_runtime( else: self.last_block_hash = block_hash if runtime := self.runtime_cache.retrieve(block_hash=block_hash): + runtime.load_runtime() + if runtime.registry: + runtime.load_registry_type_map() self.runtime = runtime return self.runtime @@ -795,12 +801,17 @@ def init_runtime( if self.runtime and runtime_version == self.runtime.runtime_version: return self.runtime - if runtime := self.runtime_cache.retrieve(runtime_version=runtime_version): - self.runtime = runtime - return self.runtime + if ( + runtime := self.runtime_cache.retrieve(runtime_version=runtime_version) + ) is not None: + pass else: - self.runtime = self.get_runtime_for_version(runtime_version, block_hash) - return self.runtime + runtime = self.get_runtime_for_version(runtime_version, block_hash) + runtime.load_runtime() + if runtime.registry: + runtime.load_registry_type_map() + self.runtime = runtime + return self.runtime def get_runtime_for_version( self, runtime_version: int, block_hash: Optional[str] = None diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index bcf2fe1..e7c4d35 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -75,25 +75,25 @@ def retrieve( runtime = self.blocks.get(block) if runtime is not None: self.last_used = runtime - runtime.load_runtime() - if runtime.registry: - runtime.load_registry_type_map() + # runtime.load_runtime() + # if runtime.registry: + # runtime.load_registry_type_map() return runtime if block_hash is not None: runtime = self.block_hashes.get(block_hash) if runtime is not None: self.last_used = runtime - runtime.load_runtime() - if runtime.registry: - runtime.load_registry_type_map() + # runtime.load_runtime() + # if runtime.registry: + # runtime.load_registry_type_map() return runtime if runtime_version is not None: runtime = self.versions.get(runtime_version) if runtime is not None: self.last_used = runtime - runtime.load_runtime() - if runtime.registry: - runtime.load_registry_type_map() + # runtime.load_runtime() + # if runtime.registry: + # runtime.load_registry_type_map() return runtime return None @@ -119,9 +119,9 @@ class Runtime: def __init__( self, chain: str, - runtime_config: RuntimeConfigurationObject, metadata, type_registry, + runtime_config: Optional[RuntimeConfigurationObject] = None, metadata_v15=None, runtime_info=None, registry=None, @@ -131,13 +131,15 @@ def __init__( self.config = {} self.chain = chain self.type_registry = type_registry - self.runtime_config = runtime_config self.metadata = metadata self.metadata_v15 = metadata_v15 self.runtime_info = runtime_info self.registry = registry self.runtime_version = runtime_info.get("specVersion") self.transaction_version = 
runtime_info.get("transactionVersion") + self.runtime_config = runtime_config or RuntimeConfigurationObject( + implements_scale_info=self.implements_scaleinfo + ) self.load_runtime() if registry is not None: self.load_registry_type_map() From a335d4b98d13a6e7d3fcaa1b6697ce9f4f465487 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 15:50:18 +0200 Subject: [PATCH 12/32] RuntimeCache improvements --- async_substrate_interface/types.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index e7c4d35..bb2c89a 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -74,26 +74,28 @@ def retrieve( if block is not None: runtime = self.blocks.get(block) if runtime is not None: + if block_hash is not None: + # if lookup occurs for block_hash and block, but only block matches, also map to block_hash + self.add_item(runtime, block_hash=block_hash) self.last_used = runtime - # runtime.load_runtime() - # if runtime.registry: - # runtime.load_registry_type_map() return runtime if block_hash is not None: runtime = self.block_hashes.get(block_hash) if runtime is not None: + if block is not None: + # if lookup occurs for block_hash and block, but only block_hash matches, also map to block + self.add_item(runtime, block=block) self.last_used = runtime - # runtime.load_runtime() - # if runtime.registry: - # runtime.load_registry_type_map() return runtime if runtime_version is not None: runtime = self.versions.get(runtime_version) if runtime is not None: + # if runtime_version matches, also map to block and block_hash (if supplied) + if block is not None: + self.add_item(runtime, block=block) + if block_hash is not None: + self.add_item(runtime, block_hash=block_hash) self.last_used = runtime - # runtime.load_runtime() - # if runtime.registry: - # runtime.load_registry_type_map() return runtime return None From c1856e20a718e62d0b14e8d142766b37e1846ae9 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 16:11:31 +0200 Subject: [PATCH 13/32] Tests --- async_substrate_interface/types.py | 1 + .../asyncio_/test_substrate_interface.py | 19 +++++++++++ .../sync/test_substrate_interface.py | 13 ++++++++ tests/unit_tests/test_types.py | 33 ++++++++++++++++++- 4 files changed, 65 insertions(+), 1 deletion(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index bb2c89a..1549987 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -137,6 +137,7 @@ def __init__( self.metadata_v15 = metadata_v15 self.runtime_info = runtime_info self.registry = registry + runtime_info = runtime_info or {} self.runtime_version = runtime_info.get("specVersion") self.transaction_version = runtime_info.get("transactionVersion") self.runtime_config = runtime_config or RuntimeConfigurationObject( diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index 1ea30ef..817cdf3 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -6,6 +6,7 @@ from async_substrate_interface.async_substrate import AsyncSubstrateInterface from async_substrate_interface.types import ScaleObj +from tests.helpers.settings import ARCHIVE_ENTRYPOINT @pytest.mark.asyncio @@ -113,3 +114,21 @@ async def test_websocket_shutdown_timer(): await substrate.get_chain_head() await 
asyncio.sleep(6) # same sleep time as before assert substrate.ws._initialized is True # connection should still be open + + +@pytest.mark.asyncio +async def test_runtime_switching(): + block = 6067945 # block where a runtime switch happens + async with AsyncSubstrateInterface( + ARCHIVE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor" + ) as substrate: + # assures we switch between the runtimes without error + assert await substrate.get_extrinsics(block_number=block - 20) is not None + assert await substrate.get_extrinsics(block_number=block) is not None + assert await substrate.get_extrinsics(block_number=block - 21) is not None + one, two = await asyncio.gather( + substrate.get_extrinsics(block_number=block - 22), + substrate.get_extrinsics(block_number=block + 1), + ) + assert one is not None + assert two is not None diff --git a/tests/unit_tests/sync/test_substrate_interface.py b/tests/unit_tests/sync/test_substrate_interface.py index ea6d7b5..284b8cb 100644 --- a/tests/unit_tests/sync/test_substrate_interface.py +++ b/tests/unit_tests/sync/test_substrate_interface.py @@ -3,6 +3,8 @@ from async_substrate_interface.sync_substrate import SubstrateInterface from async_substrate_interface.types import ScaleObj +from tests.helpers.settings import ARCHIVE_ENTRYPOINT + def test_runtime_call(monkeypatch): substrate = SubstrateInterface("ws://localhost", _mock=True) @@ -73,3 +75,14 @@ def test_runtime_call(monkeypatch): "state_call", ["SubstrateApi_SubstrateMethod", "", None] ) substrate.close() + + +def test_runtime_switching(): + block = 6067945 # block where a runtime switch happens + with SubstrateInterface( + ARCHIVE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor" + ) as substrate: + # assures we switch between the runtimes without error + assert substrate.get_extrinsics(block_number=block - 20) is not None + assert substrate.get_extrinsics(block_number=block) is not None + assert substrate.get_extrinsics(block_number=block - 21) is not None diff --git a/tests/unit_tests/test_types.py b/tests/unit_tests/test_types.py index cba7b57..7292177 100644 --- a/tests/unit_tests/test_types.py +++ b/tests/unit_tests/test_types.py @@ -1,4 +1,4 @@ -from async_substrate_interface.types import ScaleObj +from async_substrate_interface.types import ScaleObj, Runtime, RuntimeCache def test_scale_object(): @@ -51,3 +51,34 @@ def test_scale_object(): assert inst_dict["a"] == 1 assert inst_dict["b"] == 2 assert [i for i in inst_dict] == ["a", "b"] + + +def test_runtime_cache(): + fake_block = 2 + fake_hash = "0xignore" + fake_version = 271 + + new_fake_block = 3 + newer_fake_block = 4 + + new_fake_hash = "0xnewfakehash" + + runtime = Runtime("", None, None) + runtime_cache = RuntimeCache() + # insert our Runtime object into the cache with a set block, hash, and version + runtime_cache.add_item(runtime, fake_block, fake_hash, fake_version) + + assert runtime_cache.retrieve(fake_block) is not None + # cache does not yet know that new_fake_block has the same runtime + assert runtime_cache.retrieve(new_fake_block) is None + assert ( + runtime_cache.retrieve(new_fake_block, runtime_version=fake_version) is not None + ) + # after checking the runtime with the new block, it now knows this runtime should also map to this block + assert runtime_cache.retrieve(new_fake_block) is not None + assert runtime_cache.retrieve(newer_fake_block) is None + assert runtime_cache.retrieve(newer_fake_block, fake_hash) is not None + assert runtime_cache.retrieve(newer_fake_block) is not None + assert 
runtime_cache.retrieve(block_hash=new_fake_hash) is None + assert runtime_cache.retrieve(fake_block, block_hash=new_fake_hash) is not None + assert runtime_cache.retrieve(block_hash=new_fake_hash) is not None From 88b6357a7e83c98266f87950236e0fa7195eae2f Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 1 Aug 2025 18:41:08 +0200 Subject: [PATCH 14/32] Needed to initiate the runtime config differently --- async_substrate_interface/async_substrate.py | 46 ++++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 7aeb0e4..0030639 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -21,7 +21,8 @@ ) from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string -from scalecodec.base import ScaleBytes, ScaleType +from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject +from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import ( GenericCall, GenericExtrinsic, @@ -1113,6 +1114,10 @@ async def get_runtime_for_version( async def _get_runtime_for_version( self, runtime_version: int, block_hash: Optional[str] = None ) -> Runtime: + runtime_config = RuntimeConfigurationObject() + runtime_config.clear_type_registry() + runtime_config.update_type_registry(load_type_registry_preset(name="core")) + if not block_hash: block_hash, runtime_block_hash, block_number = await asyncio.gather( self.get_chain_head(), @@ -1126,7 +1131,11 @@ async def _get_runtime_for_version( ) runtime_info, metadata, (metadata_v15, registry) = await asyncio.gather( self.get_block_runtime_info(runtime_block_hash), - self.get_block_metadata(block_hash=runtime_block_hash, decode=True), + self.get_block_metadata( + block_hash=runtime_block_hash, + runtime_config=runtime_config, + decode=True, + ), self._load_registry_at_block(block_hash=runtime_block_hash), ) if metadata is None: @@ -1147,6 +1156,7 @@ async def _get_runtime_for_version( chain=self.chain, metadata=metadata, type_registry=self.type_registry, + runtime_config=runtime_config, metadata_v15=metadata_v15, runtime_info=runtime_info, registry=registry, @@ -2102,7 +2112,10 @@ async def _get_block_runtime_version_for(self, block_hash: str): return runtime_info["specVersion"] async def get_block_metadata( - self, block_hash: Optional[str] = None, decode: bool = True + self, + block_hash: Optional[str] = None, + runtime_config: Optional[RuntimeConfigurationObject] = None, + decode: bool = True, ) -> Optional[Union[dict, ScaleType]]: """ A pass-though to existing JSONRPC method `state_getMetadata`. 
@@ -2116,7 +2129,7 @@ async def get_block_metadata( from the server """ params = None - if decode and not self.runtime_config: + if decode and not runtime_config: raise ValueError( "Cannot decode runtime configuration without a supplied runtime_config" ) @@ -2129,7 +2142,7 @@ async def get_block_metadata( raise SubstrateRequestException(response["error"]["message"]) if (result := response.get("result")) and decode: - metadata_decoder = self.runtime_config.create_scale_object( + metadata_decoder = runtime_config.create_scale_object( "MetadataVersioned", data=ScaleBytes(result) ) metadata_decoder.decode() @@ -2645,10 +2658,12 @@ async def generate_signature_payload( tip: int = 0, tip_asset_id: Optional[int] = None, include_call_length: bool = False, + runtime: Optional[Runtime] = None, ) -> ScaleBytes: # Retrieve genesis hash genesis_hash = await self.get_block_hash(0) - runtime = await self.init_runtime(block_hash=None) + if runtime is None: + runtime = await self.init_runtime(block_hash=None) if not era: era = "00" @@ -2759,7 +2774,7 @@ async def generate_signature_payload( ) if include_call_length: - length_obj = self.runtime_config.create_scale_object("Bytes") + length_obj = runtime.runtime_config.create_scale_object("Bytes") call_data = str(length_obj.encode(str(call.data))) else: @@ -2855,7 +2870,12 @@ async def create_signed_extrinsic( else: # Create signature payload signature_payload = await self.generate_signature_payload( - call=call, era=era, nonce=nonce, tip=tip, tip_asset_id=tip_asset_id + call=call, + era=era, + nonce=nonce, + tip=tip, + tip_asset_id=tip_asset_id, + runtime=runtime, ) # Set Signature version to crypto type of keypair @@ -2867,7 +2887,7 @@ async def create_signed_extrinsic( signature = await signature # Create extrinsic - extrinsic = self.runtime_config.create_scale_object( + extrinsic = runtime.runtime_config.create_scale_object( type_string="Extrinsic", metadata=runtime.metadata ) @@ -2907,7 +2927,7 @@ async def create_unsigned_extrinsic(self, call: GenericCall) -> GenericExtrinsic runtime = await self.init_runtime() # Create extrinsic - extrinsic = self.runtime_config.create_scale_object( + extrinsic = runtime.runtime_config.create_scale_object( type_string="Extrinsic", metadata=runtime.metadata ) @@ -3018,10 +3038,10 @@ async def runtime_call( try: if runtime.metadata_v15 is None: - _ = self.runtime_config.type_registry["runtime_api"][api]["methods"][ + _ = runtime.runtime_config.type_registry["runtime_api"][api]["methods"][ method ] - runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + runtime_api_types = runtime.runtime_config.type_registry["runtime_api"][ api ].get("types", {}) runtime.runtime_config.update_type_registry_types(runtime_api_types) @@ -3288,7 +3308,7 @@ async def get_type_registry( else: type_string = f"scale_info::{scale_info_type.value['id']}" - scale_cls = self.runtime_config.get_decoder_class(type_string) + scale_cls = runtime.runtime_config.get_decoder_class(type_string) type_registry[type_string] = scale_cls.generate_type_decomposition( max_recursion=max_recursion ) From 8bb565323795e937c36e1e70fcce065216ffbf98 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Sun, 3 Aug 2025 21:13:43 +0200 Subject: [PATCH 15/32] WIP --- async_substrate_interface/async_substrate.py | 43 +++++++++++++------- async_substrate_interface/types.py | 6 +-- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 0030639..58eaa46 
100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -543,6 +543,7 @@ def __init__( self.max_connections = max_connections self.shutdown_timer = shutdown_timer self._received: dict[str, asyncio.Future] = {} + self._received_subscriptions: dict[str, asyncio.Queue] = {} self._sending = asyncio.Queue() self._receiving_task = None # TODO rename, as this now does send/recv self._attempts = 0 @@ -673,7 +674,8 @@ async def _recv(self, recd: bytes) -> None: self._received[response["id"]].set_result(response) self._in_use_ids.remove(response["id"]) elif "params" in response: - self._received[response["params"]["subscription"]].set_result(response) + sub_id = response["params"]["subscription"] + await self._received_subscriptions[sub_id].put(response) else: raise KeyError(response) @@ -708,6 +710,9 @@ async def _start_sending(self, ws) -> Exception: self._received[i].cancel() return + async def add_subscription(self, subscription_id: str) -> None: + self._received_subscriptions[subscription_id] = asyncio.Queue() + async def send(self, payload: dict) -> str: """ Sends a payload to the websocket connection. @@ -729,7 +734,7 @@ async def send(self, payload: dict) -> str: await self._sending.put(to_send) return original_id - async def retrieve(self, item_id: int) -> Optional[dict]: + async def retrieve(self, item_id: str) -> Optional[dict]: """ Retrieves a single item from received responses dict queue @@ -739,14 +744,20 @@ async def retrieve(self, item_id: int) -> Optional[dict]: Returns: retrieved item """ - item: asyncio.Future = self._received.get(item_id) - if item.done(): - self.max_subscriptions.release() - del self._received[item_id] - return item.result() + item: Optional[asyncio.Future] = self._received.get(item_id) + if item is not None: + if item.done(): + self.max_subscriptions.release() + del self._received[item_id] + return item.result() else: - await asyncio.sleep(0.1) - return None + try: + return self._received_subscriptions[item_id].get_nowait() + # TODO make sure to delete during unsubscribe + except asyncio.QueueEmpty: + pass + await asyncio.sleep(0.1) + return None class AsyncSubstrateInterface(SubstrateMixin): @@ -2304,6 +2315,7 @@ async def _make_rpc_request( item_id = request_manager.overwrite_request( item_id, response["result"] ) + await ws.add_subscription(response["result"]) subscription_added = True except KeyError: raise SubstrateRequestException(str(response)) @@ -2347,12 +2359,13 @@ async def _make_rpc_request( f"Retrying attempt {attempt + 1} of {self.max_retries}" ) return await self._make_rpc_request( - payloads, - value_scale_type, - storage_item, - result_handler, - attempt + 1, - force_legacy_decode, + payloads=payloads, + value_scale_type=value_scale_type, + storage_item=storage_item, + result_handler=result_handler, + attempt=attempt + 1, + runtime=runtime, + force_legacy_decode=force_legacy_decode, ) return request_manager.get_results() diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 1549987..1c064d6 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -377,13 +377,13 @@ def __init__(self, payloads): self.responses = defaultdict(lambda: {"complete": False, "results": []}) self.payloads_count = len(payloads) - def add_request(self, item_id: Union[int, str], request_id: Any): + def add_request(self, item_id: str, request_id: Any): """ Adds an outgoing request to the responses map for later retrieval """ 
self.response_map[item_id] = request_id - def overwrite_request(self, item_id: int, request_id: Any): + def overwrite_request(self, item_id: str, request_id: Any): """ Overwrites an existing request in the responses map with a new request_id. This is used for multipart responses that generate a subscription id we need to watch, rather than the initial @@ -392,7 +392,7 @@ def overwrite_request(self, item_id: int, request_id: Any): self.response_map[request_id] = self.response_map.pop(item_id) return request_id - def add_response(self, item_id: int, response: dict, complete: bool): + def add_response(self, item_id: str, response: dict, complete: bool): """ Maps a response to the request for later retrieval """ From 5e52dedca86b533aac9c816d8f7a2ea9e5a7bbb5 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Sun, 3 Aug 2025 21:16:49 +0200 Subject: [PATCH 16/32] =?UTF-8?q?Specify=20args=20by=20keyword=20=E2=80=94?= =?UTF-8?q?=20during=20restarts=20`force=5Flegacy=5Fdecode`=20was=20passed?= =?UTF-8?q?=20as=20`runtime`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- async_substrate_interface/async_substrate.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index f609bfe..0230f7d 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2323,12 +2323,13 @@ async def _make_rpc_request( f"Retrying attempt {attempt + 1} of {self.max_retries}" ) return await self._make_rpc_request( - payloads, - value_scale_type, - storage_item, - result_handler, - attempt + 1, - force_legacy_decode, + payloads=payloads, + value_scale_type=value_scale_type, + storage_item=storage_item, + result_handler=result_handler, + attempt=attempt + 1, + runtime=runtime, + force_legacy_decode=force_legacy_decode, ) return request_manager.get_results() From e7414719b8eebd1d394ac2ce20f21f7f370f7ee2 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Sun, 3 Aug 2025 21:43:38 +0200 Subject: [PATCH 17/32] Error handling --- async_substrate_interface/async_substrate.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 58eaa46..782e1f7 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -686,9 +686,10 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: except Exception as e: if isinstance(e, ssl.SSLError): e = ConnectionClosed - for i in self._received.keys(): - self._received[i].set_exception(e) - self._received[i].cancel() + for fut in self._received.values(): + if not fut.done(): + fut.set_exception(e) + fut.cancel() return async def _start_sending(self, ws) -> Exception: From 55a84579ac67b0f04e9048f2632e2d841f88ca60 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Sun, 3 Aug 2025 21:56:31 +0200 Subject: [PATCH 18/32] TODOs --- async_substrate_interface/async_substrate.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 782e1f7..84bbaad 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -2289,6 +2289,8 @@ async def _make_rpc_request( force_legacy_decode: bool = False, ) -> RequestManager.RequestResults: request_manager = RequestManager(payloads) + # 
TODO maybe instead of the current logic, I should assign the futs during send() and then just + # TODO use that to determine when it's completed. But how would this work with subscriptions? subscription_added = False should_retry = False @@ -2341,6 +2343,7 @@ async def _make_rpc_request( if request_manager.is_complete: break + # TODO I sometimes get timeouts immediately. Why? if should_retry or ( (current_time := await ws.loop_time()) - ws.last_received >= self.retry_timeout From 78cf8c26e84dba4fe7f3a4d4ca830c099086052c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Sun, 3 Aug 2025 21:58:38 +0200 Subject: [PATCH 19/32] TODO --- async_substrate_interface/async_substrate.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 84bbaad..3b7ba8b 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -696,6 +696,9 @@ async def _start_sending(self, ws) -> Exception: to_send = None try: while True: + # TODO possibly when these are pulled from the Queue, they should also go into a dict or set, with the + # TODO done_callback assigned to remove them when complete. This could allow easier resending in cases + # TODO such as a timeout. to_send = await self._sending.get() if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") From 8f54649628768c3c740dd14edc030f09b9d8e81a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 13:01:51 +0200 Subject: [PATCH 20/32] WIP --- async_substrate_interface/async_substrate.py | 58 ++++++++++++++------ 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 3b7ba8b..9be3e83 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -524,6 +524,7 @@ def __init__( shutdown_timer=5, options: Optional[dict] = None, _log_raw_websockets: bool = False, + retry_timeout: float = 60.0 ): """ Websocket manager object. 
Allows for the use of a single websocket connection by multiple @@ -542,10 +543,12 @@ def __init__( self.max_subscriptions = asyncio.Semaphore(max_subscriptions) self.max_connections = max_connections self.shutdown_timer = shutdown_timer + self.retry_timeout = retry_timeout self._received: dict[str, asyncio.Future] = {} self._received_subscriptions: dict[str, asyncio.Queue] = {} self._sending = asyncio.Queue() - self._receiving_task = None # TODO rename, as this now does send/recv + self._send_recv_task = None + self._inflight: dict[str, str] = {} self._attempts = 0 self._initialized = False # TODO remove self._lock = asyncio.Lock() @@ -586,8 +589,8 @@ async def loop_time() -> float: async def _cancel(self): try: - self._receiving_task.cancel() - await self._receiving_task + self._send_recv_task.cancel() + await self._send_recv_task await self.ws.close() except ( AttributeError, @@ -601,13 +604,14 @@ async def _cancel(self): ) async def connect(self, force=False): + # TODO after connecting, move from _inflight to the queue now = await self.loop_time() self.last_received = now self.last_sent = now async with self._lock: if self._exit_task: self._exit_task.cancel() - if self.state not in (State.OPEN, State.CONNECTING): + if self.state not in (State.OPEN, State.CONNECTING) or force: if not self._initialized or force: try: await asyncio.wait_for(self._cancel(), timeout=10.0) @@ -616,21 +620,34 @@ async def connect(self, force=False): self.ws = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) - if self._receiving_task is None or self._receiving_task.done(): - self._receiving_task = asyncio.get_running_loop().create_task( + if self._send_recv_task is None or self._send_recv_task.done(): + self._send_recv_task = asyncio.get_running_loop().create_task( self._handler(self.ws) ) self._initialized = True - async def _handler(self, ws: ClientConnection): - consumer_task = asyncio.create_task(self._start_receiving(ws)) - producer_task = asyncio.create_task(self._start_sending(ws)) + async def _handler(self, ws: ClientConnection) -> None: + recv_task = asyncio.create_task(self._start_receiving(ws)) + send_task = asyncio.create_task(self._start_sending(ws)) done, pending = await asyncio.wait( - [consumer_task, producer_task], + [recv_task, send_task], return_when=asyncio.FIRST_COMPLETED, ) + loop = asyncio.get_running_loop() + should_reconnect = False for task in pending: task.cancel() + if isinstance(task.exception(), asyncio.TimeoutError): + should_reconnect = True + if should_reconnect is True: + for original_id, payload in list(self._inflight.items()): + self._received[original_id] = loop.create_future() + to_send = json.loads(payload) + await self._sending.put(to_send) + logger.info("Timeout occurred. 
Reconnecting.") + await self.connect(True) + await self._handler(ws=ws) + async def __aexit__(self, exc_type, exc_val, exc_tb): if not self.state != State.CONNECTING: @@ -662,7 +679,7 @@ async def shutdown(self): pass self.ws = None self._initialized = False - self._receiving_task = None + self._send_recv_task = None self._is_closing = False async def _recv(self, recd: bytes) -> None: @@ -671,9 +688,12 @@ async def _recv(self, recd: bytes) -> None: response = json.loads(recd) self.last_received = await self.loop_time() if "id" in response: + async with self._lock: + self._inflight.pop(response["id"]) self._received[response["id"]].set_result(response) self._in_use_ids.remove(response["id"]) elif "params" in response: + # TODO self._inflight won't work with subscriptions sub_id = response["params"]["subscription"] await self._received_subscriptions[sub_id].put(response) else: @@ -682,7 +702,9 @@ async def _recv(self, recd: bytes) -> None: async def _start_receiving(self, ws: ClientConnection) -> Exception: try: while True: - await self._recv(await ws.recv(decode=False)) + if self._inflight: + recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout) + await self._recv(recd) except Exception as e: if isinstance(e, ssl.SSLError): e = ConnectionClosed @@ -696,13 +718,14 @@ async def _start_sending(self, ws) -> Exception: to_send = None try: while True: - # TODO possibly when these are pulled from the Queue, they should also go into a dict or set, with the - # TODO done_callback assigned to remove them when complete. This could allow easier resending in cases - # TODO such as a timeout. - to_send = await self._sending.get() + to_send_ = await self._sending.get() + send_id = to_send_["id"] + to_send = json.dumps(to_send_) + async with self._lock: + self._inflight[send_id] = to_send if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") - await ws.send(json.dumps(to_send)) + await ws.send(to_send) self.last_sent = await self.loop_time() except Exception as e: if to_send is not None: @@ -824,6 +847,7 @@ def __init__( "write_limit": 2**16, }, shutdown_timer=ws_shutdown_timer, + retry_timeout=self.retry_timeout, ) else: self.ws = AsyncMock(spec=Websocket) From 07663a3d96b34d31dd853c171789ad4d3215d732 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 13:28:28 +0200 Subject: [PATCH 21/32] WIP check-in --- async_substrate_interface/async_substrate.py | 109 ++++++++----------- 1 file changed, 43 insertions(+), 66 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 9be3e83..997785d 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -604,7 +604,7 @@ async def _cancel(self): ) async def connect(self, force=False): - # TODO after connecting, move from _inflight to the queue + logger.debug("Connecting.") now = await self.loop_time() self.last_received = now self.last_sent = now @@ -620,25 +620,30 @@ async def connect(self, force=False): self.ws = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) + logger.debug("Connected.") if self._send_recv_task is None or self._send_recv_task.done(): self._send_recv_task = asyncio.get_running_loop().create_task( self._handler(self.ws) ) + logger.debug("Recd task started.") self._initialized = True async def _handler(self, ws: ClientConnection) -> None: recv_task = asyncio.create_task(self._start_receiving(ws)) send_task = 
asyncio.create_task(self._start_sending(ws)) + logger.debug("Starting send/recv tasks.") done, pending = await asyncio.wait( [recv_task, send_task], return_when=asyncio.FIRST_COMPLETED, ) + logger.debug("send/recv tasks done.") loop = asyncio.get_running_loop() should_reconnect = False for task in pending: task.cancel() - if isinstance(task.exception(), asyncio.TimeoutError): - should_reconnect = True + if isinstance(recv_task.exception(), asyncio.TimeoutError): + # TODO check the logic here + should_reconnect = True if should_reconnect is True: for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() @@ -648,7 +653,6 @@ async def _handler(self, ws: ClientConnection) -> None: await self.connect(True) await self._handler(ws=ws) - async def __aexit__(self, exc_type, exc_val, exc_tb): if not self.state != State.CONNECTING: if self._exit_task is not None: @@ -705,6 +709,8 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: if self._inflight: recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout) await self._recv(recd) + else: + await asyncio.sleep(0.1) except Exception as e: if isinstance(e, ssl.SSLError): e = ConnectionClosed @@ -719,6 +725,7 @@ async def _start_sending(self, ws) -> Exception: try: while True: to_send_ = await self._sending.get() + logger.debug(f"Pulled {to_send_} from the queue") send_id = to_send_["id"] to_send = json.dumps(to_send_) async with self._lock: @@ -751,6 +758,7 @@ async def send(self, payload: dict) -> str: id: the internal ID of the request (incremented int) """ await self.max_subscriptions.acquire() + logger.debug(f"Sending payload: {payload}") async with self._lock: original_id = get_next_id() while original_id in self._in_use_ids: @@ -759,6 +767,7 @@ async def send(self, payload: dict) -> str: self._received[original_id] = asyncio.get_running_loop().create_future() to_send = {**payload, **{"id": original_id}} await self._sending.put(to_send) + logger.debug("767 queue put") return original_id async def retrieve(self, item_id: str) -> Optional[dict]: @@ -2320,7 +2329,6 @@ async def _make_rpc_request( # TODO use that to determine when it's completed. But how would this work with subscriptions? 
subscription_added = False - should_retry = False async with self.ws as ws: for payload in payloads: @@ -2333,71 +2341,40 @@ async def _make_rpc_request( item_id not in request_manager.responses or asyncio.iscoroutinefunction(result_handler) ): - try: - if response := await ws.retrieve(item_id): - if ( - asyncio.iscoroutinefunction(result_handler) - and not subscription_added - ): - # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} - # with {subscription_id : payload_id} - try: - item_id = request_manager.overwrite_request( - item_id, response["result"] - ) - await ws.add_subscription(response["result"]) - subscription_added = True - except KeyError: - raise SubstrateRequestException(str(response)) - ( - decoded_response, - complete, - ) = await self._process_response( - response, - item_id, - value_scale_type, - storage_item, - result_handler, - runtime=runtime, - force_legacy_decode=force_legacy_decode, - ) + if response := await ws.retrieve(item_id): + if ( + asyncio.iscoroutinefunction(result_handler) + and not subscription_added + ): + # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} + # with {subscription_id : payload_id} + try: + item_id = request_manager.overwrite_request( + item_id, response["result"] + ) + await ws.add_subscription(response["result"]) + subscription_added = True + except KeyError: + raise SubstrateRequestException(str(response)) + ( + decoded_response, + complete, + ) = await self._process_response( + response, + item_id, + value_scale_type, + storage_item, + result_handler, + runtime=runtime, + force_legacy_decode=force_legacy_decode, + ) - request_manager.add_response( - item_id, decoded_response, complete - ) - except ConnectionClosed: - should_retry = True + request_manager.add_response( + item_id, decoded_response, complete + ) if request_manager.is_complete: break - # TODO I sometimes get timeouts immediately. Why? - if should_retry or ( - (current_time := await ws.loop_time()) - ws.last_received - >= self.retry_timeout - and current_time - ws.last_sent >= self.retry_timeout - ): - # TODO this retry logic should really live inside the Websocket - if attempt >= self.max_retries: - logger.error( - f"Timed out waiting for RPC requests {attempt} times. Exiting." - ) - raise MaxRetriesExceeded("Max retries reached.") - else: - self.ws.last_received = await ws.loop_time() - await self.ws.connect(force=True) - logger.warning( - f"Timed out waiting for RPC requests. 
" - f"Retrying attempt {attempt + 1} of {self.max_retries}" - ) - return await self._make_rpc_request( - payloads=payloads, - value_scale_type=value_scale_type, - storage_item=storage_item, - result_handler=result_handler, - attempt=attempt + 1, - runtime=runtime, - force_legacy_decode=force_legacy_decode, - ) return request_manager.get_results() From d89b230c24cf5cd713e389d26df7fe0345d16ab8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 16:55:09 +0200 Subject: [PATCH 22/32] WIP check-in --- async_substrate_interface/async_substrate.py | 52 +++++++++++++------- async_substrate_interface/types.py | 4 +- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 997785d..ea252b8 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -636,7 +636,7 @@ async def _handler(self, ws: ClientConnection) -> None: [recv_task, send_task], return_when=asyncio.FIRST_COMPLETED, ) - logger.debug("send/recv tasks done.") + logger.debug(f"send/recv tasks done: {done}\n{pending}") loop = asyncio.get_running_loop() should_reconnect = False for task in pending: @@ -652,6 +652,10 @@ async def _handler(self, ws: ClientConnection) -> None: logger.info("Timeout occurred. Reconnecting.") await self.connect(True) await self._handler(ws=ws) + elif isinstance(e := recv_task.result(), Exception): + return e + elif isinstance(e := send_task.result(), Exception): + return e async def __aexit__(self, exc_type, exc_val, exc_tb): if not self.state != State.CONNECTING: @@ -699,6 +703,7 @@ async def _recv(self, recd: bytes) -> None: elif "params" in response: # TODO self._inflight won't work with subscriptions sub_id = response["params"]["subscription"] + logger.debug(f"Adding {sub_id} to subscriptions.") await self._received_subscriptions[sub_id].put(response) else: raise KeyError(response) @@ -706,19 +711,17 @@ async def _recv(self, recd: bytes) -> None: async def _start_receiving(self, ws: ClientConnection) -> Exception: try: while True: - if self._inflight: - recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout) - await self._recv(recd) - else: - await asyncio.sleep(0.1) + recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout) + await self._recv(recd) except Exception as e: + logger.exception("Start receving exception", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed for fut in self._received.values(): if not fut.done(): fut.set_exception(e) fut.cancel() - return + return e async def _start_sending(self, ws) -> Exception: to_send = None @@ -742,9 +745,10 @@ async def _start_sending(self, ws) -> Exception: for i in self._received.keys(): self._received[i].set_exception(e) self._received[i].cancel() - return + return e async def add_subscription(self, subscription_id: str) -> None: + logger.debug(f"Adding {subscription_id} to subscriptions.") self._received_subscriptions[subscription_id] = asyncio.Queue() async def send(self, payload: dict) -> str: @@ -770,6 +774,22 @@ async def send(self, payload: dict) -> str: logger.debug("767 queue put") return original_id + async def unsubscribe(self, subscription_id: str) -> None: + """ + Unwatches a watched extrinsic subscription. 
+ + Args: + subscription_id: the internal ID of the subscription (typically a hex string) + """ + async with self._lock: + original_id = get_next_id() + while original_id in self._in_use_ids: + original_id = get_next_id() + del self._received_subscriptions[subscription_id] + + to_send = {"jsonrpc": "2.0", "method": "author_unwatchExtrinsic", "params": [subscription_id]} + await self._sending.put(to_send) + async def retrieve(self, item_id: str) -> Optional[dict]: """ Retrieves a single item from received responses dict queue @@ -789,9 +809,11 @@ async def retrieve(self, item_id: str) -> Optional[dict]: else: try: return self._received_subscriptions[item_id].get_nowait() - # TODO make sure to delete during unsubscribe except asyncio.QueueEmpty: pass + if self._send_recv_task.done(): + if isinstance(e := self._send_recv_task.result(), Exception): + raise e await asyncio.sleep(0.1) return None @@ -3776,10 +3798,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: } if "finalized" in message_result and wait_for_finalization: - # Created as a task because we don't actually care about the result - self._forgettable_task = asyncio.create_task( - self.rpc_request("author_unwatchExtrinsic", [subscription_id]) - ) + async with self.ws as ws: + await ws.unsubscribe(subscription_id) return { "block_hash": message_result["finalized"], "extrinsic_hash": "0x{}".format(extrinsic.extrinsic_hash.hex()), @@ -3790,10 +3810,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: and wait_for_inclusion and not wait_for_finalization ): - # Created as a task because we don't actually care about the result - self._forgettable_task = asyncio.create_task( - self.rpc_request("author_unwatchExtrinsic", [subscription_id]) - ) + async with self.ws as ws: + await ws.unsubscribe(subscription_id) return { "block_hash": message_result["inblock"], "extrinsic_hash": "0x{}".format(extrinsic.extrinsic_hash.hex()), diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 1c064d6..f1efbc3 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -377,13 +377,13 @@ def __init__(self, payloads): self.responses = defaultdict(lambda: {"complete": False, "results": []}) self.payloads_count = len(payloads) - def add_request(self, item_id: str, request_id: Any): + def add_request(self, item_id: str, request_id: str): """ Adds an outgoing request to the responses map for later retrieval """ self.response_map[item_id] = request_id - def overwrite_request(self, item_id: str, request_id: Any): + def overwrite_request(self, item_id: str, request_id: str): """ Overwrites an existing request in the responses map with a new request_id. 
This is used for multipart responses that generate a subscription id we need to watch, rather than the initial From 8ce18e2b747e4c182c6aa4d35d489ab92f012c09 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 17:14:23 +0200 Subject: [PATCH 23/32] Remove debug logs --- async_substrate_interface/async_substrate.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index ea252b8..620adb1 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -9,6 +9,7 @@ import logging import ssl import warnings +from contextlib import suppress from unittest.mock import AsyncMock from hashlib import blake2b from typing import ( @@ -524,7 +525,7 @@ def __init__( shutdown_timer=5, options: Optional[dict] = None, _log_raw_websockets: bool = False, - retry_timeout: float = 60.0 + retry_timeout: float = 60.0, ): """ Websocket manager object. Allows for the use of a single websocket connection by multiple @@ -604,7 +605,6 @@ async def _cancel(self): ) async def connect(self, force=False): - logger.debug("Connecting.") now = await self.loop_time() self.last_received = now self.last_sent = now @@ -620,28 +620,24 @@ async def connect(self, force=False): self.ws = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) - logger.debug("Connected.") if self._send_recv_task is None or self._send_recv_task.done(): self._send_recv_task = asyncio.get_running_loop().create_task( self._handler(self.ws) ) - logger.debug("Recd task started.") self._initialized = True async def _handler(self, ws: ClientConnection) -> None: recv_task = asyncio.create_task(self._start_receiving(ws)) send_task = asyncio.create_task(self._start_sending(ws)) - logger.debug("Starting send/recv tasks.") done, pending = await asyncio.wait( [recv_task, send_task], return_when=asyncio.FIRST_COMPLETED, ) - logger.debug(f"send/recv tasks done: {done}\n{pending}") loop = asyncio.get_running_loop() should_reconnect = False for task in pending: task.cancel() - if isinstance(recv_task.exception(), asyncio.TimeoutError): + if isinstance(recv_task.result(), asyncio.TimeoutError): # TODO check the logic here should_reconnect = True if should_reconnect is True: @@ -680,7 +676,6 @@ async def _exit_with_timer(self): pass async def shutdown(self): - self._is_closing = True try: await asyncio.wait_for(self._cancel(), timeout=10.0) except asyncio.TimeoutError: @@ -688,7 +683,6 @@ async def shutdown(self): self.ws = None self._initialized = False self._send_recv_task = None - self._is_closing = False async def _recv(self, recd: bytes) -> None: if self._log_raw_websockets: @@ -728,7 +722,6 @@ async def _start_sending(self, ws) -> Exception: try: while True: to_send_ = await self._sending.get() - logger.debug(f"Pulled {to_send_} from the queue") send_id = to_send_["id"] to_send = json.dumps(to_send_) async with self._lock: @@ -747,10 +740,6 @@ async def _start_sending(self, ws) -> Exception: self._received[i].cancel() return e - async def add_subscription(self, subscription_id: str) -> None: - logger.debug(f"Adding {subscription_id} to subscriptions.") - self._received_subscriptions[subscription_id] = asyncio.Queue() - async def send(self, payload: dict) -> str: """ Sends a payload to the websocket connection. 
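The preceding patches split message delivery into two containers: one-shot replies resolve an asyncio.Future keyed by the request id, while subscription notifications are appended to a per-subscription asyncio.Queue that retrieve() drains. A self-contained toy (not library code; the ids are made up) showing why the two shapes differ:

import asyncio

async def demo() -> None:
    received: dict[str, asyncio.Future] = {}
    received_subscriptions: dict[str, asyncio.Queue] = {}
    loop = asyncio.get_running_loop()

    # A one-shot RPC gets exactly one reply, so a Future is enough.
    received["1"] = loop.create_future()
    received["1"].set_result({"id": "1", "result": "0xsubscription"})

    # A subscription keeps producing notifications for the same id, so they queue up.
    queue = received_subscriptions.setdefault("0xsubscription", asyncio.Queue())
    await queue.put({"params": {"subscription": "0xsubscription", "result": "update 1"}})
    await queue.put({"params": {"subscription": "0xsubscription", "result": "update 2"}})

    print(received["1"].result())
    while not queue.empty():
        print(queue.get_nowait())

asyncio.run(demo())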
@@ -762,7 +751,6 @@ async def send(self, payload: dict) -> str: id: the internal ID of the request (incremented int) """ await self.max_subscriptions.acquire() - logger.debug(f"Sending payload: {payload}") async with self._lock: original_id = get_next_id() while original_id in self._in_use_ids: @@ -771,7 +759,6 @@ async def send(self, payload: dict) -> str: self._received[original_id] = asyncio.get_running_loop().create_future() to_send = {**payload, **{"id": original_id}} await self._sending.put(to_send) - logger.debug("767 queue put") return original_id async def unsubscribe(self, subscription_id: str) -> None: @@ -2374,7 +2361,6 @@ async def _make_rpc_request( item_id = request_manager.overwrite_request( item_id, response["result"] ) - await ws.add_subscription(response["result"]) subscription_added = True except KeyError: raise SubstrateRequestException(str(response)) From 74cf005fa694a82971857aa6f0e62f56fa000505 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 17:14:45 +0200 Subject: [PATCH 24/32] Fix subscription logic --- async_substrate_interface/async_substrate.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 620adb1..d58321b 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -692,12 +692,15 @@ async def _recv(self, recd: bytes) -> None: if "id" in response: async with self._lock: self._inflight.pop(response["id"]) - self._received[response["id"]].set_result(response) - self._in_use_ids.remove(response["id"]) + with suppress(KeyError): + # These would be subscriptions that were unsubscribed + self._received[response["id"]].set_result(response) + self._in_use_ids.remove(response["id"]) elif "params" in response: # TODO self._inflight won't work with subscriptions sub_id = response["params"]["subscription"] - logger.debug(f"Adding {sub_id} to subscriptions.") + if sub_id not in self._received_subscriptions: + self._received_subscriptions[sub_id] = asyncio.Queue() await self._received_subscriptions[sub_id].put(response) else: raise KeyError(response) @@ -705,7 +708,9 @@ async def _recv(self, recd: bytes) -> None: async def _start_receiving(self, ws: ClientConnection) -> Exception: try: while True: - recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout) + recd = await asyncio.wait_for( + ws.recv(decode=False), timeout=self.retry_timeout + ) await self._recv(recd) except Exception as e: logger.exception("Start receving exception", exc_info=e) @@ -774,7 +779,12 @@ async def unsubscribe(self, subscription_id: str) -> None: original_id = get_next_id() del self._received_subscriptions[subscription_id] - to_send = {"jsonrpc": "2.0", "method": "author_unwatchExtrinsic", "params": [subscription_id]} + to_send = { + "jsonrpc": "2.0", + "id": original_id, + "method": "author_unwatchExtrinsic", + "params": [subscription_id], + } await self._sending.put(to_send) async def retrieve(self, item_id: str) -> Optional[dict]: From 37a52b9d8bc7b72068ca898d8ca2d97a6899bf87 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 17:16:30 +0200 Subject: [PATCH 25/32] Removed unused params --- async_substrate_interface/async_substrate.py | 27 ++++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index d58321b..6ec0f8a 100644 --- 
a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -538,7 +538,6 @@ def __init__( shutdown_timer: Number of seconds to shut down websocket connection after last use """ # TODO allow setting max concurrent connections and rpc subscriptions per connection - # TODO reconnection logic self.ws_url = ws_url self.ws: Optional[ClientConnection] = None self.max_subscriptions = asyncio.Semaphore(max_subscriptions) @@ -551,7 +550,6 @@ def __init__( self._send_recv_task = None self._inflight: dict[str, str] = {} self._attempts = 0 - self._initialized = False # TODO remove self._lock = asyncio.Lock() self._exit_task = None self._options = options if options else {} @@ -612,19 +610,17 @@ async def connect(self, force=False): if self._exit_task: self._exit_task.cancel() if self.state not in (State.OPEN, State.CONNECTING) or force: - if not self._initialized or force: - try: - await asyncio.wait_for(self._cancel(), timeout=10.0) - except asyncio.TimeoutError: - pass - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10.0 + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10.0 + ) + if self._send_recv_task is None or self._send_recv_task.done(): + self._send_recv_task = asyncio.get_running_loop().create_task( + self._handler(self.ws) ) - if self._send_recv_task is None or self._send_recv_task.done(): - self._send_recv_task = asyncio.get_running_loop().create_task( - self._handler(self.ws) - ) - self._initialized = True async def _handler(self, ws: ClientConnection) -> None: recv_task = asyncio.create_task(self._start_receiving(ws)) @@ -681,7 +677,6 @@ async def shutdown(self): except asyncio.TimeoutError: pass self.ws = None - self._initialized = False self._send_recv_task = None async def _recv(self, recd: bytes) -> None: @@ -2344,8 +2339,6 @@ async def _make_rpc_request( force_legacy_decode: bool = False, ) -> RequestManager.RequestResults: request_manager = RequestManager(payloads) - # TODO maybe instead of the current logic, I should assign the futs during send() and then just - # TODO use that to determine when it's completed. But how would this work with subscriptions? subscription_added = False From 415c66a71d4335785b423ce5a489df8bc44cb10a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 17:17:16 +0200 Subject: [PATCH 26/32] TODONE --- async_substrate_interface/async_substrate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 6ec0f8a..9849cae 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -692,7 +692,6 @@ async def _recv(self, recd: bytes) -> None: self._received[response["id"]].set_result(response) self._in_use_ids.remove(response["id"]) elif "params" in response: - # TODO self._inflight won't work with subscriptions sub_id = response["params"]["subscription"] if sub_id not in self._received_subscriptions: self._received_subscriptions[sub_id] = asyncio.Queue() From bff6a1a8d32182d07177e0c4f14cf121b317f6f2 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 17:32:42 +0200 Subject: [PATCH 27/32] Fin. 
--- async_substrate_interface/async_substrate.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 9849cae..9ce442c 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -633,9 +633,9 @@ async def _handler(self, ws: ClientConnection) -> None: should_reconnect = False for task in pending: task.cancel() - if isinstance(recv_task.result(), asyncio.TimeoutError): - # TODO check the logic here - should_reconnect = True + for task in done: + if isinstance(task.result(), (asyncio.TimeoutError, ConnectionClosed)): + should_reconnect = True if should_reconnect is True: for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() @@ -796,6 +796,7 @@ async def retrieve(self, item_id: str) -> Optional[dict]: if item.done(): self.max_subscriptions.release() del self._received[item_id] + return item.result() else: try: From 1ad38c5f71afdc976748177603530067658644d9 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 18:23:39 +0200 Subject: [PATCH 28/32] Handle different loops --- async_substrate_interface/async_substrate.py | 26 ++++---------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 9ce442c..c998e92 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -546,7 +546,7 @@ def __init__( self.retry_timeout = retry_timeout self._received: dict[str, asyncio.Future] = {} self._received_subscriptions: dict[str, asyncio.Queue] = {} - self._sending = asyncio.Queue() + self._sending: Optional[asyncio.Queue] = None self._send_recv_task = None self._inflight: dict[str, str] = {} self._attempts = 0 @@ -554,20 +554,6 @@ def __init__( self._exit_task = None self._options = options if options else {} self._log_raw_websockets = _log_raw_websockets - - try: - now = asyncio.get_running_loop().time() - except RuntimeError: - warnings.warn( - "You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. " - "Verify this is intended." 
- ) - # default value for in case there's no running asyncio loop - # this really doesn't matter in most cases, as it's only used for comparison on the first call to - # see how long it's been since the last call - now = 0.0 - self.last_received = now - self.last_sent = now self._in_use_ids = set() @property @@ -603,10 +589,9 @@ async def _cancel(self): ) async def connect(self, force=False): - now = await self.loop_time() - self.last_received = now - self.last_sent = now async with self._lock: + if self._sending is None or self._sending.empty(): + self._sending = asyncio.Queue() if self._exit_task: self._exit_task.cancel() if self.state not in (State.OPEN, State.CONNECTING) or force: @@ -683,7 +668,6 @@ async def _recv(self, recd: bytes) -> None: if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") response = json.loads(recd) - self.last_received = await self.loop_time() if "id" in response: async with self._lock: self._inflight.pop(response["id"]) @@ -707,7 +691,7 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: ) await self._recv(recd) except Exception as e: - logger.exception("Start receving exception", exc_info=e) + logger.exception("Start receiving exception", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed for fut in self._received.values(): @@ -728,8 +712,8 @@ async def _start_sending(self, ws) -> Exception: if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") await ws.send(to_send) - self.last_sent = await self.loop_time() except Exception as e: + logger.exception("Start sending exception", exc_info=e) if to_send is not None: self._received[to_send["id"]].set_exception(e) self._received[to_send["id"]].cancel() From 76b9f96890a78278afb49144ee09fe1718176bb8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 19:39:47 +0200 Subject: [PATCH 29/32] Fixed other chain subscriptions. --- async_substrate_interface/async_substrate.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index c998e92..13d4842 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -744,12 +744,15 @@ async def send(self, payload: dict) -> str: await self._sending.put(to_send) return original_id - async def unsubscribe(self, subscription_id: str) -> None: + async def unsubscribe(self, subscription_id: str, method: str = "author_unwatchExtrinsic") -> None: """ Unwatches a watched extrinsic subscription. 
Args: subscription_id: the internal ID of the subscription (typically a hex string) + method: Typically "author_unwatchExtrinsic" for extrinsics, but can have different unsubscribe + methods for things like watching chain head ("chain_unsubscribeFinalizedHeads" or + "chain_unsubscribeNewHeads") """ async with self._lock: original_id = get_next_id() @@ -760,7 +763,7 @@ async def unsubscribe(self, subscription_id: str) -> None: to_send = { "jsonrpc": "2.0", "id": original_id, - "method": "author_unwatchExtrinsic", + "method": method, "params": [subscription_id], } await self._sending.put(to_send) @@ -807,7 +810,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, - _log_raw_websockets: bool = False, + _log_raw_websockets: bool = True, # TODO change this back ws_shutdown_timer: float = 5.0, decode_ss58: bool = False, ): @@ -1727,13 +1730,13 @@ async def result_handler( if subscription_result is not None: reached = True + logger.info("REACHED!") # Handler returned end result: unsubscribe from further updates - self._forgettable_task = asyncio.create_task( - self.rpc_request( - f"chain_unsubscribe{rpc_method_prefix}Heads", - [subscription_id], + async with self.ws as ws: + await ws.unsubscribe( + subscription_id, + method=f"chain_unsubscribe{rpc_method_prefix}Heads", ) - ) return subscription_result, reached From 73db3c937f5cec38d88eb926c78cf380dd797e1c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 19:42:37 +0200 Subject: [PATCH 30/32] TODONE --- async_substrate_interface/async_substrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 13d4842..7d61cbe 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -810,7 +810,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, - _log_raw_websockets: bool = True, # TODO change this back + _log_raw_websockets: bool = False, ws_shutdown_timer: float = 5.0, decode_ss58: bool = False, ): From 36eb95b00fc74e9b00594d234fba6fc47ea1d156 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 19:42:46 +0200 Subject: [PATCH 31/32] Ruff --- async_substrate_interface/async_substrate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 7d61cbe..c827377 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -744,7 +744,9 @@ async def send(self, payload: dict) -> str: await self._sending.put(to_send) return original_id - async def unsubscribe(self, subscription_id: str, method: str = "author_unwatchExtrinsic") -> None: + async def unsubscribe( + self, subscription_id: str, method: str = "author_unwatchExtrinsic" + ) -> None: """ Unwatches a watched extrinsic subscription. 
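Patches 29 through 31 generalize unsubscribe() so chain-head watchers can release their subscriptions through the same sending queue as extrinsic watchers, with author_unwatchExtrinsic kept as the default method. As a hedged illustration, the payload queued for a finalized-heads subscription would take roughly this shape (the id values are placeholders):

# Illustrative only; mirrors the dict built inside unsubscribe() in the diff above.
unsubscribe_payload = {
    "jsonrpc": "2.0",
    "id": "42",                            # placeholder internal request id
    "method": "chain_unsubscribeFinalizedHeads",
    "params": ["0xdeadbeef"],              # placeholder subscription id
}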
From f9278254d0e0cbbd0dbbebae15b6e800692f263b Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 4 Aug 2025 21:21:02 +0200 Subject: [PATCH 32/32] Bump changelog + version --- CHANGELOG.md | 18 ++++++++++++++++++ pyproject.toml | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9bc44e..41791c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,23 @@ # Changelog +## 1.5.0 /2025-08-04 +* ConcurrencyError fix by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/162 +* Added better typing by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/163 +* Fix arg order in retries by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/165 + * removes "bool object has no attribute Metadata" errors +* Concurrency improvements by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/164 + * True Runtime independence in AsyncSubstrateInterface: + * ensures no need to reload types from a shared object that may interfere with concurrency + * increases memory usage slightly, but drops CPU usage dramatically by not needing to reload the type registry when retrieving from cache + * RuntimeCache improved to automatically add additional mappings + * Uses a single dispatcher queue for concurrent sending/receiving which eliminates the need for coroutines to manage their own state in regard to connection management. + * Futures from the Websocket now get assigned their own exceptions + * Overall cleaner logic flow with regard to rpc requests + * The Websocket object now handles reconnections/timeouts + * Separation of normal ping-pong calls and longer-running subscriptions + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.4.3...v1.5.0 + ## 1.4.3 /2025-07-28 * Add "Token" to caught error messages for extrinsic receipts by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/156 * runtime version switching by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/157 diff --git a/pyproject.toml b/pyproject.toml index 5fe8b39..bdddd5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.4.3" +version = "1.5.0" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }
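As a rough illustration of the "single dispatcher queue" idea summarized in the 1.5.0 notes, a generic sketch (not the library's actual classes) of the pattern looks like this: every caller funnels payloads through one outgoing queue and awaits a Future keyed by its request id, so only the dispatcher's own send/receive tasks ever touch the socket.

import asyncio
import itertools
import json

class MiniDispatcher:
    """Toy single-queue dispatcher: callers share one outgoing queue and one
    pending-futures map; a single I/O task pair drains the queue and delivers replies."""

    def __init__(self) -> None:
        self._sending: asyncio.Queue = asyncio.Queue()
        self._pending: dict[str, asyncio.Future] = {}
        self._ids = itertools.count(1)

    async def send(self, payload: dict) -> str:
        # Queue the payload for the send task and register a Future for the reply.
        request_id = str(next(self._ids))
        self._pending[request_id] = asyncio.get_running_loop().create_future()
        await self._sending.put(json.dumps({**payload, "id": request_id}))
        return request_id

    def deliver(self, raw: str) -> None:
        # Called by the receive task for every frame that carries an "id".
        response = json.loads(raw)
        fut = self._pending.get(response["id"])
        if fut is not None and not fut.done():
            fut.set_result(response)

    async def retrieve(self, request_id: str) -> dict:
        fut = self._pending[request_id]
        try:
            return await fut
        finally:
            self._pending.pop(request_id, None)

Because no coroutine other than the dispatcher's I/O tasks reads or writes the websocket, callers no longer track connection state themselves, which is the property the changelog credits for eliminating per-coroutine connection management.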