
Enable telemetry by default with DSN-controlled priority and full event correctness tests#349

Open
samikshya-db wants to merge 12 commits into main from telemetry-default-on-and-dsn-config

Conversation


@samikshya-db samikshya-db commented Apr 13, 2026

Summary

This PR extends the telemetry implementation across two areas.

1. DSN / config changes (original scope)

  • EnableTelemetry *bool tristate in telemetry.Config: nil = defer to server flag, &true = client opt-in, &false = client opt-out.
  • Two-level enable priority: an explicit DSN enableTelemetry value always wins (true = on, false = off); when unset, the server feature flag decides.
  • Two new DSN params: telemetry_retry_count and telemetry_retry_delay.
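The tristate priority above can be sketched as follows. Config and isTelemetryEnabled mirror names mentioned in this PR, but the exact signatures here are assumptions, not the driver's real API:

```go
package main

import "fmt"

// Config mirrors the telemetry.Config tristate described above.
type Config struct {
	// nil = defer to server flag, &true = client opt-in, &false = client opt-out
	EnableTelemetry *bool
}

// isTelemetryEnabled applies the two-level priority: an explicit client
// choice always wins; otherwise the server feature flag decides.
func isTelemetryEnabled(cfg *Config, serverFlagEnabled bool) bool {
	if cfg.EnableTelemetry != nil {
		return *cfg.EnableTelemetry
	}
	return serverFlagEnabled
}

func main() {
	t, f := true, false
	fmt.Println(isTelemetryEnabled(&Config{EnableTelemetry: &t}, false)) // true: client opt-in wins
	fmt.Println(isTelemetryEnabled(&Config{EnableTelemetry: &f}, true))  // false: client opt-out wins
	fmt.Println(isTelemetryEnabled(&Config{}, true))                     // true: server flag decides
}
```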

2. Telemetry gap fixes (new in this PR)

Four correctness bugs found during end-to-end testing against a real warehouse:

EXECUTE_STATEMENT / CLOSE_STATEMENT silently lost on shutdown
Root cause: agg.cancel() fired while a worker was mid-HTTP-export.
Fix: added inFlight sync.WaitGroup; close() calls inFlight.Wait() before cancel().
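A minimal sketch of the shutdown ordering this fix establishes. The struct and method names here are simplified stand-ins for the PR's metricsAggregator, not its actual code:

```go
package main

import (
	"fmt"
	"sync"
)

// aggregator sketches the shutdown-ordering fix: close() must wait for
// in-flight exports before cancelling the worker context.
type aggregator struct {
	inFlight  sync.WaitGroup
	closeOnce sync.Once
	cancel    func()
}

func (agg *aggregator) close() {
	agg.closeOnce.Do(func() {
		// 1. flushSync()  — flush the current batch (omitted in this sketch)
		// 2. drain queue  — export still-queued jobs synchronously (omitted)
		agg.inFlight.Wait() // 3. wait for exports workers already picked up
		agg.cancel()        // 4. only now stop the workers
	})
}

func main() {
	done := false
	agg := &aggregator{cancel: func() { done = true }}
	agg.inFlight.Add(1)
	go func() {
		// simulate a worker finishing an HTTP export
		agg.inFlight.Done()
	}()
	agg.close()
	fmt.Println(done) // true: cancel ran only after the in-flight export finished
}
```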

total_chunks_present: null for paginated CloudFetch
Root cause: the server reports one link per FetchResults call, so the grand total never appears in any single response.
Fix: pass r.chunkCount through closeCallback; connection.go sets chunk_total_present if the server never reported it.

operation_latency_ms: null for CLOSE_STATEMENT
Root cause: CloseOperation RPC completes in <1ms → rounds to 0; omitempty drops 0.
Fix: removed omitempty from OperationLatencyMs.

CloudFetch S3 timing fields not populated
Root cause: per-S3-file download time was not measured.
Fix: added onFileDownloaded func(downloadMs int64) callback to cloudIPCStreamIterator; connection.go aggregates initial/slowest/sum timings.
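A sketch of the nil-safe callback wiring described above. The type and helper names here are illustrative; only onFileDownloaded's signature comes from the PR:

```go
package main

import (
	"fmt"
	"time"
)

// cloudIterator sketches how a nil-safe onFileDownloaded callback reports
// per-S3-file download time on telemetry-enabled paths only.
type cloudIterator struct {
	onFileDownloaded func(downloadMs int64) // nil on non-telemetry paths
}

func (it *cloudIterator) downloadFile() {
	start := time.Now()
	// ... fetch the presigned S3 URL here ...
	if it.onFileDownloaded != nil {
		it.onFileDownloaded(time.Since(start).Milliseconds())
	}
}

func main() {
	calls := 0
	it := &cloudIterator{onFileDownloaded: func(ms int64) { calls++ }}
	it.downloadFile()
	(&cloudIterator{}).downloadFile() // nil callback: must not panic
	fmt.Println(calls)                // 1
}
```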

3. DSN parameters (full set)

| Parameter | Type | Default | Description |
|---|---|---|---|
| enableTelemetry | bool | unset | Overrides server flag when set |
| telemetry_batch_size | int | 100 | Events per batch |
| telemetry_flush_interval | duration | 5s | Periodic flush interval |
| telemetry_retry_count | int | 3 | Max retry attempts on export failure |
| telemetry_retry_delay | duration | 100ms | Base delay between retries (exponential backoff) |
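A DSN using these parameters might look like the following. The host, token, and warehouse path are placeholders; only the parameter names come from this PR:

```
token:<access-token>@example.cloud.databricks.com:443/sql/1.0/warehouses/<id>?enableTelemetry=true&telemetry_batch_size=50&telemetry_flush_interval=10s&telemetry_retry_count=0&telemetry_retry_delay=200ms
```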

Key files changed

  • telemetry/aggregator.go — inFlight WaitGroup; 5-step close() ordering
  • telemetry/interceptor.go — RecordOperation takes statementID so CLOSE_STATEMENT carries sql_statement_id
  • telemetry/request.go — removed omitempty from OperationLatencyMs
  • connection.go — closeCallback(latencyMs, chunkCount, err) + cloudFetchCallback wiring
  • internal/rows/rows.go — closeCallback passes r.chunkCount; cloudFetchCallback threaded through
  • internal/rows/arrowbased/batchloader.go — onFileDownloaded callback per S3 file download

Test plan

  • go build ./... — clean compile
  • go test ./telemetry/... -count=1 — all pass
  • go test ./internal/rows/... -count=1 — all pass
  • go test ./... -short -count=1 — full suite passes

New correctness tests

telemetry/aggregator_test.go (new file, 5 tests):

  • WaitsForInFlightWorkerExports — close() blocks until all HTTP exports finish, even if workers picked up jobs before the drain step ran
  • DrainsPendingQueueJobsBeforeCancel — jobs sitting in exportQueue are exported synchronously during drain
  • InFlightAddBeforeSend — inFlight.Add(1) precedes the channel send so no job is invisible to Wait()
  • SafeToCallMultipleTimes — concurrent close() calls do not deadlock (sync.Once)
  • DropWhenQueueFull — drop path calls inFlight.Done() so Wait() is never permanently blocked

telemetry/integration_test.go (2 new tests):

  • OperationLatencyMs_ZeroNotOmitted — raw JSON contains "operation_latency_ms":0, not absent
  • ChunkTotalPresent_DerivedFromChunkCount — chunk_total_present tag propagates to ChunkDetails

internal/rows/arrowbased/batchloader_test.go (2 new tests):

  • OnFileDownloaded callback invoked once per file with positive downloadMs
  • Nil callback is safe on non-telemetry paths (no panic)

internal/rows/rows_test.go (2 new tests):

  • CloseCallback_ReceivesChunkCount — callback gets correct total pages after multi-page iteration
  • CloseCallback_NilDoesNotPanic — nil closeCallback is safe on rows.Close()

This pull request was AI-assisted by Isaac.

…d full event correctness tests

- Add ClientExplicit flag to telemetry.Config so DSN-set enableTelemetry
  takes precedence over the server feature flag without a server round-trip
- New priority (highest → lowest): client DSN > server feature flag > disabled
- Add telemetry_retry_count and telemetry_retry_delay DSN params; expose
  TelemetryRetryCount/TelemetryRetryDelay in UserConfig and DeepCopy
- Update InitializeForConnection to accept retry params and set ClientExplicit
- Update config and integration tests to reflect new priority semantics
- Add TestIntegration_TelemetryEventCorrectnessAllFields: verifies every field
  of TelemetryRequest, TelemetryFrontendLog, SystemConfiguration, SQLOperation,
  ChunkDetails, OperationDetail, and ErrorInfo in the exported wire format

Co-authored-by: Isaac
…elemetry

nil = unset (server flag decides), &true = client opt-in, &false = client opt-out.
Eliminates the two-field smell; isTelemetryEnabled checks != nil to detect
whether the client made an explicit choice, avoiding the need for a companion flag.
Only two cases enable telemetry: client DSN=true (priority 1) or server flag
enabled with no client override (priority 2).

Co-authored-by: Isaac
config_test.go:
- Drop TestParseTelemetryConfig_EmptyParams (TestDefaultConfig covers it)
- Drop TestParseTelemetryConfig_EnableTelemetry_IsSet/IsNil (implied by EnabledTrue/EnabledFalse)
- Drop TestIsTelemetryEnabled_UserOptInServerDisabled (identical body to UserOptInServerEnabled)
- Drop TestIsTelemetryEnabled_ServerFlagOnly (identical body to DefaultChecksServerFlag)
- Rename UserOptInServerEnabled → ExplicitOptIn; Default/DefaultChecksServerFlag/DefaultServerDisabled → ServerEnabled/ServerDisabled

integration_test.go:
- Drop TestIntegration_OptInPriority_ExplicitOptOut (duplicate of config_test ExplicitOptOut)
- Drop TestIntegration_FieldMapping (fully subsumed by CorrectnessAllFields)

Co-authored-by: Isaac
…d, add CLOSE_STATEMENT

- Remove RecordOperation(EXECUTE_STATEMENT) from executeStatement(): it measured
  only the Thrift RPC submission time (~5s), not end-to-end query time. The
  correct end-to-end metric already flows through BeforeExecuteWithTime →
  AfterExecute → CompleteStatement.
- Add AddTag(operation_type=EXECUTE_STATEMENT) after BeforeExecuteWithTime in
  QueryContext and ExecContext so the aggregated statement metric carries the
  right operation_type, sql_statement_id, and accurate latency in one event.
- Add closeCallback func(latencyMs int64, err error) to rows.NewRows() and
  rows.Close() to emit CLOSE_STATEMENT telemetry when rows.Close() is called.

Co-authored-by: Isaac
Time each FetchResults call in fetchResultPage() and propagate the
latency through the telemetryUpdate callback. Aggregate initial,
slowest, and sum chunk fetch latencies in the QueryContext closure,
then map them to the ChunkDetails wire fields:
  - initial_chunk_latency_millis
  - slowest_chunk_latency_millis
  - sum_chunks_download_time_millis

Also populate total_chunks_present from server-reported data:
  - DirectResults with CloseOperation set → 1 (all data inline)
  - CloudFetch ResultLinks → len(links) from first response

The existing total_chunks_iterated (chunk_count tag) is unchanged.
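The initial/slowest/sum accumulation described above can be sketched like this (the struct and field names are illustrative, not the closure variables used in the PR):

```go
package main

import "fmt"

// chunkTimings accumulates per-chunk fetch latencies into the three
// ChunkDetails wire fields: initial, slowest, and sum.
type chunkTimings struct {
	initialMs, slowestMs, sumMs int64
	initialSet                  bool
}

func (t *chunkTimings) record(latencyMs int64) {
	if !t.initialSet {
		t.initialMs = latencyMs // first chunk → initial_chunk_latency_millis
		t.initialSet = true
	}
	if latencyMs > t.slowestMs {
		t.slowestMs = latencyMs // max → slowest_chunk_latency_millis
	}
	t.sumMs += latencyMs // total → sum_chunks_download_time_millis
}

func main() {
	var t chunkTimings
	for _, ms := range []int64{120, 450, 300} {
		t.record(ms)
	}
	fmt.Println(t.initialMs, t.slowestMs, t.sumMs) // 120 450 870
}
```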

Co-authored-by: Isaac
Previously AfterExecute/CompleteStatement fired when QueryContext()
returned — before the user ever called rows.Next(). This meant
chunk_count was always 1 and all per-chunk timing fields were null,
because fetchResultPage() calls happen during row iteration.

Fix:
- Add FinalizeLatency() to Interceptor: captures elapsed time at
  QueryContext return to preserve execute-only latency in mc.
- AfterExecute() uses the pre-captured latency if available, so the
  metric reports server-exec+poll time regardless of when it fires.
- Move AfterExecute/CompleteStatement from a defer in QueryContext to
  closeCallback, which rows.Close() invokes after all rows are read.
  At that point chunk_count and all timing tags are fully accumulated.
- Error path: still emits EXECUTE_STATEMENT immediately on runQuery
  failure (no rows means no chunks to wait for).

Co-authored-by: Isaac
flushSync() only flushes agg.batch, but flushUnlocked() (triggered by
terminal ops like CLOSE_STATEMENT) moves batch items into agg.exportQueue
for async worker processing.  When agg.close() called cancel() immediately
after flushSync(), those queue items were silently dropped because workers
exited before picking them up — causing EXECUTE_STATEMENT and
CLOSE_STATEMENT metrics to never reach the telemetry endpoint.

Fix: drain the exportQueue synchronously between flushSync and cancel,
processing any pending export jobs directly before workers stop.
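The synchronous drain can be sketched as below. The PR's own code uses a `goto drained` (visible in the review diff); this sketch uses the equivalent labeled break, and the queue element type is simplified:

```go
package main

import "fmt"

// drainQueue exports any jobs still sitting in the queue directly, so
// workers exiting after cancel() cannot silently drop them.
func drainQueue(queue chan int, export func(int)) {
drain:
	for {
		select {
		case job := <-queue:
			export(job)
		default:
			break drain // queue empty: nothing left to lose
		}
	}
}

func main() {
	queue := make(chan int, 4)
	queue <- 1
	queue <- 2
	exported := 0
	drainQueue(queue, func(int) { exported++ })
	fmt.Println(exported) // 2
}
```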

Co-authored-by: Isaac
…, flush ordering

- Thread per-S3-file download timing from cloudFetchDownloadTask through
  cloudIPCStreamIterator via onFileDownloaded callback; aggregate into
  initial/slowest/sum chunk timing tags matching JDBC per-chunk HTTP GET model
- Zero FetchResults RPC latency for CloudFetch pages (URLs-only response) so
  Thrift round-trip is not misreported as data download time
- Add statementID param to RecordOperation so CLOSE_STATEMENT entries carry
  sql_statement_id in telemetry tables
- Fix EXECUTE_STATEMENT/CLOSE_STATEMENT loss on shutdown: add inFlight WaitGroup
  to metricsAggregator so close() waits for in-flight worker exports before
  calling cancel(), preventing metrics picked up by workers from being dropped
- Derive total_chunks_present from final chunkCount in closeCallback for
  paginated CloudFetch and inline results where server does not report total
- Remove omitempty from OperationLatencyMs so CLOSE_STATEMENT reports 0ms
  (instant server-side close) rather than null

Co-authored-by: samikshya-chand_data
Four targeted test suites covering the bug fixes from the previous commit:

aggregator_test.go (new):
- WaitsForInFlightWorkerExports: close() blocks until all HTTP exports complete
- DrainsPendingQueueJobsBeforeCancel: drain step exports pending queue jobs
- InFlightAddBeforeSend: inFlight.Add happens before queue send (no missed jobs)
- SafeToCallMultipleTimes: concurrent close() calls don't deadlock (sync.Once)
- DropWhenQueueFull: drop path calls inFlight.Done() so Wait() is never stuck

integration_test.go:
- OperationLatencyMs_ZeroNotOmitted: 0ms latency serialises as 0 not null (omitempty fix)
- ChunkTotalPresent_DerivedFromChunkCount: chunk_total_present tag propagates to payload

batchloader_test.go:
- OnFileDownloaded callback is called once per file with positive downloadMs
- Nil callback does not panic (non-telemetry paths)

rows_test.go:
- CloseCallback receives correct chunkCount after multi-page iteration
- Nil closeCallback does not panic

Co-authored-by: Isaac
The previous fix (checking chunkTotalPresent == 0 in closeCallback) only
worked for inline ArrowBatch results. For paginated CloudFetch (1 result
link per FetchResults call), telemetryUpdate set chunkTotalPresent = 1
on the first page, causing the closeCallback gate to never fire. The
final chunk_total_present would be 1 instead of the actual page count.

Fix: track actual S3 file downloads via cloudFetchFileCount (incremented
in cloudFetchCallback per file). closeCallback now sets chunk_total_present
from cloudFetchFileCount when CloudFetch was used, or from chunkCount for
inline ArrowBatch results. This correctly handles all three cases:
  - Paginated CloudFetch (1 link/page): fileCount == pageCount == correct
  - Bulk CloudFetch (all links in DirectResults): fileCount == S3 downloads
  - Inline ArrowBatch: fileCount == 0, falls back to chunkCount

Also removes the now-incorrect early chunk_total_present setting from
telemetryUpdate (len(ResultLinks) per response is not the grand total for
paginated CloudFetch) and fixes the misleading comment in rows.go.
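The three-case derivation above reduces to a small branch. The function name here is a stand-in for logic that lives inline in closeCallback:

```go
package main

import "fmt"

// chunkTotalPresent derives the final chunk_total_present value:
// CloudFetch results count actual S3 file downloads; inline ArrowBatch
// results (fileCount == 0) fall back to the iterated chunk count.
func chunkTotalPresent(cloudFetchFileCount, chunkCount int) int {
	if cloudFetchFileCount > 0 {
		return cloudFetchFileCount // paginated or bulk CloudFetch
	}
	return chunkCount // inline ArrowBatch
}

func main() {
	fmt.Println(chunkTotalPresent(5, 5)) // paginated CloudFetch, 1 link/page
	fmt.Println(chunkTotalPresent(0, 3)) // inline ArrowBatch fallback
}
```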

Co-authored-by: Isaac
- Remove unused slowExporter type and method from aggregator_test.go
  (flagged by unused linter in CI with golangci-lint v1.51)
- Apply gofmt -s to telemetry/aggregator.go and telemetry/integration_test.go

Co-authored-by: Isaac
default:
goto drained
}
}

[F1] inFlight.Wait() has no timeout — can block conn.Close() indefinitely (Severity: High)

agg.inFlight.Wait() is context-unaware. If a telemetry export hangs (slow server, no HTTP client timeout), close() blocks forever → conn.Close() blocks forever → sql.DB.Close() can hang.

The parent telemetryClient.close() creates a 5s timeout context, but sync.WaitGroup.Wait() doesn't respect context cancellation. The per-connection Interceptor.Close() → flushSync() path has no timeout at all.

Suggested fix: Replace bare Wait() with a select on a channel + context deadline:

waitCh := make(chan struct{})
go func() { agg.inFlight.Wait(); close(waitCh) }()
select {
case <-waitCh:
case <-ctx.Done():
    logger.Debug().Msg("telemetry: close timed out waiting for in-flight exports")
}

Also pass a timeout context from Interceptor.Close().

(Flagged by 3 reviewers: ops, performance, security)


Code Review Squad — feedback

if retryCount > 0 {
cfg.MaxRetries = retryCount
}
if retryDelay > 0 {

[F2] retryCount=0 silently ignored — user cannot disable retries via DSN (Severity: High)

DSN parsing at config.go:256 accepts telemetry_retry_count=0 (retryCount >= 0). The test TestParseTelemetryConfig_RetryCountZero asserts MaxRetries=0 means "disable retries." But here, the guard retryCount > 0 silently discards 0, defaulting to 3 retries.

A user setting telemetry_retry_count=0 in their DSN expects no retries but gets 3.

Suggested fix: Change to if retryCount >= 0 to match the DSN parser and test intent. Or use a sentinel (-1) for "not set."
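The sentinel variant of the suggested fix can be sketched as follows (function and constant names are illustrative):

```go
package main

import "fmt"

const defaultMaxRetries = 3

// applyRetryCount uses -1 as the "not set" sentinel, so an explicit
// telemetry_retry_count=0 disables retries instead of being silently
// replaced by the default (the bug described above).
func applyRetryCount(retryCount int) int {
	if retryCount >= 0 {
		return retryCount
	}
	return defaultMaxRetries
}

func main() {
	fmt.Println(applyRetryCount(0))  // 0: user explicitly disabled retries
	fmt.Println(applyRetryCount(-1)) // 3: not set, default applies
}
```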

(Flagged by 4 reviewers: agent-compat, language, architecture, devils-advocate)


Code Review Squad — feedback

@@ -85,6 +86,7 @@ func (agg *metricsAggregator) exportWorker() {
return
}
agg.exporter.export(job.ctx, job.metrics)

[F3] inFlight.Done() not deferred — panic in export() causes permanent deadlock (Severity: High)

If exporter.export() panics (edge case in JSON marshal or HTTP transport), inFlight.Done() is never called, and inFlight.Wait() in close() blocks forever. While export() has internal panic recovery, defense-in-depth suggests protecting the WaitGroup balance.

Suggested fix:

case job, ok := <-agg.exportQueue:
    if !ok { return }
    func() {
        defer agg.inFlight.Done()
        agg.exporter.export(job.ctx, job.metrics)
    }()

(Flagged by: devils-advocate)


Code Review Squad — feedback

cloudFetchCallback func(downloadMs int64),
) (driver.Rows, dbsqlerr.DBError) {

connId := driverctx.ConnIdFromContext(ctx)

[F4] Callback parameter proliferation: 8-param NewRows, 5-param telemetryUpdate (Severity: Medium)

Three function-typed params with anonymous signatures, two nullable. All non-telemetry callers pass nil, nil, nil. The telemetryUpdate callback has 5 positional numeric params where 0 is a sentinel value.

Suggested fix: Bundle into a struct:

type RowsTelemetryCallbacks struct {
    OnChunkFetched   func(data ChunkFetchEvent)
    OnClose          func(latencyMs int64, chunkCount int, err error)
    OnCloudFetchFile func(downloadMs int64)
}

Pass *RowsTelemetryCallbacks (nil = no telemetry) instead of three separate function params.

(Flagged by 4 reviewers: agent-compat, language, maintainability, architecture)


Code Review Squad — feedback

c.telemetry.FinalizeLatency(ctx)
}

// Per-chunk timing state accumulated across all fetchResultPage calls.

[F8] Shared mutable closure state needs single-goroutine invariant documented (Severity: Medium)

The chunkTimingInitialMs, chunkTimingSlowestMs, chunkTimingSumMs, chunkTimingInitialSet, and cloudFetchFileCount variables are captured by both telemetryUpdate and cloudFetchCallback closures without synchronization.

After verification: both callbacks are invoked from the same goroutine (the caller of rows.Next()) — telemetryUpdate from fetchResultPage and cloudFetchCallback from cloudIPCStreamIterator.Next(). No data race in current code. However, this invariant is undocumented and any future refactoring that adds concurrency would silently introduce a race.

Suggested fix: Add a comment near the variable declarations:

// These variables are safe without a mutex because they are only mutated
// from callbacks invoked sequentially by the single goroutine that calls
// rows.Next() (telemetryUpdate from fetchResultPage, cloudFetchCallback
// from cloudIPCStreamIterator.Next).

(Flagged by 8 reviewers: all — consensus that it's safe today but fragile)


Code Review Squad — feedback

@vikrantpuppala

Code Review Squad Report

Merge Safety Score: 62/100 — HIGH RISK

9 specialized reviewers analyzed this PR in parallel. Below is the aggregated report.

Executive Summary

The telemetry correctness fixes (inFlight WaitGroup, FinalizeLatency, omitempty, CloudFetch timing) are well-motivated and thoroughly tested. Two bugs and a deadlock risk need addressing before merge.

Must Address (High)

| ID | Finding | Reviewers |
|---|---|---|
| F1 | inFlight.Wait() has no timeout — can block conn.Close() indefinitely | ops, performance, security |
| F2 | retryCount=0 silently ignored via DSN — user cannot disable retries | agent-compat, language, architecture, devils-advocate |
| F3 | inFlight.Done() not deferred in exportWorker — panic causes deadlock | devils-advocate |

Should Address (Medium)

| ID | Finding | Reviewers |
|---|---|---|
| F4 | Callback parameter proliferation (8-param NewRows, 5-param callback) | agent-compat, language, maintainability, architecture |
| F5 | Dual tristate: *bool vs ConfigValue[bool] for EnableTelemetry | language, maintainability, architecture |
| F6 | Duplicated chunk timing accumulation in telemetryUpdate/cloudFetchCallback | maintainability, performance |
| F7 | OperationLatencyMs omitempty removal affects all metric types (0 vs "not measured") | devils-advocate |
| F8 | Shared mutable closure state needs thread-safety invariant documented | 8/9 reviewers |

Consider (Low)

| ID | Finding |
|---|---|
| F9 | telemetry_retry_delay parse errors silently swallowed (inconsistent with retry_count) |
| F10 | Tag constants defined in tags.go but raw strings used in connection.go and request.go |
| F11 | closeCallback always passes nil/false to AfterExecute/CompleteStatement |
| F12 | goto drained style — use labeled break instead |

What's Good

  • Telemetry failure isolation is well-designed — all paths have panic recovery, errors swallowed, no query impact
  • inFlight WaitGroup shutdown fix correctly prevents lost EXECUTE_STATEMENT/CLOSE_STATEMENT on close
  • FinalizeLatency cleanly separates execute-phase timing from row-iteration timing
  • *bool tristate correctly models nil/true/false semantics and server flag fallback
  • Test coverage is thorough: aggregator close ordering, callback nil-safety, chunk propagation, event correctness

Score Breakdown

| Category | Count | Deductions |
|---|---|---|
| High | 3 | -30 |
| Medium | 5 | -15 |
| Low | 4 | -4 |
| Total | | 62/100 |

Code Review Squad — feedback
