Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"task.go",
"util.go",
"writer.go",
"writer_parquet.go",
"writer_util.go",
],
importpath = "github.com/pingcap/tidb/dumpling/export",
Expand Down Expand Up @@ -65,6 +66,13 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//pkg/caller",
"@com_github_xitongsys_parquet_go//layout",
"@com_github_xitongsys_parquet_go//marshal",
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//schema",
"@com_github_xitongsys_parquet_go//types",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand All @@ -91,6 +99,7 @@ go_test(
"status_test.go",
"util_for_test.go",
"util_test.go",
"writer_parquet_test.go",
"writer_serial_test.go",
"writer_test.go",
],
Expand Down Expand Up @@ -118,6 +127,9 @@ go_test(
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_spf13_pflag//:pflag",
"@com_github_stretchr_testify//require",
"@com_github_xitongsys_parquet_go//reader",
"@com_github_xitongsys_parquet_go//types",
"@com_github_xitongsys_parquet_go_source//local",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/gc",
"@org_golang_x_sync//errgroup",
Expand Down
48 changes: 44 additions & 4 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const (
flagClusterSSLCA = "cluster-ssl-ca"
flagClusterSSLCert = "cluster-ssl-cert"
flagClusterSSLKey = "cluster-ssl-key"
flagParquetCompress = "parquet-compress"
flagParquetPageSize = "parquet-page-size"
flagParquetRowGroupSize = "parquet-row-group-size"

// FlagHelp represents the help flag
FlagHelp = "help"
Expand Down Expand Up @@ -203,11 +206,27 @@ type Config struct {
PDAddr string
// ClusterSSLCA/ClusterSSLCert/ClusterSSLKey override Security.* when connecting
// to PD endpoints for GC control.
ClusterSSLCA string
ClusterSSLCert string
ClusterSSLKey string
ClusterSSLCA string
ClusterSSLCert string
ClusterSSLKey string
ParquetCompressType ParquetCompressType
ParquetPageSize int64
ParquetRowGroupSize int64
}

// ParquetCompressType is the compression codec applied inside generated
// parquet files, selected via --parquet-compress. It is independent of the
// file-level --compress option, which parquet output rejects. The constant
// values are also used as a filename-extension fragment for compressed
// parquet output (see tryToWriteTableData in writer.go).
type ParquetCompressType string

const (
	// NoCompression won't compress given bytes.
	NoCompression ParquetCompressType = "no-compression"
	// Gzip will compress given bytes in gzip format.
	Gzip ParquetCompressType = "gz"
	// Snappy will compress given bytes in snappy format.
	Snappy ParquetCompressType = "snappy"
	// Zstd will compress given bytes in zstd format.
	Zstd ParquetCompressType = "zst"
)

// ServerInfoUnknown is the unknown database type to dumpling
var ServerInfoUnknown = version.ServerInfo{
ServerType: version.ServerTypeUnknown,
Expand Down Expand Up @@ -352,7 +371,7 @@ func (*Config) DefineFlags(flags *pflag.FlagSet) {
"If not specified, dumpling will dump table without inner-concurrency which could be relatively slow. default unlimited")
flags.String(flagWhere, "", "Dump only selected records")
flags.Bool(flagEscapeBackslash, true, "use backslash to escape special characters")
flags.String(flagFiletype, "", "The type of export file (sql/csv)")
flags.String(flagFiletype, "", "The type of export file (sql/csv/parquet)")
flags.Bool(flagNoHeader, false, "whether not to dump CSV table header")
flags.BoolP(flagNoSchemas, "m", false, "Do not dump table schemas with the data")
flags.BoolP(flagNoData, "d", false, "Do not dump table data")
Expand Down Expand Up @@ -384,6 +403,9 @@ func (*Config) DefineFlags(flags *pflag.FlagSet) {
flags.String(flagClusterSSLCA, "", "CA certificate path for TLS connections to PD endpoints used by GC control; if empty, reuse --ca")
flags.String(flagClusterSSLCert, "", "Client certificate path for TLS connections to PD endpoints used by GC control; if empty, reuse --cert")
flags.String(flagClusterSSLKey, "", "Client private key path for TLS connections to PD endpoints used by GC control; if empty, reuse --key")
flags.String(flagParquetCompress, "snappy", "Compress algorithm for parquet file, support 'no-compression', 'snappy', 'gzip', 'zstd'")
flags.Int64(flagParquetPageSize, 1024*1024, "Parquet page size in bytes")
flags.Int64(flagParquetRowGroupSize, 16*1024*1024, "Parquet row group size in bytes")
}

// ParseFromFlags parses dumpling's export.Config from flags
Expand Down Expand Up @@ -625,6 +647,20 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Errorf("%s is only supported when dumping whole table to csv, not compatible with %s", flagCsvOutputDialect, conf.FileType)
}
conf.CsvOutputDialect, err = ParseOutputDialect(dialect)

parquetCompressType, err := flags.GetString(flagParquetCompress)
if err != nil {
return errors.Trace(err)
}
conf.ParquetCompressType, err = ParseParquetCompressType(parquetCompressType)
if err != nil {
return errors.Trace(err)
}
conf.ParquetPageSize, err = flags.GetInt64(flagParquetPageSize)
if err != nil {
return errors.Trace(err)
}
conf.ParquetRowGroupSize, err = flags.GetInt64(flagParquetRowGroupSize)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -818,6 +854,10 @@ func adjustFileFormat(conf *Config) error {
return errors.Errorf("unsupported config.FileType '%s' when we specify --sql, please unset --filetype or set it to 'csv'", conf.FileType)
}
case FileFormatCSVString:
case FileFormatParquetString:
if conf.CompressType != storage.NoCompression {
return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
}
Comment on lines +857 to +860
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm the correct NoCompression constant in compressedio (or whatever package owns CompressType).
rg -nP '\bNoCompression\b' -g '!**/dumpling/export/**'
rg -nP 'type\s+CompressType\b' -g 'pkg/objstore/**'

Repository: pingcap/tidb

Length of output: 1415


🏁 Script executed:

# Check imports at the top of the file
head -100 dumpling/export/config.go | grep -n "import\|^package"

# Check lines around 857-860
sed -n '850,865p' dumpling/export/config.go | cat -n

# Check line 221 for the ParquetCompressType constant
sed -n '215,230p' dumpling/export/config.go | cat -n

Repository: pingcap/tidb

Length of output: 1441


🏁 Script executed:

# Show full imports section of the file
sed -n '5,50p' dumpling/export/config.go | cat -n

# Show what CompressType is defined as - check the Config struct
grep -n "CompressType" dumpling/export/config.go | head -20

Repository: pingcap/tidb

Length of output: 2363


Compile error: storage is not imported.

Line 858 references storage.NoCompression, but this file does not import a storage package. It imports compressedio (line 20), and conf.CompressType is a compressedio.CompressType (line 146), so the check must use compressedio.NoCompression. Note that there is a distinct local NoCompression ParquetCompressType constant at line 221, but that's a different type.

Fix
 	case FileFormatParquetString:
-		if conf.CompressType != storage.NoCompression {
+		if conf.CompressType != compressedio.NoCompression {
 			return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case FileFormatParquetString:
if conf.CompressType != storage.NoCompression {
return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
}
case FileFormatParquetString:
if conf.CompressType != compressedio.NoCompression {
return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@dumpling/export/config.go` around lines 857 - 860, The check in the
FileFormatParquetString case uses an undefined storage.NoCompression; change it
to use the correct type's constant compressedio.NoCompression (because
conf.CompressType is a compressedio.CompressType), i.e., replace
storage.NoCompression with compressedio.NoCompression in the if that compares
conf.CompressType; leave the existing ParquetCompressType.NoCompression constant
untouched.

default:
return errors.Errorf("unknown config.FileType '%s'", conf.FileType)
}
Expand Down
10 changes: 10 additions & 0 deletions dumpling/export/ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ type TableMeta interface {
ShowCreateView() string
AvgRowLength() uint64
HasImplicitRowID() bool
ColumnInfos() []*ColumnInfo
}

// ColumnInfo carries per-column metadata for writers that need a typed
// schema (e.g. the parquet writer): the column name, its database type
// name, nullability, and — for decimal columns — precision and scale.
type ColumnInfo struct {
	Name string
	// Type is the database type name as reported by the driver,
	// e.g. "VARCHAR" or "DECIMAL".
	Type     string
	Nullable bool
	// Precision and Scale are only meaningful for decimal-like types;
	// they are zero when the driver does not report a decimal size.
	Precision int64
	Scale     int64
}

// SQLRowIter is the iterator on a collection of sql.Row.
Expand All @@ -57,6 +66,7 @@ type RowReceiverStringer interface {
type Stringer interface {
WriteToBuffer(*bytes.Buffer, bool)
WriteToBufferInCsv(*bytes.Buffer, bool, *csvOption)
GetRawBytes() []sql.RawBytes
}

// RowReceiver is an interface which represents sql types that support bind address for *sql.Rows
Expand Down
19 changes: 19 additions & 0 deletions dumpling/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,25 @@ type tableMeta struct {
hasImplicitRowID bool
}

// ColumnInfos returns one ColumnInfo per column of the table, derived from
// the driver-reported column types: name, database type name, nullability,
// and decimal precision/scale when the driver supplies them (zero otherwise).
func (tm *tableMeta) ColumnInfos() []*ColumnInfo {
	infos := make([]*ColumnInfo, len(tm.colTypes))
	for i, ct := range tm.colTypes {
		isNullable, _ := ct.Nullable()
		var prec, scale int64
		if p, s, ok := ct.DecimalSize(); ok {
			prec, scale = p, s
		}
		infos[i] = &ColumnInfo{
			Name:      ct.Name(),
			Type:      ct.DatabaseTypeName(),
			Nullable:  isNullable,
			Precision: prec,
			Scale:     scale,
		}
	}
	return infos
}

func (tm *tableMeta) ColumnTypes() []string {
colTypes := make([]string, len(tm.colTypes))
for i, ct := range tm.colTypes {
Expand Down
21 changes: 21 additions & 0 deletions dumpling/export/sql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ func (r *RowReceiverArr) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash bo
}
}

// GetRawBytes implements Stringer.GetRawBytes by collecting the first
// raw-bytes element from each underlying receiver into a single slice,
// one entry per column. It uses a pointer receiver for consistency with
// the type's other methods (BindAddress, WriteToBuffer, WriteToBufferInCsv).
func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes {
	rawBytes := make([]sql.RawBytes, len(r.receivers))
	for i, receiver := range r.receivers {
		// Each scalar receiver returns a single-element slice.
		rawBytes[i] = receiver.GetRawBytes()[0]
	}
	return rawBytes
}
Comment on lines +233 to +240
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove redundant GetRawBytes() call and use a pointer receiver.

Line 236 invokes receiver.GetRawBytes() and discards the result, then line 237 calls it again — the first call is dead code. Also, this method uses a value receiver while every other method on RowReceiverArr (BindAddress, WriteToBuffer, WriteToBufferInCsv) uses a pointer receiver; the inconsistency risks method-set surprises when satisfying interfaces through a pointer. Finally, the method is exported and lacks a doc comment.

🐛 Proposed fix
-func (r RowReceiverArr) GetRawBytes() []sql.RawBytes {
-	rawBytes := make([]sql.RawBytes, len(r.receivers))
-	for i, receiver := range r.receivers {
-		receiver.GetRawBytes()
-		rawBytes[i] = receiver.GetRawBytes()[0]
-	}
-	return rawBytes
-}
+// GetRawBytes implements Stringer.GetRawBytes by collecting the first raw-bytes
+// element from each underlying receiver.
+func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes {
+	rawBytes := make([]sql.RawBytes, len(r.receivers))
+	for i, receiver := range r.receivers {
+		rawBytes[i] = receiver.GetRawBytes()[0]
+	}
+	return rawBytes
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (r RowReceiverArr) GetRawBytes() []sql.RawBytes {
rawBytes := make([]sql.RawBytes, len(r.receivers))
for i, receiver := range r.receivers {
receiver.GetRawBytes()
rawBytes[i] = receiver.GetRawBytes()[0]
}
return rawBytes
}
// GetRawBytes implements Stringer.GetRawBytes by collecting the first raw-bytes
// element from each underlying receiver.
func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes {
rawBytes := make([]sql.RawBytes, len(r.receivers))
for i, receiver := range r.receivers {
rawBytes[i] = receiver.GetRawBytes()[0]
}
return rawBytes
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@dumpling/export/sql_type.go` around lines 233 - 240, The GetRawBytes method
on RowReceiverArr currently makes a redundant call and uses a value receiver;
change its signature to use a pointer receiver (func (r *RowReceiverArr)
GetRawBytes() []sql.RawBytes), remove the dead receiver.GetRawBytes() call so
you only call receiver.GetRawBytes()[0] once when populating rawBytes from
r.receivers, and add a short doc comment above the exported GetRawBytes method;
ensure consistency with the other methods BindAddress, WriteToBuffer, and
WriteToBufferInCsv which are pointer receivers.


// SQLTypeNumber implements RowReceiverStringer which represents numeric type columns in database
type SQLTypeNumber struct {
SQLTypeString
Expand All @@ -253,6 +262,10 @@ func (s SQLTypeNumber) WriteToBufferInCsv(bf *bytes.Buffer, _ bool, opt *csvOpti
}
}

// GetRawBytes implements Stringer.GetRawBytes; it wraps the receiver's raw
// column value (promoted from the embedded SQLTypeString) in a
// single-element slice.
func (s *SQLTypeNumber) GetRawBytes() []sql.RawBytes {
	return []sql.RawBytes{s.RawBytes}
}

// SQLTypeString implements RowReceiverStringer which represents string type columns in database
type SQLTypeString struct {
sql.RawBytes
Expand Down Expand Up @@ -285,6 +298,10 @@ func (s *SQLTypeString) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash boo
}
}

// GetRawBytes implements Stringer.GetRawBytes; it wraps the receiver's raw
// column value in a single-element slice.
func (s *SQLTypeString) GetRawBytes() []sql.RawBytes {
	return []sql.RawBytes{s.RawBytes}
}

// SQLTypeBytes implements RowReceiverStringer which represents bytes type columns in database
type SQLTypeBytes struct {
sql.RawBytes
Expand Down Expand Up @@ -321,3 +338,7 @@ func (s *SQLTypeBytes) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash bool
bf.WriteString(opt.nullValue)
}
}

// GetRawBytes implements Stringer.GetRawBytes; it wraps the receiver's raw
// column value in a single-element slice.
func (s *SQLTypeBytes) GetRawBytes() []sql.RawBytes {
	return []sql.RawBytes{s.RawBytes}
}
23 changes: 23 additions & 0 deletions dumpling/export/util_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type mockTableIR struct {
hasImplicitRowID bool
rowErr error
rows *sql.Rows
columnInfos []*ColumnInfo
SQLRowIter
}

Expand Down Expand Up @@ -256,6 +257,10 @@ func (m *mockTableIR) EscapeBackSlash() bool {
return m.escapeBackSlash
}

// ColumnInfos implements TableMeta.ColumnInfos, returning the column
// metadata configured on the mock (nil unless set via
// newMockTableIRWithColumnInfo or directly on the struct).
func (m *mockTableIR) ColumnInfos() []*ColumnInfo {
	return m.columnInfos
}

func newMockTableIR(databaseName, tableName string, data [][]driver.Value, specialComments, colTypes []string) *mockTableIR {
return &mockTableIR{
dbName: databaseName,
Expand All @@ -268,3 +273,21 @@ func newMockTableIR(databaseName, tableName string, data [][]driver.Value, speci
SQLRowIter: nil,
}
}

// newMockTableIRWithColumnInfo builds a mockTableIR for tests that need
// typed column metadata (e.g. parquet writer tests). The plain colTypes
// slice is derived from the Type field of each supplied ColumnInfo.
func newMockTableIRWithColumnInfo(databaseName, tableName string, data [][]driver.Value, specialComments []string, infos []*ColumnInfo) *mockTableIR {
	colTypes := make([]string, 0, len(infos))
	for _, info := range infos {
		colTypes = append(colTypes, info.Type)
	}
	return &mockTableIR{
		dbName:        databaseName,
		tblName:       tableName,
		data:          data,
		specCmt:       specialComments,
		selectedField: "*",
		selectedLen:   len(infos),
		colTypes:      colTypes,
		columnInfos:   infos,
		SQLRowIter:    nil,
	}
}
8 changes: 7 additions & 1 deletion dumpling/export/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewWriter(
sw.fileFmt = FileFormatSQLText
case FileFormatCSVString:
sw.fileFmt = FileFormatCSV
case FileFormatParquetString:
sw.fileFmt = FileFormatParquet
}
return sw
}
Expand Down Expand Up @@ -232,7 +234,11 @@ func (w *Writer) WriteTableData(meta TableMeta, ir TableDataIR, currentChunk int
func (w *Writer) tryToWriteTableData(tctx *tcontext.Context, meta TableMeta, ir TableDataIR, curChkIdx int) error {
conf, format := w.conf, w.fileFmt
namer := newOutputFileNamer(meta, curChkIdx, conf.Rows != UnspecifiedSize, conf.FileSize != UnspecifiedSize)
fileName, err := namer.NextName(conf.OutputFileTemplate, w.fileFmt.Extension())
fileFmtExtension := format.Extension()
if format == FileFormatParquet && conf.ParquetCompressType != NoCompression {
fileFmtExtension = fmt.Sprintf("%s.%s", conf.ParquetCompressType, fileFmtExtension)
}
fileName, err := namer.NextName(conf.OutputFileTemplate, fileFmtExtension)
if err != nil {
return err
}
Expand Down
Loading
Loading