Skip to content

Commit ece7658

Browse files
committed
Correct tracker mempool schema and refine the filtering
1 parent 3a663b3 commit ece7658

File tree

6 files changed

+19
-11
lines changed

6 files changed

+19
-11
lines changed

src/db/db_manager.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,17 @@ pub async fn run(
5454
DbRequest::WatchUtxo(outpoint, resp_tx) => {
5555
info!("Watch utxo intercepted");
5656

57-
let mempool_tx = mempool_tx::table
57+
let mut mempool_tx = mempool_tx::table
5858
.inner_join(mempool_inputs::table.on(mempool_tx::txid.eq(mempool_inputs::txid)))
5959
.filter(mempool_inputs::input_txid.eq(outpoint.txid.to_string()))
6060
.filter(mempool_inputs::input_vout.eq(outpoint.vout as i32))
6161
.select((mempool_tx::txid, mempool_tx::seen_at))
6262
.load::<MempoolTx>(&mut conn)
6363
.unwrap();
6464

65+
mempool_tx.sort_by(|a, b| a.txid.cmp(&b.txid));
66+
mempool_tx.dedup_by(|a, b| a.txid == b.txid);
67+
6568
let _ = resp_tx.send(mempool_tx).await;
6669
}
6770
}

src/db/schema.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
// @generated automatically by Diesel CLI.
22

33
diesel::table! {
4-
mempool_inputs (rowid) {
5-
rowid -> Integer,
4+
mempool_inputs (txid, input_txid, input_vout) {
65
txid -> Text,
76
input_txid -> Text,
87
input_vout -> Integer,

src/indexer/tracker_indexer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ pub async fn run(
6161
}
6262
}
6363
last_tip = tip_height;
64+
#[cfg(not(feature = "integration-test"))]
6465
tokio::time::sleep(Duration::from_secs(10)).await;
66+
#[cfg(feature = "integration-test")]
67+
tokio::time::sleep(Duration::from_millis(100)).await;
6568
}
6669
}
6770

src/indexer/utxo_indexer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use crate::db::model::{MempoolInput, MempoolTx, Utxo};
44
use crate::db::schema::{mempool_inputs, mempool_tx, utxos};
55
use crate::indexer::rpc::BitcoinRpc;
6-
use chrono::NaiveDateTime;
6+
use chrono::{NaiveDateTime, Utc};
77
use diesel::SqliteConnection;
88
use diesel::prelude::*;
99
use diesel::r2d2::ConnectionManager;
@@ -30,7 +30,7 @@ impl<'a> Indexer<'a> {
3030

3131
for input in &tx.input {
3232
let prevout = &input.previous_output;
33-
diesel::insert_into(mempool_inputs::table)
33+
diesel::insert_or_ignore_into(mempool_inputs::table)
3434
.values(&MempoolInput {
3535
txid: txid.to_string(),
3636
input_txid: prevout.txid.to_string(),
@@ -40,7 +40,7 @@ impl<'a> Indexer<'a> {
4040
.unwrap();
4141

4242
self.mark_utxo_spent(
43-
&txid.to_string(),
43+
&prevout.txid.to_string(),
4444
prevout.vout as i32,
4545
Some(&txid.to_string()),
4646
false,
@@ -106,7 +106,7 @@ impl<'a> Indexer<'a> {
106106
diesel::insert_or_ignore_into(mempool_tx::table)
107107
.values(&MempoolTx {
108108
txid: txid.to_string(),
109-
seen_at: NaiveDateTime::MIN,
109+
seen_at: Utc::now().naive_utc(),
110110
})
111111
.execute(&mut conn)
112112
.unwrap();

src/server/tracker_monitor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121

2222
use tokio::io::BufReader;
2323

24-
const COOLDOWN_PERIOD: u64 = 5;
24+
const COOLDOWN_PERIOD: u64 = 15 * 60;
2525
pub async fn monitor_systems(
2626
db_tx: Sender<DbRequest>,
2727
status_tx: status::Sender,
@@ -73,7 +73,9 @@ pub async fn monitor_systems(
7373
};
7474
_ = send_message_with_prefix(&mut writer, &message).await;
7575

76-
let buffer = handle_result!(status_tx, read_message(&mut reader).await);
76+
let Ok(buffer) = read_message(&mut reader).await else {
77+
continue;
78+
};
7779
let response: TrackerClientToServer =
7880
match serde_cbor::de::from_reader(&buffer[..]) {
7981
Ok(resp) => resp,
@@ -114,7 +116,7 @@ pub async fn monitor_systems(
114116
let _ = db_tx.send(DbRequest::Update(address, updated_info)).await;
115117
}
116118
}
117-
sleep(Duration::from_secs(4)).await;
119+
sleep(Duration::from_secs(COOLDOWN_PERIOD)).await;
118120
}
119121
}
120122
}

src/server/tracker_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::db::model::MempoolTx;
2+
use crate::server::send_message_with_prefix;
23
use crate::server::tracker_monitor::monitor_systems;
34
use crate::status;
45
use crate::types::DbRequest;
@@ -120,7 +121,7 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender<DbRequest>) {
120121

121122
if let Some(mempool_tx) = response {
122123
let message = TrackerServerToClient::WatchResponse { mempool_tx };
123-
if let Err(e) = send_message(&mut writer, &message).await {
124+
if let Err(e) = send_message_with_prefix(&mut writer, &message).await {
124125
error!("Failed to send response to client: {e}");
125126
break;
126127
}

0 commit comments

Comments
 (0)