TQ: Async Nodes and P2P connections #9258
Conversation
Force-pushed from f24e5fb to 8257402
```rust
pub async fn run(&mut self) {
    // ...
}
```
This is nearly identical to LRTQ
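For context, a loop like the one referenced here has roughly the following shape. This is a hedged sketch only: the `select!` structure is inferred from the handlers shown below, while the `Event` enum, the `reader` and `rx` fields, and the `PING_INTERVAL` constant are illustrative assumptions, not the actual code.

```rust
// Hypothetical shape of the connection task's event loop (names assumed).
enum Event {
    Read(Result<usize, std::io::Error>),
    FromMain(MainToConnMsg),
    PingTick,
}

pub async fn run(&mut self) {
    let mut ping_interval = tokio::time::interval(PING_INTERVAL);
    loop {
        // Wait for the next event from any source. The handlers run after
        // the select! so they can take `&mut self` without borrow conflicts.
        let event = tokio::select! {
            res = self.reader.read(&mut self.read_buf[self.total_read..]) => {
                Event::Read(res)
            }
            Some(msg) = self.rx.recv() => Event::FromMain(msg),
            _ = ping_interval.tick() => Event::PingTick,
        };
        let res = match event {
            Event::Read(res) => self.on_read(res).await,
            Event::FromMain(msg) => self.on_msg_from_main(msg).await,
            Event::PingTick => self.ping().await,
        };
        if res.is_err() {
            // On any error, notify the main task and tear the connection
            // down. (Draining the write queue is omitted for brevity.)
            return;
        }
    }
}
```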
```rust
async fn on_read(
    &mut self,
    res: Result<usize, std::io::Error>,
) -> Result<(), ConnErr> {
    match res {
        Ok(n) => {
            self.total_read += n;
        }
        Err(e) => {
            return Err(ConnErr::FailedRead(e));
        }
    }

    // We may have more than one message that has been read
    loop {
        if self.total_read < FRAME_HEADER_SIZE {
            return Ok(());
        }
        // Read frame size
        let size = read_frame_size(
            self.read_buf[..FRAME_HEADER_SIZE].try_into().unwrap(),
        );
        let end = size + FRAME_HEADER_SIZE;

        // If we haven't read the whole message yet, then return
        if end > self.total_read {
            return Ok(());
        }
        let msg: WireMsg =
            ciborium::from_reader(&self.read_buf[FRAME_HEADER_SIZE..end])?;
        // Move any remaining bytes to the beginning of the buffer.
        self.read_buf.copy_within(end..self.total_read, 0);
        self.total_read -= end;

        self.last_received_msg = Instant::now();
        debug!(self.log, "Received {msg:?}");
        match msg {
            WireMsg::Tq(msg) => {
                if let Err(e) = self
                    .main_tx
                    .send(ConnToMainMsg {
                        task_id: self.task_id,
                        msg: ConnToMainMsgInner::Received {
                            from: self.peer_id.clone(),
                            msg,
                        },
                    })
                    .await
                {
                    warn!(
                        self.log,
                        "Failed to send received fsm msg to main task: {e:?}"
                    );
                }
            }
            WireMsg::Ping => {
                // Nothing to do here, since Ping is just to keep us alive and
                // we updated self.last_received_msg above.
            }
            WireMsg::NetworkConfig(config) => {
                let generation = config.generation;
                if let Err(e) = self
                    .main_tx
                    .send(ConnToMainMsg {
                        task_id: self.task_id,
                        msg: ConnToMainMsgInner::ReceivedNetworkConfig {
                            from: self.peer_id.clone(),
                            config,
                        },
                    })
                    .await
                {
                    warn!(
                        self.log,
                        "Failed to send received NetworkConfig with \
                         generation {generation} to main task: {e:?}"
                    );
                }
            }
        }
    }
}

async fn check_write_result(
    &mut self,
    res: Result<usize, std::io::Error>,
) -> Result<(), ConnErr> {
    match res {
        Ok(_) => {
            if !self.current_write.has_remaining() {
                self.current_write = Cursor::new(Vec::new());
            }
            Ok(())
        }
        Err(e) => {
            let _ = self.writer.shutdown().await;
            Err(ConnErr::FailedWrite(e))
        }
    }
}

async fn on_msg_from_main(
    &mut self,
    msg: MainToConnMsg,
) -> Result<(), ConnErr> {
    match msg {
        MainToConnMsg::Close => Err(ConnErr::Close),
        MainToConnMsg::Msg(msg) => self.write_framed_to_queue(msg).await,
    }
}

async fn write_framed_to_queue(
    &mut self,
    msg: WireMsg,
) -> Result<(), ConnErr> {
    if self.write_queue.len() == MSG_WRITE_QUEUE_CAPACITY {
        Err(ConnErr::WriteQueueFull)
    } else {
        let msg = write_framed(&msg)?;
        self.write_queue.push_back(msg);
        Ok(())
    }
}

async fn ping(&mut self) -> Result<(), ConnErr> {
    if Instant::now() - self.last_received_msg > INACTIVITY_TIMEOUT {
        return Err(ConnErr::InactivityTimeout);
    }
    self.write_framed_to_queue(WireMsg::Ping).await
}
}

// Decode the 4-byte big-endian frame size header
fn read_frame_size(buf: [u8; FRAME_HEADER_SIZE]) -> usize {
    u32::from_be_bytes(buf) as usize
}

/// Serialize `msg` into a newly allocated buffer, prefixed by a 4-byte
/// big-endian size header.
///
/// Return the framed buffer, including the 4-byte header.
fn write_framed<T: Serialize + ?Sized>(
    msg: &T,
) -> Result<Vec<u8>, ciborium::ser::Error<std::io::Error>> {
    let mut cursor = Cursor::new(vec![]);
    // Write a size placeholder
    std::io::Write::write(&mut cursor, &[0u8; FRAME_HEADER_SIZE])?;
    cursor.set_position(FRAME_HEADER_SIZE as u64);
    ciborium::into_writer(msg, &mut cursor)?;
    let size: u32 =
        (cursor.position() - FRAME_HEADER_SIZE as u64).try_into().unwrap();
    let mut buf = cursor.into_inner();
    buf[0..FRAME_HEADER_SIZE].copy_from_slice(&size.to_be_bytes());
    Ok(buf)
}
```
This is all nearly identical to LRTQ. The logic is the same, and LRTQ has been running without incident for over 2 years. What has changed are some names and error types.
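As a usage illustration, the framing helpers above compose into a simple round trip. This is a sketch only: `Msg` is a hypothetical stand-in for the real `WireMsg`, and it assumes `FRAME_HEADER_SIZE`, `read_frame_size`, and `write_framed` are in scope as shown.

```rust
// Hypothetical message type standing in for `WireMsg`.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Msg {
    seq: u64,
}

#[test]
fn frame_round_trip() {
    let msg = Msg { seq: 42 };
    let buf = write_framed(&msg).unwrap();

    // The first four bytes are the big-endian payload size.
    let size = read_frame_size(buf[..FRAME_HEADER_SIZE].try_into().unwrap());
    assert_eq!(size, buf.len() - FRAME_HEADER_SIZE);

    // The remainder is the CBOR-encoded payload.
    let decoded: Msg =
        ciborium::from_reader(&buf[FRAME_HEADER_SIZE..]).unwrap();
    assert_eq!(decoded, msg);
}
```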
```diff
 socket2 = { version = "0.5", features = ["all"] }
 sp-sim = { path = "sp-sim" }
-sprockets-tls = { git = "https://github.com/oxidecomputer/sprockets.git", rev = "7da1f0b5dcd3d631da18b43ba78a84b1a2b425ee" }
+sprockets-tls = { git = "https://github.com/oxidecomputer/sprockets.git", rev = "dea3bbfac7d9d3c45f088898fcd05ee5d2ec2210" }
```
This just pulls in a couple of helpers.

I added @jgallagher and @hawkw as reviewers to look over the bulk of the code as it is async. @labbott and @flihp you can mostly ignore code that isn't related to sprockets and dice setup/test helpers.
Force-pushed from 08cd293 to 655d671
Force-pushed from e42af9c to 035aba3
Builds on #9232

This is the first step in wrapping `trust_quorum::Node` so that it can be used in an async context and integrated with sled-agent. Only the sprockets networking has been fully integrated so far, such that each `NodeTask` has a `ConnMgr` that sets up a full mesh of sprockets connections. A test for this connectivity behavior has been written, but the code is not wired into the production code yet.

Messages can be sent between `NodeTask`s over sprockets connections. Each connection lives in its own task managed by an `EstablishedConn`. The main `NodeTask` task sends messages to and receives messages from this task to interact with the outside world via sprockets. Currently only `Ping` messages are sent over the wire, as a means to keep the connections alive and detect disconnects.

A `NodeHandle` allows one to interact with the `NodeTask`. Currently only three operations are implemented, with messages defined in `NodeApiRequest`. The user can instruct the node who its peers are on the bootstrap network so that it establishes connectivity, can poll for connectivity status, and can shut down the node. All of this functionality is used in the accompanying test.

It's important to reiterate that this code only implements connectivity between trust quorum nodes; no actual trust quorum messages are sent. They can't be, as a handle cannot yet initiate a reconfiguration or LRTQ upgrade. That behavior will come in a follow-up. This PR is large enough.

A lot of this code is similar to the LRTQ connection management code, except that it operates over sprockets rather than TCP channels. This introduces some complexity, but it is mostly abstracted away into the `SprocketsConfig`.
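For a sense of the shape of that handle API, here is a hedged sketch of the pattern described above. The variant names, method names, and payload types are illustrative assumptions, not the actual `NodeApiRequest` definition from the PR.

```rust
use std::net::SocketAddrV6;
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-in for whatever status the node actually reports.
#[derive(Debug)]
struct ConnectivityStatus {
    connected: Vec<SocketAddrV6>,
}

// Hypothetical request enum mirroring the three operations described above.
enum NodeApiRequest {
    // Tell the node who its peers are on the bootstrap network.
    LoadPeerAddresses(Vec<SocketAddrV6>),
    // Poll for connectivity status.
    GetConnectivityStatus(oneshot::Sender<ConnectivityStatus>),
    // Shut down the node task.
    Shutdown,
}

#[derive(Clone)]
struct NodeHandle {
    tx: mpsc::Sender<NodeApiRequest>,
}

impl NodeHandle {
    // Request/response via a oneshot channel embedded in the request;
    // returns None if the node task has gone away.
    async fn connectivity_status(&self) -> Option<ConnectivityStatus> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(NodeApiRequest::GetConnectivityStatus(tx)).await.ok()?;
        rx.await.ok()
    }
}
```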
Force-pushed from b44c01e to 7d20b2d
@hawkw Thanks for the great review! I believe I fixed up everything related to your comments, and then some ;)
I think I'm going to have to put all the code in this PR into a new crate; or rather, put the existing code into a …
Done in 79a6730
I realized that there is a possibility of deadlock between the main task and the connection tasks, due to the use of bounded channels and blocking sends. The main task can be sending a message to write to a connection task while that task is simultaneously sending a message read off the wire to the main task. The channels can be sized differently, but the risk is inherent in the structure. We can't use … A few other thoughts for resolution:
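To make the hazard concrete, here is a minimal, self-contained illustration; names, channel sizes, and message counts are illustrative, not the PR's code. Two tasks connected by bounded channels each block in `send` toward the other while neither is draining its receiver:

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // One bounded channel in each direction, capacity 1.
    let (to_conn_tx, mut to_conn_rx) = mpsc::channel::<u32>(1);
    let (to_main_tx, mut to_main_rx) = mpsc::channel::<u32>(1);

    // "Connection" task: pushes two reads toward main before draining
    // its own inbox.
    tokio::spawn(async move {
        to_main_tx.send(0).await.unwrap();
        to_main_tx.send(1).await.unwrap(); // blocks: main isn't draining yet
        let _ = to_conn_rx.recv().await;
    });

    // "Main" task mirrors the pattern in the other direction.
    let main_side = async {
        to_conn_tx.send(0).await.unwrap();
        to_conn_tx.send(1).await.unwrap(); // blocks: conn isn't draining yet
        let _ = to_main_rx.recv().await;
    };

    // Neither side can make progress, so this reliably times out.
    assert!(timeout(Duration::from_secs(1), main_side).await.is_err());
}
```

One generic mitigation (not necessarily the one chosen here) is a non-blocking `try_send` on one side of the cycle, surfacing a full queue as an error or a dropped message rather than blocking.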