pkg/util/topsql/reporter: stabilize flaky TestTopRUPipelineInProcessIntegration #67579

flaky-claw wants to merge 1 commit into pingcap:master
Force-pushed from 30ef582 to b989d63
📝 Walkthrough

The PR introduces a major refactoring of the dump orchestration flow in dumpling by replacing failpoint injection patterns with structured eval-based checks and a new …

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dumpling/export/dump.go`:
- Around line 346-385: startWriters currently starts each writer goroutine
inside the loop so if createConnWithConsistency fails later earlier writers are
left running; change startWriters to first construct all Writer instances and
keep their sql.Conn references (using createConnWithConsistency) without calling
wg.Go, and only after the loop completes successfully iterate over the
constructed writers to call wg.Go(func() error { return writer.run(taskChan) });
if an error occurs during construction clean up by closing any already-created
conns and calling any writer cleanup (e.g., writer.Close or similar) before
returning the teardown no-op; refer to startWriters, createConnWithConsistency,
Writer, writer.run, wg.Go and the teardown return to implement this safe
two-phase initialization and cleanup.
- Around line 1062-1101: The reset callback inside selectTiDBTableRegion
currently clears pkFields and doesn't restore rowID, which is wrong because
pkFields is computed before the query and rowID must be reset for retries;
change the reset callback to restore rowID = -1 and only clear pkVals (and any
per-run state), leaving pkFields untouched so retries keep the computed
handle-column list and the first-row skip behavior is preserved. Ensure
references to rowID, pkVals, and pkFields in the reset closure are updated
accordingly.
- Around line 104-145: The constructor currently only unregisters metrics on
error, leaving resources from later steps running; add a teardown that runs if
runSteps returns an error to cleanly reverse side effects started by
startHTTPService, openSQLDB and tidbStartGCSavepointUpdateService. Implement a
cleanup function (called from a defer placed before calling runSteps) that:
cancels/terminates d.tctx (or calls its Cancel/Close), closes any opened DB
handle (d.db.Close or similar), stops the HTTP service started by
startHTTPService (call its shutdown/Stop method), and unregisters metrics
(d.metrics.unregisterFrom) — call that cleanup when runSteps returns a non-nil
error so partially-initialized state is torn down. Ensure the cleanup references
the same symbols used in this file: runSteps, startHTTPService, openSQLDB,
tidbStartGCSavepointUpdateService, d.tctx, d.db and d.metrics.unregisterFrom.
- Around line 1906-1952: There is a duplicate "package export" declaration and a
repeated import block; remove the second package clause and the duplicate import
block (the redundant lines starting with "package export" and the following
import (...) section) so the file has only one package statement and one import
block; ensure any referenced symbols (types/functions like Dump, ExportTask, or
imports such as fmt, context, github.com/go-sql-driver/mysql) are still covered
by the remaining import block and run go build to confirm the file compiles.
📒 Files selected for processing (3)

- dumpling/export/dump.go
- pkg/util/topsql/reporter/reporter.go
- pkg/util/topsql/reporter/reporter_test.go
```go
	d.metrics = newMetrics(conf.PromFactory, conf.Labels)
	d.metrics.registerTo(conf.PromRegistry)
	defer func() {
		if err != nil {
			d.metrics.unregisterFrom(conf.PromRegistry)
		}
	}()

	err = adjustConfig(conf,
		buildTLSConfig,
		validateSpecifiedSQL,
		adjustFileFormat)
	if err != nil {
		return nil, err
	}
	if _, _err_ := failpoint.Eval(_curpkg_("SetIOTotalBytes")); _err_ == nil {
		d.conf.IOTotalBytes = gatomic.NewUint64(0)
		d.conf.Net = uuid.New().String()
		go func() {
			for {
				time.Sleep(10 * time.Millisecond)
				d.tctx.L().Logger.Info("IOTotalBytes", zap.Uint64("IOTotalBytes", d.conf.IOTotalBytes.Load()))
			}
		}()
	}

	err = runSteps(d,
		initLogger,
		createExternalStore,
		startHTTPService,
		openSQLDB,
		detectServerInfo,
		resolveAutoConsistency,

		validateResolveAutoConsistency,
		tidbResolveKeyspaceMetaForGC,
		tidbSetPDClientForGC,
		tidbGetSnapshot,
		tidbStartGCSavepointUpdateService,

		setSessionParam)
	return d, err
```
Tear down partially initialized state when NewDumper fails.
After Line 130, later steps can fail after startHTTPService, openSQLDB, or tidbStartGCSavepointUpdateService have already started side effects. The current error defer only unregisters metrics, so a failed constructor can leave the context live, the DB handle open, and background services still running.
🧹 Minimal cleanup on constructor failure

```diff
 d.metrics.registerTo(conf.PromRegistry)
 defer func() {
 	if err != nil {
+		cancelFn()
+		if d.dbHandle != nil {
+			_ = d.dbHandle.Close()
+		}
 		d.metrics.unregisterFrom(conf.PromRegistry)
 	}
 }()
```
}()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/dump.go` around lines 104 - 145, The constructor currently
only unregisters metrics on error, leaving resources from later steps running;
add a teardown that runs if runSteps returns an error to cleanly reverse side
effects started by startHTTPService, openSQLDB and
tidbStartGCSavepointUpdateService. Implement a cleanup function (called from a
defer placed before calling runSteps) that: cancels/terminates d.tctx (or calls
its Cancel/Close), closes any opened DB handle (d.db.Close or similar), stops
the HTTP service started by startHTTPService (call its shutdown/Stop method),
and unregisters metrics (d.metrics.unregisterFrom) — call that cleanup when
runSteps returns a non-nil error so partially-initialized state is torn down.
Ensure the cleanup references the same symbols used in this file: runSteps,
startHTTPService, openSQLDB, tidbStartGCSavepointUpdateService, d.tctx, d.db and
d.metrics.unregisterFrom.
```go
func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskChan <-chan Task,
	rebuildConnFn func(*sql.Conn, bool) (*sql.Conn, error)) ([]*Writer, func(), error) {
	conf, pool := d.conf, d.dbHandle
	writers := make([]*Writer, conf.Threads)
	for i := range conf.Threads {
		conn, err := createConnWithConsistency(tctx, pool, needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency))
		if err != nil {
			return nil, func() {}, err
		}
		writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics)
		writer.rebuildConnFn = rebuildConnFn
		writer.setFinishTableCallBack(func(task Task) {
			if _, ok := task.(*TaskTableData); ok {
				IncCounter(d.metrics.finishedTablesCounter)
				// FIXME: actually finishing the last chunk doesn't mean this table is 'finished'.
				// We can call this table 'finished' if all its chunks are finished.
				// Comment this log now to avoid ambiguity.
				// tctx.L().Debug("finished dumping table data",
				//	zap.String("database", td.Meta.DatabaseName()),
				//	zap.String("table", td.Meta.TableName()))
				if _, _err_ := failpoint.Eval(_curpkg_("EnableLogProgress")); _err_ == nil {
					time.Sleep(1 * time.Second)
					tctx.L().Debug("EnableLogProgress, sleep 1s")
				}
			}
		})
		writer.setFinishTaskCallBack(func(task Task) {
			IncGauge(d.metrics.taskChannelCapacity)
			if td, ok := task.(*TaskTableData); ok {
				d.metrics.completedChunks.Add(1)
				tctx.L().Debug("finish dumping table data task",
					zap.String("database", td.Meta.DatabaseName()),
					zap.String("table", td.Meta.TableName()),
					zap.Int("chunkIdx", td.ChunkIndex))
			}
		})
		wg.Go(func() error {
			return writer.run(taskChan)
		})
		writers[i] = writer
```
Don't launch writer goroutines before the full writer set is constructed.
If createConnWithConsistency fails on a later iteration, earlier writers have already been started with wg.Go, but this function returns a no-op teardown. Dump then exits before those goroutines or connections are cleaned up.
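The two-phase start the comment proposes can be sketched independently of dumpling: construct everything first, launch nothing until construction succeeds, and unwind on a mid-loop failure. The `conn`, `writer`, `dial`, and `launch` names below are illustrative stand-ins, not the real `sql.Conn`/`Writer` API.

```go
package main

import (
	"errors"
	"fmt"
)

// conn and writer are hypothetical stand-ins for sql.Conn and the
// dumpling Writer; only the two-phase start pattern is the point here.
type conn struct {
	id     int
	closed bool
}

type writer struct{ c *conn }

// startWriters builds every writer first (phase 1) and only launches
// them once all connections succeeded (phase 2). A mid-loop dial error
// closes the connections opened so far and starts no goroutines.
func startWriters(n int, dial func(i int) (*conn, error), launch func(*writer)) ([]*writer, error) {
	writers := make([]*writer, 0, n)
	for i := 0; i < n; i++ {
		c, err := dial(i)
		if err != nil {
			for _, w := range writers {
				w.c.closed = true // unwind phase-1 connections
			}
			return nil, err
		}
		writers = append(writers, &writer{c: c})
	}
	for _, w := range writers {
		launch(w) // phase 2: safe to start goroutines now
	}
	return writers, nil
}

// demo fails the third dial and reports how many writers were launched
// and how many of the earlier connections were closed on the way out.
func demo() (launched, closed int, err error) {
	var conns []*conn
	dial := func(i int) (*conn, error) {
		if i == 2 {
			return nil, errors.New("dial failed")
		}
		c := &conn{id: i}
		conns = append(conns, c)
		return c, nil
	}
	_, err = startWriters(4, dial, func(*writer) { launched++ })
	for _, c := range conns {
		if c.closed {
			closed++
		}
	}
	return launched, closed, err
}

func main() {
	launched, closed, err := demo()
	fmt.Println(launched, closed, err) // 0 2 dial failed
}
```

Because no goroutine starts until every connection exists, the error path never has to coordinate with running workers, which is what makes the no-op teardown safe to return.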
```go
	var (
		startKey, decodedKey sql.NullString
		rowID                = -1
	)
	const (
		tableRegionSQL = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;"
		tidbRowID      = "_tidb_rowid="
	)
	dbName, tableName := meta.DatabaseName(), meta.TableName()
	logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName))
	err = conn.QuerySQL(tctx, func(rows *sql.Rows) error {
		rowID++
		err = rows.Scan(&startKey, &decodedKey)
		if err != nil {
			return errors.Trace(err)
		}
		// first region's start key has no use. It may come from another table or might be invalid
		if rowID == 0 {
			return nil
		}
		if !startKey.Valid {
			logger.Debug("meet invalid start key", zap.Int("rowID", rowID))
			return nil
		}
		if !decodedKey.Valid {
			logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String))
			return nil
		}
		pkVal, err2 := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String)
		if err2 != nil {
			logger.Debug("cannot extract pkVal from decoded start key",
				zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), log.ShortError(err2))
		} else {
			pkVals = append(pkVals, []string{pkVal})
		}
		return nil
	}, func() {
		pkFields = pkFields[:0]
		pkVals = pkVals[:0]
	}, tableRegionSQL, dbName, tableName)
```
Reset the retry state correctly in selectTiDBTableRegion.
pkFields is computed before the query, so clearing it in the reset callback is wrong, and rowID also needs to be restored to -1 for a retried result set. As written, a successful retry can return boundaries with an empty handle-column list and stop skipping the first row.
🔁 Suggested retry-state fix

```diff
 }, func() {
-	pkFields = pkFields[:0]
 	pkVals = pkVals[:0]
+	rowID = -1
 }, tableRegionSQL, dbName, tableName)
```
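The retry-reset behavior the comment asks for can be shown in isolation: only per-attempt state (`rowID`, `pkVals`) is restored, while `pkFields`, computed once before the query, survives the retry. The names mirror the snippet, but this is an illustrative sketch, not the dumpling code.

```go
package main

import "fmt"

// demo mimics the QuerySQL scan/reset pair from the review comment:
// rowID tracks row position so the first region row is skipped, pkVals
// accumulates per-attempt results, and pkFields is computed once before
// the query.
func demo() ([]string, [][]string) {
	rowID := -1
	pkFields := []string{"_tidb_rowid"} // computed pre-query; must survive retries
	var pkVals [][]string

	scan := func(val string) {
		rowID++
		if rowID == 0 {
			return // first region's start key is skipped
		}
		pkVals = append(pkVals, []string{val})
	}
	// The reset callback restores only per-attempt state: rowID back to
	// -1 and pkVals cleared. pkFields is deliberately left untouched.
	reset := func() {
		rowID = -1
		pkVals = pkVals[:0]
	}

	rows := []string{"ignored", "10", "20"}
	for _, r := range rows {
		scan(r)
	}
	reset() // simulate a retried result set
	for _, r := range rows {
		scan(r)
	}
	return pkFields, pkVals
}

func main() {
	pkFields, pkVals := demo()
	fmt.Println(pkFields, pkVals) // [_tidb_rowid] [[10] [20]]
}
```

If `reset` also cleared `pkFields` (as the original code does) the retried pass would return boundaries with an empty handle-column list, and without `rowID = -1` the first row of the retried result set would no longer be skipped.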
```go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
	"bytes"
	"context"
	"database/sql"
	"database/sql/driver"
	"encoding/hex"
	"fmt"
	"math/big"
	"net"
	"slices"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/coreos/go-semver/semver"
	// import mysql driver
	"github.com/go-sql-driver/mysql"
	"github.com/google/uuid"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	pclog "github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/summary"
	"github.com/pingcap/tidb/br/pkg/version"
	"github.com/pingcap/tidb/dumpling/cli"
	tcontext "github.com/pingcap/tidb/dumpling/context"
	"github.com/pingcap/tidb/dumpling/log"
	infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/parser/format"
	"github.com/pingcap/tidb/pkg/store/helper"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/codec"
	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/pkg/caller"
	gatomic "go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)
```
🧩 Analysis chain

🏁 Script executed (repository: pingcap/tidb):

```bash
#!/bin/bash
set -euo pipefail
sed -n '1900,1960p' dumpling/export/dump.go
printf '\n--- package/import markers ---\n'
rg -n '^(package export|import \()$' dumpling/export/dump.go
```

Length of output: 1993
Remove the duplicate package export block and import declarations starting at line 1908.
The file contains a second package export declaration at line 1908 and a duplicate import block at line 1910. Go does not allow multiple package clauses or import blocks in a single file, so this will fail compilation.
What problem does this PR solve?

Issue Number: close #67578

Problem Summary:

The flaky test TestTopRUPipelineInProcessIntegration in pkg/util/topsql/reporter intermittently fails, so this PR stabilizes that path.

What changed and how does it work?
Root Cause

takeDataAndSendToReportChan snapshotted ruAggregator without first draining already-buffered collectRUIncrementsChan batches, so queued TopRU increments could be omitted from the current report or shifted to a later window depending on scheduling.

Fix

takeDataAndSendToReportChan now drains the currently queued RU batches before building TopRU records, and TestTopRUHandoverEdgeCases/report snapshot includes queued RU batch was added as a precise regression subtest for that boundary.
Verification

Native repro was weak (./tools/check/failpoint-go-test.sh pkg/util/topsql/reporter -run '^TestTopRUPipelineInProcessIntegration$' -count=20 passed pre-fix); the new precise repro failed pre-fix, then passed post-fix under scoped failpoint enablement for pkg/util/topsql/reporter. The original flaky test passed with go test ./pkg/util/topsql/reporter -run '^TestTopRUPipelineInProcessIntegration$' -count=20 -tags=intest,deadlock, adjacent TopRU tests passed, and make lint passed.
Check List

- Tests
- Side effects
- Documentation
- Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Fixes #67578
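The drain-before-snapshot approach described in the Fix section above can be sketched with a non-blocking receive loop. The channel and value types here are illustrative, not the reporter's actual API.

```go
package main

import "fmt"

// drainThenSnapshot folds every batch already queued on the channel into
// the aggregate before taking the report snapshot, so increments that
// were buffered at report time cannot be dropped or slip into a later
// reporting window.
func drainThenSnapshot(ch chan uint64, agg uint64) uint64 {
	for {
		select {
		case v := <-ch:
			agg += v // queued batch: merge it into the aggregator
		default:
			return agg // channel drained; snapshot is now complete
		}
	}
}

func main() {
	ch := make(chan uint64, 8)
	ch <- 5
	ch <- 7 // batches buffered but not yet consumed at report time
	fmt.Println(drainThenSnapshot(ch, 100)) // 112
}
```

The `default` case makes the receive non-blocking, so the drain stops exactly when the buffer is empty instead of waiting for producers; that removes the scheduling dependence that made the original test flaky.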