diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 87d33510cd34..00fd84693189 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1378,6 +1378,10 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, c void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvTxRollback" << + " Step " << ev->Get()->Step << + ", TxId " << ev->Get()->TxId); + ProcessPendingEvent(ev, ctx); } @@ -1496,7 +1500,7 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) break; } txSourceIds.insert(s.first); - PQ_LOG_D("Tx " << tx.GetTxId() << " affect SourceId " << s.first); + PQ_LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) { @@ -2134,7 +2138,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) { if (KVWriteInProgress) { - PQ_LOG_D("Can't process txs"); + PQ_LOG_D("Writing. Can't process transactions and user actions"); return; } if (DeletePartitionState == DELETION_INITED) { @@ -2156,48 +2160,45 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) return; } PQ_LOG_D("Batching state before ContinueProcessTxsAndUserActs: " << (int)BatchingState); - while (true) { - if (CanProcessUserActionAndTransactionEvents()) { - ContinueProcessTxsAndUserActs(ctx); - } - if (BatchingState == ETxBatchingState::PreProcessing) { - PQ_LOG_D("Still preprocessing - waiting for something"); - return; // Still preprocessing - waiting for something; - } - PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState); + if (CanProcessUserActionAndTransactionEvents()) { + ContinueProcessTxsAndUserActs(ctx); + } + // Still preprocessing? Waiting for something + if (CanProcessUserActionAndTransactionEvents()) { + PQ_LOG_D("Still preprocessing - waiting for something"); + return; + } + PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState); - // Preprocessing complete; - if (CurrentBatchSize > 0) { - Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); - } - CurrentBatchSize = 0; + // Preprocessing complete; + if (CurrentBatchSize > 0) { + PQ_LOG_D("Batch completed (" << CurrentBatchSize << ")"); + Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); + } + CurrentBatchSize = 0; - if (UserActionAndTxPendingCommit.empty()) { - // Processing stopped and nothing to commit - finalize - BatchingState = ETxBatchingState::Finishing; - } else { - // Process commit queue - ProcessCommitQueue(); - } - if (!UserActionAndTxPendingCommit.empty()) { - // Still pending for come commits - PQ_LOG_D("Still pending for come commits"); - return; - } - // Commit queue processing complete. Now can either swith to persist or continue preprocessing; - if (BatchingState == ETxBatchingState::Finishing) { // Persist required; - RunPersist(); - return; - } - BatchingState = ETxBatchingState::PreProcessing; + if (UserActionAndTxPendingCommit.empty()) { + // Processing stopped and nothing to commit - finalize + BatchingState = ETxBatchingState::Finishing; + } else { + // Process commit queue + ProcessCommitQueue(); + } + // BatchingState can go to Finishing in ContinueProcessTxsAndUserActs. Therefore, it is necessary to check + // the size of the UserActionAndTxPendingCommit queue here. + if (!UserActionAndTxPendingCommit.empty()) { + // Still pending for come commits + PQ_LOG_D("Still pending for come commits"); + return; } + PQ_LOG_D("Try persist"); + // Here we have an empty UserActionAndTxPendingCommit queue and BatchingState is equal to Finishing. + RunPersist(); } bool TPartition::CanProcessUserActionAndTransactionEvents() const { - return - (BatchingState == ETxBatchingState::PreProcessing) || - (BatchingState == ETxBatchingState::Executing); + return (BatchingState == ETxBatchingState::PreProcessing); } void TPartition::ContinueProcessTxsAndUserActs(const TActorContext&) @@ -2283,10 +2284,7 @@ void TPartition::ProcessCommitQueue() { std::visit(visitor, event); } if (UserActionAndTxPendingCommit.empty()) { - TxAffectedConsumers.clear(); - TxAffectedSourcesIds.clear(); - Y_ABORT_UNLESS(UserActionAndTxPendingCommit.empty()); - TransactionsInflight.clear(); + BatchingState = ETxBatchingState::Finishing; } } @@ -2509,13 +2507,13 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple auto result = EProcessResult::Continue; if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info - PQ_LOG_TX_D("The Tx " << t->GetTxId() << " is waiting for TEvGetWriteInfoResponse"); + PQ_LOG_TX_D("The TxId " << t->GetTxId() << " is waiting for TEvGetWriteInfoResponse"); return EProcessResult::NotReady; } if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied result = ApplyWriteInfoResponse(*t); if (!t->WriteInfoApplied) { // Tried to apply write info but couldn't - TX must be blocked. - PQ_LOG_TX_D("The Tx " << t->GetTxId() << " must be blocked"); + PQ_LOG_TX_D("The TxId " << t->GetTxId() << " must be blocked"); Y_ABORT_UNLESS(result != EProcessResult::Continue); return result; } @@ -2683,7 +2681,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr break; } consumers.insert(consumer); - PQ_LOG_TX_D("Tx " << tx.TxId << " affect consumer " << consumer); + PQ_LOG_TX_D("TxId " << tx.TxId << " affect consumer " << consumer); } } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 115483c7329d..57411d353595 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -2230,16 +2230,19 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) SendCommitTx(step, txId_1); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_2); WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_3); - WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); - - WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); } Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) @@ -2345,6 +2348,9 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) SendCalcPredicate(step, txId_2, client, 0, 5); SendRollbackTx(step, txId_1); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true}); } @@ -2902,27 +2908,31 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { SendTxCommit(tx1); SendTxRollback(tx2); - ExpectNoKvRequest(); + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx3); WaitBatchCompletion(1); SendTxCommit(tx3); //2 Normal writes with src1 & src4 - WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message ExpectNoTxPredicateReply(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); WaitCommitDone(tx3); WaitTxPredicateReply(tx5); - WaitBatchCompletion(1); + WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message SendTxCommit(tx5); - WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx5); + + WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + WaitKvRequest(); + SendKvResponse(); WaitImmediateTxComplete(immTx1, true); } @@ -2972,7 +2982,6 @@ Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { WaitBatchCompletion(1); SendTxRollback(tx1); - ExpectNoKvRequest(); WaitTxPredicateReply(tx2); WaitBatchCompletion(2); @@ -3002,26 +3011,27 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 1 completion\n"; SendTxCommit(tx1); WaitBatchCompletion(1); - Cerr << "Expect no KV request\n"; - ExpectNoKvRequest(); + Cerr << "Expect KV request\n"; + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx2); SendTxCommit(tx2); - Cerr << "Waif or tx 3 predicate failure\n"; + Cerr << "Wait for tx 3 predicate failure\n"; WaitTxPredicateFailure(tx3); - Cerr << "Waif or tx 4 predicate failure\n"; + Cerr << "Wait for tx 4 predicate failure\n"; WaitTxPredicateFailure(tx4); Cerr << "Wait batch of 3 completion\n"; WaitBatchCompletion(1); // Immediate Tx 2 - 4. - Cerr << "Expect no KV request\n"; - ExpectNoKvRequest(); + Cerr << "Expect KV request\n"; + WaitKvRequest(); + SendKvResponse(); SendTxRollback(tx3); SendTxRollback(tx4); WaitBatchCompletion(2); // Immediate Tx 2 - 4. - ExpectNoCommitDone(); WaitKvRequest(); SendKvResponse(); Cerr << "Wait for commits\n"; @@ -3049,16 +3059,18 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe SendTxCommit(tx1); WaitBatchCompletion(1); - ExpectNoKvRequest(); + WaitKvRequest(); + SendKvResponse(); + + WaitCommitDone(tx1); + WaitTxPredicateFailure(tx2); WaitTxPredicateReply(tx3); SendTxRollback(tx2); SendTxCommit(tx3); WaitBatchCompletion(2); // Tx 2 & 3. - ExpectNoCommitDone(); WaitKvRequest(); SendKvResponse(); - WaitCommitDone(tx1); WaitCommitDone(tx3); WaitBatchCompletion(3); WaitKvRequest(); @@ -3208,9 +3220,10 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { WaitBatchCompletion(1+1); ExpectNoKvRequest(); SendTxCommit(tx); - WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); + WaitBatchCompletion(1); + EmulateKVTablet(); } { // 5. WriteTx -> ImmTx = 2 batches @@ -3224,13 +3237,13 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { WaitBatchCompletion(1+1); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); ExpectNoCommitDone(); EmulateKVTablet(); + WaitBatchCompletion(1); WaitCommitDone(tx); + EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); } - } Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { Init({.WriterSessions={"src1", "src2"}, .EndOffset = 1}); @@ -3325,16 +3338,16 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { //Just block processing so every message arrives before batching starts auto txTmp = MakeAndSendWriteTx({}); - MakeAndSendNormalOffsetCommit(1, 2); + MakeAndSendNormalOffsetCommit(1, 2); // act-1 auto tx1 = MakeAndSendTxOffsetCommit(1, 2, 5); auto tx2 = MakeAndSendTxOffsetCommit(1, 5, 10); - MakeAndSendNormalOffsetCommit(1, 20); + MakeAndSendNormalOffsetCommit(1, 20); // act-2 ResetBatchCompletion(); WaitWriteInfoRequest(txTmp, true); WaitTxPredicateReply(txTmp); - WaitBatchCompletion(2); + WaitBatchCompletion(2); // txTmp + act-1 SendTxRollback(txTmp); ExpectNoTxPredicateReply(); @@ -3342,22 +3355,22 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { SendKvResponse(); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + WaitBatchCompletion(1); // tx1 ExpectNoTxPredicateReply(); SendTxCommit(tx1); - ExpectNoKvRequest(); WaitTxPredicateReply(tx2); - WaitBatchCompletion(1); SendTxCommit(tx2); - WaitBatchCompletion(1); + WaitBatchCompletion(1); // tx2 WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); WaitCommitDone(tx2); - + WaitBatchCompletion(1); // act-2 + WaitKvRequest(); + SendKvResponse(); txTmp = MakeAndSendWriteTx({}); auto immTx1 = MakeAndSendImmediateTxOffsetCommit(2, 0, 5); @@ -3366,7 +3379,7 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(txTmp); SendTxRollback(txTmp); - WaitBatchCompletion(2 + 1); + WaitBatchCompletion(3); WaitKvRequest(); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); @@ -3454,7 +3467,8 @@ Y_UNIT_TEST_F(ConflictingCommitProccesAfterRollback, TPartitionTxTestHelper) { WaitBatchCompletion(1); SendTxRollback(tx1); - ExpectNoKvRequest(); + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx2); WaitBatchCompletion(1); @@ -3629,19 +3643,27 @@ Y_UNIT_TEST_F(TEvTxCalcPredicate_Without_Conflicts, TPartitionTxTestHelper) { Init(); - auto tx1 = MakeAndSendWriteTx({{"sourceid", {1, 3}}}); + auto tx1 = MakeAndSendWriteTx({{"sourceid-1", {1, 3}}}); WaitWriteInfoRequest(tx1); SendWriteInfoResponse(tx1); WaitTxPredicateReply(tx1); - auto tx2 = MakeAndSendWriteTx({{"another-sourceid", {1, 3}}}); + SendTxCommit(tx1); + EmulateKVTablet(); + + auto tx2 = MakeAndSendWriteTx({{"sourceid-2", {1, 3}}}); + auto tx3 = MakeAndSendWriteTx({{"sourceid-3", {1, 3}}}); WaitWriteInfoRequest(tx2); + WaitWriteInfoRequest(tx3); + + SendWriteInfoResponse(tx3); SendWriteInfoResponse(tx2); WaitTxPredicateReply(tx2); + WaitTxPredicateReply(tx3); } Y_UNIT_TEST_F(TEvTxCalcPredicate_With_Conflicts, TPartitionTxTestHelper)