Skip to content

Conversation

@maxim-inj
Copy link
Collaborator

@maxim-inj maxim-inj commented Oct 13, 2025

Summary by CodeRabbit

Release Notes

  • New Features
    • Added event publishing framework enabling events to be published during block finalization and transaction execution.
    • Message queue publish configuration now available in streaming settings with seed brokers, topic name, and control port options.
    • Enhanced event tracking with improved ordering and separation between standard and published events throughout the block lifecycle.

* chore: apply pipeline patch

Signed-off-by: 0x0f0d0 <[email protected]>

* Fix failing CI tests

Signed-off-by: Nikola Marcetic <[email protected]>

---------

Signed-off-by: 0x0f0d0 <[email protected]>
Signed-off-by: Nikola Marcetic <[email protected]>
Co-authored-by: 0x0f0d0 <[email protected]>
@coderabbitai
Copy link

coderabbitai bot commented Oct 13, 2025

Walkthrough

Introduces publish event infrastructure throughout the Cosmos SDK by adding event publishing types (EventSet, PublishEventFlush), integrating PublishEventManager into Context and BaseApp, extending executeTxs to return per-transaction EventSet alongside results, and adding server configuration for message queue publishing. Test utilities enable custom event emission during testing.

Changes

Cohort / File(s) Summary
ABCI execution and event propagation
baseapp/abci.go
Modified executeTxs to return ([]*abci.ExecTxResult, []EventSet, error) instead of ([]*abci.ExecTxResult, error). Added per-transaction EventSet collection and publish event filtering via filterOutPublishEvents helper. Integrated PublishEventManager initialization and event accumulation throughout block finalization and commit workflows.
ABCI tests
baseapp/abci_test.go, baseapp/streaming_test.go
Updated CounterServerImpl struct literals to include new boolean field (false) in server registration calls across multiple tests (TestABCI_CheckTx, TestABCI_FinalizeBlock_DeliverTx, TestABCI_FinalizeBlock_MultiMsg, TestABCI_MultiListener_StateChanges).
BaseApp infrastructure
baseapp/baseapp.go, baseapp/baseapp_test.go
Added public fields to BaseApp: EnablePublish bool, PublishEvents chan PublishEventFlush, and flushData PublishEventFlush. Initialized PublishEvents channel in NewBaseApp. Updated test struct literals to match new CounterServerImpl signature with additional boolean field.
Publish event types and API
baseapp/publish_event.go
Introduced EventType enum (EventTypeAbci, EventTypePublish), EventSet struct (carrying AbciEvents, PublishEvents, and TrueOrder), PublishEventFlush struct (holding Height, app hash fields, block/tx events), and PublishBlockEvents method to route flush payloads to PublishEvents channel.
Publish event tests
baseapp/publish_event_test.go
Added comprehensive test coverage for publish-event behavior, including TestPublishEvent_FinalizeBlock_WithBeginAndEndBlocker and TestPublishEvent_FinalizeBlock_DeliverTx. Introduced StringPublishEvent test type implementing PublishEvent interface with ToString() and Serialize() methods.
Test utilities
baseapp/utils_test.go
Extended CounterServerImpl and Counter2ServerImpl with emitCustomEvent flag. Added TestPublishEvent type implementing sdk.PublishEvent interface with Serialize() and ToString() methods. Enhanced incrementCounter logic to optionally emit TestPublishEvent via PublishEventManager, and added anteHandlerTxTestWithCustomEventEmit helper.
Server configuration
server/config/config.go, server/config/config_test.go, server/config/toml.go
Added new MQPubConfig struct (Enabled, SeedBrokers, TopicName, ControlPort) and extended StreamingConfig with MQPub field. Updated DefaultConfig initialization and TOML template to include streaming.mqpub configuration block.
Context event management
types/context.go
Added publishEventManager field to Context with accessor PublishEventManager() and setter WithPublishEventManager(). Updated NewContext and CacheContext to initialize and propagate PublishEventManager. Modified writeCache to emit events from both EventManager and PublishEventManager.
Publish event interfaces
types/publish_events.go
Introduced PublishEventManagerI interface (Events(), EmitEvent(), EmitEvents()), PublishEventManager concrete implementation with simple in-memory storage, PublishEvent interface (Serialize()), PublishEvents type alias, and EmptyPublishEvents() helper.
Event placeholder bridge
types/events_placeholder.go
Added EventPlaceholderManager implementing both EventManagerI and PublishEventManagerI, bridging publish events to standard ABCI event manager via placeholder events. Includes PlaceholderEventType constant and forwarding logic for EmitEvent/EmitEvents.

