Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/io/http_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
if (span_->uses_tags()) {
span_->add_tag(tracing::attributes::service,
tracing::service_name_for_http_service(request.type));
span_->add_tag(tracing::attributes::operation_id, client_context_id_);
span_->add_tag(tracing::attributes::dispatch::operation_id, client_context_id_);
}

handler_ = std::move(handler);
Expand Down Expand Up @@ -209,7 +209,7 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
return;
}
if (span_->uses_tags()) {
span_->add_tag(tracing::attributes::local_id, session_->id());
span_->add_tag(tracing::attributes::dispatch::local_id, session_->id());
}
send();
}
Expand Down
75 changes: 50 additions & 25 deletions core/io/mcbp_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,6 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
mcbp_command_handler handler{};
std::swap(handler, handler_);
if (span_ != nullptr) {
if (msg) {
const auto server_duration_us =
static_cast<std::uint64_t>(protocol::parse_server_duration_us(msg.value()));
span_->add_tag(tracing::attributes::server_duration, server_duration_us);
server_durations_.emplace_back(std::chrono::microseconds(server_duration_us));
}
span_->end();
span_ = nullptr;
}
Expand Down Expand Up @@ -234,9 +228,6 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
{
opaque_ = session_->next_opaque();
request.opaque = *opaque_;
if (span_->uses_tags()) {
span_->add_tag(tracing::attributes::operation_id, fmt::format("0x{:x}", request.opaque));
}
if (request.id.use_collections() && !request.id.is_collection_resolved()) {
if (session_->supports_feature(protocol::hello_feature::collections)) {
auto collection_id = session_->get_collection_uid(request.id.collection_path());
Expand Down Expand Up @@ -269,14 +260,23 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
}
}

auto dispatch_span = create_dispatch_span();
session_->write_and_subscribe(
request.opaque,
encoded.data(session_->supports_feature(protocol::hello_feature::snappy)),
[self = this->shared_from_this(), start = std::chrono::steady_clock::now()](
[self = this->shared_from_this(),
start = std::chrono::steady_clock::now(),
dispatch_span = std::move(dispatch_span)](
std::error_code ec,
retry_reason reason,
io::mcbp_message&& msg,
std::optional<key_value_error_map_info> /* error_info */) mutable {
{
const auto server_duration_us =
static_cast<std::uint64_t>(protocol::parse_server_duration_us(msg));
self->server_durations_.emplace_back(std::chrono::microseconds(server_duration_us));
self->close_dispatch_span(dispatch_span, server_duration_us);
}
{
auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
Expand Down Expand Up @@ -306,15 +306,17 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
telemetry_recorder->record_latency(category, latency);
}

metrics::metric_attributes attrs{
service_type::key_value,
self->request.observability_identifier,
ec,
self->request.id.bucket(),
self->request.id.scope(),
self->request.id.collection(),
};
self->manager_->meter()->record_value(std::move(attrs), start);
{
metrics::metric_attributes attrs{
service_type::key_value,
self->request.observability_identifier,
ec,
self->request.id.bucket(),
self->request.id.scope(),
self->request.id.collection(),
};
self->manager_->meter()->record_value(std::move(attrs), start);
}

self->retry_backoff.cancel();
if (ec == asio::error::operation_aborted) {
Expand Down Expand Up @@ -397,12 +399,6 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
return;
}
session_ = std::move(session);
if (span_->uses_tags())
span_->add_tag(tracing::attributes::remote_socket, session_->remote_address());
if (span_->uses_tags())
span_->add_tag(tracing::attributes::local_socket, session_->local_address());
if (span_->uses_tags())
span_->add_tag(tracing::attributes::local_id, session_->id());
send();
}

Expand All @@ -427,6 +423,35 @@ private:

return attrs;
}

auto create_dispatch_span() const -> std::shared_ptr<couchbase::tracing::request_span>
{
std::shared_ptr<couchbase::tracing::request_span> dispatch_span =
manager_->tracer()->create_span(tracing::operation::step_dispatch, span_);
if (dispatch_span->uses_tags()) {
dispatch_span->add_tag(tracing::attributes::dispatch::network_transport, "tcp");
dispatch_span->add_tag(tracing::attributes::dispatch::operation_id,
fmt::format("0x{:x}", request.opaque));
dispatch_span->add_tag(tracing::attributes::dispatch::local_id, session_->id());
}
return dispatch_span;
}

void close_dispatch_span(const std::shared_ptr<couchbase::tracing::request_span>& dispatch_span,
const std::uint64_t server_duration_us) const
{
if (dispatch_span->uses_tags()) {
dispatch_span->add_tag(tracing::attributes::dispatch::server_duration, server_duration_us);
dispatch_span->add_tag(tracing::attributes::dispatch::server_address,
session_->canonical_hostname());
dispatch_span->add_tag(tracing::attributes::dispatch::server_port,
session_->canonical_port_number());
dispatch_span->add_tag(tracing::attributes::dispatch::peer_address,
session_->remote_hostname());
dispatch_span->add_tag(tracing::attributes::dispatch::peer_port, session_->remote_port());
}
dispatch_span->end();
}
};

} // namespace couchbase::core::operations
60 changes: 58 additions & 2 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,16 @@ class mcbp_session_impl
return connection_endpoints_.remote_address_with_port;
}

auto remote_hostname() const -> std::string
{
return connection_endpoints_.remote_address;
}

auto remote_port() const -> std::uint16_t
{
return connection_endpoints_.remote.port();
}

auto local_address() const -> std::string
{
return connection_endpoints_.local_address_with_port;
Expand Down Expand Up @@ -1533,6 +1543,16 @@ class mcbp_session_impl
return bootstrap_port_number_;
}

