Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions p2p/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use crate::core::global::header_size_bytes;
use crate::core::ser::{BufReader, ProtocolVersion, Readable};
use crate::msg::max_msg_size;
use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type};
use crate::types::{AttachmentMeta, AttachmentUpdate, Error};
use crate::{
Expand All @@ -30,6 +31,7 @@ use crate::{
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::ser::Reader;
use log::{info, warn};
use std::cmp::min;
use std::io::Read;
use std::mem;
Expand Down Expand Up @@ -152,6 +154,34 @@ impl Codec {
self.state = Header(header);
}
Header(Known(header)) => {
let msg_type = header.msg_type;
let msg_len = header.msg_len as usize;

let one_x = max_msg_size(msg_type) as usize;
let four_x = one_x * 4;

match msg_type {
Type::Ping | Type::Pong => {
debug!(
"p2p(codec): {:?} size={} (1x={}, 4x={})",
msg_type, msg_len, one_x, four_x
);
}

_ => {
info!(
"p2p(codec): {:?} size={} (1x={}, 4x={})",
msg_type, msg_len, one_x, four_x
);
}
}
if msg_len > one_x {
warn!(
"p2p(codec): {:?} exceeds 1x limit ({} > {})",
msg_type, msg_len, one_x,
);
}

let mut raw = self.buffer.split_to(next_len).freeze();
if header.msg_type == Type::Headers {
// Special consideration for a list of headers, as we want to verify and process
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn default_max_msg_size() -> u64 {
}

// Max msg size for each msg type.
fn max_msg_size(msg_type: Type) -> u64 {
pub fn max_msg_size(msg_type: Type) -> u64 {
match msg_type {
Type::Error => 0,
Type::Hand => 128,
Expand All @@ -107,8 +107,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::GetPeerAddrs => 4,
Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64,
Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64,
Type::Header => 365,
Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64,
Type::Header => 420,
Type::Headers => 2 + 420 * MAX_BLOCK_HEADERS as u64,
Type::GetBlock => 32,
Type::Block => max_block_size(),
Type::GetCompactBlock => 32,
Expand Down
25 changes: 20 additions & 5 deletions servers/src/grin/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ impl StateSync {
if sync_need_restart || header_head.height == highest_height {
if using_pibd {
if sync_need_restart {
self.state_sync_reset();
self.sync_state.clear_sync_error();
return true;
}
let (launch, _download_timeout) = self.state_sync_due();
Expand Down Expand Up @@ -358,12 +360,25 @@ impl StateSync {
seg_id.identifier.clone(),
),
};

if let Err(e) = res {
info!(
"Error sending request to peer at {}, reason: {:?}",
p.info.addr, e
);
self.sync_state.remove_pibd_segment(seg_id);
match e {
p2p::Error::Send(ref s) if s == "try_send disconnected" => {
warn!("pibd: peer {} send channel discconected; soft-restarting state sync", p.info.addr);
self.sync_state.remove_pibd_segment(seg_id);
self.sync_state.set_sync_error(chain::Error::SyncError(
"try_send disconnected".into(),
));
continue;
}
_ => {
info!(
"pibd: error sending request to peer {}: {:?}",
p.info.addr, e
);
self.sync_state.remove_pibd_segment(seg_id);
}
}
}
}
}
Expand Down