Conversation

MetalBlueberry (Contributor)

No description provided.

MetalBlueberry force-pushed the vperez/implement-rewind-buffer branch from a0179e6 to b7f2705 on September 2, 2025.
MetalBlueberry changed the title from "feat: implement rewind buffer" to "feat: Extend error handling to allow conflict resolution" on September 2, 2025.
MetalBlueberry marked this pull request as ready for review on September 25, 2025.
arajkumar (Member) left a comment:

first pass!

copier, err := csvcopy.NewCopier(connStr, "test_metrics",
csvcopy.WithColumns("device_id,label,value"),
csvcopy.WithBatchSize(2),
csvcopy.WithBatchErrorHandler(BatchConflictHandler(WithConflictHandlerNext(csvcopy.BatchHandlerNoop()))),
arajkumar (Member):

IMHO, having a next handler complicates the API. Do you think it would be useful? If so, why not just use an array of batch error handlers?

MetalBlueberry (Contributor, Author):

The current design is a decorator pattern. A next handler is something that is called after your function finishes, which allows chaining options.

Now that you mention it, next is probably not working correctly, since the actual goal here is to fall back when the error is not a conflict error.

Having an array doesn't solve the problem, because then how should the array behave? Execute all the handlers? Just the last one? How do you combine the outputs? It becomes hard to manage.

If we rename it, do you have any ideas for the name? 😅 I can't think of a good one.
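
For illustration, a minimal sketch of the decorator shape being discussed. The handler signature, the conflictError type, and the helper names below are assumptions made for this example; the real csvcopy.BatchErrorHandler, BatchConflictHandler, and BatchHandlerNoop in the PR may look different.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical handler signature, for illustration only.
type BatchErrorHandler func(ctx context.Context, batchErr error) error

// Hypothetical conflict error type.
type conflictError struct{ msg string }

func (e *conflictError) Error() string { return e.msg }

// noopHandler swallows any error, mirroring the idea of BatchHandlerNoop.
func noopHandler() BatchErrorHandler {
	return func(ctx context.Context, batchErr error) error { return nil }
}

// conflictHandler is a decorator: it resolves conflict errors itself and
// falls back to the wrapped "next" handler for everything else.
func conflictHandler(next BatchErrorHandler) BatchErrorHandler {
	return func(ctx context.Context, batchErr error) error {
		var conflict *conflictError
		if errors.As(batchErr, &conflict) {
			fmt.Println("resolving conflict:", conflict.msg)
			return nil // conflict handled here
		}
		// Not a conflict: delegate to the wrapped handler.
		return next(ctx, batchErr)
	}
}

func main() {
	h := conflictHandler(noopHandler())
	_ = h(context.Background(), &conflictError{msg: "duplicate key"})
	_ = h(context.Background(), errors.New("some other failure"))
}

With this shape, conflictHandler(noopHandler()) mirrors the BatchConflictHandler(WithConflictHandlerNext(BatchHandlerNoop())) call from the test above: conflicts are resolved in place and anything else falls through to the wrapped handler.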

Comment on lines +133 to +135
// Create a copy of the input data to avoid issues with caller reusing the slice
data := make([]byte, len(p))
copy(data, p)
arajkumar (Member):

There is no copy involved with net.Buffers; should we retain that property to avoid excessive garbage?

}

// Seek sets the position for next Read or Write operation
func (v *Seekable) Seek(offset int64, whence int) (int64, error) {
arajkumar (Member):

Is there a use case for SeekCurrent and SeekEnd? I think we could have just stayed with net.Buffers, and buf[0][0] might be good enough?

MetalBlueberry (Contributor, Author):

net.Buffers discards data on read.

Here is the implementation: on Read it consumes the bytes by re-slicing. That is why this implementation uses moving indexes instead; they can simply be reset to come back to the start (a sketch of that approach follows the stdlib code below).

// Read from the buffers.
//
// Read implements [io.Reader] for [Buffers].
//
// Read modifies the slice v as well as v[i] for 0 <= i < len(v),
// but does not modify v[i][j] for any i, j.
func (v *Buffers) Read(p []byte) (n int, err error) {
	for len(p) > 0 && len(*v) > 0 {
		n0 := copy(p, (*v)[0])
		v.consume(int64(n0))
		p = p[n0:]
		n += n0
	}
	if len(*v) == 0 {
		err = io.EOF
	}
	return
}

func (v *Buffers) consume(n int64) {
	for len(*v) > 0 {
		ln0 := int64(len((*v)[0]))
		if ln0 > n {
			(*v)[0] = (*v)[0][n:]
			return
		}
		n -= ln0
		(*v)[0] = nil
		*v = (*v)[1:]
	}
}
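
To make the moving-index alternative concrete, here is a minimal, self-contained sketch. The field names and exact behaviour are assumptions for the example; the PR's actual Seekable type may differ.

package main

import (
	"errors"
	"fmt"
	"io"
)

// rewindBuffer keeps the written data intact and only moves a read offset,
// so seeking back to 0 replays the same bytes (unlike net.Buffers, which
// consumes its slices on Read).
type rewindBuffer struct {
	data []byte
	off  int64
}

func (b *rewindBuffer) Write(p []byte) (int, error) {
	// append copies the bytes, so the caller may reuse p afterwards.
	b.data = append(b.data, p...)
	return len(p), nil
}

func (b *rewindBuffer) Read(p []byte) (int, error) {
	if b.off >= int64(len(b.data)) {
		return 0, io.EOF
	}
	n := copy(p, b.data[b.off:])
	b.off += int64(n)
	return n, nil
}

func (b *rewindBuffer) Seek(offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = b.off + offset
	case io.SeekEnd:
		abs = int64(len(b.data)) + offset
	default:
		return 0, errors.New("invalid whence")
	}
	if abs < 0 {
		return 0, errors.New("negative position")
	}
	b.off = abs
	return abs, nil
}

func main() {
	var b rewindBuffer
	b.Write([]byte("1,label,42\n"))
	io.Copy(io.Discard, &b)  // first read moves only the offset
	b.Seek(0, io.SeekStart)  // rewind to replay the batch
	out, _ := io.ReadAll(&b)
	fmt.Printf("replayed: %s", out)
}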

defer workerWg.Done()
err := c.processBatches(ctx, batchChan)
// Add worker ID to context for all operations in this worker
workerCtx := WithWorkerID(ctx, i)
arajkumar (Member):

I failed to understand the usefulness of adding the worker ID into the ctx. Also, why would adding the worker ID into the COPY statements be useful?

MetalBlueberry (Contributor, Author):

  1. Why add the worker ID to the context?
    Being in the context, it is automatically propagated across interfaces. This means you can implement the BatchErrorHandler and still receive the worker ID through the context, then use it to log which worker executed the handler. The main advantage is that you can forget it exists if you don't need it (see the sketch after this list).

  2. Why add the worker ID to COPY statements?
    I hit a situation where there was a deadlock and the error messages displayed the entire query. Having the ID in there lets me correlate the error with a specific worker, so it is easy to link application logs with Postgres logs.
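
A minimal sketch of the context pattern described in point 1, assuming a plain int worker ID and a -1 sentinel when unset (which matches the `workerID >= 0` guard in the quoted diff below); the real WithWorkerID and GetWorkerIDFromContext in the PR may differ in detail.

package main

import (
	"context"
	"fmt"
)

// Unexported key type avoids collisions with other context values.
type workerIDKey struct{}

// WithWorkerID stores the worker ID in the context.
func WithWorkerID(ctx context.Context, id int) context.Context {
	return context.WithValue(ctx, workerIDKey{}, id)
}

// GetWorkerIDFromContext returns -1 when no worker ID was set.
func GetWorkerIDFromContext(ctx context.Context) int {
	if id, ok := ctx.Value(workerIDKey{}).(int); ok {
		return id
	}
	return -1
}

func main() {
	ctx := WithWorkerID(context.Background(), 3)
	// A batch error handler (or the COPY command builder) can recover the ID
	// without it being threaded through every function signature.
	if id := GetWorkerIDFromContext(ctx); id >= 0 {
		fmt.Printf("/* Worker-%d */ COPY ...\n", id)
	}
}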

Comment on lines +479 to +482
// Add worker ID comment if available in context
if workerID := GetWorkerIDFromContext(ctx); workerID >= 0 {
baseCmd = fmt.Sprintf("/* Worker-%d */ %s", workerID, baseCmd)
}
arajkumar (Member):

I failed to understand the usefulness of adding the worker ID to the COPY command.

MetalBlueberry (Contributor, Author):

Why add the worker ID to COPY statements?
I hit a situation where there was a deadlock and the error messages displayed the entire query. Having the ID in there lets me correlate the error with a specific worker, so it is easy for me to link application logs with Postgres logs.
