diff --git a/.gitignore b/.gitignore index 83d5e8ed9..b7df7a9c5 100644 --- a/.gitignore +++ b/.gitignore @@ -135,4 +135,6 @@ dist *.tsbuildinfo *db-shm -*db-wal \ No newline at end of file +*db-wal + + diff --git a/ethereum_index_latest_job.db b/ethereum_index_latest_job.db new file mode 100644 index 000000000..1f16ba8c7 Binary files /dev/null and b/ethereum_index_latest_job.db differ diff --git a/indexer-queries b/indexer-queries new file mode 100644 index 000000000..0f0cef521 --- /dev/null +++ b/indexer-queries @@ -0,0 +1,12 @@ +-- sql + +SELECT COUNT(*) FROM logs; + + +SELECT * FROM logs; +SELECT * FROM latest_block_processed; +SELECT * FROM contract_start_block; + +SELECT page_count * page_size as size_in_bytes, + (page_count * page_size) / 1024 / 1024 as size_in_mb +FROM pragma_page_count(), pragma_page_size(); diff --git a/package.json b/package.json index 56c0ffdae..6147573a6 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,8 @@ "build:watch": "npm run build:watch -w packages/adapters-library", "adapters-cli": "node --env-file=.env packages/adapters-library/dist/scripts/index.js", "build-metadata-db": "npm run adapters-cli build-metadata-db --", + "indexer": "npm run adapters-cli indexer --", + "eth2-staking-indexer": "npm run adapters-cli indexer-eth2-staking-withdrawals --", "delete-adapter-metadata": "npm run adapters-cli delete-adapter-metadata --", "check-metadata-type": "npm run adapters-cli check-metadata-type --", "check-db-totals": "npm run adapters-cli check-db-totals --", diff --git a/packages/adapters-library/src/adapters/supportedProtocols.ts b/packages/adapters-library/src/adapters/supportedProtocols.ts index a2d30e38a..9fa67d6c1 100644 --- a/packages/adapters-library/src/adapters/supportedProtocols.ts +++ b/packages/adapters-library/src/adapters/supportedProtocols.ts @@ -178,7 +178,7 @@ export type SolanaChainAdapters = Partial< ) => IProtocolAdapter)[] > > - +// @ts-ignore export const supportedProtocols: Record< Protocol, EvmChainAdapters | SolanaChainAdapters @@ -458,39 +458,39 @@ export const supportedProtocols: Record< ], }, - [Protocol.MorphoAaveV2]: { - [Chain.Ethereum]: [ - MorphoAaveV2OptimizerBorrowAdapter, - MorphoAaveV2OptimizerSupplyAdapter, - ], - }, - - [Protocol.MorphoAaveV3]: { - [Chain.Ethereum]: [ - MorphoAaveV3OptimizerSupplyAdapter, - MorphoAaveV3OptimizerBorrowAdapter, - ], - }, - - [Protocol.MorphoBlue]: { - [Chain.Ethereum]: [ - MorphoBlueMarketSupplyAdapter, - MorphoBlueMarketBorrowAdapter, - MorphoBlueVaultAdapter, - ], - [Chain.Base]: [ - MorphoBlueMarketSupplyAdapter, - MorphoBlueMarketBorrowAdapter, - MorphoBlueVaultAdapter, - ], - }, - - [Protocol.MorphoCompoundV2]: { - [Chain.Ethereum]: [ - MorphoCompoundV2OptimizerSupplyAdapter, - MorphoCompoundV2OptimizerBorrowAdapter, - ], - }, + // [Protocol.MorphoAaveV2]: { + // [Chain.Ethereum]: [ + // MorphoAaveV2OptimizerBorrowAdapter, + // MorphoAaveV2OptimizerSupplyAdapter, + // ], + // }, + + // [Protocol.MorphoAaveV3]: { + // [Chain.Ethereum]: [ + // MorphoAaveV3OptimizerSupplyAdapter, + // MorphoAaveV3OptimizerBorrowAdapter, + // ], + // }, + + // [Protocol.MorphoBlue]: { + // [Chain.Ethereum]: [ + // MorphoBlueMarketSupplyAdapter, + // MorphoBlueMarketBorrowAdapter, + // MorphoBlueVaultAdapter, + // ], + // [Chain.Base]: [ + // MorphoBlueMarketSupplyAdapter, + // MorphoBlueMarketBorrowAdapter, + // MorphoBlueVaultAdapter, + // ], + // }, + + // [Protocol.MorphoCompoundV2]: { + // [Chain.Ethereum]: [ + // MorphoCompoundV2OptimizerSupplyAdapter, + // MorphoCompoundV2OptimizerBorrowAdapter, + // ], + // }, [Protocol.MountainProtocol]: { [Chain.Ethereum]: [ diff --git a/packages/adapters-library/src/scripts/blockIndexer.ts b/packages/adapters-library/src/scripts/blockIndexer.ts new file mode 100644 index 000000000..f067bd627 --- /dev/null +++ b/packages/adapters-library/src/scripts/blockIndexer.ts @@ -0,0 +1,218 @@ +import { Database as DatabaseType } from 'better-sqlite3' +import { AVERAGE_BLOCKS_PER_10_MINUTES } from '../core/constants/AVERAGE_BLOCKS_PER_10_MINS' +import { AVERAGE_BLOCKS_PER_DAY } from '../core/constants/AVERAGE_BLOCKS_PER_DAY' +import { Chain, ChainIdToChainNameMap } from '../core/constants/chains' +import { CustomJsonRpcProvider } from '../core/provider/CustomJsonRpcProvider' +import { createDatabase } from './createDatabase' + +/** + * BlockIndexer creates a SQLite database and continuously indexes blocks for a given blockchain. + * It listens for new blocks, stores processed blocks, and executes queries to maintain an index. + */ +export class BlockIndexer { + private _provider: CustomJsonRpcProvider + private _chainId: Chain + private _chainName: string + public db: DatabaseType + private static readonly _BATCH_SIZE = 50 + private static readonly _MAX_RETRIES = 5 + private _startBlockOverride: number | undefined + private _latestBlockNumber: number | undefined + + constructor({ + provider, + chainId, + chainName, + dbName, + startBlockOverride, + additionalTablesToCreate, + }: { + provider: CustomJsonRpcProvider + chainId: Chain + chainName: string + dbName: string + startBlockOverride?: number + additionalTablesToCreate?: Record + }) { + if (!dbName.endsWith('.db')) { + throw new Error('Database path must end with .db') + } + + this._startBlockOverride = startBlockOverride + this._provider = provider + this._chainId = chainId + this._chainName = chainName + + this.db = createDatabase(dbName, { + latest_block_processed: ` + CREATE TABLE IF NOT EXISTS latest_block_processed ( + id INTEGER PRIMARY KEY DEFAULT 1, + latest_block_processed INTEGER NOT NULL + ); + `, + ...additionalTablesToCreate, + }) + } + + public seedDb(seedDefaultData: string[]) { + seedDefaultData?.forEach((query) => this.db.exec(query)) + } + + public async getIndexerBlockNumbers(): Promise<{ + startBlockOverride?: number + lastProcessedBlockNumber: number + }> { + return { + startBlockOverride: this._startBlockOverride, + lastProcessedBlockNumber: await this.getLatestProcessedBlockNumber(), + } + } + + private async getLatestProcessedBlockNumber(): Promise { + const row = this.db + .prepare('SELECT latest_block_processed FROM latest_block_processed') + .get() as { latest_block_processed?: number } | undefined + + console.log(`Last processed block: ${row?.latest_block_processed}`) + + return row?.latest_block_processed ?? 0 + } + + private async waitForNewBlockToProcess(currentBlock: number): Promise { + let backoff = 1000 // Start with 1 second + + if (!this._latestBlockNumber) { + this._latestBlockNumber = await this._provider.getBlockNumber() + } + + while (currentBlock >= this._latestBlockNumber) { + try { + const blockNumber = await this._provider.getBlockNumber() + console.log(`[${this._chainName}] Latest block number: ${blockNumber}`) + if (blockNumber > this._latestBlockNumber) { + this._latestBlockNumber = blockNumber + return + } + } catch (error) { + console.error( + `[${this._chainName}] Error fetching block number:`, + error, + ) + } + await new Promise((resolve) => setTimeout(resolve, backoff)) + backoff = Math.min(backoff * 2, 60000) // Exponential backoff up to max 1 minute + } + } + + /** + * Processes blocks by fetching data and executing SQL queries. + * @param processBlockFn Function that takes a block number and returns an array of SQL queries to execute. + * @param startBlock Optional start block number. If not provided, it resumes from the last processed block. + */ + async processBlocks( + processBlockFn: (blockNumber: number) => Promise, + ) { + console.log(`[${this._chainName}] Starting block indexer...`) + let processingBlockNumber = + this._startBlockOverride ?? (await this.getLatestProcessedBlockNumber()) + this._latestBlockNumber = await this._provider.getBlockNumber() + let retryCount = 0 + + while (retryCount <= BlockIndexer._MAX_RETRIES) { + try { + // ensures we dont query future blocks + if (processingBlockNumber >= this._latestBlockNumber) { + await this.waitForNewBlockToProcess(processingBlockNumber) + } + + // if the indexer is too far behind, batch process blocks + const shouldBatch = + this._latestBlockNumber - processingBlockNumber > + BlockIndexer._BATCH_SIZE + + const results: string[] = [] + + if (shouldBatch) { + const batchEndBlock = Math.min( + processingBlockNumber + BlockIndexer._BATCH_SIZE, + this._latestBlockNumber, + ) + const blockPromises = [] + for ( + let blockNum = processingBlockNumber; + blockNum < batchEndBlock; + blockNum++ + ) { + blockPromises.push( + processBlockFn(blockNum).then((result) => { + results.push(...result) + console.log(`[${this._chainName}] Processed block ${blockNum}`) + }), + ) + } + await Promise.all(blockPromises) + processingBlockNumber = batchEndBlock + } else { + const result = await processBlockFn(processingBlockNumber) + results.push(...result) + console.log( + `[${this._chainName}] Processed block ${processingBlockNumber}`, + ) + processingBlockNumber++ + } + + this.db.transaction((inserts) => { + inserts.forEach((insert: string) => this.db.prepare(insert).run()) + this.db + .prepare( + 'INSERT OR REPLACE INTO latest_block_processed (id, latest_block_processed) VALUES (1, ?)', + ) + .run(processingBlockNumber) + })(results) + + await logUpdate(this._chainId, this._provider) + } catch (error) { + console.error( + `[${this._chainName}] Error processing block ${processingBlockNumber}:`, + error instanceof Error ? error.stack : String(error), + ) + + console.log( + `curl -X POST --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x${processingBlockNumber.toString( + 16, + )}", true],"id":1}' -H "Content-Type: application/json" ${ + this._provider._getConnection().url + } | jq`, + ) + + retryCount++ + if (retryCount >= BlockIndexer._MAX_RETRIES) { + const earliestSafeBlock = + processingBlockNumber - BlockIndexer._BATCH_SIZE + this.db + .prepare( + 'INSERT OR REPLACE INTO latest_block_processed (id, latest_block_processed) VALUES (1, ?)', + ) + .run(earliestSafeBlock) + throw new Error( + `[${this._chainName}] Max retries exceeded for block ${processingBlockNumber}`, + ) + } + await new Promise((res) => setTimeout(res, 5000 * retryCount)) + } + } + + async function logUpdate(chain: Chain, provider: CustomJsonRpcProvider) { + if (processingBlockNumber % AVERAGE_BLOCKS_PER_10_MINUTES[chain] === 0) { + const currentHeadBlock = await provider.getBlockNumber() + const blocksLagging = currentHeadBlock - processingBlockNumber + const blocksPerHour = AVERAGE_BLOCKS_PER_DAY[chain] / 24 + const lagInHours = (blocksLagging / blocksPerHour).toFixed(1) + + console.log( + `[${ChainIdToChainNameMap[chain]}] Indexer is ${lagInHours} hours behind, lagging ${blocksLagging} blocks.`, + ) + } + } + } +} diff --git a/packages/adapters-library/src/scripts/createDatabase.ts b/packages/adapters-library/src/scripts/createDatabase.ts new file mode 100644 index 000000000..8b956f2d5 --- /dev/null +++ b/packages/adapters-library/src/scripts/createDatabase.ts @@ -0,0 +1,35 @@ +import path from 'node:path' +import Database, { Database as BetterSqlite3Database } from 'better-sqlite3' + +/** + * Creates a database and ensures required tables exist. + */ +export function createDatabase( + dbName: string, + dbTables: Record, +): BetterSqlite3Database { + const dbPath = path.resolve(`./${dbName}`) + const db = new Database(dbPath) + + // Create tables if they don't exist + for (const [tableName, createTableQuery] of Object.entries(dbTables)) { + const tableExists = db + .prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`) + .get(tableName) + if (!tableExists) { + console.log(`Creating table: ${tableName}`) + db.exec(createTableQuery) + } + + // check table created successfully + const tableCreated = db + .prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`) + .get(tableName) + + if (!tableCreated) { + throw new Error(`Failed to create table: ${tableName}`) + } + } + + return db +} diff --git a/packages/adapters-library/src/scripts/depositEventAbi.ts b/packages/adapters-library/src/scripts/depositEventAbi.ts new file mode 100644 index 000000000..ed14ab1c0 --- /dev/null +++ b/packages/adapters-library/src/scripts/depositEventAbi.ts @@ -0,0 +1,39 @@ +export const depositEventAbi = [ + { + anonymous: false, + inputs: [ + { + indexed: false, + internalType: 'bytes', + name: 'pubkey', + type: 'bytes', + }, + { + indexed: false, + internalType: 'bytes', + name: 'withdrawal_credentials', + type: 'bytes', + }, + { + indexed: false, + internalType: 'bytes', + name: 'amount', + type: 'bytes', + }, + { + indexed: false, + internalType: 'bytes', + name: 'signature', + type: 'bytes', + }, + { + indexed: false, + internalType: 'bytes', + name: 'index', + type: 'bytes', + }, + ], + name: 'DepositEvent', + type: 'event', + }, +] diff --git a/packages/adapters-library/src/scripts/eth2Staking.ts b/packages/adapters-library/src/scripts/eth2Staking.ts new file mode 100644 index 000000000..3290de853 --- /dev/null +++ b/packages/adapters-library/src/scripts/eth2Staking.ts @@ -0,0 +1,88 @@ +import { ethers, getAddress } from 'ethers' + +import { CustomJsonRpcProvider } from '../core/provider/CustomJsonRpcProvider' +import { depositEventAbi } from './depositEventAbi' + +class Eth2StakingIndexer { + // Must start getting block withdrawals from this block number + public withdrawalsEnabledBlockNumber = 17034871 //21693897 + // Must start getting block deposits from this block number + private _depositsEnabledBlockNumber = 11052984 + private _depositContractAddress = '0x00000000219ab540356cBB839Cbe05303d7705Fa' + private _provider: CustomJsonRpcProvider + + constructor(provider: CustomJsonRpcProvider) { + this._provider = provider + } + + createTableCommand = ` + CREATE TABLE IF NOT EXISTS eth2_staking ( + userAddress CHAR(40) PRIMARY KEY, + deposits INTEGER DEFAULT 0, + full_withdrawals INTEGER DEFAULT 0 + ); +` + + public async withdrawalsProcessBlock(blockNumber: number): Promise { + const result: string[] = [] + + const block = await this._provider.send('eth_getBlockByNumber', [ + `0x${ethers.toBeHex(blockNumber).slice(2).replace(/^0+/, '')}`, + false, + ]) + + if (block.withdrawals?.length) { + for (const withdrawal of block.withdrawals) { + // Use ethers.js to properly parse the hex value + const amountInGwei = BigInt(withdrawal.amount) + // Convert Gwei to ETH (1 ETH = 1e9 Gwei) + const amountInEth = Number(amountInGwei) / 1e9 + // Count how many full validator withdrawals (32 ETH each) + const fullWithdrawals = Math.floor(amountInEth / 32) + + if (fullWithdrawals > 0) { + const userAddress = `'${getAddress(withdrawal.address).slice(2)}'` + const amount = fullWithdrawals * 32 + + result.push( + `INSERT INTO eth2_staking (userAddress, deposits, full_withdrawals) VALUES (${userAddress}, 0, ${amount}) ON CONFLICT(userAddress) DO UPDATE SET full_withdrawals = full_withdrawals + ${amount}`, + ) + } + } + } + + return result + } + + public async processLogIfEth2Deposit(log: ethers.Log): Promise { + if ( + !( + log.address.toLowerCase() === this._depositContractAddress.toLowerCase() + ) + ) { + return + } + + const iface = new ethers.Interface(depositEventAbi) + const decodedLog = iface.parseLog(log) + + if (!decodedLog) { + return + } + + const userAddress = ethers + .hexlify(decodedLog.args.withdrawal_credentials) + .slice(26) + const amount = ethers.hexlify(decodedLog.args.amount) + + await `INSERT INTO eth2_staking (userAddress, deposits, full_withdrawals) VALUES (${userAddress}, ${Number.parseInt( + amount, + 16, + )}, 0) ON CONFLICT(userAddress) DO UPDATE SET deposits = deposits + ${Number.parseInt( + amount, + 16, + )}` + } +} + +export default Eth2StakingIndexer diff --git a/packages/adapters-library/src/scripts/eth2StakingIndex.ts b/packages/adapters-library/src/scripts/eth2StakingIndex.ts new file mode 100644 index 000000000..152b9059b --- /dev/null +++ b/packages/adapters-library/src/scripts/eth2StakingIndex.ts @@ -0,0 +1,35 @@ +import { Command } from 'commander' + +import { Chain, ChainIdToChainNameMap } from '../core/constants/chains' + +import { DefiProvider } from '../defiProvider' +import { BlockIndexer } from './blockIndexer' +import Eth2StakingIndexer from './eth2Staking' + +export function indexerEth2Staking( + program: Command, + defiProvider: DefiProvider, +) { + program + .command('indexer-eth2-staking-withdrawals') + .showHelpAfterError() + .action(async () => { + const provider = defiProvider.chainProvider.providers[Chain.Ethereum] + const eth2StakingIndexer = new Eth2StakingIndexer(provider) + + const indexer = new BlockIndexer({ + provider, + chainId: Chain.Ethereum, + chainName: ChainIdToChainNameMap[Chain.Ethereum], + dbName: 'eth2_staking_indexer.db', + startBlockOverride: eth2StakingIndexer.withdrawalsEnabledBlockNumber, + additionalTablesToCreate: { + eth2_staking: eth2StakingIndexer.createTableCommand, + }, + }) + + await indexer.processBlocks( + eth2StakingIndexer.withdrawalsProcessBlock.bind(eth2StakingIndexer), + ) + }) +} diff --git a/packages/adapters-library/src/scripts/index.ts b/packages/adapters-library/src/scripts/index.ts index ec1652337..499b680a2 100644 --- a/packages/adapters-library/src/scripts/index.ts +++ b/packages/adapters-library/src/scripts/index.ts @@ -6,6 +6,7 @@ import { copyAdapter } from './adapterBuilder/copyAdapter' import { newAdapterCommand } from './adapterBuilder/newAdapterCommand' import { blockAverage } from './blockAverage' import { buildMetadataDb } from './buildMetadataDb' + import { buildScoreboard } from './buildScoreboard' import { buildSnapshots } from './buildSnapshots' import { buildContractTypes } from './buildTypes' @@ -13,10 +14,12 @@ import { checkBadSnapshots } from './checkBadSnapshots' import { checkDbTotals } from './checkDbTotals' import { checkMetadataType } from './checkMetadataType' import { deleteAdapterMetadata } from './deleteAdapterMetadata' +import { indexerEth2Staking } from './eth2StakingIndex' import { featureCommands } from './featureCommands' import { performance } from './performance' import { simulateTxCommand } from './simulateTxCommand' import { stressCommand } from './stress' +import { indexer } from './positionIndexer' const program = new Command('mmi-adapters') @@ -25,6 +28,10 @@ const chainProviders = defiProvider.chainProvider.providers const solanaProvider = defiProvider.chainProvider.solanaProvider const adaptersController = defiProvider.adaptersController +indexer(program, defiProvider) + +indexerEth2Staking(program, defiProvider) + featureCommands(program, defiProvider) checkMetadataType(program, chainProviders, solanaProvider, adaptersController) diff --git a/packages/adapters-library/src/scripts/positionIndexer.ts b/packages/adapters-library/src/scripts/positionIndexer.ts new file mode 100644 index 000000000..5ce3be6c9 --- /dev/null +++ b/packages/adapters-library/src/scripts/positionIndexer.ts @@ -0,0 +1,195 @@ +import { Command } from 'commander' +import { ethers, getAddress } from 'ethers' +import { + Chain, + ChainIdToChainNameMap, + EvmChain, +} from '../core/constants/chains' +import { CustomJsonRpcProvider } from '../core/provider/CustomJsonRpcProvider' +import { DefiProvider } from '../defiProvider' +import { BlockIndexer } from './blockIndexer' + +export function indexer(program: Command, defiProvider: DefiProvider) { + program + .command('indexer') + .option( + '-c, --chain ', + 'comma-separated chains filter (e.g. ethereum,arbitrum,linea)', + ) + .option( + '-b, --block ', + 'optional block number to start indexing from', + ) + .showHelpAfterError() + .action(async ({ chain, block }: { chain?: string; block?: string }) => { + const filterChainId = chain ? (Number(chain) as EvmChain) : undefined + if (filterChainId && !ChainIdToChainNameMap[filterChainId]) { + throw new Error(`No chain matches the given filter: ${chain}`) + } + const startBlockOverride = block ? Number(block) : undefined + + await Promise.all( + Object.values(EvmChain) + .filter( + (chainId) => + filterChainId === undefined || filterChainId === chainId, + ) + .map(async (chainId) => + processChain(chainId, defiProvider, startBlockOverride), + ), + ) + }) +} + +async function processChain( + chainId: EvmChain, + defiProvider: DefiProvider, + startBlockOverride?: number, +) { + const chainName = ChainIdToChainNameMap[chainId] + console.log(`Starting indexer for chain: ${chainName}`) + + const provider = defiProvider.chainProvider.providers[chainId] + + const watchContractListCheckSum = await getDeFiContractAddressesCheckSum({ + defiProvider, + chainId, + chainName, + }) + + const indexer = new BlockIndexer({ + provider, + chainId, + chainName, + dbName: `${chainName}_positions.db`, + startBlockOverride: startBlockOverride, + additionalTablesToCreate: { + logs: ` + CREATE TABLE IF NOT EXISTS logs ( + contract_address CHAR(40) NOT NULL, + address CHAR(40) NOT NULL, + UNIQUE(contract_address, address) + ); + `, + contract_start_block: ` + CREATE TABLE IF NOT EXISTS contract_start_block ( + contract_address CHAR(40) NOT NULL, + first_block_number INTEGER NOT NULL, + PRIMARY KEY (contract_address) + ); + `, + }, + }) + + const { lastProcessedBlockNumber } = await indexer.getIndexerBlockNumbers() + + // if block override is set, replace all start blocks with the override value + if (startBlockOverride) { + indexer.seedDb([ + Array.from(watchContractListCheckSum) + .map( + (address) => + `INSERT OR REPLACE INTO contract_start_block (contract_address, first_block_number) VALUES ('${address}', ${startBlockOverride});`, + ) + .join('\n'), + ]) + } else { + // if no block override, then only insert new contract addresses that are not already in the db + indexer.seedDb([ + Array.from(watchContractListCheckSum) + .map( + (address) => + `INSERT OR IGNORE INTO contract_start_block (contract_address, first_block_number) VALUES ('${address}', ${lastProcessedBlockNumber});`, + ) + .join('\n'), + ]) + } + + await indexer.processBlocks((blockNumber) => + fetchAndProcessLogs({ + provider, + blockNumber, + chainName: chainName.toLowerCase(), + watchContractListLowercase: new Set( + Array.from(watchContractListCheckSum).map((address) => + address.toLowerCase(), + ), + ), + }), + ) +} + +/** + * Checksum without 0x + * @param defiProvider + * @param chainId + * @param chainName + * @returns + */ +async function getDeFiContractAddressesCheckSum({ + defiProvider, + chainId, + chainName, +}: { defiProvider: DefiProvider; chainId: Chain; chainName: string }) { + const defiPoolAddresses = await defiProvider.getSupport({ + filterChainIds: [chainId], + }) + + const watchContractList = new Set() + for (const pools of Object.values(defiPoolAddresses || {})) { + for (const pool of pools) { + for (const address of pool.protocolTokenAddresses?.[chainId] || []) { + watchContractList.add(getAddress(address).slice(2)) + } + } + } + + console.log( + `Watching ${watchContractList.size} DeFi contracts on ${chainName}`, + ) + return watchContractList +} + +async function fetchAndProcessLogs({ + provider, + blockNumber, + chainName, + watchContractListLowercase, +}: { + provider: CustomJsonRpcProvider + blockNumber: number + chainName: string + watchContractListLowercase: Set +}): Promise { + const receipts = await provider.send('eth_getBlockReceipts', [ + `0x${ethers.toBeHex(blockNumber).slice(2).replace(/^0+/, '')}`, + ]) + + const queries: string[] = [] + + for (const receipt of receipts?.flat() || []) { + for (const log of receipt.logs || []) { + console.log(`[${chainName}] Processing log: ${log.address}`) + + // retuned lowercase from provider + const contractAddressLowercase = log.address + if (watchContractListLowercase.has(contractAddressLowercase.slice(2))) { + log.topics + .filter((topic: string) => + topic.startsWith('0x000000000000000000000000'), + ) + .forEach((topic: string) => + queries.push( + `INSERT OR IGNORE INTO logs (contract_address, address) VALUES ('${getAddress( + contractAddressLowercase, + ).slice(2)}', '${getAddress(topic.slice(-40).toLowerCase()).slice( + 2, + )}');`, + ), + ) + } + } + } + + return queries +}