From 111f621e630ba7c18c63c643034ced30330bcb4b Mon Sep 17 00:00:00 2001 From: Ava Hahn Date: Wed, 5 Mar 2025 09:58:35 -0800 Subject: [PATCH 1/2] Configurable span generation for upstream calls (fix: #17) This commit adds support to configure client span generation for upstream requests. Client span generation can be configured per upstream using a configuration snippet like the one below: upstream echo.sunnypup.io { otel_upstream_span_enable; server echo.sunnypup.io:443; } The primary vehicle for this is the load balancer API. Existing load balancer callbacks and data are stored in a new context type. New callbacks are added for getting, freeing requests as well as setting and saving ssl sessions. These modifications are made if and only if the otel_upstream_span_enable configuration directive is made (per upstream). In addition to the above, A new enumeration is made in the batch exporter for span type. Currently Client and Server are recognized, however this can be extended as needed in the future. This enum is matched with corresponding elements in the underlying opentelemetry library. The logic path between getOtelCtx/ensureOtelCtx/onRequestStart is modified to make sure that even upstream calls made by subrequests have the correct metadata needed for proper span propagation. Additionally, some logic is refactored out of onRequestEnd for reuse in the upstream span generation. Finally, a new routine is added to set client span specific attrs. Note: these changes do require that NGINX is compiled with SSL support. This is needed because use of the load balancer API, specifically wrapping around the data argument to callbacks, must be handled properly when setting SSL sessions or else a memory corruption error can be triggered by the round robin SSL handlers recieving bad data. Signed-off-by: Ava Hahn --- src/batch_exporter.hpp | 6 +- src/http_module.cpp | 422 ++++++++++++++++++++++++++++++++++------- 2 files changed, 354 insertions(+), 74 deletions(-) diff --git a/src/batch_exporter.hpp b/src/batch_exporter.hpp index a2e65b12..285024be 100644 --- a/src/batch_exporter.hpp +++ b/src/batch_exporter.hpp @@ -21,6 +21,10 @@ class BatchExporter { opentelemetry::trace::SpanId parent; uint64_t start; uint64_t end; + enum SpanKind{ + CLIENT = opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT, + SERVER = opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER + } type; }; class Span { @@ -32,7 +36,7 @@ class BatchExporter { : span(span) { span->set_kind( - opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER); + (opentelemetry::proto::trace::v1::Span::SpanKind) info.type); // Short setters, like set_name(), use additional std::string as an // intermediary at least up to v21.5 of protobuf. diff --git a/src/http_module.cpp b/src/http_module.cpp index 78a5e896..4681f3cc 100644 --- a/src/http_module.cpp +++ b/src/http_module.cpp @@ -17,6 +17,20 @@ struct OtelCtx { TraceContext current; }; +struct OtelUpstreamCtx { + OtelCtx *reqCtx; + ngx_http_request_t *req; + void *data; // original peer conn data + ngx_uint_t childProcessingStart; + ngx_int_t (*originalGetRequest) (ngx_peer_connection_t *pc, void *data); + void (*originalFreeRequest)(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state); + void (*originalNotify) (ngx_peer_connection_t *pc, void *data, + ngx_uint_t type); + ngx_int_t (*originalSetSession) (ngx_peer_connection_t *pc, void *data); + void (*originalSaveSession)(ngx_peer_connection_t *pc, void *data); +}; + struct MainConfBase { ngx_str_t endpoint; ngx_msec_t interval; @@ -46,11 +60,17 @@ struct LocationConf { ngx_array_t spanAttrs; }; +struct ServerConf { + ngx_http_upstream_init_pt original_init_upstream; + ngx_http_upstream_init_peer_pt original_init_peer; +}; + char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addResourceAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addSpanAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addExporterHeader(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); +char* onUpstreamConfiguration(ngx_conf_t* cf, ngx_command_t *cmd, void *conf); namespace Propagation { @@ -107,6 +127,11 @@ ngx_command_t gCommands[] = { addSpanAttr, NGX_HTTP_LOC_CONF_OFFSET }, + { ngx_string("otel_upstream_span_enable"), + NGX_HTTP_UPS_CONF|NGX_CONF_NOARGS, + onUpstreamConfiguration, + NGX_HTTP_SRV_CONF_OFFSET }, + ngx_null_command }; @@ -159,6 +184,11 @@ ngx_str_t toNgxStr(StrView str) return ngx_str_t{str.size(), (u_char*)str.data()}; } +uint64_t toNanoSec(time_t sec, ngx_msec_t msec) +{ + return (sec * 1000 + msec) * 1000000; +} + bool iremovePrefix(ngx_str_t* str, StrView p) { if (str->len >= p.size() && @@ -171,6 +201,10 @@ bool iremovePrefix(ngx_str_t* str, StrView p) return false; } +void cleanupOtelCtx(void* data) +{ +} + MainConf* getMainConf(ngx_conf_t* cf) { return static_cast( @@ -188,8 +222,21 @@ LocationConf* getLocationConf(ngx_http_request_t* r) return (LocationConf*)ngx_http_get_module_loc_conf(r, gHttpModule); } -void cleanupOtelCtx(void* data) +OtelCtx* createOtelCtx(ngx_http_request_t* r) { + static_assert(std::is_trivially_destructible::value, ""); + + auto storage = ngx_pool_cleanup_add(r->pool, sizeof(OtelCtx)); + if (storage == NULL) { + return NULL; + } + + storage->handler = cleanupOtelCtx; + + auto ctx = new (storage->data) OtelCtx{}; + ngx_http_set_ctx(r, ctx, gHttpModule); + + return ctx; } OtelCtx* getOtelCtx(ngx_http_request_t* r) @@ -198,11 +245,26 @@ OtelCtx* getOtelCtx(ngx_http_request_t* r) // restore module context if it was reset by e.g. internal redirect if (ctx == NULL && (r->internal || r->filter_finalize)) { - for (auto cln = r->pool->cleanup; cln; cln = cln->next) { if (cln->handler == cleanupOtelCtx) { - ctx = (OtelCtx*)cln->data; - ngx_http_set_ctx(r, ctx, gHttpModule); + + // restore module context if it was reset by finalize filter + if (r->filter_finalize) { + ctx = (OtelCtx*)cln->data; + ngx_http_set_ctx(r, ctx, gHttpModule); + + // create child context if it was reset by internal redirect + } else if (r->internal) { + auto ctx_orig = (OtelCtx*)cln->data; + ctx = createOtelCtx(r); + if (ctx == NULL) { + return NULL; + } + ctx->parent = ctx_orig->current; + ctx->current = + TraceContext::generate(false, ctx->parent); + } + break; } } @@ -211,23 +273,6 @@ OtelCtx* getOtelCtx(ngx_http_request_t* r) return ctx; } -OtelCtx* createOtelCtx(ngx_http_request_t* r) -{ - static_assert(std::is_trivially_destructible::value, ""); - - auto storage = ngx_pool_cleanup_add(r->pool, sizeof(OtelCtx)); - if (storage == NULL) { - return NULL; - } - - storage->handler = cleanupOtelCtx; - - auto ctx = new (storage->data) OtelCtx{}; - ngx_http_set_ctx(r, ctx, gHttpModule); - - return ctx; -} - ngx_table_elt_t* findHeader(ngx_list_t* list, ngx_uint_t hash, StrView key) { auto part = &list->part; @@ -338,6 +383,10 @@ OtelCtx* ensureOtelCtx(ngx_http_request_t* r) return ctx; } + if (r->internal) { + return NULL; + } + ctx = createOtelCtx(r); if (!ctx) { return NULL; @@ -355,14 +404,9 @@ OtelCtx* ensureOtelCtx(ngx_http_request_t* r) ngx_int_t onRequestStart(ngx_http_request_t* r) { - // don't let internal redirects to override sampling decision - if (r->internal) { - return NGX_DECLINED; - } - bool sampled = false; - auto lcf = getLocationConf(r); + if (lcf->trace != NULL) { ngx_str_t trace; if (ngx_http_complex_value(r, lcf->trace, &trace) != NGX_OK) { @@ -372,7 +416,7 @@ ngx_int_t onRequestStart(ngx_http_request_t* r) sampled = toStrView(trace) == "on" || toStrView(trace) == "1"; } - if (!lcf->traceContext && !sampled) { + if (!lcf->traceContext && !sampled && !r->internal) { return NGX_DECLINED; } @@ -381,6 +425,9 @@ ngx_int_t onRequestStart(ngx_http_request_t* r) return NGX_ERROR; } + if (r->internal) { + sampled = ctx->parent.sampled; + } ctx->current.sampled = sampled; ngx_int_t rc = NGX_OK; @@ -405,22 +452,19 @@ StrView getServerName(ngx_http_request_t* r) return toStrView(name); } -void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) +void addDefaultSrvAttrs(BatchExporter::Span& span, ngx_http_request_t* r) { // based on trace semantic conventions for HTTP from 1.16.0 OTel spec span.add("http.method", toStrView(r->method_name)); - span.add("http.target", toStrView(r->unparsed_uri)); - + span.add("http.scheme", r->connection->ssl ? "https" : "http"); auto clcf = (ngx_http_core_loc_conf_t*) ngx_http_get_module_loc_conf(r, ngx_http_core_module); if (clcf->name.len) { span.add("http.route", toStrView(clcf->name)); } - span.add("http.scheme", r->connection->ssl ? "https" : "http"); - auto protocol = toStrView(r->http_protocol); if (protocol.size() > 5) { // "HTTP/" span.add("http.flavor", protocol.substr(5)); @@ -433,20 +477,9 @@ void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) auto received = r->headers_in.content_length_n; span.add("http.request_content_length", received > 0 ? received : 0); - auto sent = r->connection->sent - (off_t)r->header_size; - span.add("http.response_content_length", sent > 0 ? sent : 0); - - auto status = r->err_status ? r->err_status : r->headers_out.status; - if (status) { - span.add("http.status_code", status); - - if (status >= 500) { - span.setError(); - } - } - span.add("net.host.name", getServerName(r)); - + span.add("net.sock.peer.addr", toStrView(r->connection->addr_text)); + span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr)); if (ngx_connection_local_sockaddr(r->connection, NULL, 0) == NGX_OK) { auto port = ngx_inet_get_port(r->connection->local_sockaddr); auto defaultPort = r->connection->ssl ? 443 : 80; @@ -456,12 +489,100 @@ void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) } } - span.add("net.sock.peer.addr", toStrView(r->connection->addr_text)); - span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr)); + auto sent = r->connection->sent - (off_t)r->header_size; + span.add("http.response_content_length", sent > 0 ? sent : 0); + + auto status = r->err_status ? r->err_status : r->headers_out.status; + if (status) { + span.add("http.status_code", status); + + if (status >= 500) { + span.setError(); + } + } } -StrView getSpanName(ngx_http_request_t* r) +void addDefaultClientAttrs(BatchExporter::Span& span, ngx_http_request_t* r) { + if (!r->upstream->peer.name) { + throw std::runtime_error("upstream peer has no name"); + } + + if (!r->upstream->schema.data || !r->upstream->schema.len) { + throw std::runtime_error("upstream has no schema"); + } + + if (!r->upstream->uri.data || !r->upstream->uri.len) { + throw std::runtime_error("upstream has no uri"); + } + + if (!r->upstream->upstream || !r->upstream->upstream->host.len) { + throw std::runtime_error("upstream has no upstream or empty host"); + } + + uint64_t addr_len = 0; + // set len at last instance of ':' + for (int i = 0; i < r->upstream->peer.name->len; i++) { + if (r->upstream->peer.name->data[i] == ':') { + addr_len = i; + } + } + + addr_len = addr_len ? addr_len : r->upstream->peer.name->len; + StrView addr((const char *) r->upstream->peer.name->data, addr_len); + span.add("server.address", addr); + + if (addr_len < r->upstream->peer.name->len) { + uint64_t port_len = (r->upstream->peer.name->len - addr_len) + 1; + StrView port((const char *) r->upstream->peer.name->data + addr_len + 1, + port_len); + span.add("server.port", port); + } + + uint64_t url_len = r->upstream->schema.len + + r->upstream->upstream->host.len + + r->upstream->uri.len + 3; + char *url_buf = (char *) ngx_palloc(r->pool, url_len); + if (!ngx_cpystrn((u_char *) url_buf, r->upstream->schema.data, + r->upstream->schema.len + 1)) { + throw std::runtime_error("failed to copy URL schema"); + } + char *cursor = url_buf + r->upstream->schema.len + 1; + if (!ngx_cpystrn((u_char *) cursor, r->upstream->upstream->host.data, + r->upstream->upstream->host.len + 1)) { + throw std::runtime_error("failed to copy URL host"); + } + cursor += r->upstream->upstream->host.len + 1; + if (!ngx_cpystrn((u_char *) cursor, r->upstream->uri.data, + r->upstream->uri.len + 1)) { + throw std::runtime_error("failed to copy URL path"); + } + + StrView url(url_buf, url_len); + span.add("url.full", url); + + auto sent = r->connection->sent - (off_t)r->header_size; + span.add("http.response_content_length", sent > 0 ? sent : 0); + + if (r->upstream->method.len) { + span.add("http.request.method", toStrView(r->upstream->method)); + } else { + span.add("http.request.method", toStrView(r->method_name)); + } + + auto status = r->err_status ? r->err_status : r->headers_out.status; + if (status) { + span.add("http.status_code", status); + + if (status >= 500) { + span.setError(); + } + } +} + +StrView getSrvSpanName( + ngx_http_request_t* r +) { auto lcf = getLocationConf(r); if (lcf->spanName) { @@ -469,12 +590,11 @@ StrView getSpanName(ngx_http_request_t* r) if (ngx_http_complex_value(r, lcf->spanName, &result) != NGX_OK) { throw std::runtime_error("failed to compute complex value"); } - return toStrView(result); + } else { auto clcf = (ngx_http_core_loc_conf_t*) ngx_http_get_module_loc_conf(r, ngx_http_core_module); - return toStrView(clcf->name); } } @@ -502,6 +622,21 @@ void addCustomAttrs(BatchExporter::Span& span, ngx_http_request_t* r) } } +// bool return for convenience in short circuiting +bool log_drop(ngx_http_request_t *r) { + static size_t dropped = 0; + static time_t lastLog = 0; + + ++dropped; + if (lastLog != ngx_time()) { + lastLog = ngx_time(); + ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0, + "OTel dropped records: %uz", dropped); + } + + return true; +} + ngx_int_t onRequestEnd(ngx_http_request_t* r) { auto ctx = getOtelCtx(r); @@ -511,31 +646,21 @@ ngx_int_t onRequestEnd(ngx_http_request_t* r) auto now = ngx_timeofday(); - auto toNanoSec = [](time_t sec, ngx_msec_t msec) -> uint64_t { - return (sec * 1000 + msec) * 1000000; - }; + // subrequests can only make upstream spans + if (r->internal) { + return NGX_DECLINED; + } try { - BatchExporter::SpanInfo info{ - getSpanName(r), ctx->current, ctx->parent.spanId, + BatchExporter::SpanInfo server{ + getSrvSpanName(r), ctx->current, ctx->parent.spanId, toNanoSec(r->start_sec, r->start_msec), - toNanoSec(now->sec, now->msec)}; - - bool ok = gExporter->add(info, [r](BatchExporter::Span& span) { - addDefaultAttrs(span, r); + toNanoSec(now->sec, now->msec), + BatchExporter::SpanInfo::SpanKind::SERVER}; + gExporter->add(server, [r](BatchExporter::Span& span) { + addDefaultSrvAttrs(span, r); addCustomAttrs(span, r); - }); - - if (!ok) { - static size_t dropped = 0; - static time_t lastLog = 0; - ++dropped; - if (lastLog != ngx_time()) { - lastLog = ngx_time(); - ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0, - "OTel dropped records: %uz", dropped); - } - } + }) || log_drop(r); } catch (const std::exception& e) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, @@ -546,6 +671,151 @@ ngx_int_t onRequestEnd(ngx_http_request_t* r) return NGX_DECLINED; } +ngx_int_t onUpstreamStart(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + auto now = ngx_timeofday(); + + if (!d) { + return NGX_ERROR; + } + + d->childProcessingStart = (now->sec * 1000 + now->msec) * 1000000; + + if (d->originalGetRequest) { + return d->originalGetRequest(pc, d->data); + } else { + return NGX_OK; + } +} + +void onUpstreamFinish(ngx_peer_connection_t *pc, void *data, ngx_uint_t state) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (!d) { + return; + } + + auto now = ngx_timeofday(); + if (d->reqCtx && d->req) { + auto childContext = \ + TraceContext::generate(d->reqCtx->current.sampled, d->reqCtx->current); + BatchExporter::SpanInfo child{ + toStrView(d->req->uri), childContext, d->reqCtx->current.spanId, + d->childProcessingStart, toNanoSec(now->sec, now->msec), + BatchExporter::SpanInfo::SpanKind::CLIENT}; + gExporter->add(child, [d](BatchExporter::Span& span) { + addDefaultClientAttrs(span, d->req); + addCustomAttrs(span, d->req); + }) || log_drop(d->req); + } + + if (d->originalFreeRequest) { + d->originalFreeRequest(pc, d->data, state); + } +} + +void onUpstreamNotify(ngx_peer_connection_t *pc, void *data, ngx_uint_t type) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalNotify) { + d->originalNotify(pc, d->data, type); + } +} + +ngx_int_t onUpstreamSetSes(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalSetSession) { + return d->originalSetSession(pc, d->data); + } + + // This path should never happen + return NGX_ERROR; +} + +void onUpstreamSaveSes(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalSaveSession) { + d->originalSaveSession(pc, d->data); + } +} + +ngx_int_t +onUpstreamInitPeer(ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us) +{ + if (!r->upstream) { + return NGX_DECLINED; + } + + // run original init func + auto kcf = (ServerConf *) ngx_http_conf_upstream_srv_conf(us, gHttpModule); + if (kcf->original_init_peer && kcf->original_init_peer(r, us) != NGX_OK) { + return NGX_ERROR; + } + + auto ctx = getOtelCtx(r); + if (!ctx) { + return NGX_ERROR; + } + + if (ctx->current.sampled) { + auto dat = (OtelUpstreamCtx *) ngx_palloc(r->pool, sizeof(OtelUpstreamCtx)); + dat->data = r->upstream->peer.data; + dat->req = r; + dat->reqCtx = ctx; + dat->originalGetRequest = r->upstream->peer.get; + dat->originalFreeRequest = r->upstream->peer.free; + dat->originalNotify = r->upstream->peer.notify; + + r->upstream->peer.data = dat; + r->upstream->peer.get = onUpstreamStart; + r->upstream->peer.free = onUpstreamFinish; + r->upstream->peer.notify = onUpstreamNotify; + + dat->originalSetSession = r->upstream->peer.set_session; + dat->originalSaveSession = r->upstream->peer.save_session; + r->upstream->peer.set_session = onUpstreamSetSes; + r->upstream->peer.save_session = onUpstreamSaveSes; + } + + return NGX_OK; +} + +ngx_int_t +onUpstreamInit(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) +{ + ServerConf *conf = + (ServerConf *) ngx_http_conf_upstream_srv_conf(us, gHttpModule); + + if (conf->original_init_upstream + && conf->original_init_upstream(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + conf->original_init_peer = us->peer.init; + us->peer.init = onUpstreamInitPeer; + + return NGX_OK; +} + +char * +onUpstreamConfiguration(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_upstream_srv_conf_t *uscf; + ServerConf *sc = (ServerConf *) conf; + + uscf = (ngx_http_upstream_srv_conf_t *) + ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + sc->original_init_upstream = + (ngx_http_upstream_init_pt) uscf->peer.init_upstream + ? uscf->peer.init_upstream + : ngx_http_upstream_init_round_robin; + uscf->peer.init_upstream = &onUpstreamInit; + return NGX_CONF_OK; +} + ngx_int_t initModule(ngx_conf_t* cf) { auto cmcf = (ngx_http_core_main_conf_t*)ngx_http_conf_get_module_main_conf( @@ -972,6 +1242,12 @@ char* mergeLocationConf(ngx_conf_t* cf, void* parent, void* child) return NGX_CONF_OK; } +void * +createSrvConf(ngx_conf_t *cf) +{ + return ngx_pcalloc(cf->pool, sizeof(ServerConf)); +} + ngx_http_module_t gHttpModuleCtx = { addVariables, /* preconfiguration */ initModule, /* postconfiguration */ @@ -979,7 +1255,7 @@ ngx_http_module_t gHttpModuleCtx = { createMainConf, /* create main configuration */ initMainConf, /* init main configuration */ - NULL, /* create server configuration */ + createSrvConf, /* create server configuration */ NULL, /* merge server configuration */ createLocationConf, /* create location configuration */ From 35d342a9cd60dd17a826168bbe5976c1340f069e Mon Sep 17 00:00:00 2001 From: Ava Hahn Date: Wed, 21 May 2025 15:59:22 -0700 Subject: [PATCH 2/2] Additional testing for upstream spans This commit adds some loose testing for the functionality included in the previous commit. A new test is minted that verifies the type, quantity, and return code of client and server spans after a request that leverages an upstream. Signed-off-by: Ava Hahn --- tests/test_otel.py | 36 ++++++++++++++++++++++++++++-------- tests/trace_service.py | 12 ++++++++---- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/tests/test_otel.py b/tests/test_otel.py index fef771a5..f1468ddf 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -32,6 +32,11 @@ otel_trace on; {{ resource_attrs }} + upstream uptest { + otel_upstream_span_enable; + server 127.0.0.1:18080; + } + server { listen 127.0.0.1:18443 ssl; listen 127.0.0.1:18443 quic; @@ -93,6 +98,12 @@ add_header "X-Otel-Tracestate" $http_tracestate; return 204; } + + location /upstream_trace { + otel_trace on; + otel_trace_context inject; + proxy_pass http://uptest/ok; + } } } @@ -140,7 +151,7 @@ def get_http09(host, port, path): assert get_http09("127.0.0.1", 18080, "/ok") == "OK" - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == "/ok" @@ -154,8 +165,7 @@ def test_default_attributes(client, trace_service, http_ver, path, status): if http_ver == "3.0": client.quic_cache_layer.add_domain("127.0.0.1", port) r = client.get(f"{scheme}://127.0.0.1:{port}{path}", verify=False) - - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == path assert get_attr(span, "http.method") == "GET" @@ -178,7 +188,7 @@ def test_default_attributes(client, trace_service, http_ver, path, status): def test_custom_attributes(client, trace_service): assert client.get("http://127.0.0.1:18080/custom").status_code == 200 - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == "custom_location" assert get_attr(span, "http.request.completion") == "OK" @@ -193,12 +203,22 @@ def test_trace_off(client, trace_service): time.sleep(0.01) # wait for spans assert len(trace_service.batches) == 0 +def test_upstream_tracing(client, trace_service): + assert client.get("http://127.0.0.1:18080/upstream_trace").status_code == 200 + spans = trace_service.get_span(3) + client_spans = [x for x in spans if x.kind.__str__() == '2'] + server_span = [x for x in spans if x.kind.__str__() == '3'] + assert len(client_spans) == 2 + assert len(server_span) == 1 + for i in client_spans: + assert get_attr(i, "http.status_code") == 200 + @pytest.mark.parametrize("parent", [None, parent_ctx]) def test_variables(client, trace_service, parent): r = client.get("http://127.0.0.1:18080/vars", headers=trace_headers(parent)) - span = trace_service.get_span() + span = trace_service.get_span(1) if parent: assert span.trace_id.hex() == parent.trace_id @@ -220,7 +240,7 @@ def test_context(client, trace_service, parent, path): r = client.get(f"http://127.0.0.1:18080{path}", headers=headers) - span = trace_service.get_span() + span = trace_service.get_span(1) if path in ["/extract", "/propagate"] and parent: assert span.trace_id.hex() == parent.trace_id @@ -308,7 +328,7 @@ def test_custom_resource_attributes(client, trace_service): def test_exporter_headers(client, trace_service): assert client.get("http://127.0.0.1:18080/ok").status_code == 200 - assert trace_service.get_span().name == "/ok" + assert trace_service.get_span(1).name == "/ok" headers = dict(trace_service.last_metadata) assert headers["x-api-token"] == "api.value" @@ -328,4 +348,4 @@ def test_exporter_headers(client, trace_service): def test_tls_export(client, trace_service): assert client.get("http://127.0.0.1:18080/ok").status_code == 200 - assert trace_service.get_span().name == "/ok" + assert trace_service.get_span(1).name == "/ok" diff --git a/tests/trace_service.py b/tests/trace_service.py index 5ef2bc67..f16a3f94 100644 --- a/tests/trace_service.py +++ b/tests/trace_service.py @@ -19,16 +19,20 @@ def get_batch(self): for _ in range(10): if len(self.batches): break - time.sleep(0.001) + time.sleep(1) assert len(self.batches) == 1 assert len(self.batches[0]) == 1 return self.batches.pop()[0] - def get_span(self): + def get_span(self, n): batch = self.get_batch() assert len(batch.scope_spans) == 1 - assert len(batch.scope_spans[0].spans) == 1 - return batch.scope_spans[0].spans.pop() + l = len(batch.scope_spans[0].spans) + assert l == n + s = batch.scope_spans[0].spans[l - n:] + for _ in range(n): + batch.scope_spans[0].spans.pop() + return s[0] if len(s) == 1 else s @pytest.fixture(scope="module")