Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ def ipfs_providers() -> Iterator[IPFSProvider]:
timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS,
)

if variables.PINATA_JWT:
if variables.PINATA_JWT and variables.PINATA_DEDICATED_GATEWAY_URL and variables.PINATA_DEDICATED_GATEWAY_TOKEN:
yield Pinata(
variables.PINATA_JWT,
timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS,
dedicated_gateway_url=variables.PINATA_DEDICATED_GATEWAY_URL,
dedicated_gateway_token=variables.PINATA_DEDICATED_GATEWAY_TOKEN,
)

yield PublicIPFS(timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS)
Expand Down
43 changes: 38 additions & 5 deletions src/providers/ipfs/pinata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from json import JSONDecodeError
from urllib.parse import urljoin

import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from src.utils.jwt import validate_jwt

Expand All @@ -15,17 +18,48 @@ class Pinata(IPFSProvider):
"""pinata.cloud IPFS provider"""

API_ENDPOINT = "https://api.pinata.cloud"
GATEWAY = "https://gateway.pinata.cloud"
PUBLIC_GATEWAY = "https://gateway.pinata.cloud"
MAX_DEDICATED_GATEWAY_RETRIES = 1

def __init__(self, jwt_token: str, *, timeout: int) -> None:
def __init__(self, jwt_token: str, *, timeout: int, dedicated_gateway_url: str, dedicated_gateway_token: str) -> None:
super().__init__()
validate_jwt(jwt_token)
self.timeout = timeout

self.session = requests.Session()
self.session.headers["Authorization"] = f"Bearer {jwt_token}"

dedicated_adapter = HTTPAdapter(max_retries=Retry(
total=self.MAX_DEDICATED_GATEWAY_RETRIES,
status_forcelist=list(range(400, 600)),
backoff_factor=3.0,
))
self.dedicated_session = requests.Session()
self.dedicated_session.headers["x-pinata-gateway-token"] = dedicated_gateway_token
self.dedicated_session.mount("https://", dedicated_adapter)
self.dedicated_session.mount("http://", dedicated_adapter)

self.dedicated_gateway_url = dedicated_gateway_url
self.dedicated_gateway_token = dedicated_gateway_token

def fetch(self, cid: CID) -> bytes:
url = f"{self.GATEWAY}/ipfs/{cid}"
try:
return self._fetch_from_dedicated_gateway(cid)
except requests.RequestException as ex:
logger.warning({
"msg": "Dedicated gateway failed after retries, trying public gateway",
"error": str(ex)
})
return self._fetch_from_public_gateway(cid)

def _fetch_from_dedicated_gateway(self, cid: CID) -> bytes:
url = urljoin(self.dedicated_gateway_url, f"/ipfs/{cid}")
resp = self.dedicated_session.get(url, timeout=self.timeout)
resp.raise_for_status()
return resp.content

def _fetch_from_public_gateway(self, cid: CID) -> bytes:
url = urljoin(self.PUBLIC_GATEWAY, f'/ipfs/{cid}')
try:
resp = requests.get(url, timeout=self.timeout)
resp.raise_for_status()
Expand All @@ -40,8 +74,7 @@ def publish(self, content: bytes, name: str | None = None) -> CID:

def _upload(self, content: bytes, name: str | None = None) -> str:
"""Pinata has no dedicated endpoint for uploading, so pinFileToIPFS is used"""

url = f"{self.API_ENDPOINT}/pinning/pinFileToIPFS"
url = urljoin(self.API_ENDPOINT, '/pinning/pinFileToIPFS')
try:
with self.session as s:
resp = s.post(url, files={"file": content})
Expand Down
3 changes: 3 additions & 0 deletions src/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
KEYS_API_URI: Final = os.getenv('KEYS_API_URI', '').split(',')

