diff --git a/README.md b/README.md index 017e2ecb8..fee3c2850 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,33 @@ Schema: test Query: DROP TABLE IF EXISTS `test_replication` /* generated by server */ ``` -## Canal +### MariaDB 11.4+ compatibility + +MariaDB 11.4+ introduced an optimization where events written through transaction or statement cache have `LogPos=0` so they can be copied directly to the binlog without computing the real end position. This optimization improves performance but makes position tracking unreliable for replication clients that need to track LogPos of events inside transactions. + +To address this, a `MariaDBDynamicLogPos` configuration option is available: + +```go +cfg := replication.BinlogSyncerConfig { + ServerID: 100, + Flavor: "mariadb", + Host: "127.0.0.1", + Port: 3306, + User: "root", + Password: "", + // Enable dynamic LogPos calculation for MariaDB 11.4+ + MariaDBDynamicLogPos: true, +} +``` + +**Behavior:** +- When `MariaDBDynamicLogPos` is `true` and flavor is `mariadb`, the library automatically: + - Adds `BINLOG_SEND_ANNOTATE_ROWS_EVENT` flag to binlog dump commands. This ensures correct position tracking by making the server send `ANNOTATE_ROWS_EVENT` events which are needed for accurate position calculation. + - Calculates LogPos dynamically for events with `LogPos=0` that are not artificial. +- Only works with MariaDB flavor; has no effect with MySQL. +- Should be set to `true` if tracking of LogPos inside transactions is required. + +## Canal Canal is a package that can sync your MySQL into everywhere, like Redis, Elasticsearch. diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 6bdd88faa..e306895e5 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -128,6 +128,12 @@ type BinlogSyncerConfig struct { EventCacheCount int + // MariaDBDynamicLogPos enables dynamic LogPos calculation for MariaDB. + // When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag + // to ensure correct position calculation in MariaDB 11.4+. + // Only works with MariaDB flavor. + MariaDBDynamicLogPos bool + // SynchronousEventHandler is used for synchronous event handling. // This should not be used together with StartBackupWithHandler. // If this is not nil, GetEvent does not need to be called. @@ -509,7 +515,14 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p mysql.Position) error { binary.LittleEndian.PutUint32(data[pos:], p.Pos) pos += 4 - binary.LittleEndian.PutUint16(data[pos:], b.cfg.DumpCommandFlag) + dumpCommandFlag := b.cfg.DumpCommandFlag + if b.cfg.MariaDBDynamicLogPos && b.cfg.Flavor == mysql.MariaDBFlavor { + // Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when MariaDBDynamicLogPos is enabled. + // This ensures the server sends ANNOTATE_ROWS_EVENT events which are needed + // for correct LogPos calculation in MariaDB 11.4+, where some events have LogPos=0. + dumpCommandFlag |= BINLOG_SEND_ANNOTATE_ROWS_EVENT + } + binary.LittleEndian.PutUint16(data[pos:], dumpCommandFlag) pos += 2 binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) @@ -861,6 +874,13 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos + } else if b.shouldCalculateDynamicLogPos(e) { + calculatedPos := b.nextPos.Pos + e.Header.EventSize + e.Header.LogPos = calculatedPos + b.nextPos.Pos = calculatedPos + b.cfg.Logger.Debug("MariaDB dynamic LogPos calculation", + slog.String("eventType", e.Header.EventType.String()), + slog.Uint64("logPos", uint64(calculatedPos))) } // Handle event types to update positions and GTID sets @@ -944,6 +964,19 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need return nil } +// shouldCalculateDynamicLogPos determines if we should calculate LogPos dynamically for MariaDB events. +// This is needed for MariaDB 11.4+ when: +// 1. MariaDBDynamicLogPos is enabled +// 2. We're using MariaDB flavor +// 3. The event has LogPos=0 (indicating server didn't set it) +// 4. The event is not artificial (not marked with LOG_EVENT_ARTIFICIAL_F flag) +func (b *BinlogSyncer) shouldCalculateDynamicLogPos(e *BinlogEvent) bool { + return b.cfg.MariaDBDynamicLogPos && + b.cfg.Flavor == mysql.MariaDBFlavor && + e.Header.LogPos == 0 && + (e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0 +} + // getCurrentGtidSet returns a clone of the current GTID set. func (b *BinlogSyncer) getCurrentGtidSet() mysql.GTIDSet { if b.currGset != nil {