Merged
106 changes: 102 additions & 4 deletions README.md
@@ -174,10 +174,16 @@ Other options and flags are also available:
$ timescaledb-parallel-copy --help

Usage of timescaledb-parallel-copy:
-batch-error-output-dir string
directory to store batch errors. Setting this will save a .csv file with the contents of the batch that failed and continue with the rest of the data.
-auto-column-mapping
Automatically map CSV headers to database columns with the same names
-batch-byte-size int
Max number of bytes to send in a batch (default 20971520)
-batch-size int
Number of rows per insert (default 5000)
Number of rows per insert. It will be limited by batch-byte-size (default 5000)
-buffer-byte-size int
Number of bytes to buffer; it must be large enough to hold a full row (default 2097152)
-column-mapping string
Column mapping from CSV to database columns (format: "csv_col1:db_col1,csv_col2:db_col2" or JSON)
-columns string
Comma-separated columns present in CSV
-connection string
@@ -222,6 +228,7 @@ Usage of timescaledb-parallel-copy:
Number of parallel requests to make (default 1)
```


## Purpose

PostgreSQL's native `COPY` function is transactional and single-threaded, and may not be suitable for ingesting large
@@ -237,7 +244,7 @@ less often. This improves memory management and keeps operations on the disk as

We welcome contributions to this utility, which like TimescaleDB is released under the Apache2 Open Source License. The same [Contributors Agreement](//github.com/timescale/timescaledb/blob/master/CONTRIBUTING.md) applies; please sign the [Contributor License Agreement](https://cla-assistant.io/timescale/timescaledb-parallel-copy) (CLA) if you're a new contributor.

### Running Tests
## Running Tests

Some of the tests require a running Postgres database. Set the `TEST_CONNINFO`
environment variable to point at the database you want to run tests against.
@@ -250,3 +257,94 @@ For example:
$ createdb gotest
$ TEST_CONNINFO='dbname=gotest user=myuser' go test -v ./...
```

## Advanced usage

### Column Mapping

The tool exposes two flags, `--column-mapping` and `--auto-column-mapping`, that provide smarter handling of CSV headers.

`--column-mapping` lets you specify how the columns in your CSV map to database columns. It supports two formats:

**Simple format:**
```bash
# Map CSV columns to database columns with different names
$ timescaledb-parallel-copy --connection $DATABASE_URL --table metrics --file data.csv \
--column-mapping "timestamp:time,temperature:temp_celsius,humidity:humidity_percent"
```

**JSON format:**
```bash
# Same mapping using JSON format
$ timescaledb-parallel-copy --connection $DATABASE_URL --table metrics --file data.csv \
--column-mapping '{"timestamp":"time","temperature":"temp_celsius","humidity":"humidity_percent"}'
```

Example CSV file with headers:
```csv
timestamp,temperature,humidity
2023-01-01 00:00:00,20.5,65.2
2023-01-01 01:00:00,21.0,64.8
```

This maps the CSV columns to database columns: `timestamp` → `time`, `temperature` → `temp_celsius`, `humidity` → `humidity_percent`.
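
The same mapping can also be built programmatically with the `csvcopy` package this PR extends. A minimal sketch of the option-construction step only; the `csvcopy.NewCopier` call and its remaining arguments are elided in this diff, so they are not shown here:

```go
package example

import "github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"

// buildOptions assembles the library equivalent of
// --column-mapping "timestamp:time,temperature:temp_celsius,humidity:humidity_percent".
func buildOptions() []csvcopy.Option {
	mapping := csvcopy.ColumnsMapping{
		{CSVColumnName: "timestamp", DatabaseColumnName: "time"},
		{CSVColumnName: "temperature", DatabaseColumnName: "temp_celsius"},
		{CSVColumnName: "humidity", DatabaseColumnName: "humidity_percent"},
	}
	return []csvcopy.Option{csvcopy.WithColumnMapping(mapping)}
}
```

The resulting options are then passed to `csvcopy.NewCopier`, as `main.go` does below.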

`--auto-column-mapping` covers the common case where your CSV columns have the same names as your database columns.

```bash
# Automatically map CSV headers to database columns with identical names
$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file sensors.csv \
--auto-column-mapping
```

Example CSV file with headers matching database columns:
```csv
time,device_id,temperature,humidity
2023-01-01 00:00:00,sensor_001,20.5,65.2
2023-01-01 01:00:00,sensor_002,21.0,64.8
```

Both flags automatically skip the header row and cannot be used together with `--skip-header` or `--columns`.
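
Conceptually, `--auto-column-mapping` is just the identity mapping built from the header row. A sketch of that idea, assuming exact name matches; the real resolution happens inside the library via `csvcopy.WithAutoColumnMapping()`:

```go
package example

import "github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"

// autoMap builds an identity mapping from a CSV header row, mirroring what
// --auto-column-mapping is documented to do. Sketch only, not the library's code.
func autoMap(headers []string) csvcopy.ColumnsMapping {
	mapping := make(csvcopy.ColumnsMapping, 0, len(headers))
	for _, h := range headers {
		mapping = append(mapping, csvcopy.ColumnMapping{
			CSVColumnName:      h,
			DatabaseColumnName: h,
		})
	}
	return mapping
}
```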

**Flexible Column Mapping:**

Column mappings can include entries for columns that are not present in the input CSV file. This allows you to use the same mapping configuration across multiple input files with different column sets:

```bash
# Define a comprehensive mapping that works with multiple CSV formats
$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file partial_data.csv \
--column-mapping "timestamp:time,temp:temperature,humidity:humidity_percent,pressure:pressure_hpa,location:device_location"
```

Example CSV file with only some of the mapped columns:
```csv
timestamp,temp,humidity
2023-01-01 00:00:00,20.5,65.2
2023-01-01 01:00:00,21.0,64.8
```

In this case, only the `timestamp`, `temp`, and `humidity` columns from the CSV are processed, mapped to `time`, `temperature`, and `humidity_percent` respectively. The unused mappings for `pressure` and `location` are simply ignored, so the same mapping configuration works across input files with varying column sets.
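
This behavior can be pictured as filtering the mapping against the file's header row. A sketch under the assumption that entries are matched by exact CSV column name:

```go
package example

import "github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"

// effectiveMapping keeps only the entries whose CSV column actually appears
// in the header row; unused entries are ignored. Sketch of the documented
// behavior, not the library's implementation.
func effectiveMapping(m csvcopy.ColumnsMapping, headers []string) csvcopy.ColumnsMapping {
	present := make(map[string]bool, len(headers))
	for _, h := range headers {
		present[h] = true
	}
	var out csvcopy.ColumnsMapping
	for _, cm := range m {
		if present[cm.CSVColumnName] {
			out = append(out, cm)
		}
	}
	return out
}
```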

You can also map different CSV column names to the same database column, as long as only one of them appears in any given input file:

```bash
# Map both 'temp' and 'temperature' to the same database column
$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file data.csv \
--column-mapping "timestamp:time,temp:temperature,temperature:temperature,humidity:humidity_percent"
```

This allows importing from different file formats into the same table:

**File A** (uses 'temp'):
```csv
timestamp,temp,humidity
2023-01-01 00:00:00,20.5,65.2
```

**File B** (uses 'temperature'):
```csv
timestamp,temperature,humidity
2023-01-01 02:00:00,22.1,63.5
```

Both files can use the same mapping configuration and import successfully into the same database table, even though they use different column names for the temperature data. The tool checks for duplicate database columns only among the columns actually present in each input file.
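
That validation rule can be sketched as a duplicate check over the effective, per-file mapping only. A hypothetical version of it, not the library's actual code:

```go
package example

import (
	"fmt"

	"github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"
)

// validateNoDuplicateTargets rejects mappings where two CSV columns that are
// both present in the file point at the same database column.
func validateNoDuplicateTargets(effective csvcopy.ColumnsMapping) error {
	seen := make(map[string]string, len(effective))
	for _, cm := range effective {
		if prev, ok := seen[cm.DatabaseColumnName]; ok {
			return fmt.Errorf("csv columns %q and %q both map to database column %q",
				prev, cm.CSVColumnName, cm.DatabaseColumnName)
		}
		seen[cm.DatabaseColumnName] = cm.CSVColumnName
	}
	return nil
}
```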
116 changes: 107 additions & 9 deletions cmd/timescaledb-parallel-copy/main.go
@@ -3,13 +3,15 @@ package main

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"runtime"
"strings"
"time"

"github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"
@@ -33,11 +35,14 @@ var (
quoteCharacter string
escapeCharacter string

fromFile string
columns string
skipHeader bool
headerLinesCnt int
skipBatchErrors bool
fromFile string
columns string
columnMapping string
autoColumnMapping bool
skipHeader bool
headerLinesCnt int
skipLines int
skipBatchErrors bool

importID string
workers int
@@ -68,8 +73,12 @@ func init() {
flag.StringVar(&escapeCharacter, "escape", "", "The ESCAPE `character` to use during COPY (default '\"')")
flag.StringVar(&fromFile, "file", "", "File to read from rather than stdin")
flag.StringVar(&columns, "columns", "", "Comma-separated columns present in CSV")
flag.StringVar(&columnMapping, "column-mapping", "", "Column mapping from CSV to database columns (format: \"csv_col1:db_col1,csv_col2:db_col2\" or JSON)")
flag.BoolVar(&autoColumnMapping, "auto-column-mapping", false, "Automatically map CSV headers to database columns with the same names")

flag.BoolVar(&skipHeader, "skip-header", false, "Skip the first line of the input")
flag.IntVar(&headerLinesCnt, "header-line-count", 1, "Number of header lines")
flag.IntVar(&headerLinesCnt, "header-line-count", 1, "(deprecated) Number of header lines")
flag.IntVar(&skipLines, "skip-lines", 0, "Skip the first n lines of the input. It is applied before skip-header")

flag.BoolVar(&skipBatchErrors, "skip-batch-errors", false, "if true, the copy will continue even if a batch fails")

@@ -103,6 +112,11 @@ func main() {
if dbName != "" {
log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database")
}

if headerLinesCnt != 1 {
log.Fatalf("Error: -header-line-count is deprecated. Use -skip-lines instead")
}
Comment on lines +116 to +118
Contributor: Why are we deprecating this flag? And why in the context of this PR?
Contributor Author: it is in the thread above #118 (comment)

logger := &csvCopierLogger{}

opts := []csvcopy.Option{
@@ -127,6 +141,18 @@
opts = append(opts, csvcopy.WithImportID(importID))
}

if columnMapping != "" {
mapping, err := parseColumnMapping(columnMapping)
if err != nil {
log.Fatalf("Error parsing column mapping: %v", err)
}
opts = append(opts, csvcopy.WithColumnMapping(mapping))
}

if autoColumnMapping {
opts = append(opts, csvcopy.WithAutoColumnMapping())
}

batchErrorHandler := csvcopy.BatchHandlerError()
if skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerNoop()
@@ -136,10 +162,12 @@
}
opts = append(opts, csvcopy.WithBatchErrorHandler(batchErrorHandler))

if skipLines > 0 {
opts = append(opts, csvcopy.WithSkipHeaderCount(skipLines))
}

if skipHeader {
opts = append(opts,
csvcopy.WithSkipHeaderCount(headerLinesCnt),
)
opts = append(opts, csvcopy.WithSkipHeader(true))
}

copier, err := csvcopy.NewCopier(
@@ -190,3 +218,73 @@
}
fmt.Println(res)
}

Comment on parseColumnMapping
Contributor: Maybe we should move these to a config package and create some tests to validate the mapping works. We should also test weird valid Postgres column names.
Contributor Author: I'd rather keep this function in the main.go file than create a separate package, especially given it is specific to the CLI interface.

Comment on the two supported formats
Contributor: Why do we need to support 2 methods? Wouldn't one simplify UX for the user? Have you run tests with random column names that need to be quoted?
Contributor Author (MetalBlueberry, Jul 21, 2025): The simple format works for most use cases and is very comfortable to type in the terminal, BUT, as you already noticed, it will probably fail with strange column names, since they can conflict with terminal syntax. The JSON format is bulletproof: JSON encoding is well defined, so you can define any column name you want, including syntax to quote characters, without having to worry about your terminal, and there is no need for extra validation. If you mess it up, the code will fail on the first attempt to insert data into your database. It also doesn't make a lot of sense to have unit tests for this, as it is just json.Unmarshal.

// parseColumnMapping parses a column mapping string into csvcopy.ColumnsMapping.
// Supports two formats:
//  1. Simple: "csv_col1:db_col1,csv_col2:db_col2"
//  2. JSON: {"csv_col1":"db_col1","csv_col2":"db_col2"}
func parseColumnMapping(mappingStr string) (csvcopy.ColumnsMapping, error) {
if mappingStr == "" {
return nil, nil
}

mappingStr = strings.TrimSpace(mappingStr)

// Check if it's JSON format (starts with '{')
if strings.HasPrefix(mappingStr, "{") {
return parseJSONColumnMapping(mappingStr)
}

// Parse simple format: "csv_col1:db_col1,csv_col2:db_col2"
return parseSimpleColumnMapping(mappingStr)
}

// parseJSONColumnMapping parses JSON format column mapping
func parseJSONColumnMapping(jsonStr string) (csvcopy.ColumnsMapping, error) {
var mappingMap map[string]string
if err := json.Unmarshal([]byte(jsonStr), &mappingMap); err != nil {
return nil, fmt.Errorf("invalid JSON format for column mapping: %w", err)
Comment on this line
Contributor: Let's return the correct format as part of the error.
}

var mapping csvcopy.ColumnsMapping
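// Note: Go map iteration order is not deterministic, so the element order
// of the resulting mapping may vary between runs.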
for csvCol, dbCol := range mappingMap {
mapping = append(mapping, csvcopy.ColumnMapping{
CSVColumnName: csvCol,
DatabaseColumnName: dbCol,
})
}

return mapping, nil
}

// parseSimpleColumnMapping parses simple format: "csv_col1:db_col1,csv_col2:db_col2"
func parseSimpleColumnMapping(simpleStr string) (csvcopy.ColumnsMapping, error) {
pairs := strings.Split(simpleStr, ",")
var mapping csvcopy.ColumnsMapping

for i, pair := range pairs {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}

parts := strings.Split(pair, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid column mapping format at position %d: '%s', expected 'csv_column:db_column'", i+1, pair)
}

csvCol := strings.TrimSpace(parts[0])
dbCol := strings.TrimSpace(parts[1])

if csvCol == "" || dbCol == "" {
return nil, fmt.Errorf("empty column name in mapping at position %d: '%s'", i+1, pair)
}

mapping = append(mapping, csvcopy.ColumnMapping{
CSVColumnName: csvCol,
DatabaseColumnName: dbCol,
})
}

return mapping, nil
}
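
For illustration, both accepted forms parse to the same mapping. A short usage sketch with hypothetical column names `ts`/`temp`, assuming it runs in the same package as the parsers above:

```go
m1, err := parseColumnMapping("ts:time, temp:temp_c") // simple format
if err != nil {
	log.Fatal(err)
}
m2, err := parseColumnMapping(`{"ts":"time","temp":"temp_c"}`) // JSON format
if err != nil {
	log.Fatal(err)
}
// m1 and m2 contain the same pairs, though the JSON form iterates a Go map,
// so m2's element order is not deterministic.
_, _ = m1, m2
```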