PINATA_JWT: Final = from_file_or_env('PINATA_JWT')
PINATA_DEDICATED_GATEWAY_URL: Final = os.getenv('PINATA_DEDICATED_GATEWAY_URL')
PINATA_DEDICATED_GATEWAY_TOKEN: Final = from_file_or_env('PINATA_DEDICATED_GATEWAY_TOKEN')
KUBO_HOST: Final = os.getenv('KUBO_HOST')
KUBO_GATEWAY_PORT: Final = int(os.getenv('KUBO_GATEWAY_PORT', 8080))
KUBO_RPC_PORT: Final = int(os.getenv('KUBO_RPC_PORT', 5001))
Expand Down Expand Up @@ -157,6 +159,7 @@ def raise_from_errors(errors):
'CONSENSUS_CLIENT_URI': CONSENSUS_CLIENT_URI,
'KEYS_API_URI': KEYS_API_URI,
'PINATA_JWT': PINATA_JWT,
'PINATA_DEDICATED_GATEWAY_TOKEN': PINATA_DEDICATED_GATEWAY_TOKEN,
'MEMBER_PRIV_KEY': MEMBER_PRIV_KEY,
'OPSGENIE_API_KEY': OPSGENIE_API_KEY,
'OPSGENIE_API_URL': OPSGENIE_API_URL,
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/contracts/test_cs_parameters_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ def test_cs_parameters_registry(cs_params_contract, caplog):
check_contract(
cs_params_contract,
[
("get_performance_coefficients", None, check_is_instance_of(PerformanceCoefficients)),
("get_reward_share_data", None, check_is_instance_of(KeyNumberValueIntervalList)),
("get_performance_leeway_data", None, check_is_instance_of(KeyNumberValueIntervalList)),
("get_strikes_params", None, check_is_instance_of(StrikesParams)),
("get_performance_coefficients", (0,), check_is_instance_of(PerformanceCoefficients)),
("get_reward_share_data", (0,), check_is_instance_of(KeyNumberValueIntervalList)),
("get_performance_leeway_data", (0,), check_is_instance_of(KeyNumberValueIntervalList)),
("get_strikes_params", (0,), check_is_instance_of(StrikesParams)),
],
caplog,
)
9 changes: 4 additions & 5 deletions tests/integration/contracts/test_lido.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ def test_lido_contract_call(lido_contract, accounting_oracle_contract, burner_co
(
1746275159, # timestamp
86400,
389746,
9190764598468942000000000,
403105, # Updated to match current beacon_validators count
8462132592019028000000000, # Updated to match current beacon_balance
13771995248000000000,
478072602914417566,
0,
accounting_oracle_contract.address,
11620928,
# Call depends on contract state
'0xffa34bcc5a08c92272a62e591f7afb9cb839134aa08c091ae0c95682fba35da9',
11620928, # ref_slot
'0x9bad2cb4e0ef017912b8c77e9ce1c6ec52a6b79013fe8d0d099a65a51ee4a66e', # block_identifier
),
lambda response: check_value_type(response, LidoReportRebase),
),
Expand Down
98 changes: 98 additions & 0 deletions tests/providers/test_pinata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import pytest
import responses

from src.providers.ipfs.pinata import Pinata
from src.providers.ipfs.types import FetchError


@pytest.fixture
def pinata_provider():
return Pinata(
jwt_token="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InRlc3QiLCJleHAiOjk5OTk5OTk5OTl9.Ps6jFKniFhNMYr_4WgETZP_LcXEfSzg3yUhNBn6Xgok",
timeout=30,
dedicated_gateway_url="https://dedicated.gateway.com",
dedicated_gateway_token="dedicated_token_123",
)


@pytest.mark.unit
@responses.activate
def test_fetch__dedicated_gateway_available__returns_content_from_dedicated(pinata_provider):
responses.add(responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", body=b'test content', status=200)

result = pinata_provider.fetch("QmTest123")

assert result == b'test content'
assert len(responses.calls) == 1
request_headers = responses.calls[0].request.headers
assert request_headers.get("x-pinata-gateway-token") == "dedicated_token_123"


@pytest.mark.unit
@responses.activate
def test_fetch__dedicated_gateway_fails_max_attempts__falls_back_to_public(pinata_provider):
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Gateway error"}, status=500
)
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Gateway error"}, status=500
)
responses.add(responses.GET, "https://gateway.pinata.cloud/ipfs/QmTest123", body=b'public content', status=200)

result = pinata_provider.fetch("QmTest123")

assert result == b'public content'
assert len(responses.calls) == 3
assert responses.calls[0].request.headers.get("x-pinata-gateway-token") == "dedicated_token_123"
assert responses.calls[1].request.headers.get("x-pinata-gateway-token") == "dedicated_token_123"
assert "x-pinata-gateway-token" not in responses.calls[2].request.headers


@pytest.mark.unit
@responses.activate
def test_fetch__dedicated_gateway_fails_once__retries_and_succeeds(pinata_provider):
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "First failure"}, status=500
)
responses.add(responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", body=b'dedicated success', status=200)

result = pinata_provider.fetch("QmTest123")

assert result == b'dedicated success'
assert len(responses.calls) == 2


@pytest.mark.unit
@responses.activate
def test_fetch__both_gateways_fail__raises_fetch_error(pinata_provider):
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Dedicated error"}, status=500
)
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Dedicated error"}, status=500
)
responses.add(
responses.GET, "https://gateway.pinata.cloud/ipfs/QmTest123", json={"error": "Public error"}, status=500
)

with pytest.raises(FetchError):
pinata_provider.fetch("QmTest123")

assert len(responses.calls) == 3


@pytest.mark.unit
@responses.activate
def test_fetch__dedicated_gateway_429_rate_limit__retries_and_falls_back_to_public(pinata_provider):
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Rate limit exceeded"}, status=429
)
responses.add(
responses.GET, "https://dedicated.gateway.com/ipfs/QmTest123", json={"error": "Rate limit exceeded"}, status=429
)
responses.add(responses.GET, "https://gateway.pinata.cloud/ipfs/QmTest123", body=b'public content', status=200)

result = pinata_provider.fetch("QmTest123")

assert result == b'public content'
assert len(responses.calls) == 3