Skip to content

Add archive node to retry substrate #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
BlockNotFound,
MaxRetriesExceeded,
MetadataAtVersionNotFound,
StateDiscardedError,
)
from async_substrate_interface.protocols import Keypair
from async_substrate_interface.types import (
Expand Down Expand Up @@ -2148,6 +2149,7 @@ async def _make_rpc_request(
storage_item,
result_handler,
)

request_manager.add_response(
item_id, decoded_response, complete
)
Expand Down Expand Up @@ -2234,9 +2236,8 @@ async def rpc_request(
]
result = await self._make_rpc_request(payloads, result_handler=result_handler)
if "error" in result[payload_id][0]:
if (
"Failed to get runtime version"
in result[payload_id][0]["error"]["message"]
if "Failed to get runtime version" in (
err_msg := result[payload_id][0]["error"]["message"]
):
logger.warning(
"Failed to get runtime. Re-fetching from chain, and retrying."
Expand All @@ -2245,7 +2246,14 @@ async def rpc_request(
return await self.rpc_request(
method, params, result_handler, block_hash, reuse_block_hash
)
raise SubstrateRequestException(result[payload_id][0]["error"]["message"])
elif (
"Client error: Api called for an unknown Block: State already discarded"
in err_msg
):
bh = err_msg.split("State already discarded for ")[1].strip()
raise StateDiscardedError(bh)
else:
raise SubstrateRequestException(err_msg)
if "result" in result[payload_id][0]:
return result[payload_id][0]
else:
Expand Down
10 changes: 10 additions & 0 deletions async_substrate_interface/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ def __init__(self):
super().__init__(message)


class StateDiscardedError(SubstrateRequestException):
    """Request failed because the node no longer holds state for the block.

    The block hash passed to the constructor is stored on ``block_hash`` and
    embedded in the exception message, which advises using an archive node.
    """

    def __init__(self, block_hash: str):
        # Store the hash before delegating so it is available on the instance.
        self.block_hash = block_hash
        super().__init__(
            f"State discarded for {block_hash}. This indicates the block is too old, and you should instead "
            f"make this request using an archive node."
        )


class StorageFunctionNotFound(ValueError):
pass

Expand Down
49 changes: 40 additions & 9 deletions async_substrate_interface/substrate_addons.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from websockets.exceptions import ConnectionClosed

from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket
from async_substrate_interface.errors import MaxRetriesExceeded
from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError
from async_substrate_interface.sync_substrate import SubstrateInterface

logger = logging.getLogger("async_substrate_interface")
Expand Down Expand Up @@ -117,13 +117,17 @@ def __init__(
max_retries: int = 5,
retry_timeout: float = 60.0,
_mock: bool = False,
archive_nodes: Optional[list[str]] = None,
):
fallback_chains = fallback_chains or []
self.fallback_chains = (
iter(fallback_chains)
if not retry_forever
else cycle(fallback_chains + [url])
)
self.archive_nodes = (
iter(archive_nodes) if not retry_forever else cycle(archive_nodes)
)
self.use_remote_preset = use_remote_preset
self.chain_name = chain_name
self._mock = _mock
Expand Down Expand Up @@ -174,20 +178,32 @@ def _retry(self, method, *args, **kwargs):
EOFError,
ConnectionClosed,
TimeoutError,
socket.gaierror,
StateDiscardedError,
) as e:
use_archive = isinstance(e, StateDiscardedError)
try:
self._reinstantiate_substrate(e)
self._reinstantiate_substrate(e, use_archive=use_archive)
return method_(*args, **kwargs)
except StopIteration:
logger.error(
f"Max retries exceeded with {self.url}. No more fallback chains."
)
raise MaxRetriesExceeded

def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
next_network = next(self.fallback_chains)
def _reinstantiate_substrate(
self, e: Optional[Exception] = None, use_archive: bool = False
) -> None:
if use_archive:
bh = getattr(e, "block_hash", "Unknown Block Hash")
logger.info(
f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node."
)
next_network = next(self.archive_nodes)
else:
next_network = next(self.fallback_chains)
self.ws.close()
if e.__class__ == MaxRetriesExceeded:
if isinstance(e, MaxRetriesExceeded):
logger.error(
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
)
Expand Down Expand Up @@ -243,13 +259,17 @@ def __init__(
max_retries: int = 5,
retry_timeout: float = 60.0,
_mock: bool = False,
archive_nodes: Optional[list[str]] = None,
):
fallback_chains = fallback_chains or []
self.fallback_chains = (
iter(fallback_chains)
if not retry_forever
else cycle(fallback_chains + [url])
)
self.archive_nodes = (
iter(archive_nodes) if not retry_forever else cycle(archive_nodes)
)
self.use_remote_preset = use_remote_preset
self.chain_name = chain_name
self._mock = _mock
Expand All @@ -272,9 +292,18 @@ def __init__(
for method in RETRY_METHODS:
setattr(self, method, partial(self._retry, method))

async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
next_network = next(self.fallback_chains)
if e.__class__ == MaxRetriesExceeded:
async def _reinstantiate_substrate(
self, e: Optional[Exception] = None, use_archive: bool = False
) -> None:
if use_archive:
bh = getattr(e, "block_hash", "Unknown Block Hash")
logger.info(
f"Attempt made to {bh} failed for state discarded. Attempting to switch to archive node."
)
next_network = next(self.archive_nodes)
else:
next_network = next(self.fallback_chains)
if isinstance(e, MaxRetriesExceeded):
logger.error(
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
)
Expand Down Expand Up @@ -314,9 +343,11 @@ async def _retry(self, method, *args, **kwargs):
ConnectionClosed,
EOFError,
socket.gaierror,
StateDiscardedError,
) as e:
use_archive = isinstance(e, StateDiscardedError)
try:
await self._reinstantiate_substrate(e)
await self._reinstantiate_substrate(e, use_archive=use_archive)
return await method_(*args, **kwargs)
except StopAsyncIteration:
logger.error(
Expand Down
15 changes: 11 additions & 4 deletions async_substrate_interface/sync_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
BlockNotFound,
MaxRetriesExceeded,
MetadataAtVersionNotFound,
StateDiscardedError,
)
from async_substrate_interface.protocols import Keypair
from async_substrate_interface.types import (
Expand Down Expand Up @@ -1944,9 +1945,8 @@ def rpc_request(
]
result = self._make_rpc_request(payloads, result_handler=result_handler)
if "error" in result[payload_id][0]:
if (
"Failed to get runtime version"
in result[payload_id][0]["error"]["message"]
if "Failed to get runtime version" in (
err_msg := result[payload_id][0]["error"]["message"]
):
logger.warning(
"Failed to get runtime. Re-fetching from chain, and retrying."
Expand All @@ -1955,7 +1955,14 @@ def rpc_request(
return self.rpc_request(
method, params, result_handler, block_hash, reuse_block_hash
)
raise SubstrateRequestException(result[payload_id][0]["error"]["message"])
elif (
"Client error: Api called for an unknown Block: State already discarded"
in err_msg
):
bh = err_msg.split("State already discarded for ")[1].strip()
raise StateDiscardedError(bh)
else:
raise SubstrateRequestException(err_msg)
if "result" in result[payload_id][0]:
return result[payload_id][0]
else:
Expand Down
33 changes: 31 additions & 2 deletions tests/test_substrate_addons.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import pytest
import time

from async_substrate_interface.substrate_addons import RetrySyncSubstrate
from async_substrate_interface.errors import MaxRetriesExceeded
from async_substrate_interface import AsyncSubstrateInterface, SubstrateInterface
from async_substrate_interface.substrate_addons import (
RetrySyncSubstrate,
RetryAsyncSubstrate,
)
from async_substrate_interface.errors import MaxRetriesExceeded, StateDiscardedError
from tests.conftest import start_docker_container

LATENT_LITE_ENTRYPOINT = "wss://lite.sub.latent.to:443"
Expand Down Expand Up @@ -70,3 +74,28 @@ def test_retry_sync_substrate_offline():
RetrySyncSubstrate(
"ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"]
)


@pytest.mark.asyncio
async def test_retry_async_subtensor_archive_node():
    """Check that RetryAsyncSubstrate serves an old block via an archive node.

    First confirms that a plain AsyncSubstrateInterface on the lite endpoint
    raises StateDiscardedError for a block 1000 behind the current one, then
    requests the same block through RetryAsyncSubstrate configured with an
    archive node and expects a dict back.
    """
    # Use the module-level endpoint constant instead of repeating the URL.
    async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
        current_block = await substrate.get_block_number()
        old_block = current_block - 1000
        with pytest.raises(StateDiscardedError):
            await substrate.get_block(block_number=old_block)
    async with RetryAsyncSubstrate(
        LATENT_LITE_ENTRYPOINT, archive_nodes=["ws://178.156.172.75:9944"]
    ) as substrate:
        block = await substrate.get_block(block_number=old_block)
        assert isinstance(block, dict)


def test_retry_sync_subtensor_archive_node():
    """Check that RetrySyncSubstrate serves an old block via an archive node.

    First confirms that a plain SubstrateInterface on the lite endpoint raises
    StateDiscardedError for a block 1000 behind the current one, then requests
    the same block through RetrySyncSubstrate configured with an archive node
    and expects a dict back.
    """
    # Use the module-level endpoint constant instead of repeating the URL.
    with SubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
        current_block = substrate.get_block_number()
        old_block = current_block - 1000
        with pytest.raises(StateDiscardedError):
            substrate.get_block(block_number=old_block)
    with RetrySyncSubstrate(
        LATENT_LITE_ENTRYPOINT, archive_nodes=["ws://178.156.172.75:9944"]
    ) as substrate:
        block = substrate.get_block(block_number=old_block)
        assert isinstance(block, dict)
Loading