@@ -30,6 +30,8 @@ use std::sync::Arc;
3030pub struct BitcoindRpcClient {
3131 rpc_client : Arc < RpcClient > ,
3232 latest_mempool_timestamp : AtomicU64 ,
33+ mempool_entries_cache : tokio:: sync:: Mutex < HashMap < Txid , MempoolEntry > > ,
34+ mempool_txs_cache : tokio:: sync:: Mutex < HashMap < Txid , ( Transaction , u64 ) > > ,
3335}
3436
3537impl BitcoindRpcClient {
@@ -42,7 +44,9 @@ impl BitcoindRpcClient {
4244
4345 let latest_mempool_timestamp = AtomicU64 :: new ( 0 ) ;
4446
45- Self { rpc_client, latest_mempool_timestamp }
47+ let mempool_entries_cache = tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ;
48+ let mempool_txs_cache = tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ;
49+ Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
4650 }
4751
4852 pub ( crate ) fn rpc_client ( & self ) -> Arc < RpcClient > {
@@ -122,23 +126,68 @@ impl BitcoindRpcClient {
122126 . map ( |resp| resp. 0 )
123127 }
124128
125- pub ( crate ) async fn get_mempool_entry ( & self , txid : Txid ) -> std:: io:: Result < MempoolEntry > {
129+ pub ( crate ) async fn get_mempool_entry (
130+ & self , txid : Txid ,
131+ ) -> std:: io:: Result < Option < MempoolEntry > > {
126132 let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( & txid) ;
127133 let txid_json = serde_json:: json!( txid_hex) ;
128- self . rpc_client
134+ match self
135+ . rpc_client
129136 . call_method :: < GetMempoolEntryResponse > ( "getmempoolentry" , & [ txid_json] )
130137 . await
131- . map ( |resp| MempoolEntry { txid, height : resp. height , time : resp. time } )
138+ {
139+ Ok ( resp) => Ok ( Some ( MempoolEntry { txid, height : resp. height , time : resp. time } ) ) ,
140+ Err ( e) => match e. into_inner ( ) {
141+ Some ( inner) => {
142+ let rpc_error_res: Result < Box < RpcError > , _ > = inner. downcast ( ) ;
143+
144+ match rpc_error_res {
145+ Ok ( rpc_error) => {
146+ // Check if it's the 'not found' error code.
147+ if rpc_error. code == -5 {
148+ Ok ( None )
149+ } else {
150+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , rpc_error) )
151+ }
152+ } ,
153+ Err ( _) => Err ( std:: io:: Error :: new (
154+ std:: io:: ErrorKind :: Other ,
155+ "Failed to process getmempoolentry response" ,
156+ ) ) ,
157+ }
158+ } ,
159+ None => Err ( std:: io:: Error :: new (
160+ std:: io:: ErrorKind :: Other ,
161+ "Failed to process getmempoolentry response" ,
162+ ) ) ,
163+ } ,
164+ }
132165 }
133166
134- pub ( crate ) async fn get_mempool_entries ( & self ) -> std:: io:: Result < Vec < MempoolEntry > > {
167+ pub ( crate ) async fn update_mempool_entries_cache ( & self ) -> std:: io:: Result < ( ) > {
135168 let mempool_txids = self . get_raw_mempool ( ) . await ?;
136- let mut mempool_entries = Vec :: with_capacity ( mempool_txids. len ( ) ) ;
169+
170+ let mut mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
171+ mempool_entries_cache. retain ( |txid, _| mempool_txids. contains ( txid) ) ;
172+
173+ if let Some ( difference) = mempool_txids. len ( ) . checked_sub ( mempool_entries_cache. capacity ( ) )
174+ {
175+ mempool_entries_cache. reserve ( difference)
176+ }
177+
137178 for txid in mempool_txids {
138- let entry = self . get_mempool_entry ( txid) . await ?;
139- mempool_entries. push ( entry) ;
179+ if mempool_entries_cache. contains_key ( & txid) {
180+ continue ;
181+ }
182+
183+ if let Some ( entry) = self . get_mempool_entry ( txid) . await ? {
184+ mempool_entries_cache. insert ( txid, entry. clone ( ) ) ;
185+ }
140186 }
141- Ok ( mempool_entries)
187+
188+ mempool_entries_cache. shrink_to_fit ( ) ;
189+
190+ Ok ( ( ) )
142191 }
143192
144193 /// Get mempool transactions, alongside their first-seen unix timestamps.
@@ -152,10 +201,20 @@ impl BitcoindRpcClient {
152201 let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
153202 let mut latest_time = prev_mempool_time;
154203
155- let mempool_entries = self . get_mempool_entries ( ) . await ?;
156- let mut txs_to_emit = Vec :: new ( ) ;
204+ self . update_mempool_entries_cache ( ) . await ?;
205+
206+ let mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
207+ let mut mempool_txs_cache = self . mempool_txs_cache . lock ( ) . await ;
208+ mempool_txs_cache. retain ( |txid, _| mempool_entries_cache. contains_key ( txid) ) ;
157209
158- for entry in mempool_entries {
210+ if let Some ( difference) =
211+ mempool_entries_cache. len ( ) . checked_sub ( mempool_txs_cache. capacity ( ) )
212+ {
213+ mempool_txs_cache. reserve ( difference)
214+ }
215+
216+ let mut txs_to_emit = Vec :: with_capacity ( mempool_entries_cache. len ( ) ) ;
217+ for ( txid, entry) in mempool_entries_cache. iter ( ) {
159218 if entry. time > latest_time {
160219 latest_time = entry. time ;
161220 }
@@ -171,8 +230,14 @@ impl BitcoindRpcClient {
171230 continue ;
172231 }
173232
233+ if let Some ( ( cached_tx, cached_time) ) = mempool_txs_cache. get ( txid) {
234+ txs_to_emit. push ( ( cached_tx. clone ( ) , * cached_time) ) ;
235+ continue ;
236+ }
237+
174238 match self . get_raw_transaction ( & entry. txid ) . await {
175239 Ok ( Some ( tx) ) => {
240+ mempool_txs_cache. insert ( entry. txid , ( tx. clone ( ) , entry. time ) ) ;
176241 txs_to_emit. push ( ( tx, entry. time ) ) ;
177242 } ,
178243 Ok ( None ) => {
0 commit comments