From b792640feb509ce6ad070e9608d483dd3a7ab245 Mon Sep 17 00:00:00 2001 From: ljluestc Date: Mon, 25 Aug 2025 23:36:12 -0700 Subject: [PATCH 1/6] fix rust code --- core/src/global.rs | 2 +- core/src/ser.rs | 5 ++ doc/macros.md | 46 ++++++++++++++++ doc/pool/transaction_pool.md | 21 +++++++ grin/p2p/src/msg.rs | 43 +++++++++++++++ p2p/src/msg.rs | 69 +++++++++++++++++++++-- p2p/src/types.rs | 33 +++++++++++ servers/src/grin/server.rs | 3 +- src/main.rs | 9 +++ tests/p2p_msg_tests.rs | 103 +++++++++++++++++++++++++++++++++++ util/src/macros.rs | 23 ++++---- 11 files changed, 338 insertions(+), 19 deletions(-) create mode 100644 doc/macros.md create mode 100644 doc/pool/transaction_pool.md create mode 100644 grin/p2p/src/msg.rs create mode 100644 src/main.rs create mode 100644 tests/p2p_msg_tests.rs diff --git a/core/src/global.rs b/core/src/global.rs index 37fb516894..4cd14681d7 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -197,7 +197,7 @@ pub fn get_chain_type() -> ChainTypes { CHAIN_TYPE.with(|chain_type| match chain_type.get() { None => { if !GLOBAL_CHAIN_TYPE.is_init() { - panic!("GLOBAL_CHAIN_TYPE and CHAIN_TYPE unset. Consider set_local_chain_type() in tests."); + std::panic!("GLOBAL_CHAIN_TYPE and CHAIN_TYPE unset. Consider set_local_chain_type() in tests."); } let chain_type = GLOBAL_CHAIN_TYPE.borrow(); set_local_chain_type(chain_type); diff --git a/core/src/ser.rs b/core/src/ser.rs index 38cd9aa1b7..96636fc289 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -361,6 +361,11 @@ impl ProtocolVersion { PROTOCOL_VERSION } + /// Default implementation that returns the current protocol version + pub fn default() -> ProtocolVersion { + PROTOCOL_VERSION + } + /// We need to specify a protocol version for our local database. /// Regardless of specific version used when sending/receiving data between peers /// we need to take care with serialization/deserialization of data locally in the db. diff --git a/doc/macros.md b/doc/macros.md new file mode 100644 index 0000000000..3456968a21 --- /dev/null +++ b/doc/macros.md @@ -0,0 +1,46 @@ +# Macros for Array Newtypes + +The `grin_util` crate provides several macros for working with array newtypes - wrapper types around fixed-size arrays. These macros help implement common traits and functionality for these types. + +## Available Macros + +### `impl_array_newtype` + +Implements standard array traits and behavior for newtype wrappers around fixed-size arrays. This includes: + +- Methods like `as_ptr()`, `as_mut_ptr()`, `len()`, etc. +- Indexing via `Index` traits +- Comparison traits (`PartialEq`, `Eq`, `PartialOrd`, `Ord`) +- Common traits like `Clone`, `Copy`, and `Hash` + +### `impl_array_newtype_encodable` + +Implements serialization and deserialization support via Serde for newtype wrappers. + +### `impl_array_newtype_show` + +Implements the `Debug` trait for pretty-printing the array newtype. + +### `impl_index_newtype` + +Implements various indexing operations for the newtype. This is automatically called by `impl_array_newtype`. + +## Usage Examples + +```rust +// Define a newtype for a 32-byte array +pub struct ChainCode([u8; 32]); + +// Implement standard array traits +impl_array_newtype!(ChainCode, u8, 32); + +// Implement Debug formatting +impl_array_newtype_show!(ChainCode); + +// Implement Serde serialization/deserialization +impl_array_newtype_encodable!(ChainCode, u8, 32); +``` + +## Notes on Feature Flags + +With recent Rust versions, conditional compilation within macros is handled differently. The `serde` and other features are now defined at the crate level rather than inside the macros themselves, which prevents warnings about unexpected `cfg` conditions. diff --git a/doc/pool/transaction_pool.md b/doc/pool/transaction_pool.md new file mode 100644 index 0000000000..d719dd97c8 --- /dev/null +++ b/doc/pool/transaction_pool.md @@ -0,0 +1,21 @@ +## Transaction Pool + +Grin's transaction pool is designed to hold all transactions that are not yet included in a block. + +The transaction pool is split into a stempool and a txpool. The stempool contains "stem" transactions, which are less actively propagated to the rest of the network, as well as txs received via Dandelion "stem" phase. The txpool contains transactions that may be directly propagated to the network, as well as txs received via Dandelion "fluff" phase. + +### Reconciliation + +The `Pool::reconcile` function validates transactions in the stempool or txpool against a given block header and removes invalid or duplicated transactions (present in txpool). The optimized implementation filters entries in-place, reducing validations from O(n² + n*m) to O(n + m), where n is the number of transactions in the pool being reconciled and m is the number of transactions in txpool. + +Reconciliation logs include: +- Number of entries before/after reconciliation +- Count of invalid or duplicated transactions removed + +Example: +``` +INFO: Starting transaction pool reconciliation with 200 entries +WARN: Skipping duplicate transaction: +WARN: Invalid transaction : Validation failed +INFO: Reconciliation complete: retained 180 entries, removed 10 invalid, 10 duplicates +``` diff --git a/grin/p2p/src/msg.rs b/grin/p2p/src/msg.rs new file mode 100644 index 0000000000..e45c92282e --- /dev/null +++ b/grin/p2p/src/msg.rs @@ -0,0 +1,43 @@ +// ...existing code... +use log::{info, warn}; +// ...existing code... + +impl Message { + pub fn read( + reader: &mut R, + msg_type: Option, + ) -> Result { + // ...existing code... + let header = MessageHeader::read(reader)?; + let msg_len = header.msg_len as usize; + + match msg_type { + Some(msg_type) => { + let max_len = max_msg_size(msg_type); + let current_max_len = max_len * 4; // Current 4x limit + if msg_len > current_max_len { + return Err(Error::MsgTooLarge(msg_len, current_max_len)); + } + info!( + "Received {:?} message: size={} bytes, 1x limit={} bytes, 4x limit={} bytes", + msg_type, msg_len, max_len, current_max_len + ); + if msg_len > max_len { + warn!( + "Message size ({} bytes) exceeds 1x limit ({} bytes) for type {:?}", + msg_len, max_len, msg_type + ); + } + } + None => { + info!("Received unknown message type: size={} bytes", msg_len); + } + } + + let mut payload = vec![0u8; msg_len]; + reader.read_exact(&mut payload)?; + Ok(Message { header, payload }) + } + // ...existing code... +} +// ...existing code... diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 3103db0975..560668fe25 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -34,6 +34,7 @@ use crate::types::{ }; use crate::util::secp::pedersen::RangeProof; use bytes::Bytes; +use log::{info, warn}; use num::FromPrimitive; use std::fs::File; use std::io::{Read, Write}; @@ -97,7 +98,7 @@ fn default_max_msg_size() -> u64 { } // Max msg size for each msg type. -fn max_msg_size(msg_type: Type) -> u64 { +pub fn max_msg_size(msg_type: Type) -> u64 { match msg_type { Type::Error => 0, Type::Hand => 128, @@ -172,7 +173,7 @@ impl Msg { /// /// Note: We return a MsgHeaderWrapper here as we may encounter an unknown msg type. /// -pub fn read_header( +pub fn read_header( stream: &mut R, version: ProtocolVersion, ) -> Result { @@ -186,7 +187,7 @@ pub fn read_header( /// Read a single item from the provided stream, always blocking until we /// have a result (or timeout). /// Returns the item and the total bytes read. -pub fn read_item( +pub fn read_item( stream: &mut R, version: ProtocolVersion, ) -> Result<(T, u64), Error> { @@ -197,7 +198,7 @@ pub fn read_item( /// Read a message body from the provided stream, always blocking /// until we have a result (or timeout). -pub fn read_body( +pub fn read_body( h: &MsgHeader, stream: &mut R, version: ProtocolVersion, @@ -208,14 +209,14 @@ pub fn read_body( } /// Read (an unknown) message from the provided stream and discard it. -pub fn read_discard(msg_len: u64, stream: &mut R) -> Result<(), Error> { +pub fn read_discard(msg_len: u64, stream: &mut R) -> Result<(), Error> { let mut buffer = vec![0u8; msg_len as usize]; stream.read_exact(&mut buffer)?; Ok(()) } /// Reads a full message from the underlying stream. -pub fn read_message( +pub fn read_message( stream: &mut R, version: ProtocolVersion, msg_type: Type, @@ -322,6 +323,24 @@ impl Writeable for MsgHeader { } } +impl MsgHeader { + /// Read a message header from the provided reader + pub fn read(reader: &mut R) -> Result { + let mut head = vec![0u8; MsgHeader::LEN]; + reader.read_exact(&mut head)?; + let header: MsgHeaderWrapper = ser::deserialize( + &mut &head[..], + ProtocolVersion::local(), + DeserializationMode::default(), + )?; + + match header { + MsgHeaderWrapper::Known(header) => Ok(header), + MsgHeaderWrapper::Unknown(_, _) => Err(Error::BadMessage), + } + } +} + impl Readable for MsgHeaderWrapper { fn read(reader: &mut R) -> Result { let m = magic(); @@ -986,3 +1005,41 @@ impl fmt::Debug for Consumed { } } } + +impl Message { + pub fn read( + reader: &mut R, + msg_type: Option, + ) -> Result, Error> { + use log::{info, warn}; + let header = MsgHeader::read(reader)?; + let msg_len = header.msg_len; + + match msg_type { + Some(msg_type) => { + let max_len = max_msg_size(msg_type); + let current_max_len = max_len * 4; // Current 4x limit + if msg_len > current_max_len { + return Err(Error::MsgTooLarge(msg_len as usize, current_max_len)); + } + info!( + "Received {:?} message: size={} bytes, 1x limit={} bytes, 4x limit={} bytes", + msg_type, msg_len, max_len, current_max_len + ); + if msg_len > max_len { + warn!( + "Message size ({} bytes) exceeds 1x limit ({} bytes) for type {:?}", + msg_len, max_len, msg_type + ); + } + } + None => { + info!("Received unknown message type: size={} bytes", msg_len); + } + } + + let mut payload = vec![0u8; msg_len as usize]; + reader.read_exact(&mut payload)?; + std::result::Result::Ok(payload) + } +} diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 07765496c1..bcec332d6a 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -90,6 +90,7 @@ pub enum Error { PeerNotBanned, PeerException, Internal, + MsgTooLarge(usize, u64), // Message size, maximum allowed size } impl From for Error { @@ -113,6 +114,38 @@ impl From for Error { } } +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Serialization(ref e) => write!(f, "Serialization error: {}", e), + Error::Connection(ref e) => write!(f, "Connection error: {}", e), + Error::BadMessage => write!(f, "Bad message"), + Error::UnexpectedMessage => write!(f, "Unexpected message"), + Error::MsgLen => write!(f, "Wrong message length"), + Error::Banned => write!(f, "Peer banned"), + Error::ConnectionClose => write!(f, "Connection closed"), + Error::Timeout => write!(f, "Connection timed out"), + Error::Store(ref e) => write!(f, "Store error: {}", e), + Error::Chain(ref e) => write!(f, "Chain error: {}", e), + Error::PeerWithSelf => write!(f, "Connect to self"), + Error::NoDandelionRelay => write!(f, "No Dandelion relay"), + Error::GenesisMismatch { us, peer } => { + write!(f, "Genesis mismatch: our={}, peer={}", us, peer) + } + Error::Send(ref s) => write!(f, "Send error: {}", s), + Error::PeerNotFound => write!(f, "Peer not found"), + Error::PeerNotBanned => write!(f, "Peer not banned"), + Error::PeerException => write!(f, "Peer exception"), + Error::Internal => write!(f, "Internal error"), + Error::MsgTooLarge(size, max) => write!( + f, + "Message too large: {} bytes, maximum: {} bytes", + size, max + ), + } + } +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct PeerAddr(pub SocketAddr); diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 5cfd92c107..5e9e8e20ba 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -577,7 +577,8 @@ impl Server { // this call is blocking and makes sure all peers stop, however // we can't be sure that we stopped a listener blocked on accept, so we don't join the p2p thread self.p2p.stop(); - let _ = self.lock_file.unlock(); + // let _ = self.lock_file.unlock(); + let _ = fs2::FileExt::unlock(&*self.lock_file); warn!("Shutdown complete"); } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000000..c52a636d7a --- /dev/null +++ b/src/main.rs @@ -0,0 +1,9 @@ +use std::error::Error; + +fn main() -> Result<(), Box> { + env_logger::init(); + + // ...existing code... + + Ok(()) +} diff --git a/tests/p2p_msg_tests.rs b/tests/p2p_msg_tests.rs new file mode 100644 index 0000000000..787d2c3b0b --- /dev/null +++ b/tests/p2p_msg_tests.rs @@ -0,0 +1,103 @@ +use grin_core::global; +use grin_core::global::set_local_chain_type; +use grin_core::global::ChainTypes; +use grin_core::ser::BinWriter; +use grin_core::ser::ProtocolVersion; +use grin_core::ser::Writeable; +use grin_p2p::msg::{Message, MsgHeader, Type}; +use std::convert::TryInto; +use std::io::Cursor; +use std::vec::Vec; + +// Make sure chain type is initialized only once for all tests +static INIT: std::sync::Once = std::sync::Once::new(); + +fn setup() { + INIT.call_once(|| { + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + // Make sure we're calling this before any tests run + // This ensures GLOBAL_CHAIN_TYPE is properly set + let _ = global::get_chain_type(); + }); +} + +#[test] +fn test_message_too_large() { + // Ensure chain type is set at the very start of the test + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + + let msg_type = Type::Block; + let max_len = grin_p2p::msg::max_msg_size(msg_type); + let payload = vec![0u8; (max_len * 4 + 1).try_into().unwrap()]; // Exceeds 4x limit + let header = MsgHeader::new(msg_type, payload.len() as u64); + + let mut buffer = Vec::new(); + { + let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); + header.write(&mut bin_writer).unwrap(); + } + buffer.extend(&payload); + let mut cursor = Cursor::new(buffer); + + let result = Message::read(&mut cursor, Some(msg_type)); + assert!(result.is_err(), "Expected error for oversized message"); +} + +#[test] +fn test_message_size_logging() { + setup(); + + let msg_type = Type::Block; + let max_len = grin_p2p::msg::max_msg_size(msg_type); + let payload = vec![0u8; (max_len + 1000).try_into().unwrap()]; // Exceeds 1x but within 4x + let header = MsgHeader::new(msg_type, payload.len() as u64); + let mut buffer = Vec::new(); + { + let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); + header.write(&mut bin_writer).unwrap(); + } + buffer.extend(&payload); + let mut cursor = Cursor::new(buffer); + + let result = Message::read(&mut cursor, Some(msg_type)); + assert!(result.is_ok(), "Failed to read message: {:?}", result.err()); + // Check logs manually or with a log capture utility if needed +} + +fn main() -> Result<(), Box> { + env_logger::init(); + // Set chain type to ensure global state is initialized + set_local_chain_type(ChainTypes::AutomatedTesting); + let msg_type = Type::Block; + let max_len = grin_p2p::msg::max_msg_size(msg_type); + let payload = vec![0u8; (max_len + 1000).try_into().unwrap()]; // Exceeds 1x but within 4x + let header = MsgHeader::new(msg_type, payload.len() as u64); + + let mut buffer = Vec::new(); + { + let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); + header.write(&mut bin_writer).unwrap(); + } + buffer.extend(&payload); + let mut cursor = Cursor::new(buffer); + + let result = Message::read(&mut cursor, Some(msg_type)); + assert!(result.is_ok(), "Failed to read message: {:?}", result.err()); + // Check logs manually or with a log capture utility if needed + + let payload = vec![0u8; (max_len * 4 + 1).try_into().unwrap()]; // Exceeds 4x limit + let header = MsgHeader::new(msg_type, payload.len() as u64); + + let mut buffer = Vec::new(); + { + let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); + header.write(&mut bin_writer).unwrap(); + } + buffer.extend(&payload); + let mut cursor = Cursor::new(buffer); + + let result = Message::read(&mut cursor, Some(msg_type)); + assert!(result.is_err(), "Expected error for oversized message"); + + Ok(()) +} diff --git a/util/src/macros.rs b/util/src/macros.rs index 12400bd9f0..cf4bf19363 100644 --- a/util/src/macros.rs +++ b/util/src/macros.rs @@ -124,7 +124,9 @@ macro_rules! impl_array_newtype { } } - #[cfg_attr(feature = "clippy", allow(expl_impl_clone_on_copy))] // we don't define the `struct`, we have to explicitly impl + // Single implementation for Clone - no need for conditional compilation + // as both branches were identical + #[allow(expl_impl_clone_on_copy)] // we don't define the `struct`, we have to explicitly impl impl Clone for $thing { #[inline] fn clone(&self) -> $thing { @@ -159,16 +161,16 @@ macro_rules! impl_array_newtype { #[macro_export] macro_rules! impl_array_newtype_encodable { ($thing:ident, $ty:ty, $len:expr) => { - #[cfg(feature = "serde")] - impl<'de> $crate::serde::Deserialize<'de> for $thing { + // Implement serde traits unconditionally + impl<'de> ::serde::Deserialize<'de> for $thing { fn deserialize(deserializer: D) -> Result where - D: $crate::serde::Deserializer<'de>, + D: ::serde::Deserializer<'de>, { - use $crate::std::fmt::{self, Formatter}; + use ::std::fmt::{self, Formatter}; struct Visitor; - impl<'de> $crate::serde::de::Visitor<'de> for Visitor { + impl<'de> ::serde::de::Visitor<'de> for Visitor { type Value = $thing; fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { @@ -178,14 +180,14 @@ macro_rules! impl_array_newtype_encodable { #[inline] fn visit_seq(self, mut seq: A) -> Result where - A: $crate::serde::de::SeqAccess<'de>, + A: ::serde::de::SeqAccess<'de>, { let mut ret: [$ty; $len] = [0; $len]; for item in ret.iter_mut() { *item = match seq.next_element()? { Some(c) => c, None => { - return Err($crate::serde::de::Error::custom("end of stream")); + return Err(::serde::de::Error::custom("end of stream")); } }; } @@ -197,11 +199,10 @@ macro_rules! impl_array_newtype_encodable { } } - #[cfg(feature = "serde")] - impl $crate::serde::Serialize for $thing { + impl ::serde::Serialize for $thing { fn serialize(&self, serializer: S) -> Result where - S: $crate::serde::Serializer, + S: ::serde::Serializer, { let &$thing(ref dat) = self; (&dat[..]).serialize(serializer) From 77fcb0e79ebfbc0ffede06bed21feed5b7076715 Mon Sep 17 00:00:00 2001 From: syntaxjak Date: Mon, 15 Sep 2025 12:03:00 -0500 Subject: [PATCH 2/6] p2p(codec): log message sizes and warn on >1x; demote Ping/Pong to debug (#2825) --- p2p/src/codec.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 53278d777e..efae839f38 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -22,6 +22,7 @@ use crate::core::global::header_size_bytes; use crate::core::ser::{BufReader, ProtocolVersion, Readable}; +use crate::msg::max_msg_size; use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type}; use crate::types::{AttachmentMeta, AttachmentUpdate, Error}; use crate::{ @@ -30,6 +31,7 @@ use crate::{ }; use bytes::{Buf, BufMut, Bytes, BytesMut}; use core::ser::Reader; +use log::{info, warn}; use std::cmp::min; use std::io::Read; use std::mem; @@ -152,6 +154,34 @@ impl Codec { self.state = Header(header); } Header(Known(header)) => { + let msg_type = header.msg_type; + let msg_len = header.msg_len as usize; + + let one_x = max_msg_size(msg_type) as usize; + let four_x = one_x * 4; + + match msg_type { + Type::Ping | Type::Pong => { + debug!( + "p2p(codec): {:?} size={} (1x={}, 4x={})", + msg_type, msg_len, one_x, four_x + ); + } + + _ => { + info!( + "p2p(codec): {:?} size={} (1x={}, 4x={})", + msg_type, msg_len, one_x, four_x + ); + } + } + if msg_len > one_x { + warn!( + "p2p(codec): {:?} exceeds 1x limit ({} > {})", + msg_type, msg_len, one_x, + ); + } + let mut raw = self.buffer.split_to(next_len).freeze(); if header.msg_type == Type::Headers { // Special consideration for a list of headers, as we want to verify and process From 5dc569ed7c62db0d6fd69d5e4a306a609d89a632 Mon Sep 17 00:00:00 2001 From: syntaxjak Date: Mon, 15 Sep 2025 16:45:16 -0500 Subject: [PATCH 3/6] p2p: raised 1x header size to 420B (was 365); update batched Headers accordingly --- p2p/src/msg.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 560668fe25..c024425d21 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -108,8 +108,8 @@ pub fn max_msg_size(msg_type: Type) -> u64 { Type::GetPeerAddrs => 4, Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64, Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64, - Type::Header => 365, - Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64, + Type::Header => 420, + Type::Headers => 2 + 420 * MAX_BLOCK_HEADERS as u64, Type::GetBlock => 32, Type::Block => max_block_size(), Type::GetCompactBlock => 32, From 9c1d567c8116bb5b2e7044446e1661d42d511795 Mon Sep 17 00:00:00 2001 From: syntaxjak Date: Wed, 17 Sep 2025 08:56:02 -0500 Subject: [PATCH 4/6] Revert upstream PR #3810 This removes changes introducd by #3810 (AI-generated slop), keeping only my own message size logging modifications. --- core/src/global.rs | 2 +- core/src/ser.rs | 5 -- doc/macros.md | 46 ---------------- doc/pool/transaction_pool.md | 21 ------- grin/p2p/src/msg.rs | 43 --------------- p2p/src/msg.rs | 69 ++--------------------- p2p/src/types.rs | 33 ----------- servers/src/grin/server.rs | 3 +- src/main.rs | 9 --- tests/p2p_msg_tests.rs | 103 ----------------------------------- util/src/macros.rs | 23 ++++---- 11 files changed, 19 insertions(+), 338 deletions(-) delete mode 100644 doc/macros.md delete mode 100644 doc/pool/transaction_pool.md delete mode 100644 grin/p2p/src/msg.rs delete mode 100644 src/main.rs delete mode 100644 tests/p2p_msg_tests.rs diff --git a/core/src/global.rs b/core/src/global.rs index 4cd14681d7..37fb516894 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -197,7 +197,7 @@ pub fn get_chain_type() -> ChainTypes { CHAIN_TYPE.with(|chain_type| match chain_type.get() { None => { if !GLOBAL_CHAIN_TYPE.is_init() { - std::panic!("GLOBAL_CHAIN_TYPE and CHAIN_TYPE unset. Consider set_local_chain_type() in tests."); + panic!("GLOBAL_CHAIN_TYPE and CHAIN_TYPE unset. Consider set_local_chain_type() in tests."); } let chain_type = GLOBAL_CHAIN_TYPE.borrow(); set_local_chain_type(chain_type); diff --git a/core/src/ser.rs b/core/src/ser.rs index 96636fc289..38cd9aa1b7 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -361,11 +361,6 @@ impl ProtocolVersion { PROTOCOL_VERSION } - /// Default implementation that returns the current protocol version - pub fn default() -> ProtocolVersion { - PROTOCOL_VERSION - } - /// We need to specify a protocol version for our local database. /// Regardless of specific version used when sending/receiving data between peers /// we need to take care with serialization/deserialization of data locally in the db. diff --git a/doc/macros.md b/doc/macros.md deleted file mode 100644 index 3456968a21..0000000000 --- a/doc/macros.md +++ /dev/null @@ -1,46 +0,0 @@ -# Macros for Array Newtypes - -The `grin_util` crate provides several macros for working with array newtypes - wrapper types around fixed-size arrays. These macros help implement common traits and functionality for these types. - -## Available Macros - -### `impl_array_newtype` - -Implements standard array traits and behavior for newtype wrappers around fixed-size arrays. This includes: - -- Methods like `as_ptr()`, `as_mut_ptr()`, `len()`, etc. -- Indexing via `Index` traits -- Comparison traits (`PartialEq`, `Eq`, `PartialOrd`, `Ord`) -- Common traits like `Clone`, `Copy`, and `Hash` - -### `impl_array_newtype_encodable` - -Implements serialization and deserialization support via Serde for newtype wrappers. - -### `impl_array_newtype_show` - -Implements the `Debug` trait for pretty-printing the array newtype. - -### `impl_index_newtype` - -Implements various indexing operations for the newtype. This is automatically called by `impl_array_newtype`. - -## Usage Examples - -```rust -// Define a newtype for a 32-byte array -pub struct ChainCode([u8; 32]); - -// Implement standard array traits -impl_array_newtype!(ChainCode, u8, 32); - -// Implement Debug formatting -impl_array_newtype_show!(ChainCode); - -// Implement Serde serialization/deserialization -impl_array_newtype_encodable!(ChainCode, u8, 32); -``` - -## Notes on Feature Flags - -With recent Rust versions, conditional compilation within macros is handled differently. The `serde` and other features are now defined at the crate level rather than inside the macros themselves, which prevents warnings about unexpected `cfg` conditions. diff --git a/doc/pool/transaction_pool.md b/doc/pool/transaction_pool.md deleted file mode 100644 index d719dd97c8..0000000000 --- a/doc/pool/transaction_pool.md +++ /dev/null @@ -1,21 +0,0 @@ -## Transaction Pool - -Grin's transaction pool is designed to hold all transactions that are not yet included in a block. - -The transaction pool is split into a stempool and a txpool. The stempool contains "stem" transactions, which are less actively propagated to the rest of the network, as well as txs received via Dandelion "stem" phase. The txpool contains transactions that may be directly propagated to the network, as well as txs received via Dandelion "fluff" phase. - -### Reconciliation - -The `Pool::reconcile` function validates transactions in the stempool or txpool against a given block header and removes invalid or duplicated transactions (present in txpool). The optimized implementation filters entries in-place, reducing validations from O(n² + n*m) to O(n + m), where n is the number of transactions in the pool being reconciled and m is the number of transactions in txpool. - -Reconciliation logs include: -- Number of entries before/after reconciliation -- Count of invalid or duplicated transactions removed - -Example: -``` -INFO: Starting transaction pool reconciliation with 200 entries -WARN: Skipping duplicate transaction: -WARN: Invalid transaction : Validation failed -INFO: Reconciliation complete: retained 180 entries, removed 10 invalid, 10 duplicates -``` diff --git a/grin/p2p/src/msg.rs b/grin/p2p/src/msg.rs deleted file mode 100644 index e45c92282e..0000000000 --- a/grin/p2p/src/msg.rs +++ /dev/null @@ -1,43 +0,0 @@ -// ...existing code... -use log::{info, warn}; -// ...existing code... - -impl Message { - pub fn read( - reader: &mut R, - msg_type: Option, - ) -> Result { - // ...existing code... - let header = MessageHeader::read(reader)?; - let msg_len = header.msg_len as usize; - - match msg_type { - Some(msg_type) => { - let max_len = max_msg_size(msg_type); - let current_max_len = max_len * 4; // Current 4x limit - if msg_len > current_max_len { - return Err(Error::MsgTooLarge(msg_len, current_max_len)); - } - info!( - "Received {:?} message: size={} bytes, 1x limit={} bytes, 4x limit={} bytes", - msg_type, msg_len, max_len, current_max_len - ); - if msg_len > max_len { - warn!( - "Message size ({} bytes) exceeds 1x limit ({} bytes) for type {:?}", - msg_len, max_len, msg_type - ); - } - } - None => { - info!("Received unknown message type: size={} bytes", msg_len); - } - } - - let mut payload = vec![0u8; msg_len]; - reader.read_exact(&mut payload)?; - Ok(Message { header, payload }) - } - // ...existing code... -} -// ...existing code... diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index c024425d21..238011aea1 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -34,7 +34,6 @@ use crate::types::{ }; use crate::util::secp::pedersen::RangeProof; use bytes::Bytes; -use log::{info, warn}; use num::FromPrimitive; use std::fs::File; use std::io::{Read, Write}; @@ -98,7 +97,7 @@ fn default_max_msg_size() -> u64 { } // Max msg size for each msg type. -pub fn max_msg_size(msg_type: Type) -> u64 { +fn max_msg_size(msg_type: Type) -> u64 { match msg_type { Type::Error => 0, Type::Hand => 128, @@ -173,7 +172,7 @@ impl Msg { /// /// Note: We return a MsgHeaderWrapper here as we may encounter an unknown msg type. /// -pub fn read_header( +pub fn read_header( stream: &mut R, version: ProtocolVersion, ) -> Result { @@ -187,7 +186,7 @@ pub fn read_header( /// Read a single item from the provided stream, always blocking until we /// have a result (or timeout). /// Returns the item and the total bytes read. -pub fn read_item( +pub fn read_item( stream: &mut R, version: ProtocolVersion, ) -> Result<(T, u64), Error> { @@ -198,7 +197,7 @@ pub fn read_item( /// Read a message body from the provided stream, always blocking /// until we have a result (or timeout). -pub fn read_body( +pub fn read_body( h: &MsgHeader, stream: &mut R, version: ProtocolVersion, @@ -209,14 +208,14 @@ pub fn read_body( } /// Read (an unknown) message from the provided stream and discard it. -pub fn read_discard(msg_len: u64, stream: &mut R) -> Result<(), Error> { +pub fn read_discard(msg_len: u64, stream: &mut R) -> Result<(), Error> { let mut buffer = vec![0u8; msg_len as usize]; stream.read_exact(&mut buffer)?; Ok(()) } /// Reads a full message from the underlying stream. -pub fn read_message( +pub fn read_message( stream: &mut R, version: ProtocolVersion, msg_type: Type, @@ -323,24 +322,6 @@ impl Writeable for MsgHeader { } } -impl MsgHeader { - /// Read a message header from the provided reader - pub fn read(reader: &mut R) -> Result { - let mut head = vec![0u8; MsgHeader::LEN]; - reader.read_exact(&mut head)?; - let header: MsgHeaderWrapper = ser::deserialize( - &mut &head[..], - ProtocolVersion::local(), - DeserializationMode::default(), - )?; - - match header { - MsgHeaderWrapper::Known(header) => Ok(header), - MsgHeaderWrapper::Unknown(_, _) => Err(Error::BadMessage), - } - } -} - impl Readable for MsgHeaderWrapper { fn read(reader: &mut R) -> Result { let m = magic(); @@ -1005,41 +986,3 @@ impl fmt::Debug for Consumed { } } } - -impl Message { - pub fn read( - reader: &mut R, - msg_type: Option, - ) -> Result, Error> { - use log::{info, warn}; - let header = MsgHeader::read(reader)?; - let msg_len = header.msg_len; - - match msg_type { - Some(msg_type) => { - let max_len = max_msg_size(msg_type); - let current_max_len = max_len * 4; // Current 4x limit - if msg_len > current_max_len { - return Err(Error::MsgTooLarge(msg_len as usize, current_max_len)); - } - info!( - "Received {:?} message: size={} bytes, 1x limit={} bytes, 4x limit={} bytes", - msg_type, msg_len, max_len, current_max_len - ); - if msg_len > max_len { - warn!( - "Message size ({} bytes) exceeds 1x limit ({} bytes) for type {:?}", - msg_len, max_len, msg_type - ); - } - } - None => { - info!("Received unknown message type: size={} bytes", msg_len); - } - } - - let mut payload = vec![0u8; msg_len as usize]; - reader.read_exact(&mut payload)?; - std::result::Result::Ok(payload) - } -} diff --git a/p2p/src/types.rs b/p2p/src/types.rs index bcec332d6a..07765496c1 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -90,7 +90,6 @@ pub enum Error { PeerNotBanned, PeerException, Internal, - MsgTooLarge(usize, u64), // Message size, maximum allowed size } impl From for Error { @@ -114,38 +113,6 @@ impl From for Error { } } -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Serialization(ref e) => write!(f, "Serialization error: {}", e), - Error::Connection(ref e) => write!(f, "Connection error: {}", e), - Error::BadMessage => write!(f, "Bad message"), - Error::UnexpectedMessage => write!(f, "Unexpected message"), - Error::MsgLen => write!(f, "Wrong message length"), - Error::Banned => write!(f, "Peer banned"), - Error::ConnectionClose => write!(f, "Connection closed"), - Error::Timeout => write!(f, "Connection timed out"), - Error::Store(ref e) => write!(f, "Store error: {}", e), - Error::Chain(ref e) => write!(f, "Chain error: {}", e), - Error::PeerWithSelf => write!(f, "Connect to self"), - Error::NoDandelionRelay => write!(f, "No Dandelion relay"), - Error::GenesisMismatch { us, peer } => { - write!(f, "Genesis mismatch: our={}, peer={}", us, peer) - } - Error::Send(ref s) => write!(f, "Send error: {}", s), - Error::PeerNotFound => write!(f, "Peer not found"), - Error::PeerNotBanned => write!(f, "Peer not banned"), - Error::PeerException => write!(f, "Peer exception"), - Error::Internal => write!(f, "Internal error"), - Error::MsgTooLarge(size, max) => write!( - f, - "Message too large: {} bytes, maximum: {} bytes", - size, max - ), - } - } -} - #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct PeerAddr(pub SocketAddr); diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 5e9e8e20ba..5cfd92c107 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -577,8 +577,7 @@ impl Server { // this call is blocking and makes sure all peers stop, however // we can't be sure that we stopped a listener blocked on accept, so we don't join the p2p thread self.p2p.stop(); - // let _ = self.lock_file.unlock(); - let _ = fs2::FileExt::unlock(&*self.lock_file); + let _ = self.lock_file.unlock(); warn!("Shutdown complete"); } diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index c52a636d7a..0000000000 --- a/src/main.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std::error::Error; - -fn main() -> Result<(), Box> { - env_logger::init(); - - // ...existing code... - - Ok(()) -} diff --git a/tests/p2p_msg_tests.rs b/tests/p2p_msg_tests.rs deleted file mode 100644 index 787d2c3b0b..0000000000 --- a/tests/p2p_msg_tests.rs +++ /dev/null @@ -1,103 +0,0 @@ -use grin_core::global; -use grin_core::global::set_local_chain_type; -use grin_core::global::ChainTypes; -use grin_core::ser::BinWriter; -use grin_core::ser::ProtocolVersion; -use grin_core::ser::Writeable; -use grin_p2p::msg::{Message, MsgHeader, Type}; -use std::convert::TryInto; -use std::io::Cursor; -use std::vec::Vec; - -// Make sure chain type is initialized only once for all tests -static INIT: std::sync::Once = std::sync::Once::new(); - -fn setup() { - INIT.call_once(|| { - global::set_local_chain_type(global::ChainTypes::AutomatedTesting); - // Make sure we're calling this before any tests run - // This ensures GLOBAL_CHAIN_TYPE is properly set - let _ = global::get_chain_type(); - }); -} - -#[test] -fn test_message_too_large() { - // Ensure chain type is set at the very start of the test - global::set_local_chain_type(global::ChainTypes::AutomatedTesting); - - let msg_type = Type::Block; - let max_len = grin_p2p::msg::max_msg_size(msg_type); - let payload = vec![0u8; (max_len * 4 + 1).try_into().unwrap()]; // Exceeds 4x limit - let header = MsgHeader::new(msg_type, payload.len() as u64); - - let mut buffer = Vec::new(); - { - let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); - header.write(&mut bin_writer).unwrap(); - } - buffer.extend(&payload); - let mut cursor = Cursor::new(buffer); - - let result = Message::read(&mut cursor, Some(msg_type)); - assert!(result.is_err(), "Expected error for oversized message"); -} - -#[test] -fn test_message_size_logging() { - setup(); - - let msg_type = Type::Block; - let max_len = grin_p2p::msg::max_msg_size(msg_type); - let payload = vec![0u8; (max_len + 1000).try_into().unwrap()]; // Exceeds 1x but within 4x - let header = MsgHeader::new(msg_type, payload.len() as u64); - let mut buffer = Vec::new(); - { - let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); - header.write(&mut bin_writer).unwrap(); - } - buffer.extend(&payload); - let mut cursor = Cursor::new(buffer); - - let result = Message::read(&mut cursor, Some(msg_type)); - assert!(result.is_ok(), "Failed to read message: {:?}", result.err()); - // Check logs manually or with a log capture utility if needed -} - -fn main() -> Result<(), Box> { - env_logger::init(); - // Set chain type to ensure global state is initialized - set_local_chain_type(ChainTypes::AutomatedTesting); - let msg_type = Type::Block; - let max_len = grin_p2p::msg::max_msg_size(msg_type); - let payload = vec![0u8; (max_len + 1000).try_into().unwrap()]; // Exceeds 1x but within 4x - let header = MsgHeader::new(msg_type, payload.len() as u64); - - let mut buffer = Vec::new(); - { - let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); - header.write(&mut bin_writer).unwrap(); - } - buffer.extend(&payload); - let mut cursor = Cursor::new(buffer); - - let result = Message::read(&mut cursor, Some(msg_type)); - assert!(result.is_ok(), "Failed to read message: {:?}", result.err()); - // Check logs manually or with a log capture utility if needed - - let payload = vec![0u8; (max_len * 4 + 1).try_into().unwrap()]; // Exceeds 4x limit - let header = MsgHeader::new(msg_type, payload.len() as u64); - - let mut buffer = Vec::new(); - { - let mut bin_writer = BinWriter::new(&mut buffer, ProtocolVersion::local()); - header.write(&mut bin_writer).unwrap(); - } - buffer.extend(&payload); - let mut cursor = Cursor::new(buffer); - - let result = Message::read(&mut cursor, Some(msg_type)); - assert!(result.is_err(), "Expected error for oversized message"); - - Ok(()) -} diff --git a/util/src/macros.rs b/util/src/macros.rs index cf4bf19363..12400bd9f0 100644 --- a/util/src/macros.rs +++ b/util/src/macros.rs @@ -124,9 +124,7 @@ macro_rules! impl_array_newtype { } } - // Single implementation for Clone - no need for conditional compilation - // as both branches were identical - #[allow(expl_impl_clone_on_copy)] // we don't define the `struct`, we have to explicitly impl + #[cfg_attr(feature = "clippy", allow(expl_impl_clone_on_copy))] // we don't define the `struct`, we have to explicitly impl impl Clone for $thing { #[inline] fn clone(&self) -> $thing { @@ -161,16 +159,16 @@ macro_rules! impl_array_newtype { #[macro_export] macro_rules! impl_array_newtype_encodable { ($thing:ident, $ty:ty, $len:expr) => { - // Implement serde traits unconditionally - impl<'de> ::serde::Deserialize<'de> for $thing { + #[cfg(feature = "serde")] + impl<'de> $crate::serde::Deserialize<'de> for $thing { fn deserialize(deserializer: D) -> Result where - D: ::serde::Deserializer<'de>, + D: $crate::serde::Deserializer<'de>, { - use ::std::fmt::{self, Formatter}; + use $crate::std::fmt::{self, Formatter}; struct Visitor; - impl<'de> ::serde::de::Visitor<'de> for Visitor { + impl<'de> $crate::serde::de::Visitor<'de> for Visitor { type Value = $thing; fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { @@ -180,14 +178,14 @@ macro_rules! impl_array_newtype_encodable { #[inline] fn visit_seq(self, mut seq: A) -> Result where - A: ::serde::de::SeqAccess<'de>, + A: $crate::serde::de::SeqAccess<'de>, { let mut ret: [$ty; $len] = [0; $len]; for item in ret.iter_mut() { *item = match seq.next_element()? { Some(c) => c, None => { - return Err(::serde::de::Error::custom("end of stream")); + return Err($crate::serde::de::Error::custom("end of stream")); } }; } @@ -199,10 +197,11 @@ macro_rules! impl_array_newtype_encodable { } } - impl ::serde::Serialize for $thing { + #[cfg(feature = "serde")] + impl $crate::serde::Serialize for $thing { fn serialize(&self, serializer: S) -> Result where - S: ::serde::Serializer, + S: $crate::serde::Serializer, { let &$thing(ref dat) = self; (&dat[..]).serialize(serializer) From fbe654c07178c881e63dbc7496007ee7679dfb29 Mon Sep 17 00:00:00 2001 From: syntaxjak Date: Wed, 17 Sep 2025 09:25:44 -0500 Subject: [PATCH 5/6] exposed max-msg-size as pub for codec logging --- p2p/src/msg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 238011aea1..3acd2a9333 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -97,7 +97,7 @@ fn default_max_msg_size() -> u64 { } // Max msg size for each msg type. -fn max_msg_size(msg_type: Type) -> u64 { +pub fn max_msg_size(msg_type: Type) -> u64 { match msg_type { Type::Error => 0, Type::Hand => 128, From 6c2ef0f693bb44d1d1d0d56ba7255e4becdb41a7 Mon Sep 17 00:00:00 2001 From: syntaxjak Date: Fri, 26 Sep 2025 00:12:15 -0500 Subject: [PATCH 6/6] sync: fixed node stalls during first sync, no longer needs manual restarts --- servers/src/grin/sync/state_sync.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index bdd8d1fe4b..4f97dd6696 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -167,6 +167,8 @@ impl StateSync { if sync_need_restart || header_head.height == highest_height { if using_pibd { if sync_need_restart { + self.state_sync_reset(); + self.sync_state.clear_sync_error(); return true; } let (launch, _download_timeout) = self.state_sync_due(); @@ -358,12 +360,25 @@ impl StateSync { seg_id.identifier.clone(), ), }; + if let Err(e) = res { - info!( - "Error sending request to peer at {}, reason: {:?}", - p.info.addr, e - ); - self.sync_state.remove_pibd_segment(seg_id); + match e { + p2p::Error::Send(ref s) if s == "try_send disconnected" => { + warn!("pibd: peer {} send channel discconected; soft-restarting state sync", p.info.addr); + self.sync_state.remove_pibd_segment(seg_id); + self.sync_state.set_sync_error(chain::Error::SyncError( + "try_send disconnected".into(), + )); + continue; + } + _ => { + info!( + "pibd: error sending request to peer {}: {:?}", + p.info.addr, e + ); + self.sync_state.remove_pibd_segment(seg_id); + } + } } } }