Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ services:
- USE_ADAPTERS_WITH_USER_EVENT_ONLY=true
- CACHE_DATABASE_URL=postgresql://postgres:postgres@postgres:5432/defi-adapters
- CACHE_DATABASE_DISABLE_SSL=true
- BEACON_BASE_URL
- BEACON_NODE_API_KEY
depends_on:
postgres:
condition: service_healthy
Expand All @@ -43,8 +45,10 @@ services:
- CACHE_DATABASE_DISABLE_SSL=true
- BLOCK_RUNNER_BATCH_SIZE=10
- HISTORIC_CACHE_BATCH_SIZE=5
- BEACON_BASE_URL
- BEACON_NODE_API_KEY
depends_on:
postgres:
postgres:
condition: service_healthy

# Optional: To run just database service (run manually docker-compose up postgres)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import {
UnwrapInput,
} from '../../../../types/adapter'
import { Protocol } from '../../../protocols'
import { ZERO_ADDRESS } from '../../../../core/constants/ZERO_ADDRESS'

/**
* ETH2 Validator Staking Adapter
* Queries beacon node API for validator information
*/

export const ETH2_DEPOSIT_CONTRACT_ADDRESS = getAddress(
'0x00000000219ab540356cBB839Cbe05303d7705Fa',
)

// Alchemy Beacon Node API types
interface ValidatorData {
index: string
Expand Down Expand Up @@ -107,12 +112,10 @@ export class Eth2ValidatorStakingStakingAdapter implements IProtocolAdapter {

@CacheToDb
async getProtocolTokens(): Promise<ProtocolToken[]> {
const underlyingToken = await this.helpers.getTokenMetadata(
'0x0000000000000000000000000000000000000000',
) // ETH
const underlyingToken = await this.helpers.getTokenMetadata(ZERO_ADDRESS) // ETH
return [
{
address: getAddress('0x00000000219ab540356cBB839Cbe05303d7705Fa'),
address: getAddress(ETH2_DEPOSIT_CONTRACT_ADDRESS),
name: 'ETH2 Validator Staking',
symbol: 'ETH2-VALIDATOR',
decimals: 18,
Expand Down
20 changes: 7 additions & 13 deletions packages/adapters-library/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,30 @@ const providers: Record<ChainName, string> = {
),
op: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_OPTIMISM,
'https://optimism-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
),
bsc: parseStringEnv(
CHAIN_NOT_ENABLED,
'https://bsc-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
),
bsc: parseStringEnv(CHAIN_NOT_ENABLED, CHAIN_NOT_ENABLED),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Optimism Provider Configuration Defaults Incorrectly

The Optimism provider configuration incorrectly uses CHAIN_NOT_ENABLED as its default value. This disables Optimism support when the DEFI_ADAPTERS_PROVIDER_OPTIMISM environment variable is not set, breaking existing functionality.

Fix in Cursor Fix in Web

matic: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_POLYGON,
'https://polygon-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
),
ftm: parseStringEnv(CHAIN_NOT_ENABLED, 'https://rpc.ftm.tools'),
sei: parseStringEnv(
CHAIN_NOT_ENABLED,
'https://sei-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
),
ftm: parseStringEnv(CHAIN_NOT_ENABLED, CHAIN_NOT_ENABLED),
sei: parseStringEnv(CHAIN_NOT_ENABLED, CHAIN_NOT_ENABLED),
base: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_BASE,
'https://base-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
CHAIN_NOT_ENABLED,
),
arb: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_ARBITRUM,
'https://arbitrum-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
CHAIN_NOT_ENABLED,
),
avax: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_AVALANCHE,
'https://avalanche-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
CHAIN_NOT_ENABLED,
),
linea: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_LINEA,
'https://linea-mainnet.infura.io/v3/abafec30d6aa45ffa0c763b5552a2d02',
CHAIN_NOT_ENABLED,
),
solana: parseStringEnv(
process.env.DEFI_ADAPTERS_PROVIDER_SOLANA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@
* These functions convert raw event data to actual user addresses
*/

import { getAddress } from 'ethers'

export type LogArgumentTransformer = (rawValue: string) => string | null

/**
* Placeholder address used for ETH2 Type 00 (BLS) withdrawal credentials that haven't been updated yet
* This address is used as a placeholder for validators that still have BLS withdrawal credentials
* and need to be periodically checked for updates to Ethereum withdrawal credentials
*/
export const ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS = getAddress(
'0x000000000000000000000000000000000000faff',
)

export const LOG_ARGUMENT_TRANSFORMERS: Record<string, LogArgumentTransformer> =
{
/**
Expand All @@ -20,9 +31,9 @@ export const LOG_ARGUMENT_TRANSFORMERS: Record<string, LogArgumentTransformer> =

// Check if this is BLS withdrawal credentials (starts with 00)
if (hex.startsWith('00')) {
// Return a dummy address for BLS credentials
// Return a placeholder address for BLS credentials
// We can use this to periodically check if the BLS credentials have been updated
return '0x000000000000000000000000000000000000faff'
return ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS
}

// Extract the last 20 bytes (40 hex characters) which represent the address
Expand Down
2 changes: 2 additions & 0 deletions packages/adapters-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export { PositionType, TokenType } from './types/adapter'
export {
type LogArgumentTransformer,
LOG_ARGUMENT_TRANSFORMERS,
ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS,
} from './core/utils/log-argument-transformers'
export type {
AdapterResponse,
Expand All @@ -29,3 +30,4 @@ export type { TestCase } from './types/testCase'
export { AVERAGE_BLOCKS_PER_10_MINUTES } from './core/constants/AVERAGE_BLOCKS_PER_10_MINS'
export { AVERAGE_BLOCKS_PER_DAY } from './core/constants/AVERAGE_BLOCKS_PER_DAY'
export type { AdditionalMetadataConfig } from './types/adapter'
export { ETH2_DEPOSIT_CONTRACT_ADDRESS } from './adapters/eth-2-validator-staking/products/staking/eth2ValidatorStakingStakingAdapter'
79 changes: 77 additions & 2 deletions packages/workers/src/database/postgres-cache-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { ChainName, type EvmChain } from '@metamask-institutional/defi-adapters'
import { type AdditionalMetadataConfig } from '@metamask-institutional/defi-adapters'
import {
ChainName,
type EvmChain,
type AdditionalMetadataConfig,
ETH2_DEPOSIT_CONTRACT_ADDRESS,
ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS,
} from '@metamask-institutional/defi-adapters'
import { Mutex } from 'async-mutex'
import type { Pool, PoolClient, PoolConfig } from 'pg'
import type { Logger } from 'pino'
Expand Down Expand Up @@ -50,6 +55,11 @@ export interface CacheClient {
}[],
blockNumber?: number,
) => Promise<number>

getEth2StakingPubkeysWithPlaceholderAddress: () => Promise<string[]>
updateUserAddressesForPubkeys: (
updates: Array<{ pubkey: string; userAddress: string }>,
) => Promise<void>
}

export async function createPostgresCacheClient({
Expand Down Expand Up @@ -94,6 +104,71 @@ class PostgresCacheClient implements CacheClient {
this.#logger = logger
}

/**
* Returns all ETH2 staking public keys (validator pubkeys) that are associated with the
* placeholder withdrawal credentials address (i.e., the ETH2 deposit contract's placeholder address).
* This is used to identify all validator pubkeys for which the withdrawal credentials
*
*/
async getEth2StakingPubkeysWithPlaceholderAddress(): Promise<string[]> {
// Query logs where user_address is the placeholder address and metadata_value contains a pubkey
// We assume metadata_value is a stringified JSON, so we use LIKE to filter, then parse in JS
// Query all metadata_value fields for the placeholder address, and return as a string array.
// 300k values should be fine in memory for most modern servers (a few tens of MB).
const res = await this.#dbPool.query<{ metadata_value: string }>(
`
SELECT metadata_value
FROM logs
WHERE address = $1
`,
[ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS],
)
// Each metadata_value is expected to be a string (e.g., a pubkey)
// Return as a string array
return res.rows.map((row) => row.metadata_value)
}

/**
* Updates user addresses for specific validator pubkeys
* This is used when BLS withdrawal credentials are updated to Ethereum addresses
*
* @param updates Array of objects containing pubkey and new user address
*/
async updateUserAddressesForPubkeys(
updates: Array<{ pubkey: string; userAddress: string }>,
): Promise<void> {
if (updates.length === 0) {
return
}

// Use a transaction to ensure all updates succeed or fail together
const client = await this.#dbPool.connect()
try {
await client.query('BEGIN')

// Update each pubkey-userAddress pair
for (const { pubkey, userAddress } of updates) {
await client.query(
`
UPDATE logs
SET address = $1
WHERE address = $2
AND metadata_key = 'pubkey'
AND metadata_value = $3
`,
[userAddress, ETH2_TYPE_00_WITHDRAWAL_PLACEHOLDER_ADDRESS, pubkey],
)
}

await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}

async getLatestBlockProcessed(): Promise<number | undefined> {
const res = await this.#dbPool.query(
'SELECT value FROM settings WHERE key = $1',
Expand Down
34 changes: 32 additions & 2 deletions packages/workers/src/main-worker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Import necessary modules for blockchain processing, database operations, and logging
import { DefiProvider, EvmChain } from '@metamask-institutional/defi-adapters'
import { JsonRpcProvider, Network } from 'ethers'
import type { Logger } from 'pino'
Expand All @@ -10,48 +11,65 @@ import { runnerLoop } from './runner-loop.js'
import { updateNewJobs } from './update-new-jobs.js'
import { extractErrorMessage } from './utils/extractErrorMessage.js'

// Validate that the database connection URL is provided via environment variable
// This is required for storing and retrieving blockchain processing state
if (!process.env.CACHE_DATABASE_URL) {
logger.error('CACHE_DATABASE_URL is required')
process.exit(1)
}

logger.info('Worker started: Processing EVM chains')
// Create a DeFi provider instance that contains configuration for all supported blockchain networks
const defiProvider = new DefiProvider()

// Process all supported EVM chains in parallel
// Each chain gets its own processing pipeline to maximize efficiency
await Promise.all(
Object.values(EvmChain).map(async (chainId) => {
// Create a child logger with chain-specific context for better log organization
const childLogger = logger.child({ chainId })
try {
childLogger.info('Starting worker')

// Create a database connection client for this specific chain
// This allows us to store and retrieve processing state per chain
const cacheClient = await createPostgresCacheClient({
dbUrl: process.env.CACHE_DATABASE_URL!,
chainId,
partialPoolConfig: {
max: 10,
idleTimeoutMillis: 0,
max: 10, // Maximum 10 concurrent database connections
idleTimeoutMillis: 0, // Keep connections alive indefinitely
},
logger: childLogger,
})

// Get the RPC endpoint URL for this blockchain network
// This is where we'll connect to fetch blockchain data
const providerUrl =
defiProvider.chainProvider.providers[chainId]?._getConnection().url

// Skip processing if no RPC provider is configured for this chain
if (!providerUrl) {
childLogger.error('Provider missing for this chain')
return
}

// Create an ethers.js provider to interact with the blockchain
// This handles JSON-RPC communication with the blockchain node
const provider = new JsonRpcProvider(providerUrl, chainId, {
staticNetwork: Network.from(chainId),
})

// Determine which block number to start processing from
// This could be the last processed block from DB or current block from chain
const blockNumber = await getBlockToProcess(
cacheClient,
provider,
childLogger,
)

// Update the job queue with new processing tasks for this chain
// This identifies what DeFi protocols and positions need to be processed
await updateNewJobs({
chainId,
blockNumber,
Expand All @@ -60,6 +78,8 @@ await Promise.all(
logger: childLogger,
})

// Start the main processing loop for this chain
// This continuously processes blocks and updates positions
await runnerLoop({
blockNumber,
provider,
Expand All @@ -68,6 +88,8 @@ await Promise.all(
logger: childLogger,
})
} catch (error) {
// If any chain fails, log the error and exit the entire worker process
// This ensures we don't continue with corrupted state
childLogger.error(
{ chainId, error: extractErrorMessage(error) },
'Runner execution failed',
Expand All @@ -78,23 +100,31 @@ await Promise.all(
}),
)

/**
* Determines which block number to start processing from
* Priority: 1) Last processed block from database, 2) Current block from blockchain
*/
async function getBlockToProcess(
cacheClient: CacheClient,
provider: JsonRpcProvider,
logger: Logger,
) {
// Try to get the last processed block number from the database
// This allows us to resume processing from where we left off
const dbBlockNumber = await cacheClient.getLatestBlockProcessed()

if (dbBlockNumber) {
logger.info({ dbBlockNumber }, 'Last block processed fetched from DB')
return dbBlockNumber
}

// If no previous processing state exists, start from the current blockchain block
try {
const blockNumber = await provider.getBlockNumber()
logger.info({ blockNumber }, 'Block number fetched from provider')
return blockNumber
} catch (error) {
// If we can't connect to the blockchain, log the error and exit
logger.error(
{ error, providerUrl: provider._getConnection().url },
'Error fetching block number',
Expand Down
Loading
Loading