Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gtest/CMakeLists.txt.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.8.2)
cmake_minimum_required(VERSION 3.5)

project(gtest-download NONE)

Expand Down
2 changes: 1 addition & 1 deletion include/Buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Buffer {
enum aggmode { NONE, MAX, AVG, SUM };

Buffer();
virtual ~Buffer();
~Buffer();

void aggregate(int aggtime, bool aggFixedInterval);
void push(const Reading &rd);
Expand Down
164 changes: 164 additions & 0 deletions include/TransferBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#ifndef _TransferBuffer_hpp_
#define _TransferBuffer_hpp_

#include "Buffer.hpp"
#include "Reading.hpp"
#include "common.h"

#include <limits>
#include <mutex>
#include <vector>

namespace vz {

/**
 * Buffer including history elements
 *
 * Basically a standard vector which keeps n elements of history before the
 * current elements
 *
 */
class TransferBuffer {
  private:
	typedef std::vector<Reading> Base;

  public:
	typedef Base::iterator iterator;
	typedef Base::const_iterator const_iterator;
	typedef Base::reference reference;
	typedef Base::const_reference const_reference;
	typedef Base::size_type size_type;
	typedef Base::value_type value_type;

	// Capacity (in elements) the storage is shrunk back to after bursts,
	// see shrink_to_target_capacity().
	static constexpr size_type default_target_capacity = 4096;

	/**
	 * Construct an empty buffer.
	 *
	 * @param nmax Target capacity; storage for nmax elements is pre-allocated.
	 */
	TransferBuffer(size_type nmax = default_target_capacity)
		: _history_size(0), _target_capacity(nmax) {
		_impl.reserve(nmax);
	}

	TransferBuffer(const TransferBuffer &) = default;
	TransferBuffer(TransferBuffer &&) = default;

	// Iteration covers only the current elements; the first _history_size
	// entries of _impl are history and are skipped.
	iterator begin() noexcept { return _impl.begin() + _history_size; }
	const_iterator begin() const noexcept { return _impl.begin() + _history_size; }
	iterator end() noexcept { return _impl.end(); }
	const_iterator end() const noexcept { return _impl.end(); }

	// Size/capacity accessors refer to the current (non-history) part only.
	bool empty() const noexcept { return begin() == end(); }
	size_type size() const { return end() - begin(); }
	void reserve(size_type n) { _impl.reserve(n + _history_size); }
	size_type capacity() const { return _impl.capacity() - _history_size; }
	size_type target_capacity() const { return _target_capacity; }

	reference front() { return *begin(); }
	const_reference front() const { return *begin(); }
	reference back() { return _impl.back(); }
	const_reference back() const { return _impl.back(); }

	/**
	 * Clear all values, including history
	 *
	 */
	void clear() {
		_impl.clear();
		// also forget the history: with a stale _history_size, begin() would
		// point past end() of the now-empty vector and size() would underflow
		_history_size = 0;
	}

	/**
	 * Shrink the underlying storage back to the target capacity
	 *
	 * No-op when the current capacity does not exceed the target. All
	 * elements (including history) are preserved; only excess storage is
	 * released by swapping into a freshly allocated vector.
	 */
	void shrink_to_target_capacity() {
		if (capacity() <= _target_capacity)
			return;
		Base other;
		other.reserve(_target_capacity);
		other.assign(_impl.begin(), _impl.end());
		_impl.swap(other);
	}

	/**
	 * Delete readings at front and optionally keep a few
	 *
	 * Discards at most n values from the buffer. The last keep elements are
	 * kept as history.
	 *
	 * @param n Number of elements to discard. Defaults to all elements.
	 * @param keep Maximum number of elements to keep. Defaults to one. If
	 * this value is larger than the current size(), it will be reduced to
	 * size().
	 * @return Number of values discarded from buffer
	 */
	size_type discard(size_type n = std::numeric_limits<size_type>::max(), size_type keep = 1) {
		// one past the last element to discard (clamped to the current size)
		iterator last(begin() + std::min(n, size()));
		size_type ndel(last - begin());
		// NOTE(review): keep is capped at _impl.size() (which includes the old
		// history), not at size() as documented above — confirm this is the
		// intended behavior for keep > 1
		_history_size = std::min(keep, _impl.size());

		// drop everything in front of the new history window
		if (last - _history_size > _impl.begin())
			_impl.erase(_impl.begin(), last - _history_size);

		// avoid permanent large memory footprint
		if (capacity() > 4 * _target_capacity)
			shrink_to_target_capacity();
		return ndel;
	}

	/**
	 * Copy readings from Buffer and mark them deleted
	 *
	 * @param src Buffer to copy from. Copied readings are marked deleted, but
	 * are not erased. Buffer must be lockable (it is used with
	 * std::lock_guard, i.e. it must provide lock()/unlock()).
	 * @param channel Channel name used in debug message
	 * @param min_ms_between_duplicates Consecutive duplicate values are not sent if
	 * their respective time stamps are less than this value apart.
	 * @return Number of elements copied from buffer
	 *
	 */
	size_type append(Buffer &src, const char *channel, int64_t min_ms_between_duplicates = 0) {
		std::lock_guard<Buffer> lock(src);

		// advance it to first reading not deleted
		auto it(src.begin()), last(src.end());
		for (; it != last && it->deleted(); ++it) { /* do nothing */
		}

		if (it == last)
			return 0;

		// remember the size, not end(): push_back below may reallocate and
		// would invalidate a stored end() iterator
		size_type old_size(_impl.size());

		if (_impl.empty()) {
			// we do not have any prev. readings -> accept unconditionally
			_impl.push_back(*it);
			it->mark_delete();
			++it;
		}

		int64_t t, dt; // timestamp of first and delta time with prev [ms]
		for (; it != last; ++it) {
			if (it->deleted())
				continue;
			iterator prev(--_impl.end()); // last accepted reading (_impl non-empty here)
			t = it->time_ms();
			dt = t - prev->time_ms();
			print(log_debug, "compare: %lld %lld", channel, t - dt, t);

			// force time to be strictly monotonically increasing
			// note: only works if logger is not restarted (and history is lost)
			// dt == 0 is possible if dt < 1000 us
			// skip if we have a duplicate and min time interval between duplicates is not reached
			if (dt > 0 && (dt >= min_ms_between_duplicates || it->value() != prev->value()))
				_impl.push_back(*it);

			it->mark_delete();
		}
		return _impl.size() - old_size;
	}

  private:
	std::vector<Reading> _impl; // history elements followed by current elements
	size_type _history_size;    // number of leading history elements in _impl
	size_type _target_capacity; // capacity shrink_to_target_capacity() shrinks to
}; // class TransferBuffer

} // namespace vz

#endif // _TransferBuffer_hpp_
11 changes: 5 additions & 6 deletions include/api/InfluxDB.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <ApiIF.hpp>
#include <Options.hpp>
#include <TransferBuffer.hpp>
#include <api/CurlIF.hpp>
#include <api/CurlResponse.hpp>
#include <common.h>
Expand All @@ -42,6 +43,7 @@ namespace api {
class InfluxDB : public ApiIF {
public:
typedef vz::shared_ptr<ApiIF> Ptr;
typedef TransferBuffer::size_type size_type;

InfluxDB(const Channel::Ptr &ch, const std::list<Option> &options);
~InfluxDB();
Expand All @@ -65,16 +67,13 @@ class InfluxDB : public ApiIF {
std::string _tags;
std::string _url;
int _max_batch_inserts;
int _max_buffer_size;
size_type _max_buffer_size;
unsigned int _curl_timeout;
bool _send_uuid;
bool _ssl_verifypeer;
std::list<Reading> _values;
CurlResponse::Ptr _response;

int64_t _last_timestamp; /* remember last timestamp */
// duplicates support:
Reading *_lastReadingSent;
TransferBuffer _buffer;
std::stringstream _request;

typedef struct {
CURL *curl;
Expand Down
12 changes: 3 additions & 9 deletions include/api/Volkszaehler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <stdint.h>

#include "Buffer.hpp"
#include "TransferBuffer.hpp"
#include <ApiIF.hpp>
#include <Options.hpp>

Expand All @@ -59,7 +60,6 @@ class Volkszaehler : public ApiIF {
typedef vz::shared_ptr<ApiIF> Ptr;

Volkszaehler(Channel::Ptr ch, std::list<Option> options);
~Volkszaehler();

void send();

Expand All @@ -71,6 +71,8 @@ class Volkszaehler : public ApiIF {
std::string _middleware;
unsigned int _curlTimeout;
std::string _url;
api_handle_t _api;
TransferBuffer _buffer;

/**
* Create JSON object of tuples
Expand All @@ -87,14 +89,6 @@ class Volkszaehler : public ApiIF {
void api_parse_exception(CURLresponse response, char *err, size_t n);

private:
api_handle_t _api;

// Volatil
std::list<Reading> _values;
int64_t _last_timestamp; /**< remember last timestamp */
// duplicate support:
Reading *_lastReadingSent;

}; // class Volkszaehler

/**
Expand Down
Loading
Loading