Release/1.3.0 #132


Merged: 24 commits, Jun 10, 2025

Commits (24)
* 97d78d4 Fixes potential issues with starting a new loop (thewhaleking, Jun 5, 2025)
* cf24395 Websockets potential not started when started (issue #95). (thewhaleking, Jun 5, 2025)
* 7420723 Fixes potential cache issues. (thewhaleking, Jun 5, 2025)
* ce855da Adjust sleep wait (thewhaleking, Jun 5, 2025)
* b1f9157 Ensure ids cannot be reused while still in use. (thewhaleking, Jun 5, 2025)
* 5b97c88 Change where `_in_use_ids` gets updated. (thewhaleking, Jun 5, 2025)
* 8b7eb0c Add archive node for RetrySubstrate (thewhaleking, Jun 6, 2025)
* 6ef9efd Add archive node for RetrySyncSubstrate (thewhaleking, Jun 6, 2025)
* 9558526 Added magic number comment. (thewhaleking, Jun 9, 2025)
* fab28fc Added unit tests for cache (thewhaleking, Jun 9, 2025)
* a52f356 Renamed `tests/unittests/asyncio` to `asyncio_` to avoid name conflicts. (thewhaleking, Jun 9, 2025)
* 831c1c0 Fix unit tests + ruff (thewhaleking, Jun 9, 2025)
* f1d5de3 Added tests (thewhaleking, Jun 9, 2025)
* f9776bc Ensure runtime_call is the same in both async and sync versions. Fix … (thewhaleking, Jun 9, 2025)
* f62e0eb Add test workflow (thewhaleking, Jun 9, 2025)
* 5865615 Merge pull request #129 from opentensor/feat/thewhaleking/tests (thewhaleking, Jun 9, 2025)
* 7d03ece Idk (thewhaleking, Jun 9, 2025)
* a46e1b8 Trigger no-op (thewhaleking, Jun 9, 2025)
* e53f27f Merge pull request #127 from opentensor/feat/thewhaleking/improvements (thewhaleking, Jun 9, 2025)
* ed0c6d6 Merge remote-tracking branch 'origin/staging' into feat/thewhaleking/… (thewhaleking, Jun 9, 2025)
* faa14ed Merge pull request #128 from opentensor/feat/thewhaleking/add-archive… (thewhaleking, Jun 9, 2025)
* 72252f7 Updates changelog and version (thewhaleking, Jun 10, 2025)
* b5106e3 Corrected dates (thewhaleking, Jun 10, 2025)
* e0b7aa3 Merge pull request #131 from opentensor/update-changelog-version (thewhaleking, Jun 10, 2025)
81 changes: 81 additions & 0 deletions .github/workflows/run-async-substrate-interface-tests.yml
@@ -0,0 +1,81 @@
name: Run Tests

on:
  push:
    branches: [main, staging]
  pull_request:
    branches: [main, staging]
  workflow_dispatch:

jobs:
  find-tests:
    runs-on: ubuntu-latest
    steps:
      - name: Check-out repository
        uses: actions/checkout@v4

      - name: Find test files
        id: get-tests
        run: |
          test_files=$(find tests -name "test*.py" | jq -R -s -c 'split("\n") | map(select(. != ""))')
          echo "::set-output name=test-files::$test_files"

  pull-docker-image:
    runs-on: ubuntu-latest
    steps:
      - name: Log in to GitHub Container Registry
        run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin

      - name: Pull Docker Image
        run: docker pull ghcr.io/opentensor/subtensor-localnet:devnet-ready

      - name: Save Docker Image to Cache
        run: docker save -o subtensor-localnet.tar ghcr.io/opentensor/subtensor-localnet:devnet-ready

      - name: Upload Docker Image as Artifact
        uses: actions/upload-artifact@v4
        with:
          name: subtensor-localnet
          path: subtensor-localnet.tar

  run-unit-tests:
    name: ${{ matrix.test-file }} / Python ${{ matrix.python-version }}
    needs:
      - find-tests
      - pull-docker-image
    runs-on: ubuntu-latest
    timeout-minutes: 30
    strategy:
      fail-fast: false
      max-parallel: 32
      matrix:
        test-file: ${{ fromJson(needs.find-tests.outputs.test-files) }}
        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

    steps:
      - name: Check-out repository
        uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          uv venv .venv
          source .venv/bin/activate
          uv pip install .[dev]

      - name: Download Docker Image
        uses: actions/download-artifact@v4
        with:
          name: subtensor-localnet

      - name: Load Docker Image
        run: docker load -i subtensor-localnet.tar

      - name: Run pytest
        run: |
          source .venv/bin/activate
          uv run pytest ${{ matrix.test-file }} -v -s
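One caveat on the `Find test files` step above: GitHub Actions has deprecated the `::set-output` workflow command in favour of appending to the `$GITHUB_OUTPUT` file, and for `needs.find-tests.outputs.test-files` to resolve across jobs the `find-tests` job also needs a job-level `outputs:` mapping (for example `test-files: ${{ steps.get-tests.outputs.test-files }}`). A possible rewrite of that step, offered here as a sketch rather than part of this PR, would be:

```yaml
# Hypothetical rewrite of the "Find test files" step using $GITHUB_OUTPUT
# instead of the deprecated ::set-output command.
- name: Find test files
  id: get-tests
  run: |
    test_files=$(find tests -name "test*.py" | jq -R -s -c 'split("\n") | map(select(. != ""))')
    echo "test-files=$test_files" >> "$GITHUB_OUTPUT"
```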
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,14 @@
# Changelog

## 1.2.1 /2025-05-22
## 1.3.0 /2025-06-10

* Add GH test runner by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/129
* Edge Case Fixes by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/127
* Add archive node to retry substrate by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/128

**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.2.2...v1.3.0

## 1.2.2 /2025-05-22

## What's Changed
* Add proper mock support by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/123
84 changes: 51 changes & 33 deletions async_substrate_interface/async_substrate.py
@@ -22,7 +22,6 @@
TYPE_CHECKING,
)

import asyncstdlib as a
from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string
from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject
from scalecodec.types import (
@@ -42,6 +41,7 @@
BlockNotFound,
MaxRetriesExceeded,
MetadataAtVersionNotFound,
StateDiscardedError,
)
from async_substrate_interface.protocols import Keypair
from async_substrate_interface.types import (
@@ -58,7 +58,7 @@
get_next_id,
rng as random,
)
from async_substrate_interface.utils.cache import async_sql_lru_cache
from async_substrate_interface.utils.cache import async_sql_lru_cache, CachedFetcher
from async_substrate_interface.utils.decoding import (
_determine_if_old_runtime_call,
_bt_decode_to_dict_or_list,
@@ -539,14 +539,17 @@ def __init__(
"You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. "
"Verify this is intended."
)
now = asyncio.new_event_loop().time()
# default value for in case there's no running asyncio loop
# this really doesn't matter in most cases, as it's only used for comparison on the first call to
# see how long it's been since the last call
now = 0.0
self.last_received = now
self.last_sent = now
self._in_use_ids = set()

async def __aenter__(self):
async with self._lock:
self._in_use += 1
await self.connect()
self._in_use += 1
await self.connect()
return self

@staticmethod
@@ -559,18 +562,19 @@ async def connect(self, force=False):
self.last_sent = now
if self._exit_task:
self._exit_task.cancel()
if not self._initialized or force:
self._initialized = True
try:
self._receiving_task.cancel()
await self._receiving_task
await self.ws.close()
except (AttributeError, asyncio.CancelledError):
pass
self.ws = await asyncio.wait_for(
connect(self.ws_url, **self._options), timeout=10
)
self._receiving_task = asyncio.create_task(self._start_receiving())
async with self._lock:
if not self._initialized or force:
try:
self._receiving_task.cancel()
await self._receiving_task
await self.ws.close()
except (AttributeError, asyncio.CancelledError):
pass
self.ws = await asyncio.wait_for(
connect(self.ws_url, **self._options), timeout=10
)
self._receiving_task = asyncio.create_task(self._start_receiving())
self._initialized = True

async def __aexit__(self, exc_type, exc_val, exc_tb):
async with self._lock: # TODO is this actually what I want to happen?
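The reworked `connect` above moves the initialization check under the connection lock, so concurrent callers cannot both tear down and recreate the websocket. A standalone sketch of that lock-plus-flag pattern, not the library's class, looks like this:

```python
import asyncio


class ConnectOnce:
    """Standalone sketch: many coroutines may call connect(), but only one
    performs the (expensive) handshake; the rest see the flag and skip it."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._initialized = False
        self.handshakes = 0

    async def connect(self, force: bool = False):
        async with self._lock:
            # Checked under the lock, so late arrivals see the updated flag.
            if not self._initialized or force:
                await asyncio.sleep(0)  # stand-in for the real websocket handshake
                self.handshakes += 1
                self._initialized = True


async def main():
    conn = ConnectOnce()
    await asyncio.gather(*(conn.connect() for _ in range(10)))
    print(conn.handshakes)  # 1


asyncio.run(main())
```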
@@ -619,6 +623,7 @@ async def _recv(self) -> None:
self._open_subscriptions -= 1
if "id" in response:
self._received[response["id"]] = response
self._in_use_ids.remove(response["id"])
elif "params" in response:
self._received[response["params"]["subscription"]] = response
else:
@@ -649,6 +654,9 @@ async def send(self, payload: dict) -> int:
id: the internal ID of the request (incremented int)
"""
original_id = get_next_id()
while original_id in self._in_use_ids:
original_id = get_next_id()
self._in_use_ids.add(original_id)
# self._open_subscriptions += 1
await self.max_subscriptions.acquire()
try:
@@ -674,7 +682,7 @@ async def retrieve(self, item_id: int) -> Optional[dict]:
self.max_subscriptions.release()
return item
except KeyError:
await asyncio.sleep(0.001)
await asyncio.sleep(0.1)
return None
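Taken together, the `send` and `_recv` changes reserve a request id until its response has actually been consumed, so a recycled id can never collide with one that is still in flight. (The poll interval in `retrieve` is also relaxed from 1 ms to 100 ms, reducing busy-waiting while responses are pending.) A small sketch of the reserve/release pattern, not the library's API, is:

```python
import itertools

# Stand-in for the library's get_next_id helper: a simple incrementing counter.
_counter = itertools.count(1)


def get_next_id() -> int:
    return next(_counter)


class RequestIdPool:
    """Sketch of the reserve/release pattern: an id stays reserved until its
    response has been received, so it can never be handed out twice at once."""

    def __init__(self):
        self._in_use_ids = set()

    def reserve(self) -> int:
        new_id = get_next_id()
        while new_id in self._in_use_ids:  # skip ids still awaiting a response
            new_id = get_next_id()
        self._in_use_ids.add(new_id)
        return new_id

    def release(self, used_id: int) -> None:
        # Called once the matching response has been delivered to the caller.
        self._in_use_ids.discard(used_id)
```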


@@ -725,6 +733,7 @@ def __init__(
)
else:
self.ws = AsyncMock(spec=Websocket)

self._lock = asyncio.Lock()
self.config = {
"use_remote_preset": use_remote_preset,
@@ -748,6 +757,12 @@ def __init__(
self.registry_type_map = {}
self.type_id_to_name = {}
self._mock = _mock
self._block_hash_fetcher = CachedFetcher(512, self._get_block_hash)
self._parent_hash_fetcher = CachedFetcher(512, self._get_parent_block_hash)
self._runtime_info_fetcher = CachedFetcher(16, self._get_block_runtime_info)
self._runtime_version_for_fetcher = CachedFetcher(
512, self._get_block_runtime_version_for
)

async def __aenter__(self):
if not self._mock:
@@ -1869,9 +1884,8 @@ async def get_metadata(self, block_hash=None) -> MetadataV15:

return runtime.metadata_v15

@a.lru_cache(maxsize=512)
async def get_parent_block_hash(self, block_hash):
return await self._get_parent_block_hash(block_hash)
return await self._parent_hash_fetcher.execute(block_hash)

async def _get_parent_block_hash(self, block_hash):
block_header = await self.rpc_request("chain_getHeader", [block_hash])
@@ -1916,9 +1930,8 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any:
"Unknown error occurred during retrieval of events"
)

@a.lru_cache(maxsize=16)
async def get_block_runtime_info(self, block_hash: str) -> dict:
return await self._get_block_runtime_info(block_hash)
return await self._runtime_info_fetcher.execute(block_hash)

get_block_runtime_version = get_block_runtime_info

@@ -1929,9 +1942,8 @@ async def _get_block_runtime_info(self, block_hash: str) -> dict:
response = await self.rpc_request("state_getRuntimeVersion", [block_hash])
return response.get("result")

@a.lru_cache(maxsize=512)
async def get_block_runtime_version_for(self, block_hash: str):
return await self._get_block_runtime_version_for(block_hash)
return await self._runtime_version_for_fetcher.execute(block_hash)

async def _get_block_runtime_version_for(self, block_hash: str):
"""
@@ -2137,6 +2149,7 @@ async def _make_rpc_request(
storage_item,
result_handler,
)

request_manager.add_response(
item_id, decoded_response, complete
)
@@ -2149,14 +2162,14 @@
and current_time - self.ws.last_sent >= self.retry_timeout
):
if attempt >= self.max_retries:
logger.warning(
logger.error(
f"Timed out waiting for RPC requests {attempt} times. Exiting."
)
raise MaxRetriesExceeded("Max retries reached.")
else:
self.ws.last_received = time.time()
await self.ws.connect(force=True)
logger.error(
logger.warning(
f"Timed out waiting for RPC requests. "
f"Retrying attempt {attempt + 1} of {self.max_retries}"
)
@@ -2223,9 +2236,8 @@ async def rpc_request(
]
result = await self._make_rpc_request(payloads, result_handler=result_handler)
if "error" in result[payload_id][0]:
if (
"Failed to get runtime version"
in result[payload_id][0]["error"]["message"]
if "Failed to get runtime version" in (
err_msg := result[payload_id][0]["error"]["message"]
):
logger.warning(
"Failed to get runtime. Re-fetching from chain, and retrying."
@@ -2234,15 +2246,21 @@
return await self.rpc_request(
method, params, result_handler, block_hash, reuse_block_hash
)
raise SubstrateRequestException(result[payload_id][0]["error"]["message"])
elif (
"Client error: Api called for an unknown Block: State already discarded"
in err_msg
):
bh = err_msg.split("State already discarded for ")[1].strip()
raise StateDiscardedError(bh)
else:
raise SubstrateRequestException(err_msg)
if "result" in result[payload_id][0]:
return result[payload_id][0]
else:
raise SubstrateRequestException(result[payload_id][0])

@a.lru_cache(maxsize=512)
async def get_block_hash(self, block_id: int) -> str:
return await self._get_block_hash(block_id)
return await self._block_hash_fetcher.execute(block_id)

async def _get_block_hash(self, block_id: int) -> str:
return (await self.rpc_request("chain_getBlockHash", [block_id]))["result"]
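These hunks replace the removed `asyncstdlib.lru_cache` decorators with `CachedFetcher` instances created in `__init__`, so each cache is bounded and scoped to a single substrate instance. The `CachedFetcher` implementation itself is not shown in this diff; as a rough mental model only (the real class in `async_substrate_interface.utils.cache` may also deduplicate concurrent in-flight requests or otherwise differ), it behaves like a per-instance async LRU keyed on the single argument:

```python
from collections import OrderedDict
from typing import Any, Awaitable, Callable


class CachedFetcherSketch:
    """Rough sketch of a bounded async LRU wrapper around a single-argument
    coroutine method; not the library's actual implementation."""

    def __init__(self, max_size: int, method: Callable[[Any], Awaitable[Any]]):
        self._max_size = max_size
        self._method = method
        self._cache = OrderedDict()

    async def execute(self, key: Any) -> Any:
        if key in self._cache:
            self._cache.move_to_end(key)  # refresh the LRU position
            return self._cache[key]
        value = await self._method(key)
        self._cache[key] = value
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return value
```

Keeping the cache per instance rather than decorating the bound method avoids sharing cached results across substrate instances, which appears to be the motivation behind the "Fixes potential cache issues." commit.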
10 changes: 10 additions & 0 deletions async_substrate_interface/errors.py
@@ -22,6 +22,16 @@ def __init__(self):
super().__init__(message)


class StateDiscardedError(SubstrateRequestException):
def __init__(self, block_hash: str):
self.block_hash = block_hash
message = (
f"State discarded for {block_hash}. This indicates the block is too old, and you should instead "
f"make this request using an archive node."
)
super().__init__(message)


class StorageFunctionNotFound(ValueError):
pass
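With the new exception in place, callers can catch `StateDiscardedError` specifically and retry against an archive node, which appears to be the pattern the "Add archive node for RetrySubstrate" commits in this release target. A hedged usage sketch (the endpoint URLs are placeholders, not real nodes):

```python
from async_substrate_interface.async_substrate import AsyncSubstrateInterface
from async_substrate_interface.errors import StateDiscardedError


async def runtime_version_at(block_hash: str):
    # "wss://lite.example" and "wss://archive.example" are placeholder endpoints.
    async with AsyncSubstrateInterface("wss://lite.example") as substrate:
        try:
            return await substrate.rpc_request("state_getRuntimeVersion", [block_hash])
        except StateDiscardedError as e:
            # The node has pruned this block's state; e.block_hash names the block.
            # Retry against an archive node, which retains historical state.
            async with AsyncSubstrateInterface("wss://archive.example") as archive:
                return await archive.rpc_request(
                    "state_getRuntimeVersion", [block_hash]
                )
```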
