From 8b7eb0ce927e1d23097715dc8dc59c8ce27a92e0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 6 Jun 2025 11:51:59 +0200 Subject: [PATCH 1/3] Add archive node for RetrySubstrate --- async_substrate_interface/async_substrate.py | 16 +++++++++---- async_substrate_interface/errors.py | 10 ++++++++ async_substrate_interface/substrate_addons.py | 23 +++++++++++++++---- async_substrate_interface/sync_substrate.py | 15 ++++++++---- 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index acdbade..e22054a 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -41,6 +41,7 @@ BlockNotFound, MaxRetriesExceeded, MetadataAtVersionNotFound, + StateDiscardedError, ) from async_substrate_interface.protocols import Keypair from async_substrate_interface.types import ( @@ -2144,6 +2145,7 @@ async def _make_rpc_request( storage_item, result_handler, ) + request_manager.add_response( item_id, decoded_response, complete ) @@ -2230,9 +2232,8 @@ async def rpc_request( ] result = await self._make_rpc_request(payloads, result_handler=result_handler) if "error" in result[payload_id][0]: - if ( - "Failed to get runtime version" - in result[payload_id][0]["error"]["message"] + if "Failed to get runtime version" in ( + err_msg := result[payload_id][0]["error"]["message"] ): logger.warning( "Failed to get runtime. Re-fetching from chain, and retrying." @@ -2241,7 +2242,14 @@ async def rpc_request( return await self.rpc_request( method, params, result_handler, block_hash, reuse_block_hash ) - raise SubstrateRequestException(result[payload_id][0]["error"]["message"]) + elif ( + "Client error: Api called for an unknown Block: State already discarded" + in err_msg + ): + bh = err_msg.split("State already discarded for ")[1].strip() + raise StateDiscardedError(bh) + else: + raise SubstrateRequestException(err_msg) if "result" in result[payload_id][0]: return result[payload_id][0] else: diff --git a/async_substrate_interface/errors.py b/async_substrate_interface/errors.py index c6a2d8d..d016089 100644 --- a/async_substrate_interface/errors.py +++ b/async_substrate_interface/errors.py @@ -22,6 +22,16 @@ def __init__(self): super().__init__(message) +class StateDiscardedError(SubstrateRequestException): + def __init__(self, block_hash: str): + self.block_hash = block_hash + message = ( + f"State discarded for {block_hash}. This indicates the block is too old, and you should instead " + f"make this request using an archive node." + ) + super().__init__(message) + + class StorageFunctionNotFound(ValueError): pass diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 5edb26a..2d772bc 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -13,7 +13,7 @@ from websockets.exceptions import ConnectionClosed from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket -from async_substrate_interface.errors import MaxRetriesExceeded +from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError from async_substrate_interface.sync_substrate import SubstrateInterface logger = logging.getLogger("async_substrate_interface") @@ -243,6 +243,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] self.fallback_chains = ( @@ -250,6 +251,9 @@ def __init__( if not retry_forever else cycle(fallback_chains + [url]) ) + self.archive_nodes = ( + iter(archive_nodes) if not retry_forever else cycle(archive_nodes) + ) self.use_remote_preset = use_remote_preset self.chain_name = chain_name self._mock = _mock @@ -272,8 +276,17 @@ def __init__( for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) - async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: - next_network = next(self.fallback_chains) + async def _reinstantiate_substrate( + self, e: Optional[Exception] = None, use_archive: bool = False + ) -> None: + if use_archive: + bh = getattr(e, "block_hash", "Unknown Block Hash") + logger.info( + f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node." + ) + next_network = next(self.archive_nodes) + else: + next_network = next(self.fallback_chains) if e.__class__ == MaxRetriesExceeded: logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." @@ -314,9 +327,11 @@ async def _retry(self, method, *args, **kwargs): ConnectionClosed, EOFError, socket.gaierror, + StateDiscardedError, ) as e: + use_archive = isinstance(e, StateDiscardedError) try: - await self._reinstantiate_substrate(e) + await self._reinstantiate_substrate(e, use_archive=use_archive) return await method_(*args, **kwargs) except StopAsyncIteration: logger.error( diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index dc8d178..e6f4644 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -24,6 +24,7 @@ BlockNotFound, MaxRetriesExceeded, MetadataAtVersionNotFound, + StateDiscardedError, ) from async_substrate_interface.protocols import Keypair from async_substrate_interface.types import ( @@ -1944,9 +1945,8 @@ def rpc_request( ] result = self._make_rpc_request(payloads, result_handler=result_handler) if "error" in result[payload_id][0]: - if ( - "Failed to get runtime version" - in result[payload_id][0]["error"]["message"] + if "Failed to get runtime version" in ( + err_msg := result[payload_id][0]["error"]["message"] ): logger.warning( "Failed to get runtime. Re-fetching from chain, and retrying." @@ -1955,7 +1955,14 @@ def rpc_request( return self.rpc_request( method, params, result_handler, block_hash, reuse_block_hash ) - raise SubstrateRequestException(result[payload_id][0]["error"]["message"]) + elif ( + "Client error: Api called for an unknown Block: State already discarded" + in err_msg + ): + bh = err_msg.split("State already discarded for ")[1].strip() + raise StateDiscardedError(bh) + else: + raise SubstrateRequestException(err_msg) if "result" in result[payload_id][0]: return result[payload_id][0] else: From 6ef9efd0e7666c833b18dae3353a63f722792a55 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 6 Jun 2025 17:19:11 +0200 Subject: [PATCH 2/3] Add archive node for RetrySyncSubstrate --- async_substrate_interface/substrate_addons.py | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 2d772bc..c9ca1e8 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -117,6 +117,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] self.fallback_chains = ( @@ -124,6 +125,9 @@ def __init__( if not retry_forever else cycle(fallback_chains + [url]) ) + self.archive_nodes = ( + iter(archive_nodes) if not retry_forever else cycle(archive_nodes) + ) self.use_remote_preset = use_remote_preset self.chain_name = chain_name self._mock = _mock @@ -174,9 +178,12 @@ def _retry(self, method, *args, **kwargs): EOFError, ConnectionClosed, TimeoutError, + socket.gaierror, + StateDiscardedError, ) as e: + use_archive = isinstance(e, StateDiscardedError) try: - self._reinstantiate_substrate(e) + self._reinstantiate_substrate(e, use_archive=use_archive) return method_(*args, **kwargs) except StopIteration: logger.error( @@ -184,10 +191,19 @@ def _retry(self, method, *args, **kwargs): ) raise MaxRetriesExceeded - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: - next_network = next(self.fallback_chains) + def _reinstantiate_substrate( + self, e: Optional[Exception] = None, use_archive: bool = False + ) -> None: + if use_archive: + bh = getattr(e, "block_hash", "Unknown Block Hash") + logger.info( + f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node." + ) + next_network = next(self.archive_nodes) + else: + next_network = next(self.fallback_chains) self.ws.close() - if e.__class__ == MaxRetriesExceeded: + if isinstance(e, MaxRetriesExceeded): logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." ) @@ -287,7 +303,7 @@ async def _reinstantiate_substrate( next_network = next(self.archive_nodes) else: next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: + if isinstance(e, MaxRetriesExceeded): logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." ) From f1d5de38c3b64b2b978b81ffa9fe12bfb752a249 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 9 Jun 2025 20:50:29 +0200 Subject: [PATCH 3/3] Added tests --- tests/test_substrate_addons.py | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/tests/test_substrate_addons.py b/tests/test_substrate_addons.py index c2e6854..da671eb 100644 --- a/tests/test_substrate_addons.py +++ b/tests/test_substrate_addons.py @@ -4,8 +4,12 @@ import pytest import time -from async_substrate_interface.substrate_addons import RetrySyncSubstrate -from async_substrate_interface.errors import MaxRetriesExceeded +from async_substrate_interface import AsyncSubstrateInterface, SubstrateInterface +from async_substrate_interface.substrate_addons import ( + RetrySyncSubstrate, + RetryAsyncSubstrate, +) +from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError from tests.conftest import start_docker_container LATENT_LITE_ENTRYPOINT = "wss://lite.sub.latent.to:443" @@ -70,3 +74,28 @@ def test_retry_sync_substrate_offline(): RetrySyncSubstrate( "ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"] ) + + +@pytest.mark.asyncio +async def test_retry_async_subtensor_archive_node(): + async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate: + current_block = await substrate.get_block_number() + old_block = current_block - 1000 + with pytest.raises(StateDiscardedError): + await substrate.get_block(block_number=old_block) + async with RetryAsyncSubstrate( + "wss://lite.sub.latent.to:443", archive_nodes=["ws://178.156.172.75:9944"] + ) as substrate: + assert isinstance((await substrate.get_block(block_number=old_block)), dict) + + +def test_retry_sync_subtensor_archive_node(): + with SubstrateInterface("wss://lite.sub.latent.to:443") as substrate: + current_block = substrate.get_block_number() + old_block = current_block - 1000 + with pytest.raises(StateDiscardedError): + substrate.get_block(block_number=old_block) + with RetrySyncSubstrate( + "wss://lite.sub.latent.to:443", archive_nodes=["ws://178.156.172.75:9944"] + ) as substrate: + assert isinstance((substrate.get_block(block_number=old_block)), dict)