Skip to content

WIP kvcoord: avoid piggybacking on any early-returning batch #151916

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,9 +1900,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}
}()

canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 &&
!ba.Header.ReturnOnRangeBoundary &&
!ba.Header.ReturnElasticCPUResumeSpans
canParallelize := !ba.MightStopEarly()
if ba.IsSingleCheckConsistencyRequest() {
// Don't parallelize full checksum requests as they have to touch the
// entirety of each replica of each range they touch.
Expand Down Expand Up @@ -2006,9 +2004,8 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary || ba.ReturnElasticCPUResumeSpans
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
if ba.MightStopEarly() {
var replyKeys int64
var replyBytes int64
for _, r := range resp.reply.Responses {
Expand Down
106 changes: 103 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(

// SkipLocked reads cannot be in a batch with basically anything else. If we
// encounter one, we need to flush our buffer in its own batch.
splitBatchRequired := ba.WaitPolicy == lock.WaitPolicy_SkipLocked
splitBatchRequired := ba.WaitPolicy == lock.WaitPolicy_SkipLocked || ba.MightStopEarly()

// Flush all buffered writes by pre-pending them to the requests being sent
// in the batch.
Expand Down Expand Up @@ -1813,10 +1813,11 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
})

if splitBatchRequired {
log.VEventf(ctx, 2, "flushing buffer via separate batch")
flushBatch := ba.ShallowCopy()
flushBatch.WaitPolicy = 0
clearBatchRequestOptions(flushBatch)
flushBatch.Requests = reqs
log.VEventf(ctx, 2, "flushing %d buffered requests via separate batch")

br, pErr := twb.wrapped.SendLocked(ctx, flushBatch)
if pErr != nil {
pErr.Index = nil
Expand All @@ -1839,6 +1840,105 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
}
}

func clearBatchRequestOptions(ba *kvpb.BatchRequest) {
// TODO(ssd): Maintaining this list does not seem practical. I've made this
// annotated list for review, but we should consider how we want to construct
// this batch.

// Managed by store_send:
// ba.Timestamp
// ba.TimestampFromServerClock
// ba.Now

// Managed by dist_sender
// ba.Replica
// ba.RangeID

// TODO(review): It seems reasonable to reasonable to flush at the transaction
// priority.
// ba.UserPriority

// We want this batch to be part of the same transaction.
// ba.Txn

// If read consistency is set to anything but CONSISTENT, our flush will fail
// because we only allow inconsistent reads for read only requests.
ba.ReadConsistency = 0

// TODO(ssd): It could make sense to reset the routing policy to LEASEHOLDER
// since this is a write batch. But it won't affect correctness since this
// just affects the ordering that we try things in. But, it am not sure why we
// would have a NEAREST routing policy on a request that caused us to flush.
// ba.RoutingPolicy

// If WaitPolicy is set to SkipLocked, our request may fail validation.
ba.WaitPolicy = 0

// Using the configured lock timeout seems reasonable.
// ba.LockTimeout

// Reset options that could result in an early batch return.
ba.MaxSpanRequestKeys = 0
ba.TargetBytes = 0

// These two field only matter if the above to field are set
// ba.WholeRowsOfSize
// ba.AllowEmpty

// Isn't set by anyone currently. Reset it anyway.
ba.ReturnOnRangeBoundary = false

// Controlled by interceptors below us
// ba.DistinctSpans
// ba.AsyncConsensus
// ba.CanForwardReadTimestamp

// This should be the same, no reason to change
// ba.GatewayNodeID

// This can be set be the caller, but it seems fine to ask for range info on
// the flush.
// ba.ClientRangeInfo

// We shouldn't see this because it is only allowed via NegotiateAndSend which
// is only allowed for non-transactional requests.
// ba.BoundedStaleness

// No need to touch trace info
// ba.TraceInfo

// This is only used by Scan and ReverseScan requests using
// COL_BATCH_RESPONSE. We don't have those requests in a flush batch and we
// don't support that response type even if we did, so we can leave it.
// ba.IndexFetchSpec

// Only set by ExportRequest which we don't support here. Reset it anyway.
ba.ReturnElasticCPUResumeSpans = false

// Seems good to keep the labels, we could add some.
// ba.ProfileLabels

// Controlled by dist_sender
// ba.AmbiguousReplayProtection

// Flushes should be on the same connection as the original request
// ba.ConnectionClass

// Managed by dist_sender
// ba.ProxyRangeInfo

// TODO(ssd): We check this in validateBatch so we shouldn't have this. But
// the validation goes field by field, might be brittle.
// ba.WriteOptions

// Seems reasonable to use the same deadlock timeout as the inbound request
// for the flush.
// ba.DeadlockTimeout

// This one is controlled by us.
// ba.HasBufferedAllPrecedingWrites
}

// hasBufferedWrites returns whether the interceptor has buffered any writes
// locally.
func (twb *txnWriteBuffer) hasBufferedWrites() bool {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,14 @@ func (ba *BatchRequest) ValidateForEvaluation() error {
return nil
}

// MightStopEarly returns true if any of the batch's options might result in the
// batch response being returned before all requests have been fully processed
// without an error.
func (ba *BatchRequest) MightStopEarly() bool {
h := ba.Header
return h.MaxSpanRequestKeys != 0 || h.TargetBytes != 0 || h.ReturnElasticCPUResumeSpans || h.ReturnOnRangeBoundary
}

func (cb ColBatches) Size() int {
var size int
for _, b := range cb.ColBatches {
Expand Down
Loading