19 changes: 19 additions & 0 deletions README.md
@@ -218,6 +218,8 @@ Usage of timescaledb-parallel-copy:
Number of rows to insert overall; 0 means to insert all
-log-batches
Whether to time individual batches.
-on-conflict-do-nothing
Skip duplicate rows on unique constraint violations
-quote character
The QUOTE character to use during COPY (default '"')
-reporting-period duration
@@ -362,3 +364,20 @@ timestamp,temperature,humidity
```

Both files can use the same mapping configuration and import successfully into the same database table, even though they use different column names for the temperature data. The tool only validates for duplicate database columns among the columns actually present in each specific input file.

### Conflict Resolution

Use `--on-conflict-do-nothing` to automatically skip duplicate rows when unique constraint violations occur:

```bash
# Skip duplicate rows and continue importing
$ timescaledb-parallel-copy --connection $DATABASE_URL --table metrics --file data.csv \
--on-conflict-do-nothing
```

This uses PostgreSQL's `ON CONFLICT DO NOTHING` clause to ignore rows that would violate unique constraints, allowing the import to continue with just the non-duplicate data.

Note that the `ON CONFLICT` clause is not allowed in a `COPY FROM` statement. The tool therefore falls back to copying your data into a temporary table and running `INSERT INTO ... SELECT * FROM ... ON CONFLICT DO NOTHING`, roughly as sketched below.
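
Conceptually, the fallback behaves like the following SQL sketch. The staging table name, column handling, and `COPY` options here are illustrative only, not the exact statements the tool generates:

```sql
-- Stage the batch in a temporary table that mirrors the target table.
CREATE TEMP TABLE metrics_staging (LIKE metrics INCLUDING DEFAULTS);

-- COPY into the staging table (COPY itself cannot use ON CONFLICT).
COPY metrics_staging FROM STDIN WITH (FORMAT csv);

-- Move the rows into the target table, silently skipping duplicates.
INSERT INTO metrics
SELECT * FROM metrics_staging
ON CONFLICT DO NOTHING;
```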

This flag is intended to handle genuine duplicate rows, not incremental changes to existing rows. It is safe to use when you expect your data to contain duplicate rows, but it should not be used as an ingestion pipeline where you expect updated values for rows that share the same unique key: those updates are silently dropped, as the example below illustrates.
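
For illustration, assume a hypothetical `metrics` table with a unique constraint on `(time, device_id)` that already holds a reading of `20.5` for a given key. An incoming row with the same key but a new value is treated as a duplicate and skipped, not applied as an update:

```sql
-- Hypothetical data, for illustration only.
-- Existing row:       ('2024-01-01 00:00:00', 'dev1', 20.5)
-- Incoming "update":  ('2024-01-01 00:00:00', 'dev1', 21.0)
INSERT INTO metrics (time, device_id, temperature)
VALUES ('2024-01-01 00:00:00', 'dev1', 21.0)
ON CONFLICT DO NOTHING;
-- Result: 0 rows inserted; the stored value remains 20.5.
```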

13 changes: 13 additions & 0 deletions cmd/timescaledb-parallel-copy/main.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"
"github.com/timescale/timescaledb-parallel-copy/pkg/errorhandlers"
)

const (
@@ -55,6 +56,8 @@ var (
verbose bool
showVersion bool

onConflictDoNothing bool

dbName string
)

@@ -92,6 +95,8 @@ func init() {
flag.DurationVar(&reportingPeriod, "reporting-period", 0*time.Second, "Period to report insert stats; if 0s, intermediate results will not be reported")
flag.BoolVar(&verbose, "verbose", false, "Print more information about copying statistics")

flag.BoolVar(&onConflictDoNothing, "on-conflict-do-nothing", false, "Skip duplicate rows on unique constraint violations")

flag.BoolVar(&showVersion, "version", false, "Show the version of this tool")

flag.Parse()
@@ -117,6 +122,7 @@ func main() {
log.Fatalf("Error: -header-line-count is deprecated. Use -skip-lines instead")
}


logger := &csvCopierLogger{}

opts := []csvcopy.Option{
@@ -157,6 +163,13 @@ func main() {
if skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerNoop()
}

if onConflictDoNothing {
batchErrorHandler = errorhandlers.BatchConflictHandler(
errorhandlers.WithConflictHandlerNext(batchErrorHandler),
)
}

if verbose || skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerLog(logger, batchErrorHandler)
}
2 changes: 1 addition & 1 deletion go.mod
@@ -5,7 +5,6 @@ go 1.22.0
toolchain go1.23.4

require (
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.2
github.com/jmoiron/sqlx v1.4.0
github.com/stretchr/testify v1.10.0
@@ -34,6 +33,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
171 changes: 171 additions & 0 deletions pkg/buffer/buffer.go
@@ -0,0 +1,171 @@
// Package buffer provides a seekable, net.Buffers-like wrapper around byte
// slices that enables retry functionality for database copy operations.
package buffer

import (
"fmt"
"io"
)

// Seekable contains zero or more runs of bytes to write.
//
// Unlike net.Buffers, reading does not consume the underlying slices: a
// position index is advanced instead, so the buffer can be rewound with Seek
// and read again (for example, to retry a failed copy batch).
type Seekable struct {
buf [][]byte
position int64
}

var (
_ io.WriterTo = (*Seekable)(nil)
_ io.Reader = (*Seekable)(nil)
_ io.Writer = (*Seekable)(nil)
_ io.Seeker = (*Seekable)(nil)
)

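// NewSeekable returns a Seekable over buf with the position at the start.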
func NewSeekable(buf [][]byte) *Seekable {
return &Seekable{
buf: buf,
position: 0,
}
}

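// HasData reports whether any unread bytes remain after the current position.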
func (v *Seekable) HasData() bool {
return v.position < v.TotalSize()
}

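// TotalSize returns the total number of bytes held across all buffers.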
func (v *Seekable) TotalSize() int64 {
var size int64
for _, b := range v.buf {
size += int64(len(b))
}
return size
}

// WriteTo writes the unread contents of the buffers to w, starting at the
// current position.
//
// WriteTo implements [io.WriterTo] for [Seekable].
//
// Unlike net.Buffers, WriteTo does not consume or modify the underlying
// slices; it only advances the position by the number of bytes written.
func (v *Seekable) WriteTo(w io.Writer) (n int64, err error) {
if v.position >= v.TotalSize() {
return 0, nil
}

currentPos := v.position

for _, buf := range v.buf {
bufLen := int64(len(buf))
if currentPos >= bufLen {
currentPos -= bufLen
continue
}

startInBuf := currentPos
bytesToWrite := buf[startInBuf:]

nb, err := w.Write(bytesToWrite)
n += int64(nb)
if err != nil {
v.position += n
return n, err
}
currentPos = 0
}

v.position += n
return n, nil
}

// Read copies bytes from the buffers into p, starting at the current
// position.
//
// Read implements [io.Reader] for [Seekable].
//
// Unlike net.Buffers, Read does not consume or modify the underlying slices;
// it only advances the position by the number of bytes read.
func (v *Seekable) Read(p []byte) (n int, err error) {
if v.position >= v.TotalSize() {
return 0, io.EOF
}

remaining := len(p)
currentPos := v.position

for i, buf := range v.buf {
if remaining == 0 {
break
}

bufLen := int64(len(buf))
if currentPos >= bufLen {
currentPos -= bufLen
continue
}

startInBuf := currentPos
bytesToRead := bufLen - startInBuf
if int64(remaining) < bytesToRead {
bytesToRead = int64(remaining)
}

copied := copy(p[n:], buf[startInBuf:startInBuf+bytesToRead])
n += copied
remaining -= copied
currentPos = 0

if i == len(v.buf)-1 && n < len(p) {
err = io.EOF
}
}

v.position += int64(n)
return n, err
}

// Write appends data to the buffer
func (v *Seekable) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

// Create a copy of the input data to avoid issues with caller reusing the slice
data := make([]byte, len(p))
copy(data, p)
Comment on lines +133 to +135

Member:

There is no copy involved with net.Buffers. Should we retain that property to avoid excessive garbage?

v.buf = append(v.buf, data)
return len(p), nil
}

// WriteString appends a string to the buffer
func (v *Seekable) WriteString(s string) (n int, err error) {
return v.Write([]byte(s))
}

// Seek sets the position for the next Read or WriteTo operation.
func (v *Seekable) Seek(offset int64, whence int) (int64, error) {
Member:

Is there a use case for SeekCurrent and SeekEnd? I think we could have just stayed with net.Buffers, and buf[0][0] might be good enough?

Contributor (author):

net.Buffers discards data on read.

Here is its implementation: on Read it consumes the bytes by resizing the slice. That is why this implementation uses a moving position index instead, which makes it possible to simply reset the position and go back to the start.

// Read from the buffers.
//
// Read implements [io.Reader] for [Buffers].
//
// Read modifies the slice v as well as v[i] for 0 <= i < len(v),
// but does not modify v[i][j] for any i, j.
func (v *Buffers) Read(p []byte) (n int, err error) {
	for len(p) > 0 && len(*v) > 0 {
		n0 := copy(p, (*v)[0])
		v.consume(int64(n0))
		p = p[n0:]
		n += n0
	}
	if len(*v) == 0 {
		err = io.EOF
	}
	return
}

func (v *Buffers) consume(n int64) {
	for len(*v) > 0 {
		ln0 := int64(len((*v)[0]))
		if ln0 > n {
			(*v)[0] = (*v)[0][n:]
			return
		}
		n -= ln0
		(*v)[0] = nil
		*v = (*v)[1:]
	}
}

totalSize := v.TotalSize()

var newPos int64
switch whence {
case io.SeekStart:
newPos = offset
case io.SeekCurrent:
newPos = v.position + offset
case io.SeekEnd:
newPos = totalSize + offset
default:
return v.position, fmt.Errorf("invalid whence value: %d", whence)
}

if newPos < 0 {
return v.position, fmt.Errorf("seek position cannot be negative: %d", newPos)
}
if newPos > totalSize {
return v.position, fmt.Errorf("seek position beyond buffer size: %d > %d", newPos, totalSize)
}

v.position = newPos
return v.position, nil
}
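
For context, here is a minimal, hypothetical usage sketch (not part of this PR) showing how the moving position index lets a caller replay a batch after a failed write by seeking back to the start:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/timescale/timescaledb-parallel-copy/pkg/buffer"
)

func main() {
	buf := buffer.NewSeekable(nil)
	buf.WriteString("1,20.5\n")
	buf.WriteString("2,21.0\n")

	// First attempt: pretend the connection died after a partial write.
	var sink bytes.Buffer
	_, _ = io.CopyN(&sink, buf, 4)

	// Retry: rewind to the beginning and replay the whole batch.
	sink.Reset()
	if _, err := buf.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	if _, err := buf.WriteTo(&sink); err != nil {
		panic(err)
	}
	fmt.Print(sink.String()) // both rows are written again in full
}
```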