Skip to content

Commit 6d5c009

Browse files
committed
feat: defer SV2 channel opening until first SV1 message received
- Move channel opening from connection acceptance to first message receipt - Cache Sv1 handshake messages until SV2 channel is established with proper extranonce - Send cached response only after channel setup is complete
1 parent 562c29e commit 6d5c009

File tree

5 files changed

+160
-54
lines changed

5 files changed

+160
-54
lines changed

roles/translator/src/lib/sv1/downstream/data.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ pub struct DownstreamData {
2828
pub pending_hashrate: Option<f32>,
2929
// Flag to track if SV1 handshake is complete (subscribe + authorize)
3030
pub sv1_handshake_complete: AtomicBool,
31+
// Queue of Sv1 handshake messages received while waiting for SV2 channel to open
32+
pub queued_sv1_handshake_messages: Vec<json_rpc::Message>,
33+
// Flag to indicate we're processing queued Sv1 handshake message responses
34+
pub processing_queued_sv1_handshake_responses: AtomicBool,
3135
// Stores pending shares to be sent to the sv1_server
3236
pub pending_share: RefCell<Option<SubmitShareWithChannelId>>,
3337
// Reference to shared sv1_server data for accessing valid_jobs during downstream sv1
@@ -61,6 +65,8 @@ impl DownstreamData {
6165
pending_target: None,
6266
pending_hashrate: None,
6367
sv1_handshake_complete: AtomicBool::new(false),
68+
queued_sv1_handshake_messages: Vec::new(),
69+
processing_queued_sv1_handshake_responses: AtomicBool::new(false),
6470
pending_share: RefCell::new(None),
6571
sv1_server_data,
6672
upstream_target: None,

roles/translator/src/lib/sv1/downstream/downstream.rs

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -186,16 +186,21 @@ impl Downstream {
186186
.load(std::sync::atomic::Ordering::SeqCst),
187187
)
188188
});
189-
190189
let id_matches = (my_channel_id == Some(channel_id) || channel_id == 0)
191190
&& (downstream_id.is_none() || downstream_id == Some(my_downstream_id));
192-
193191
if !id_matches {
194192
return Ok(()); // Message not intended for this downstream
195193
}
196194

