Skip to content

Commit c640f85

Browse files
authored
Merge branch 'master' into master
2 parents 1e3b3a1 + c72b237 commit c640f85

File tree

13 files changed

+207
-31
lines changed

13 files changed

+207
-31
lines changed

doc/hooks.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
4949
- `gh-ost-on-before-cut-over`
5050
- `gh-ost-on-success`
5151
- `gh-ost-on-failure`
52+
- `gh-ost-on-batch-copy-retry`
5253

5354
### Context
5455

@@ -82,6 +83,7 @@ The following variable are available on particular hooks:
8283

8384
- `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command`
8485
- `GH_OST_STATUS` is only available in `gh-ost-on-status`
86+
- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry`
8587

8688
### Examples
8789

go/base/context.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,13 @@ func (this *MigrationContext) GetIteration() int64 {
611611
return atomic.LoadInt64(&this.Iteration)
612612
}
613613

614+
func (this *MigrationContext) SetNextIterationRangeMinValues() {
615+
this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues
616+
if this.MigrationIterationRangeMinValues == nil {
617+
this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues
618+
}
619+
}
620+
614621
func (this *MigrationContext) MarkPointOfInterest() int64 {
615622
this.pointOfInterestTimeMutex.Lock()
616623
defer this.pointOfInterestTimeMutex.Unlock()

go/logic/applier.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -819,17 +819,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
819819
// no further chunk to work through, i.e. we're past the last chunk and are done with
820820
// iterating the range (and thus done with copying row chunks)
821821
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
822-
this.LastIterationRangeMutex.Lock()
823-
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
824-
this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
825-
this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
826-
}
827-
this.LastIterationRangeMutex.Unlock()
828-
829-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
830-
if this.migrationContext.MigrationIterationRangeMinValues == nil {
831-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
832-
}
833822
for i := 0; i < 2; i++ {
834823
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
835824
if i == 1 {

go/logic/applier_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
542542
err = applier.ReadMigrationRangeValues()
543543
suite.Require().NoError(err)
544544

545+
migrationContext.SetNextIterationRangeMinValues()
545546
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
547+
546548
suite.Require().NoError(err)
547549
suite.Require().True(hasFurtherRange)
548550

@@ -620,6 +622,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
620622
err = applier.AlterGhost()
621623
suite.Require().NoError(err)
622624

625+
migrationContext.SetNextIterationRangeMinValues()
623626
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
624627
suite.Require().NoError(err)
625628
suite.Require().True(hasFurtherRange)

go/logic/hooks.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
onInteractiveCommand = "gh-ost-on-interactive-command"
2929
onSuccess = "gh-ost-on-success"
3030
onFailure = "gh-ost-on-failure"
31+
onBatchCopyRetry = "gh-ost-on-batch-copy-retry"
3132
onStatus = "gh-ost-on-status"
3233
onStopReplication = "gh-ost-on-stop-replication"
3334
onStartReplication = "gh-ost-on-start-replication"
@@ -78,6 +79,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
7879
// executeHook executes a command, and sets relevant environment variables
7980
// combined output & error are printed to the configured writer.
8081
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
82+
this.migrationContext.Log.Infof("executing hook: %+v", hook)
8183
cmd := exec.Command(hook)
8284
cmd.Env = this.applyEnvironmentVariables(extraVariables...)
8385

@@ -124,6 +126,11 @@ func (this *HooksExecutor) onBeforeRowCopy() error {
124126
return this.executeHooks(onBeforeRowCopy)
125127
}
126128

129+
func (this *HooksExecutor) onBatchCopyRetry(errorMessage string) error {
130+
v := fmt.Sprintf("GH_OST_LAST_BATCH_COPY_ERROR=%s", errorMessage)
131+
return this.executeHooks(onBatchCopyRetry, v)
132+
}
133+
127134
func (this *HooksExecutor) onRowCopyComplete() error {
128135
return this.executeHooks(onRowCopyComplete)
129136
}

go/logic/migrator.go

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
135135
}
136136
}
137137

138+
func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) {
139+
wrappedOperation := func() error {
140+
if err := operation(); err != nil {
141+
this.hooksExecutor.onBatchCopyRetry(err.Error())
142+
return err
143+
}
144+
return nil
145+
}
146+
147+
return this.retryOperation(wrappedOperation, notFatalHint...)
148+
}
149+
138150
// retryOperation attempts up to `count` attempts at running given function,
139151
// exiting as soon as it returns with non-error.
140152
func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
@@ -1407,27 +1419,24 @@ func (this *Migrator) iterateChunks() error {
14071419
return nil
14081420
}
14091421
copyRowsFunc := func() error {
1410-
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
1411-
// Done.
1412-
// There's another such check down the line
1413-
return nil
1414-
}
1415-
1416-
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1417-
1418-
hasFurtherRange := false
1419-
if err := this.retryOperation(func() (e error) {
1420-
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
1421-
return e
1422-
}); err != nil {
1423-
return terminateRowIteration(err)
1424-
}
1425-
if !hasFurtherRange {
1426-
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
1427-
return terminateRowIteration(nil)
1428-
}
1422+
this.migrationContext.SetNextIterationRangeMinValues()
14291423
// Copy task:
14301424
applyCopyRowsFunc := func() error {
1425+
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
1426+
// Done.
1427+
// There's another such check down the line
1428+
return nil
1429+
}
1430+
1431+
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1432+
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
1433+
if err != nil {
1434+
return err // wrapping call will retry
1435+
}
1436+
if !hasFurtherRange {
1437+
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
1438+
return terminateRowIteration(nil)
1439+
}
14311440
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
14321441
// No need for more writes.
14331442
// This is the de-facto place where we avoid writing in the event of completed cut-over.
@@ -1458,9 +1467,18 @@ func (this *Migrator) iterateChunks() error {
14581467
atomic.AddInt64(&this.migrationContext.Iteration, 1)
14591468
return nil
14601469
}
1461-
if err := this.retryOperation(applyCopyRowsFunc); err != nil {
1470+
if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil {
14621471
return terminateRowIteration(err)
14631472
}
1473+
1474+
// record last successfully copied range
1475+
this.applier.LastIterationRangeMutex.Lock()
1476+
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
1477+
this.applier.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
1478+
this.applier.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
1479+
}
1480+
this.applier.LastIterationRangeMutex.Unlock()
1481+
14641482
return nil
14651483
}
14661484
// Enqueue copy operation; to be executed by executeWriteFuncs()

go/logic/migrator_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
package logic
77

88
import (
9+
"bytes"
910
"context"
1011
gosql "database/sql"
1112
"errors"
1213
"fmt"
14+
"io"
1315
"os"
1416
"path/filepath"
1517
"strings"
@@ -325,6 +327,8 @@ func (suite *MigratorTestSuite) SetupTest() {
325327

326328
_, err := suite.db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase)
327329
suite.Require().NoError(err)
330+
331+
os.Remove("/tmp/gh-ost.sock")
328332
}
329333

330334
func (suite *MigratorTestSuite) TearDownTest() {
@@ -384,6 +388,126 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
384388
suite.Require().Equal("_testing_del", tableName)
385389
}
386390

391+
func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() {
392+
ctx := context.Background()
393+
394+
_, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)")
395+
suite.Require().NoError(err)
396+
397+
const initStride = 1000
398+
const totalBatches = 3
399+
for i := 0; i < totalBatches; i++ {
400+
dataSize := 50 * i
401+
for j := 0; j < initStride; j++ {
402+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize)))
403+
suite.Require().NoError(err)
404+
}
405+
}
406+
407+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8))
408+
suite.Require().NoError(err)
409+
defer func() {
410+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024))
411+
suite.Require().NoError(err)
412+
}()
413+
414+
tmpDir, err := os.MkdirTemp("", "gh-ost-hooks")
415+
suite.Require().NoError(err)
416+
defer os.RemoveAll(tmpDir)
417+
418+
hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry")
419+
hookContent := `#!/bin/bash
420+
# Mock hook that reduces chunk size on binlog cache error
421+
ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR"
422+
SOCKET_PATH="/tmp/gh-ost.sock"
423+
424+
if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then
425+
echo "Nothing to do for error: $ERROR_MSG"
426+
exit 0
427+
fi
428+
429+
CHUNK_SIZE=$(echo "chunk-size=?" | nc -U $SOCKET_PATH | tr -d '\n')
430+
431+
MIN_CHUNK_SIZE=10
432+
NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 ))
433+
if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then
434+
NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE
435+
fi
436+
437+
if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then
438+
echo "Chunk size unchanged: $CHUNK_SIZE"
439+
exit 0
440+
fi
441+
442+
echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE"
443+
echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH
444+
echo "[gh-ost-on-batch-copy-retry]: Done, exiting..."
445+
`
446+
err = os.WriteFile(hookScript, []byte(hookContent), 0755)
447+
suite.Require().NoError(err)
448+
449+
origStdout := os.Stdout
450+
origStderr := os.Stderr
451+
452+
rOut, wOut, _ := os.Pipe()
453+
rErr, wErr, _ := os.Pipe()
454+
os.Stdout = wOut
455+
os.Stderr = wErr
456+
457+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
458+
suite.Require().NoError(err)
459+
460+
migrationContext := base.NewMigrationContext()
461+
migrationContext.AllowedRunningOnMaster = true
462+
migrationContext.ApplierConnectionConfig = connectionConfig
463+
migrationContext.InspectorConnectionConfig = connectionConfig
464+
migrationContext.DatabaseName = "test"
465+
migrationContext.SkipPortValidation = true
466+
migrationContext.OriginalTableName = "test_retry_batch"
467+
migrationContext.SetConnectionConfig("innodb")
468+
migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB"
469+
migrationContext.ReplicaServerId = 99999
470+
migrationContext.HeartbeatIntervalMilliseconds = 100
471+
migrationContext.ThrottleHTTPIntervalMillis = 100
472+
migrationContext.ThrottleHTTPTimeoutMillis = 1000
473+
migrationContext.HooksPath = tmpDir
474+
migrationContext.ChunkSize = 1000
475+
migrationContext.SetDefaultNumRetries(10)
476+
migrationContext.ServeSocketFile = "/tmp/gh-ost.sock"
477+
478+
migrator := NewMigrator(migrationContext, "0.0.0")
479+
480+
err = migrator.Migrate()
481+
suite.Require().NoError(err)
482+
483+
wOut.Close()
484+
wErr.Close()
485+
os.Stdout = origStdout
486+
os.Stderr = origStderr
487+
488+
var bufOut, bufErr bytes.Buffer
489+
io.Copy(&bufOut, rOut)
490+
io.Copy(&bufErr, rErr)
491+
492+
outStr := bufOut.String()
493+
errStr := bufErr.String()
494+
495+
suite.Assert().Contains(outStr, "chunk-size: 1000")
496+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800")
497+
suite.Assert().Contains(outStr, "chunk-size: 800")
498+
499+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640")
500+
suite.Assert().Contains(outStr, "chunk-size: 640")
501+
502+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512")
503+
suite.Assert().Contains(outStr, "chunk-size: 512")
504+
505+
var count int
506+
err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count)
507+
suite.Require().NoError(err)
508+
suite.Assert().Equal(3000, count)
509+
}
510+
387511
func (suite *MigratorTestSuite) TestCopierIntPK() {
388512
ctx := context.Background()
389513

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
set global max_binlog_cache_size = 1073741824; -- 1GB
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
set global max_binlog_cache_size = 1024;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
drop table if exists gh_ost_test;
2+
create table gh_ost_test (
3+
id int auto_increment,
4+
name mediumtext not null,
5+
primary key (id)
6+
) auto_increment=1;
7+
8+
insert into gh_ost_test (name)
9+
select repeat('a', 1500)
10+
from information_schema.columns
11+
cross join information_schema.tables
12+
limit 1000;

0 commit comments

Comments
 (0)