Skip to content

Commit f6942ae

Browse files
committed
Fix: If the official API official.aleph.cloud is not available use another CCN URL to try to connect it.
Also use another IPv6 URL to check if IPv6 is running.
1 parent 105c7f0 commit f6942ae

File tree

1 file changed

+158
-67
lines changed

1 file changed

+158
-67
lines changed

examples/example_fastapi/main.py

Lines changed: 158 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from starlette.responses import JSONResponse
2727

2828
from aleph.sdk.chains.remote import RemoteAccount
29+
from aleph.sdk.chains.ethereum import ETHAccount
2930
from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient
3031
from aleph.sdk.query.filters import MessageFilter
3132
from aleph.sdk.types import StorageEnum
@@ -48,6 +49,11 @@
4849

4950
startup_lifespan_executed: bool = False
5051

52+
ALEPH_API_HOSTS: list[str] = [
53+
"https://official.aleph.cloud",
54+
"https://api.aleph.im",
55+
]
56+
5157

5258
@app.on_event("startup")
5359
async def startup_event() -> None:
@@ -180,22 +186,30 @@ async def connect_ipv6():
180186
"""Connect to the Quad9 VPN provider using their IPv6 address.
181187
The webserver on that address returns a 404 error, so we accept that response code.
182188
"""
183-
ipv6_host = "https://[2620:fe::fe]"
184-
timeout = aiohttp.ClientTimeout(total=5)
185-
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(), timeout=timeout) as session:
186-
try:
187-
async with session.get(ipv6_host) as resp:
188-
# We expect this endpoint to return a 404 error
189-
if resp.status != 404:
190-
resp.raise_for_status()
191-
return {"result": True, "headers": resp.headers}
192-
except TimeoutError:
193-
logger.warning(f"Session connection to host {ipv6_host} timed out")
194-
return {"result": False, "reason": "Timeout"}
195-
except aiohttp.ClientConnectionError as error:
196-
logger.warning(f"Client connection to host {ipv6_host} failed: {error}")
197-
# Get a string that describes the error
198-
return {"result": False, "reason": str(error.args[0])}
189+
ipv6_hosts: list[str] = [
190+
"https://[2620:fe::fe]", # Quad9 DNS service
191+
"https://[2606:4700:4700::1111]", # CloudFlare DNS service
192+
]
193+
timeout_seconds = 5
194+
195+
# Create a list of tasks to check the URLs in parallel
196+
tasks: set[asyncio.Task] = {asyncio.create_task(check_url(host, timeout_seconds, socket_family=socket.AF_INET6)) for host in ipv6_hosts}
197+
198+
# While no tasks have completed, keep waiting for the next one to finish
199+
while tasks:
200+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
201+
result = done.pop().result()
202+
203+
if result["result"]:
204+
# The task was successful, cancel the remaining tasks and return the result
205+
for task in tasks:
206+
task.cancel()
207+
return result
208+
else:
209+
continue
210+
211+
# No IPv6 URL was reachable
212+
return {"result": False}
199213

200214

201215
async def check_url(internet_host: str, timeout_seconds: int = 5, socket_family=socket.AF_INET):
@@ -246,7 +260,28 @@ async def read_internet():
246260
async def get_a_message():
247261
"""Get a message from the Aleph.im network"""
248262
item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af"
249-
async with AlephHttpClient() as client:
263+
# Create a list of tasks to check the URLs in parallel
264+
tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_message(host, item_hash)) for host in ALEPH_API_HOSTS}
265+
266+
# While no tasks have completed, keep waiting for the next one to finish
267+
while tasks:
268+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
269+
result = done.pop().result()
270+
271+
if result["status"]:
272+
# The task was successful, cancel the remaining tasks and return the result
273+
for task in tasks:
274+
task.cancel()
275+
return result
276+
else:
277+
continue
278+
279+
# No API Host was reachable
280+
return {"result": False}
281+
282+
283+
async def get_aleph_message(api_host: str, item_hash: str):
284+
async with AlephHttpClient(api_server=api_host) as client:
250285
message = await client.get_message(
251286
item_hash=item_hash,
252287
message_type=ProgramMessage,
@@ -257,36 +292,31 @@ async def get_a_message():
257292
@app.post("/post_a_message")
258293
async def post_with_remote_account():
259294
"""Post a message on the Aleph.im network using the remote account of the host."""
260-
try:
261-
account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket")
262-
263-
content = {
264-
"date": datetime.now(tz=timezone.utc).isoformat(),
265-
"test": True,
266-
"answer": 42,
267-
"something": "interesting",
268-
}
269-
async with AuthenticatedAlephHttpClient(
270-
account=account,
271-
) as client:
272-
message: PostMessage
273-
status: MessageStatus
274-
message, status = await client.create_post(
275-
post_content=content,
276-
post_type="test",
277-
ref=None,
278-
channel="TEST",
279-
inline=True,
280-
storage_engine=StorageEnum.storage,
281-
sync=True,
282-
)
283-
if status != MessageStatus.PROCESSED:
284-
return JSONResponse(status_code=500, content={"error": status})
285-
return {
286-
"message": message,
287-
}
288-
except aiohttp.client_exceptions.UnixClientConnectorError:
289-
return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"})
295+
account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket")
296+
297+
# Create a list of tasks to check the URLs in parallel
298+
tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS}
299+
300+
# While no tasks have completed, keep waiting for the next one to finish
301+
while tasks:
302+
try:
303+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
304+
message, status = done.pop().result()
305+
306+
if status == MessageStatus.PROCESSED:
307+
# The task was successful, cancel the remaining tasks and return the result
308+
for task in tasks:
309+
task.cancel()
310+
return {
311+
"message": message,
312+
}
313+
else:
314+
continue
315+
except aiohttp.client_exceptions.UnixClientConnectorError:
316+
return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"})
317+
318+
# No API Host was reachable
319+
return JSONResponse(status_code=500, content={"error": status})
290320

