Skip to content

Commit 9a72f07

Browse files
committed
Merge branch 'fix/evm-cursor-memory-leaks' of github.com:nitsujlangston/bitcore
2 parents 87ca4dd + 8da0c3f commit 9a72f07

File tree

6 files changed

+308
-19
lines changed

6 files changed

+308
-19
lines changed

packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
546546
return resolve();
547547
}
548548
const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress);
549-
const populateReceipt = new PopulateReceiptTransform();
550-
const populateEffects = new PopulateEffectsTransform();
549+
const populateReceipt = new PopulateReceiptTransform(this);
550+
const populateEffects = new PopulateEffectsTransform(this);
551551

552552
const streamParams: BuildWalletTxsStreamParams = {
553553
transactionStream,
@@ -582,13 +582,33 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
582582
let { transactionStream } = streamParams;
583583
const { populateEffects } = streamParams;
584584

585-
transactionStream = EVMTransactionStorage.collection
585+
// Store cursor reference for cleanup
586+
const cursor = EVMTransactionStorage.collection
586587
.find(query)
587588
.sort({ blockTimeNormalized: 1 })
588-
.addCursorFlag('noCursorTimeout', true)
589-
.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true }));
589+
.addCursorFlag('noCursorTimeout', true);
590+
591+
// Add cleanup handlers when client disconnects
592+
let cursorClosed = false;
593+
const cleanupCursor = () => {
594+
if (!cursorClosed) {
595+
cursorClosed = true;
596+
try {
597+
cursor.close();
598+
} catch {
599+
// Cursor might already be closed, ignore
600+
}
601+
}
602+
};
603+
604+
const { req, res } = params;
605+
req.on('close', cleanupCursor);
606+
res.on('close', cleanupCursor);
607+
608+
// Pipe cursor to transform stream
609+
transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true }));
590610

591-
transactionStream = transactionStream.eventPipe(populateEffects); // For old db entires
611+
transactionStream = transactionStream.eventPipe(populateEffects); // For old db entries
592612

