-
Notifications
You must be signed in to change notification settings - Fork 358
[Exporter.Geneva] implement resource attributes for logs #3531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| using OpenTelemetry.Exporter.Geneva.Transports; | ||
| using OpenTelemetry.Internal; | ||
| using OpenTelemetry.Logs; | ||
| using OpenTelemetry.Resources; | ||
|
|
||
| namespace OpenTelemetry.Exporter.Geneva.MsgPack; | ||
|
|
||
|
|
@@ -33,26 +34,33 @@ internal sealed class MsgPackLogExporter : MsgPackExporter, IDisposable | |
|
|
||
| #if NET | ||
| private readonly FrozenSet<string>? customFields; | ||
| private readonly FrozenDictionary<string, object>? prepopulatedFields; | ||
| #else | ||
| private readonly HashSet<string>? customFields; | ||
| private readonly Dictionary<string, object>? prepopulatedFields; | ||
| #endif | ||
|
|
||
| private readonly ExceptionStackExportMode exportExceptionStack; | ||
| private readonly List<string>? prepopulatedFieldKeys; | ||
| private readonly byte[] bufferEpilogue; | ||
| private readonly IDataTransport dataTransport; | ||
| private readonly Func<Resource> resourceProvider; | ||
|
|
||
| // These are values that are always added to the body as dedicated fields | ||
| private readonly Dictionary<string, object> prepopulatedFields; | ||
|
|
||
| // These are values that are always added to env_properties | ||
| private readonly Dictionary<string, object> propertiesEntries; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FrozenDictionary is optimized for read-heavy scenarios with zero-allocation lookups and perfect hashing. Switching to a regular Dictionary adds lookup overhead. |
||
| private readonly int stringFieldSizeLimitCharCount; // the maximum string size limit for MsgPack strings | ||
|
|
||
| // This is used for Scopes | ||
| private readonly ThreadLocal<SerializationDataForScopes> serializationData = new(); | ||
|
|
||
| private bool isDisposed; | ||
|
|
||
| public MsgPackLogExporter(GenevaExporterOptions options) | ||
| public MsgPackLogExporter(GenevaExporterOptions options, Func<Resource> resourceProvider) | ||
| { | ||
| Guard.ThrowIfNull(options); | ||
| Guard.ThrowIfNull(resourceProvider); | ||
|
|
||
| this.resourceProvider = resourceProvider; | ||
|
|
||
| this.tableNameSerializer = new(options, defaultTableName: "Log"); | ||
| this.exportExceptionStack = options.ExceptionStackExportMode; | ||
|
|
@@ -88,21 +96,17 @@ public MsgPackLogExporter(GenevaExporterOptions options) | |
| } | ||
|
|
||
| this.stringFieldSizeLimitCharCount = connectionStringBuilder.PrivatePreviewLogMessagePackStringSizeLimit; | ||
|
|
||
| this.propertiesEntries = []; | ||
|
|
||
| this.prepopulatedFields = new Dictionary<string, object>(options.PrepopulatedFields.Count, StringComparer.Ordinal); | ||
|
|
||
| if (options.PrepopulatedFields != null) | ||
| { | ||
| this.prepopulatedFieldKeys = []; | ||
| var tempPrepopulatedFields = new Dictionary<string, object>(options.PrepopulatedFields.Count, StringComparer.Ordinal); | ||
| foreach (var kv in options.PrepopulatedFields) | ||
| { | ||
| tempPrepopulatedFields[kv.Key] = kv.Value; | ||
| this.prepopulatedFieldKeys.Add(kv.Key); | ||
| this.prepopulatedFields[kv.Key] = kv.Value; | ||
| } | ||
|
|
||
| #if NET | ||
| this.prepopulatedFields = tempPrepopulatedFields.ToFrozenDictionary(StringComparer.Ordinal); | ||
| #else | ||
| this.prepopulatedFields = tempPrepopulatedFields; | ||
| #endif | ||
| } | ||
|
|
||
| // TODO: Validate custom fields (reserved name? etc). | ||
|
|
@@ -174,27 +178,67 @@ public void Dispose() | |
| this.isDisposed = true; | ||
| } | ||
|
|
||
| internal void AddResourceAttributesToPrepopulated() | ||
| { | ||
| // This function needs to be idempotent | ||
|
|
||
| foreach (var entry in this.resourceProvider().Attributes) | ||
| { | ||
| string key = entry.Key; | ||
| bool isDedicatedField = false; | ||
| if (entry.Value is string) | ||
| { | ||
| switch (key) | ||
| { | ||
| case "service.name": | ||
| key = Schema.V40.PartA.Extensions.Cloud.Role; | ||
| isDedicatedField = true; | ||
| break; | ||
| case "service.instanceId": | ||
| key = Schema.V40.PartA.Extensions.Cloud.RoleInstance; | ||
| isDedicatedField = true; | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| if (isDedicatedField || this.customFields == null || this.customFields.Contains(key)) | ||
| { | ||
| if (!this.prepopulatedFields.ContainsKey(key)) | ||
| { | ||
| this.prepopulatedFields.Add(key, entry.Value); | ||
| } | ||
| } | ||
| else | ||
| { | ||
| if (!this.propertiesEntries.ContainsKey(key)) | ||
| { | ||
| this.propertiesEntries.Add(key, entry.Value); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| internal ArraySegment<byte> SerializeLogRecord(LogRecord logRecord) | ||
| { | ||
| // `LogRecord.State` and `LogRecord.StateValues` were marked Obsolete in https://github.com/open-telemetry/opentelemetry-dotnet/pull/4334 | ||
| #pragma warning disable 0618 | ||
| IReadOnlyList<KeyValuePair<string, object?>>? listKvp; | ||
| IReadOnlyList<KeyValuePair<string, object?>>? logFields; | ||
| if (logRecord.StateValues != null) | ||
| { | ||
| listKvp = logRecord.StateValues; | ||
| logFields = logRecord.StateValues; | ||
| } | ||
| else | ||
| { | ||
| // Attempt to see if State could be ROL_KVP. | ||
| listKvp = logRecord.State as IReadOnlyList<KeyValuePair<string, object?>>; | ||
| logFields = logRecord.State as IReadOnlyList<KeyValuePair<string, object?>> ?? []; | ||
| } | ||
| #pragma warning restore 0618 | ||
|
|
||
| var buffer = Buffer.Value ??= new byte[BUFFER_SIZE]; // TODO: handle OOM | ||
|
|
||
| /* Fluentd Forward Mode: | ||
| [ | ||
| "Log", | ||
| "Log", // (or category name) | ||
| [ | ||
| [ <timestamp>, { "env_ver": "4.0", ... } ] | ||
| ], | ||
|
|
@@ -227,15 +271,20 @@ internal ArraySegment<byte> SerializeLogRecord(LogRecord logRecord) | |
| ushort cntFields = 0; | ||
| var idxMapSizePatch = cursor - 2; | ||
|
|
||
| if (this.prepopulatedFieldKeys != null) | ||
| this.AddResourceAttributesToPrepopulated(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method runs on every log record, which is concerning for the hot path. While the idempotency guards prevent duplicate additions, we're still invoking the resourceProvider delegate, iterating over all resource attributes, and performing dictionary lookups per record. Additionally, the .Any() calls allocate an enumerator and closure on each invocation. |
||
|
|
||
| foreach (var entry in this.prepopulatedFields) | ||
| { | ||
| for (var i = 0; i < this.prepopulatedFieldKeys.Count; i++) | ||
| // A prepopulated entry should not be added if the same key exists in the log, | ||
| // and customFields configuration would make it a dedicated field. | ||
| if ((this.customFields == null || this.customFields.Contains(entry.Key)) | ||
| && logFields.Any(kvp => kvp.Key == entry.Key)) | ||
| { | ||
| var key = this.prepopulatedFieldKeys[i]; | ||
| var value = this.prepopulatedFields![key]; | ||
| cursor = AddPartAField(buffer, cursor, key, value); | ||
| cntFields += 1; | ||
| continue; | ||
| } | ||
|
|
||
| cursor = AddPartAField(buffer, cursor, entry.Key, entry.Value); | ||
| cntFields += 1; | ||
| } | ||
|
|
||
| // Part A - core envelope | ||
|
|
@@ -295,10 +344,8 @@ internal ArraySegment<byte> SerializeLogRecord(LogRecord logRecord) | |
| var hasEnvProperties = false; | ||
| var bodyPopulated = false; | ||
| var namePopulated = false; | ||
| for (var i = 0; i < listKvp?.Count; i++) | ||
| foreach (var entry in logFields) | ||
| { | ||
| var entry = listKvp[i]; | ||
|
|
||
| // Iteration #1 - Get those fields which become dedicated columns | ||
| // i.e all Part B fields and opt-in Part C fields. | ||
| if (entry.Key == "{OriginalFormat}") | ||
|
|
@@ -366,27 +413,44 @@ internal ArraySegment<byte> SerializeLogRecord(LogRecord logRecord) | |
| cursor = dataForScopes.Cursor; | ||
| cntFields = dataForScopes.FieldsCount; | ||
|
|
||
| if (hasEnvProperties) | ||
| if (hasEnvProperties || this.propertiesEntries.Count > 0) | ||
| { | ||
| // Iteration #2 - Get all "other" fields and collapse them into single field | ||
| // named "env_properties". | ||
| // Anything that's not a dedicated field gets put into a part C field called "env_properties". | ||
| ushort envPropertiesCount = 0; | ||
| cursor = MessagePackSerializer.SerializeAsciiString(buffer, cursor, "env_properties"); | ||
| cursor = MessagePackSerializer.WriteMapHeader(buffer, cursor, ushort.MaxValue); | ||
| var idxMapSizeEnvPropertiesPatch = cursor - 2; | ||
| for (var i = 0; i < listKvp!.Count; i++) | ||
|
|
||
| if (hasEnvProperties) | ||
| { | ||
| var entry = listKvp[i]; | ||
| if (entry.Key == "{OriginalFormat}" || this.customFields!.Contains(entry.Key)) | ||
| foreach (var entry in logFields) | ||
| { | ||
| continue; | ||
| if (entry.Key == "{OriginalFormat}" || this.customFields!.Contains(entry.Key)) | ||
| { | ||
| continue; | ||
| } | ||
| else | ||
| { | ||
| cursor = MessagePackSerializer.SerializeUnicodeString(buffer, cursor, entry.Key, this.stringFieldSizeLimitCharCount); | ||
| cursor = MessagePackSerializer.Serialize(buffer, cursor, entry.Value); | ||
| envPropertiesCount += 1; | ||
| } | ||
| } | ||
| else | ||
| } | ||
|
|
||
| foreach (var entry in this.propertiesEntries) | ||
| { | ||
| // A prepopulated env_properties entry should not be added if the same key exists in the log, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. GenevaExporter previously did not attempt de-duplication for performance reasons (neither did the upstream OTel SDK). I don't think we need to do it now either, as this will affect perf.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It was previously not possible to have a duplicate in env_properties. Do you think I should remove this check for duplicates between resource attributes and log fields? How do I weigh sending bad data to the agents versus performance impact?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For Logs, duplicates were possible before (e.g., #736). The current behavior is to let the Agent deal with the duplicate data. It may or may not be ideal, but that is the current state. (IIRC, metrics also don't do de-duplication of attributes. The Agent deals with de-duplication.) |
||
| // and lack of customFields configuration would place it in env_properties. | ||
| if (this.customFields != null && !this.customFields.Contains(entry.Key) | ||
| && logFields.Any(kvp => kvp.Key == entry.Key)) | ||
| { | ||
| cursor = MessagePackSerializer.SerializeUnicodeString(buffer, cursor, entry.Key, this.stringFieldSizeLimitCharCount); | ||
| cursor = MessagePackSerializer.Serialize(buffer, cursor, entry.Value); | ||
| envPropertiesCount += 1; | ||
| continue; | ||
| } | ||
|
|
||
| cursor = MessagePackSerializer.SerializeUnicodeString(buffer, cursor, entry.Key); | ||
| cursor = MessagePackSerializer.Serialize(buffer, cursor, entry.Value); | ||
| envPropertiesCount += 1; | ||
| } | ||
|
|
||
| // Prepare state for scopes | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a simple boolean setting, which, when enabled, will add all resource attributes to the log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's a connection-string-based opt-in feature switch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I spoke to @rajkumar-rangaraj also, and IMHO, it should be a per-attribute opt-in instead of adding all resource attributes. I understand users can control Resource by limiting things added to Resource, but Resource is not per exporter, so it's possible they need the full Resource in another exporter but a limited one in Geneva, which is impossible to achieve now.
A few reasons why I think this should be enabled on a per-resource-attribute basis.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this comment apply to the other PR relating to traces? #3214
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually attempted to make the change you are suggesting for traces, but it received negative feedback so I decided not to pursue it: #3367 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am aware. I am asking Raj to reconsider, so as to be consistent with the prior work (OTel Rust, which has this as a stable feature already). OTel Rust did it the way it did for the reasons I shared in my earlier comment.