291321

292322
@app.post("/post_a_message_local_account")
@@ -296,6 +326,30 @@ async def post_with_local_account():
296326

297327
account = get_fallback_account()
298328

329+
# Create a list of tasks to check the URLs in parallel
330+
tasks: set[asyncio.Task] = {asyncio.create_task(send_post_aleph_message(host, account)) for host in ALEPH_API_HOSTS}
331+
332+
# While no tasks have completed, keep waiting for the next one to finish
333+
while tasks:
334+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
335+
message, status = done.pop().result()
336+
337+
if status == MessageStatus.PROCESSED:
338+
# The task was successful, cancel the remaining tasks and return the result
339+
for task in tasks:
340+
task.cancel()
341+
return {
342+
"message": message,
343+
}
344+
else:
345+
continue
346+
347+
# No API Host was reachable
348+
return JSONResponse(status_code=500, content={"error": status})
349+
350+
351+
async def send_post_aleph_message(api_host: str, account: RemoteAccount | ETHAccount):
352+
""" Post a message on the Aleph.im network using a local or the remote account of the host. """
299353
content = {
300354
"date": datetime.now(tz=timezone.utc).isoformat(),
301355
"test": True,
@@ -304,12 +358,11 @@ async def post_with_local_account():
304358
}
305359
async with AuthenticatedAlephHttpClient(
306360
account=account,
307-
api_server="https://api2.aleph.im",
308-
allow_unix_sockets=False,
361+
api_server=api_host,
309362
) as client:
310363
message: PostMessage
311364
status: MessageStatus
312-
message, status = await client.create_post(
365+
return await client.create_post(
313366
post_content=content,
314367
post_type="test",
315368
ref=None,
@@ -318,11 +371,6 @@ async def post_with_local_account():
318371
storage_engine=StorageEnum.storage,
319372
sync=True,
320373
)
321-
if status != MessageStatus.PROCESSED:
322-
return JSONResponse(status_code=500, content={"error": status})
323-
return {
324-
"message": message,
325-
}
326374

327375

328376
@app.post("/post_a_file")
@@ -331,23 +379,44 @@ async def post_a_file():
331379

332380
account = get_fallback_account()
333381
file_path = Path(__file__).absolute()
382+
383+
# Create a list of tasks to check the URLs in parallel
384+
tasks: set[asyncio.Task] = {asyncio.create_task(send_store_aleph_message(host, account, file_path)) for host in ALEPH_API_HOSTS}
385+
386+
# While no tasks have completed, keep waiting for the next one to finish
387+
while tasks:
388+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
389+
message, status = done.pop().result()
390+
391+
if status == MessageStatus.PROCESSED:
392+
# The task was successful, cancel the remaining tasks and return the result
393+
for task in tasks:
394+
task.cancel()
395+
return {
396+
"message": message,
397+
}
398+
else:
399+
continue
400+
401+
# No API Host was reachable
402+
return JSONResponse(status_code=500, content={"error": status})
403+
404+
405+
async def send_store_aleph_message(api_host: str, account: ETHAccount, file_path: Path):
406+
""" Store a file on the Aleph.im network using a local account. """
334407
async with AuthenticatedAlephHttpClient(
335408
account=account,
409+
api_server=api_host,
336410
) as client:
337411
message: StoreMessage
338412
status: MessageStatus
339-
message, status = await client.create_store(
413+
return await client.create_store(
340414
file_path=file_path,
341415
ref=None,
342416
channel="TEST",
343417
storage_engine=StorageEnum.storage,
344418
sync=True,
345419
)
346-
if status != MessageStatus.PROCESSED:
347-
return JSONResponse(status_code=500, content={"error": status})
348-
return {
349-
"message": message,
350-
}
351420

352421

353422
@app.get("/sign_a_message")
@@ -453,9 +522,31 @@ def platform_pip_freeze() -> list[str]:
453522

454523
@app.event(filters=filters)
455524
async def aleph_event(event) -> dict[str, str]:
456-
print("aleph_event", event)
457-
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
458-
async with session.get("https://official.aleph.cloud/api/v0/info/public.json") as resp:
459-
print("RESP", resp)
460-
resp.raise_for_status()
461-
return {"result": "Good"}
525+
526+
# Create a list of tasks to check the URLs in parallel
527+
tasks: set[asyncio.Task] = {asyncio.create_task(get_aleph_json(host)) for host in ALEPH_API_HOSTS}
528+
529+
# While no tasks have completed, keep waiting for the next one to finish
530+
while tasks:
531+
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
532+
status = done.pop().result()
533+
534+
if status:
535+
# The task was successful, cancel the remaining tasks and return the result
536+
for task in tasks:
537+
task.cancel()
538+
return {"result": "Good"}
539+
else:
540+
continue
541+
542+
return {"result": "Bad"}
543+
544+
545+
async def get_aleph_json(api_host: str) -> bool:
546+
try:
547+
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
548+
async with session.get(f"{api_host}/api/v0/info/public.json") as resp:
549+
resp.raise_for_status()
550+
return True
551+
except Exception:
552+
return False

0 commit comments

Comments
 (0)