Skip to content

Commit c1fbd0e

Browse files
committed
feat: Implement notifyConsumedBlock method in HyperionLifecycleManager for pruning checks
1 parent 7460fc3 commit c1fbd0e

File tree

5 files changed

+66
-14
lines changed

5 files changed

+66
-14
lines changed

src/indexer/modules/lifecycleManager.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { HyperionConfig, NodeAttributeRequirement, TieredIndexAllocationSettings
33
import { ConnectionManager } from "../connections/manager.class.js";
44
import { hLog } from "../helpers/common_functions.js";
55
import { HyperionMaster } from "./master.js";
6+
import { delay } from "lodash";
67

78
interface IndexData {
89
index: string;
@@ -27,6 +28,8 @@ export class HyperionLifecycleManager {
2728
autoPrune = false;
2829
totalDeletedBytes = 0;
2930

31+
private lastPruningCheckBlockNum = 0;
32+
3033
constructor(master: HyperionMaster) {
3134
this.master = master;
3235
this.conf = this.master.conf;
@@ -72,6 +75,40 @@ export class HyperionLifecycleManager {
7275
}
7376
}
7477

78+
async notifyConsumedBlock(blockNum: number) {
79+
if (!this.autoPrune) {
80+
return;
81+
}
82+
83+
const partitionSize = this.conf.settings.index_partition_size;
84+
if (partitionSize <= 0) {
85+
return; // Pruning is based on partitions, so this is required.
86+
}
87+
88+
if (this.lastPruningCheckBlockNum === 0) {
89+
// First block seen, initialize and return.
90+
this.lastPruningCheckBlockNum = blockNum;
91+
return;
92+
}
93+
94+
// Calculate partitions based on 1-based block numbers
95+
const oldPartition = Math.floor((this.lastPruningCheckBlockNum - 1) / partitionSize);
96+
const newPartition = Math.floor((blockNum - 1) / partitionSize);
97+
98+
if (newPartition > oldPartition) {
99+
hLog(`New index partition boundary crossed at block ${blockNum}. Triggering pruning check in 5 seconds...`);
100+
setTimeout(() => {
101+
this.checkPruning().catch((err) => {
102+
hLog(`Error during pruning check: ${err.message}`);
103+
});
104+
}, 5000);
105+
}
106+
107+
if (blockNum > this.lastPruningCheckBlockNum) {
108+
this.lastPruningCheckBlockNum = blockNum;
109+
}
110+
}
111+
75112
startAllocationMonitoring() {
76113
if (!this.allocationEnabled || !this.allocation) {
77114
return;
@@ -128,7 +165,9 @@ export class HyperionLifecycleManager {
128165
bytes: 'b',
129166
index: `${this.master.chain}-block-${this.conf.settings.index_version}*`
130167
});
131-
console.dir(blockIndices, { depth: Infinity });
168+
169+
// console.dir(blockIndices, { depth: Infinity });
170+
132171
if (blockIndices.length === 1) {
133172

134173
const indexName = blockIndices[0].index;
@@ -137,6 +176,8 @@ export class HyperionLifecycleManager {
137176
return;
138177
}
139178

179+
hLog(`Found block index: ${indexName}`);
180+
140181
// Get the current head block number
141182
const chainInfo = await this.master.rpc.v1.chain.get_info();
142183

@@ -167,7 +208,7 @@ export class HyperionLifecycleManager {
167208
query: { bool: { must: [{ range: { block_num: { lte: finalBlockToKeep } } }] } }
168209
});
169210

170-
console.dir(response, { depth: Infinity });
211+
// console.dir(response, { depth: Infinity });
171212

172213
const totalHits = response.hits.total as estypes.SearchTotalHits;
173214
if (totalHits && totalHits.value > 0) {
@@ -189,6 +230,7 @@ export class HyperionLifecycleManager {
189230

190231
} else if (blockIndices.length > 1) {
191232
// Partitioned block index, we can prune by index
233+
hLog(`Multiple block indices found. Pruning by index...`);
192234
await this.pruneIndices('block');
193235
}
194236
}

src/indexer/modules/loader.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
import {ConfigurationModule} from "./config.js";
2-
import path, {join} from "path";
3-
import {existsSync, readdirSync, readFileSync} from "fs";
4-
import {HyperionConnections} from "../../interfaces/hyperionConnections.js";
5-
import {HyperionConfig} from "../../interfaces/hyperionConfig.js";
6-
import {BaseParser} from "./parsers/base-parser.js";
7-
import {hLog} from "../helpers/common_functions.js";
8-
import {ActionTrace} from "../../interfaces/action-trace.js";
1+
import { ConfigurationModule } from "./config.js";
2+
import path, { join } from "path";
3+
import { existsSync, readdirSync, readFileSync } from "fs";
4+
import { HyperionConnections } from "../../interfaces/hyperionConnections.js";
5+
import { HyperionConfig } from "../../interfaces/hyperionConfig.js";
6+
import { BaseParser } from "./parsers/base-parser.js";
7+
import { hLog } from "../helpers/common_functions.js";
8+
import { ActionTrace } from "../../interfaces/action-trace.js";
99
import { HyperionPlugin, HyperionStreamHandler } from "@eosrio/hyperion-plugin-core";
1010

1111
export class HyperionModuleLoader {
@@ -134,7 +134,9 @@ export class HyperionModuleLoader {
134134
// main loader function for plugin modules
135135
private async loadPlugins() {
136136
const base = join(import.meta.dirname, '../../../', 'plugins');
137-
hLog(`Loading plugins from ${base}`);
137+
138+
// hLog(`Loading plugins from ${base}`);
139+
138140
if (!existsSync(base)) {
139141
console.error('Plugin folder not found');
140142
return;
@@ -155,7 +157,7 @@ export class HyperionModuleLoader {
155157
return;
156158
}
157159

158-
console.log(JSON.stringify(pState, null, 2));
160+
// console.log(JSON.stringify(pState, null, 2));
159161

160162
for (const key in this.config.plugins) {
161163
if (this.config.plugins.hasOwnProperty(key)) {

src/indexer/modules/master.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,14 +273,21 @@ export class HyperionMaster {
273273
}
274274

275275
if (msg.live === 'false') {
276+
276277
this.indexerMonitor.consumedBlocks++;
277278
if (msg.block_num && msg.block_num > this.lastProcessedBlockNum) {
278279
this.lastProcessedBlockNum = msg.block_num;
279280
}
281+
280282
} else {
281283
// LIVE READER
282284
this.indexerMonitor.liveConsumedBlocks++;
283285

286+
// notify the lifecycle manager
287+
this.lifecycleManager.notifyConsumedBlock(msg.block_num).catch(e => {
288+
hLog(`Error from lifecycle manager notification: ${e.message}`);
289+
});
290+
284291
// cache the last block number for quick api access
285292
this.ioRedisClient.set(`${this.chain}:last_idx_block`, `${msg.block_num}@${msg.block_ts}`).catch(console.error);
286293

src/indexer/workers/deserializer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,7 @@ export default class MainDSWorker extends HyperionWorker {
10211021
// TODO: check error on stake.libre::temppower
10221022
if (!row.data) {
10231023
console.log(`Failed to deserialize ${row.code}::${row.table} at block ${block}`);
1024+
console.log(row);
10241025
}
10251026

10261027
// const row_sb = this.createSerialBuffer(Serialize.hexToUint8Array(row['value']));

src/indexer/workers/indexer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export default class IndexerWorker extends HyperionWorker {
7373
const queueName = process.env.queue;
7474
if (this.ch && queueName) {
7575
this.ch_ready = true;
76-
console.log('Consumer on:', queueName);
76+
hLog('Starting consumer on:', queueName);
7777
this.ch.on('close', () => {
7878
hLog('Channel closed for queue:', queueName);
7979
this.indexQueue.pause();
@@ -94,7 +94,7 @@ export default class IndexerWorker extends HyperionWorker {
9494
startMonitoring() {
9595
setInterval(() => {
9696
if (this.temp_indexed_count > 0) {
97-
process.send?.({event: 'add_index', size: this.temp_indexed_count});
97+
process.send?.({ event: 'add_index', size: this.temp_indexed_count });
9898
}
9999
this.temp_indexed_count = 0;
100100
}, 1000);

0 commit comments

Comments
 (0)