diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 80dfd84e2da9..72279cca7ba6 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -3364,6 +3364,11 @@ Fields in this struct should be updated in sync with apps_stats.proto. An event of type `sampled_query` is the SQL query event logged to the telemetry channel. It contains common SQL event/execution details. +Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel. +To test compatability before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `SQL_EXEC` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| @@ -3472,6 +3477,11 @@ contains common SQL event/execution details. An event of type `sampled_transaction` is the event logged to telemetry at the end of transaction execution. +Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel. +To test compatability before this, set the cluster setting +`log.channel_compatibility_mode.enabled` to false. This will send the +events to `SQL_EXEC` instead of `TELEMETRY`. + | Field | Description | Sensitive | |--|--|--| diff --git a/pkg/ccl/telemetryccl/BUILD.bazel b/pkg/ccl/telemetryccl/BUILD.bazel index c1c160d00b42..8052f9250f52 100644 --- a/pkg/ccl/telemetryccl/BUILD.bazel +++ b/pkg/ccl/telemetryccl/BUILD.bazel @@ -20,6 +20,7 @@ go_test( "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings", "//pkg/sql", "//pkg/sql/sem/tree", "//pkg/sql/sqltestutils", diff --git a/pkg/ccl/telemetryccl/telemetry_logging_test.go b/pkg/ccl/telemetryccl/telemetry_logging_test.go index 52568a46f997..9a2c09a5162b 100644 --- a/pkg/ccl/telemetryccl/telemetry_logging_test.go +++ b/pkg/ccl/telemetryccl/telemetry_logging_test.go @@ -13,6 +13,7 @@ import ( "net/http" "net/http/httptest" "regexp" + "slices" "strings" "testing" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -155,20 +157,29 @@ type expectedSampleQueryEvent struct { } type telemetrySpy struct { - t *testing.T + t *testing.T + sv *settings.Values sampledQueries []eventpb.SampledQuery sampledQueriesRaw []logpb.Entry recoveryEvents []eventpb.RecoveryEvent } +func (l *telemetrySpy) channelsToIntercept() []log.Channel { + if log.ShouldMigrateEvent(l.sv) { + return []log.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC} + } + + return []log.Channel{logpb.Channel_TELEMETRY} +} + func (l *telemetrySpy) Intercept(entry []byte) { var rawLog logpb.Entry if err := json.Unmarshal(entry, &rawLog); err != nil { l.t.Errorf("failed unmarshaling %s: %s", entry, err) } - if rawLog.Channel != logpb.Channel_TELEMETRY { + if !slices.Contains(l.channelsToIntercept(), rawLog.Channel) { return } @@ -204,12 +215,6 @@ func TestBulkJobTelemetryLogging(t *testing.T) { ctx := context.Background() - spy := &telemetrySpy{ - t: t, - } - cleanup := log.InterceptWith(ctx, spy) - defer cleanup() - st := logtestutils.StubTime{} sqm := logtestutils.StubQueryStats{} sts := logtestutils.StubTracingStatus{} @@ -229,6 +234,15 @@ func TestBulkJobTelemetryLogging(t *testing.T) { ExternalIODir: dir, }, }) + + spy := &telemetrySpy{ + t: t, + sv: &testCluster.Server(0).ClusterSettings().SV, + } + + cleanup := log.InterceptWith(ctx, spy) + defer cleanup() + sqlDB := testCluster.ServerConn(0) defer func() { testCluster.Stopper().Stop(context.Background()) diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 31394da2d8d7..33f6924bb34b 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -358,6 +358,7 @@ func (p *planner) maybeLogStatementInternal( *sampledQuery = eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, + CommonSQLEventDetails: p.getCommonSQLEventDetails(), SkippedQueries: skippedQueries, CostEstimate: p.curPlan.instrumentation.costEstimate, Distribution: p.curPlan.instrumentation.distribution.String(), @@ -436,7 +437,11 @@ func (p *planner) maybeLogStatementInternal( SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(), } - p.logEventsOnlyExternally(ctx, sampledQuery) + migrator := log.NewStructuredEventMigrator(func() bool { + return log.ShouldMigrateEvent(p.ExecCfg().SV()) + }, logpb.Channel_SQL_EXEC) + + migrator.StructuredEvent(ctx, severity.INFO, sampledQuery) } } @@ -521,7 +526,11 @@ func (p *planner) logTransaction( } } - log.StructuredEvent(ctx, severity.INFO, sampledTxn) + migrator := log.NewStructuredEventMigrator(func() bool { + return log.ShouldMigrateEvent(p.ExecCfg().SV()) + }, logpb.Channel_SQL_EXEC) + + migrator.StructuredEvent(ctx, severity.INFO, sampledTxn) } func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { diff --git a/pkg/sql/telemetry_datadriven_test.go b/pkg/sql/telemetry_datadriven_test.go index 7ebe9b8afb93..1a1836b85d8f 100644 --- a/pkg/sql/telemetry_datadriven_test.go +++ b/pkg/sql/telemetry_datadriven_test.go @@ -60,13 +60,12 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { sc := log.Scope(t) defer sc.Close(t) - appName := "telemetry-logging-datadriven" ignoredAppname := "telemetry-datadriven-ignored-appname" ctx := context.Background() stmtSpy := logtestutils.NewStructuredLogSpy( t, - []logpb.Channel{logpb.Channel_TELEMETRY}, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, []string{"sampled_query"}, logtestutils.FormatEntryAsJSON, func(_ logpb.Entry, logStr string) bool { @@ -79,7 +78,7 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { txnsSpy := logtestutils.NewStructuredLogSpy( t, - []logpb.Channel{logpb.Channel_TELEMETRY}, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, []string{"sampled_transaction"}, logtestutils.FormatEntryAsJSON, func(_ logpb.Entry, logStr string) bool { @@ -209,13 +208,13 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } newStmtLogCount := stmtSpy.Count() - sb.WriteString(strings.Join(stmtSpy.GetLastNLogs(logpb.Channel_TELEMETRY, newStmtLogCount-stmtLogCount), "\n")) + sb.WriteString(strings.Join(stmtSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), newStmtLogCount-stmtLogCount), "\n")) if newStmtLogCount > stmtLogCount { sb.WriteString("\n") } newTxnLogCount := txnsSpy.Count() - sb.WriteString(strings.Join(txnsSpy.GetLastNLogs(logpb.Channel_TELEMETRY, newTxnLogCount-txnLogCount), "\n")) + sb.WriteString(strings.Join(txnsSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), newTxnLogCount-txnLogCount), "\n")) return sb.String() case "reset-last-sampled": telemetryLogging.resetLastSampledTime() diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index de6fcf4044ab..2b7ac3aa1734 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -9,13 +9,13 @@ import ( "context" "encoding/json" "fmt" - "math" "regexp" "strings" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" @@ -40,9 +40,25 @@ func TestTelemetryLogging(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() + txnSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_transaction"}, + logtestutils.AsLogEntry, + ) + txnCleanup := log.InterceptWith(ctx, txnSpy) + defer txnCleanup() + st := logtestutils.StubTime{} sqm := logtestutils.StubQueryStats{} sts := logtestutils.StubTracingStatus{} @@ -432,27 +448,10 @@ func TestTelemetryLogging(t *testing.T) { // We should not see any transaction events in statement // telemetry mode. - txnEntries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_transaction"`), - log.WithMarkedSensitiveData, - ) - require.NoError(t, err) + txnEntries := txnSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), txnSpy.Count()) require.Emptyf(t, txnEntries, "found unexpected transaction telemetry events: %v", txnEntries) - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetLastNLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV), stmtSpy.Count()) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -471,9 +470,7 @@ func TestTelemetryLogging(t *testing.T) { }) logCount := 0 expectedLogCount := len(tc.expectedSkipped) - // NB: FetchEntriesFromFiles delivers entries in reverse order. - for i := len(entries) - 1; i >= 0; i-- { - e := entries[i] + for _, e := range entries { if strings.Contains(e.Message, tc.expectedLogStatement+"\"") { if logCount == expectedLogCount { @@ -482,7 +479,7 @@ func TestTelemetryLogging(t *testing.T) { } var sampledQueryFromLog eventpb.SampledQuery - err = json.Unmarshal([]byte(e.Message), &sampledQueryFromLog) + err := json.Unmarshal([]byte(e.Message), &sampledQueryFromLog) require.NoError(t, err) require.Equal(t, tc.expectedSkipped[logCount], sampledQueryFromLog.SkippedQueries, "%v", e.Message) @@ -707,8 +704,14 @@ func TestTelemetryLoggingInternalEnabled(t *testing.T) { defer leaktest.AfterTest(t)() sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() st := logtestutils.StubTime{} @@ -748,19 +751,7 @@ func TestTelemetryLoggingInternalEnabled(t *testing.T) { `TRUNCATE TABLE system.public.transaction_statistics`, } - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -797,7 +788,14 @@ func TestTelemetryLoggingInternalConsoleEnabled(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() st := logtestutils.StubTime{} @@ -862,16 +860,7 @@ func TestTelemetryLoggingInternalConsoleEnabled(t *testing.T) { db.Exec(t, query) log.FlushFiles() - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) } @@ -895,7 +884,14 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() st := logtestutils.StubTime{} @@ -963,19 +959,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { db.Exec(t, tc.query) } - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -1004,7 +988,14 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) @@ -1167,19 +1158,7 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) { db.Exec(t, tc.query) } - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -1243,7 +1222,14 @@ func TestTelemetryScanCounts(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) @@ -1422,19 +1408,7 @@ func TestTelemetryScanCounts(t *testing.T) { db.Exec(t, tc.query) } - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetUnreadLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -1513,7 +1487,14 @@ func TestFunctionBodyRedacted(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) @@ -1536,19 +1517,7 @@ $$` db.Exec(t, stmt) - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) if len(entries) == 0 { t.Fatal(errors.Newf("no entries found")) @@ -1574,7 +1543,14 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) { sc := log.ScopeWithoutShowLogs(t) defer sc.Close(t) - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) + ctx := context.Background() + stmtSpy := logtestutils.NewStructuredLogSpy( + t, + []logpb.Channel{logpb.Channel_TELEMETRY, logpb.Channel_SQL_EXEC}, + []string{"sampled_query"}, + logtestutils.AsLogEntry, + ) + cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() st := logtestutils.StubTime{} @@ -1617,19 +1593,7 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) { `BEGIN`, `SELECT ‹1›`, `SELECT ‹2›`, `SELECT ‹3›`, `COMMIT`, } - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } + entries := stmtSpy.GetLogs(getSampleQueryLoggingChannel(&s.ClusterSettings().SV)) require.NotEmpty(t, entries) var expectedTxnID string @@ -1663,3 +1627,10 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) { } } } + +func getSampleQueryLoggingChannel(sv *settings.Values) logpb.Channel { + if log.ShouldMigrateEvent(sv) { + return logpb.Channel_SQL_EXEC + } + return logpb.Channel_TELEMETRY +} diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 2346d75b287b..2a39a1900bd1 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -22,9 +22,19 @@ import "util/log/logpb/event.proto"; // The comment at the top has a specific format for the doc generator. // *Really look at doc.go before modifying this file.* +// TODO (#151948): Move this event definition to +// `pkg/util/log/eventpb/sql_audit_events.proto` to be in the +// `SQL Execution Log` channel once the `log.channel_compatibility_mode.enabled` +// cluster setting is set to false by default and cluster setting is set for +// removal. // SampledQuery is the SQL query event logged to the telemetry channel. It // contains common SQL event/execution details. +// +// Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel. +// To test compatability before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `SQL_EXEC` instead of `TELEMETRY`. message SampledQuery { CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; CommonSQLEventDetails sql = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; @@ -406,7 +416,18 @@ message MVCCIteratorStats { int64 range_key_skipped_points = 13 [(gogoproto.jsontag) = ",includeempty"]; } +// TODO (#151948): Move this event definition to +// `pkg/util/log/eventpb/sql_audit_events.proto` to be in the +// `SQL Execution Log` channel once the `log.channel_compatibility_mode.enabled` +// cluster setting is set to false by default and cluster setting is set for +// removal. + // SampledTransaction is the event logged to telemetry at the end of transaction execution. +// +// Note: in version 26.1, these events will be moved to the `SQL_EXEC` channel. +// To test compatability before this, set the cluster setting +// `log.channel_compatibility_mode.enabled` to false. This will send the +// events to `SQL_EXEC` instead of `TELEMETRY`. message SampledTransaction { // Common contains common event details shared by all log events. CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; diff --git a/pkg/util/log/logtestutils/structured_log_spy.go b/pkg/util/log/logtestutils/structured_log_spy.go index 9cd56d643be7..6e55fb9cd306 100644 --- a/pkg/util/log/logtestutils/structured_log_spy.go +++ b/pkg/util/log/logtestutils/structured_log_spy.go @@ -110,6 +110,11 @@ func FromLogEntry[T any](entry logpb.Entry) (T, error) { return payload, err } +func AsLogEntry(entry logpb.Entry) (logpb.Entry, error) { + entry.Message = entry.Message[entry.StructuredStart:entry.StructuredEnd] + return entry, nil +} + // StructuredLogSpy is a test utility that intercepts structured log entries // and stores them in memory. It can be used to verify the contents of log // entries in tests.