Skip to content

Commit 56152aa

Browse files
committed
make tracker a library
1 parent 6f7457e commit 56152aa

File tree

3 files changed

+212
-249
lines changed

3 files changed

+212
-249
lines changed

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ name = "tracker"
33
version = "0.1.0"
44
edition = "2024"
55

6+
[lib]
7+
name = "tracker"
8+
path = "src/lib.rs"
9+
10+
[[bin]]
11+
name = "tracker"
12+
path = "src/main.rs"
13+
614
[dependencies]
715
bitcoincore-rpc = "0.19.0"
816
chrono = { version = "0.4", features = ["serde"] }

src/lib.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#![allow(warnings)]
2+
use bitcoincore_rpc::{Auth, Client};
3+
use diesel::SqliteConnection;
4+
use diesel::r2d2::ConnectionManager;
5+
use r2d2::Pool;
6+
use std::path::Path;
7+
use std::sync::Arc;
8+
use tokio::sync::mpsc;
9+
use tracing::{error, info, warn};
10+
11+
use crate::status::{State, Status};
12+
use crate::types::DbRequest;
13+
14+
mod db;
15+
mod error;
16+
mod handle_error;
17+
mod indexer;
18+
mod server;
19+
mod status;
20+
mod tor;
21+
mod types;
22+
mod utils;
23+
24+
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
25+
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
26+
27+
#[derive(Debug, Clone)]
28+
pub struct Config {
29+
pub rpc_url: String,
30+
pub rpc_auth: Auth,
31+
pub address: String,
32+
pub control_port: u16,
33+
pub tor_auth_password: String,
34+
pub socks_port: u16,
35+
pub datadir: String,
36+
}
37+
38+
fn run_migrations(pool: Arc<Pool<ConnectionManager<SqliteConnection>>>) {
39+
let mut conn = pool
40+
.get()
41+
.expect("Failed to get DB connection for migrations");
42+
conn.run_pending_migrations(MIGRATIONS)
43+
.expect("Migration failed");
44+
}
45+
46+
pub async fn start(cfg: Config) {
47+
info!("Connecting to indexer db");
48+
let database_url = format!("{}/tracker.db", cfg.datadir);
49+
if let Some(parent) = Path::new(&database_url).parent() {
50+
std::fs::create_dir_all(parent).expect("Failed to create database directory");
51+
}
52+
let manager = ConnectionManager::<SqliteConnection>::new(database_url);
53+
let pool = Arc::new(
54+
Pool::builder()
55+
.build(manager)
56+
.expect("Failed to create DB pool"),
57+
);
58+
run_migrations(pool.clone());
59+
info!("Connected to indexer db");
60+
61+
tor::check_tor_status(cfg.control_port, &cfg.tor_auth_password)
62+
.await
63+
.expect("Failed to check Tor status");
64+
65+
let hostname = match cfg.address.split_once(':') {
66+
Some((_, port)) => {
67+
let port = port.parse::<u16>().expect("Invalid port in address");
68+
tor::get_tor_hostname(
69+
Path::new(&cfg.datadir),
70+
cfg.control_port,
71+
port,
72+
&cfg.tor_auth_password,
73+
)
74+
.await
75+
.expect("Failed to retrieve Tor hostname")
76+
}
77+
None => {
78+
error!("Invalid address format. Expected format: <host>:<port>");
79+
return;
80+
}
81+
};
82+
83+
info!("Tracker is listening at {}", hostname);
84+
85+
let (mut db_tx, db_rx) = mpsc::channel::<DbRequest>(10);
86+
let (status_tx, mut status_rx) = mpsc::channel::<Status>(10);
87+
88+
let rpc_client = Client::new(&cfg.rpc_url, cfg.rpc_auth.clone()).unwrap();
89+
90+
spawn_db_manager(pool.clone(), db_rx, status_tx.clone()).await;
91+
spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), rpc_client).await;
92+
spawn_server(
93+
db_tx.clone(),
94+
status_tx.clone(),
95+
cfg.address.clone(),
96+
cfg.socks_port,
97+
hostname.clone(),
98+
)
99+
.await;
100+
101+
info!("Tracker started");
102+
103+
while let Some(status) = status_rx.recv().await {
104+
match status.state {
105+
State::DBShutdown(err) => {
106+
warn!(
107+
"DB Manager exited unexpectedly. Restarting... Error: {:?}",
108+
err
109+
);
110+
let (new_db_tx, new_db_rx) = mpsc::channel::<DbRequest>(10);
111+
db_tx = new_db_tx;
112+
spawn_db_manager(pool.clone(), new_db_rx, status_tx.clone()).await;
113+
}
114+
State::Healthy(info) => {
115+
info!("System healthy: {:?}", info);
116+
}
117+
State::MempoolShutdown(err) => {
118+
warn!("Mempool Indexer crashed. Restarting... Error: {:?}", err);
119+
let client = Client::new(&cfg.rpc_url, cfg.rpc_auth.clone()).unwrap();
120+
spawn_mempool_indexer(pool.clone(), db_tx.clone(), status_tx.clone(), client).await;
121+
}
122+
State::ServerShutdown(err) => {
123+
warn!("Server crashed. Restarting... Error: {:?}", err);
124+
spawn_server(
125+
db_tx.clone(),
126+
status_tx.clone(),
127+
cfg.address.clone(),
128+
cfg.socks_port,
129+
hostname.clone(),
130+
)
131+
.await;
132+
}
133+
}
134+
}
135+
}
136+
137+
async fn spawn_db_manager(
138+
pool: Arc<Pool<ConnectionManager<SqliteConnection>>>,
139+
db_rx: tokio::sync::mpsc::Receiver<DbRequest>,
140+
status_tx: tokio::sync::mpsc::Sender<Status>,
141+
) {
142+
info!("Spawning db manager");
143+
tokio::spawn(db::run(pool, db_rx, status::Sender::DBManager(status_tx)));
144+
}
145+
146+
async fn spawn_mempool_indexer(
147+
pool: Arc<Pool<ConnectionManager<SqliteConnection>>>,
148+
db_tx: tokio::sync::mpsc::Sender<DbRequest>,
149+
status_tx: tokio::sync::mpsc::Sender<Status>,
150+
client: Client,
151+
) {
152+
info!("Spawning indexer");
153+
tokio::spawn(indexer::run(
154+
pool,
155+
db_tx,
156+
status::Sender::Mempool(status_tx),
157+
client.into(),
158+
));
159+
}
160+
161+
async fn spawn_server(
162+
db_tx: tokio::sync::mpsc::Sender<DbRequest>,
163+
status_tx: tokio::sync::mpsc::Sender<Status>,
164+
address: String,
165+
socks_port: u16,
166+
hostname: String,
167+
) {
168+
info!("Spawning server instance");
169+
tokio::spawn(server::run(
170+
db_tx,
171+
status::Sender::Server(status_tx),
172+
address,
173+
socks_port,
174+
hostname,
175+
));
176+
}

0 commit comments

Comments
 (0)