[[nodiscard]] auto canonical_hostname() const -> const std::string&
{
return canonical_hostname_;
}

[[nodiscard]] auto canonical_port_number() const -> std::uint16_t
{
return canonical_port_number_;
}

[[nodiscard]] auto next_opaque() -> std::uint32_t
{
return ++opaque_;
Expand Down Expand Up @@ -1743,10 +1763,18 @@ class mcbp_session_impl
if (ec) {
return stop(retry_reason::node_not_available);
}
if (node_uuid_.empty() && config_.has_value()) {
if (config_.has_value()) {
for (const auto& node : config_.value().nodes) {
if (node.this_node) {
node_uuid_ = node.node_uuid;
if (node_uuid_.empty()) {
node_uuid_ = node.node_uuid;
}
if (canonical_hostname_.empty()) {
canonical_hostname_ = node.hostname;
}
if (canonical_port_number_ == 0) {
canonical_port_number_ = node.port_or(service_type::key_value, is_tls_, 0);
}
}
}
}
Expand Down Expand Up @@ -2112,6 +2140,10 @@ class mcbp_session_impl
std::optional<error_map> error_map_;
collection_cache collection_cache_;

// Only used for tracing & metrics. They represent the address of the node as given in the config.
std::string canonical_hostname_{};
std::uint16_t canonical_port_number_{};

const bool is_tls_;
std::shared_ptr<impl::bootstrap_state_listener> state_listener_{ nullptr };

Expand Down Expand Up @@ -2241,6 +2273,18 @@ mcbp_session::remote_address() const -> std::string
return impl_->remote_address();
}

auto
mcbp_session::remote_hostname() const -> std::string
{
return impl_->remote_hostname();
}

auto
mcbp_session::remote_port() const -> std::uint16_t
{
return impl_->remote_port();
}

auto
mcbp_session::local_address() const -> std::string
{
Expand Down Expand Up @@ -2277,6 +2321,18 @@ mcbp_session::last_bootstrap_error() const& -> const std::optional<impl::bootstr
return impl_->last_bootstrap_error();
}

auto
mcbp_session::canonical_hostname() const -> const std::string&
{
return impl_->canonical_hostname();
}

auto
mcbp_session::canonical_port_number() const -> std::uint16_t
{
return impl_->canonical_port_number();
}

void
mcbp_session::write_and_subscribe(std::uint32_t opaque,
std::vector<std::byte>&& data,
Expand Down
4 changes: 4 additions & 0 deletions core/io/mcbp_session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,17 @@ public:
[[nodiscard]] auto id() const -> const std::string&;
[[nodiscard]] auto node_uuid() const -> const std::string&;
[[nodiscard]] auto remote_address() const -> std::string;
[[nodiscard]] auto remote_hostname() const -> std::string;
[[nodiscard]] auto remote_port() const -> std::uint16_t;
[[nodiscard]] auto local_address() const -> std::string;
[[nodiscard]] auto bootstrap_address() const -> const std::string&;
[[nodiscard]] auto bootstrap_hostname() const -> const std::string&;
[[nodiscard]] auto bootstrap_port() const -> const std::string&;
[[nodiscard]] auto bootstrap_port_number() const -> std::uint16_t;
[[nodiscard]] auto last_bootstrap_error() && -> std::optional<impl::bootstrap_error>;
[[nodiscard]] auto last_bootstrap_error() const& -> const std::optional<impl::bootstrap_error>&;
[[nodiscard]] auto canonical_hostname() const -> const std::string&;
[[nodiscard]] auto canonical_port_number() const -> std::uint16_t;
void write_and_flush(std::vector<std::byte>&& buffer);
void write_and_subscribe(const std::shared_ptr<mcbp::queue_request>&,
const std::shared_ptr<response_handler>& handler);
Expand Down
23 changes: 16 additions & 7 deletions core/tracing/constants.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ namespace couchbase::core::tracing
{
namespace operation
{
constexpr auto step_dispatch = "cb.dispatch_to_server";
constexpr auto step_request_encoding = "cb.request_encoding";
constexpr auto step_dispatch = "dispatch_to_server";
constexpr auto step_request_encoding = "request_encoding";

constexpr auto http_query = "cb.query";
constexpr auto http_analytics = "cb.analytics";
constexpr auto http_search = "cb.search";
Expand Down Expand Up @@ -77,7 +78,7 @@ constexpr auto mcbp_internal = "cb.internal";

namespace attributes
{
constexpr auto system = "db.system";
constexpr auto system = "db.system.name";
constexpr auto cluster_name = "db.couchbase.cluster_name";
constexpr auto cluster_uuid = "db.couchbase.cluster_uuid";

Expand All @@ -86,12 +87,20 @@ constexpr auto component = "db.couchbase.component";
constexpr auto instance = "db.instance";

constexpr auto service = "cb.service";
constexpr auto operation_id = "cb.operation_id";

constexpr auto server_duration = "cb.server_duration";
constexpr auto local_id = "cb.local_id";
constexpr auto local_socket = "cb.local_socket";
constexpr auto remote_socket = "cb.remote_socket";

namespace dispatch
{
constexpr auto server_duration = "db.couchbase.server_duration";
constexpr auto local_id = "db.couchbase.local_id";
constexpr auto server_address = "server.address";
constexpr auto server_port = "server.port";
constexpr auto peer_address = "network.peer.address";
constexpr auto peer_port = "network.peer.port";
constexpr auto network_transport = "network.transport";
constexpr auto operation_id = "db.couchbase.operation_id";
} // namespace dispatch
} // namespace attributes

namespace service
Expand Down
Loading
Loading