197-
// Sv1 handshake complete - send messages immediately
198-
if handshake_complete {
195+
// Check if this is a queued message response
196+
let is_queued_sv1_handshake_response = self.downstream_data.super_safe_lock(|d| {
197+
d.processing_queued_sv1_handshake_responses
198+
.load(std::sync::atomic::Ordering::SeqCst)
199+
});
200+
201+
// Sv1 handshake complete - send messages immediately, or if it's a queued
202+
// Sv1 handshake message response
203+
if handshake_complete || is_queued_sv1_handshake_response {
199204
if let Message::Notification(notification) = &message {
200205
match notification.method.as_str() {
201206
"mining.set_difficulty" => {
@@ -349,37 +354,67 @@ impl Downstream {
349354
}
350355
};
351356

357+
// Check if channel is established
358+
let channel_established = self
359+
.downstream_data
360+
.super_safe_lock(|d| d.channel_id.is_some());
361+
362+
if !channel_established {
363+
// Check if this is the first message (queue is empty) and send OpenChannel request
364+
let is_first_message = self
365+
.downstream_data
366+
.super_safe_lock(|d| d.queued_sv1_handshake_messages.is_empty());
367+
368+
if is_first_message {
369+
let downstream_id = self.downstream_data.super_safe_lock(|d| d.downstream_id);
370+
self.downstream_channel_state
371+
.sv1_server_sender
372+
.send(DownstreamMessages::OpenChannel(downstream_id))
373+
.await
374+
.map_err(|e| {
375+
error!("Down: Failed to send OpenChannel request: {:?}", e);
376+
TproxyError::ChannelErrorSender
377+
})?;
378+
debug!(
379+
"Down: Sent OpenChannel request for downstream {}",
380+
downstream_id
381+
);
382+
}
383+
384+
// Queue all messages until channel is established
385+
debug!("Down: Queuing Sv1 message until channel is established");
386+
self.downstream_data.safe_lock(|d| {
387+
d.queued_sv1_handshake_messages.push(message.clone());
388+
})?;
389+
return Ok(());
390+
}
391+
392+
// Channel is established, process message normally
352393
let response = self
353394
.downstream_data
354395
.super_safe_lock(|data| data.handle_message(message.clone()));
355396

356397
match response {
357398
Ok(Some(response_msg)) => {
358-
if self
359-
.downstream_data
360-
.super_safe_lock(|d| d.channel_id)
361-
.is_some()
362-
{
363-
debug!(
364-
"Down: Sending SV1 message to downstream: {:?}",
365-
response_msg
366-
);
367-
self.downstream_channel_state
368-
.downstream_sv1_sender
369-
.send(response_msg.into())
370-
.await
371-
.map_err(|e| {
372-
error!("Down: Failed to send message to downstream: {:?}", e);
373-
TproxyError::ChannelErrorSender
374-
})?;
399+
debug!(
400+
"Down: Sending Sv1 message to downstream: {:?}",
401+
response_msg
402+
);
403+
self.downstream_channel_state
404+
.downstream_sv1_sender
405+
.send(response_msg.into())
406+
.await
407+
.map_err(|e| {
408+
error!("Down: Failed to send message to downstream: {:?}", e);
409+
TproxyError::ChannelErrorSender
410+
})?;
375411

376-
// Check if this was an authorize message and handle sv1 handshake completion
377-
if let v1::json_rpc::Message::StandardRequest(request) = &message {
378-
if request.method == "mining.authorize" {
379-
if let Err(e) = self.handle_sv1_handshake_completion().await {
380-
error!("Down: Failed to handle handshake completion: {:?}", e);
381-
return Err(e);
382-
}
412+
// Check if this was an authorize message and handle sv1 handshake completion
413+
if let v1::json_rpc::Message::StandardRequest(request) = &message {
414+
if request.method == "mining.authorize" {
415+
if let Err(e) = self.handle_sv1_handshake_completion().await {
416+
error!("Down: Failed to handle handshake completion: {:?}", e);
417+
return Err(e);
383418
}
384419
}
385420
}

roles/translator/src/lib/sv1/downstream/message_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
utils::validate_sv1_share,
1111
};
1212

13-
// Implements `IsServer` for `Downstream` to handle the SV1 messages.
13+
// Implements `IsServer` for `Downstream` to handle the Sv1 messages.
1414
impl IsServer<'static> for DownstreamData {
1515
fn handle_configure(
1616
&mut self,

roles/translator/src/lib/sv1/downstream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ pub enum DownstreamMessages {
1414
/// Represents a submitted share from a downstream miner,
1515
/// wrapped with the relevant channel ID.
1616
SubmitShares(SubmitShareWithChannelId),
17+
/// Request to open an extended mining channel for a downstream that just sent its first
18+
/// message.
19+
OpenChannel(u32), // downstream_id
1720
}
1821

1922
/// A wrapper around a `mining.submit` message with additional channel information.

roles/translator/src/lib/sv1/sv1_server/sv1_server.rs

Lines changed: 87 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use tokio::{
4040
sync::{broadcast, mpsc},
4141
};
4242
use tracing::{debug, error, info, warn};
43+
use v1::IsServer;
4344

4445
/// SV1 server that handles connections from SV1 miners.
4546
///
@@ -281,11 +282,21 @@ impl Sv1Server {
281282
d.vardiff.insert(downstream_id, vardiff);
282283
}
283284
});
284-
info!("Downstream {} registered successfully", downstream_id);
285+
info!("Downstream {} registered successfully (channel will be opened after first message)", downstream_id);
285286

286-
self
287-
.open_extended_mining_channel(downstream.clone())
288-
.await?;
287+
// Start downstream tasks immediately, but defer channel opening until first message
288+
let status_sender = StatusSender::Downstream {
289+
downstream_id,
290+
tx: status_sender.clone(),
291+
};
292+
293+
Downstream::run_downstream_tasks(
294+
downstream,
295+
notify_shutdown.clone(),
296+
shutdown_complete_tx.clone(),
297+
status_sender,
298+
task_manager.clone(),
299+
);
289300
}
290301
Err(e) => {
291302
warn!("Failed to accept new connection: {:?}", e);
@@ -303,10 +314,6 @@ impl Sv1Server {
303314
res = Self::handle_upstream_message(
304315
Arc::clone(&self),
305316
first_target.clone(),
306-
notify_shutdown.clone(),
307-
shutdown_complete_tx_main_clone.clone(),
308-
status_sender.clone(),
309-
task_manager.clone()
310317
) => {
311318
if let Err(e) = res {
312319
handle_error(&sv1_status_sender, e).await;
@@ -340,8 +347,21 @@ impl Sv1Server {
340347
.await
341348
.map_err(TproxyError::ChannelErrorReceiver)?;
342349

343-
let DownstreamMessages::SubmitShares(message) = downstream_message;
350+
match downstream_message {
351+
DownstreamMessages::SubmitShares(message) => {
352+
return self.handle_submit_shares(message).await;
353+
}
354+
DownstreamMessages::OpenChannel(downstream_id) => {
355+
return self.handle_open_channel_request(downstream_id).await;
356+
}
357+
}
358+
}
344359

360+
/// Handles share submission messages from downstream.
361+
async fn handle_submit_shares(
362+
self: &Arc<Self>,
363+
message: crate::sv1::downstream::SubmitShareWithChannelId,
364+
) -> Result<(), TproxyError> {
345365
// Increment vardiff counter for this downstream (only if vardiff is enabled)
346366
if self.config.downstream_difficulty_config.enable_vardiff {
347367
self.sv1_server_data.safe_lock(|v| {
@@ -382,6 +402,28 @@ impl Sv1Server {
382402
Ok(())
383403
}
384404

405+
/// Handles channel opening requests from downstream when they send their first message.
406+
async fn handle_open_channel_request(
407+
self: &Arc<Self>,
408+
downstream_id: u32,
409+
) -> Result<(), TproxyError> {
410+
info!("SV1 Server: Opening extended mining channel for downstream {} after receiving first message", downstream_id);
411+
412+
let downstreams = self
413+
.sv1_server_data
414+
.super_safe_lock(|v| v.downstreams.clone());
415+
if let Some(downstream) = Self::get_downstream(downstream_id, downstreams) {
416+
self.open_extended_mining_channel(downstream).await?;
417+
} else {
418+
error!(
419+
"Downstream {} not found when trying to open channel",
420+
downstream_id
421+
);
422+
}
423+
424+
Ok(())
425+
}
426+
385427
/// Handles messages received from the upstream SV2 server via the channel manager.
386428
///
387429
/// This method processes various SV2 messages including:
@@ -403,10 +445,6 @@ impl Sv1Server {
403445
pub async fn handle_upstream_message(
404446
self: Arc<Self>,
405447
first_target: Target,
406-
notify_shutdown: broadcast::Sender<ShutdownMessage>,
407-
shutdown_complete_tx: mpsc::Sender<()>,
408-
status_sender: Sender<Status>,
409-
task_manager: Arc<TaskManager>,
410448
) -> Result<(), TproxyError> {
411449
let message = self
412450
.sv1_server_channel_state
@@ -435,18 +473,42 @@ impl Sv1Server {
435473
d.set_upstream_target(initial_target.clone());
436474
})?;
437475

438-
let status_sender = StatusSender::Downstream {
439-
downstream_id,
440-
tx: status_sender.clone(),
441-
};
442-
443-
Downstream::run_downstream_tasks(
444-
downstream,
445-
notify_shutdown,
446-
shutdown_complete_tx,
447-
status_sender,
448-
task_manager,
449-
);
476+
// Process all queued messages now that channel is established
477+
if let Ok(queued_messages) = downstream.downstream_data.safe_lock(|d| {
478+
let messages = d.queued_sv1_handshake_messages.clone();
479+
d.queued_sv1_handshake_messages.clear();
480+
messages
481+
}) {
482+
if !queued_messages.is_empty() {
483+
info!(
484+
"Processing {} queued Sv1 messages for downstream {}",
485+
queued_messages.len(),
486+
downstream_id
487+
);
488+
489+
// Set flag to indicate we're processing queued responses
490+
downstream.downstream_data.super_safe_lock(|data| {
491+
data.processing_queued_sv1_handshake_responses
492+
.store(true, std::sync::atomic::Ordering::SeqCst);
493+
});
494+
495+
for message in queued_messages {
496+
if let Ok(Some(response_msg)) = downstream
497+
.downstream_data
498+
.super_safe_lock(|data| data.handle_message(message))
499+
{
500+
self.sv1_server_channel_state
501+
.sv1_server_to_downstream_sender
502+
.send((
503+
m.channel_id,
504+
Some(downstream_id),
505+
response_msg.into(),
506+
))
507+
.map_err(|_| TproxyError::ChannelErrorSender)?;
508+
}
509+
}
510+
}
511+
}
450512

451513
let set_difficulty = build_sv1_set_difficulty_from_sv2_target(first_target)
452514
.map_err(|_| {

0 commit comments

Comments
 (0)