@@ -268,42 +268,45 @@ func createStream(t *testing.T, server string, cfg *nats.StreamConfig) {
268
268
_ , err = js .AddStream (cfg )
269
269
require .NoError (t , err )
270
270
}
271
+
271
272
func TestWriteWithLayoutIntegration (t * testing.T ) {
272
273
if testing .Short () {
273
274
t .Skip ("Skipping integration test in short mode" )
274
275
}
275
276
natsServicePort := "4222"
277
+
278
+ container := testutil.Container {
279
+ Image : "nats:latest" ,
280
+ ExposedPorts : []string {natsServicePort },
281
+ Cmd : []string {"--js" },
282
+ WaitingFor : wait .ForListeningPort (nat .Port (natsServicePort )),
283
+ }
284
+
285
+ natsInstance := & NATS {
286
+ Name : "telegraf" ,
287
+ Jetstream : & StreamConfig {
288
+ Name : "my-telegraf-stream" ,
289
+ Subjects : []string {"my-subject.>" },
290
+ },
291
+ serializer : & influx.Serializer {},
292
+ Log : testutil.Logger {},
293
+ }
294
+
295
+ streamConfigCompareFunc := func (t * testing.T , si * jetstream.StreamInfo ) {
296
+ require .Equal (t , "my-telegraf-stream" , si .Config .Name )
297
+ require .Equal (t , []string {"my-subject.>" }, si .Config .Subjects )
298
+ }
299
+
276
300
type testConfig struct {
277
- name string
278
- container testutil.Container
279
- nats * NATS
280
- streamConfigCompareFunc func (* testing.T , * jetstream.StreamInfo )
281
- sendMetrics []telegraf.Metric
282
- expectedSubjects []string
301
+ name string
302
+ subject string
303
+ sendMetrics []telegraf.Metric
304
+ expectedSubjects []string
283
305
}
284
306
testCases := []testConfig {
285
307
{
286
- name : "subject layout with tags" ,
287
- container : testutil.Container {
288
- Image : "nats:latest" ,
289
- ExposedPorts : []string {natsServicePort },
290
- Cmd : []string {"--js" },
291
- WaitingFor : wait .ForListeningPort (nat .Port (natsServicePort )),
292
- },
293
- nats : & NATS {
294
- Name : "telegraf" ,
295
- Subject : "my-subject.metrics.{{ .Name }}.{{ .Tag \" tag1\" }}.{{ .Tag \" tag2\" }}" ,
296
- Jetstream : & StreamConfig {
297
- Name : "my-telegraf-stream" ,
298
- Subjects : []string {"my-subject.>" },
299
- },
300
- serializer : & influx.Serializer {},
301
- Log : testutil.Logger {},
302
- },
303
- streamConfigCompareFunc : func (t * testing.T , si * jetstream.StreamInfo ) {
304
- require .Equal (t , "my-telegraf-stream" , si .Config .Name )
305
- require .Equal (t , []string {"my-subject.>" }, si .Config .Subjects )
306
- },
308
+ name : "subject layout with tags" ,
309
+ subject : "my-subject.metrics.{{ .Name }}.{{ .Tag \" tag1\" }}.{{ .Tag \" tag2\" }}" ,
307
310
sendMetrics : []telegraf.Metric {metric .New (
308
311
"test1" ,
309
312
map [string ]string {"tag1" : "foo" , "tag2" : "bar" },
@@ -315,27 +318,8 @@ func TestWriteWithLayoutIntegration(t *testing.T) {
315
318
},
316
319
},
317
320
{
318
- name : "subject layout with field name" ,
319
- container : testutil.Container {
320
- Image : "nats:latest" ,
321
- ExposedPorts : []string {natsServicePort },
322
- Cmd : []string {"--js" },
323
- WaitingFor : wait .ForListeningPort (nat .Port (natsServicePort )),
324
- },
325
- nats : & NATS {
326
- Name : "telegraf" ,
327
- Subject : "my-subject.metrics.{{ .Tag \" tag1\" }}.{{ .Tag \" tag2\" }}.{{ .Name }}.{{ .Field \" value\" }}" ,
328
- Jetstream : & StreamConfig {
329
- Name : "my-telegraf-stream" ,
330
- Subjects : []string {"my-subject.>" },
331
- },
332
- serializer : & influx.Serializer {},
333
- Log : testutil.Logger {},
334
- },
335
- streamConfigCompareFunc : func (t * testing.T , si * jetstream.StreamInfo ) {
336
- require .Equal (t , "my-telegraf-stream" , si .Config .Name )
337
- require .Equal (t , []string {"my-subject.>" }, si .Config .Subjects )
338
- },
321
+ name : "subject layout with field name" ,
322
+ subject : "my-subject.metrics.{{ .Tag \" tag1\" }}.{{ .Tag \" tag2\" }}.{{ .Name }}.{{ .Field \" value\" }}" ,
339
323
sendMetrics : []telegraf.Metric {metric .New (
340
324
"test1" ,
341
325
map [string ]string {"tag1" : "foo" , "tag2" : "bar" },
@@ -348,42 +332,46 @@ func TestWriteWithLayoutIntegration(t *testing.T) {
348
332
},
349
333
}
350
334
335
+ require .NoError (t , container .Start (), "failed to start container" )
336
+ defer container .Terminate ()
337
+
351
338
for _ , tc := range testCases {
352
339
t .Run (tc .name , func (t * testing.T ) {
353
- require .NoError (t , tc .container .Start (), "failed to start container" )
354
- defer tc .container .Terminate ()
340
+ natsInstance .Subject = tc .subject
355
341
356
- server := []string {fmt .Sprintf ("nats://%s:%s" , tc . container .Address , tc . container .Ports [natsServicePort ])}
357
- tc . nats .Servers = server
358
- require .NoError (t , tc . nats .Init ())
359
- require .NoError (t , tc . nats .Connect ())
342
+ server := []string {fmt .Sprintf ("nats://%s:%s" , container .Address , container .Ports [natsServicePort ])}
343
+ natsInstance .Servers = server
344
+ require .NoError (t , natsInstance .Init ())
345
+ require .NoError (t , natsInstance .Connect ())
360
346
361
- stream , err := tc . nats . jetstreamClient .Stream (t .Context (), tc . nats .Jetstream .Name )
347
+ stream , err := natsInstance . jetstreamClient .Stream (t .Context (), natsInstance .Jetstream .Name )
362
348
require .NoError (t , err )
363
349
si , err := stream .Info (t .Context ())
364
350
require .NoError (t , err )
365
351
366
- tc . streamConfigCompareFunc (t , si )
367
- require .NoError (t , tc . nats .Write (tc .sendMetrics ))
368
- metricCound := len (tc .sendMetrics )
352
+ streamConfigCompareFunc (t , si )
353
+ require .NoError (t , natsInstance .Write (tc .sendMetrics ))
354
+ metricCount := len (tc .sendMetrics )
369
355
370
- foundSubjects := make ([]string , 0 , metricCound )
371
- if tc . nats .Jetstream != nil {
372
- js , err := tc . nats .conn .JetStream ()
356
+ foundSubjects := make ([]string , 0 , metricCount )
357
+ if natsInstance .Jetstream != nil {
358
+ js , err := natsInstance .conn .JetStream ()
373
359
require .NoError (t , err )
374
- sub , err := js .PullSubscribe (tc . nats .Jetstream .Subjects [0 ], "" )
360
+ sub , err := js .PullSubscribe (natsInstance .Jetstream .Subjects [0 ], "" )
375
361
require .NoError (t , err )
376
362
377
- msgs , err := sub .Fetch (metricCound , nats .MaxWait (1 * time .Second ))
363
+ msgs , err := sub .Fetch (metricCount , nats .MaxWait (1 * time .Second ))
378
364
require .NoError (t , err )
379
365
380
- require .Len (t , msgs , metricCound , "unexpected number of messages" )
366
+ require .Len (t , msgs , metricCount , "unexpected number of messages" )
381
367
for _ , msg := range msgs {
382
368
foundSubjects = append (foundSubjects , msg .Subject )
383
369
}
370
+ js .PurgeStream (natsInstance .Jetstream .Name )
384
371
}
385
372
386
373
assert .Equal (t , tc .expectedSubjects , foundSubjects )
374
+
387
375
}) // end of test case
388
376
}
389
377
}
0 commit comments