Skip to content

feat: implement fake DMQ node #2635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 38 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7562aae
refactor(dmq): move 'publisher' in 'client' sub-module
jpraynaud Jul 7, 2025
e9875f9
refactor(dmq): move 'consumer' in 'client' sub-module
jpraynaud Jul 7, 2025
9d3dcde
refactor(dmq): rename 'DmqConsumerPallas' to 'DmqConsumerClientPallas'
jpraynaud Jul 7, 2025
75113c9
refactor(dmq): rename 'DmqPublisherPallas' to 'DmqPublisherClientPallas'
jpraynaud Jul 7, 2025
0af6992
refactor(dmq): promote 'message' module to directory
jpraynaud Jul 8, 2025
2e2398b
feat(dmq): add 'DmqMessage' type to wrap a 'DmqMsg'
jpraynaud Jul 8, 2025
0292201
feat(dmq): add 'DmqPublisherServer' trait
jpraynaud Jul 8, 2025
c6aa11c
feat(dmq): add 'DmqPublisherServerPallas' implementation of 'DmqPubli…
jpraynaud Jul 8, 2025
f443976
feat(dmq): add 'DmqConsumerServer' trait
jpraynaud Jul 8, 2025
25076c1
feat(dmq): add 'MessageQueue' implementation for the consumer server
jpraynaud Jul 8, 2025
f9c05df
feat(dmq): add 'DmqConsumerServerPallas' implementation of 'DmqConsum…
jpraynaud Aug 8, 2025
f510273
feat(relay): add support for DMQ messages
jpraynaud Jul 8, 2025
b7e49ce
feat(relay): update passive relay for DMQ messages
jpraynaud Jul 8, 2025
cf081a3
feat(relay): update signer relay for DMQ messages
jpraynaud Jul 8, 2025
507f8cd
feat(relay): update signer command for DMQ messages
jpraynaud Jul 8, 2025
c7ce6e1
feat(relay): update aggregator relay for DMQ messages
jpraynaud Jul 8, 2025
6e73fef
feat(relay): update aggregator command for DMQ messages
jpraynaud Jul 8, 2025
afcf385
feat(relay): update integration test for DMQ messages
jpraynaud Jul 8, 2025
f1b7074
feat(dmq): export DMQ servers for publisher and consumer
jpraynaud Jul 8, 2025
1d19768
feat(relay): use binary encoding for exchanging messages in P2P pubsu…
jpraynaud Jul 21, 2025
8b7bffc
fix(dmq): activate Pallas server sides for Unix only
jpraynaud Jul 15, 2025
b28bc67
fix(dmq): add missing 'kes_period' in 'DmqMsg'
jpraynaud Jul 15, 2025
b5a17dc
fix(dmq): gate code behind 'future_dmq' feature
jpraynaud Jul 15, 2025
89684f8
fix(dmq): missing wait for Done message in publisher server state mac…
jpraynaud Jul 22, 2025
371ce7f
fix(relay): fix clippy warning
jpraynaud Jul 16, 2025
a308087
feat(infra): support for DMQ protocol in aggregator infrastructure
jpraynaud Jul 16, 2025
a87a8bd
feat(infra): support for DMQ protocol in signer infrastructure
jpraynaud Jul 16, 2025
2f85b68
feat(ci): support for using DMQ in infrastucture deployment
jpraynaud Jul 16, 2025
126d6cf
fix: rebase from main branch
jpraynaud Aug 8, 2025
950fa54
wip(dmq): temporary fix KES signature
jpraynaud Aug 18, 2025
9774ded
fix(dmq): run receive/serve messages on separate threads
jpraynaud Aug 18, 2025
5b4a6f1
test(dmq): add integration test for publisher client/server
jpraynaud Aug 20, 2025
bb2dff2
test(dmq): add integration test for consumer client/server
jpraynaud Aug 20, 2025
0c3e23f
fix(dmq): consumer can not serve messages multiple times
jpraynaud Aug 20, 2025
bac64bb
wip(e2e): support for fake DMQ node
jpraynaud Jul 8, 2025
81b82a8
wip: DO NOT MERGE - activate 'future_dmq'
jpraynaud Jul 15, 2025
ae39f4c
wip: DO NOT MERGE - activate 'allow_skip_signer_certification'
jpraynaud Jul 16, 2025
657a484
wip: DO NOT MERGE - deactivate signer HTTP signature broadcast
jpraynaud Aug 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ inputs:
description: Mithril use P2P network (experimental, for test only).
required: false
default: "false"
mithril_p2p_use_dmq_protocol:
description: Mithril P2P network use DMQ protocol (experimental, for test only).
required: false
default: "false"
mithril_p2p_network_bootstrap_peer:
description: Mithril P2P network bootstrap peer (experimental, for test only).
required: false
Expand Down Expand Up @@ -247,6 +251,7 @@ runs:
google_compute_instance_ssh_keys_environment = "${{ inputs.google_compute_instance_ssh_keys_environment }}"
google_service_credentials_json_file = "./google-application-credentials.json"
mithril_use_p2p_network = "${{ inputs.mithril_use_p2p_network }}"
mithril_p2p_use_dmq_protocol = "${{ inputs.mithril_p2p_use_dmq_protocol }}"
mithril_p2p_network_bootstrap_peer = "${{ inputs.mithril_p2p_network_bootstrap_peer }}"
mithril_p2p_signer_relay_signer_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signer_registration_mode }}"
mithril_p2p_signer_relay_signature_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signature_registration_mode }}"
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/test-deploy-network.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
environment_prefix: dev
cardano_network: preview
mithril_use_p2p_network: true
mithril_p2p_use_dmq_protocol: true
mithril_p2p_signer_relay_signer_registration_mode: passthrough
mithril_p2p_signer_relay_signature_registration_mode: p2p
mithril_api_domain: api.mithril.network
Expand Down Expand Up @@ -72,6 +73,7 @@ jobs:
environment_prefix: dev-follower
cardano_network: preview
mithril_use_p2p_network: true
mithril_p2p_use_dmq_protocol: true
mithril_p2p_network_bootstrap_peer: "/dns4/aggregator.dev-preview.api.mithril.network/tcp/6060"
mithril_p2p_signer_relay_signer_registration_mode: passthrough
mithril_p2p_signer_relay_signature_registration_mode: p2p
Expand Down Expand Up @@ -103,6 +105,7 @@ jobs:
environment_prefix: dev
cardano_network: mainnet
mithril_use_p2p_network: false
mithril_p2p_use_dmq_protocol: true
mithril_api_domain: api.mithril.network
mithril_era_reader_adapter_type: bootstrap
mithril_protocol_parameters: |
Expand Down Expand Up @@ -160,6 +163,7 @@ jobs:
google_compute_instance_ssh_keys_environment: testing
google_application_credentials: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
mithril_use_p2p_network: ${{ matrix.mithril_use_p2p_network }}
mithril_p2p_use_dmq_protocol: ${{ matrix.mithril_p2p_use_dmq_protocol }}
mithril_p2p_network_bootstrap_peer: ${{ matrix.mithril_p2p_network_bootstrap_peer }}
mithril_api_domain: ${{ matrix.mithril_api_domain }}
mithril_image_id: ${{ inputs.mithril_image_id }}
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions internal/mithril-dmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ license.workspace = true
repository.workspace = true
include = ["**/*.rs", "Cargo.toml", "README.md", ".gitignore"]

