-
Notifications
You must be signed in to change notification settings - Fork 1k
fix(replication): calculate LogPos for non-artificial events in MariaDB 11.4+ #1052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,6 +128,12 @@ type BinlogSyncerConfig struct { | |
|
||
EventCacheCount int | ||
|
||
// MariaDBDynamicLogPos enables dynamic LogPos calculation for MariaDB. | ||
// When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag | ||
// to ensure correct position calculation in MariaDB 11.4+. | ||
// Only works with MariaDB flavor. | ||
MariaDBDynamicLogPos bool | ||
|
||
// SynchronousEventHandler is used for synchronous event handling. | ||
// This should not be used together with StartBackupWithHandler. | ||
// If this is not nil, GetEvent does not need to be called. | ||
|
@@ -509,7 +515,14 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p mysql.Position) error { | |
binary.LittleEndian.PutUint32(data[pos:], p.Pos) | ||
pos += 4 | ||
|
||
binary.LittleEndian.PutUint16(data[pos:], b.cfg.DumpCommandFlag) | ||
dumpCommandFlag := b.cfg.DumpCommandFlag | ||
if b.cfg.MariaDBDynamicLogPos && b.cfg.Flavor == mysql.MariaDBFlavor { | ||
// Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when MariaDBDynamicLogPos is enabled. | ||
// This ensures the server sends ANNOTATE_ROWS_EVENT events which are needed | ||
// for correct LogPos calculation in MariaDB 11.4+, where some events have LogPos=0. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have some manual test result to make sure only this flag is enough? Please add some explanation in PR description. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has been my ongoing concern and feedback - that this might not be the only necessary flag, and likewise that perhaps using event types explicitly would be a better approach. It is too "hand wavey" to just use this flag without a more thorough rationale. I don't have much to contribute to resolving this, however. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't worry. I think because this fix indeed fixed your problem, we can merge it and leave more enhancement to future PR. And because it's possible (not likely, though) we will change to another implementation and break properties from this PR, can you add some unit tests to prevent it breaking unintentionally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I havent yet tested the current iteration. In fact, I only had it working when I used my own event type implementation rather than flags. I'll test it tomorrow and report back |
||
dumpCommandFlag |= BINLOG_SEND_ANNOTATE_ROWS_EVENT | ||
} | ||
binary.LittleEndian.PutUint16(data[pos:], dumpCommandFlag) | ||
pos += 2 | ||
|
||
binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) | ||
|
@@ -861,6 +874,13 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need | |
if e.Header.LogPos > 0 { | ||
// Some events like FormatDescriptionEvent return 0, ignore. | ||
b.nextPos.Pos = e.Header.LogPos | ||
} else if b.shouldCalculateDynamicLogPos(e) { | ||
calculatedPos := b.nextPos.Pos + e.Header.EventSize | ||
e.Header.LogPos = calculatedPos | ||
b.nextPos.Pos = calculatedPos | ||
b.cfg.Logger.Debug("MariaDB dynamic LogPos calculation", | ||
slog.String("eventType", e.Header.EventType.String()), | ||
slog.Uint64("logPos", uint64(calculatedPos))) | ||
} | ||
|
||
// Handle event types to update positions and GTID sets | ||
|
@@ -944,6 +964,19 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need | |
return nil | ||
} | ||
|
||
// shouldCalculateDynamicLogPos determines if we should calculate LogPos dynamically for MariaDB events. | ||
// This is needed for MariaDB 11.4+ when: | ||
// 1. MariaDBDynamicLogPos is enabled | ||
// 2. We're using MariaDB flavor | ||
// 3. The event has LogPos=0 (indicating server didn't set it) | ||
// 4. The event is not artificial (not marked with LOG_EVENT_ARTIFICIAL_F flag) | ||
func (b *BinlogSyncer) shouldCalculateDynamicLogPos(e *BinlogEvent) bool { | ||
return b.cfg.MariaDBDynamicLogPos && | ||
b.cfg.Flavor == mysql.MariaDBFlavor && | ||
e.Header.LogPos == 0 && | ||
(e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0 | ||
} | ||
|
||
// getCurrentGtidSet returns a clone of the current GTID set. | ||
func (b *BinlogSyncer) getCurrentGtidSet() mysql.GTIDSet { | ||
if b.currGset != nil { | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in
NewBinlogSyncer
we can check the flavor andMariaDBDynamicLogPos
match, and print warning log if mismatch.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
MariaDBFillZeroLogPos
?Dynamic
makes me think more LogPos will be computed on-the-fly other than zero positions. And maybe the value is changeable, not the same one as upstream binlog position.