Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ json-stream = "^2.3.2"
oz-merkle-tree = { git = "https://github.com/lidofinance/oz-merkle-tree" }
py-multiformats-cid = "^0.4.4"
conventional-pre-commit = "^4.0.0"
deepdiff = "^8.5.0"

[tool.poetry.group.dev.dependencies]
base58 = "^2.1.1"
Expand Down
225 changes: 180 additions & 45 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,52 @@
import argparse
import re
import sys
from typing import Iterator, cast
from dataclasses import asdict
from typing import Any, Dict, Iterator, Optional, cast

from deepdiff import DeepDiff
from hexbytes import HexBytes
from packaging.version import Version
from prometheus_client import start_http_server

from src import constants
from src import variables
from src import constants, variables
from src.metrics.healthcheck_server import start_pulse_server
from src.metrics.logging import logging
from src.metrics.prometheus.basic import ENV_VARIABLES_INFO, BUILD_INFO
from src.metrics.prometheus.basic import BUILD_INFO, ENV_VARIABLES_INFO
from src.modules.accounting.accounting import Accounting
from src.modules.checks.checks_module import ChecksModule
from src.modules.checks.checks_module import execute_checks
from src.modules.csm.csm import CSOracle
from src.modules.ejector.ejector import Ejector
from src.providers.ipfs import GW3, IPFSProvider, Kubo, MultiIPFSProvider, Pinata, PublicIPFS
from src.types import OracleModule
from src.providers.execution.contracts.base_oracle import BaseOracleContract
from src.providers.ipfs import (
GW3,
IPFSProvider,
Kubo,
MultiIPFSProvider,
Pinata,
PublicIPFS,
)
from src.types import BlockRoot, OracleModule, SlotNumber
from src.utils.blockstamp import build_blockstamp
from src.utils.build import get_build_info
from src.utils.exception import IncompatibleException
from src.web3py.contract_tweak import tweak_w3_contracts
from src.web3py.extensions import (
LidoContracts,
TransactionUtils,
ConsensusClientModule,
KeysAPIClientModule,
LidoValidatorsProvider,
FallbackProviderModule,
KeysAPIClientModule,
LazyCSM,
LidoContracts,
LidoValidatorsProvider,
TransactionUtils,
)
from src.web3py.middleware import add_requests_metric_middleware
from src.web3py.types import Web3

logger = logging.getLogger(__name__)


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)

logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)