[package.metadata.cargo-machete]
# `serde_bytes` is used for DmqMessage serialization
ignored = ["serde_bytes"]

[lib]
crate-type = ["lib", "cdylib", "staticlib"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bincode = { version = "2.0.1" }
blake2 = "0.10.6"
mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" }
mithril-common = { path = "../../mithril-common" }
pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
serde = { workspace = true }
serde_bytes = "0.11.17"
slog = { workspace = true }
slog-scope = "4.4.0"
tokio = { workspace = true, features = ["sync"] }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::fmt::Debug;

use mithril_common::{StdResult, crypto_helper::TryFromBytes, entities::PartyId};

/// Trait for consuming messages from a DMQ node.
/// Trait for the client side of consuming messages from a DMQ node.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait DmqConsumer<M: TryFromBytes + Debug + Send + Sync>: Send + Sync {
pub trait DmqConsumerClient<M: TryFromBytes + Debug + Send + Sync>: Send + Sync {
/// Consume messages from the DMQ node.
async fn consume_messages(&self) -> StdResult<Vec<(M, PartyId)>>;
}
5 changes: 5 additions & 0 deletions internal/mithril-dmq/src/consumer/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod interface;
mod pallas;

pub use interface::*;
pub use pallas::*;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt::Debug, marker::PhantomData, path::PathBuf};

use anyhow::{Context, anyhow};
use pallas_network::facades::DmqClient;
use pallas_network::{facades::DmqClient, miniprotocols::localmsgnotification::State};
use slog::{Logger, debug, error};
use tokio::sync::{Mutex, MutexGuard};

Expand All @@ -12,21 +12,21 @@ use mithril_common::{
logging::LoggerExtensions,
};

use crate::DmqConsumer;
use crate::DmqConsumerClient;