Sequence Diagram

sequenceDiagram
    participant App as BaseApp
    participant Ctx as Context
    participant PEM as PublishEventManager
    participant Finalize as FinalizeBlock
    participant Commit as Commit
    participant Channel as PublishEvents

    App->>Ctx: WithPublishEventManager()
    Ctx->>PEM: NewPublishEventManager()
    
    App->>Finalize: Execute with pub events collection
    Finalize->>Ctx: EmitEvent (ABCI)
    Ctx->>PEM: EmitEvent/EmitEvents
    Finalize->>Finalize: filterOutPublishEvents()<br/>(separate ABCI from publish)
    Finalize->>App: Return EventSet[]{AbciEvents, PublishEvents, TrueOrder}
    
    App->>App: Aggregate EventSet from block + txs
    App->>App: Create PublishEventFlush<br/>{Height, AppHash, BlockEvents, TxEvents}
    
    App->>Commit: Execute state commit
    Commit->>App: Return commit hash
    
    App->>App: PublishBlockEvents(flush)
    App->>Channel: Send flush to PublishEvents channel
    Channel-->>Channel: (async emission)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Key areas requiring attention:
    • Signature change to executeTxs and event collection logic in baseapp/abci.go—verify all callers updated and EventSet construction is correct
    • PublishEventManager lifecycle and initialization throughout Context and BaseApp—ensure proper propagation through CacheContext and transaction boundaries
    • Event filtering logic in filterOutPublishEvents helper—validate that ABCI and publish events are correctly separated and ordered
    • Test coverage adequacy, particularly around per-transaction EventSet ordering and consistency in baseapp/publish_event_test.go
    • Integration of PublishBlockEvents channel communication—confirm non-blocking behavior and proper handling in deployment scenarios
    • CounterServerImpl struct additions across multiple tests—ensure all test callsites properly initialized with new boolean field

Possibly related PRs

Poem

🐰 Events now flow with purpose bright,
Through Publish channels into the night,
Each block and tx in ordered dance,
As placeholders give events their chance.
The BaseApp hops with grace anew,
Publishing what transactions do! 🌟

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title check ❓ Inconclusive The title 'feat: datapipe' is vague and does not clearly describe the actual changes in the pull request, which involve introducing comprehensive publish event handling across BaseApp's block finalization and transaction execution flows. Use a more descriptive title that captures the main feature being added, such as 'feat: add publish event handling to block finalization' or 'feat: integrate PublishEventManager throughout transaction/block lifecycle'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch v0.50.x-datapipe-inj

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment on lines 711 to 717
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
var events []abci.Event
var publishEvents sdk.PublishEvents

if err := app.checkHalt(req.Height, req.Time); err != nil {
return nil, err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/abci.go:712)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/abci.go:893)

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the Stale label Nov 13, 2025
@maxim-inj maxim-inj marked this pull request as ready for review November 13, 2025 01:40
@github-actions
Copy link

@maxim-inj your pull request is missing a changelog!

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (6)
server/config/toml.go (1)

237-251: LGTM! Consider minor comment refinements.

The MQPub configuration block is correctly structured and the conditional rendering for seed-brokers properly handles both empty and populated arrays.

For improved clarity, consider these optional comment refinements:

-# topic-name defines the to be connected when message gets published.
+# topic-name defines the topic to use when publishing messages.
 topic-name = "{{ .Streaming.MQPub.TopicName }}"
-# control-port defines the server port for mq control
+# control-port defines the server port for MQ control.
 control-port = {{ .Streaming.MQPub.ControlPort }}
server/config/config.go (1)

