From 557162d2c20644296a194a8a457e442ff91c5016 Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 12:49:45 +0200
Subject: [PATCH 1/8] Refactor logging statements and code formatting
Refactor logging statements for improved readability and consistency. Adjust formatting and indentation in various sections of the InfluxDB API implementation.
---
src/api/InfluxDB.cpp | 126 +++++++++++++++++++++++++++----------------
1 file changed, 80 insertions(+), 46 deletions(-)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index ee3a8a20..fae65bc5 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -1,15 +1,14 @@
-/***********************************************************************/
+/*****************************************************************************/
/** @file InfluxDB.cpp
* This is cobbled together from the Volkszaehler and MySmartGrid API...
*
* @author Stefan Kuntz
- * @email Stefan.github@gmail.com
+ * @email Stefan.github@gmail.com
* @copyright Copyright (c) 2017 - 2023, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
**/
/*---------------------------------------------------------------------*/
-
/*
* This file is part of volkzaehler.org
*
@@ -20,7 +19,7 @@
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
@@ -41,8 +40,9 @@
extern Config_Options options;
vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOptions)
- : ApiIF(ch), _response(new vz::api::CurlResponse()), _last_timestamp(0), _lastReadingSent(0) {
+ : ApiIF(ch), _response(new vz::api::CurlResponse()), _last_timestamp(0), _lastReadingSent(0) {
OptionList optlist;
+
print(log_debug, "InfluxDB API initialize", ch->name());
// parse config file options
@@ -73,12 +73,14 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_organization = optlist.lookup_string(pOptions, "organization");
- print(log_finest, "api InfluxDB using organization %s", ch->name(), _organization.c_str());
+ print(log_finest, "api InfluxDB using organization %s", ch->name(),
+ _organization.c_str());
} catch (vz::OptionNotFoundException &e) {
print(log_finest, "api InfluxDB no organization set", ch->name());
_organization = "";
} catch (vz::VZException &e) {
- print(log_alert, "api InfluxDB requires parameter \"organization\" as string!", ch->name());
+ print(log_alert, "api InfluxDB requires parameter \"organization\" as string!",
+ ch->name());
throw;
}
@@ -98,8 +100,6 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
// dont log passwords by default
// print(log_finest, "api InfluxDB using password %s", ch->name(), _password.c_str());
} catch (vz::OptionNotFoundException &e) {
- // print(log_alert, "api InfluxDB requires parameter \"password\"!", ch->name());
- // throw;
print(log_finest, "api InfluxDB no password set", ch->name());
_password = "";
} catch (vz::VZException &e) {
@@ -121,14 +121,14 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_measurement_name = optlist.lookup_string(pOptions, "measurement_name");
print(log_finest, "api InfluxDB using measurement name %s", ch->name(),
- _measurement_name.c_str());
+ _measurement_name.c_str());
} catch (vz::OptionNotFoundException &e) {
print(log_finest, "api InfluxDB will use default measurement name \"vzlogger\"",
- ch->name());
+ ch->name());
_measurement_name = "vzlogger";
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"measurement_name\" as string!",
- ch->name());
+ ch->name());
throw;
}
@@ -148,7 +148,8 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
print(log_finest, "api InfluxDB using curl timeout %i", ch->name(), _curl_timeout);
} catch (vz::OptionNotFoundException &e) {
_curl_timeout = 30; // seconds
- print(log_finest, "api InfluxDB will use default timeout %i", ch->name(), _curl_timeout);
+ print(log_finest, "api InfluxDB will use default timeout %i", ch->name(),
+ _curl_timeout);
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"timeout\" as int!", ch->name());
throw;
@@ -157,37 +158,39 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_max_batch_inserts = optlist.lookup_int(pOptions, "max_batch_inserts");
print(log_finest, "api InfluxDB using max batch inserts: %i", ch->name(),
- _max_batch_inserts);
+ _max_batch_inserts);
} catch (vz::OptionNotFoundException &e) {
_max_batch_inserts = 4500; // max lines per request
print(log_finest, "api InfluxDB will use default max_batch_inserts %i", ch->name(),
- _max_batch_inserts);
+ _max_batch_inserts);
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"max_batch_inserts\" as int!",
- ch->name());
+ ch->name());
throw;
}
try {
_max_buffer_size = optlist.lookup_int(pOptions, "max_buffer_size");
- print(log_finest, "api InfluxDB using max_buffer_size: %i", ch->name(), _max_buffer_size);
+ print(log_finest, "api InfluxDB using max_buffer_size: %i", ch->name(),
+ _max_buffer_size);
} catch (vz::OptionNotFoundException &e) {
_max_buffer_size = _max_batch_inserts * 100; // max items in buffer
print(log_finest, "api InfluxDB will use default max_buffer_size %i", ch->name(),
- _max_buffer_size);
+ _max_buffer_size);
} catch (vz::VZException &e) {
- print(log_alert, "api InfluxDB requires parameter \"max_buffer_size\" as int!", ch->name());
+ print(log_alert, "api InfluxDB requires parameter \"max_buffer_size\" as int!",
+ ch->name());
throw;
}
try {
_send_uuid = optlist.lookup_bool(pOptions, "send_uuid");
print(log_finest, "api InfluxDB using send_uuid: %s", ch->name(),
- _send_uuid ? "true" : "false");
+ _send_uuid ? "true" : "false");
} catch (vz::OptionNotFoundException &e) {
_send_uuid = true;
print(log_finest, "api InfluxDB will use default send_uuid %s", ch->name(),
- _send_uuid ? "true" : "false");
+ _send_uuid ? "true" : "false");
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"send_uuid\" as bool!", ch->name());
throw;
@@ -196,14 +199,14 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_ssl_verifypeer = optlist.lookup_bool(pOptions, "ssl_verifypeer");
print(log_finest, "api InfluxDB using ssl_verifypeer: %s", ch->name(),
- _ssl_verifypeer ? "true" : "false");
+ _ssl_verifypeer ? "true" : "false");
} catch (vz::OptionNotFoundException &e) {
_ssl_verifypeer = true;
print(log_finest, "api InfluxDB will use default ssl_verifypeer %s", ch->name(),
- _ssl_verifypeer ? "true" : "false");
+ _ssl_verifypeer ? "true" : "false");
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"_ssl_verifypeer\" as bool!",
- ch->name());
+ ch->name());
throw;
}
@@ -211,6 +214,7 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
if (!curlhelper) {
throw vz::VZException("CURL: cannot create handle for urlencode.");
}
+
char *database_urlencoded = curl_easy_escape(curlhelper, _database.c_str(), 0);
if (!database_urlencoded) {
throw vz::VZException("Cannot url-encode database name.");
@@ -219,7 +223,8 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
// build request url
_url = _host;
if (!_organization.empty()) {
- char *organization_urlencoded = curl_easy_escape(curlhelper, _organization.c_str(), 0);
+ char *organization_urlencoded =
+ curl_easy_escape(curlhelper, _organization.c_str(), 0);
if (!organization_urlencoded) {
throw vz::VZException("Cannot url-encode organization name.");
}
@@ -232,6 +237,7 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
}
_url.append(database_urlencoded);
_url.append("&precision=ms");
+
print(log_debug, "api InfluxDB using url %s", ch->name(), _url.c_str());
curl_free(database_urlencoded);
}
@@ -244,11 +250,12 @@ void vz::api::InfluxDB::send() {
CURLcode curl_code;
int request_body_lines = 0;
std::string request_body;
+
Buffer::Ptr buf = channel()->buffer();
Buffer::iterator it;
_api.curl =
- curlSessionProvider ? curlSessionProvider->get_easy_session(_host + channel()->uuid()) : 0;
+ curlSessionProvider ? curlSessionProvider->get_easy_session(_host + channel()->uuid()) : 0;
if (!_api.curl) {
throw vz::VZException("CURL: cannot create handle.");
@@ -259,16 +266,15 @@ void vz::api::InfluxDB::send() {
// delete items if the buffer grows too large
if (buf->size() > (unsigned)_max_buffer_size) {
print(log_warning,
- "Buffer too big (%i items). Deleting items. (This indicates a connection problem)",
- channel()->name(), buf->size());
+ "Buffer too big (%i items). Deleting items. (This indicates a connection problem)",
+ channel()->name(), buf->size());
unsigned int delta_delete =
- buf->size() - (unsigned)_max_buffer_size; // number of items to delete from buffer
+ buf->size() - (unsigned)_max_buffer_size; // number of items to delete from buffer
buf->lock();
it = buf->begin();
- while (!(
- delta_delete == 0 ||
- (it ==
- buf->end()))) { // buf->end() check shouldnt be necessary. But better safe than sorry.
+ while (!(delta_delete == 0 ||
+ (it == buf->end()))) { // buf->end() check shouldnt be necessary. But better safe
+ // than sorry.
it->mark_delete();
it++;
delta_delete--;
@@ -282,23 +288,42 @@ void vz::api::InfluxDB::send() {
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;
+ // Snapshot state before the build loop so we can roll back on failure.
+ // _last_timestamp and _lastReadingSent are advanced during the loop even
+ // though the HTTP send has not happened yet. If the send fails,
+ // buf->undelete() restores the buffer items, but without rolling back
+ // these fields the readings will be skipped on every subsequent attempt
+ // and silently dropped when the next buf->clean() runs.
+ const int64_t snapshot_last_timestamp = _last_timestamp;
+ Reading *snapshot_lastReadingSent = nullptr;
+ if (_lastReadingSent) {
+ snapshot_lastReadingSent = new Reading(*_lastReadingSent);
+ }
+
+ auto rollback_state = [&]() {
+ _last_timestamp = snapshot_last_timestamp;
+ if (snapshot_lastReadingSent) {
+ if (!_lastReadingSent) _lastReadingSent = new Reading(*snapshot_lastReadingSent);
+ else *_lastReadingSent = *snapshot_lastReadingSent;
+ }
+ };
+
// build request body from buffer contents
buf->lock();
for (it = buf->begin(); it != buf->end(); it++) {
if (request_body_lines >= _max_batch_inserts) {
print(log_debug, "reached maximum lines for InfluxDB insertion request.",
- channel()->name());
+ channel()->name());
break;
}
bool sendData = false;
-
timestamp = it->time_ms();
print(log_finest, "Reading buffer: timestamp %lld value %f", channel()->name(),
- it->time_ms(), it->value());
-
+ it->time_ms(), it->value());
print(log_debug, "compare: %lld %lld", channel()->name(), _last_timestamp, timestamp);
+
// we can only add/consider a timestamp if the ms resolution is not before than from
// previous one:
if (_last_timestamp <= timestamp) {
@@ -307,9 +332,9 @@ void vz::api::InfluxDB::send() {
_last_timestamp = timestamp;
} else {
const Reading &r = *it;
+
// duplicates should be ignored
// but send at least each seconds
-
if (!_lastReadingSent) { // first one from the duplicate consideration -> send it
sendData = true;
_lastReadingSent = new Reading(r);
@@ -318,7 +343,7 @@ void vz::api::InfluxDB::send() {
// a) timestamp
// b) duplicate value
if ((timestamp >= (_last_timestamp + duplicates_ms)) ||
- (r.value() != _lastReadingSent->value())) {
+ (r.value() != _lastReadingSent->value())) {
// send the current one:
sendData = true;
_last_timestamp = timestamp;
@@ -351,7 +376,6 @@ void vz::api::InfluxDB::send() {
it->mark_delete();
}
-
buf->unlock();
if (request_body_lines > 0) { // there is something to send
@@ -367,25 +391,27 @@ void vz::api::InfluxDB::send() {
} else if (_token_header) {
curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _token_header);
}
+
curl_easy_setopt(_api.curl, CURLOPT_URL, _url.c_str());
curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity() > 0);
curl_easy_setopt(_api.curl, CURLOPT_SSL_VERIFYPEER, _ssl_verifypeer);
-
curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION,
- &(vz::api::CurlCallback::debug_callback));
+ &(vz::api::CurlCallback::debug_callback));
curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, response());
+
// signal-handling in libcurl is NOT thread-safe. so force to deactivated them!
curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, _curl_timeout);
-
curl_easy_setopt(_api.curl, CURLOPT_POSTFIELDS, request_body.c_str());
curl_easy_setopt(_api.curl, CURLOPT_WRITEFUNCTION,
- &(vz::api::CurlCallback::write_callback));
+ &(vz::api::CurlCallback::write_callback));
curl_easy_setopt(_api.curl, CURLOPT_WRITEDATA, response());
// actually send the request to InfluxDB
curl_code = curl_easy_perform(_api.curl);
+
print(log_finest, "Influxdb curl terminated", channel()->name());
+
curl_easy_getinfo(_api.curl, CURLINFO_RESPONSE_CODE, &http_code);
if (curl_code == CURLE_OK && http_code >= 200 && http_code < 300) { // everything is ok
@@ -393,20 +419,28 @@ void vz::api::InfluxDB::send() {
buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer
} else {
buf->undelete(); // failure to insert, so dont delete the buffer
+ rollback_state(); // restore _last_timestamp and _lastReadingSent so buffered
+ // readings are retried correctly when InfluxDB comes back up
if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
- curl_easy_strerror(curl_code));
+ curl_easy_strerror(curl_code));
}
print(log_error, "InfluxDB error! - HTTP Status %i", channel()->name(), http_code);
if (!_response->get_response().empty()) {
print(log_error, "InfluxDB response was %s", channel()->name(),
- _response->get_response().c_str());
+ _response->get_response().c_str());
}
}
+
} else { // there is nothing to send
print(log_info, "Nothing to send to InfluxDB api", channel()->name());
}
+ // Free the snapshot in all code paths (success, failure, nothing-to-send).
+ // In the failure path rollback_state() may have copied it into _lastReadingSent,
+ // but that is a separate allocation; snapshot_lastReadingSent itself must be freed here.
+ delete snapshot_lastReadingSent;
+
if (curlSessionProvider) {
// release our curl session
curlSessionProvider->return_session(_host + channel()->uuid(), _api.curl);
From ca75f6f8184e65b99f42f4756c414fbed3d50b4b Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 12:54:15 +0200
Subject: [PATCH 2/8] Fix formatting and improve logging statements
---
src/api/InfluxDB.cpp | 126 ++++++++++++++++---------------------------
1 file changed, 46 insertions(+), 80 deletions(-)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index fae65bc5..ee3a8a20 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -1,14 +1,15 @@
-/*****************************************************************************/
+/***********************************************************************/
/** @file InfluxDB.cpp
* This is cobbled together from the Volkszaehler and MySmartGrid API...
*
* @author Stefan Kuntz
- * @email Stefan.github@gmail.com
+ * @email Stefan.github@gmail.com
* @copyright Copyright (c) 2017 - 2023, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
**/
/*---------------------------------------------------------------------*/
+
/*
* This file is part of volkzaehler.org
*
@@ -19,7 +20,7 @@
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
@@ -40,9 +41,8 @@
extern Config_Options options;
vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOptions)
- : ApiIF(ch), _response(new vz::api::CurlResponse()), _last_timestamp(0), _lastReadingSent(0) {
+ : ApiIF(ch), _response(new vz::api::CurlResponse()), _last_timestamp(0), _lastReadingSent(0) {
OptionList optlist;
-
print(log_debug, "InfluxDB API initialize", ch->name());
// parse config file options
@@ -73,14 +73,12 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_organization = optlist.lookup_string(pOptions, "organization");
- print(log_finest, "api InfluxDB using organization %s", ch->name(),
- _organization.c_str());
+ print(log_finest, "api InfluxDB using organization %s", ch->name(), _organization.c_str());
} catch (vz::OptionNotFoundException &e) {
print(log_finest, "api InfluxDB no organization set", ch->name());
_organization = "";
} catch (vz::VZException &e) {
- print(log_alert, "api InfluxDB requires parameter \"organization\" as string!",
- ch->name());
+ print(log_alert, "api InfluxDB requires parameter \"organization\" as string!", ch->name());
throw;
}
@@ -100,6 +98,8 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
// dont log passwords by default
// print(log_finest, "api InfluxDB using password %s", ch->name(), _password.c_str());
} catch (vz::OptionNotFoundException &e) {
+ // print(log_alert, "api InfluxDB requires parameter \"password\"!", ch->name());
+ // throw;
print(log_finest, "api InfluxDB no password set", ch->name());
_password = "";
} catch (vz::VZException &e) {
@@ -121,14 +121,14 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_measurement_name = optlist.lookup_string(pOptions, "measurement_name");
print(log_finest, "api InfluxDB using measurement name %s", ch->name(),
- _measurement_name.c_str());
+ _measurement_name.c_str());
} catch (vz::OptionNotFoundException &e) {
print(log_finest, "api InfluxDB will use default measurement name \"vzlogger\"",
- ch->name());
+ ch->name());
_measurement_name = "vzlogger";
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"measurement_name\" as string!",
- ch->name());
+ ch->name());
throw;
}
@@ -148,8 +148,7 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
print(log_finest, "api InfluxDB using curl timeout %i", ch->name(), _curl_timeout);
} catch (vz::OptionNotFoundException &e) {
_curl_timeout = 30; // seconds
- print(log_finest, "api InfluxDB will use default timeout %i", ch->name(),
- _curl_timeout);
+ print(log_finest, "api InfluxDB will use default timeout %i", ch->name(), _curl_timeout);
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"timeout\" as int!", ch->name());
throw;
@@ -158,39 +157,37 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_max_batch_inserts = optlist.lookup_int(pOptions, "max_batch_inserts");
print(log_finest, "api InfluxDB using max batch inserts: %i", ch->name(),
- _max_batch_inserts);
+ _max_batch_inserts);
} catch (vz::OptionNotFoundException &e) {
_max_batch_inserts = 4500; // max lines per request
print(log_finest, "api InfluxDB will use default max_batch_inserts %i", ch->name(),
- _max_batch_inserts);
+ _max_batch_inserts);
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"max_batch_inserts\" as int!",
- ch->name());
+ ch->name());
throw;
}
try {
_max_buffer_size = optlist.lookup_int(pOptions, "max_buffer_size");
- print(log_finest, "api InfluxDB using max_buffer_size: %i", ch->name(),
- _max_buffer_size);
+ print(log_finest, "api InfluxDB using max_buffer_size: %i", ch->name(), _max_buffer_size);
} catch (vz::OptionNotFoundException &e) {
_max_buffer_size = _max_batch_inserts * 100; // max items in buffer
print(log_finest, "api InfluxDB will use default max_buffer_size %i", ch->name(),
- _max_buffer_size);
+ _max_buffer_size);
} catch (vz::VZException &e) {
- print(log_alert, "api InfluxDB requires parameter \"max_buffer_size\" as int!",
- ch->name());
+ print(log_alert, "api InfluxDB requires parameter \"max_buffer_size\" as int!", ch->name());
throw;
}
try {
_send_uuid = optlist.lookup_bool(pOptions, "send_uuid");
print(log_finest, "api InfluxDB using send_uuid: %s", ch->name(),
- _send_uuid ? "true" : "false");
+ _send_uuid ? "true" : "false");
} catch (vz::OptionNotFoundException &e) {
_send_uuid = true;
print(log_finest, "api InfluxDB will use default send_uuid %s", ch->name(),
- _send_uuid ? "true" : "false");
+ _send_uuid ? "true" : "false");
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"send_uuid\" as bool!", ch->name());
throw;
@@ -199,14 +196,14 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
try {
_ssl_verifypeer = optlist.lookup_bool(pOptions, "ssl_verifypeer");
print(log_finest, "api InfluxDB using ssl_verifypeer: %s", ch->name(),
- _ssl_verifypeer ? "true" : "false");
+ _ssl_verifypeer ? "true" : "false");
} catch (vz::OptionNotFoundException &e) {
_ssl_verifypeer = true;
print(log_finest, "api InfluxDB will use default ssl_verifypeer %s", ch->name(),
- _ssl_verifypeer ? "true" : "false");
+ _ssl_verifypeer ? "true" : "false");
} catch (vz::VZException &e) {
print(log_alert, "api InfluxDB requires parameter \"_ssl_verifypeer\" as bool!",
- ch->name());
+ ch->name());
throw;
}
@@ -214,7 +211,6 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
if (!curlhelper) {
throw vz::VZException("CURL: cannot create handle for urlencode.");
}
-
char *database_urlencoded = curl_easy_escape(curlhelper, _database.c_str(), 0);
if (!database_urlencoded) {
throw vz::VZException("Cannot url-encode database name.");
@@ -223,8 +219,7 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
// build request url
_url = _host;
if (!_organization.empty()) {
- char *organization_urlencoded =
- curl_easy_escape(curlhelper, _organization.c_str(), 0);
+ char *organization_urlencoded = curl_easy_escape(curlhelper, _organization.c_str(), 0);
if (!organization_urlencoded) {
throw vz::VZException("Cannot url-encode organization name.");
}
@@ -237,7 +232,6 @@ vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list &pOp
}
_url.append(database_urlencoded);
_url.append("&precision=ms");
-
print(log_debug, "api InfluxDB using url %s", ch->name(), _url.c_str());
curl_free(database_urlencoded);
}
@@ -250,12 +244,11 @@ void vz::api::InfluxDB::send() {
CURLcode curl_code;
int request_body_lines = 0;
std::string request_body;
-
Buffer::Ptr buf = channel()->buffer();
Buffer::iterator it;
_api.curl =
- curlSessionProvider ? curlSessionProvider->get_easy_session(_host + channel()->uuid()) : 0;
+ curlSessionProvider ? curlSessionProvider->get_easy_session(_host + channel()->uuid()) : 0;
if (!_api.curl) {
throw vz::VZException("CURL: cannot create handle.");
@@ -266,15 +259,16 @@ void vz::api::InfluxDB::send() {
// delete items if the buffer grows too large
if (buf->size() > (unsigned)_max_buffer_size) {
print(log_warning,
- "Buffer too big (%i items). Deleting items. (This indicates a connection problem)",
- channel()->name(), buf->size());
+ "Buffer too big (%i items). Deleting items. (This indicates a connection problem)",
+ channel()->name(), buf->size());
unsigned int delta_delete =
- buf->size() - (unsigned)_max_buffer_size; // number of items to delete from buffer
+ buf->size() - (unsigned)_max_buffer_size; // number of items to delete from buffer
buf->lock();
it = buf->begin();
- while (!(delta_delete == 0 ||
- (it == buf->end()))) { // buf->end() check shouldnt be necessary. But better safe
- // than sorry.
+ while (!(
+ delta_delete == 0 ||
+ (it ==
+ buf->end()))) { // buf->end() check shouldnt be necessary. But better safe than sorry.
it->mark_delete();
it++;
delta_delete--;
@@ -288,42 +282,23 @@ void vz::api::InfluxDB::send() {
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;
- // Snapshot state before the build loop so we can roll back on failure.
- // _last_timestamp and _lastReadingSent are advanced during the loop even
- // though the HTTP send has not happened yet. If the send fails,
- // buf->undelete() restores the buffer items, but without rolling back
- // these fields the readings will be skipped on every subsequent attempt
- // and silently dropped when the next buf->clean() runs.
- const int64_t snapshot_last_timestamp = _last_timestamp;
- Reading *snapshot_lastReadingSent = nullptr;
- if (_lastReadingSent) {
- snapshot_lastReadingSent = new Reading(*_lastReadingSent);
- }
-
- auto rollback_state = [&]() {
- _last_timestamp = snapshot_last_timestamp;
- if (snapshot_lastReadingSent) {
- if (!_lastReadingSent) _lastReadingSent = new Reading(*snapshot_lastReadingSent);
- else *_lastReadingSent = *snapshot_lastReadingSent;
- }
- };
-
// build request body from buffer contents
buf->lock();
for (it = buf->begin(); it != buf->end(); it++) {
if (request_body_lines >= _max_batch_inserts) {
print(log_debug, "reached maximum lines for InfluxDB insertion request.",
- channel()->name());
+ channel()->name());
break;
}
bool sendData = false;
+
timestamp = it->time_ms();
print(log_finest, "Reading buffer: timestamp %lld value %f", channel()->name(),
- it->time_ms(), it->value());
- print(log_debug, "compare: %lld %lld", channel()->name(), _last_timestamp, timestamp);
+ it->time_ms(), it->value());
+ print(log_debug, "compare: %lld %lld", channel()->name(), _last_timestamp, timestamp);
// we can only add/consider a timestamp if the ms resolution is not before than from
// previous one:
if (_last_timestamp <= timestamp) {
@@ -332,9 +307,9 @@ void vz::api::InfluxDB::send() {
_last_timestamp = timestamp;
} else {
const Reading &r = *it;
-
// duplicates should be ignored
// but send at least each seconds
+
if (!_lastReadingSent) { // first one from the duplicate consideration -> send it
sendData = true;
_lastReadingSent = new Reading(r);
@@ -343,7 +318,7 @@ void vz::api::InfluxDB::send() {
// a) timestamp
// b) duplicate value
if ((timestamp >= (_last_timestamp + duplicates_ms)) ||
- (r.value() != _lastReadingSent->value())) {
+ (r.value() != _lastReadingSent->value())) {
// send the current one:
sendData = true;
_last_timestamp = timestamp;
@@ -376,6 +351,7 @@ void vz::api::InfluxDB::send() {
it->mark_delete();
}
+
buf->unlock();
if (request_body_lines > 0) { // there is something to send
@@ -391,27 +367,25 @@ void vz::api::InfluxDB::send() {
} else if (_token_header) {
curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _token_header);
}
-
curl_easy_setopt(_api.curl, CURLOPT_URL, _url.c_str());
curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity() > 0);
curl_easy_setopt(_api.curl, CURLOPT_SSL_VERIFYPEER, _ssl_verifypeer);
+
curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION,
- &(vz::api::CurlCallback::debug_callback));
+ &(vz::api::CurlCallback::debug_callback));
curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, response());
-
// signal-handling in libcurl is NOT thread-safe. so force to deactivated them!
curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, _curl_timeout);
+
curl_easy_setopt(_api.curl, CURLOPT_POSTFIELDS, request_body.c_str());
curl_easy_setopt(_api.curl, CURLOPT_WRITEFUNCTION,
- &(vz::api::CurlCallback::write_callback));
+ &(vz::api::CurlCallback::write_callback));
curl_easy_setopt(_api.curl, CURLOPT_WRITEDATA, response());
// actually send the request to InfluxDB
curl_code = curl_easy_perform(_api.curl);
-
print(log_finest, "Influxdb curl terminated", channel()->name());
-
curl_easy_getinfo(_api.curl, CURLINFO_RESPONSE_CODE, &http_code);
if (curl_code == CURLE_OK && http_code >= 200 && http_code < 300) { // everything is ok
@@ -419,28 +393,20 @@ void vz::api::InfluxDB::send() {
buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer
} else {
buf->undelete(); // failure to insert, so dont delete the buffer
- rollback_state(); // restore _last_timestamp and _lastReadingSent so buffered
- // readings are retried correctly when InfluxDB comes back up
if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
- curl_easy_strerror(curl_code));
+ curl_easy_strerror(curl_code));
}
print(log_error, "InfluxDB error! - HTTP Status %i", channel()->name(), http_code);
if (!_response->get_response().empty()) {
print(log_error, "InfluxDB response was %s", channel()->name(),
- _response->get_response().c_str());
+ _response->get_response().c_str());
}
}
-
} else { // there is nothing to send
print(log_info, "Nothing to send to InfluxDB api", channel()->name());
}
- // Free the snapshot in all code paths (success, failure, nothing-to-send).
- // In the failure path rollback_state() may have copied it into _lastReadingSent,
- // but that is a separate allocation; snapshot_lastReadingSent itself must be freed here.
- delete snapshot_lastReadingSent;
-
if (curlSessionProvider) {
// release our curl session
curlSessionProvider->return_session(_host + channel()->uuid(), _api.curl);
From 63373654e2bb2836274c032f88bbee3902ecfe00 Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 13:09:38 +0200
Subject: [PATCH 3/8] Implement rollback state for InfluxDB insert failures
Added rollback mechanism to restore state on InfluxDB insert failure.
---
src/api/InfluxDB.cpp | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index ee3a8a20..67272a4c 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -282,6 +282,26 @@ void vz::api::InfluxDB::send() {
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;
+ // Snapshot state before the build loop so we can roll back on failure.
+ // _last_timestamp and _lastReadingSent are advanced during the loop even
+ // though the HTTP send has not happened yet. If the send fails,
+ // buf->undelete() restores the buffer items, but without rolling back
+ // these fields the readings will be skipped on every subsequent attempt
+ // and silently dropped when the next buf->clean() runs.
+ const int64_t snapshot_last_timestamp = _last_timestamp;
+ Reading *snapshot_lastReadingSent = nullptr;
+ if (_lastReadingSent) {
+ snapshot_lastReadingSent = new Reading(*_lastReadingSent);
+ }
+
+ auto rollback_state = [&]() {
+ _last_timestamp = snapshot_last_timestamp;
+ if (snapshot_lastReadingSent) {
+ if (!_lastReadingSent) _lastReadingSent = new Reading(*snapshot_lastReadingSent);
+ else *_lastReadingSent = *snapshot_lastReadingSent;
+ }
+ };
+
// build request body from buffer contents
buf->lock();
for (it = buf->begin(); it != buf->end(); it++) {
@@ -393,6 +413,8 @@ void vz::api::InfluxDB::send() {
buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer
} else {
buf->undelete(); // failure to insert, so dont delete the buffer
+ rollback_state(); // restore _last_timestamp and _lastReadingSent so buffered
+ // readings are retried correctly when InfluxDB comes back up
if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
curl_easy_strerror(curl_code));
@@ -407,6 +429,11 @@ void vz::api::InfluxDB::send() {
print(log_info, "Nothing to send to InfluxDB api", channel()->name());
}
+ // Free the snapshot in all code paths (success, failure, nothing-to-send).
+ // In the failure path rollback_state() may have copied it into _lastReadingSent,
+ // but that is a separate allocation; snapshot_lastReadingSent itself must be freed here.
+ delete snapshot_lastReadingSent;
+
if (curlSessionProvider) {
// release our curl session
curlSessionProvider->return_session(_host + channel()->uuid(), _api.curl);
From 6f988fcc5b3a237194998c66d0d514f276c224c3 Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 16:19:41 +0200
Subject: [PATCH 4/8] Fix rollback logic for lastReadingSent in InfluxDB
---
src/api/InfluxDB.cpp | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index 67272a4c..e18a2e10 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -297,8 +297,13 @@ void vz::api::InfluxDB::send() {
auto rollback_state = [&]() {
_last_timestamp = snapshot_last_timestamp;
if (snapshot_lastReadingSent) {
+ // _lastReadingSent existed before the loop - restore it to its previous value
if (!_lastReadingSent) _lastReadingSent = new Reading(*snapshot_lastReadingSent);
else *_lastReadingSent = *snapshot_lastReadingSent;
+ } else {
+ // _lastReadingSent was null before the loop - if the loop allocated it, free it
+ delete _lastReadingSent;
+ _lastReadingSent = nullptr;
}
};
From 2b3f294597f2d6553be0ee88e448def2811c0a4a Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 22:26:38 +0200
Subject: [PATCH 5/8] Refactor rollback logic for lastReadingSent handling
---
src/api/InfluxDB.cpp | 52 +++++++++++++++++++++++++++-----------------
1 file changed, 32 insertions(+), 20 deletions(-)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index e18a2e10..3f692aa5 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -282,13 +282,19 @@ void vz::api::InfluxDB::send() {
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;
- // Snapshot state before the build loop so we can roll back on failure.
- // _last_timestamp and _lastReadingSent are advanced during the loop even
- // though the HTTP send has not happened yet. If the send fails,
- // buf->undelete() restores the buffer items, but without rolling back
- // these fields the readings will be skipped on every subsequent attempt
- // and silently dropped when the next buf->clean() runs.
- const int64_t snapshot_last_timestamp = _last_timestamp;
+ // Snapshot _last_timestamp and _lastReadingSent before the build loop so we
+ // can roll back on send failure. The loop advances both fields as it runs,
+ // but the HTTP send has not happened yet. If the send fails, buf->undelete()
+ // restores the buffer items but without rolling back these fields the readings
+ // would be permanently skipped on every subsequent attempt.
+ //
+ // Three cases must be handled correctly:
+ // A) duplicates=0 — only _last_timestamp matters
+ // B) duplicates>0, _lastReadingSent was nullptr before loop
+ // — loop may allocate it; must free and null on rollback
+ // C) duplicates>0, _lastReadingSent was already set before loop
+ // — must restore its value, not just the pointer
+const int64_t snapshot_last_timestamp = _last_timestamp;
Reading *snapshot_lastReadingSent = nullptr;
if (_lastReadingSent) {
snapshot_lastReadingSent = new Reading(*_lastReadingSent);
@@ -297,12 +303,16 @@ void vz::api::InfluxDB::send() {
auto rollback_state = [&]() {
_last_timestamp = snapshot_last_timestamp;
if (snapshot_lastReadingSent) {
- // _lastReadingSent existed before the loop - restore it to its previous value
- if (!_lastReadingSent) _lastReadingSent = new Reading(*snapshot_lastReadingSent);
- else *_lastReadingSent = *snapshot_lastReadingSent;
- } else {
- // _lastReadingSent was null before the loop - if the loop allocated it, free it
- delete _lastReadingSent;
+ // Case C: _lastReadingSent existed before — restore its value
+ if (!_lastReadingSent)
+ _lastReadingSent = new Reading(*snapshot_lastReadingSent);
+ else
+ *_lastReadingSent = *snapshot_lastReadingSent;
+ } else {
+ // Cases A & B: _lastReadingSent was nullptr before the loop.
+ // If the loop allocated it, free it now so the next send()
+ // call starts fresh and doesn't compare against a stale reading.
+ delete _lastReadingSent;
_lastReadingSent = nullptr;
}
};
@@ -418,9 +428,10 @@ void vz::api::InfluxDB::send() {
buf->clean(); // delete the stuff we just sent to InfluxDB from the buffer
} else {
buf->undelete(); // failure to insert, so dont delete the buffer
- rollback_state(); // restore _last_timestamp and _lastReadingSent so buffered
- // readings are retried correctly when InfluxDB comes back up
- if (curl_code != CURLE_OK) {
+ rollback_state(); // restore _last_timestamp and _lastReadingSent so all
+ // buffered readings are retried on next send() call
+
+ if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
curl_easy_strerror(curl_code));
}
@@ -434,10 +445,11 @@ void vz::api::InfluxDB::send() {
print(log_info, "Nothing to send to InfluxDB api", channel()->name());
}
- // Free the snapshot in all code paths (success, failure, nothing-to-send).
- // In the failure path rollback_state() may have copied it into _lastReadingSent,
- // but that is a separate allocation; snapshot_lastReadingSent itself must be freed here.
- delete snapshot_lastReadingSent;
+ // Free the snapshot in all code paths.
+ // In the failure+Case C path rollback_state() restored _lastReadingSent from it,
+ // but snapshot_lastReadingSent itself is a separate allocation that must always
+ // be freed here.
+delete snapshot_lastReadingSent;
if (curlSessionProvider) {
// release our curl session
From 11fb0ce4b72c518fa1004dbfea2cc212692db25c Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 22:30:24 +0200
Subject: [PATCH 6/8] Fix formatting issues in InfluxDB.cpp
---
src/api/InfluxDB.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index 3f692aa5..987d5023 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -294,7 +294,7 @@ void vz::api::InfluxDB::send() {
// — loop may allocate it; must free and null on rollback
// C) duplicates>0, _lastReadingSent was already set before loop
// — must restore its value, not just the pointer
-const int64_t snapshot_last_timestamp = _last_timestamp;
+ const int64_t snapshot_last_timestamp = _last_timestamp;
Reading *snapshot_lastReadingSent = nullptr;
if (_lastReadingSent) {
snapshot_lastReadingSent = new Reading(*_lastReadingSent);
@@ -449,7 +449,7 @@ const int64_t snapshot_last_timestamp = _last_timestamp;
// In the failure+Case C path rollback_state() restored _lastReadingSent from it,
// but snapshot_lastReadingSent itself is a separate allocation that must always
// be freed here.
-delete snapshot_lastReadingSent;
+ delete snapshot_lastReadingSent;
if (curlSessionProvider) {
// release our curl session
From ecb986fe9fc1c3b2dacaa8ae5a8e153e163c31fb Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Sun, 19 Apr 2026 22:31:03 +0200
Subject: [PATCH 7/8] Fix indentation for CURL error handling
---
src/api/InfluxDB.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index 987d5023..01f222ca 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -431,7 +431,7 @@ void vz::api::InfluxDB::send() {
rollback_state(); // restore _last_timestamp and _lastReadingSent so all
// buffered readings are retried on next send() call
- if (curl_code != CURLE_OK) {
+ if (curl_code != CURLE_OK) {
print(log_error, "CURL Error: %s", channel()->name(),
curl_easy_strerror(curl_code));
}
From 04bd7f2237cf7bff3d7803c588eae031d6e67898 Mon Sep 17 00:00:00 2001
From: Alex <6953309+isarrider@users.noreply.github.com>
Date: Tue, 5 May 2026 19:23:26 +0200
Subject: [PATCH 8/8] Log undelivered readings count for InfluxDB buffer
---
src/api/InfluxDB.cpp | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/src/api/InfluxDB.cpp b/src/api/InfluxDB.cpp
index 01f222ca..163b8c0d 100644
--- a/src/api/InfluxDB.cpp
+++ b/src/api/InfluxDB.cpp
@@ -440,6 +440,10 @@ void vz::api::InfluxDB::send() {
print(log_error, "InfluxDB response was %s", channel()->name(),
_response->get_response().c_str());
}
+ if (buf->size() > 30) {
+ print(log_error, "InfluxDB buffer contains %i undelivered readings",
+ channel()->name(), buf->size());
+ }
}
} else { // there is nothing to send
print(log_info, "Nothing to send to InfluxDB api", channel()->name());