From fd31578a8785863e9d60d9b980aa3a3ed4ac79d6 Mon Sep 17 00:00:00 2001 From: Bulat Aikaev Date: Sun, 20 Jul 2025 17:55:47 -0600 Subject: [PATCH 1/2] fix(replication): calculate LogPos for non-artificial events in MariaDB 11.4+ --- replication/binlogsyncer.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 6bdd88faa..ecacf82d2 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -861,6 +861,16 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos + } else if b.cfg.Flavor == mysql.MariaDBFlavor && (e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0 && e.Header.LogPos == 0 { + // For MariaDB 11.4+, some events may have LogPos=0 but are still valid position updates + // if they are not artificial events (marked with LOG_EVENT_ARTIFICIAL_F flag). + // For such events, we calculate the next position dynamically. + calculatedPos := b.nextPos.Pos + e.Header.EventSize + e.Header.LogPos = calculatedPos + b.nextPos.Pos = calculatedPos + b.cfg.Logger.Debug("MariaDB event with LogPos=0 but not artificial, calculated position", + slog.String("eventType", e.Header.EventType.String()), + slog.Uint64("logPos", uint64(calculatedPos))) } // Handle event types to update positions and GTID sets From d9e2c9b6633353a56c7ee08071d56e01f75a0309 Mon Sep 17 00:00:00 2001 From: Bulat Aikaev Date: Thu, 24 Jul 2025 17:52:46 -0600 Subject: [PATCH 2/2] fix(replication): add configuration parameter to enable calculation of LogPos for non-artificial events in MariaDB 11.4+ --- README.md | 28 +++++++++++++++++++++++++++- replication/binlogsyncer.go | 35 +++++++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 017e2ecb8..fee3c2850 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,33 @@ Schema: test Query: DROP TABLE IF EXISTS `test_replication` /* generated by server */ ``` -## Canal +### MariaDB 11.4+ compatibility + +MariaDB 11.4+ introduced an optimization where events written through transaction or statement cache have `LogPos=0` so they can be copied directly to the binlog without computing the real end position. This optimization improves performance but makes position tracking unreliable for replication clients that need to track LogPos of events inside transactions. + +To address this, a `MariaDBDynamicLogPos` configuration option is available: + +```go +cfg := replication.BinlogSyncerConfig { + ServerID: 100, + Flavor: "mariadb", + Host: "127.0.0.1", + Port: 3306, + User: "root", + Password: "", + // Enable dynamic LogPos calculation for MariaDB 11.4+ + MariaDBDynamicLogPos: true, +} +``` + +**Behavior:** +- When `MariaDBDynamicLogPos` is `true` and flavor is `mariadb`, the library automatically: + - Adds `BINLOG_SEND_ANNOTATE_ROWS_EVENT` flag to binlog dump commands. This ensures correct position tracking by making the server send `ANNOTATE_ROWS_EVENT` events which are needed for accurate position calculation. + - Calculates LogPos dynamically for events with `LogPos=0` that are not artificial. +- Only works with MariaDB flavor; has no effect with MySQL. +- Should be set to `true` if tracking of LogPos inside transactions is required. + +## Canal Canal is a package that can sync your MySQL into everywhere, like Redis, Elasticsearch. diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index ecacf82d2..e306895e5 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -128,6 +128,12 @@ type BinlogSyncerConfig struct { EventCacheCount int + // MariaDBDynamicLogPos enables dynamic LogPos calculation for MariaDB. + // When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag + // to ensure correct position calculation in MariaDB 11.4+. + // Only works with MariaDB flavor. + MariaDBDynamicLogPos bool + // SynchronousEventHandler is used for synchronous event handling. // This should not be used together with StartBackupWithHandler. // If this is not nil, GetEvent does not need to be called. @@ -509,7 +515,14 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p mysql.Position) error { binary.LittleEndian.PutUint32(data[pos:], p.Pos) pos += 4 - binary.LittleEndian.PutUint16(data[pos:], b.cfg.DumpCommandFlag) + dumpCommandFlag := b.cfg.DumpCommandFlag + if b.cfg.MariaDBDynamicLogPos && b.cfg.Flavor == mysql.MariaDBFlavor { + // Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when MariaDBDynamicLogPos is enabled. + // This ensures the server sends ANNOTATE_ROWS_EVENT events which are needed + // for correct LogPos calculation in MariaDB 11.4+, where some events have LogPos=0. + dumpCommandFlag |= BINLOG_SEND_ANNOTATE_ROWS_EVENT + } + binary.LittleEndian.PutUint16(data[pos:], dumpCommandFlag) pos += 2 binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) @@ -861,14 +874,11 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos - } else if b.cfg.Flavor == mysql.MariaDBFlavor && (e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0 && e.Header.LogPos == 0 { - // For MariaDB 11.4+, some events may have LogPos=0 but are still valid position updates - // if they are not artificial events (marked with LOG_EVENT_ARTIFICIAL_F flag). - // For such events, we calculate the next position dynamically. + } else if b.shouldCalculateDynamicLogPos(e) { calculatedPos := b.nextPos.Pos + e.Header.EventSize e.Header.LogPos = calculatedPos b.nextPos.Pos = calculatedPos - b.cfg.Logger.Debug("MariaDB event with LogPos=0 but not artificial, calculated position", + b.cfg.Logger.Debug("MariaDB dynamic LogPos calculation", slog.String("eventType", e.Header.EventType.String()), slog.Uint64("logPos", uint64(calculatedPos))) } @@ -954,6 +964,19 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need return nil } +// shouldCalculateDynamicLogPos determines if we should calculate LogPos dynamically for MariaDB events. +// This is needed for MariaDB 11.4+ when: +// 1. MariaDBDynamicLogPos is enabled +// 2. We're using MariaDB flavor +// 3. The event has LogPos=0 (indicating server didn't set it) +// 4. The event is not artificial (not marked with LOG_EVENT_ARTIFICIAL_F flag) +func (b *BinlogSyncer) shouldCalculateDynamicLogPos(e *BinlogEvent) bool { + return b.cfg.MariaDBDynamicLogPos && + b.cfg.Flavor == mysql.MariaDBFlavor && + e.Header.LogPos == 0 && + (e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0 +} + // getCurrentGtidSet returns a clone of the current GTID set. func (b *BinlogSyncer) getCurrentGtidSet() mysql.GTIDSet { if b.currGset != nil {