273-278: Good default values; consider adding validation.

The default initialization is sensible (disabled by default with empty configuration). However, consider adding validation to ValidateBasic() to ensure that when Enabled is true, required fields like SeedBrokers and TopicName are properly configured.

Consider adding validation in the ValidateBasic() method:

func (c Config) ValidateBasic() error {
	if c.BaseConfig.MinGasPrices == "" {
		return sdkerrors.ErrAppConfig.Wrap("set min gas price in app.toml or flag or env variable")
	}
	if c.Pruning == pruningtypes.PruningOptionEverything && c.StateSync.SnapshotInterval > 0 {
		return sdkerrors.ErrAppConfig.Wrapf(
			"cannot enable state sync snapshots with '%s' pruning setting", pruningtypes.PruningOptionEverything,
		)
	}
+	if c.Streaming.MQPub.Enabled {
+		if len(c.Streaming.MQPub.SeedBrokers) == 0 {
+			return sdkerrors.ErrAppConfig.Wrap("streaming.mqpub.seed-brokers cannot be empty when mqpub is enabled")
+		}
+		if c.Streaming.MQPub.TopicName == "" {
+			return sdkerrors.ErrAppConfig.Wrap("streaming.mqpub.topic-name cannot be empty when mqpub is enabled")
+		}
+	}

	return nil
}
baseapp/publish_event_test.go (1)

197-197: Remove debug print from test

Line 197 still has a fmt.Println that just dumps events during every test run. It adds noisy stdout without affecting assertions—please drop it.

-		fmt.Println(pevts.TxEvents)
baseapp/abci.go (3)

828-829: Document the PublishEventManager reset lifecycle.

The PublishEventManager is reset multiple times during block finalization (here after tx execution, and at line 875 for each transaction). While this appears intentional to separate event collection phases (beginBlock → per-tx → endBlock), consider adding a comment explaining this lifecycle to aid future maintainers.


870-870: Pre-allocate txEventSet capacity for better performance.

Similar to txResults on line 869, consider pre-allocating txEventSet with the known capacity to avoid slice reallocations.

Apply this diff:

-	txEventSet := make([]EventSet, 0)
+	txEventSet := make([]EventSet, 0, len(txs))

911-925: Consider pre-allocating slices in filterOutPublishEvents for better performance.

The function could benefit from pre-allocating the result slices with the known input length to reduce memory allocations during appends.

