Skip to content

Commit 6df3609

Browse files
authored
Merge pull request #34 from aleph-im/feat/clean-responses-from-indexer
Clean up responses in indexer instances
2 parents 088a55a + 05d1a24 commit 6df3609

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,26 @@ export function createEntityRequestResponseDAL<T extends ParsedEntity<unknown>>(
5656
newEntity: EntityRequestResponse<T>,
5757
): Promise<EntityUpdateOp> {
5858
if (oldEntity) {
59-
const nonceIndexes = {
60-
...oldEntity.nonceIndexes,
61-
...newEntity.nonceIndexes,
62-
}
63-
6459
if (!('parsed' in newEntity) && 'parsed' in oldEntity) {
6560
Object.assign(newEntity, oldEntity)
6661
}
6762

68-
newEntity.nonceIndexes = nonceIndexes
63+
if (
64+
('parsed' in newEntity)
65+
&& !newEntity.nonceIndexes
66+
) {
67+
return EntityUpdateOp.Delete
68+
}
6969

70-
// console.log(
71-
// 'updated entity [entity_request_responses]',
72-
// newEntity.timestampIndexes.length,
73-
// )
70+
// @note: This is a hack to make sure that the nonce indexes are
71+
// not overwritten by the new entity. This is usually the case when
72+
// the entity contains the actual transaction data, at which point we
73+
// do not have the actual nonce indexes, but still need to pass in a
74+
// nonce index object to the entity storage.
75+
newEntity.nonceIndexes = {
76+
...newEntity.nonceIndexes,
77+
...oldEntity.nonceIndexes,
78+
}
7479
}
7580

7681
return EntityUpdateOp.Update

packages/framework/src/services/indexer/src/entityFetcher.ts

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -189,23 +189,20 @@ export abstract class BaseIndexerEntityFetcher<
189189

190190
async isRequestComplete(nonce: number): Promise<boolean> {
191191
const request = await this.entityRequestDAL.get(nonce.toString())
192-
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)
192+
if (!request) throw new Error(`Request with nonce ${nonce} does not exist`)
193193

194194
return !!request.complete
195195
}
196196

197197
async awaitRequestComplete(nonce: number): Promise<void> {
198-
const request = await this.entityRequestDAL.get(nonce.toString())
199-
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)
200-
201-
if (!request.complete) {
198+
if (!await this.isRequestComplete(nonce)) {
202199
await this.getFuture(nonce).promise
203200
}
204201
}
205202

206203
async getResponse(nonce: number): Promise<TransactionResponse<T>> {
207204
const request = await this.entityRequestDAL.get(nonce.toString())
208-
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)
205+
if (!request) throw new Error(`Request with nonce ${nonce} does not exist`)
209206

210207
if (!request.complete) {
211208
await this.getFuture(nonce).promise
@@ -223,7 +220,13 @@ export abstract class BaseIndexerEntityFetcher<
223220
remove: async () => {
224221
this.log('----> REMOVE REQ 🎈', request.nonce)
225222
await this.entityRequestDAL.remove(request)
226-
// @todo: Update nonceIndexes / Remove from entityRequestResponseDAL
223+
// remove nonce from nonceIndex and update response
224+
const items = []
225+
for await (const item of response) {
226+
delete item.nonceIndexes[request.nonce]
227+
items.push(item)
228+
}
229+
await this.entityRequestResponseDAL.save(items)
227230
},
228231
}
229232
}
@@ -253,10 +256,9 @@ export abstract class BaseIndexerEntityFetcher<
253256
const requestsNonces = []
254257
let lastFilteredTxs = 0
255258

256-
let filteredTxs: T[] = []
259+
let filteredTxs: EntityRequestResponse<T>[] = []
257260
let remainingTxs: T[] = chunk
258261
let requestCount = 0
259-
const requestCountId = []
260262

261263
for await (const request of requests) {
262264
const { nonce, complete } = request
@@ -267,10 +269,13 @@ export abstract class BaseIndexerEntityFetcher<
267269
request,
268270
)
269271

270-
filteredTxs = filteredTxs.concat(result.filteredEntities)
272+
const requestResponses = result.filteredEntities as EntityRequestResponse<T>[]
273+
for (const responses of requestResponses) {
274+
(responses as EntityRequestResponse<T>).nonceIndexes = { [nonce]: 0 }
275+
}
276+
filteredTxs = filteredTxs.concat(requestResponses)
271277
remainingTxs = result.remainingEntities
272278
requestCount++
273-
requestCountId.push(nonce)
274279

275280
lastFilteredTxs = filteredTxs.length - lastFilteredTxs
276281
requestsNonces.push([nonce, lastFilteredTxs])
@@ -282,14 +287,11 @@ export abstract class BaseIndexerEntityFetcher<
282287

283288
if (filteredTxs.length === 0) return
284289

285-
const requestResponse =
286-
filteredTxs as unknown as EntityRequestResponse<T>[]
287-
288290
const pendingIds = filteredTxs as unknown as EntityRequestPendingEntity[]
289291

290-
this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n'))
292+
//this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n'))
291293

292-
await this.entityRequestResponseDAL.save(requestResponse)
294+
await this.entityRequestResponseDAL.save(filteredTxs)
293295
await this.entityRequestPendingEntityDAL.remove(pendingIds)
294296

295297
this.checkCompletionJob.run().catch(() => 'ignore')
@@ -347,7 +349,7 @@ export abstract class BaseIndexerEntityFetcher<
347349
const future = this.getFuture(nonce)
348350
let count = 0
349351

350-
// @note: Sometimes we receive the responses before inserting the pendings signatures on
352+
// @note: Sometimes we receive the responses before inserting the pending signatures on
351353
// the db, the purpose of this mutex is to avoid this
352354
const now1 = Date.now()
353355
const release = await this.requestMutex.acquire()
@@ -397,7 +399,7 @@ export abstract class BaseIndexerEntityFetcher<
397399
const elapsed2 = Date.now() - now2
398400
this.log(`onRequest time => ${elapsed1 / 1000} | ${elapsed2 / 1000}`)
399401

400-
this.log(`🟡 Request ${nonce} inited`)
402+
this.log(`🟡 Request ${nonce} initialized with ${count} entities`)
401403

402404
if (!count) {
403405
this.log(`🟢 Request ${nonce} complete`)
@@ -513,7 +515,7 @@ export abstract class BaseIndexerEntityFetcher<
513515
`[Retry] Check ${tx?.id}`,
514516
!!tx,
515517
tx && 'parsed' in (tx || {}),
516-
tx && tx.nonceIndexes[nonce] >= 0,
518+
tx?.nonceIndexes[nonce],
517519
request.nonce,
518520
)
519521

@@ -564,7 +566,7 @@ export abstract class BaseIndexerEntityFetcher<
564566
): Promise<void> {
565567
const ids = pendings.map(({ id }) => id)
566568

567-
this.log(`Retrying ${ids.length} ${this.blockchainId} ids`, ids)
569+
this.log(`Retrying ${ids.length} ${this.blockchainId} ids`)
568570

569571
return this.fetcherMsClient
570572
.useBlockchain(this.blockchainId as Blockchain)

0 commit comments

Comments
 (0)