Skip to content

Commit e90ffbc

Browse files
committed
feat(outputs.nats): rebase off master, refactored tests and moved layout handling into its own file
1 parent 6f5d885 commit e90ffbc

File tree

5 files changed

+197
-147
lines changed

5 files changed

+197
-147
lines changed

plugins/outputs/nats/README.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,8 @@ to use them.
6666
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
6767
data_format = "influx"
6868

69-
## Set to true if the stream is managed externally and Telegraf should not attempt
70-
## to create or update it. When true, Telegraf will connect to the existing stream
71-
## and fail if it does not exist.
72-
## Default is false (Telegraf may create or modify the stream).
73-
# external_stream_config = false
74-
7569
## Subject Layout Config
76-
# This configuration allows you to define the NATS subject used when publishing metrics,
70+
# Allows defining a template for the NATS subject used when publishing metrics,
7771
# using Go template formatting. Each element in the with_subject_layout array represents
7872
# a segment of the final subject, dynamically populated from the metric’s tags, name,
7973
# and field name.

plugins/outputs/nats/nats.go

Lines changed: 16 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"strings"
1111
"text/template"
12-
"text/template/parse"
1312
"time"
1413

1514
"github.com/nats-io/nats.go"
@@ -26,16 +25,15 @@ import (
2625
var sampleConfig string
2726

2827
type NATS struct {
29-
Servers []string `toml:"servers"`
30-
Secure bool `toml:"secure"`
31-
Name string `toml:"name"`
32-
Username config.Secret `toml:"username"`
33-
Password config.Secret `toml:"password"`
34-
Credentials string `toml:"credentials"`
35-
Subject string `toml:"subject"`
36-
Jetstream *StreamConfig `toml:"jetstream"`
37-
ExternalStreamConfig bool `toml:"external_stream_config"`
38-
SubjectLayout []string `toml:"with_subject_layout"`
28+
Servers []string `toml:"servers"`
29+
Secure bool `toml:"secure"`
30+
Name string `toml:"name"`
31+
Username config.Secret `toml:"username"`
32+
Password config.Secret `toml:"password"`
33+
Credentials string `toml:"credentials"`
34+
Subject string `toml:"subject"`
35+
Jetstream *StreamConfig `toml:"jetstream"`
36+
SubjectLayout []string `toml:"with_subject_layout"`
3937

4038
tls.ClientConfig
4139

@@ -89,44 +87,6 @@ type StreamConfig struct {
8987
DisableStreamCreation bool `toml:"disable_stream_creation"`
9088
}
9189

92-
type subMsgPair struct {
93-
subject string
94-
metric telegraf.Metric
95-
}
96-
type metricSubjectTmplCtx struct {
97-
Name string
98-
getTag func(string) string
99-
getField func() string
100-
}
101-
102-
func (m metricSubjectTmplCtx) GetTag(key string) string {
103-
return m.getTag(key)
104-
}
105-
106-
func (m metricSubjectTmplCtx) Field() string {
107-
return m.getField()
108-
}
109-
110-
func createmetricSubjectTmplCtx(metric telegraf.Metric) metricSubjectTmplCtx {
111-
return metricSubjectTmplCtx{
112-
Name: metric.Name(),
113-
getTag: func(key string) string {
114-
return metric.Tags()[key]
115-
},
116-
getField: func() string {
117-
fields := metric.FieldList()
118-
if len(fields) == 0 {
119-
return "emptyFields"
120-
}
121-
if len(fields) > 1 {
122-
return "tooManyFields"
123-
}
124-
125-
return fields[0].Key
126-
},
127-
}
128-
}
129-
13090
func (*NATS) SampleConfig() string {
13191
return sampleConfig
13292
}
@@ -302,7 +262,6 @@ func (n *NATS) getJetstreamConfig() (*jetstream.StreamConfig, error) {
302262
}
303263

304264
func (n *NATS) Init() error {
305-
n.Log.Debug("Initializing NATS output plugin")
306265
// If layout is enabled, we will use the subject as the
307266
// base of the template and add more tokens based on
308267
// the template.
@@ -422,7 +381,12 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
422381

423382
for i, pair := range subMsgPairList {
424383
if strings.Contains(pair.subject, "..") {
425-
n.Log.Errorf("double dots are not allowed in the subject: %s, most likely a missing value in the template", pair.subject)
384+
n.Log.Errorf("invalid subject: %s, incorrect template", pair.subject)
385+
continue
386+
}
387+
388+
if strings.HasSuffix(pair.subject, ".") {
389+
n.Log.Errorf("invalid subject: %s, incorrect template", pair.subject)
426390
continue
427391
}
428392

@@ -433,6 +397,7 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
433397
}
434398

435399
n.Log.Debugf("Publishing on Subject: %s, Metrics: %s", pair.subject, string(buf))
400+
fmt.Printf("Publishing on Subject: %s, Metrics: %s\n", pair.subject, string(buf))
436401
if n.Jetstream != nil {
437402
if n.Jetstream.AsyncPublish {
438403
pafs[i], err = n.jetstreamClient.PublishAsync(pair.subject, buf, jetstream.WithExpectStream(n.Jetstream.Name))
@@ -475,50 +440,3 @@ func init() {
475440
}
476441
})
477442
}
478-
479-
// Check the template for any references to `.Field`.
480-
// If the template includes a `.Field` reference, we will need to split the metric
481-
// into separate messages based on the field.
482-
func usesFieldField(node parse.Node) bool {
483-
switch n := node.(type) {
484-
case *parse.ListNode:
485-
for _, sub := range n.Nodes {
486-
if usesFieldField(sub) {
487-
return true
488-
}
489-
}
490-
case *parse.ActionNode:
491-
return usesFieldField(n.Pipe)
492-
case *parse.PipeNode:
493-
for _, cmd := range n.Cmds {
494-
if usesFieldField(cmd) {
495-
return true
496-
}
497-
}
498-
case *parse.CommandNode:
499-
for _, arg := range n.Args {
500-
if usesFieldField(arg) {
501-
return true
502-
}
503-
}
504-
case *parse.FieldNode:
505-
// .Field will be represented as []string{"Field"}
506-
return len(n.Ident) == 1 && n.Ident[0] == "Field"
507-
}
508-
return false
509-
}
510-
511-
// splitMetricByField will create a new metric that only contains the specified field.
512-
// This is used when the user wants to include the field name in the subject.
513-
func splitMetricByField(metric telegraf.Metric, field string) telegraf.Metric {
514-
metricCopy := metric.Copy()
515-
516-
for _, f := range metric.FieldList() {
517-
if f.Key != field {
518-
// Remove all fields that are not the specified field
519-
metricCopy.RemoveField(f.Key)
520-
}
521-
}
522-
523-
return metricCopy
524-
}

0 commit comments

Comments
 (0)