/// A DMQ consumer implementation.
/// A DMQ client consumer implementation.
///
/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
pub struct DmqConsumerPallas<M: TryFromBytes + Debug> {
pub struct DmqConsumerClientPallas<M: TryFromBytes + Debug> {
socket: PathBuf,
network: CardanoNetwork,
client: Mutex<Option<DmqClient>>,
logger: Logger,
phantom: PhantomData<M>,
}

impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
/// Creates a new `DmqConsumerPallas` instance.
impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
/// Creates a new `DmqConsumerClientPallas` instance.
pub fn new(socket: PathBuf, network: CardanoNetwork, logger: Logger) -> Self {
Self {
socket,
Expand All @@ -47,7 +47,7 @@ impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
);
DmqClient::connect(&self.socket, self.network.magic_id())
.await
.with_context(|| "DmqConsumerPallas failed to create a new client")
.with_context(|| "DmqConsumerClientPallas failed to create a new client")
}

/// Gets the cached `DmqClient`, creating a new one if it does not exist.
Expand Down Expand Up @@ -94,21 +94,20 @@ impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
debug!(self.logger, "Waiting for messages from DMQ...");
let mut client_guard = self.get_client().await?;
let client = client_guard.as_mut().ok_or(anyhow!("DMQ client does not exist"))?;
client
.msg_notification()
.send_request_messages_blocking()
.await
.with_context(|| "Failed to request notifications from DMQ server: {}")?;
if *client.msg_notification().state() == State::Idle {
client
.msg_notification()
.send_request_messages_blocking()
.await
.with_context(|| "Failed to request notifications from DMQ server: {}")?;
}

let reply = client
.msg_notification()
.recv_next_reply()
.await
.with_context(|| "Failed to receive notifications from DMQ server")?;
debug!(self.logger, "Received single signatures from DMQ"; "messages" => ?reply);
if let Err(e) = client.msg_notification().send_done().await {
error!(self.logger, "Failed to send Done"; "error" => ?e);
}

reply
.0
Expand All @@ -128,7 +127,7 @@ impl<M: TryFromBytes + Debug> DmqConsumerPallas<M> {
}

#[async_trait::async_trait]
impl<M: TryFromBytes + Debug + Sync + Send> DmqConsumer<M> for DmqConsumerPallas<M> {
impl<M: TryFromBytes + Debug + Sync + Send> DmqConsumerClient<M> for DmqConsumerClientPallas<M> {
async fn consume_messages(&self) -> StdResult<Vec<(M, PartyId)>> {
let messages = self.consume_messages_internal().await;
if messages.is_err() {
Expand Down Expand Up @@ -227,10 +226,6 @@ mod tests {
// server replies with messages if any
server_msg.send_reply_messages_blocking(reply_messages).await.unwrap();
assert_eq!(*server_msg.state(), localmsgnotification::State::Idle);

// server receives done from client
server_msg.recv_done().await.unwrap();
assert_eq!(*server_msg.state(), localmsgnotification::State::Done);
} else {
// server waits if no message available
future::pending().await
Expand All @@ -247,7 +242,7 @@ mod tests {
let reply_messages = fake_msgs();
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::new(
let consumer = DmqConsumerClientPallas::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand Down Expand Up @@ -280,7 +275,7 @@ mod tests {
let reply_messages = vec![];
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::<DmqMessageTestPayload>::new(
let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand All @@ -304,7 +299,7 @@ mod tests {
let reply_messages = fake_msgs();
let server = setup_dmq_server(socket_path.clone(), reply_messages);
let client = tokio::spawn(async move {
let consumer = DmqConsumerPallas::<DmqMessageTestPayload>::new(
let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
TestLogger::stdout(),
Expand Down
8 changes: 4 additions & 4 deletions internal/mithril-dmq/src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod interface;
mod pallas;
mod client;
mod server;

pub use interface::*;
pub use pallas::*;
pub use client::*;
pub use server::*;
12 changes: 12 additions & 0 deletions internal/mithril-dmq/src/consumer/server/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use mithril_common::StdResult;

/// Trait for the server side of consuming messages from a DMQ node.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait DmqConsumerServer: Send + Sync {
/// Processes the next message received from the DMQ network.
async fn process_message(&self) -> StdResult<()>;

/// Runs the DMQ publisher server.
async fn run(&self) -> StdResult<()>;
}
9 changes: 9 additions & 0 deletions internal/mithril-dmq/src/consumer/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod interface;
#[cfg(unix)]
mod pallas;
mod queue;

pub use interface::*;

#[cfg(unix)]
pub use pallas::*;
Loading
Loading