Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions pkgs/cli/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub const Node = struct {
logger: zeam_utils.ModuleLogger,
db: database.Db,
key_manager: key_manager_lib.KeyManager,
// Track heap-allocated anchor_state for cleanup
anchor_state_owned: ?*types.BeamState = null,
Copy link
Member

@g11tech g11tech Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is because anchor state is not from the chain.states cache? which scenario will this be null?


const Self = @This();

Expand Down Expand Up @@ -154,9 +156,6 @@ pub const Node = struct {

// transfer ownership of the chain_options to ChainConfig
const chain_config = try ChainConfig.init(Chain.custom, chain_options);
var anchorState: types.BeamState = undefined;
try anchorState.genGenesisState(allocator, chain_config.genesis);
errdefer anchorState.deinit();

// TODO we seem to be needing one loop because then the events added to loop are not being fired
// in the order to which they have been added even with the an appropriate delay added
Expand All @@ -180,6 +179,29 @@ pub const Node = struct {
var db = try database.Db.open(allocator, options.logger_config.logger(.database), options.database_path);
errdefer db.deinit();

// Try to load the latest finalized state from the database, fallback to genesis
const chain_logger = options.logger_config.logger(.chain);
var anchorState: *types.BeamState = undefined;
if (try db.loadLatestFinalizedState()) |latest_finalized| {
anchorState = latest_finalized.state;
self.anchor_state_owned = anchorState;
chain_logger.info("resuming from finalized slot {d}", .{latest_finalized.slot});
} else {
// First run or no finalized state, use genesis
const genesis_state = try allocator.create(types.BeamState);
errdefer allocator.destroy(genesis_state);
try genesis_state.genGenesisState(allocator, chain_config.genesis);
anchorState = genesis_state;
self.anchor_state_owned = anchorState;
chain_logger.info("starting from genesis", .{});
}
errdefer {
if (self.anchor_state_owned) |owned| {
owned.deinit();
allocator.destroy(owned);
}
}

const num_validators: usize = @intCast(chain_config.genesis.numValidators());
self.key_manager = key_manager_lib.KeyManager.init(allocator);
errdefer self.key_manager.deinit();
Expand All @@ -192,7 +214,7 @@ pub const Node = struct {
try self.beam_node.init(allocator, .{
.nodeId = @intCast(options.node_key_index),
.config = chain_config,
.anchorState = &anchorState,
.anchorState = anchorState,
.backend = self.network.getNetworkInterface(),
.clock = &self.clock,
.validator_ids = validator_ids,
Expand All @@ -214,6 +236,11 @@ pub const Node = struct {
self.db.deinit();
self.loop.deinit();
event_broadcaster.deinitGlobalBroadcaster();

// Cleanup owned anchor_state if we allocated one
if (self.anchor_state_owned) |owned| {
self.allocator.destroy(owned);
}
}

pub fn run(self: *Node) !void {
Expand Down
141 changes: 141 additions & 0 deletions pkgs/database/src/rocksdb.zig
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,23 @@ pub fn RocksDB(comptime column_namespaces: []const ColumnNamespace) type {
);
}

/// Put the latest finalized slot metadata to this write batch
pub fn putLatestFinalizedSlot(
self: *WriteBatch,
comptime cn: ColumnNamespace,
slot: types.Slot,
) void {
const key = "latest_finalized_slot";
self.putToBatch(
types.Slot,
key,
slot,
cn,
"updated latest finalized slot metadata: slot={d}",
.{slot},
);
}

/// Put an unfinalized slot index entry to this write batch
pub fn putUnfinalizedSlotIndex(
self: *WriteBatch,
Expand Down Expand Up @@ -568,6 +585,56 @@ pub fn RocksDB(comptime column_namespaces: []const ColumnNamespace) type {
);
}

/// Load the latest finalized slot metadata from the database
pub fn loadLatestFinalizedSlot(self: *Self, comptime cn: ColumnNamespace) ?types.Slot {
const key = "latest_finalized_slot";
return self.loadFromDatabase(
types.Slot,
key,
cn,
"loaded latest finalized slot metadata",
.{},
);
}

/// Attempts to load the latest finalized state from the database
/// Returns null if no finalized state is found (e.g., first run)
pub fn loadLatestFinalizedState(
self: *Self,
) !?struct { state: *types.BeamState, slot: types.Slot } {
// Load the latest finalized slot from metadata
const finalized_slot = self.loadLatestFinalizedSlot(database.DbDefaultNamespace) orelse {
self.logger.info("no finalized state found in database, will use genesis", .{});
return null;
};

self.logger.info("found latest finalized slot {d}, loading block root...", .{finalized_slot});

// Load the block root for this finalized slot
const block_root = self.loadFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, finalized_slot) orelse {
self.logger.warn("finalized slot {d} found in metadata but not in finalized index", .{finalized_slot});
return null;
};

// Load the state from the database
if (self.loadState(database.DbStatesNamespace, block_root)) |state| {
// Clone the state to allocator-managed memory
const state_ptr = try self.allocator.create(types.BeamState);
errdefer self.allocator.destroy(state_ptr);

try types.sszClone(self.allocator, types.BeamState, state, state_ptr);

// Clean up the loaded state (it was allocated in loadState)
@constCast(&state).deinit();

self.logger.info("successfully loaded finalized state at slot {d}", .{finalized_slot});
return .{ .state = state_ptr, .slot = finalized_slot };
} else {
self.logger.warn("finalized slot {d} found in index but state not in database", .{finalized_slot});
return null;
}
}

/// Load an unfinalized slot index from the database
pub fn loadUnfinalizedSlotIndex(self: *Self, comptime cn: ColumnNamespace, slot: types.Slot) ?[]types.Root {
const key = interface.formatUnfinalizedSlotKey(self.allocator, slot) catch |err| {
Expand Down Expand Up @@ -1093,3 +1160,77 @@ test "batch write and commit" {
try std.testing.expect(loaded_state_data.latest_finalized.slot == test_state.latest_finalized.slot);
try std.testing.expect(std.mem.eql(u8, &loaded_state_data.latest_finalized.root, &test_state.latest_finalized.root));
}

test "loadLatestFinalizedState" {
var arena_allocator = std.heap.ArenaAllocator.init(std.testing.allocator);
defer arena_allocator.deinit();
const allocator = arena_allocator.allocator();

var zeam_logger_config = zeam_utils.getTestLoggerConfig();

var tmp_dir = std.testing.tmpDir(.{});
defer tmp_dir.cleanup();

const data_dir = try tmp_dir.dir.realpathAlloc(allocator, ".");
defer allocator.free(data_dir);

var db = try database.Db.open(allocator, zeam_logger_config.logger(.database_test), data_dir);
defer db.deinit();

// Test 1: Empty database should return null
const empty_result = try db.loadLatestFinalizedState();
try std.testing.expect(empty_result == null);

// Test 2: Add multiple finalized states and verify it returns the latest one
var batch = db.initWriteBatch();
defer batch.deinit();

// Create three finalized states at different slots
const slot1: types.Slot = 100;
const slot2: types.Slot = 200;
const slot3: types.Slot = 300;

const root1 = test_helpers.createDummyRoot(0x11);
const root2 = test_helpers.createDummyRoot(0x22);
const root3 = test_helpers.createDummyRoot(0x33);

var state1 = try test_helpers.createDummyState(allocator, slot1, 4, 93, 0, 0, 0xAA, 0xBB);
defer state1.deinit();
var state2 = try test_helpers.createDummyState(allocator, slot2, 4, 93, 0, 0, 0xCC, 0xDD);
defer state2.deinit();
var state3 = try test_helpers.createDummyState(allocator, slot3, 4, 93, 0, 0, 0xEE, 0xFF);
defer state3.deinit();

// Save states to database
batch.putState(database.DbStatesNamespace, root1, state1);
batch.putState(database.DbStatesNamespace, root2, state2);
batch.putState(database.DbStatesNamespace, root3, state3);

// Save finalized slot indices
batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, slot1, root1);
batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, slot2, root2);
batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, slot3, root3);

// Save latest finalized slot metadata
batch.putLatestFinalizedSlot(database.DbDefaultNamespace, slot3);

db.commit(&batch);

// Load the latest finalized state
const result = try db.loadLatestFinalizedState();
try std.testing.expect(result != null);

const latest = result.?;
defer {
latest.state.deinit();
allocator.destroy(latest.state);
}

// Verify it's the state from slot3 (the highest slot)
try std.testing.expect(latest.slot == slot3);
try std.testing.expect(latest.state.slot == state3.slot);
try std.testing.expect(latest.state.latest_justified.slot == state3.latest_justified.slot);
try std.testing.expect(std.mem.eql(u8, &latest.state.latest_justified.root, &state3.latest_justified.root));
try std.testing.expect(latest.state.latest_finalized.slot == state3.latest_finalized.slot);
try std.testing.expect(std.mem.eql(u8, &latest.state.latest_finalized.root, &state3.latest_finalized.root));
}
55 changes: 55 additions & 0 deletions pkgs/node/src/chain.zig
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub const ChainOpts = struct {
logger_config: *zeam_utils.ZeamLoggerConfig,
db: database.Db,
node_registry: *const NodeNameRegistry,
force_block_generation: bool = false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
force_block_generation: bool = false,
force_block_production: bool = false,

just to go with the common lingo

};

pub const CachedProcessedBlockInfo = struct {
Expand Down Expand Up @@ -85,6 +86,7 @@ pub const BeamChain = struct {
last_emitted_finalized: types.Checkpoint,
connected_peers: *const std.StringHashMap(PeerInfo),
node_registry: *const NodeNameRegistry,
force_block_generation: bool,

const Self = @This();

Expand Down Expand Up @@ -121,6 +123,7 @@ pub const BeamChain = struct {
.last_emitted_finalized = fork_choice.fcStore.latest_finalized,
.connected_peers = connected_peers,
.node_registry = opts.node_registry,
.force_block_generation = opts.force_block_generation,
};
}

Expand Down Expand Up @@ -810,6 +813,9 @@ pub const BeamChain = struct {
});
}

// Update the latest finalized slot metadata
batch.putLatestFinalizedSlot(database.DbDefaultNamespace, latestFinalized.slot);

// 3. commit all batch ops for finalized indices before we prune
self.db.commit(&batch);

Expand Down Expand Up @@ -1002,6 +1008,55 @@ pub const BeamChain = struct {
.head_slot = head.slot,
};
}

/// Check if the chain is synced by verifying we're at or past the justified slot
/// and synced with peer finalized checkpoints.
/// Once past justified and synced with peers, validators can safely participate in consensus.
/// If blocks are produced while slightly behind peers, they will naturally get reorged.
pub fn isSynced(self: *Self) bool {
const our_head_slot = self.forkChoice.head.slot;
const our_justified_slot = self.forkChoice.fcStore.latest_justified.slot;
const our_finalized_slot = self.forkChoice.fcStore.latest_finalized.slot;

// If no peers connected, we can't verify sync status - assume not synced
// Unless force_block_generation is enabled, which allows block generation without peers
if (self.connected_peers.count() == 0 or self.force_block_generation) {
return false;
}

// We must be at or past the justified slot to participate
if (our_head_slot < our_justified_slot) {
self.module_logger.debug("not synced: our head slot {d} < our justified slot {d}", .{
our_head_slot,
our_justified_slot,
});
return false;
}

// Find the maximum finalized slot reported by any peer
var max_peer_finalized_slot: types.Slot = our_finalized_slot;

var peer_iter = self.connected_peers.iterator();
while (peer_iter.next()) |entry| {
const peer_info = entry.value_ptr;
if (peer_info.latest_status) |status| {
if (status.finalized_slot > max_peer_finalized_slot) {
max_peer_finalized_slot = status.finalized_slot;
}
}
}

// We must also be synced with peers (at or past max peer finalized slot)
if (our_head_slot < max_peer_finalized_slot) {
self.module_logger.debug("not synced: our head slot {d} < max peer finalized slot {d}", .{
our_head_slot,
max_peer_finalized_slot,
});
return false;
}

return true;
}
};

pub const BlockProcessingError = error{MissingPreState};
Expand Down
7 changes: 7 additions & 0 deletions pkgs/node/src/forkchoice.zig
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ pub const ForkChoice = struct {
target_idx = nodes[target_idx].parent orelse return ForkChoiceError.InvalidTargetSearch;
}

// Ensure target is at or after the source (latest_justified) to maintain invariant: source.slot <= target.slot
// This prevents creating invalid attestations where source slot exceeds target slot
// If the calculated target is older than latest_justified, use latest_justified instead
if (nodes[target_idx].slot < self.fcStore.latest_justified.slot) {
return self.fcStore.latest_justified;
}

return types.Checkpoint{
.root = nodes[target_idx].blockRoot,
.slot = nodes[target_idx].slot,
Expand Down
28 changes: 28 additions & 0 deletions pkgs/node/src/validator_client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ pub const ValidatorClient = struct {

pub fn maybeDoProposal(self: *Self, slot: usize) !?ValidatorClientOutput {
if (self.getSlotProposer(slot)) |slot_proposer_id| {
// Check if chain is synced before producing a block
if (!self.chain.isSynced()) {
const current_slot = self.chain.forkChoice.fcStore.timeSlots;
const head_slot = self.chain.forkChoice.head.slot;
self.logger.warn("skipping block production for slot={d} proposer={d}: chain not synced (current_slot={d}, head_slot={d}, behind={d})", .{
slot,
slot_proposer_id,
current_slot,
head_slot,
current_slot - head_slot,
});
return null;
}

// 1. construct the block
self.logger.debug("constructing block message & proposer attestation data for slot={d} proposer={d}", .{ slot, slot_proposer_id });
const produced_block = try self.chain.produceBlock(.{ .slot = slot, .proposer_index = slot_proposer_id });
Expand Down Expand Up @@ -144,6 +158,20 @@ pub const ValidatorClient = struct {

pub fn mayBeDoAttestation(self: *Self, slot: usize) !?ValidatorClientOutput {
if (self.ids.len == 0) return null;

// Check if chain is synced before producing attestations
if (!self.chain.isSynced()) {
const current_slot = self.chain.forkChoice.fcStore.timeSlots;
const head_slot = self.chain.forkChoice.head.slot;
self.logger.warn("skipping attestation production for slot={d}: chain not synced (current_slot={d}, head_slot={d}, behind={d})", .{
slot,
current_slot,
head_slot,
current_slot - head_slot,
});
return null;
}

const slot_proposer_id = self.getSlotProposer(slot);

self.logger.info("constructing attestation message for slot={d}", .{slot});
Expand Down