Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 124 additions & 37 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import argparse
import os
import sys
from typing import Iterator, cast
from typing import Iterator, cast, Optional

import requests
from hexbytes import HexBytes
from packaging.version import Version
from prometheus_client import start_http_server

Expand All @@ -10,11 +14,12 @@
from src.metrics.logging import logging
from src.metrics.prometheus.basic import ENV_VARIABLES_INFO, BUILD_INFO
from src.modules.accounting.accounting import Accounting
from src.modules.checks.checks_module import ChecksModule
from src.modules.checks.checks_module import execute_checks
from src.modules.csm.csm import CSOracle
from src.modules.ejector.ejector import Ejector
from src.providers.ipfs import GW3, IPFSProvider, Kubo, MultiIPFSProvider, Pinata, PublicIPFS
from src.types import OracleModule
from src.types import OracleModule, BlockRoot, SlotNumber
from src.utils.blockstamp import build_blockstamp
from src.utils.build import get_build_info
from src.utils.exception import IncompatibleException
from src.web3py.contract_tweak import tweak_w3_contracts
Expand All @@ -33,25 +38,7 @@
logger = logging.getLogger(__name__)


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)

logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)

def _construct_web3() -> Web3:
logger.info({'msg': 'Initialize multi web3 provider.'})
web3 = Web3(FallbackProviderModule(
variables.EXECUTION_CLIENT_URI,
Expand Down Expand Up @@ -92,34 +79,104 @@ def main(module_name: OracleModule):

logger.info({'msg': 'Add metrics middleware for ETH1 requests.'})
add_requests_metric_middleware(web3)
return web3

logger.info({'msg': 'Sanity checks.'})

def _construct_module(web3: Web3, module_name: OracleModule, refslot: Optional[int] = None) -> Accounting | Ejector | CSOracle:
instance: Accounting | Ejector | CSOracle
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
instance = Accounting(web3, refslot)
elif module_name == OracleModule.EJECTOR:
logger.info({'msg': 'Initialize Ejector module.'})
instance = Ejector(web3)
instance = Ejector(web3, refslot)
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
instance = CSOracle(web3, refslot)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')

logger.info({'msg': 'Sanity checks.'})
instance.check_contract_configs()
return instance


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)

logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)
web3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(web3, module_name)
if variables.DAEMON:
instance.run_as_daemon()
else:
instance.cycle_handler()


def check():
logger.info({'msg': 'Check oracle is ready to work in the current environment.'})
def get_transactions(contract_address, selector: str, limit: int = 10):
etherscan_key = os.getenv('ETHERSCAN_API_KEY')
if etherscan_key is None:
raise ValueError('ETHERSCAN_API_KEY is not set')
url = f"https://api.etherscan.io/api?module=account&action=txlist&address={contract_address}&sort=desc&apikey={etherscan_key}"

response = requests.get(url, timeout=30)
data = response.json()

if data["status"] != "1":
logger.error("Error fetching transactions: %s", data["message"])
return []

return ChecksModule().execute_module()
transactions = data["result"]
successful_transactions = [tx for tx in transactions if tx.get("txreceipt_status") == "1"]
return [tx for tx in successful_transactions if tx["input"].startswith(selector)][:limit]


def run_on_refslot(module_name: OracleModule):
logging.getLogger().setLevel(logging.WARNING)
w3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(w3, module_name, True)
instance.check_contract_configs()
submit_report_fn = instance.report_contract.get_function_by_name("submitReportData")
selector = '0x' + w3.keccak(text=submit_report_fn.abi_element_identifier)[:4].hex()

txs = get_transactions(instance.report_contract.address, selector)
if not txs:
logger.error("No submitReportData transactions found!")
sys.exit(0)

logger.info("Found %d submitReportData calls", len(txs))

for tx in txs:
tx_hash = tx["hash"]
_, data = instance.report_contract.decode_function_input(tx["input"])
refslot = int(data['data']['refSlot'])
print(f"Tx: {tx_hash} → slot: {refslot}")
print([HexBytes(v.hex()) if isinstance(v, bytes) else v for v in data['data'].values()])
block_root = BlockRoot(w3.cc.get_block_root(SlotNumber(refslot + 3 * 32)).root)
block_details = w3.cc.get_block_details(block_root)
bs = build_blockstamp(block_details)
instance.refslot = refslot
instance.refresh_contracts_if_address_change()
report_blockstamp = instance.get_blockstamp_for_report(bs)
report = instance.build_report(report_blockstamp)


