Skip to content

query multiple/decoding fix #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,9 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
if item is not None:
if item.done():
self.max_subscriptions.release()
res = item.result()
del self._received[item_id]

return item.result()
return res
else:
try:
return self._received_subscriptions[item_id].get_nowait()
Expand Down Expand Up @@ -1165,7 +1165,7 @@ async def get_runtime_for_version(
async def _get_runtime_for_version(
self, runtime_version: int, block_hash: Optional[str] = None
) -> Runtime:
runtime_config = RuntimeConfigurationObject()
runtime_config = RuntimeConfigurationObject(ss58_format=self.ss58_format)
runtime_config.clear_type_registry()
runtime_config.update_type_registry(load_type_registry_preset(name="core"))

Expand Down Expand Up @@ -2337,7 +2337,7 @@ async def _make_rpc_request(
request_manager.add_request(item_id, payload["id"])

while True:
for item_id in list(request_manager.response_map.keys()):
for item_id in request_manager.unresponded():
if (
item_id not in request_manager.responses
or asyncio.iscoroutinefunction(result_handler)
Expand Down Expand Up @@ -2368,7 +2368,6 @@ async def _make_rpc_request(
runtime=runtime,
force_legacy_decode=force_legacy_decode,
)

request_manager.add_response(
item_id, decoded_response, complete
)
Expand Down
2 changes: 1 addition & 1 deletion async_substrate_interface/sync_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ def _make_rpc_request(
_received[response["params"]["subscription"]] = response
else:
raise SubstrateRequestException(response)
for item_id in list(request_manager.response_map.keys()):
for item_id in request_manager.unresponded():
if item_id not in request_manager.responses or isinstance(
result_handler, Callable
):
Expand Down
9 changes: 9 additions & 0 deletions async_substrate_interface/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,15 @@ def get_results(self) -> RequestResults:
request_id: info["results"] for request_id, info in self.responses.items()
}

def unresponded(self):
"""
Yields items from response_map whose corresponding response is missing or incomplete.
"""
for item_id, request_id in list(self.response_map.items()):
response_info = self.responses.get(request_id)
if response_info is None or not response_info["complete"]:
yield item_id


@dataclass
class Preprocessed:
Expand Down
19 changes: 18 additions & 1 deletion tests/integration_tests/test_async_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,26 @@ async def test_get_events_proper_decoding():
async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
all_events = await substrate.get_events(block_hash=block_hash)
event = all_events[1]
print(type(event["attributes"]))
assert event["attributes"] == (
"5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe",
30,
"0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5",
)


@pytest.mark.asyncio
async def test_query_multiple():
block = 6153277
cks = [
"5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt",
"5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx",
"5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn",
]
async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
block_hash = await substrate.get_block_hash(block_id=block)
assert await substrate.query_multiple(
params=cks,
module="SubtensorModule",
storage_function="OwnedHotkeys",
block_hash=block_hash,
)
18 changes: 17 additions & 1 deletion tests/integration_tests/test_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,25 @@ def test_get_events_proper_decoding():
with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
all_events = substrate.get_events(block_hash=block_hash)
event = all_events[1]
print(type(event["attributes"]))
assert event["attributes"] == (
"5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe",
30,
"0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5",
)


def test_query_multiple():
block = 6153277
cks = [
"5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt",
"5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx",
"5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn",
]
with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
block_hash = substrate.get_block_hash(block_id=block)
assert substrate.query_multiple(
params=cks,
module="SubtensorModule",
storage_function="OwnedHotkeys",
block_hash=block_hash,
)
9 changes: 5 additions & 4 deletions tests/unit_tests/asyncio_/test_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
from websockets.exceptions import InvalidURI
from websockets.protocol import State

from async_substrate_interface.async_substrate import AsyncSubstrateInterface
from async_substrate_interface.types import ScaleObj
Expand Down Expand Up @@ -103,17 +104,17 @@ async def test_websocket_shutdown_timer():
async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate:
await substrate.get_chain_head()
await asyncio.sleep(6)
assert (
substrate.ws._initialized is False
) # connection should have closed automatically
assert (
substrate.ws.state is State.CLOSED
) # connection should have closed automatically

# using custom ws shutdown timer of 10.0 seconds
async with AsyncSubstrateInterface(
"wss://lite.sub.latent.to:443", ws_shutdown_timer=10.0
) as substrate:
await substrate.get_chain_head()
await asyncio.sleep(6) # same sleep time as before
assert substrate.ws._initialized is True # connection should still be open
assert substrate.ws.state is State.OPEN # connection should still be open


@pytest.mark.asyncio
Expand Down
Loading