diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 53278d777e..efae839f38 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -22,6 +22,7 @@ use crate::core::global::header_size_bytes; use crate::core::ser::{BufReader, ProtocolVersion, Readable}; +use crate::msg::max_msg_size; use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type}; use crate::types::{AttachmentMeta, AttachmentUpdate, Error}; use crate::{ @@ -30,6 +31,7 @@ use crate::{ }; use bytes::{Buf, BufMut, Bytes, BytesMut}; use core::ser::Reader; +use log::{info, warn}; use std::cmp::min; use std::io::Read; use std::mem; @@ -152,6 +154,34 @@ impl Codec { self.state = Header(header); } Header(Known(header)) => { + let msg_type = header.msg_type; + let msg_len = header.msg_len as usize; + + let one_x = max_msg_size(msg_type) as usize; + let four_x = one_x * 4; + + match msg_type { + Type::Ping | Type::Pong => { + debug!( + "p2p(codec): {:?} size={} (1x={}, 4x={})", + msg_type, msg_len, one_x, four_x + ); + } + + _ => { + info!( + "p2p(codec): {:?} size={} (1x={}, 4x={})", + msg_type, msg_len, one_x, four_x + ); + } + } + if msg_len > one_x { + warn!( + "p2p(codec): {:?} exceeds 1x limit ({} > {})", + msg_type, msg_len, one_x, + ); + } + let mut raw = self.buffer.split_to(next_len).freeze(); if header.msg_type == Type::Headers { // Special consideration for a list of headers, as we want to verify and process diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 3103db0975..3acd2a9333 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -97,7 +97,7 @@ fn default_max_msg_size() -> u64 { } // Max msg size for each msg type. -fn max_msg_size(msg_type: Type) -> u64 { +pub fn max_msg_size(msg_type: Type) -> u64 { match msg_type { Type::Error => 0, Type::Hand => 128, @@ -107,8 +107,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::GetPeerAddrs => 4, Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64, Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64, - Type::Header => 365, - Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64, + Type::Header => 420, + Type::Headers => 2 + 420 * MAX_BLOCK_HEADERS as u64, Type::GetBlock => 32, Type::Block => max_block_size(), Type::GetCompactBlock => 32, diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index bdd8d1fe4b..4f97dd6696 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -167,6 +167,8 @@ impl StateSync { if sync_need_restart || header_head.height == highest_height { if using_pibd { if sync_need_restart { + self.state_sync_reset(); + self.sync_state.clear_sync_error(); return true; } let (launch, _download_timeout) = self.state_sync_due(); @@ -358,12 +360,25 @@ impl StateSync { seg_id.identifier.clone(), ), }; + if let Err(e) = res { - info!( - "Error sending request to peer at {}, reason: {:?}", - p.info.addr, e - ); - self.sync_state.remove_pibd_segment(seg_id); + match e { + p2p::Error::Send(ref s) if s == "try_send disconnected" => { + warn!("pibd: peer {} send channel discconected; soft-restarting state sync", p.info.addr); + self.sync_state.remove_pibd_segment(seg_id); + self.sync_state.set_sync_error(chain::Error::SyncError( + "try_send disconnected".into(), + )); + continue; + } + _ => { + info!( + "pibd: error sending request to peer {}: {:?}", + p.info.addr, e + ); + self.sync_state.remove_pibd_segment(seg_id); + } + } } } }