print(report)
instance = _construct_module(w3, module_name, refslot)


def check_providers_chain_ids(web3: Web3, cc: ConsensusClientModule, kac: KeysAPIClientModule):
Expand Down Expand Up @@ -163,19 +220,49 @@ def ipfs_providers() -> Iterator[IPFSProvider]:
yield PublicIPFS(timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS)


def parse_args():
"""
Parse command-line arguments using argparse.
The 'module' argument is restricted to valid OracleModule values.
"""
valid_modules = [str(item) for item in OracleModule]

parser = argparse.ArgumentParser(description="Run the Oracle module process.")
subparsers = parser.add_subparsers(dest="module", required=True, help=f"Module to run. One of: {valid_modules}")
check_parser = subparsers.add_parser("check", help="Run the check module.")
check_parser.add_argument(
"--name",
"-n",
type=str,
default=None,
help="Module name to check for a refslot execution."
)
for mod in OracleModule:
if mod == OracleModule.CHECK:
continue
subparsers.add_parser(mod.value, help=f"Run the {mod.value} module.")

return parser.parse_args()


if __name__ == '__main__':
module_name_arg = sys.argv[-1]
if module_name_arg not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {module_name_arg}.'
args = parse_args()
if args.module not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {args.module}.'
logger.error({'msg': msg})
raise ValueError(msg)

module = OracleModule(module_name_arg)
module = OracleModule(args.module)
if module is OracleModule.CHECK:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)

sys.exit(check())
if args.name is None:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)
sys.exit(execute_checks())
else:
errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
run_on_refslot(args.name)
sys.exit(0)

errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/healthcheck_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from src import variables
from src.variables import MAX_CYCLE_LIFETIME_IN_SECONDS


_last_pulse = datetime.now()
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,6 +49,7 @@ def start_pulse_server():
If bot didn't call pulse for a while (5 minutes but should be changed individually)
Docker healthcheck fails to do request
"""
logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
server = HTTPServer(('localhost', variables.HEALTHCHECK_SERVER_PORT), RequestHandlerClass=PulseRequestHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
28 changes: 8 additions & 20 deletions src/modules/accounting/accounting.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,29 @@
import logging
from collections import defaultdict
from time import sleep
from typing import Optional

from hexbytes import HexBytes
from web3.exceptions import ContractCustomError
from web3.types import Wei

from src import variables
from src.constants import SHARE_RATE_PRECISION_E27
from src.metrics.prometheus.accounting import (
ACCOUNTING_IS_BUNKER,
ACCOUNTING_CL_BALANCE_GWEI,
ACCOUNTING_EL_REWARDS_VAULT_BALANCE_WEI,
ACCOUNTING_WITHDRAWAL_VAULT_BALANCE_WEI
)
from src.metrics.prometheus.accounting import (ACCOUNTING_CL_BALANCE_GWEI, ACCOUNTING_EL_REWARDS_VAULT_BALANCE_WEI, ACCOUNTING_IS_BUNKER,
ACCOUNTING_WITHDRAWAL_VAULT_BALANCE_WEI)
from src.metrics.prometheus.duration_meter import duration_meter
from src.modules.accounting.third_phase.extra_data import ExtraDataService
from src.modules.accounting.third_phase.types import ExtraData, FormatList
from src.modules.accounting.types import (
ReportData,
LidoReportRebase,
GenericExtraData,
WqReport,
RebaseReport,
BunkerMode,
FinalizationShareRate,
ValidatorsCount,
ValidatorsBalance,
AccountingProcessingState,
)
from src.modules.accounting.types import (AccountingProcessingState, BunkerMode, FinalizationShareRate, GenericExtraData, LidoReportRebase,
RebaseReport, ReportData, ValidatorsBalance, ValidatorsCount, WqReport)
from src.modules.submodules.consensus import ConsensusModule, InitialEpochIsYetToArriveRevert
from src.modules.submodules.oracle_module import BaseModule, ModuleExecuteDelay
from src.modules.submodules.types import ZERO_HASH
from src.providers.execution.contracts.accounting_oracle import AccountingOracleContract
from src.services.bunker import BunkerService
from src.services.validator_state import LidoValidatorStateService
from src.services.withdrawal import Withdrawal
from src.types import BlockStamp, Gwei, ReferenceBlockStamp, StakingModuleId, NodeOperatorGlobalIndex, FinalizationBatches
from src.types import BlockStamp, FinalizationBatches, Gwei, NodeOperatorGlobalIndex, ReferenceBlockStamp, StakingModuleId
from src.utils.cache import global_lru_cache as lru_cache
from src.utils.units import gwei_to_wei
from src.variables import ALLOW_REPORTING_IN_BUNKER_MODE
Expand All @@ -60,8 +47,9 @@ class Accounting(BaseModule, ConsensusModule):
COMPATIBLE_CONTRACT_VERSION = 2
COMPATIBLE_CONSENSUS_VERSION = 3

def __init__(self, w3: Web3):
def __init__(self, w3: Web3, refslot: Optional[int] = None):
self.report_contract: AccountingOracleContract = w3.lido_contracts.accounting_oracle
self.refslot = refslot
super().__init__(w3)

self.lido_validator_state_service = LidoValidatorStateService(self.w3)
Expand Down
4 changes: 2 additions & 2 deletions src/modules/accounting/third_phase/extra_data.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from dataclasses import dataclass
from itertools import groupby, batched
from itertools import batched, groupby
from typing import Sequence

from src.modules.accounting.third_phase.types import ExtraData, ItemType, ExtraDataLengths, FormatList
from src.modules.accounting.third_phase.types import ExtraData, ExtraDataLengths, FormatList, ItemType
from src.modules.submodules.types import ZERO_HASH
from src.types import NodeOperatorGlobalIndex
from src.web3py.types import Web3
Expand Down
27 changes: 9 additions & 18 deletions src/modules/checks/checks_module.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
import logging

import pytest

logger = logging.getLogger(__name__)

class ChecksModule:
"""
Module that executes all tests to figure out that environment is ready for Oracle.

