Commit c72b237

meiji163 and grodowski authored
Create a hook to capture copy batch errors and retries (#1638)
* Execute hook on every batch insert retry

Co-authored-by: Bastian Bartmann <bastian.bartmann@shopify.com>

* Expose the last error message to the onBatchCopyRetry hook

Co-authored-by: Bastian Bartmann <bastian.bartmann@shopify.com>

* Remove double retries

CalculateNextIterationRangeEndValues needs to be recomputed on every retry in case of configuration (e.g. chunk-size) changes were made by onBatchCopyRetry hooks.

* include dev.yml (temp for Shopify)

* Update doc/hooks.md

* Remove dev.yml

* Fix retry issue where MigrationIterationRangeMinValues advances before insert completes

- extract MigrationContext.SetNextIterationRangeValues outside of applyCopyRowsFunc, so that it doesn't run on retries
- add an integration test for Migrator with retry hooks

Co-authored-by: Bastian Bartmann <bastian.bartmann@shopify.com>

* Add localtest that expects gh-ost to fail on exhausted retries

* Rename method

* fmt and lint

* gofmt

* Fix problems when altering a column from `binary` to `varbinary` (#1628)

* Fix binary column trailing zero stripping for non-key columns

MySQL's binlog strips trailing 0x00 bytes from binary(N) columns. PR #915 fixed this for unique key columns only, but the same issue affects all binary columns in INSERT/UPDATE operations.

Remove the isUniqueKeyColumn condition so all binary(N) columns are padded to their declared length.

Fixes a variation of #909 where the affected column is not a primary key.

* Simplify by removing isUniqueKeyColumn now that it's no longer used.

* In convertArg, don't convert binary data to strings.

In this case, the input is binary, and the column type is `binary`. So the output should be binary, not text.

* fix a lint

* Fix 4 trigger handling bugs (#1626)

* fix: remove double-transformation in trigger length validation

ValidateGhostTriggerLengthBelowMaxLength was calling GetGhostTriggerName on an already-transformed name, adding the suffix twice. This caused valid trigger names (ghost name <= 64 chars) to be falsely rejected.

The caller in inspect.go:627 already transforms the name via GetGhostTriggerName before passing it, so the validation function should check the length as-is.

Unit tests updated to reflect the correct call pattern: transform first with GetGhostTriggerName, then validate the result. Added boundary tests for exactly 64 and 65 char names.

* fix: return error from trigger creation during atomic cut-over

During atomic cut-over, if CreateTriggersOnGhost failed, the error was logged but not returned. The migration continued and completed without triggers, silently losing them.

The two-step cut-over (line 793) already correctly returns the error. This aligns the atomic cut-over to do the same.

* fix: check trigger name uniqueness per schema, not per table

validateGhostTriggersDontExist was filtering by event_object_table, only checking if the ghost trigger name existed on the original table. MySQL trigger names are unique per schema, so a trigger with the same name on any other table would block CREATE TRIGGER but pass validation.

Remove the event_object_table filter to check trigger_name + trigger_schema only, matching MySQL's uniqueness constraint.

* fix: use parameterized query in GetTriggers to prevent SQL injection

GetTriggers used fmt.Sprintf with string interpolation for database and table names, causing SQL syntax errors with special characters and potential SQL injection.

Switched to parameterized query with ? placeholders, matching the safe pattern already used in inspect.go:553-559.

* test: add regression tests for trigger handling bugs

Add two integration tests:
- trigger-long-name-validation: verifies 60-char trigger names (64-char ghost name) are not falsely rejected by double-transform
- trigger-ghost-name-conflict: verifies validation detects ghost trigger name conflicts on other tables in the same schema

* style: gofmt context_test.go

---------

Co-authored-by: Yakir Gibraltar <yakir.g@taboola.com>
Co-authored-by: meiji163 <meiji163@github.com>

* fix update of LastIterationRange values

---------

Co-authored-by: Jan Grodowski <jan.grodowski@shopify.com>
1 parent c6f95cc commit c72b237

13 files changed: +207 -31 lines changed

doc/hooks.md

Lines changed: 2 additions & 0 deletions
@@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
 - `gh-ost-on-before-cut-over`
 - `gh-ost-on-success`
 - `gh-ost-on-failure`
+- `gh-ost-on-batch-copy-retry`

 ### Context

@@ -82,6 +83,7 @@ The following variable are available on particular hooks:

 - `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command`
 - `GH_OST_STATUS` is only available in `gh-ost-on-status`
+- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry`

 ### Examples
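
To illustrate the new hook and its variable, here is a minimal sketch of a hook program written in Go. It assumes the compiled binary is placed in the hooks path under the name `gh-ost-on-batch-copy-retry`, as the test script in this commit is. `isBinlogCacheError` is a hypothetical helper, and matching on `max_binlog_cache_size` simply mirrors the check in that test script; it is not a documented contract.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// isBinlogCacheError is a hypothetical helper: it reports whether the error
// text mentions max_binlog_cache_size, the same substring the test hook
// in this commit looks for.
func isBinlogCacheError(msg string) bool {
	return strings.Contains(msg, "max_binlog_cache_size")
}

func main() {
	// gh-ost exports the last batch copy error to this hook only.
	msg := os.Getenv("GH_OST_LAST_BATCH_COPY_ERROR")
	if !isBinlogCacheError(msg) {
		fmt.Printf("Nothing to do for error: %s\n", msg)
		return
	}
	// A real hook might lower chunk-size here via the interactive socket.
	fmt.Println("binlog cache error detected")
}
```

The program exits with status 0 in both branches, as the test script does.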

go/base/context.go

Lines changed: 7 additions & 0 deletions
@@ -611,6 +611,13 @@ func (this *MigrationContext) GetIteration() int64 {
 	return atomic.LoadInt64(&this.Iteration)
 }

+func (this *MigrationContext) SetNextIterationRangeMinValues() {
+	this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues
+	if this.MigrationIterationRangeMinValues == nil {
+		this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues
+	}
+}
+
 func (this *MigrationContext) MarkPointOfInterest() int64 {
 	this.pointOfInterestTimeMutex.Lock()
 	defer this.pointOfInterestTimeMutex.Unlock()
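
The behaviour of `SetNextIterationRangeMinValues` can be seen in isolation with a small stand-in. The sketch below is not gh-ost code: `rangeState` and its `[]int` fields are hypothetical simplifications of the three `MigrationContext` fields involved, used only to show that the first call falls back to the overall range minimum and later calls start from the previous iteration's maximum.

```go
package main

import "fmt"

// rangeState is a hypothetical, simplified stand-in for the MigrationContext
// fields touched by SetNextIterationRangeMinValues.
type rangeState struct {
	rangeMin []int // plays the role of MigrationRangeMinValues
	iterMin  []int // plays the role of MigrationIterationRangeMinValues
	iterMax  []int // plays the role of MigrationIterationRangeMaxValues
}

// setNextIterationRangeMinValues mirrors the logic added in this commit:
// the next chunk starts where the previous one ended, or at the overall
// minimum when no chunk has been computed yet.
func (s *rangeState) setNextIterationRangeMinValues() {
	s.iterMin = s.iterMax
	if s.iterMin == nil {
		s.iterMin = s.rangeMin
	}
}

func main() {
	s := &rangeState{rangeMin: []int{1}}

	s.setNextIterationRangeMinValues()
	fmt.Println(s.iterMin) // [1]: no previous chunk, so start at the range minimum

	s.iterMax = []int{1000} // pretend the first chunk ended at id 1000
	s.setNextIterationRangeMinValues()
	fmt.Println(s.iterMin) // [1000]: next chunk starts at the previous maximum
}
```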

go/logic/applier.go

Lines changed: 0 additions & 11 deletions
@@ -819,17 +819,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
 // no further chunk to work through, i.e. we're past the last chunk and are done with
 // iterating the range (and thus done with copying row chunks)
 func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
-	this.LastIterationRangeMutex.Lock()
-	if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
-		this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
-		this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
-	}
-	this.LastIterationRangeMutex.Unlock()
-
-	this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
-	if this.migrationContext.MigrationIterationRangeMinValues == nil {
-		this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
-	}
 	for i := 0; i < 2; i++ {
 		buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
 		if i == 1 {

go/logic/applier_test.go

Lines changed: 3 additions & 0 deletions
@@ -542,7 +542,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
 	err = applier.ReadMigrationRangeValues()
 	suite.Require().NoError(err)

+	migrationContext.SetNextIterationRangeMinValues()
 	hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
+
 	suite.Require().NoError(err)
 	suite.Require().True(hasFurtherRange)

@@ -620,6 +622,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
 	err = applier.AlterGhost()
 	suite.Require().NoError(err)

+	migrationContext.SetNextIterationRangeMinValues()
 	hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
 	suite.Require().NoError(err)
 	suite.Require().True(hasFurtherRange)

go/logic/hooks.go

Lines changed: 7 additions & 0 deletions
@@ -28,6 +28,7 @@ const (
 	onInteractiveCommand = "gh-ost-on-interactive-command"
 	onSuccess = "gh-ost-on-success"
 	onFailure = "gh-ost-on-failure"
+	onBatchCopyRetry = "gh-ost-on-batch-copy-retry"
 	onStatus = "gh-ost-on-status"
 	onStopReplication = "gh-ost-on-stop-replication"
 	onStartReplication = "gh-ost-on-start-replication"
@@ -78,6 +79,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
 // executeHook executes a command, and sets relevant environment variables
 // combined output & error are printed to the configured writer.
 func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
+	this.migrationContext.Log.Infof("executing hook: %+v", hook)
 	cmd := exec.Command(hook)
 	cmd.Env = this.applyEnvironmentVariables(extraVariables...)

@@ -124,6 +126,11 @@ func (this *HooksExecutor) onBeforeRowCopy() error {
 	return this.executeHooks(onBeforeRowCopy)
 }

+func (this *HooksExecutor) onBatchCopyRetry(errorMessage string) error {
+	v := fmt.Sprintf("GH_OST_LAST_BATCH_COPY_ERROR=%s", errorMessage)
+	return this.executeHooks(onBatchCopyRetry, v)
+}
+
 func (this *HooksExecutor) onRowCopyComplete() error {
 	return this.executeHooks(onRowCopyComplete)
 }
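
The new `onBatchCopyRetry` passes the error text to the hook as an extra `KEY=value` environment entry. The sketch below shows that pattern on its own, under the assumption that extra variables are simply appended to a base environment. `hookEnv` is a hypothetical helper and does not reproduce what `applyEnvironmentVariables` actually does.

```go
package main

import (
	"fmt"
	"os"
)

// lastBatchCopyErrorVar is the variable name introduced by this commit.
const lastBatchCopyErrorVar = "GH_OST_LAST_BATCH_COPY_ERROR"

// hookEnv is a hypothetical helper: it copies the base environment and
// appends the last batch copy error as a KEY=value entry, the same format
// that exec.Cmd.Env expects.
func hookEnv(base []string, errorMessage string) []string {
	env := make([]string, 0, len(base)+1)
	env = append(env, base...)
	return append(env, fmt.Sprintf("%s=%s", lastBatchCopyErrorVar, errorMessage))
}

func main() {
	env := hookEnv(os.Environ(), "example error text")
	// The appended entry is always the last element.
	fmt.Println(env[len(env)-1])
}
```

Copying the base slice before appending keeps the caller's slice untouched, which matters if the same base environment is reused for several hooks.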

go/logic/migrator.go

Lines changed: 38 additions & 20 deletions
@@ -135,6 +135,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
 	}
 }

+func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) {
+	wrappedOperation := func() error {
+		if err := operation(); err != nil {
+			this.hooksExecutor.onBatchCopyRetry(err.Error())
+			return err
+		}
+		return nil
+	}
+
+	return this.retryOperation(wrappedOperation, notFatalHint...)
+}
+
 // retryOperation attempts up to `count` attempts at running given function,
 // exiting as soon as it returns with non-error.
 func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
@@ -1407,27 +1419,24 @@ func (this *Migrator) iterateChunks() error {
 			return nil
 		}
 		copyRowsFunc := func() error {
-			if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
-				// Done.
-				// There's another such check down the line
-				return nil
-			}
-
-			// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
-
-			hasFurtherRange := false
-			if err := this.retryOperation(func() (e error) {
-				hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
-				return e
-			}); err != nil {
-				return terminateRowIteration(err)
-			}
-			if !hasFurtherRange {
-				atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
-				return terminateRowIteration(nil)
-			}
+			this.migrationContext.SetNextIterationRangeMinValues()
 			// Copy task:
 			applyCopyRowsFunc := func() error {
+				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
+					// Done.
+					// There's another such check down the line
+					return nil
+				}
+
+				// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
+				hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
+				if err != nil {
+					return err // wrapping call will retry
+				}
+				if !hasFurtherRange {
+					atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
+					return terminateRowIteration(nil)
+				}
 				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
 					// No need for more writes.
 					// This is the de-facto place where we avoid writing in the event of completed cut-over.
@@ -1458,9 +1467,18 @@ func (this *Migrator) iterateChunks() error {
 				atomic.AddInt64(&this.migrationContext.Iteration, 1)
 				return nil
 			}
-			if err := this.retryOperation(applyCopyRowsFunc); err != nil {
+			if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil {
 				return terminateRowIteration(err)
 			}
+
+			// record last successfully copied range
+			this.applier.LastIterationRangeMutex.Lock()
+			if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
+				this.applier.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
+				this.applier.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
+			}
+			this.applier.LastIterationRangeMutex.Unlock()
+
 			return nil
 		}
 		// Enqueue copy operation; to be executed by executeWriteFuncs()
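
The control flow introduced in `migrator.go` is: run the operation, call the hook with the error text on every failure, and retry, so that a hook can change configuration before the next attempt recomputes the range end. The following is a simplified sketch of that flow, not the gh-ost implementation. `retryWithHook`, `errChunkTooLarge`, and the 700-row threshold are all hypothetical, and the real `retryOperation` has behaviour (such as how `notFatalHint` is handled) that is not modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

// errChunkTooLarge is a hypothetical error used to simulate a failing batch.
var errChunkTooLarge = errors.New("simulated: chunk too large")

// retryWithHook runs operation up to maxRetries times. After every failed
// attempt it passes the error text to onRetry, mirroring how the wrapped
// operation in this commit invokes the hook before returning the error.
func retryWithHook(maxRetries int, operation func() error, onRetry func(lastErr string)) (err error) {
	for i := 0; i < maxRetries; i++ {
		if err = operation(); err == nil {
			return nil
		}
		onRetry(err.Error())
	}
	return err
}

func main() {
	chunkSize := 1000
	attempts := 0

	// The operation re-reads chunkSize on every attempt, so changes made by
	// the hook take effect on the next retry.
	operation := func() error {
		attempts++
		if chunkSize > 700 {
			return errChunkTooLarge
		}
		return nil
	}
	// The hook shrinks the chunk size by 20% after each failure.
	hook := func(lastErr string) {
		chunkSize = chunkSize * 8 / 10
	}

	err := retryWithHook(10, operation, hook)
	fmt.Println(err, attempts, chunkSize) // <nil> 3 640
}
```

This is also why the commit moves `SetNextIterationRangeMinValues` outside the retried function: the start of the chunk must stay fixed across retries while the end is recalculated.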

go/logic/migrator_test.go

Lines changed: 124 additions & 0 deletions
@@ -6,10 +6,12 @@
 package logic

 import (
+	"bytes"
 	"context"
 	gosql "database/sql"
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
 	"strings"
@@ -325,6 +327,8 @@ func (suite *MigratorTestSuite) SetupTest() {

 	_, err := suite.db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase)
 	suite.Require().NoError(err)
+
+	os.Remove("/tmp/gh-ost.sock")
 }

 func (suite *MigratorTestSuite) TearDownTest() {
@@ -384,6 +388,126 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
 	suite.Require().Equal("_testing_del", tableName)
 }

+func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() {
+	ctx := context.Background()
+
+	_, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)")
+	suite.Require().NoError(err)
+
+	const initStride = 1000
+	const totalBatches = 3
+	for i := 0; i < totalBatches; i++ {
+		dataSize := 50 * i
+		for j := 0; j < initStride; j++ {
+			_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize)))
+			suite.Require().NoError(err)
+		}
+	}
+
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8))
+	suite.Require().NoError(err)
+	defer func() {
+		_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024))
+		suite.Require().NoError(err)
+	}()
+
+	tmpDir, err := os.MkdirTemp("", "gh-ost-hooks")
+	suite.Require().NoError(err)
+	defer os.RemoveAll(tmpDir)
+
+	hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry")
+	hookContent := `#!/bin/bash
+# Mock hook that reduces chunk size on binlog cache error
+ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR"
+SOCKET_PATH="/tmp/gh-ost.sock"
+
+if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then
+    echo "Nothing to do for error: $ERROR_MSG"
+    exit 0
+fi
+
+CHUNK_SIZE=$(echo "chunk-size=?" | nc -U $SOCKET_PATH | tr -d '\n')
+
+MIN_CHUNK_SIZE=10
+NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 ))
+if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then
+    NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE
+fi
+
+if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then
+    echo "Chunk size unchanged: $CHUNK_SIZE"
+    exit 0
+fi
+
+echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE"
+echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH
+echo "[gh-ost-on-batch-copy-retry]: Done, exiting..."
+`
+	err = os.WriteFile(hookScript, []byte(hookContent), 0755)
+	suite.Require().NoError(err)
+
+	origStdout := os.Stdout
+	origStderr := os.Stderr
+
+	rOut, wOut, _ := os.Pipe()
+	rErr, wErr, _ := os.Pipe()
+	os.Stdout = wOut
+	os.Stderr = wErr
+
+	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
+	suite.Require().NoError(err)
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.AllowedRunningOnMaster = true
+	migrationContext.ApplierConnectionConfig = connectionConfig
+	migrationContext.InspectorConnectionConfig = connectionConfig
+	migrationContext.DatabaseName = "test"
+	migrationContext.SkipPortValidation = true
+	migrationContext.OriginalTableName = "test_retry_batch"
+	migrationContext.SetConnectionConfig("innodb")
+	migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB"
+	migrationContext.ReplicaServerId = 99999
+	migrationContext.HeartbeatIntervalMilliseconds = 100
+	migrationContext.ThrottleHTTPIntervalMillis = 100
+	migrationContext.ThrottleHTTPTimeoutMillis = 1000
+	migrationContext.HooksPath = tmpDir
+	migrationContext.ChunkSize = 1000
+	migrationContext.SetDefaultNumRetries(10)
+	migrationContext.ServeSocketFile = "/tmp/gh-ost.sock"
+
+	migrator := NewMigrator(migrationContext, "0.0.0")
+
+	err = migrator.Migrate()
+	suite.Require().NoError(err)
+
+	wOut.Close()
+	wErr.Close()
+	os.Stdout = origStdout
+	os.Stderr = origStderr
+
+	var bufOut, bufErr bytes.Buffer
+	io.Copy(&bufOut, rOut)
+	io.Copy(&bufErr, rErr)
+
+	outStr := bufOut.String()
+	errStr := bufErr.String()
+
+	suite.Assert().Contains(outStr, "chunk-size: 1000")
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800")
+	suite.Assert().Contains(outStr, "chunk-size: 800")
+
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640")
+	suite.Assert().Contains(outStr, "chunk-size: 640")
+
+	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512")
+	suite.Assert().Contains(outStr, "chunk-size: 512")
+
+	var count int
+	err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count)
+	suite.Require().NoError(err)
+	suite.Assert().Equal(3000, count)
+}
+
 func (suite *MigratorTestSuite) TestCopierIntPK() {
 	ctx := context.Background()
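
The chunk sizes asserted in the test (1000, 800, 640, 512) follow from the hook script's integer arithmetic: multiply by 8, divide by 10, and clamp to a minimum of 10. The sketch below reproduces that calculation in Go. `nextChunkSize` is a hypothetical name used only for this illustration.

```go
package main

import "fmt"

// nextChunkSize mirrors the shell arithmetic in the test hook:
// NEW = CHUNK * 8 / 10 using integer division, never below minimum.
func nextChunkSize(current, minimum int) int {
	next := current * 8 / 10
	if next < minimum {
		next = minimum
	}
	return next
}

func main() {
	size := 1000
	for i := 0; i < 3; i++ {
		next := nextChunkSize(size, 10)
		fmt.Printf("%d -> %d\n", size, next)
		size = next
	}
	// Prints:
	// 1000 -> 800
	// 800 -> 640
	// 640 -> 512
}
```

Once the size reaches the minimum, the result equals the input, which corresponds to the script's "Chunk size unchanged" branch.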

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+set global max_binlog_cache_size = 1073741824; -- 1GB

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+set global max_binlog_cache_size = 1024;

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+drop table if exists gh_ost_test;
+create table gh_ost_test (
+  id int auto_increment,
+  name mediumtext not null,
+  primary key (id)
+) auto_increment=1;
+
+insert into gh_ost_test (name)
+select repeat('a', 1500)
+from information_schema.columns
+cross join information_schema.tables
+limit 1000;
