Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ func (c *Canal) prepareSyncer() error {
Dialer: c.cfg.Dialer,
Localhost: c.cfg.Localhost,
EventCacheCount: c.cfg.EventCacheCount,
FillZeroLogPos: c.cfg.FillZeroLogPos,

RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error {
pos, err := event.DecodeHeader(data)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ type Config struct {
// the default value is 10240.
// if you table contain large columns, you can decrease this value to avoid OOM.
EventCacheCount int

// FillZeroLogPos enables dynamic LogPos calculation for MariaDB.
// When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag
// to ensure correct position calculation in MariaDB 11.4+.
// Only works with MariaDB flavor.
FillZeroLogPos bool
}

func NewConfigWithFile(name string) (*Config, error) {
Expand Down
12 changes: 6 additions & 6 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ type BinlogSyncerConfig struct {

EventCacheCount int

// MariaDBDynamicLogPos enables dynamic LogPos calculation for MariaDB.
// FillZeroLogPos enables dynamic LogPos calculation for MariaDB.
// When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag
// to ensure correct position calculation in MariaDB 11.4+.
// Only works with MariaDB flavor.
MariaDBDynamicLogPos bool
FillZeroLogPos bool

// SynchronousEventHandler is used for synchronous event handling.
// This should not be used together with StartBackupWithHandler.
Expand Down Expand Up @@ -516,8 +516,8 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p mysql.Position) error {
pos += 4

dumpCommandFlag := b.cfg.DumpCommandFlag
if b.cfg.MariaDBDynamicLogPos && b.cfg.Flavor == mysql.MariaDBFlavor {
// Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when MariaDBDynamicLogPos is enabled.
if b.cfg.FillZeroLogPos && b.cfg.Flavor == mysql.MariaDBFlavor {
// Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when FillZeroLogPos is enabled.
// This ensures the server sends ANNOTATE_ROWS_EVENT events which are needed
// for correct LogPos calculation in MariaDB 11.4+, where some events have LogPos=0.
dumpCommandFlag |= BINLOG_SEND_ANNOTATE_ROWS_EVENT
Expand Down Expand Up @@ -966,12 +966,12 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need

// shouldCalculateDynamicLogPos determines if we should calculate LogPos dynamically for MariaDB events.
// This is needed for MariaDB 11.4+ when:
// 1. MariaDBDynamicLogPos is enabled
// 1. FillZeroLogPos is enabled
// 2. We're using MariaDB flavor
// 3. The event has LogPos=0 (indicating server didn't set it)
// 4. The event is not artificial (not marked with LOG_EVENT_ARTIFICIAL_F flag)
func (b *BinlogSyncer) shouldCalculateDynamicLogPos(e *BinlogEvent) bool {
return b.cfg.MariaDBDynamicLogPos &&
return b.cfg.FillZeroLogPos &&
b.cfg.Flavor == mysql.MariaDBFlavor &&
e.Header.LogPos == 0 &&
(e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0
Expand Down
Loading