Checks:
- Consensus Layer node
- Execution Layer node
- Keys API service
if LIDO_LOCATOR address provided
- Checks configs in Accounting module and Ejector module
if CSM_MODULE_ADDRESS provided
- Checks configs in CSM oracle module
- Checks with special blockstamp value (6300 slots in the past)
"""
def execute_module(self):
return pytest.main([
'src/modules/checks/suites',
'-c', 'src/modules/checks/pytest.ini',
])
def execute_checks():
logger.info({'msg': 'Check oracle is ready to work in the current environment.'})
return pytest.main([
'src/modules/checks/suites',
'-c', 'src/modules/checks/pytest.ini',
])
3 changes: 1 addition & 2 deletions src/modules/checks/suites/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from src import variables
from src.types import EpochNumber, SlotNumber, BlockRoot
from src.utils.blockstamp import build_blockstamp
from src.utils.api import opsgenie_api
from src.utils.blockstamp import build_blockstamp
from src.utils.slot import get_reference_blockstamp
from src.web3py.contract_tweak import tweak_w3_contracts
from src.web3py.extensions import (
Expand All @@ -20,7 +20,6 @@
)
from src.web3py.types import Web3


TITLE_PROPERTY_NAME = "test_title"

_config = None
Expand Down
5 changes: 3 additions & 2 deletions src/modules/csm/csm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from collections import defaultdict
from typing import Iterator
from typing import Iterator, Optional

from hexbytes import HexBytes

Expand Down Expand Up @@ -65,9 +65,10 @@ class CSOracle(BaseModule, ConsensusModule):
report_contract: CSFeeOracleContract
module_id: StakingModuleId

def __init__(self, w3: Web3):
def __init__(self, w3: Web3, refslot: Optional[int] = None):
self.report_contract = w3.csm.oracle
self.state = State.load()
self.refslot = refslot
super().__init__(w3)
self.module_id = self._get_module_id()

Expand Down
2 changes: 1 addition & 1 deletion src/modules/csm/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from typing import TypeAlias, Literal
from typing import Literal, TypeAlias

from hexbytes import HexBytes

Expand Down
1 change: 0 additions & 1 deletion src/modules/ejector/data_encode.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from src.utils.types import hex_str_to_bytes
from src.web3py.extensions.lido_validators import LidoValidator, NodeOperatorGlobalIndex


DATA_FORMAT_LIST = 1

MODULE_ID_LENGTH = 3
Expand Down
Loading