Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 42 additions & 44 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,10 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, c

void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvTxRollback" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

ProcessPendingEvent(ev, ctx);
}

Expand Down Expand Up @@ -1496,7 +1500,7 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
break;
}
txSourceIds.insert(s.first);
PQ_LOG_D("Tx " << tx.GetTxId() << " affect SourceId " << s.first);
PQ_LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first);
}

if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) {
Expand Down Expand Up @@ -2134,7 +2138,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
{
if (KVWriteInProgress) {
PQ_LOG_D("Can't process txs");
PQ_LOG_D("Writing. Can't process transactions and user actions");
return;
}
if (DeletePartitionState == DELETION_INITED) {
Expand All @@ -2156,48 +2160,45 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
return;
}
PQ_LOG_D("Batching state before ContinueProcessTxsAndUserActs: " << (int)BatchingState);
while (true) {
if (CanProcessUserActionAndTransactionEvents()) {
ContinueProcessTxsAndUserActs(ctx);
}
if (BatchingState == ETxBatchingState::PreProcessing) {
PQ_LOG_D("Still preprocessing - waiting for something");
return; // Still preprocessing - waiting for something;
}
PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState);
if (CanProcessUserActionAndTransactionEvents()) {
ContinueProcessTxsAndUserActs(ctx);
}
// Still preprocessing? Waiting for something
if (CanProcessUserActionAndTransactionEvents()) {
PQ_LOG_D("Still preprocessing - waiting for something");
return;
}
PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState);

// Preprocessing complete;
if (CurrentBatchSize > 0) {
Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize));
}
CurrentBatchSize = 0;
// Preprocessing complete;
if (CurrentBatchSize > 0) {
PQ_LOG_D("Batch completed (" << CurrentBatchSize << ")");
Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize));
}
CurrentBatchSize = 0;

if (UserActionAndTxPendingCommit.empty()) {
// Processing stopped and nothing to commit - finalize
BatchingState = ETxBatchingState::Finishing;
} else {
// Process commit queue
ProcessCommitQueue();
}
if (!UserActionAndTxPendingCommit.empty()) {
// Still pending for come commits
PQ_LOG_D("Still pending for come commits");
return;
}
// Commit queue processing complete. Now can either swith to persist or continue preprocessing;
if (BatchingState == ETxBatchingState::Finishing) { // Persist required;
RunPersist();
return;
}
BatchingState = ETxBatchingState::PreProcessing;
if (UserActionAndTxPendingCommit.empty()) {
// Processing stopped and nothing to commit - finalize
BatchingState = ETxBatchingState::Finishing;
} else {
// Process commit queue
ProcessCommitQueue();
}
// BatchingState can go to Finishing in ContinueProcessTxsAndUserActs. Therefore, it is necessary to check
// the size of the UserActionAndTxPendingCommit queue here.
if (!UserActionAndTxPendingCommit.empty()) {
// Still pending for come commits
PQ_LOG_D("Still pending for come commits");
return;
}
PQ_LOG_D("Try persist");
// Here we have an empty UserActionAndTxPendingCommit queue and BatchingState is equal to Finishing.
RunPersist();
}

bool TPartition::CanProcessUserActionAndTransactionEvents() const
{
return
(BatchingState == ETxBatchingState::PreProcessing) ||
(BatchingState == ETxBatchingState::Executing);
return (BatchingState == ETxBatchingState::PreProcessing);
}

void TPartition::ContinueProcessTxsAndUserActs(const TActorContext&)
Expand Down Expand Up @@ -2283,10 +2284,7 @@ void TPartition::ProcessCommitQueue() {
std::visit(visitor, event);
}
if (UserActionAndTxPendingCommit.empty()) {
TxAffectedConsumers.clear();
TxAffectedSourcesIds.clear();
Y_ABORT_UNLESS(UserActionAndTxPendingCommit.empty());
TransactionsInflight.clear();
BatchingState = ETxBatchingState::Finishing;
}
}

Expand Down Expand Up @@ -2509,13 +2507,13 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple

auto result = EProcessResult::Continue;
if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info
PQ_LOG_TX_D("The Tx " << t->GetTxId() << " is waiting for TEvGetWriteInfoResponse");
PQ_LOG_TX_D("The TxId " << t->GetTxId() << " is waiting for TEvGetWriteInfoResponse");
return EProcessResult::NotReady;
}
if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied
result = ApplyWriteInfoResponse(*t);
if (!t->WriteInfoApplied) { // Tried to apply write info but couldn't - TX must be blocked.
PQ_LOG_TX_D("The Tx " << t->GetTxId() << " must be blocked");
PQ_LOG_TX_D("The TxId " << t->GetTxId() << " must be blocked");
Y_ABORT_UNLESS(result != EProcessResult::Continue);
return result;
}
Expand Down Expand Up @@ -2683,7 +2681,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
break;
}
consumers.insert(consumer);
PQ_LOG_TX_D("Tx " << tx.TxId << " affect consumer " << consumer);
PQ_LOG_TX_D("TxId " << tx.TxId << " affect consumer " << consumer);
}
}

Expand Down
Loading
Loading