diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp index ee3a8a20..163b8c0d 100644 --- a/src/api/InfluxDB.cpp +++ b/src/api/InfluxDB.cpp @@ -282,6 +282,41 @@ void vz::api::InfluxDB::send() { const int duplicates = channel()->duplicates(); const int duplicates_ms = duplicates * 1000; + // Snapshot _last_timestamp and _lastReadingSent before the build loop so we + // can roll back on send failure. The loop advances both fields as it runs, + // but the HTTP send has not happened yet. If the send fails, buf->undelete() + // restores the buffer items but without rolling back these fields the readings + // would be permanently skipped on every subsequent attempt. + // + // Three cases must be handled correctly: + // A) duplicates=0 — only _last_timestamp matters + // B) duplicates>0, _lastReadingSent was nullptr before loop + // — loop may allocate it; must free and null on rollback + // C) duplicates>0, _lastReadingSent was already set before loop + // — must restore its value, not just the pointer + const int64_t snapshot_last_timestamp = _last_timestamp; + Reading *snapshot_lastReadingSent = nullptr; + if (_lastReadingSent) { + snapshot_lastReadingSent = new Reading(*_lastReadingSent); + } + + auto rollback_state = [&]() { + _last_timestamp = snapshot_last_timestamp; + if (snapshot_lastReadingSent) { + // Case C: _lastReadingSent existed before — restore its value + if (!_lastReadingSent) + _lastReadingSent = new Reading(*snapshot_lastReadingSent); + else + *_lastReadingSent = *snapshot_lastReadingSent; + } else { + // Cases A & B: _lastReadingSent was nullptr before the loop. + // If the loop allocated it, free it now so the next send() + // call starts fresh and doesn't compare against a stale reading. + delete _lastReadingSent; + _lastReadingSent = nullptr; + } + }; + // build request body from buffer contents buf->lock(); for (it = buf->begin(); it != buf->end(); it++) { @@ -393,6 +428,9 @@ void vz::api::InfluxDB::send() { buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer } else { buf->undelete(); // failure to insert, so dont delete the buffer + rollback_state(); // restore _last_timestamp and _lastReadingSent so all + // buffered readings are retried on next send() call + if (curl_code != CURLE_OK) { print(log_error, "CURL Error: %s", channel()->name(), curl_easy_strerror(curl_code)); @@ -402,11 +440,21 @@ void vz::api::InfluxDB::send() { print(log_error, "InfluxDB response was %s", channel()->name(), _response->get_response().c_str()); } + if (buf->size() > 30) { + print(log_error, "InfluxDB buffer contains %i undelivered readings", + channel()->name(), buf->size()); + } } } else { // there is nothing to send print(log_info, "Nothing to send to InfluxDB api", channel()->name()); } + // Free the snapshot in all code paths. + // In the failure+Case C path rollback_state() restored _lastReadingSent from it, + // but snapshot_lastReadingSent itself is a separate allocation that must always + // be freed here. + delete snapshot_lastReadingSent; + if (curlSessionProvider) { // release our curl session curlSessionProvider->return_session(_host + channel()->uuid(), _api.curl);