Skip to content

Commit b242901

Browse files
committed
WIP: dynamic state indexing on mongodb
1 parent eb131c3 commit b242901

File tree

5 files changed

+177
-50
lines changed

5 files changed

+177
-50
lines changed

src/indexer/helpers/elastic-routes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ export class ElasticRoutes {
525525

526526
private registerRoutes() {
527527
const partitionRouter = this.getIndexPartition.bind(this);
528-
this.registerDynamicTableRoute();
528+
// this.registerDynamicTableRoute();
529529
this.addRoute('abi', buildAbiBulk, partitionRouter);
530530
this.addRoute('action', buildActionBulk, partitionRouter);
531531
this.addRoute('block', buildBlockBulk, partitionRouter);

src/indexer/helpers/mongo-routes.ts

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { Collection, Db } from "mongodb";
2-
import { ConnectionManager } from "../connections/manager.class.js";
3-
import { Message } from "amqplib";
4-
import { hLog } from "./common_functions.js";
5-
import { IAccount } from "../../interfaces/table-account.js";
6-
import { IProposal } from "../../interfaces/table-proposal.js";
7-
import { IVoter } from "../../interfaces/table-voter.js";
1+
import {BulkWriteResult, Collection, Db} from "mongodb";
2+
import {ConnectionManager} from "../connections/manager.class.js";
3+
import {Message} from "amqplib";
4+
import {hLog} from "./common_functions.js";
5+
import {IAccount} from "../../interfaces/table-account.js";
6+
import {IProposal} from "../../interfaces/table-proposal.js";
7+
import {IVoter} from "../../interfaces/table-voter.js";
88

99
export class MongoRoutes {
1010

@@ -24,6 +24,7 @@ export class MongoRoutes {
2424
this.proposalsCollection = this.db.collection('proposals');
2525
this.votersCollection = this.db.collection('voters');
2626
this.addRoutes();
27+
this.addDynamicRoutes();
2728
}
2829
}
2930

@@ -62,7 +63,7 @@ export class MongoRoutes {
6263
}
6364
});
6465

65-
this.accountsCollection?.bulkWrite(operations, { ordered: false }).catch(reason => {
66+
this.accountsCollection?.bulkWrite(operations, {ordered: false}).catch(reason => {
6667
hLog('error', 'mongo-routes', 'table-accounts', reason);
6768
}).finally(() => {
6869
// TODO: ack
@@ -88,7 +89,7 @@ export class MongoRoutes {
8889
};
8990
});
9091

91-
this.proposalsCollection?.bulkWrite(operations, { ordered: false }).catch(reason => {
92+
this.proposalsCollection?.bulkWrite(operations, {ordered: false}).catch(reason => {
9293
hLog('error', 'mongo-routes', 'table-proposals', reason);
9394
}).finally(() => {
9495
callback(payload.length);
@@ -126,4 +127,66 @@ export class MongoRoutes {
126127
});
127128
};
128129
}
130+
131+
private addDynamicRoutes() {
132+
this.routes['dynamic-table'] = (payload: Message[], callback: (indexed_size?: number) => void) => {
133+
const groupedOps = new Map<string, any[]>();
134+
payload.forEach((msg: Message) => {
135+
const delta = JSON.parse(msg.content.toString()) as any;
136+
const headers = msg.properties.headers as any;
137+
138+
const op = {};
139+
140+
if (headers.present === 1) {
141+
op['updateOne'] = {
142+
filter: {
143+
'@scope': delta.scope,
144+
'@pk': delta.primary_key
145+
},
146+
update: {
147+
$set: {
148+
'@scope': delta.scope,
149+
'@pk': delta.primary_key,
150+
'@payer': delta.payer,
151+
'@block_num': delta.block_num,
152+
'@block_time': delta['@timestamp'],
153+
'@block_id': delta['block_id'],
154+
...delta.data
155+
}
156+
},
157+
upsert: true
158+
};
159+
} else {
160+
op['deleteOne'] = {
161+
filter: {
162+
'@scope': delta.scope,
163+
'@pk': delta.primary_key
164+
}
165+
};
166+
}
167+
168+
const collection = headers.code + '-' + headers.table;
169+
const col = groupedOps.get(collection);
170+
if (col) {
171+
col.push(op);
172+
} else {
173+
groupedOps.set(collection, [op]);
174+
}
175+
// hLog(msg.properties.headers, delta);
176+
});
177+
178+
const promises: Promise<BulkWriteResult>[] = [];
179+
180+
groupedOps.forEach((value, key) => {
181+
if (this.db) {
182+
promises.push(this.db.collection(key).bulkWrite(value, {ordered: false}))
183+
}
184+
});
185+
186+
Promise.all(promises).finally(() => {
187+
callback(payload.length);
188+
});
189+
}
190+
191+
}
129192
}

src/indexer/modules/master.ts

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import {updateByBlock} from "../definitions/updateByBlock.painless.js";
5151
import {IAccount} from "../../interfaces/table-account.js";
5252
import {IProposal} from "../../interfaces/table-proposal.js";
5353
import {IVoter} from "../../interfaces/table-voter.js";
54+
import {Db, IndexDescription} from "mongodb";
5455
import Timeout = NodeJS.Timeout;
5556

5657
interface RevBlock {
@@ -2455,7 +2456,7 @@ export class HyperionMaster {
24552456
{key: {is_proxy: 1}}
24562457
]);
24572458

2458-
this.parseContractStateConfig();
2459+
await this.parseContractStateConfig(db);
24592460
}
24602461
}
24612462
} catch (e: any) {
@@ -2464,14 +2465,70 @@ export class HyperionMaster {
24642465
}
24652466
}
24662467

2467-
private parseContractStateConfig() {
2468+
private async parseContractStateConfig(db: Db) {
24682469
if (this.conf.features.contract_state && this.conf.features.contract_state.enabled) {
24692470
console.log('MongoDB contract state is enabled!');
24702471
if (this.conf.features.contract_state.contracts) {
24712472
const contracts = this.conf.features.contract_state.contracts;
2472-
for (let accountKey in contracts) {
2473-
if (contracts[accountKey]) {
2474-
console.log(accountKey, contracts[accountKey]);
2473+
for (let code in contracts) {
2474+
if (contracts[code]) {
2475+
for (let table in contracts[code]) {
2476+
const indices: IndexDescription[] = [
2477+
{key: {'@pk': -1}},
2478+
{key: {'@scope': 1}},
2479+
{key: {'@block_id': -1}},
2480+
{key: {'@block_num': -1}},
2481+
{key: {'@block_time': -1}},
2482+
{key: {'@payer': 1}}
2483+
];
2484+
const textFields = {};
2485+
if (contracts[code][table]["auto_index"]) {
2486+
const contractAbi = await this.rpc.get_abi(code);
2487+
if (contractAbi && contractAbi.abi) {
2488+
const tables = contractAbi.abi.tables;
2489+
const structs = contractAbi.abi.structs;
2490+
const extractStructFlat = (structName: string) => {
2491+
const struct = structs.find(value => value.name === structName);
2492+
if (struct?.base) {
2493+
extractStructFlat(struct.base);
2494+
}
2495+
struct?.fields.forEach(value => {
2496+
const key = {};
2497+
key[value.name] = -1;
2498+
indices.push({key});
2499+
});
2500+
};
2501+
const tableData = tables.find(value => value.name === table);
2502+
if (tableData) {
2503+
extractStructFlat(tableData.type);
2504+
}
2505+
}
2506+
} else {
2507+
for (let index in contracts[code][table]['indices']) {
2508+
const key = {};
2509+
const indexValue = contracts[code][table]['indices'][index];
2510+
if (indexValue === 'date') {
2511+
key[index] = -1;
2512+
} else {
2513+
key[index] = indexValue;
2514+
}
2515+
if (indexValue === 'text') {
2516+
textFields[index] = 'text';
2517+
} else {
2518+
indices.push({key})
2519+
}
2520+
}
2521+
}
2522+
2523+
if (indices.length > 0) {
2524+
await db.collection(`${code}-${table}`).createIndexes(indices);
2525+
}
2526+
if (Object.keys(textFields).length > 0) {
2527+
await db.collection(`${code}-${table}`).createIndex(textFields, {
2528+
name: Object.keys(textFields).join('_') + "_text",
2529+
});
2530+
}
2531+
}
24752532
}
24762533
}
24772534
} else {

src/indexer/workers/deserializer.ts

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import {Action, Type as EOSJSType} from "eosjs/dist/eosjs-serialize.js";
1818
import {JsSignatureProvider} from "eosjs/dist/eosjs-jssig.js";
1919
import {HyperionSignedBlock, ProducerSchedule} from "../../interfaces/signed-block.js";
2020
import {estypes} from "@elastic/elasticsearch";
21+
import {Serializer} from "@wharfkit/antelope";
22+
2123

2224
const abi_remapping = {
2325
"_Bool": "bool"
@@ -1049,6 +1051,8 @@ export default class MainDSWorker extends HyperionWorker {
10491051
const key2 = `${contractRowDelta.code}:*`;
10501052
const key3 = `*:${contractRowDelta.table}`;
10511053

1054+
this.pushToDynamicTableQueue(contractRowDelta);
1055+
10521056
// strict code::table handlers
10531057
if (this.tableHandlers[key]) {
10541058
if (this.isAsync(this.tableHandlers[key])) {
@@ -1122,31 +1126,33 @@ export default class MainDSWorker extends HyperionWorker {
11221126
return prefixedOutput;
11231127
}
11241128

1125-
pushToDynamicTableQueue(jsonRow) {
1126-
if (this.allowedDynamicContracts.has(jsonRow.code)) {
1127-
const doc = {
1128-
'@timestamp': jsonRow['@timestamp'],
1129-
table: jsonRow.table,
1130-
scope: jsonRow.scope,
1131-
primary_key: jsonRow.primary_key,
1132-
payer: jsonRow.payer,
1133-
block_num: jsonRow.block_num,
1134-
block_id: jsonRow.block_id,
1135-
fields: this.addTablePrefix(jsonRow.table, jsonRow.data)
1136-
};
1137-
this.preIndexingQueue.push({
1138-
queue: this.chain + ":index_dynamic:" + (this.dyn_emit_idx),
1139-
content: bufferFromJson(doc),
1140-
headers: {
1141-
id: `${jsonRow.table}-${jsonRow.scope}-${jsonRow.primary_key}`,
1142-
code: jsonRow.code,
1129+
pushToDynamicTableQueue(jsonRow: HyperionDelta) {
1130+
1131+
if (this.conf.features.contract_state.contracts[jsonRow.code] || this.allowedDynamicContracts.has(jsonRow.code)) {
1132+
if (this.conf.features.contract_state.contracts[jsonRow.code][jsonRow.table]) {
1133+
const doc = {
1134+
'@timestamp': jsonRow['@timestamp'],
1135+
scope: jsonRow.scope,
1136+
primary_key: jsonRow.primary_key,
1137+
payer: jsonRow.payer,
11431138
block_num: jsonRow.block_num,
1144-
present: jsonRow.present
1139+
block_id: jsonRow.block_id,
1140+
data: jsonRow.data
1141+
};
1142+
this.preIndexingQueue.push({
1143+
queue: this.chain + ":index_dynamic:" + (this.dyn_emit_idx),
1144+
content: bufferFromJson(doc),
1145+
headers: {
1146+
code: jsonRow.code,
1147+
table: jsonRow.table,
1148+
block_num: jsonRow.block_num,
1149+
present: jsonRow.present
1150+
}
1151+
}).catch(console.log);
1152+
this.dyn_emit_idx++;
1153+
if (this.dyn_emit_idx > this.conf.scaling.dyn_idx_queues) {
1154+
this.dyn_emit_idx = 1;
11451155
}
1146-
}).catch(console.log);
1147-
this.dyn_emit_idx++;
1148-
if (this.dyn_emit_idx > this.conf.scaling.dyn_idx_queues) {
1149-
this.dyn_emit_idx = 1;
11501156
}
11511157
}
11521158
}
@@ -1247,8 +1253,6 @@ export default class MainDSWorker extends HyperionWorker {
12471253
if (jsonRow && await this.processTableDelta(jsonRow)) {
12481254
if (!this.conf.indexer.disable_indexing && this.conf.features.index_deltas) {
12491255

1250-
this.pushToDynamicTableQueue(jsonRow);
1251-
12521256
// check for plugin handlers
12531257
await this.mLoader.processDeltaData(jsonRow);
12541258

src/indexer/workers/indexer.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,30 @@ export default class IndexerWorker extends HyperionWorker {
3939
if (this.conf.indexer.experimental_mongodb_state && this.mongoRoutes.routes[process.env.type]) {
4040
// call route type
4141
(this.mongoRoutes.routes[process.env.type] as any)(payload, (indexed_size?: number) => {
42-
// if (indexed_size) {
43-
// this.temp_indexed_count += indexed_size;
44-
// }
45-
// console.log('MongoDB indexed: ', indexed_size);
42+
if (indexed_size) {
43+
this.temp_indexed_count += indexed_size;
44+
}
45+
// console.log('MongoDB indexed: ', indexed_size, ' on ', process.env.type);
46+
try {
47+
this.ch?.ackAll();
48+
callback();
49+
} catch (e: any) {
50+
hLog(`${e.message} on ${process.env.type}`);
51+
}
4652
});
47-
}
48-
49-
if (this.esRoutes.routes[process.env.type]) {
53+
} else if (this.esRoutes.routes[process.env.type]) {
5054
// call route type
5155
(this.esRoutes.routes[process.env.type] as RouteFunction)(payload, this.ch, (indexed_size?: number) => {
5256
if (indexed_size) {
5357
this.temp_indexed_count += indexed_size;
5458
}
59+
// console.log('ES indexed: ', indexed_size, ' on ', process.env.type);
5560
try {
5661
callback();
5762
} catch (e: any) {
5863
hLog(`${e.message} on ${process.env.type}`);
5964
}
6065
});
61-
} else {
62-
hLog(`No route for type: ${process.env.type}`);
63-
process.exit(1);
6466
}
6567
}
6668

@@ -72,6 +74,7 @@ export default class IndexerWorker extends HyperionWorker {
7274
const queueName = process.env.queue;
7375
if (this.ch && queueName) {
7476
this.ch_ready = true;
77+
console.log('Consumer on:', queueName);
7578
this.ch.on('close', () => {
7679
hLog('Channel closed for queue:', queueName);
7780
this.indexQueue.pause();

0 commit comments

Comments
 (0)