Skip to content
48 changes: 48 additions & 0 deletions src/api/InfluxDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,41 @@ void vz::api::InfluxDB::send() {
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;

// Snapshot _last_timestamp and _lastReadingSent before the build loop so we
// can roll back on send failure. The loop advances both fields as it runs,
// but the HTTP send has not happened yet. If the send fails, buf->undelete()
// restores the buffer items but without rolling back these fields the readings
// would be permanently skipped on every subsequent attempt.
//
// Three cases must be handled correctly:
// A) duplicates=0 — only _last_timestamp matters
// B) duplicates>0, _lastReadingSent was nullptr before loop
// — loop may allocate it; must free and null on rollback
// C) duplicates>0, _lastReadingSent was already set before loop
// — must restore its value, not just the pointer
const int64_t snapshot_last_timestamp = _last_timestamp;
Reading *snapshot_lastReadingSent = nullptr;
if (_lastReadingSent) {
snapshot_lastReadingSent = new Reading(*_lastReadingSent);
}

auto rollback_state = [&]() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the LLM wasn't allowed to add methods and C++ does not support nested functions,
so it inlines a closure and assigns it to a function pointer variable 😭
LLMs always do this, humans would never...

_last_timestamp = snapshot_last_timestamp;
if (snapshot_lastReadingSent) {
// Case C: _lastReadingSent existed before — restore its value
if (!_lastReadingSent)
_lastReadingSent = new Reading(*snapshot_lastReadingSent);
else
*_lastReadingSent = *snapshot_lastReadingSent;
} else {
// Cases A & B: _lastReadingSent was nullptr before the loop.
// If the loop allocated it, free it now so the next send()
// call starts fresh and doesn't compare against a stale reading.
delete _lastReadingSent;
_lastReadingSent = nullptr;
}
};

// build request body from buffer contents
buf->lock();
for (it = buf->begin(); it != buf->end(); it++) {
Expand Down Expand Up @@ -393,6 +428,9 @@ void vz::api::InfluxDB::send() {
buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer
} else {
buf->undelete(); // failure to insert, so dont delete the buffer
rollback_state(); // restore _last_timestamp and _lastReadingSent so all
// buffered readings are retried on next send() call

if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
curl_easy_strerror(curl_code));
Expand All @@ -402,11 +440,21 @@ void vz::api::InfluxDB::send() {
print(log_error, "InfluxDB response was %s", channel()->name(),
_response->get_response().c_str());
}
if (buf->size() > 30) {
print(log_error, "InfluxDB buffer contains %i undelivered readings",
channel()->name(), buf->size());
}
}
} else { // there is nothing to send
print(log_info, "Nothing to send to InfluxDB api", channel()->name());
}

// Free the snapshot in all code paths.
// In the failure+Case C path rollback_state() restored _lastReadingSent from it,
// but snapshot_lastReadingSent itself is a separate allocation that must always
// be freed here.
delete snapshot_lastReadingSent;

if (curlSessionProvider) {
// release our curl session
curlSessionProvider->return_session(_host + channel()->uuid(), _api.curl);
Expand Down
Loading