Apply this diff:

 func filterOutPublishEvents(events []abci.Event) ([]abci.Event, []EventType) {
-	var filteredEvents []abci.Event
-	var trueOrder []EventType
+	filteredEvents := make([]abci.Event, 0, len(events))
+	trueOrder := make([]EventType, 0, len(events))
 
 	for _, e := range events {
 		if e.Type == sdk.PlaceholderEventType {
 			trueOrder = append(trueOrder, EventTypePublish)
 			continue
 		}
 		filteredEvents = append(filteredEvents, e)
 		trueOrder = append(trueOrder, EventTypeAbci)
 	}
 
 	return filteredEvents, trueOrder
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a04d582 and b4e50a7.

📒 Files selected for processing (14)
  • baseapp/abci.go (8 hunks)
  • baseapp/abci_test.go (3 hunks)
  • baseapp/baseapp.go (2 hunks)
  • baseapp/baseapp_test.go (3 hunks)
  • baseapp/publish_event.go (1 hunks)
  • baseapp/publish_event_test.go (1 hunks)
  • baseapp/streaming_test.go (1 hunks)
  • baseapp/utils_test.go (5 hunks)
  • server/config/config.go (2 hunks)
  • server/config/config_test.go (2 hunks)
  • server/config/toml.go (1 hunks)
  • types/context.go (5 hunks)
  • types/events_placeholder.go (1 hunks)
  • types/publish_events.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (11)
baseapp/baseapp.go (2)
types/publish_events.go (1)
  • PublishEvents (35-35)
baseapp/publish_event.go (1)
  • PublishEventFlush (21-27)
types/context.go (3)
types/publish_events.go (3)
  • PublishEventManagerI (3-7)
  • PublishEventManager (13-15)
  • NewPublishEventManager (17-19)
types/events_placeholder.go (1)
  • EventPlaceholderManager (7-10)
types/events.go (3)
  • NewEventManager (39-41)
  • EventManager (35-37)
  • Events (167-167)
types/events_placeholder.go (2)
types/publish_events.go (3)
  • PublishEventManagerI (3-7)
  • PublishEvents (35-35)
  • PublishEvent (31-33)
types/events.go (2)
  • EventManagerI (18-25)
  • NewEvent (172-180)
server/config/config_test.go (1)
server/config/config.go (1)
  • MQPubConfig (186-191)
baseapp/utils_test.go (2)
types/publish_events.go (2)
  • PublishEvent (31-33)
  • PublishEventManager (13-15)
types/context.go (1)
  • Context (41-74)
baseapp/baseapp_test.go (2)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
baseapp/publish_event.go (2)
types/publish_events.go (1)
  • PublishEvents (35-35)
baseapp/baseapp.go (1)
  • BaseApp (67-213)
baseapp/abci_test.go (2)
baseapp/testutil/messages.pb.go (2)
  • RegisterCounterServer (369-371)
  • RegisterCounter2Server (441-443)
baseapp/utils_test.go (2)
  • CounterServerImpl (171-177)
  • Counter2ServerImpl (183-189)
baseapp/streaming_test.go (2)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
baseapp/publish_event_test.go (8)
types/publish_events.go (3)
  • PublishEvent (31-33)
  • PublishEvents (35-35)
  • PublishEventManager (13-15)
baseapp/baseapp.go (2)
  • BaseApp (67-213)
  • NewBaseApp (218-270)
baseapp/publish_event.go (4)
  • PublishEventFlush (21-27)
  • EventType (8-8)
  • EventTypeAbci (11-11)
  • EventTypePublish (12-12)
types/context.go (1)
  • Context (41-74)
types/events.go (3)
  • EventManager (35-37)
  • Event (164-164)
  • MarkEventsToIndex (318-343)
baseapp/baseapp_test.go (1)
  • NewBaseAppSuite (64-92)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
baseapp/abci.go (4)
types/publish_events.go (3)
  • PublishEvents (35-35)
  • PublishEventManager (13-15)
  • NewPublishEventManager (17-19)
types/context.go (1)
  • Context (41-74)
baseapp/publish_event.go (5)
  • PublishEventFlush (21-27)
  • EventSet (15-19)
  • EventType (8-8)
  • EventTypePublish (12-12)
  • EventTypeAbci (11-11)
types/events_placeholder.go (1)
  • PlaceholderEventType (3-3)
🔇 Additional comments (4)
server/config/config_test.go (1)

55-60: Excellent test coverage for MQPub configuration.

The test properly verifies:

  • Initialization with default values
  • TOML rendering of all four configuration fields
  • Round-trip serialization through Viper

Also applies to: 77-80

server/config/config.go (1)

176-176: Well-structured configuration types.

The MQPubConfig struct and its integration into StreamingConfig are correctly defined with appropriate field types and mapstructure tags.

Also applies to: 184-191

baseapp/abci.go (2)

1014-1017: Event publishing integration looks correct.

The sequence of committing the store, populating the final app hash, and publishing events is properly ordered. The PublishBlockEvents call will flush all accumulated block and transaction events through the publish channel.


849-858: Remove thread safety concern—ABCI methods are serialized.

The app.flushData field is assigned once in FinalizeBlock (line 849) and read in Commit (lines 1016–1017). ABCI guarantees that FinalizeBlock and Commit are called sequentially by the consensus engine, so no concurrent access occurs. The struct is passed by value to the channel, ensuring atomic copying at send time.

Likely an incorrect or invalid review comment.

Comment on lines +226 to +230
if emitCustomEvent {
sdkCtx.PublishEventManager().EmitEvent(
NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}),
)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid nil PublishEventManager panic

When emitCustomEvent is true but the context doesn’t have a PublishEventManager wired in (the default case before enabling publish), this will dereference nil and panic. The test helper shouldn’t assume the manager is always present.

