Skip to content

Commit 2da23cf

Browse files
feat: add contract state synchronization command and primary key validation utility
1 parent 5544f4a commit 2da23cf

File tree

3 files changed

+560
-2
lines changed

3 files changed

+560
-2
lines changed

src/cli/hyp-control.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Command } from "commander";
22
import { AccountStateSynchronizer as AccountSynchronizer } from "./sync-accounts/sync-accounts.js";
33
import { AccountStateSynchronizer as VoterSynchronizer } from "./sync-accounts/sync-voters.js";
44
import { ProposalSynchronizer } from "./sync-accounts/sync-proposals.js";
5+
import { ContractStateSynchronizer } from "./sync-accounts/sync-contract-state.js";
56

67
const __dirname = new URL('.', import.meta.url).pathname;
78

@@ -20,6 +21,11 @@ async function syncProposals(chain: string) {
2021
await synchronizer.run();
2122
}
2223

24+
async function syncContractState(chain: string) {
25+
const synchronizer = new ContractStateSynchronizer(chain);
26+
await synchronizer.run();
27+
}
28+
2329
(() => {
2430
const program = new Command();
2531

@@ -55,8 +61,18 @@ async function syncProposals(chain: string) {
5561
}
5662
});
5763

64+
sync.command('contract-state <chain>')
65+
.description('Sync contract state for a specific chain')
66+
.action(async (chain: string) => {
67+
try {
68+
await syncContractState(chain);
69+
} catch (error) {
70+
console.error('Error syncing contract state:', error);
71+
}
72+
});
73+
5874
sync.command('all <chain>')
59-
.description('Sync voters, accounts, and proposals for a specific chain')
75+
.description('Sync voters, accounts, proposals, and contract state for a specific chain')
6076
.action(async (chain: string) => {
6177
try {
6278
console.log('Syncing voters...');
@@ -65,7 +81,9 @@ async function syncProposals(chain: string) {
6581
await syncAccounts(chain);
6682
console.log('Syncing proposals...');
6783
await syncProposals(chain);
68-
console.log('Sync completed for voters, accounts, and proposals.');
84+
console.log('Syncing contract state...');
85+
await syncContractState(chain);
86+
console.log('Sync completed for voters, accounts, proposals, and contract state.');
6987
} catch (error) {
7088
console.error('Error during sync:', error);
7189
}
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
import { readFileSync } from "node:fs";
2+
import { APIClient, Asset, Name } from "@wharfkit/antelope";
3+
import { MongoClient, Db, IndexDescription } from "mongodb";
4+
import { join } from "node:path";
5+
import { cargo } from "async";
6+
import { findAndValidatePrimaryKey } from "../utils/check-primary-key.js";
7+
8+
interface ChainConfig {
9+
features: {
10+
contract_state: {
11+
enabled: boolean;
12+
contracts: Record<string, Record<string, any>>;
13+
};
14+
};
15+
}
16+
17+
export class ContractStateSynchronizer {
18+
private chain: string;
19+
private config: ChainConfig;
20+
private client: APIClient;
21+
private mongoClient: MongoClient;
22+
private db: Db | undefined;
23+
private currentBlock: number = 0;
24+
private currentBlockId: string = '';
25+
private currentBlockTime: string = '';
26+
private processedDocs: number = 0;
27+
28+
constructor(chain: string) {
29+
this.chain = chain;
30+
this.config = this.loadConfig();
31+
this.client = this.createAPIClient();
32+
this.mongoClient = this.createMongoClient();
33+
}
34+
35+
private loadConfig(): ChainConfig {
36+
const configDir = join(import.meta.dirname, '../../../config/chains');
37+
const configPath = join(configDir, `${this.chain}.config.json`);
38+
return JSON.parse(readFileSync(configPath, 'utf-8'));
39+
}
40+
41+
private loadConnections() {
42+
const configDir = join(import.meta.dirname, '../../../config');
43+
return JSON.parse(readFileSync(join(configDir, "connections.json"), 'utf-8'));
44+
}
45+
46+
private createAPIClient(): APIClient {
47+
const connections = this.loadConnections();
48+
const endpoint = connections.chains[this.chain].http;
49+
if (!endpoint) {
50+
throw new Error("No HTTP Endpoint!");
51+
}
52+
return new APIClient({url: endpoint});
53+
}
54+
55+
private createMongoClient(): MongoClient {
56+
const connections = this.loadConnections();
57+
const _mongo = connections.mongodb;
58+
let uri = `mongodb://${_mongo.host}:${_mongo.port}`;
59+
if (_mongo.user && _mongo.pass) {
60+
uri = `mongodb://${_mongo.user}:${encodeURIComponent(_mongo.pass)}@${_mongo.host}:${_mongo.port}`;
61+
}
62+
if (_mongo.authSource) {
63+
uri += `/?authSource=${_mongo.authSource}`;
64+
}
65+
return new MongoClient(uri);
66+
}
67+
68+
private async setupIndices() {
69+
if (!this.db) {
70+
throw new Error("Database not initialized");
71+
}
72+
73+
if (this.config.features.contract_state.contracts) {
74+
for (const [contract, tables] of Object.entries(this.config.features.contract_state.contracts)) {
75+
for (const [table, config] of Object.entries(tables)) {
76+
const collectionName = `${contract}-${table}`;
77+
const collection = this.db.collection(collectionName);
78+
79+
const indices: IndexDescription[] = [];
80+
81+
// Adicionar índices padrão apenas se auto_index for true
82+
if (config.auto_index === true) {
83+
indices.push(
84+
{key: {'@pk': -1}},
85+
{key: {'@scope': 1}},
86+
{key: {'@block_num': -1}},
87+
{key: {'@block_time': -1}},
88+
{key: {'@payer': 1}}
89+
);
90+
}
91+
92+
if (config.auto_index === true) {
93+
console.log(`Auto-indexing enabled for ${collectionName}`);
94+
const contractAbi = await this.client.v1.chain.get_abi(contract);
95+
if (contractAbi && contractAbi.abi) {
96+
const tables = contractAbi.abi.tables;
97+
const structs = contractAbi.abi.structs;
98+
const extractStructFlat = (structName: string) => {
99+
const struct = structs.find(value => value.name === structName);
100+
if (struct?.base) {
101+
extractStructFlat(struct.base);
102+
}
103+
struct?.fields.forEach(value => {
104+
indices.push({key: {[value.name]: 1}});
105+
});
106+
};
107+
const tableData = tables.find(value => value.name === table);
108+
if (tableData) {
109+
extractStructFlat(tableData.type);
110+
}
111+
}
112+
} else if (config.indices) {
113+
console.log(`Using defined indices for ${collectionName}`);
114+
for (const [field, direction] of Object.entries(config.indices)) {
115+
indices.push({key: {[field]: direction === 'desc' ? -1 : 1}});
116+
}
117+
}
118+
119+
if (indices.length > 0) {
120+
console.log(`Creating ${indices.length} indices for ${collectionName}`);
121+
await collection.createIndexes(indices);
122+
} else {
123+
console.log(`No indices to create for ${collectionName}`);
124+
}
125+
}
126+
}
127+
}
128+
}
129+
130+
private async* processContractState() {
131+
if (this.config.features.contract_state.contracts) {
132+
for (const [contract, tables] of Object.entries(this.config.features.contract_state.contracts)) {
133+
for (const [table, config] of Object.entries(tables)) {
134+
135+
let pkField = await findAndValidatePrimaryKey(contract,table,this.client)
136+
137+
138+
if(!pkField?.field) {
139+
console.error(`Primary key not found for ${contract}-${table}`);
140+
continue;
141+
}else {
142+
console.log(`Primary key found for ${contract}-${table}: ${pkField.field}`);
143+
144+
145+
146+
}
147+
148+
149+
150+
console.log(`Processing ${contract}-${table}`);
151+
let lowerBound: string | null = null;
152+
let totalRows = 0;
153+
do {
154+
try {
155+
const scopes = await this.client.v1.chain.get_table_by_scope({
156+
code: contract,
157+
table: table,
158+
limit: 1000,
159+
lower_bound: lowerBound ? Name.from(lowerBound).value.toString() : undefined
160+
});
161+
162+
for (const scopeRow of scopes.rows) {
163+
const scope = scopeRow.scope.toString();
164+
// console.log(`Aqui scope`, scope)
165+
const result = await this.client.v1.chain.get_table_rows({
166+
code: contract,
167+
scope: scope,
168+
table: table,
169+
limit: 1000,
170+
json: true,
171+
show_payer: true
172+
});
173+
174+
175+
if(result.ram_payers) {
176+
for (const [index,row] of result.rows.entries()) {
177+
let pkValue = ''
178+
switch (pkField.type) {
179+
case 'asset':
180+
pkValue = Asset.from(row[pkField.field]).symbol.code.value.toString();
181+
182+
183+
break;
184+
case 'name':
185+
pkValue = Name.from(row[pkField.field]).value.toString();
186+
break;
187+
case 'uint64':
188+
pkValue = row[pkField.field].toString();
189+
break;
190+
default:
191+
pkValue = row[pkField.field].toString();
192+
break;
193+
}
194+
195+
console.log(`pkValue`, pkValue, scope)
196+
197+
totalRows++;
198+
yield {
199+
contract,
200+
table,
201+
data: row,
202+
scope: scope,
203+
primary_key: pkValue,
204+
payer: result.ram_payers[index].toString()
205+
};
206+
}
207+
}
208+
}
209+
lowerBound = scopes.more;
210+
console.log(`Fetched ${totalRows} rows from ${contract}-${table}. Total: ${totalRows}`);
211+
} catch (error) {
212+
console.error(`Error processing ${contract}-${table}:`, error);
213+
lowerBound = null;
214+
}
215+
} while (lowerBound);
216+
console.log(`Finished processing ${contract}-${table}. Total rows: ${totalRows}`);
217+
}
218+
}
219+
} else {
220+
console.log("No contracts defined in the configuration");
221+
}
222+
}
223+
224+
public async run() {
225+
console.log(`Starting contract state sync for chain: ${this.chain}`);
226+
const tRef = Date.now();
227+
try {
228+
if (!this.config.features.contract_state.enabled) {
229+
console.log("Contract state synchronization is not enabled in the config.");
230+
return;
231+
}
232+
233+
const info = await this.client.v1.chain.get_info();
234+
this.currentBlock = info.head_block_num.toNumber();
235+
this.currentBlockTime = info.head_block_time.toString();
236+
this.currentBlockId = info.head_block_id.toString();
237+
console.log(`Current block: ${this.currentBlock}`);
238+
239+
await this.mongoClient.connect();
240+
console.log("Connected to MongoDB");
241+
this.db = this.mongoClient.db(`hyperion_${this.chain}`);
242+
243+
await this.setupIndices();
244+
245+
const cargoQueue = cargo(async (docs: any[], cb) => {
246+
247+
const groupedOps = new Map<string, any[]>();
248+
249+
docs.forEach(doc => {
250+
251+
// const pk = String(Name.from(doc.primary_key).value);
252+
// const pk = String(Name.from(doc.data.account).value);
253+
// console.log(`pk`, pk)
254+
255+
const op = {
256+
updateOne: {
257+
filter: {
258+
'@scope': doc.scope,
259+
'@pk': doc.primary_key
260+
},
261+
update: {
262+
$set: {
263+
'@scope': doc.scope,
264+
'@pk': doc.primary_key,
265+
'@payer': doc.payer || '',
266+
'@block_num': this.currentBlock,
267+
'@block_id': this.currentBlockId,
268+
'@block_time': this.currentBlockTime,
269+
...doc.data
270+
}
271+
},
272+
upsert: true
273+
}
274+
};
275+
276+
const collection = `${doc.contract}-${doc.table}`;
277+
const col = groupedOps.get(collection);
278+
if (col) {
279+
col.push(op);
280+
} else {
281+
groupedOps.set(collection, [op]);
282+
}
283+
});
284+
285+
const promises: Promise<any>[] = [];
286+
287+
groupedOps.forEach((value, key) => {
288+
if (this.db) {
289+
// console.log(`Inserting ${value.length} documents into ${key}`);
290+
promises.push(this.db.collection(key).bulkWrite(value, {ordered: false}));
291+
}
292+
});
293+
294+
try {
295+
const results = await Promise.all(promises);
296+
} catch (error) {
297+
console.error("Error during bulk write:", error);
298+
} finally {
299+
cb();
300+
}
301+
}, 1000);
302+
303+
console.log("Starting to process contract state");
304+
for await (const doc of this.processContractState()) {
305+
this.processedDocs++;
306+
cargoQueue.push(doc);
307+
if (this.processedDocs % 1000 === 0) {
308+
console.log(`Processed ${this.processedDocs} documents`);
309+
}
310+
}
311+
312+
console.log(`Waiting for queue to drain...`);
313+
await cargoQueue.drain();
314+
console.log(`Queue drained. Total processed documents: ${this.processedDocs}`);
315+
} catch (e) {
316+
console.error("Error during contract state sync:", e);
317+
throw e;
318+
} finally {
319+
await this.mongoClient.close();
320+
console.log("MongoDB connection closed");
321+
const tFinal = Date.now();
322+
console.log(`Processing took: ${(tFinal - tRef)}ms`);
323+
}
324+
}
325+
}

0 commit comments

Comments
 (0)