3333 tokio:: {
3434 net:: TcpStream ,
3535 select,
36- sync:: mpsc:: {
37- self ,
38- Receiver ,
39- } ,
36+ sync:: broadcast,
4037 task:: JoinHandle ,
4138 } ,
4239 tokio_tungstenite:: {
@@ -80,7 +77,7 @@ impl RelayerWsSession {
8077 & mut self ,
8178 signed_lazer_transaction : & SignedLazerTransaction ,
8279 ) -> Result < ( ) > {
83- tracing:: debug!( "price_update : {:?}" , signed_lazer_transaction) ;
80+ tracing:: debug!( "signed_lazer_transaction : {:?}" , signed_lazer_transaction) ;
8481 let buf = signed_lazer_transaction. write_to_bytes ( ) ?;
8582 self . ws_sender
8683 . send ( TungsteniteMessage :: from ( buf. clone ( ) ) )
@@ -113,7 +110,7 @@ struct RelayerSessionTask {
113110 // connection state
114111 url : Url ,
115112 token : String ,
116- receiver : Receiver < SignedLazerTransaction > ,
113+ receiver : broadcast :: Receiver < SignedLazerTransaction > ,
117114}
118115
119116impl RelayerSessionTask {
@@ -168,11 +165,25 @@ impl RelayerSessionTask {
168165
169166 loop {
170167 select ! {
171- Some ( transaction) = self . receiver. recv( ) => {
172- if let Err ( e) = relayer_ws_session. send_transaction( & transaction) . await
173- {
174- tracing:: error!( "Error publishing transaction to Lazer relayer: {e:?}" ) ;
175- bail!( "Failed to publish transaction to Lazer relayer: {e:?}" ) ;
168+ recv_result = self . receiver. recv( ) => {
169+ match recv_result {
170+ Ok ( transaction) => {
171+ if let Err ( e) = relayer_ws_session. send_transaction( & transaction) . await {
172+ tracing:: error!( "Error publishing transaction to Lazer relayer: {e:?}" ) ;
173+ bail!( "Failed to publish transaction to Lazer relayer: {e:?}" ) ;
174+ }
175+ } ,
176+ Err ( e) => {
177+ match e {
178+ broadcast:: error:: RecvError :: Closed => {
179+ tracing:: error!( "transaction broadcast channel closed" ) ;
180+ bail!( "transaction broadcast channel closed" ) ;
181+ }
182+ broadcast:: error:: RecvError :: Lagged ( skipped_count) => {
183+ tracing:: warn!( "transaction broadcast channel lagged by {skipped_count} messages" ) ;
184+ }
185+ }
186+ }
176187 }
177188 }
178189 // Handle messages from the relayers, such as errors if we send a bad update
@@ -237,23 +248,23 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
237248#[ instrument( skip( config, state) ) ]
238249pub fn lazer_exporter ( config : Config , state : Arc < state:: State > ) -> Vec < JoinHandle < ( ) > > {
239250 let mut handles = vec ! [ ] ;
240- let mut relayer_senders = vec ! [ ] ;
251+
252+ // can safely drop first receiver for ease of iteration
253+ let ( relayer_sender, _) = broadcast:: channel ( RELAYER_CHANNEL_CAPACITY ) ;
241254
242255 for url in config. relayer_urls . iter ( ) {
243- let ( sender, receiver) = mpsc:: channel ( RELAYER_CHANNEL_CAPACITY ) ;
244256 let mut task = RelayerSessionTask {
245- url : url. clone ( ) ,
246- token : config. authorization_token . to_owned ( ) ,
247- receiver,
257+ url : url. clone ( ) ,
258+ token : config. authorization_token . to_owned ( ) ,
259+ receiver : relayer_sender . subscribe ( ) ,
248260 } ;
249261 handles. push ( tokio:: spawn ( async move { task. run ( ) . await } ) ) ;
250- relayer_senders. push ( sender) ;
251262 }
252263
253264 handles. push ( tokio:: spawn ( lazer_exporter:: lazer_exporter (
254265 config. clone ( ) ,
255266 state,
256- relayer_senders ,
267+ relayer_sender ,
257268 ) ) ) ;
258269
259270 handles
@@ -305,7 +316,7 @@ mod lazer_exporter {
305316 collections:: HashMap ,
306317 sync:: Arc ,
307318 } ,
308- tokio:: sync:: mpsc :: Sender ,
319+ tokio:: sync:: broadcast :: Sender ,
309320 } ;
310321
311322 fn get_signing_key ( config : & Config ) -> Result < SigningKey > {
@@ -329,7 +340,7 @@ mod lazer_exporter {
329340 pub async fn lazer_exporter < S > (
330341 config : Config ,
331342 state : Arc < S > ,
332- relayer_senders : Vec < Sender < SignedLazerTransaction > > ,
343+ relayer_sender : Sender < SignedLazerTransaction > ,
333344 ) where
334345 S : LocalStore ,
335346 S : Send + Sync + ' static ,
@@ -428,9 +439,12 @@ mod lazer_exporter {
428439 payload: Some ( buf) ,
429440 special_fields: Default :: default ( ) ,
430441 } ;
431- futures:: future:: join_all( relayer_senders. iter( ) . map( |relayer_sender|
432- relayer_sender. send( signed_lazer_transaction. clone( ) ) )
433- ) . await ;
442+ match relayer_sender. send( signed_lazer_transaction. clone( ) ) {
443+ Ok ( _) => ( ) ,
444+ Err ( e) => {
445+ tracing:: error!( "Error sending transaction to relayer receivers: {e}" ) ;
446+ }
447+ }
434448 }
435449 }
436450 }
0 commit comments