def _construct_web3() -> Web3:
logger.info({'msg': 'Initialize multi web3 provider.'})
web3 = Web3(FallbackProviderModule(
variables.EXECUTION_CLIENT_URI,
Expand Down Expand Up @@ -92,34 +87,129 @@ def main(module_name: OracleModule):

logger.info({'msg': 'Add metrics middleware for ETH1 requests.'})
add_requests_metric_middleware(web3)
return web3

logger.info({'msg': 'Sanity checks.'})

def _construct_module(web3: Web3, module_name: OracleModule, refslot: Optional[int] = None) -> Accounting | Ejector | CSOracle:
instance: Accounting | Ejector | CSOracle
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
instance = Accounting(web3, refslot)
elif module_name == OracleModule.EJECTOR:
logger.info({'msg': 'Initialize Ejector module.'})
instance = Ejector(web3)
instance = Ejector(web3, refslot)
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
instance = CSOracle(web3, refslot)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')

logger.info({'msg': 'Sanity checks.'})
instance.check_contract_configs()
return instance


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)
web3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(web3, module_name)
if variables.DAEMON:
instance.run_as_daemon()
else:
instance.cycle_handler()


def check():
logger.info({'msg': 'Check oracle is ready to work in the current environment.'})
def get_transactions(w3, contract: BaseOracleContract, limit: int, block_offset: int):
event_abi = "ProcessingStarted(uint256,bytes32)"
event_topic = Web3.to_hex(primitive=Web3.keccak(text=event_abi))

def get_processing_started_logs(from_block: int, to_block: int):
return w3.eth.get_logs({
"fromBlock": from_block,
"toBlock": to_block,
"address": contract.address,
"topics": [event_topic],
})

latest = w3.eth.block_number
logs = get_processing_started_logs(from_block=latest - block_offset, to_block=latest)
tx_hashes = [log['transactionHash'].hex() for log in logs]
print(f"Found {len(tx_hashes)} submitReportData transactions in latest {block_offset} blocks.")

txs = []
for tx_hash in tx_hashes[:limit]:
tx = w3.eth.get_transaction(tx_hash)
_, params = contract.decode_function_input(tx['input'])
data = {
k: HexBytes(v.hex()) if isinstance(v, (bytes, bytearray)) else v
for k, v in params['data'].items()
}
txs.append(data)

print(f"Will process {len(txs)} transactions")
return txs


def run_on_refslot(module_name: OracleModule, limit, block_offset):
logging.getLogger().setLevel(logging.WARNING)
w3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(w3, module_name, True)
instance.check_contract_configs()

return ChecksModule().execute_module()
txs = get_transactions(w3, instance.report_contract, limit, block_offset)
if not txs:
logger.error("No submitReportData transactions found!")
sys.exit(0)

_camel_to_snake_pattern = re.compile(r'(.)([A-Z][a-z]+)')
_camel_to_snake_pattern2 = re.compile(r'([a-z0-9])([A-Z])')

def camel_to_snake(name: str) -> str:
s1 = _camel_to_snake_pattern.sub(r'\1_\2', name)
return _camel_to_snake_pattern2.sub(r'\1_\2', s1).lower()

def normalize_keys(d: Dict[str, Any]) -> Dict[str, Any]:
return {camel_to_snake(k): v for k, v in d.items()}

for tx in txs:
refslot = int(tx['refSlot'])
print("Input data:", tx)
block_root = BlockRoot(w3.cc.get_block_root(SlotNumber(refslot + 3 * 32)).root)
block_details = w3.cc.get_block_details(block_root)
bs = build_blockstamp(block_details)
instance.refslot = refslot
instance.refresh_contracts_if_address_change()
report_blockstamp = instance.get_blockstamp_for_report(bs)
report = instance.build_report(report_blockstamp)

normalized_input = normalize_keys(tx)
report_dict = asdict(report)
report_dict = {
k: HexBytes(v.hex()) if isinstance(v, (bytes, bytearray)) else v
for k, v in report_dict.items()
}
print("Output data:", report_dict)
diff = DeepDiff(normalized_input, report_dict, ignore_order=True)
if diff:
print("🚨 Differences found:")
print(diff)
else:
print("✅ All fields match!")
instance = _construct_module(w3, module_name, refslot)


def check_providers_chain_ids(web3: Web3, cc: ConsensusClientModule, kac: KeysAPIClientModule):
Expand Down Expand Up @@ -163,19 +253,64 @@ def ipfs_providers() -> Iterator[IPFSProvider]:
yield PublicIPFS(timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS)


def parse_args():
"""
Parse command-line arguments using argparse.
The 'module' argument is restricted to valid OracleModule values.
"""
valid_modules = [str(item) for item in OracleModule]

parser = argparse.ArgumentParser(description="Run the Oracle module process.")
subparsers = parser.add_subparsers(dest="module", required=True, help=f"Module to run. One of: {valid_modules}")
check_parser = subparsers.add_parser("check", help="Run the check module.")
check_parser.add_argument(
"--name",
"-n",
type=str,
default=None,
help="Module name to check for a refslot execution."
)
check_parser.add_argument(
"--limit",
type=int,
default=10,
help="Maximum number of items to process."
)
check_parser.add_argument(
"--offset",
type=int,
default=100_000,
help="Starting index offset for processing."
)
for mod in OracleModule:
if mod == OracleModule.CHECK:
continue
subparsers.add_parser(
mod.value,
help=f"Run the {mod.value} module."
)

return parser.parse_args()


if __name__ == '__main__':
module_name_arg = sys.argv[-1]
if module_name_arg not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {module_name_arg}.'
args = parse_args()
if args.module not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {args.module}.'
logger.error({'msg': msg})
raise ValueError(msg)

module = OracleModule(module_name_arg)
module = OracleModule(args.module)
if module is OracleModule.CHECK:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)

sys.exit(check())
if args.name is None:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)
sys.exit(execute_checks())
else:
errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
run_on_refslot(args.name, args.limit, args.offset)
sys.exit(0)

errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/healthcheck_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from src import variables
from src.variables import MAX_CYCLE_LIFETIME_IN_SECONDS


_last_pulse = datetime.now()
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,6 +49,7 @@ def start_pulse_server():
If bot didn't call pulse for a while (5 minutes but should be changed individually)
Docker healthcheck fails to do request
"""
logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
server = HTTPServer(('localhost', variables.HEALTHCHECK_SERVER_PORT), RequestHandlerClass=PulseRequestHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
Loading