Skip to content

TxMessageAttempts optionally ignores offline attempts #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Support for TransactionMessageAttempts/-RetryInterval ([#345](https://github.com/matth-x/MicroOcpp/pull/345))
- Heap profiler and custom allocator support ([#350](https://github.com/matth-x/MicroOcpp/pull/350))
- Migration of persistent storage ([#355](https://github.com/matth-x/MicroOcpp/pull/355))
- Build flag `MO_TX_ATTEMPT_TIMEOUT` to ignore offline attempts ([#398](https://github.com/matth-x/MicroOcpp/pull/398))

### Removed

Expand Down
92 changes: 66 additions & 26 deletions src/MicroOcpp/Model/ConnectorBase/Connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,30 @@ Connector::Connector(Context& context, std::shared_ptr<FilesystemAdapter> filesy
txNrFront = txNrBegin;

if (model.getTransactionStore()) {
unsigned int txNrLatest = (txNrEnd + MAX_TX_CNT - 1) % MAX_TX_CNT; //txNr of the most recent tx on flash
transaction = model.getTransactionStore()->getTransaction(connectorId, txNrLatest); //returns nullptr if txNrLatest does not exist on flash

//find and preload front transaction
unsigned int txSize = (txNrEnd + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;
for (unsigned int i = 0; i < txSize; i++) {

transactionFront = model.getTransactionStore()->getTransaction(connectorId, txNrFront);

if (!transactionFront || (transactionFront->isAborted() || transactionFront->isCompleted() || transactionFront->isSilent())) {
//advance front
transactionFront = nullptr;
txNrFront = (txNrFront + 1) % MAX_TX_CNT;
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrEnd=%u", txNrBegin, txNrFront, txNrEnd);
} else {
//front is accurate. Done here
break;
}
}

//preload back transaction
if (txNrEnd != txNrBegin) {
//there exists at least 1 transaction. Take most recent
unsigned int txNrLatest = (txNrEnd + MAX_TX_CNT - 1) % MAX_TX_CNT; //txNr of the most recent tx on flash
transaction = model.getTransactionStore()->getTransaction(connectorId, txNrLatest); //returns nullptr if txNrLatest does not exist on flash
}
} else {
MO_DBG_ERR("must initialize TxStore before Connector");
}
Expand Down Expand Up @@ -249,9 +271,11 @@ void Connector::loop() {
}
}

if (transaction && ((transaction->isAborted() && MO_TX_CLEAN_ABORTED) || (transaction->isSilent() && transaction->getStopSync().isRequested()))) {
if (transaction && ((transaction->isAborted() && MO_TX_CLEAN_ABORTED) || (transaction->isSilent() && transaction->getStopSync().isRequested())) &&
transaction->getTxNr() != txNrFront) {
//If the transaction is aborted (invalidated before started) or is silent and has stopped. Delete all artifacts from flash
//This is an optimization. The memory management will attempt to remove those files again later
//Don't do this if tx is at front. Then, `getFrontRequestOpNr()` has responsibility for object lifecycle
bool removed = true;
if (auto mService = model.getMeteringService()) {
mService->abortTxMeterData(connectorId);
Expand All @@ -263,9 +287,6 @@ void Connector::loop() {
}

if (removed) {
if (txNrFront == txNrEnd) {
txNrFront = transaction->getTxNr();
}
txNrEnd = transaction->getTxNr(); //roll back creation of last tx entry
}

Expand Down Expand Up @@ -517,7 +538,6 @@ void Connector::loop() {
makeRequest(
new Ocpp16::StatusNotification(connectorId, reportedStatus, reportedTimestamp, errorData));

statusNotification->setTimeout(0);
context.initiateRequest(std::move(statusNotification));
return;
}
Expand Down Expand Up @@ -549,10 +569,10 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {

std::shared_ptr<Transaction> tx;

//clean possible aborted tx
//clean possible aborted tx. Go from one after front to end (all unsent messages, but skip front)
unsigned int txr = txNrEnd;
unsigned int txSize = (txNrEnd + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT;
for (unsigned int i = 0; i < txSize; i++) {
unsigned int txSize = (txNrEnd + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;
for (unsigned int i = 0; i + 1 < txSize; i++) {
txr = (txr + MAX_TX_CNT - 1) % MAX_TX_CNT; //decrement by 1

auto tx = model.getTransactionStore()->getTransaction(connectorId, txr);
Expand All @@ -567,9 +587,6 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
removed &= model.getTransactionStore()->remove(connectorId, txr);
}
if (removed) {
if (txNrFront == txNrEnd) {
txNrFront = txr;
}
txNrEnd = txr;
MO_DBG_WARN("deleted dangling silent or aborted tx for new transaction");
} else {
Expand All @@ -593,7 +610,7 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
//could not create transaction - now, try to replace tx history entry

unsigned int txl = txNrBegin;
txSize = (txNrEnd + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT;
txSize = (txNrFront + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT; //range from oldest history tx to one tx before front

for (unsigned int i = 0; i < txSize; i++) {

Expand All @@ -617,9 +634,6 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
}
if (removed) {
txNrBegin = (txl + 1) % MAX_TX_CNT;
if (txNrFront == txl) {
txNrFront = txNrBegin;
}
MO_DBG_DEBUG("deleted tx history entry for new transaction");
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrEnd=%u", txNrBegin, txNrFront, txNrEnd);

Expand Down Expand Up @@ -1085,13 +1099,6 @@ unsigned int Connector::getFrontRequestOpNr() {

unsigned int txSize = (txNrEnd + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;

if (transactionFront && txSize == 0) {
//catch edge case where txBack has been rolled back and txFront was equal to txBack
MO_DBG_DEBUG("collect front transaction %u-%u after tx rollback", connectorId, transactionFront->getTxNr());
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrEnd=%u", txNrBegin, txNrFront, txNrEnd);
transactionFront = nullptr;
}

for (unsigned int i = 0; i < txSize; i++) {

if (!transactionFront) {
Expand All @@ -1105,9 +1112,14 @@ unsigned int Connector::getFrontRequestOpNr() {
#endif
}

if (transactionFront && (transactionFront->isAborted() || transactionFront->isCompleted() || transactionFront->isSilent())) {
if (transactionFront && transactionFront == transaction) {
// Don't advance front tx beyond back tx
break;
}

if (!transactionFront || (transactionFront->isAborted() || transactionFront->isCompleted() || transactionFront->isSilent())) {
//advance front
MO_DBG_DEBUG("collect front transaction %u-%u", connectorId, transactionFront->getTxNr());
MO_DBG_DEBUG("collect front transaction %u-%u", connectorId, txNrFront);
transactionFront = nullptr;
txNrFront = (txNrFront + 1) % MAX_TX_CNT;
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrEnd=%u", txNrBegin, txNrFront, txNrEnd);
Expand Down Expand Up @@ -1172,6 +1184,7 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
return nullptr;
}

const auto attemptNr_capture = transactionFront->getStartSync().getAttemptNr();
transactionFront->getStartSync().advanceAttemptNr();
transactionFront->getStartSync().setAttemptTime(model.getClock().now());
transactionFront->commit();
Expand Down Expand Up @@ -1203,6 +1216,19 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
}
});

#if MO_TX_ATTEMPT_TIMEOUT == 0
//a timeout should not increase the attemptNr. Roll back to previous attemptNr
startTx->setOnTimeoutListener([transactionFront_capture, attemptNr_capture] () {
MO_DBG_DEBUG("StartTx timeout -> roll back attemptNr and try again");
if (transactionFront_capture) {
transactionFront_capture->getStartSync().setAttemptNr(attemptNr_capture);
transactionFront_capture->commit();
}
});
#else
(void)attemptNr_capture;
#endif

return startTx;
}

Expand All @@ -1229,6 +1255,7 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
return nullptr;
}

const auto attemptNr_capture = transactionFront->getStopSync().getAttemptNr();
transactionFront->getStopSync().advanceAttemptNr();
transactionFront->getStopSync().setAttemptTime(model.getClock().now());
transactionFront->commit();
Expand Down Expand Up @@ -1264,6 +1291,19 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
}
});

#if MO_TX_ATTEMPT_TIMEOUT == 0
//a timeout should not increase the attemptNr. Roll back to previous attemptNr
stopTx->setOnTimeoutListener([transactionFront_capture, attemptNr_capture] () {
MO_DBG_DEBUG("StopTx timeout -> roll back attemptNr and try again");
if (transactionFront_capture) {
transactionFront_capture->getStopSync().setAttemptNr(attemptNr_capture);
transactionFront_capture->commit();
}
});
#else
(void)attemptNr_capture;
#endif

return stopTx;
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/MicroOcpp/Model/ConnectorBase/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#define MO_REPORT_NOERROR 0
#endif

#ifndef MO_TX_ATTEMPT_TIMEOUT
#define MO_TX_ATTEMPT_TIMEOUT 1 //if the timeout of a tx-related msg should increase its attempt counter
#endif

namespace MicroOcpp {

class Context;
Expand Down