release-25.3.1-rc: kvcoord: fix txnWriteBuffer for batches with limits and Dels #151901

Open: wants to merge 1 commit into release-25.3.1-rc
pkg/kv/kvclient/kvcoord/dist_sender.go (5 changes: 2 additions & 3 deletions)
@@ -1080,9 +1080,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
}

if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
-// Verify that the batch contains only specific range requests or the
-// EndTxnRequest. Verify that a batch with a ReverseScan only contains
-// ReverseScan range requests.
+// Verify that the batch contains only specific requests. Verify that a
+// batch with a ReverseScan only contains ReverseScan range requests.
var foundForward, foundReverse bool
for _, req := range ba.Requests {
inner := req.GetInner()
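
For illustration, here is a minimal sketch of the kind of batch the relaxed check now admits: a limited batch carrying point Dels, as happens when buffered writes are flushed onto a batch that has a limit set. The helper name makeLimitedDelBatch is hypothetical; the kvpb and roachpb identifiers are the ones used in the diff.

// makeLimitedDelBatch is a hypothetical helper sketching a batch that
// initAndVerifyBatch must now accept: point Dels under a key limit.
func makeLimitedDelBatch(txn *roachpb.Transaction, keys ...roachpb.Key) *kvpb.BatchRequest {
	ba := &kvpb.BatchRequest{}
	ba.Header = kvpb.Header{Txn: txn, MaxSpanRequestKeys: 2}
	for _, k := range keys {
		ba.Add(&kvpb.DeleteRequest{
			RequestHeader: kvpb.RequestHeader{Key: k},
		})
	}
	return ba
}
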
pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go (49 changes: 40 additions & 9 deletions)
@@ -437,7 +437,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
if t.OriginTimestamp.IsSet() {
return unsupportedOptionError(t.Method(), "OriginTimestamp")
}
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
case *kvpb.PutRequest:
// TODO(yuzefovich): the DistSender allows Puts to be in batches
// with limits, which can happen when we're forced to flush the
// buffered Puts, and the batch we piggy-back on has a limit set.
// However, SQL never constructs such a batch on its own, so we're
// asserting the expectations from SQL. Figure out how to reconcile
// this with more permissive DistSender-level checks.
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
case *kvpb.DeleteRequest:
case *kvpb.GetRequest:
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1380,6 +1388,11 @@ func (rr requestRecord) toResp(
// We only use the response from KV if there wasn't already a
// buffered value for this key that our transaction wrote
// previously.
// TODO(yuzefovich): for completeness, we should check whether
// ResumeSpan is non-nil, in which case the response from KV is
// incomplete. This can happen when MaxSpanRequestKeys and/or
// TargetBytes limits are set on the batch, and SQL currently
// doesn't do that for batches with CPuts.
val = br.GetInner().(*kvpb.GetResponse).Value
}
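
A hedged sketch of the guard this TODO asks for, not the PR's code: before consuming the server's value, check whether the GetResponse is complete. The helper name completeGetValue is hypothetical.

// completeGetValue is a hypothetical helper: it returns the value from a
// GetResponse only when the response is complete, i.e. the Get was not
// skipped because a MaxSpanRequestKeys/TargetBytes limit was reached.
func completeGetValue(resp *kvpb.GetResponse) (*roachpb.Value, bool) {
	if resp.ResumeSpan != nil {
		// The Get was not evaluated; there is no usable value.
		return nil, false
	}
	return resp.Value, true
}
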

@@ -1411,6 +1424,11 @@ func (rr requestRecord) toResp(

case *kvpb.PutRequest:
var dla *bufferedDurableLockAcquisition
// TODO(yuzefovich): for completeness, we should check whether
// ResumeSpan is non-nil if we transformed the request, in which case
// the response from KV is incomplete. This can happen when
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
// and SQL currently doesn't do that for batches with Puts.
if rr.transformed && exclusionTimestampRequired {
dla = &bufferedDurableLockAcquisition{
str: lock.Exclusive,
@@ -1424,19 +1442,20 @@ func (rr requestRecord) toResp(
case *kvpb.DeleteRequest:
// To correctly populate FoundKey in the response, we must prefer any
// buffered values (if they exist).
-var foundKey bool
+var resp kvpb.DeleteResponse
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
if served {
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
-foundKey = val.IsPresent()
+resp.FoundKey = val.IsPresent()
} else if rr.transformed {
// We sent a GetRequest to the KV layer to acquire an exclusive lock
// on the key, populate FoundKey using the response.
getResp := br.GetInner().(*kvpb.GetResponse)
if log.ExpensiveLogEnabled(ctx, 2) {
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
}
-foundKey = getResp.Value.IsPresent()
+resp.FoundKey = getResp.Value.IsPresent()
+resp.ResumeSpan = getResp.ResumeSpan
} else {
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
// eschew sending a Get request to the KV layer just to populate
@@ -1448,7 +1467,14 @@ func (rr requestRecord) toResp(
// TODO(arul): improve the FoundKey semantics to have callers opt
// into whether they care about the key being found. Alternatively,
// clarify the behaviour on DeleteRequest.
-foundKey = false
+resp.FoundKey = false
}

ru.MustSetInner(&resp)
if resp.ResumeSpan != nil {
// When the Get was incomplete, we haven't actually processed this
// Del, so we cannot buffer the write.
break
}

var dla *bufferedDurableLockAcquisition
@@ -1460,14 +1486,16 @@ func (rr requestRecord) toResp(
}
}

-ru.MustSetInner(&kvpb.DeleteResponse{
-FoundKey: foundKey,
-})
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq, dla)

case *kvpb.GetRequest:
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
if served {
// TODO(yuzefovich): we're effectively ignoring the limits of
// BatchRequest when serving the Get from the buffer. We should
// consider setting the ResumeSpan if a limit has already been
// reached by this point. This will force us to set ResumeSpan on
// all remaining requests in the batch.
getResp := &kvpb.GetResponse{}
if val.IsPresent() {
getResp.Value = val
@@ -2417,8 +2445,6 @@ func (s *respIter) startKey() roachpb.Key {
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
// start key for the "next" page, which is exactly the last key that was
// reverse-scanned for the current response.
-// TODO(yuzefovich): we should have some unit tests that exercise the
-// ResumeSpan case.
if s.resumeSpan != nil {
return s.resumeSpan.EndKey
}
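
To restate the pagination rule this code relies on, here is an illustrative sketch (nextPageStart is a hypothetical name): forward scans resume at ResumeSpan.Key, while ReverseScans resume at ResumeSpan.EndKey.

// nextPageStart is a hypothetical helper restating the rule above: the
// next page of a paginated scan starts at ResumeSpan.Key for forward
// scans and at ResumeSpan.EndKey for ReverseScans.
func nextPageStart(resume *roachpb.Span, reverse bool) roachpb.Key {
	if resume == nil {
		return nil // no ResumeSpan: the scan is complete
	}
	if reverse {
		return resume.EndKey
	}
	return resume.Key
}
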
@@ -2489,6 +2515,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
}

func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
// when serving the reads from the buffer. We should consider checking how
// many keys and bytes have already been included to see whether we've
// reached a limit, and set the ResumeSpan if so (which can result in some
// wasted work by the server).
h.numKeys++
lenKV, _ := encKVLength(key, value)
h.numBytes += int64(lenKV)
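
Putting the interceptor changes together, the core of the fix can be summarized by the following sketch, a restatement under the diff's assumptions rather than the PR's literal code: when a Del was transformed into a locking Get, the synthesized DeleteResponse inherits the Get's ResumeSpan, and a Del whose Get went unprocessed must not be buffered.

// synthesizeDeleteResponse is a hypothetical distillation of the fix: it
// builds a DeleteResponse from the GetResponse that served a transformed
// Del and reports whether the Del was actually processed.
func synthesizeDeleteResponse(getResp *kvpb.GetResponse) (resp *kvpb.DeleteResponse, processed bool) {
	resp = &kvpb.DeleteResponse{FoundKey: getResp.Value.IsPresent()}
	// An incomplete Get (non-nil ResumeSpan) means the Del was not
	// processed: propagate the span and tell the caller not to buffer.
	resp.ResumeSpan = getResp.ResumeSpan
	return resp, resp.ResumeSpan == nil
}
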
pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go (69 changes: 69 additions & 0 deletions)
@@ -1331,6 +1331,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
}

// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
// correctly in the presence of BatchRequest's limits that result in non-nil
// ResumeSpans.
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
func TestTxnWriteBufferResumeSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
twb, mockSender, _ := makeMockTxnWriteBuffer(ctx)

txn := makeTxnProto()
txn.Sequence = 1
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
// Dels should be processed).
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
del := delArgs(k, txn.Sequence)
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
del.MustAcquireExclusiveLock = true
ba.Add(del)
}

// Simulate a scenario where each transformed Get finds something and the
// limit is reached after the second Get.
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
require.Len(t, ba.Requests, 3)
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())

br := ba.CreateReply()
br.Txn = ba.Txn
br.Responses = []kvpb.ResponseUnion{
{Value: &kvpb.ResponseUnion_Get{
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
}},
{Value: &kvpb.ResponseUnion_Get{
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
}},
{Value: &kvpb.ResponseUnion_Get{
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
ResumeSpan: &roachpb.Span{Key: keyC},
}},
}},
}
return br, nil
})

br, pErr := twb.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Even though the txnWriteBuffer did not send any Del requests to the KV
// layer above, the responses should still be populated.
require.Len(t, br.Responses, 3)
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
// The last Del wasn't processed, so we should see the ResumeSpan set in the
// header.
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)

// Verify that only two writes are buffered.
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
}
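
As a usage note (not part of the PR), a caller that observes the ResumeSpan asserted above would reissue the unprocessed tail, roughly as follows, reusing br and keyC from the tested scenario:

// Hypothetical continuation for the tested scenario: collect the keys
// whose Dels were cut off by the limit and retry them in a new batch.
var remaining []roachpb.Key
for _, ru := range br.Responses {
	if resp, ok := ru.GetInner().(*kvpb.DeleteResponse); ok && resp.ResumeSpan != nil {
		remaining = append(remaining, resp.ResumeSpan.Key)
	}
}
// Here remaining would contain only keyC; a follow-up BatchRequest would carry it.
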

// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
// batches are sorted in sequence number order, as currently required by the txn
// pipeliner interceptor.
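
Before the proto comment below, a hedged sketch of the client-side expectation encoded by the validateRequests assertions above (the helper is hypothetical; the assumption, per the TODO in the diff, is that SQL never sets limits on batches carrying Puts or CPuts):

// assertNoLimitWithPointWrites is a hypothetical restatement of the
// validateRequests assertions: SQL-issued batches carrying Puts or
// CPuts are expected to have no key or byte limits set.
func assertNoLimitWithPointWrites(ba *kvpb.BatchRequest) {
	if ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0 {
		return
	}
	for _, ru := range ba.Requests {
		switch ru.GetInner().(type) {
		case *kvpb.PutRequest, *kvpb.ConditionalPutRequest:
			panic("unexpectedly found point write in a BatchRequest with a limit")
		}
	}
}
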
pkg/kv/kvpb/api.proto (9 changes: 7 additions & 2 deletions)
@@ -2779,11 +2779,16 @@ message Header {
// - RevertRangeRequest
// - ResolveIntentRangeRequest
// - QueryLocksRequest
// - IsSpanEmptyRequest
//
-// The following two requests types are also allowed in the batch, although
-// the limit has no effect on them:
+// The following request types are also allowed in the batch, although the
+// limit has no effect on them:
// - ExportRequest
// - QueryIntentRequest
// - EndTxnRequest
// - ResolveIntentRequest
// - DeleteRequest
// - PutRequest
//
// [*] DeleteRangeRequests are generally not allowed to be batched together
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.
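
For reference, a hedged sketch of a batch exercising these Header fields as now documented: point Dels and Puts may ride along, with the limits applying only to the range requests.

// Sketch: limits set alongside point writes, which the updated comment
// documents as allowed (the limits have no effect on the point writes).
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{
	MaxSpanRequestKeys: 2,       // cap on keys returned by range requests
	TargetBytes:        1 << 20, // optional byte budget; 0 means unlimited
}
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{
	Key: roachpb.Key("a"), EndKey: roachpb.Key("z"),
}})
ba.Add(&kvpb.DeleteRequest{RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("b")}})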