
Commit fb931ee

Merge pull request #118 from timescale/vperez/implement-column-mapping
feat: column mapping and auto column mapping
2 parents 926108e + 8d5e553

File tree: 8 files changed (+1522 −80 lines)

README.md

Lines changed: 102 additions & 4 deletions
````diff
@@ -188,10 +188,16 @@ Other options and flags are also available:
 $ timescaledb-parallel-copy --help
 
 Usage of timescaledb-parallel-copy:
-  -batch-error-output-dir string
-        directory to store batch errors. Settings this will save a .csv file with the contents of the batch that failed and continue with the rest of the data.
+  -auto-column-mapping
+        Automatically map CSV headers to database columns with the same names
+  -batch-byte-size int
+        Max number of bytes to send in a batch (default 20971520)
   -batch-size int
-        Number of rows per insert (default 5000)
+        Number of rows per insert. It will be limited by batch-byte-size (default 5000)
+  -buffer-byte-size int
+        Number of bytes to buffer, it has to be big enough to hold a full row (default 2097152)
+  -column-mapping string
+        Column mapping from CSV to database columns (format: "csv_col1:db_col1,csv_col2:db_col2" or JSON)
   -columns string
         Comma-separated columns present in CSV
   -connection string
@@ -236,6 +242,7 @@ Usage of timescaledb-parallel-copy:
         Number of parallel requests to make (default 1)
 ```
 
+
 ## Purpose
 
 PostgreSQL native `COPY` function is transactional and single-threaded, and may not be suitable for ingesting large
@@ -251,7 +258,7 @@ less often. This improves memory management and keeps operations on the disk as
 
 We welcome contributions to this utility, which like TimescaleDB is released under the Apache2 Open Source License. The same [Contributors Agreement](//github.com/timescale/timescaledb/blob/master/CONTRIBUTING.md) applies; please sign the [Contributor License Agreement](https://cla-assistant.io/timescale/timescaledb-parallel-copy) (CLA) if you're a new contributor.
 
-### Running Tests
+## Running Tests
 
 Some of the tests require a running Postgres database. Set the `TEST_CONNINFO`
 environment variable to point at the database you want to run tests against.
````
````diff
@@ -264,3 +271,94 @@ For example:
 $ createdb gotest
 $ TEST_CONNINFO='dbname=gotest user=myuser' go test -v ./...
 ```
+
+## Advanced usage
+
+### Column Mapping
+
+The tool exposes two flags, `--column-mapping` and `--auto-column-mapping`, that allow CSV headers to be handled automatically.
+
+`--column-mapping` lets you specify how the columns in your CSV map to database columns. It supports two formats:
+
+**Simple format:**
+```bash
+# Map CSV columns to database columns with different names
+$ timescaledb-parallel-copy --connection $DATABASE_URL --table metrics --file data.csv \
+    --column-mapping "timestamp:time,temperature:temp_celsius,humidity:humidity_percent"
+```
+
+**JSON format:**
+```bash
+# Same mapping using JSON format
+$ timescaledb-parallel-copy --connection $DATABASE_URL --table metrics --file data.csv \
+    --column-mapping '{"timestamp":"time","temperature":"temp_celsius","humidity":"humidity_percent"}'
+```
+
+Example CSV file with headers:
+```csv
+timestamp,temperature,humidity
+2023-01-01 00:00:00,20.5,65.2
+2023-01-01 01:00:00,21.0,64.8
+```
+
+This maps the CSV columns to database columns: `timestamp` → `time`, `temperature` → `temp_celsius`, `humidity` → `humidity_percent`.
+
+`--auto-column-mapping` covers the common case where your CSV columns have the same names as your database columns.
+
+```bash
+# Automatically map CSV headers to database columns with identical names
+$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file sensors.csv \
+    --auto-column-mapping
+```
+
+Example CSV file with headers matching database columns:
+```csv
+time,device_id,temperature,humidity
+2023-01-01 00:00:00,sensor_001,20.5,65.2
+2023-01-01 01:00:00,sensor_002,21.0,64.8
+```
+
+Both flags automatically skip the header row and cannot be used together with `--skip-header` or `--columns`.
+
+**Flexible Column Mapping:**
+
+Column mappings can include entries for columns that are not present in the input CSV file. This allows you to use the same mapping configuration across multiple input files with different column sets:
+
+```bash
+# Define a comprehensive mapping that works with multiple CSV formats
+$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file partial_data.csv \
+    --column-mapping "timestamp:time,temp:temperature,humidity:humidity_percent,pressure:pressure_hpa,location:device_location"
+```
+
+Example CSV file with only some of the mapped columns:
+```csv
+timestamp,temp,humidity
+2023-01-01 00:00:00,20.5,65.2
+2023-01-01 01:00:00,21.0,64.8
+```
+
+In this case, only the `timestamp`, `temp`, and `humidity` columns from the CSV are processed, mapped to `time`, `temperature`, and `humidity_percent` respectively. The unused mappings for `pressure` and `location` are simply ignored, so the same mapping configuration works with input files that have varying column sets.
+
+You can also map different CSV column names to the same database column, as long as only one of them appears in any given input file:
+
+```bash
+# Map both 'temp' and 'temperature' to the same database column
+$ timescaledb-parallel-copy --connection $DATABASE_URL --table sensors --file data.csv \
+    --column-mapping "timestamp:time,temp:temperature,temperature:temperature,humidity:humidity_percent"
+```
+
+This allows importing from different file formats into the same table:
+
+**File A** (uses 'temp'):
+```csv
+timestamp,temp,humidity
+2023-01-01 00:00:00,20.5,65.2
+```
+
+**File B** (uses 'temperature'):
+```csv
+timestamp,temperature,humidity
+2023-01-01 02:00:00,22.1,63.5
+```
+
+Both files can use the same mapping configuration and import successfully into the same table, even though they use different column names for the temperature data. The tool only validates for duplicate database columns among the columns actually present in each specific input file.
````
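The flexible-mapping behavior described above can be sketched in a few lines of Go. This is a simplified illustration, not the csvcopy package's actual implementation; the `resolveMapping` helper and its signature are invented for this example. It filters a full mapping down to the headers present in one file and rejects two present CSV columns targeting the same database column:

```go
package main

import "fmt"

// resolveMapping narrows a full CSV→DB column mapping to the headers actually
// present in one input file, returning the database columns in header order.
// It errors if a present header has no mapping, or if two present headers map
// to the same database column. (Illustrative sketch only.)
func resolveMapping(mapping map[string]string, headers []string) ([]string, error) {
	seen := map[string]string{} // db column -> csv column that claimed it
	var dbCols []string
	for _, h := range headers {
		db, ok := mapping[h]
		if !ok {
			return nil, fmt.Errorf("no mapping for CSV column %q", h)
		}
		if prev, dup := seen[db]; dup {
			return nil, fmt.Errorf("CSV columns %q and %q both map to database column %q", prev, h, db)
		}
		seen[db] = h
		dbCols = append(dbCols, db)
	}
	return dbCols, nil
}

func main() {
	// Comprehensive mapping; "temperature" is unused by this file and ignored.
	mapping := map[string]string{
		"timestamp":   "time",
		"temp":        "temperature",
		"temperature": "temperature",
		"humidity":    "humidity_percent",
	}
	cols, err := resolveMapping(mapping, []string{"timestamp", "temp", "humidity"})
	fmt.Println(cols, err) // prints "[time temperature humidity_percent] <nil>"
}
```

Note that duplicate detection only considers the headers of the file being processed, which is what lets `temp` and `temperature` coexist in one mapping.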

cmd/timescaledb-parallel-copy/main.go

Lines changed: 107 additions & 9 deletions
```diff
@@ -3,13 +3,15 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
 	"io"
 	"log"
 	"os"
 	"runtime"
+	"strings"
 	"time"
 
 	"github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"
@@ -33,11 +35,14 @@ var (
 	quoteCharacter  string
 	escapeCharacter string
 
-	fromFile        string
-	columns         string
-	skipHeader      bool
-	headerLinesCnt  int
-	skipBatchErrors bool
+	fromFile          string
+	columns           string
+	columnMapping     string
+	autoColumnMapping bool
+	skipHeader        bool
+	headerLinesCnt    int
+	skipLines         int
+	skipBatchErrors   bool
 
 	importID string
 	workers  int
@@ -68,8 +73,12 @@ func init() {
 	flag.StringVar(&escapeCharacter, "escape", "", "The ESCAPE `character` to use during COPY (default '\"')")
 	flag.StringVar(&fromFile, "file", "", "File to read from rather than stdin")
 	flag.StringVar(&columns, "columns", "", "Comma-separated columns present in CSV")
+	flag.StringVar(&columnMapping, "column-mapping", "", "Column mapping from CSV to database columns (format: \"csv_col1:db_col1,csv_col2:db_col2\" or JSON)")
+	flag.BoolVar(&autoColumnMapping, "auto-column-mapping", false, "Automatically map CSV headers to database columns with the same names")
+
 	flag.BoolVar(&skipHeader, "skip-header", false, "Skip the first line of the input")
-	flag.IntVar(&headerLinesCnt, "header-line-count", 1, "Number of header lines")
+	flag.IntVar(&headerLinesCnt, "header-line-count", 1, "(deprecated) Number of header lines")
+	flag.IntVar(&skipLines, "skip-lines", 0, "Skip the first n lines of the input. It is applied before skip-header")
 
 	flag.BoolVar(&skipBatchErrors, "skip-batch-errors", false, "if true, the copy will continue even if a batch fails")
 
@@ -103,6 +112,11 @@ func main() {
 	if dbName != "" {
 		log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database")
 	}
+
+	if headerLinesCnt != 1 {
+		log.Fatalf("Error: -header-line-count is deprecated. Use -skip-lines instead")
+	}
+
 	logger := &csvCopierLogger{}
 
 	opts := []csvcopy.Option{
@@ -127,6 +141,18 @@ func main() {
 		opts = append(opts, csvcopy.WithImportID(importID))
 	}
 
+	if columnMapping != "" {
+		mapping, err := parseColumnMapping(columnMapping)
+		if err != nil {
+			log.Fatalf("Error parsing column mapping: %v", err)
+		}
+		opts = append(opts, csvcopy.WithColumnMapping(mapping))
+	}
+
+	if autoColumnMapping {
+		opts = append(opts, csvcopy.WithAutoColumnMapping())
+	}
+
 	batchErrorHandler := csvcopy.BatchHandlerError()
 	if skipBatchErrors {
 		batchErrorHandler = csvcopy.BatchHandlerNoop()
@@ -136,10 +162,12 @@ func main() {
 	}
 	opts = append(opts, csvcopy.WithBatchErrorHandler(batchErrorHandler))
 
+	if skipLines > 0 {
+		opts = append(opts, csvcopy.WithSkipHeaderCount(skipLines))
+	}
+
 	if skipHeader {
-		opts = append(opts,
-			csvcopy.WithSkipHeaderCount(headerLinesCnt),
-		)
+		opts = append(opts, csvcopy.WithSkipHeader(true))
 	}
 
 	copier, err := csvcopy.NewCopier(
@@ -190,3 +218,73 @@ func main() {
 	}
 	fmt.Println(res)
 }
+
+// parseColumnMapping parses a column mapping string into csvcopy.ColumnsMapping.
+// Supports two formats:
+//  1. Simple: "csv_col1:db_col1,csv_col2:db_col2"
+//  2. JSON:   {"csv_col1":"db_col1","csv_col2":"db_col2"}
+func parseColumnMapping(mappingStr string) (csvcopy.ColumnsMapping, error) {
+	if mappingStr == "" {
+		return nil, nil
+	}
+
+	mappingStr = strings.TrimSpace(mappingStr)
+
+	// Check if it's JSON format (starts with '{')
+	if strings.HasPrefix(mappingStr, "{") {
+		return parseJSONColumnMapping(mappingStr)
+	}
+
+	// Parse simple format: "csv_col1:db_col1,csv_col2:db_col2"
+	return parseSimpleColumnMapping(mappingStr)
+}
+
+// parseJSONColumnMapping parses the JSON format column mapping
+func parseJSONColumnMapping(jsonStr string) (csvcopy.ColumnsMapping, error) {
+	var mappingMap map[string]string
+	if err := json.Unmarshal([]byte(jsonStr), &mappingMap); err != nil {
+		return nil, fmt.Errorf("invalid JSON format for column mapping: %w", err)
+	}
+
+	var mapping csvcopy.ColumnsMapping
+	for csvCol, dbCol := range mappingMap {
+		mapping = append(mapping, csvcopy.ColumnMapping{
+			CSVColumnName:      csvCol,
+			DatabaseColumnName: dbCol,
+		})
+	}
+
+	return mapping, nil
+}
+
+// parseSimpleColumnMapping parses the simple format: "csv_col1:db_col1,csv_col2:db_col2"
+func parseSimpleColumnMapping(simpleStr string) (csvcopy.ColumnsMapping, error) {
+	pairs := strings.Split(simpleStr, ",")
+	var mapping csvcopy.ColumnsMapping
+
+	for i, pair := range pairs {
+		pair = strings.TrimSpace(pair)
+		if pair == "" {
+			continue
+		}
+
+		parts := strings.Split(pair, ":")
+		if len(parts) != 2 {
+			return nil, fmt.Errorf("invalid column mapping format at position %d: '%s', expected 'csv_column:db_column'", i+1, pair)
+		}
+
+		csvCol := strings.TrimSpace(parts[0])
+		dbCol := strings.TrimSpace(parts[1])
+
+		if csvCol == "" || dbCol == "" {
+			return nil, fmt.Errorf("empty column name in mapping at position %d: '%s'", i+1, pair)
+		}
+
+		mapping = append(mapping, csvcopy.ColumnMapping{
+			CSVColumnName:      csvCol,
+			DatabaseColumnName: dbCol,
+		})
+	}
+
+	return mapping, nil
+}
```
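One subtlety of the JSON branch in this diff: Go map iteration order is unspecified, so a mapping decoded from JSON comes back in no particular order. That is harmless here because entries are matched by CSV header name rather than by position. A self-contained sketch of the decoding step (the `parseJSON` helper is illustrative; the real code builds `csvcopy.ColumnsMapping` values):

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// parseJSON decodes a {"csv_col":"db_col", ...} mapping string into a plain
// map, mirroring the error wrapping used by parseJSONColumnMapping above.
func parseJSON(s string) (map[string]string, error) {
	var m map[string]string
	if err := json.Unmarshal([]byte(s), &m); err != nil {
		return nil, fmt.Errorf("invalid JSON format for column mapping: %w", err)
	}
	return m, nil
}

func main() {
	m, err := parseJSON(`{"timestamp":"time","temp":"temperature"}`)
	if err != nil {
		panic(err)
	}
	// Sort the keys before printing: map iteration order is not deterministic.
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s -> %s\n", k, m[k])
	}
	// prints:
	// temp -> temperature
	// timestamp -> time
}
```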