593613
if (params.args.tokenAddress) {
594614
const erc20Transform = new Erc20RelatedFilterTransform(params.args.tokenAddress);

packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,34 @@ export class GnosisApi {
192192

193193
let transactionStream = new Readable({ objectMode: true });
194194
const ethTransactionTransform = new EVMListTransactionsStream([multisigContractAddress, args.tokenAddress]);
195-
const populateReceipt = new PopulateReceiptTransform();
196-
const populateEffects = new PopulateEffectsTransform();
195+
const EVM = getCSP(chain, network);
196+
const populateReceipt = new PopulateReceiptTransform(EVM);
197+
const populateEffects = new PopulateEffectsTransform(EVM);
197198

198-
transactionStream = EVMTransactionStorage.collection
199+
// Store cursor reference for cleanup
200+
const cursor = EVMTransactionStorage.collection
199201
.find(query)
200202
.sort({ blockTimeNormalized: 1 })
201203
.addCursorFlag('noCursorTimeout', true);
202204

203-
transactionStream = transactionStream.pipe(populateEffects); // For old db entires
205+
// Add cleanup handlers when client disconnects
206+
let cursorClosed = false;
207+
const cleanupCursor = () => {
208+
if (!cursorClosed) {
209+
cursorClosed = true;
210+
try {
211+
cursor.close();
212+
} catch {
213+
// Cursor might already be closed, ignore
214+
}
215+
}
216+
};
217+
218+
const { req } = params;
219+
req.on('close', cleanupCursor);
220+
res.on('close', cleanupCursor);
221+
222+
transactionStream = cursor.pipe(populateEffects); // For old db entries
204223

205224
if (multisigContractAddress) {
206225
const multisigTransform = new MultisigRelatedFilterTransform(multisigContractAddress, args.tokenAddress);

packages/bitcore-node/src/providers/chain-state/evm/api/internalTxTransform.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@ export class InternalTxRelatedFilterTransform extends TransformWithEventPipe {
6868

6969
async getWalletAddresses(tx) {
7070
if (!this.walletAddresses.length) {
71-
this.walletAddresses = await WalletAddressStorage.collection
72-
.find({ chain: tx.chain, network: tx.network, wallet: this.walletId })
73-
.toArray();
71+
const cursor = WalletAddressStorage.collection
72+
.find({ chain: tx.chain, network: tx.network, wallet: this.walletId });
73+
try {
74+
this.walletAddresses = await cursor.toArray();
75+
} finally {
76+
await cursor.close();
77+
}
7478
}
7579
return this.walletAddresses.map(walletAddress => this.web3.utils.toChecksumAddress(walletAddress.address));
7680
}

packages/bitcore-node/src/providers/chain-state/evm/api/populateEffectsTransform.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import { IEVMTransaction } from '../types';
44
import { BaseEVMStateProvider } from './csp';
55

66
export class PopulateEffectsTransform extends TransformWithEventPipe {
7-
constructor() {
7+
constructor(private evm: BaseEVMStateProvider) {
88
super({ objectMode: true });
99
}
1010

1111
async _transform(tx: MongoBound<IEVMTransaction>, _, done) {
1212
// Add effects to old db entries
13-
const EVM = new BaseEVMStateProvider(tx.chain);
14-
tx = EVM.populateEffects(tx);
13+
tx = this.evm.populateEffects(tx);
1514
this.push(tx);
1615
return done();
1716
}

packages/bitcore-node/src/providers/chain-state/evm/api/populateReceiptTransform.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import { IEVMTransaction } from '../types';
44
import { BaseEVMStateProvider } from './csp';
55

66
export class PopulateReceiptTransform extends TransformWithEventPipe {
7-
constructor() {
7+
constructor(private evm: BaseEVMStateProvider) {
88
super({ objectMode: true });
99
}
1010

1111
async _transform(tx: MongoBound<IEVMTransaction>, _, done) {
1212
try {
13-
const EVM = new BaseEVMStateProvider(tx.chain);
14-
tx = await EVM.populateReceipt(tx);
13+
tx = await this.evm.populateReceipt(tx);
1514
} catch {/* ignore error */}
1615
this.push(tx);
1716
return done();
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
import { ObjectId } from 'bson';
2+
import { expect } from 'chai';
3+
import { Request, Response } from 'express-serve-static-core';
4+
import * as sinon from 'sinon';
5+
import { PassThrough } from 'stream';
6+
import { MongoBound } from '../../../src/models/base';
7+
import { IWallet, WalletStorage } from '../../../src/models/wallet';
8+
import { WalletAddressStorage } from '../../../src/models/walletAddress';
9+
import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction';
10+
import { intAfterHelper, intBeforeHelper } from '../../helpers/integration';
11+
12+
const chain = 'ETH';
13+
const network = 'regtest';
14+
15+
describe('EVM Memory Leak Prevention', function() {
16+
const suite = this;
17+
this.timeout(30000);
18+
let globalSandbox: sinon.SinonSandbox;
19+
let ETH: any;
20+
21+
before(intBeforeHelper);
22+
after(async () => intAfterHelper(suite));
23+
24+
beforeEach(async () => {
25+
globalSandbox = sinon.createSandbox();
26+
// Use the ETH module instance
27+
const { ETH: ETHModule } = await import('../../../src/modules/ethereum/api/csp');
28+
ETH = ETHModule;
29+
await EVMTransactionStorage.collection.deleteMany({});
30+
await WalletStorage.collection.deleteMany({});
31+
await WalletAddressStorage.collection.deleteMany({});
32+
});
33+
34+
afterEach(() => {
35+
globalSandbox.restore();
36+
});
37+
38+
function createMockReqRes() {
39+
const reqStream = new PassThrough();
40+
const req = reqStream as unknown as Request;
41+
42+
const resStream = new PassThrough();
43+
const res = resStream as unknown as Response;
44+
45+
(res as any).write = resStream.write.bind(resStream);
46+
(res as any).end = resStream.end.bind(resStream);
47+
48+
res.type = () => res;
49+
res.status = () => res;
50+
res.send = () => res;
51+
52+
// Consume data to keep stream flowing
53+
resStream.on('data', () => {});
54+
55+
return { req, res, reqEmitter: reqStream, resEmitter: resStream };
56+
}
57+
58+
describe('Cursor Cleanup on Client Disconnect', () => {
59+
it('should not crash when client disconnects during streamWalletTransactions', async () => {
60+
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
61+
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
62+
const wallet = {
63+
_id: objectId,
64+
chain,
65+
network,
66+
name: 'test-wallet',
67+
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
68+
path: 'm/44\'/60\'/0\'/0/0',
69+
singleAddress: true
70+
} as MongoBound<IWallet>;
71+
72+
await WalletStorage.collection.insertOne(wallet as any);
73+
await WalletAddressStorage.collection.insertOne({
74+
chain,
75+
network,
76+
wallet: objectId,
77+
address,
78+
processed: true
79+
});
80+
81+
const txCount = 5;
82+
const txs = new Array(txCount).fill({}).map((_, i) => ({
83+
chain,
84+
network,
85+
blockHeight: 1,
86+
gasPrice: 10 * 1e9,
87+
data: Buffer.from(''),
88+
from: address,
89+
to: '0xRecipient123',
90+
txid: '0x' + (i + 1).toString(16).padStart(64, '0'),
91+
wallets: [objectId]
92+
} as any));
93+
94+
await EVMTransactionStorage.collection.insertMany(txs);
95+
96+
const { req, res, reqEmitter } = createMockReqRes();
97+
98+
const streamPromise = ETH.streamWalletTransactions({
99+
chain,
100+
network,
101+
wallet,
102+
req,
103+
res,
104+
args: {}
105+
});
106+
107+
// Wait for stream to start
108+
await new Promise(resolve => setTimeout(resolve, 100));
109+
110+
// Simulate client disconnect
111+
reqEmitter.emit('close');
112+
113+
// Wait for cleanup
114+
await new Promise(resolve => setTimeout(resolve, 100));
115+
116+
// Should not throw - the implementation should handle cleanup gracefully
117+
await streamPromise.catch(() => {});
118+
119+
// If we get here without crashing, the test passes
120+
expect(true).to.be.true;
121+
});
122+
123+
it('should handle multiple interrupted requests without accumulating resources', async () => {
124+
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
125+
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
126+
const wallet = {
127+
_id: objectId,
128+
chain,
129+
network,
130+
name: 'test-wallet-multi',
131+
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
132+
path: 'm/44\'/60\'/0\'/0/0',
133+
singleAddress: true
134+
} as MongoBound<IWallet>;
135+
136+
await WalletStorage.collection.insertOne(wallet as any);
137+
await WalletAddressStorage.collection.insertOne({
138+
chain,
139+
network,
140+
wallet: objectId,
141+
address,
142+
processed: true
143+
});
144+
145+
const txCount = 5;
146+
const txs = new Array(txCount).fill({}).map((_, i) => ({
147+
chain,
148+
network,
149+
blockHeight: 1,
150+
gasPrice: 10 * 1e9,
151+
data: Buffer.from(''),
152+
from: address,
153+
to: '0xRecipient456',
154+
txid: '0x' + (i + 100).toString(16).padStart(64, '0'),
155+
wallets: [objectId]
156+
} as any));
157+
158+
await EVMTransactionStorage.collection.insertMany(txs);
159+
160+
// Make 3 requests that all get interrupted
161+
const numRequests = 3;
162+
for (let i = 0; i < numRequests; i++) {
163+
const { req, res, reqEmitter } = createMockReqRes();
164+
165+
const streamPromise = ETH.streamWalletTransactions({
166+
chain,
167+
network,
168+
wallet,
169+
req,
170+
res,
171+
args: {}
172+
});
173+
174+
await new Promise(resolve => setTimeout(resolve, 50));
175+
reqEmitter.emit('close');
176+
await new Promise(resolve => setTimeout(resolve, 50));
177+
await streamPromise.catch(() => {});
178+
}
179+
180+
// If we completed all requests without issues, test passes
181+
expect(true).to.be.true;
182+
});
183+
});
184+
185+
describe('Provider Instance Reuse', () => {
186+
it('should complete streaming without errors', async () => {
187+
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
188+
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
189+
const wallet = {
190+
_id: objectId,
191+
chain,
192+
network,
193+
name: 'test-wallet-provider',
194+
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
195+
path: 'm/44\'/60\'/0\'/0/0',
196+
singleAddress: true
197+
} as MongoBound<IWallet>;
198+
199+
await WalletStorage.collection.insertOne(wallet as any);
200+
await WalletAddressStorage.collection.insertOne({
201+
chain,
202+
network,
203+
wallet: objectId,
204+
address,
205+
processed: true
206+
});
207+
208+
const txCount = 5;
209+
const txs = new Array(txCount).fill({}).map((_, i) => ({
210+
chain,
211+
network,
212+
blockHeight: 1,
213+
gasPrice: 10 * 1e9,
214+
data: Buffer.from(''),
215+
from: address,
216+
to: '0xRecipient789',
217+
txid: '0x' + (i + 200).toString(16).padStart(64, '0'),
218+
wallets: [objectId]
219+
} as any));
220+
221+
await EVMTransactionStorage.collection.insertMany(txs);
222+
223+
const { req, res, resEmitter } = createMockReqRes();
224+
225+
let receivedTxCount = 0;
226+
resEmitter.on('data', () => {
227+
receivedTxCount++;
228+
});
229+
230+
await new Promise((resolve, reject) => {
231+
resEmitter.on('finish', resolve);
232+
resEmitter.on('error', reject);
233+
234+
ETH.streamWalletTransactions({
235+
chain,
236+
network,
237+
wallet,
238+
req,
239+
res,
240+
args: {}
241+
}).catch(reject);
242+
});
243+
244+
// Verify that we received some transactions (stream worked)
245+
expect(receivedTxCount).to.be.greaterThan(0);
246+
});
247+
});
248+
});

0 commit comments

Comments
 (0)