Please guard the lookup before emitting:

-   if emitCustomEvent {
-       sdkCtx.PublishEventManager().EmitEvent(
-           NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}),
-       )
-   }
+   if emitCustomEvent {
+       if pem := sdkCtx.PublishEventManager(); pem != nil {
+           pem.EmitEvent(NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}))
+       }
+   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if emitCustomEvent {
sdkCtx.PublishEventManager().EmitEvent(
NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}),
)
}
if emitCustomEvent {
if pem := sdkCtx.PublishEventManager(); pem != nil {
pem.EmitEvent(NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}))
}
}
🤖 Prompt for AI Agents
In baseapp/utils_test.go around lines 226 to 230, the test helper assumes
sdkCtx.PublishEventManager() is non-nil and will panic when emitCustomEvent is
true but no PublishEventManager is wired; guard the lookup by retrieving the
manager into a variable, check it for nil, and only call EmitEvent when the
manager is non-nil (or return/skip emitting if nil); update the code to perform
this nil-check before emitting to avoid dereference panics in tests.

Comment on lines +271 to +275
if emitCustomEvent {
ctx.PublishEventManager().EmitEvent(
NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}),
)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Mirror nil guard for ante handler events

Same issue here: if the publish manager isn’t configured this helper will panic. Please mirror the guard to keep tests robust.

-       if emitCustomEvent {
-           ctx.PublishEventManager().EmitEvent(
-               NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}),
-           )
-       }
+       if emitCustomEvent {
+           if pem := ctx.PublishEventManager(); pem != nil {
+               pem.EmitEvent(NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}))
+           }
+       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if emitCustomEvent {
ctx.PublishEventManager().EmitEvent(
NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}),
)
}
if emitCustomEvent {
if pem := ctx.PublishEventManager(); pem != nil {
pem.EmitEvent(NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}))
}
}
🤖 Prompt for AI Agents
In baseapp/utils_test.go around lines 271 to 275, the test helper calls
ctx.PublishEventManager().EmitEvent(...) without checking for a nil
PublishEventManager, which can cause a panic when the publish manager isn’t
configured; update the helper to mirror the existing nil guard used elsewhere by
checking that ctx.PublishEventManager() != nil before calling EmitEvent and only
emit the event when the manager is non-nil (i.e., wrap the EmitEvent call in a
conditional guard).

Comment on lines +311 to +314
func (c Context) WithPublishEventManager(sem PublishEventManagerI) Context {
c.publishEventManager = sem
return c
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Prevent nil PublishEventManager panics

Line 312 currently accepts whatever the caller passes. If someone clears the manager with ctx.WithPublishEventManager(nil) (or hands in a nil custom impl), PublishEventManager() still returns an EventPlaceholderManager whose publishEventManager field is nil. The next ctx.PublishEventManager().EmitEvent(...) call dereferences that nil and panics, and CacheContext will also panic when it unwraps and calls EmitEvents. Please defend against nil input—either reject it up front or replace it with a fresh manager—so that this public API can’t brick the context at runtime.

 func (c Context) WithPublishEventManager(sem PublishEventManagerI) Context {
-	c.publishEventManager = sem
+	if sem == nil {
+		sem = NewPublishEventManager()
+	}
+	c.publishEventManager = sem
 	return c
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c Context) WithPublishEventManager(sem PublishEventManagerI) Context {
c.publishEventManager = sem
return c
}
func (c Context) WithPublishEventManager(sem PublishEventManagerI) Context {
if sem == nil {
sem = NewPublishEventManager()
}
c.publishEventManager = sem
return c
}
🤖 Prompt for AI Agents
In types/context.go around lines 311–314, the WithPublishEventManager method
currently assigns whatever is passed, allowing callers to set a nil manager
which later causes panics when EmitEvent is called; change the method to
defensively handle a nil argument by replacing a nil input with a fresh default
PublishEventManager instance (e.g., create a new EventPlaceholderManager or call
a NewPublishEventManager() factory) before assigning to c.publishEventManager
and returning c so the public API never stores a nil manager.

@github-actions github-actions bot removed the Stale label Nov 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants