[WIP] dumpling: support export to parquet type #67846
shiyuhang0 wants to merge 4 commits into pingcap:master from
Conversation
📝 Walkthrough

Adds Parquet export support to dumpling with configurable compression, page/row-group sizes, and schema inference. Extends core interfaces to expose column metadata and raw bytes; implements a Parquet writer with type conversion and schema handling; updates file-writing logic to support Parquet filename extensions; includes configuration parsing and round-trip validation tests.
Sequence Diagram

sequenceDiagram
participant Caller
participant WriteInsertInParquet
participant TableDataIR as TableDataIR<br/>(Iterator)
participant ParquetWriter
participant ExternalFileWriter
participant Metrics
Caller->>WriteInsertInParquet: WriteInsertInParquet(ctx, cfg, meta, tblIR, writer)
activate WriteInsertInParquet
WriteInsertInParquet->>ParquetWriter: NewParquetWriter(buffer, columns, cfg)
activate ParquetWriter
ParquetWriter->>ParquetWriter: NewSchemaHandlerFromSQL(columns)
ParquetWriter->>ParquetWriter: Configure compression, page/row-group sizes
ParquetWriter-->>WriteInsertInParquet: SQLWriter initialized
deactivate ParquetWriter
WriteInsertInParquet->>TableDataIR: Next()
activate TableDataIR
loop for each row
TableDataIR-->>WriteInsertInParquet: row data
WriteInsertInParquet->>WriteInsertInParquet: convertDataToParquet(rawBytes)
WriteInsertInParquet->>ParquetWriter: WriteRow(converted values)
ParquetWriter->>ParquetWriter: Marshal row to Parquet format
WriteInsertInParquet->>Metrics: Update row/byte counts
end
TableDataIR-->>WriteInsertInParquet: EOF or error
deactivate TableDataIR
WriteInsertInParquet->>ParquetWriter: WriteStop()
WriteInsertInParquet->>ExternalFileWriter: Flush buffered data
WriteInsertInParquet->>Metrics: Report success/failure
WriteInsertInParquet-->>Caller: (rowCount, error)
deactivate WriteInsertInParquet
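The loop in the diagram can be condensed into a small sketch. All names here (rowIterator, rowWriter, writeAllRows, sliceIter, countWriter) are illustrative stand-ins inferred from the diagram, not the PR's actual types:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// rowIterator stands in for TableDataIR: Next returns one row of raw values,
// or io.EOF when the table is exhausted.
type rowIterator interface {
	Next() ([]string, error)
}

// rowWriter stands in for the Parquet writer: WriteRow marshals one row,
// WriteStop flushes footers and buffered pages.
type rowWriter interface {
	WriteRow([]string) error
	WriteStop() error
}

// writeAllRows mirrors the diagram's loop: pull rows until EOF, write each
// one, then stop the writer; it returns the number of rows written.
func writeAllRows(it rowIterator, w rowWriter) (uint64, error) {
	var rows uint64
	for {
		row, err := it.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return rows, err
		}
		if err := w.WriteRow(row); err != nil {
			return rows, err
		}
		rows++
	}
	return rows, w.WriteStop()
}

// sliceIter and countWriter are toy implementations for demonstration.
type sliceIter struct {
	rows [][]string
	i    int
}

func (s *sliceIter) Next() ([]string, error) {
	if s.i >= len(s.rows) {
		return nil, io.EOF
	}
	r := s.rows[s.i]
	s.i++
	return r, nil
}

type countWriter struct {
	n       int
	stopped bool
}

func (c *countWriter) WriteRow([]string) error { c.n++; return nil }
func (c *countWriter) WriteStop() error        { c.stopped = true; return nil }

func main() {
	it := &sliceIter{rows: [][]string{{"1", "a"}, {"2", "b"}}}
	w := &countWriter{}
	n, err := writeAllRows(it, w)
	fmt.Println(n, w.stopped, err) // 2 true <nil>
}
```

The real implementation additionally converts raw bytes per column and reports metrics per row, which this sketch omits.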
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Force-pushed from d2231aa to a6163f1
@shiyuhang0: The following tests failed.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dumpling/export/writer.go (1)
237-275: ⚠️ Potential issue | 🟠 Major: Compression suffix dropped for subsequent Parquet chunks.

When `conf.FileSize != UnspecifiedSize`, the loop at line 272 generates subsequent chunk names with `w.fileFmt.Extension()`, which is just `"parquet"` — the `conf.ParquetCompressType` prefix applied at lines 237-240 to the first chunk is missing. Multi-chunk Parquet dumps with compression will produce the first chunk with e.g. `.snappy.parquet` and follow-up chunks with `.parquet`, inconsistent naming that downstream loaders will likely treat as different formats. Extract the compression-aware extension once and reuse it:
🐛 Proposed fix
- fileFmtExtension := format.Extension()
- if format == FileFormatParquet && conf.ParquetCompressType != NoCompression {
-     fileFmtExtension = fmt.Sprintf("%s.%s", conf.ParquetCompressType, fileFmtExtension)
- }
- fileName, err := namer.NextName(conf.OutputFileTemplate, fileFmtExtension)
+ fileFmtExtension := format.Extension()
+ if format == FileFormatParquet && conf.ParquetCompressType != NoCompression {
+     fileFmtExtension = fmt.Sprintf("%s.%s", conf.ParquetCompressType, fileFmtExtension)
+ }
+ fileName, err := namer.NextName(conf.OutputFileTemplate, fileFmtExtension)
@@
- fileName, err = namer.NextName(conf.OutputFileTemplate, w.fileFmt.Extension())
+ fileName, err = namer.NextName(conf.OutputFileTemplate, fileFmtExtension)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/writer.go` around lines 237 - 275, The first chunk's compression-aware extension (currently computed into fileFmtExtension before the loop) must be captured and reused for subsequent chunk names instead of calling w.fileFmt.Extension(); change the loop to use that stored extension when calling namer.NextName so all chunks use the same compression-prefixed extension (update references to fileFmtExtension and the namer.NextName call that currently uses w.fileFmt.Extension()); ensure you still compute the initial fileName using namer.NextName(conf.OutputFileTemplate, fileFmtExtension) before entering the loop and then reuse fileFmtExtension for later iterations.
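The extension logic being fixed here can be sketched in isolation. Plain strings stand in for the real `FileFormat` and `ParquetCompressType` types; the function name is illustrative:

```go
package main

import "fmt"

// chunkExtension computes the compression-aware extension once so every chunk
// of a multi-file dump gets the same suffix, instead of only the first chunk
// carrying the compression prefix.
func chunkExtension(format, compress string) string {
	ext := format // e.g. "parquet"
	if format == "parquet" && compress != "" && compress != "no-compression" {
		ext = fmt.Sprintf("%s.%s", compress, ext)
	}
	return ext
}

func main() {
	ext := chunkExtension("parquet", "snappy")
	// Compute once, then reuse ext for the first and every subsequent chunk name.
	fmt.Println(ext) // snappy.parquet
}
```

The point of the fix is that the value is computed before the loop and reused, so the first and the N-th chunk can never disagree.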
🧹 Nitpick comments (9)
dumpling/export/ir.go (1)
41-47: Add a doc comment on the exported `ColumnInfo`.

Per TiDB Go conventions, exported types should carry a short doc comment explaining their purpose (e.g. that it summarizes a column's metadata for Parquet schema generation).
📝 Suggested diff
+// ColumnInfo describes a single column's metadata (type, nullability, precision/scale)
+// as consumed by the Parquet writer for schema generation.
 type ColumnInfo struct {
 	Name      string
 	Type      string
 	Nullable  bool
 	Precision int64
 	Scale     int64
 }

As per coding guidelines: "Keep exported-symbol doc comments, and prefer semantic constraints over name restatement."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/ir.go` around lines 41 - 47, Add a short exported doc comment for the ColumnInfo type explaining its purpose and semantics: state that ColumnInfo summarizes a table column's metadata used for Parquet schema generation (mapping Name, Type, Nullable, and numeric Precision/Scale), and clarify units/meaning for Precision and Scale if needed; place the comment immediately above the ColumnInfo type declaration so it follows Go/TiDB convention.

dumpling/export/config.go (3)

236-286: Consider setting Parquet defaults in `DefaultConfig()`.

Flag defaults cover the CLI path, but library embedders constructing `Config` via `DefaultConfig()` (e.g. tests or programmatic callers) will get zero-valued `ParquetCompressType` (""), `ParquetPageSize` (0) and `ParquetRowGroupSize` (0), which will silently produce invalid Parquet output. Consider seeding sane defaults (e.g. `Snappy`, 1 MiB, 16 MiB) to match the flag defaults on lines 406-408.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/config.go` around lines 236 - 286, DefaultConfig() currently leaves ParquetCompressType, ParquetPageSize and ParquetRowGroupSize as zero-values which can produce invalid Parquet files when callers use the library path; set sensible defaults inside DefaultConfig() by assigning ParquetCompressType to "Snappy" (or the enum/value used), ParquetPageSize to 1 MiB and ParquetRowGroupSize to 16 MiB (matching the CLI flag defaults) so programmatic consumers get the same safe defaults as the CLI.
651-658: `ParseParquetCompressType` is referenced but not defined.

`ParseParquetCompressType` is called here but I don't see a definition in the changed files — this matches the "missing ParseParquetCompressType function" item flagged in the PR description. The package will not compile until it is added (e.g. a `switch` over `"no-compression" | "snappy" | "gz" | "zst"` returning `ParquetCompressType`, mirroring `ParseOutputDialect`).

Note: the CLI flag default is `"snappy"` (line 406), but the constant on line 225 is `Snappy ParquetCompressType = "snappy"` — the parser must map the user-facing string `"gzip"` (mentioned in the help text on line 406) to the constant value `"gz"`, otherwise `--parquet-compress gzip` will fail validation despite being advertised.

Want me to sketch a `ParseParquetCompressType` that accepts both user-facing aliases (`"gzip"`, `"zstd"`) and the canonical values (`"gz"`, `"zst"`)?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/config.go` around lines 651 - 658, ParseParquetCompressType is missing causing build/validation failures for --parquet-compress; add a ParseParquetCompressType(par string) (ParquetCompressType, error) function (similar to ParseOutputDialect) that accepts both canonical values ("no-compression","snappy","gz","zst") and user-facing aliases ("gzip" -> "gz", "gzip" alternative, "zstd" -> "zst") and returns the corresponding ParquetCompressType constant (e.g. Snappy) or a descriptive error; update references to flagParquetCompress handling to use this parser so inputs like "gzip" and "zstd" validate as expected.
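The parser offered above could look like the following; the constant names and canonical values are assumptions mirroring the review discussion, not the PR's actual definitions:

```go
package main

import (
	"fmt"
	"strings"
)

// ParquetCompressType mirrors the constants discussed above; the canonical
// values ("gz", "zst") follow the review's reading of the PR.
type ParquetCompressType string

const (
	ParquetNoCompression ParquetCompressType = "no-compression"
	ParquetSnappy        ParquetCompressType = "snappy"
	ParquetGzip          ParquetCompressType = "gz"
	ParquetZstd          ParquetCompressType = "zst"
)

// ParseParquetCompressType accepts both the canonical values and the
// user-facing aliases ("gzip", "zstd") advertised in the flag help text,
// and rejects anything else with a descriptive error.
func ParseParquetCompressType(s string) (ParquetCompressType, error) {
	switch strings.ToLower(s) {
	case "", "no-compression":
		return ParquetNoCompression, nil
	case "snappy":
		return ParquetSnappy, nil
	case "gz", "gzip":
		return ParquetGzip, nil
	case "zst", "zstd":
		return ParquetZstd, nil
	default:
		return "", fmt.Errorf("unknown parquet compress type %q", s)
	}
}

func main() {
	ct, err := ParseParquetCompressType("gzip")
	fmt.Println(ct, err) // gz <nil>
}
```

Validating here, at flag-parsing time, also removes the need for a silent fallback later in the writer.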
209-228: Field placement and name clarity.

Two minor readability nits:
- `ParquetCompressType`/`PageSize`/`RowGroupSize` are wedged in after the `ClusterSSL*` fields; grouping them with related output settings (near `FileType`, `CsvOutputDialect`, etc.) would make the struct easier to scan.
- The unqualified constant name `NoCompression` on line 221 is easy to confuse with `compressedio.NoCompression`. A prefix like `ParquetNoCompression`/`ParquetSnappy`/`ParquetGzip`/`ParquetZstd` would make intent self-evident at call sites (and would have prevented the `storage.NoCompression` slip on line 858).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/config.go` around lines 209 - 228, Move the Parquet-related fields (ParquetCompressType, ParquetPageSize, ParquetRowGroupSize) out from after the ClusterSSL* fields and place them with the other output settings near FileType and CsvOutputDialect to improve grouping and readability; also rename the Parquet compression constants (currently NoCompression, Gzip, Snappy, Zstd) to prefixed names like ParquetNoCompression, ParquetGzip, ParquetSnappy, ParquetZstd and update all usages (e.g., any references that mistakenly used storage.NoCompression) to the new names so callers of the ParquetCompressType type and its constants are unambiguous.

dumpling/export/sql_type.go (1)
265-267: Add doc comments on the new exported `GetRawBytes` methods.

Per TiDB conventions, these exported methods should each carry a brief doc comment (e.g. "GetRawBytes implements Stringer.GetRawBytes...").
As per coding guidelines: "Keep exported-symbol doc comments, and prefer semantic constraints over name restatement."
Also applies to: 301-303, 342-344
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/sql_type.go` around lines 265 - 267, Add brief TiDB-style doc comments for the exported GetRawBytes methods (e.g., SQLTypeNumber.GetRawBytes and the other two GetRawBytes methods referenced at lines 301-303 and 342-344): each comment should be one short sentence that states the semantic purpose (for example, "GetRawBytes returns the underlying SQL raw bytes for this type." or "GetRawBytes implements the retrieval of the underlying sql.RawBytes for this value.") and avoid merely repeating the method name; place the comment immediately above each method declaration.

dumpling/export/writer_parquet.go (3)
56-56: Local variable `writer` shadows imported `writer` package.

Inside `WriteInsertInParquet` the name `writer` rebinds the `github.com/xitongsys/parquet-go/writer` package alias. While this compiles (the package isn't referenced inside this function), it is confusing for readers who follow from the type `writer.ParquetWriter` on line 176. Rename locally.
♻️ Proposed fix
- writer, err := NewParquetWriter(bf, meta.ColumnInfos(), cfg)
+ pw, err := NewParquetWriter(bf, meta.ColumnInfos(), cfg)
  if err != nil {
      return 0, errors.Trace(err)
  }
  ...
- err = writer.WriteRow(*row)
+ err = pw.WriteRow(*row)
  ...
- writer.PFile = buffer.BufferFile{
+ pw.PFile = buffer.BufferFile{
      Writer: bf,
  }
  ...
- if err = writer.WriteStop(); err != nil {
+ if err = pw.WriteStop(); err != nil {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/writer_parquet.go` at line 56, The local variable named "writer" in the WriteInsertInParquet function shadows the imported package alias "writer"; rename the local variable returned by NewParquetWriter (e.g., to "pw" or "parquetWriter") and update all its usages in WriteInsertInParquet so they no longer conflict with the package name (this preserves clarity where the package type writer.ParquetWriter is referenced elsewhere).
28-33: Missing doc comments on exported symbols.

`TagTemplate` and `DefaultCompressionType` are exported but undocumented. Same applies to `WriteInsertInParquet` (line 35), `SQLWriter` (line 175), and `NewParquetWriter` (line 179). Adding short doc comments satisfies both lint (revive/golint) and the coding guideline.
📝 Proposed fixes
 const (
-	TagTemplate = "name=%s, type=%s, repetitiontype=%s"
-	DefaultCompressionType = parquet.CompressionCodec_ZSTD
+	// TagTemplate is the format used to build parquet-go struct tags from column metadata.
+	TagTemplate = "name=%s, type=%s, repetitiontype=%s"
+	// DefaultCompressionType is used when the configured ParquetCompressType is unrecognized.
+	DefaultCompressionType = parquet.CompressionCodec_ZSTD
 	parquetMagicNumber = "PAR1"
 	parquetParallelNumber = 4
 )

+// WriteInsertInParquet streams rows from tblIR into a parquet file written through w.
 func WriteInsertInParquet(

+// SQLWriter wraps parquet-go's ParquetWriter with per-row conversion from SQL raw bytes.
 type SQLWriter struct {
 	writer.ParquetWriter
 }

+// NewParquetWriter constructs a SQLWriter writing into bf, with schema derived from columns
+// and compression/page/row-group sizing taken from cfg.
 func NewParquetWriter(bf *bytes.Buffer, columns []*ColumnInfo, cfg *Config) (*SQLWriter, error) {

As per coding guidelines: "Keep exported-symbol doc comments, and prefer semantic constraints over name restatement."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/writer_parquet.go` around lines 28 - 33, Add short, meaningful doc comments for the exported symbols TagTemplate, DefaultCompressionType, and the exported functions/types WriteInsertInParquet, SQLWriter, and NewParquetWriter: explain what each constant/value represents and any important semantic constraints (e.g., TagTemplate format, that DefaultCompressionType is the default codec used, what WriteInsertInParquet does/accepts/returns, the role of SQLWriter, and what NewParquetWriter constructs/expectations). Place comments directly above the declarations, avoid restating the identifier name, and keep them one- or two-sentence, Go-doc style summaries to satisfy linters.
364-377: "Other database" switch has a silent catch-all that may hide type mismatches.

The final `return parquet.Type_BYTE_ARRAY, ""` (line 377) silently coerces any unrecognized column type to a string byte-array without any log or error. If a new TiDB type is added upstream (or `columnInfo.Type` ever arrives empty due to a plumbing bug), dumps will "succeed" with string-ified data. Consider at least logging a warning, or returning an error, the first time an unknown type is encountered per table.

Also note that `"DOUBLE"` is listed in both the TiDB branch (line 360) and the "Other database" branch (line 374) — the second entry is unreachable. Minor code-hygiene nit.
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/writer_parquet.go` around lines 364 - 377, The switch in the "Other database" branch that falls through to return parquet.Type_BYTE_ARRAY, "" silently coerces unknown columnType values; update the logic in the function containing that switch to (1) detect unrecognized columnType and either return an explicit error (e.g., propagate an error value) or emit a single-table-scoped warning log the first time an unknown type is seen for a given table, and (2) remove the duplicate "DOUBLE" case in this branch since it is already handled in the TiDB branch; refer to the existing cases (e.g., the branch that returns parquet.Type_BYTE_ARRAY, parquet.Type_INT32, parquet.Type_INT64, parquet.Type_BOOLEAN, parquet.Type_DOUBLE) and ensure unknown types do not silently map to parquet.Type_BYTE_ARRAY without logging or erroring.dumpling/export/writer_parquet_test.go (1)
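One way to make the catch-all explicit can be sketched with a trimmed type set; the enum values and mapping below are illustrative stand-ins for parquet-go's `parquet.Type` and the PR's full switch:

```go
package main

import "fmt"

// parquetPrimitive stands in for parquet-go's parquet.Type enum.
type parquetPrimitive string

const (
	primByteArray parquetPrimitive = "BYTE_ARRAY"
	primInt64     parquetPrimitive = "INT64"
	primDouble    parquetPrimitive = "DOUBLE"
	primBoolean   parquetPrimitive = "BOOLEAN"
)

// mapOtherDBType returns an explicit error for unrecognized column types
// instead of silently coercing them to BYTE_ARRAY; callers can choose to
// log the error once per table and still fall back to a string dump.
func mapOtherDBType(columnType string) (parquetPrimitive, error) {
	switch columnType {
	case "BIGINT":
		return primInt64, nil
	case "DOUBLE", "FLOAT":
		return primDouble, nil
	case "BOOL", "BOOLEAN":
		return primBoolean, nil
	case "VARCHAR", "TEXT", "CHAR":
		return primByteArray, nil
	default:
		return primByteArray, fmt.Errorf("unsupported column type %q, dumping as string", columnType)
	}
}

func main() {
	if _, err := mapOtherDBType("GEOMETRY"); err != nil {
		fmt.Println(err)
	}
}
```

Returning the fallback value alongside the error keeps the lenient behavior available while making the coercion visible.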
117-122: Duplicate case branches — merge them.

Lines 117–119 and 120–122 have identical bodies; differentiating solely on column-type name adds noise without behavioral difference.
♻️ Proposed fix
-	case "TINYINT", "UNSIGNED TINYINT", "SMALLINT", "MEDIUMINT", "UNSIGNED SMALLINT", "UNSIGNED MEDIUMINT", "YEAR", "INT":
-		value := reflect.ValueOf(readRow).FieldByName(structName).Int()
-		require.Equal(t, data[0][i], strconv.FormatInt(value, 10))
-	case "UNSIGNED INT", "BIGINT":
+	case "TINYINT", "UNSIGNED TINYINT", "SMALLINT", "MEDIUMINT", "UNSIGNED SMALLINT",
+		"UNSIGNED MEDIUMINT", "YEAR", "INT", "UNSIGNED INT", "BIGINT":
 		value := reflect.ValueOf(readRow).FieldByName(structName).Int()
 		require.Equal(t, data[0][i], strconv.FormatInt(value, 10))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dumpling/export/writer_parquet_test.go` around lines 117 - 122, The switch in writer_parquet_test.go has duplicate branches for numeric types that both read the same field and assert via strconv.FormatInt; consolidate the two case blocks into a single case listing that includes "TINYINT", "UNSIGNED TINYINT", "SMALLINT", "MEDIUMINT", "UNSIGNED SMALLINT", "UNSIGNED MEDIUMINT", "YEAR", "INT", "UNSIGNED INT", and "BIGINT" and keep the shared body which uses reflect.ValueOf(readRow).FieldByName(structName).Int() and require.Equal(t, data[0][i], strconv.FormatInt(value, 10)) so the behavior stays identical but without duplicated code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dumpling/export/config.go`:
- Around line 857-860: The check in the FileFormatParquetString case uses an
undefined storage.NoCompression; change it to use the correct type's constant
compressedio.NoCompression (because conf.CompressType is a
compressedio.CompressType), i.e., replace storage.NoCompression with
compressedio.NoCompression in the if that compares conf.CompressType; leave the
existing ParquetCompressType.NoCompression constant untouched.
In `@dumpling/export/sql_type.go`:
- Around line 233-240: The GetRawBytes method on RowReceiverArr currently makes
a redundant call and uses a value receiver; change its signature to use a
pointer receiver (func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes), remove
the dead receiver.GetRawBytes() call so you only call receiver.GetRawBytes()[0]
once when populating rawBytes from r.receivers, and add a short doc comment
above the exported GetRawBytes method; ensure consistency with the other methods
BindAddress, WriteToBuffer, and WriteToBufferInCsv which are pointer receivers.
In `@dumpling/export/writer_parquet_test.go`:
- Around line 1-2: Add the standard TiDB Apache-2.0 license header to the top of
the new test file (writer_parquet_test.go) so it matches the header used in
writer_parquet.go; place the copyright + Apache-2.0 block above the existing
"package export" line and update the year if needed.
- Around line 101-141: The switch over c.Type inside the loop in
writer_parquet_test.go currently lacks a default, so add a default branch that
fails the test (e.g., t.Fatalf or t.FailNow with a clear message) when an
unknown column type is encountered; update the switch in the loop that
references allTypesColumnInfos, using c.Type, structName and i in the error
message to make it easy to locate which column/type was unhandled.
In `@dumpling/export/writer_parquet.go`:
- Line 294: The error message is printing a pointer address because pT is a
*parquet.Type; change the fmt.Errorf call in the return path that currently uses
pT to print the actual type by dereferencing pT (e.g. use *pT) or call its
String() method if available so the error reads the type name instead of a
pointer address; update the return in writer_parquet.go (the line returning
fmt.Errorf("unsupported type %v", pT)) to use the dereferenced value or String()
representation.
- Around line 143-151: The final send to wp.input after writer.WriteStop
currently blocks if wp.Run has exited; replace the bare send (wp.input <- bf)
with the same guarded select used in the main loop so the send is
skipped/unblocked on cancellation or error: use a select that attempts to send
bf to wp.input and also listens on pCtx.Done() and wp.errCh (the same channels
used in the loop around lines 119–135) so the goroutine won't deadlock and
close(wp.input)/<-wp.closed can proceed; keep writer.WriteStop and the
subsequent close/receive logic unchanged.
- Around line 213-230: WriteRow currently indexes
SchemaHandler.SchemaElements[i+1] for each entry in sqlRaws without checking
lengths, which can panic if sqlRaws length !=
len(SchemaHandler.SchemaElements)-1; add an upfront validation in
SQLWriter.WriteRow that compares len(sqlRaws) to
len(w.SchemaHandler.SchemaElements)-1 and return a clear error if they differ,
or alternatively bound-check inside the loop (using
w.SchemaHandler.SchemaElements[i+1] only when i+1 < len(...)) so a mismatch
yields a returned error instead of a panic; reference SQLWriter, WriteRow,
sqlRaws, and w.SchemaHandler.SchemaElements when making the change.
- Around line 160-173: The current getParquetCompress silently returns
DefaultCompressionType for unknown ParquetCompressType values; change this so
unknown/invalid values are rejected rather than defaulted. Either update
getParquetCompress to return (parquet.CompressionCodec, error) and return an
explicit error for the default case (referencing getParquetCompress,
ParquetCompressType and DefaultCompressionType) and propagate that error to
callers, or implement/strengthen ParseParquetCompressType to validate input
early and only supply known ParquetCompressType values to getParquetCompress so
the default branch is never hit silently. Ensure callers handle the error and
surface configuration validation failures instead of silently falling back to
ZSTD.
- Around line 236-256: The DECIMAL scaling uses big.Float with the default
(≈64-bit) precision which can drop precision for large values; update the
DECIMAL branch (the lT.DECIMAL handling that creates numSca and num) to
explicitly increase precision (e.g., use new(big.Float).SetPrec(128) or higher
for both numSca and num before doing Mul/SetString) or add a clear comment
documenting the 64-bit assumption and why scale will always be 0 for UNSIGNED
BIGINT; ensure the change touches the numSca/num initializations in the code
paths that return INT32/INT64/FIXED_LEN_BYTE_ARRAY so no silent precision loss
can occur if scale becomes nonzero.
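The big.Float precision concern in the DECIMAL prompt above can be sketched as follows; the function name and the 128-bit choice are illustrative assumptions, not the PR's code:

```go
package main

import (
	"fmt"
	"math/big"
)

// decimalToUnscaled converts a textual DECIMAL into its unscaled integer
// representation (value * 10^scale), using an explicit 128-bit mantissa so
// wide DECIMALs do not silently lose digits at big.Float's default precision.
func decimalToUnscaled(s string, scale int64) (*big.Int, error) {
	num, ok := new(big.Float).SetPrec(128).SetString(s)
	if !ok {
		return nil, fmt.Errorf("invalid decimal %q", s)
	}
	// 10^scale as a big.Float at the same precision.
	pow := new(big.Int).Exp(big.NewInt(10), big.NewInt(scale), nil)
	sca := new(big.Float).SetPrec(128).SetInt(pow)
	num.Mul(num, sca)
	i, _ := num.Int(nil) // truncates toward zero
	return i, nil
}

func main() {
	v, _ := decimalToUnscaled("123.25", 2)
	fmt.Println(v) // 12325
}
```

An exact alternative is to scale the decimal string purely with integer/string arithmetic, which avoids binary rounding entirely; the explicit `SetPrec` shown here is the minimal change to the existing big.Float approach.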
📒 Files selected for processing (10)
dumpling/export/BUILD.bazel, dumpling/export/config.go, dumpling/export/ir.go, dumpling/export/ir_impl.go, dumpling/export/sql_type.go, dumpling/export/util_for_test.go, dumpling/export/writer.go, dumpling/export/writer_parquet.go, dumpling/export/writer_parquet_test.go, dumpling/export/writer_util.go
case FileFormatParquetString:
	if conf.CompressType != storage.NoCompression {
		return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
	}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm the correct NoCompression constant in compressedio (or whatever package owns CompressType).
rg -nP '\bNoCompression\b' -g '!**/dumpling/export/**'
rg -nP 'type\s+CompressType\b' -g 'pkg/objstore/**'

Repository: pingcap/tidb
Length of output: 1415
🏁 Script executed:
# Check imports at the top of the file
head -100 dumpling/export/config.go | grep -n "import\|^package"
# Check lines around 857-860
sed -n '850,865p' dumpling/export/config.go | cat -n
# Check line 221 for the ParquetCompressType constant
sed -n '215,230p' dumpling/export/config.go | cat -n

Repository: pingcap/tidb
Length of output: 1441
🏁 Script executed:
# Show full imports section of the file
sed -n '5,50p' dumpling/export/config.go | cat -n
# Show what CompressType is defined as - check the Config struct
grep -n "CompressType" dumpling/export/config.go | head -20

Repository: pingcap/tidb
Length of output: 2363
Compile error: storage is not imported.
Line 858 references storage.NoCompression, but this file does not import a storage package. It imports compressedio (line 20), and conf.CompressType is a compressedio.CompressType (line 146), so the check must use compressedio.NoCompression. Note that there is a distinct local NoCompression ParquetCompressType constant at line 221, but that's a different type.
Fix
case FileFormatParquetString:
- if conf.CompressType != storage.NoCompression {
+ if conf.CompressType != compressedio.NoCompression {
return errors.Errorf("parquet does not support --compress, please unset it or use --parquet-compress instead")
	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/config.go` around lines 857 - 860, The check in the
FileFormatParquetString case uses an undefined storage.NoCompression; change it
to use the correct type's constant compressedio.NoCompression (because
conf.CompressType is a compressedio.CompressType), i.e., replace
storage.NoCompression with compressedio.NoCompression in the if that compares
conf.CompressType; leave the existing ParquetCompressType.NoCompression constant
untouched.
func (r RowReceiverArr) GetRawBytes() []sql.RawBytes {
	rawBytes := make([]sql.RawBytes, len(r.receivers))
	for i, receiver := range r.receivers {
		receiver.GetRawBytes()
		rawBytes[i] = receiver.GetRawBytes()[0]
	}
	return rawBytes
}
Remove redundant GetRawBytes() call and use a pointer receiver.
Line 236 invokes receiver.GetRawBytes() and discards the result, then line 237 calls it again — the first call is dead code. Also, this method uses a value receiver while every other method on RowReceiverArr (BindAddress, WriteToBuffer, WriteToBufferInCsv) uses a pointer receiver; the inconsistency risks method-set surprises when satisfying interfaces through a pointer. Finally, the method is exported and lacks a doc comment.
🐛 Proposed fix
-func (r RowReceiverArr) GetRawBytes() []sql.RawBytes {
- rawBytes := make([]sql.RawBytes, len(r.receivers))
- for i, receiver := range r.receivers {
- receiver.GetRawBytes()
- rawBytes[i] = receiver.GetRawBytes()[0]
- }
- return rawBytes
-}
+// GetRawBytes implements Stringer.GetRawBytes by collecting the first raw-bytes
+// element from each underlying receiver.
+func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes {
+ rawBytes := make([]sql.RawBytes, len(r.receivers))
+ for i, receiver := range r.receivers {
+ rawBytes[i] = receiver.GetRawBytes()[0]
+ }
+ return rawBytes
+}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (r RowReceiverArr) GetRawBytes() []sql.RawBytes { | |
| rawBytes := make([]sql.RawBytes, len(r.receivers)) | |
| for i, receiver := range r.receivers { | |
| receiver.GetRawBytes() | |
| rawBytes[i] = receiver.GetRawBytes()[0] | |
| } | |
| return rawBytes | |
| } | |
| // GetRawBytes implements Stringer.GetRawBytes by collecting the first raw-bytes | |
| // element from each underlying receiver. | |
| func (r *RowReceiverArr) GetRawBytes() []sql.RawBytes { | |
| rawBytes := make([]sql.RawBytes, len(r.receivers)) | |
| for i, receiver := range r.receivers { | |
| rawBytes[i] = receiver.GetRawBytes()[0] | |
| } | |
| return rawBytes | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/sql_type.go` around lines 233 - 240, The GetRawBytes method
on RowReceiverArr currently makes a redundant call and uses a value receiver;
change its signature to use a pointer receiver (func (r *RowReceiverArr)
GetRawBytes() []sql.RawBytes), remove the dead receiver.GetRawBytes() call so
you only call receiver.GetRawBytes()[0] once when populating rawBytes from
r.receivers, and add a short doc comment above the exported GetRawBytes method;
ensure consistency with the other methods BindAddress, WriteToBuffer, and
WriteToBufferInCsv which are pointer receivers.
package export
Missing TiDB Apache 2.0 license header.
New Go source files in this repo should carry the standard TiDB copyright + Apache-2.0 header (see writer_parquet.go line 1). Please add it here too.
📝 Proposed fix
+// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
+
 package export

As per coding guidelines: "For new source files (for example *.go), include the standard TiDB license header (copyright + Apache 2.0) by copying from a nearby file and updating year if needed."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet_test.go` around lines 1 - 2, Add the standard
TiDB Apache-2.0 license header to the top of the new test file
(writer_parquet_test.go) so it matches the header used in writer_parquet.go;
place the copyright + Apache-2.0 block above the existing "package export" line
and update the year if needed.
for i, c := range allTypesColumnInfos {
	names := strings.Split(c.Name, "_")
	structName := ""
	for j := 1; j < len(names); j++ {
		structName += fmt.Sprintf("%s%s", strings.ToUpper(names[j][:1]), names[j][1:])
	}
	switch c.Type {
	case "CHAR", "VARCHAR", "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "DATE", "TIME", "JSON", "VECTOR", "ENUM", "SET":
		value := reflect.ValueOf(readRow).FieldByName(structName).String()
		require.Equal(t, data[0][i], value)
	case "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "BINARY", "VARBINARY", "BIT":
		value := reflect.ValueOf(readRow).FieldByName(structName).String()
		require.Equal(t, data[0][i], []byte(value))
	case "TIMESTAMP", "DATETIME":
		value := reflect.ValueOf(readRow).FieldByName(structName).Interface().(*int64)
		require.Equal(t, data[0][i], time.UnixMicro(*value).UTC().Format(time.DateTime))
	case "TINYINT", "UNSIGNED TINYINT", "SMALLINT", "MEDIUMINT", "UNSIGNED SMALLINT", "UNSIGNED MEDIUMINT", "YEAR", "INT":
		value := reflect.ValueOf(readRow).FieldByName(structName).Int()
		require.Equal(t, data[0][i], strconv.FormatInt(value, 10))
	case "UNSIGNED INT", "BIGINT":
		value := reflect.ValueOf(readRow).FieldByName(structName).Int()
		require.Equal(t, data[0][i], strconv.FormatInt(value, 10))
	case "UNSIGNED BIGINT":
		value := reflect.ValueOf(readRow).FieldByName(structName).String()
		bigint := types.DECIMAL_BYTE_ARRAY_ToString([]byte(value), 20, 0)
		require.Equal(t, data[0][i], bigint)
	case "FLOAT", "DOUBLE":
		value := reflect.ValueOf(readRow).FieldByName(structName).Float()
		require.Equal(t, data[0][i], fmt.Sprintf("%.3f", value))
	case "DECIMAL":
		if c.Precision == 9 || c.Precision == 18 {
			value := reflect.ValueOf(readRow).FieldByName(structName).Int()
			stringInt := strconv.FormatInt(value, 10)
			result := stringInt[:len(stringInt)-int(c.Scale)] + "." + stringInt[len(stringInt)-int(c.Scale):]
			require.Equal(t, data[0][i], result)
		} else if c.Precision == 38 || c.Precision == 40 {
			value := reflect.ValueOf(readRow).FieldByName(structName).String()
			require.Equal(t, data[0][i], value)
		}
	}
}
Default case silently passes for unknown column types.
The outer switch c.Type at line 107 has no default, so if a new column type is added to allTypesColumnInfos without adding a matching case here, the test will pass without asserting anything for that column. Mirror the t.FailNow() default used when constructing the row (line 80) to keep both sides symmetric.
📝 Proposed fix
case "DECIMAL":
...
+ default:
+ t.FailNow()
	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet_test.go` around lines 101 - 141, The switch
over c.Type inside the loop in writer_parquet_test.go currently lacks a default,
so add a default branch that fails the test (e.g., t.Fatalf or t.FailNow with a
clear message) when an unknown column type is encountered; update the switch in
the loop that references allTypesColumnInfos, using c.Type, structName and i in
the error message to make it easy to locate which column/type was unhandled.
// write remain data and meta file
if err = writer.WriteStop(); err != nil {
	return counter, errors.Trace(err)
}
if bf.Len() > 0 {
	wp.input <- bf
}
close(wp.input)
<-wp.closed
Blocking send to wp.input can deadlock if the pipe has errored or ctx was cancelled.
After WriteStop, line 148 does a bare wp.input <- bf. Inside the loop (line 119–135) the send is properly guarded by a select on pCtx.Done() and wp.errCh; the tail send has no such protection. If wp.Run exits (due to upstream writer error or context cancellation) before this final send, the goroutine stops draining wp.input and this send blocks forever — close(wp.input) on line 150 never runs, and <-wp.closed on line 151 hangs too. The deferred cancel()/wg.Wait() would then also deadlock.
🛡️ Proposed fix
if bf.Len() > 0 {
- wp.input <- bf
+ select {
+ case <-pCtx.Done():
+ return counter, pCtx.Err()
+ case err = <-wp.errCh:
+ return counter, err
+ case wp.input <- bf:
+ }
}
	close(wp.input)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet.go` around lines 143 - 151, The final send to
wp.input after writer.WriteStop currently blocks if wp.Run has exited; replace
the bare send (wp.input <- bf) with the same guarded select used in the main
loop so the send is skipped/unblocked on cancellation or error: use a select
that attempts to send bf to wp.input and also listens on pCtx.Done() and
wp.errCh (the same channels used in the loop around lines 119–135) so the
goroutine won't deadlock and close(wp.input)/<-wp.closed can proceed; keep
writer.WriteStop and the subsequent close/receive logic unchanged.
func getParquetCompress(compress ParquetCompressType) parquet.CompressionCodec {
	switch compress {
	case NoCompression:
		return parquet.CompressionCodec_UNCOMPRESSED
	case Gzip:
		return parquet.CompressionCodec_GZIP
	case Snappy:
		return parquet.CompressionCodec_SNAPPY
	case Zstd:
		return parquet.CompressionCodec_ZSTD
	default:
		return DefaultCompressionType
	}
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify how ParquetCompressType is validated in config.go and whether a default
# is enforced elsewhere (the PR summary flags ParseParquetCompressType as missing).
rg -nP --type=go -C3 'ParquetCompressType|ParseParquetCompressType' dumpling/export
Repository: pingcap/tidb
Length of output: 4439
🏁 Script executed:
#!/bin/bash
# Find the definition of ParseParquetCompressType and DefaultCompressionType
rg -nP --type=go 'func ParseParquetCompressType|DefaultCompressionType\s*=' dumpling/export
Repository: pingcap/tidb
Length of output: 148
🏁 Script executed:
#!/bin/bash
# Search for ParseParquetCompressType function definition more broadly
rg -nP --type=go 'ParseParquetCompressType' dumpling/export -A5
Repository: pingcap/tidb
Length of output: 443
🏁 Script executed:
#!/bin/bash
# Search entire repository for ParseParquetCompressType definition
rg -nP --type=go 'func ParseParquetCompressType' .
Repository: pingcap/tidb
Length of output: 38
🏁 Script executed:
#!/bin/bash
# Look at the context in config.go around line 650-665 to see the full picture
sed -n '645,665p' dumpling/export/config.go
Repository: pingcap/tidb
Length of output: 774
🏁 Script executed:
#!/bin/bash
# Check if Config has initialization or default value for ParquetCompressType
# Also check NewParquetWriter for any validation
sed -n '179,190p' dumpling/export/writer_parquet.go
Repository: pingcap/tidb
Length of output: 448
🏁 Script executed:
#!/bin/bash
# Check the full getParquetCompress function to see what happens with empty/unrecognized values
sed -n '160,173p' dumpling/export/writer_parquet.go
Repository: pingcap/tidb
Length of output: 427
Silent fallback to ZSTD without validation or logging.
getParquetCompress returns DefaultCompressionType (ZSTD) for any unrecognized compression type. Since ParseParquetCompressType is not yet implemented, the config validation at config.go:655 will fail at runtime. Even after implementation, any unset or invalid compression type silently becomes ZSTD with no warning. Add validation in getParquetCompress to reject unknown values with an error, or implement comprehensive validation in ParseParquetCompressType to catch misconfigurations early.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet.go` around lines 160 - 173, The current
getParquetCompress silently returns DefaultCompressionType for unknown
ParquetCompressType values; change this so unknown/invalid values are rejected
rather than defaulted. Either update getParquetCompress to return
(parquet.CompressionCodec, error) and return an explicit error for the default
case (referencing getParquetCompress, ParquetCompressType and
DefaultCompressionType) and propagate that error to callers, or
implement/strengthen ParseParquetCompressType to validate input early and only
supply known ParquetCompressType values to getParquetCompress so the default
branch is never hit silently. Ensure callers handle the error and surface
configuration validation failures instead of silently falling back to ZSTD.
// WriteRow writes a row to the parquet format
func (w *SQLWriter) WriteRow(r RowReceiverArr) error {
	var err error
	sqlRaws := r.GetRawBytes()
	rec := make([]any, len(sqlRaws))
	for i := 0; i < len(sqlRaws); i++ {
		rec[i] = nil
		if sqlRaws[i] != nil {
			rec[i], err = convertDataToParquet(sqlRaws[i],
				w.SchemaHandler.SchemaElements[i+1].Type,
				w.SchemaHandler.SchemaElements[i+1].LogicalType)
			if err != nil {
				return err
			}
		}
	}
	return w.Write(rec)
}
WriteRow assumes len(sqlRaws) == len(SchemaHandler.SchemaElements)-1.
SchemaElements[i+1] is accessed for every i in sqlRaws without bounds validation. If the row receiver and schema ever disagree (e.g., because of a future bug in ColumnInfos() vs ColumnTypes() plumbing), this panics instead of returning a clean error. Consider validating once up-front and returning an error.
🛡️ Proposed fix
func (w *SQLWriter) WriteRow(r RowReceiverArr) error {
var err error
sqlRaws := r.GetRawBytes()
+ if len(sqlRaws)+1 > len(w.SchemaHandler.SchemaElements) {
+ return fmt.Errorf("parquet: row has %d columns but schema has %d", len(sqlRaws), len(w.SchemaHandler.SchemaElements)-1)
+ }
	rec := make([]any, len(sqlRaws))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet.go` around lines 213 - 230, WriteRow currently
indexes SchemaHandler.SchemaElements[i+1] for each entry in sqlRaws without
checking lengths, which can panic if sqlRaws length !=
len(SchemaHandler.SchemaElements)-1; add an upfront validation in
SQLWriter.WriteRow that compares len(sqlRaws) to
len(w.SchemaHandler.SchemaElements)-1 and return a clear error if they differ,
or alternatively bound-check inside the loop (using
w.SchemaHandler.SchemaElements[i+1] only when i+1 < len(...)) so a mismatch
yields a returned error instead of a panic; reference SQLWriter, WriteRow,
sqlRaws, and w.SchemaHandler.SchemaElements when making the change.
if lT != nil && lT.DECIMAL != nil {
	numSca := big.NewFloat(1.0)
	for i := 0; i < int(lT.DECIMAL.Scale); i++ {
		numSca.Mul(numSca, big.NewFloat(10))
	}
	num := new(big.Float)
	num.SetString(s)
	num.Mul(num, numSca)

	if *pT == parquet.Type_INT32 {
		tmp, _ := num.Int64()
		return int32(tmp), nil
	} else if *pT == parquet.Type_INT64 {
		tmp, _ := num.Int64()
		return tmp, nil
	} else if *pT == parquet.Type_FIXED_LEN_BYTE_ARRAY {
		s = num.Text('f', 0)
		// only used by unsigned big int
		res := types.StrIntToBinary(s, "BigEndian", 9, true)
		return res, nil
	}
🧩 Analysis chain
🌐 Web query:
In Go's math/big, what is the default precision of a Float returned by new(big.Float) after calling SetString, and can it represent math.MaxUint64 exactly?
💡 Result:
In Go's math/big package, a Float created by new(big.Float) has an initial precision of 0 bits. When SetString is called on it, the precision is set to 64 bits before any rounding takes place during parsing. This is confirmed in the source code of floatconv.go: in the scan function (called by Parse, which is called by SetString), if prec == 0, prec = 64. With 64 bits of mantissa precision, a big.Float can represent math.MaxUint64 (18446744073709551615, which requires exactly 64 bits) exactly, as the mantissa can hold up to 64 bits without rounding loss for integer values of that magnitude. This aligns with similar setters like SetUint64 and SetInt64, which also default to 64 bits and preserve exactness for uint64 values.
Citations:
- 1: https://pkg.go.dev/math/big@go1.25.5
- 2: https://pkg.go.dev/math/big
- 3: https://pkg.go.dev/math/big@go1.24.2
- 4: https://github.com/golang/go/blob/master/src/math/big/float.go
- 5: https://go.dev/src/math/big/floatconv.go
- 6: https://go.dev/src/math/big/floatconv.go?s=8439%3A8491
🏁 Script executed:
# Find and examine the writer_parquet.go file
fd -p "writer_parquet.go" 2>/dev/null
Repository: pingcap/tidb
Length of output: 89
🏁 Script executed:
# Read the specific function around lines 236-256 with broader context
if fd -p "writer_parquet.go" 2>/dev/null | head -1; then
file=$(fd -p "writer_parquet.go" 2>/dev/null | head -1)
echo "=== File: $file ==="
# Read broader context around the lines
sed -n '200,280p' "$file" | cat -n
fi
Repository: pingcap/tidb
Length of output: 3087
🏁 Script executed:
# Search for DECIMAL scale constraints and type information
rg -A 5 -B 5 "UNSIGNED.*BIGINT|BIGINT.*UNSIGNED|DECIMAL" dumpling/export/writer_parquet.go | head -100
Repository: pingcap/tidb
Length of output: 1824
Add precision safeguard or document the 64-bit assumption for future DECIMAL scaling.
The code uses big.Float with default 64-bit precision (confirmed by SetString behavior) to scale decimal values. While UNSIGNED BIGINT currently reaches this branch only with scale=0 (safe), any future code path passing non-zero scale would cause silent precision loss: math.MaxUint64 requires exactly 64 bits, and each multiplication by 10 consumes additional precision beyond that limit. Either explicitly set precision to a higher value (e.g., new(big.Float).SetPrec(128)) or document why the current 64-bit default is safe, preventing accidental data corruption if the DECIMAL handling is broadened in the future.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet.go` around lines 236 - 256, The DECIMAL
scaling uses big.Float with the default (≈64-bit) precision which can drop
precision for large values; update the DECIMAL branch (the lT.DECIMAL handling
that creates numSca and num) to explicitly increase precision (e.g., use
new(big.Float).SetPrec(128) or higher for both numSca and num before doing
Mul/SetString) or add a clear comment documenting the 64-bit assumption and why
scale will always be 0 for UNSIGNED BIGINT; ensure the change touches the
numSca/num initializations in the code paths that return
INT32/INT64/FIXED_LEN_BYTE_ARRAY so no silent precision loss can occur if scale
becomes nonzero.
} else if *pT == parquet.Type_FIXED_LEN_BYTE_ARRAY {
	return s, nil
}
return nil, fmt.Errorf("unsupported type %v", pT)
%v on *parquet.Type prints a pointer address, not the type name.
pT is a *parquet.Type; using %v formats the pointer. Dereference it so the error message is actionable.
🐛 Proposed fix
- return nil, fmt.Errorf("unsupported type %v", pT)
+ return nil, fmt.Errorf("unsupported type %v", *pT)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@dumpling/export/writer_parquet.go` at line 294, The error message is printing
a pointer address because pT is a *parquet.Type; change the fmt.Errorf call in
the return path that currently uses pT to print the actual type by dereferencing
pT (e.g. use *pT) or call its String() method if available so the error reads
the type name instead of a pointer address; update the return in
writer_parquet.go (the line returning fmt.Errorf("unsupported type %v", pT)) to
use the dereferenced value or String() representation.
|
@shiyuhang0: The following test failed, say
Full PR test history. Your PR dashboard.
Details
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
We are moving forward with arrow-go, which is the official library, and will not switch back to parquet-go, so we are closing this PR.
What problem does this PR solve?
Issue Number: ref #67765
Note: this is a cherry-pick from tidbcloud/tidb-cse#1937.
What is changed and how it works?
This PR cherry picks the dumpling parquet export feature from the tidbcloud/tidb-cse repository. The changes add support for exporting data in Parquet file format through dumpling.
Changes include:
- `writer_parquet.go` and `writer_parquet_test.go`: implement parquet write logic with type conversion for all TiDB data types
- `ir.go`/`ir_impl.go`: add `ColumnInfo` struct and `ColumnInfos()` method to `TableMeta` interface
- `sql_type.go`: add `GetRawBytes()` method to `Stringer` interface and all type implementations
- `writer_util.go`: add `FileFormatParquet` format enum and dispatch in `WriteInsert`
- `writer.go`: handle parquet format in writer initialization and file extension with compression suffix
- `config.go`: add parquet flag constants, filetype hint, parquet parsing in `ParseFromFlags`, and parquet validation in `adjustFileFormat`
- `util_for_test.go`: add `columnInfos` field and `newMockTableIRWithColumnInfo` helper
- `BUILD.bazel`: add new source files to build targets

Known gaps (not yet included in this cherry-pick):

- `BUILD.bazel` go_library/go_test deps: missing `@com_github_xitongsys_parquet_go//...` dependencies
- `config.go` Config struct: missing `ParquetCompressType`, `ParquetPageSize`, `ParquetRowGroupSize` fields and the `ParquetCompressType` type definition
- `config.go` DefineFlags: missing parquet flag registration (`--parquet-compress`, `--parquet-page-size`, `--parquet-row-group-size`)
- `config.go`: missing `ParseParquetCompressType` function

These need to be manually added before the feature is fully functional.
Tests
Side effects
Documentation
Release note
Summary by CodeRabbit