@@ -203,40 +203,42 @@ var _ = Describe("Environment test", func() {
203203 })
204204
205205 Describe ("TCP Parameters" , func () {
206+ It ("should accept custom TCP parameters" , func () {
207+ env , err := NewEnvironment (& EnvironmentOptions {
208+ ConnectionParameters : []* Broker {
209+ newBrokerDefault (),
210+ },
211+ TCPParameters : & TCPParameters {
212+ tlsConfig : nil ,
213+ RequestedHeartbeat : defaultHeartbeat ,
214+ RequestedMaxFrameSize : defaultMaxFrameSize ,
215+ WriteBuffer : 100 ,
216+ ReadBuffer : 200 ,
217+ NoDelay : false ,
218+ },
219+ MaxProducersPerClient : 1 ,
220+ MaxConsumersPerClient : 1 ,
221+ AddressResolver : nil ,
222+ RPCTimeout : defaultSocketCallTimeout ,
223+ })
206224
207- env , err := NewEnvironment (& EnvironmentOptions {
208- ConnectionParameters : []* Broker {
209- newBrokerDefault (),
210- },
211- TCPParameters : & TCPParameters {
212- tlsConfig : nil ,
213- RequestedHeartbeat : defaultHeartbeat ,
214- RequestedMaxFrameSize : defaultMaxFrameSize ,
215- WriteBuffer : 100 ,
216- ReadBuffer : 200 ,
217- NoDelay : false ,
218- },
219- MaxProducersPerClient : 1 ,
220- MaxConsumersPerClient : 1 ,
221- AddressResolver : nil ,
222- RPCTimeout : defaultSocketCallTimeout ,
225+ Expect (err ).NotTo (HaveOccurred ())
226+ Expect (env .Close ()).NotTo (HaveOccurred ())
223227 })
224-
225- Expect (err ).NotTo (HaveOccurred ())
226- Expect (env .Close ()).NotTo (HaveOccurred ())
227-
228228 })
229229
230230 Describe ("Environment Validations" , func () {
231- _ , err := NewEnvironment (NewEnvironmentOptions ().
232- SetMaxConsumersPerClient (0 ).
233- SetMaxProducersPerClient (0 ))
234- Expect (err ).To (HaveOccurred ())
231+ It ("should reject invalid max consumers/producers" , func () {
232+ _ , err := NewEnvironment (NewEnvironmentOptions ().
233+ SetMaxConsumersPerClient (0 ).
234+ SetMaxProducersPerClient (0 ))
235+ Expect (err ).To (HaveOccurred ())
235236
236- _ , err = NewEnvironment (NewEnvironmentOptions ().
237- SetMaxConsumersPerClient (500 ).
238- SetMaxProducersPerClient (500 ))
239- Expect (err ).To (HaveOccurred ())
237+ _ , err = NewEnvironment (NewEnvironmentOptions ().
238+ SetMaxConsumersPerClient (500 ).
239+ SetMaxProducersPerClient (500 ))
240+ Expect (err ).To (HaveOccurred ())
241+ })
240242
241243 It ("Malformed URI" , func () {
242244 _ , err := NewEnvironment (NewEnvironmentOptions ().
@@ -273,63 +275,66 @@ var _ = Describe("Environment test", func() {
273275 })
274276
275277 Describe ("Validation Query Offset/Sequence" , func () {
278+ It ("should return error for non-existent streams" , func () {
279+ env , err := NewEnvironment (NewEnvironmentOptions ())
280+ Expect (err ).NotTo (HaveOccurred ())
281+ _ , err = env .QuerySequence ("my_prod" ,
282+ "Stream_Doesnt_exist" )
283+ Expect (err ).To (HaveOccurred ())
276284
277- env , err := NewEnvironment (NewEnvironmentOptions ())
278- Expect (err ).NotTo (HaveOccurred ())
279- _ , err = env .QuerySequence ("my_prod" ,
280- "Stream_Doesnt_exist" )
281- Expect (err ).To (HaveOccurred ())
282-
283- _ , err = env .QueryOffset ("my_cons" ,
284- "Stream_Doesnt_exist" )
285- Expect (err ).To (HaveOccurred ())
286- Expect (env .Close ()).NotTo (HaveOccurred ())
285+ _ , err = env .QueryOffset ("my_cons" ,
286+ "Stream_Doesnt_exist" )
287+ Expect (err ).To (HaveOccurred ())
288+ Expect (env .Close ()).NotTo (HaveOccurred ())
289+ })
287290 })
288291
289292 Describe ("Stream Existing/Meta data" , func () {
290-
291- env , err := NewEnvironment (NewEnvironmentOptions ().SetPort (5552 ).
292- SetUser ("guest" ).
293- SetPassword ("guest" ).SetHost ("localhost" ))
294- Expect (err ).NotTo (HaveOccurred ())
295- stream := uuid .New ().String ()
296- err = env .DeclareStream (stream , nil )
297- Expect (err ).NotTo (HaveOccurred ())
298- exists , err := env .StreamExists (stream )
299- Expect (err ).NotTo (HaveOccurred ())
300- Expect (exists ).To (Equal (true ))
301- metaData , err := env .StreamMetaData (stream )
302- Expect (err ).NotTo (HaveOccurred ())
303- Expect (metaData .Leader .Host ).To (Equal ("localhost" ))
304- Expect (metaData .Leader .Port ).To (Equal ("5552" ))
305- Expect (len (metaData .Replicas )).To (Equal (0 ))
306- Expect (env .DeleteStream (stream )).NotTo (HaveOccurred ())
307- exists , err = env .StreamExists (stream )
308- Expect (err ).NotTo (HaveOccurred ())
309- Expect (exists ).To (Equal (false ))
310- Expect (env .Close ()).NotTo (HaveOccurred ())
311-
293+ It ( "should check stream existence and metadata" , func () {
294+ env , err := NewEnvironment (NewEnvironmentOptions ().SetPort (5552 ).
295+ SetUser ("guest" ).
296+ SetPassword ("guest" ).SetHost ("localhost" ))
297+ Expect (err ).NotTo (HaveOccurred ())
298+ stream := uuid .New ().String ()
299+ err = env .DeclareStream (stream , nil )
300+ Expect (err ).NotTo (HaveOccurred ())
301+ exists , err := env .StreamExists (stream )
302+ Expect (err ).NotTo (HaveOccurred ())
303+ Expect (exists ).To (Equal (true ))
304+ metaData , err := env .StreamMetaData (stream )
305+ Expect (err ).NotTo (HaveOccurred ())
306+ Expect (metaData .Leader .Host ).To (Equal ("localhost" ))
307+ Expect (metaData .Leader .Port ).To (Equal ("5552" ))
308+ Expect (len (metaData .Replicas )).To (Equal (0 ))
309+ Expect (env .DeleteStream (stream )).NotTo (HaveOccurred ())
310+ exists , err = env .StreamExists (stream )
311+ Expect (err ).NotTo (HaveOccurred ())
312+ Expect (exists ).To (Equal (false ))
313+ Expect (env .Close ()).NotTo (HaveOccurred ())
314+ })
312315 })
313316
314317 Describe ("Address Resolver" , func () {
315- addressResolver := AddressResolver {
316- Host : "localhost" ,
317- Port : 5552 ,
318- }
319- env , err := NewEnvironment (
320- NewEnvironmentOptions ().
321- SetHost (addressResolver .Host ).
322- SetPort (addressResolver .Port ).
323- SetAddressResolver (addressResolver ).
324- SetMaxProducersPerClient (1 ))
325- Expect (err ).NotTo (HaveOccurred ())
326- streamName := uuid .New ().String ()
327- Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
328- p , err := env .NewProducer (streamName , nil )
329- Expect (err ).NotTo (HaveOccurred ())
330- Expect (p .Close ()).NotTo (HaveOccurred ())
331- Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
332- Expect (env .Close ()).NotTo (HaveOccurred ())
318+ It ("should connect using address resolver" , func () {
319+ addressResolver := AddressResolver {
320+ Host : "localhost" ,
321+ Port : 5552 ,
322+ }
323+ env , err := NewEnvironment (
324+ NewEnvironmentOptions ().
325+ SetHost (addressResolver .Host ).
326+ SetPort (addressResolver .Port ).
327+ SetAddressResolver (addressResolver ).
328+ SetMaxProducersPerClient (1 ))
329+ Expect (err ).NotTo (HaveOccurred ())
330+ streamName := uuid .New ().String ()
331+ Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
332+ p , err := env .NewProducer (streamName , nil )
333+ Expect (err ).NotTo (HaveOccurred ())
334+ Expect (p .Close ()).NotTo (HaveOccurred ())
335+ Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
336+ Expect (env .Close ()).NotTo (HaveOccurred ())
337+ })
333338 })
334339
335340 It ("Multi Uris/Multi Uris Fails" , func () {
@@ -477,44 +482,48 @@ var _ = Describe("Environment test", func() {
477482 })
478483
479484 Describe ("Query Offset should return the value from Store Offset" , func () {
480- env , err := NewEnvironment (NewEnvironmentOptions ())
481- Expect (err ).NotTo (HaveOccurred ())
482- streamName := uuid .New ().String ()
483- Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
484- const consumerName = "my_consumer"
485- Expect (env .StoreOffset (consumerName , streamName , 123 )).NotTo (HaveOccurred ())
486- off , err := env .QueryOffset (consumerName , streamName )
487- Expect (err ).NotTo (HaveOccurred ())
488- Expect (off ).To (Equal (int64 (123 )))
489- Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
490- Expect (env .Close ()).NotTo (HaveOccurred ())
485+ It ("should store and query offset" , func () {
486+ env , err := NewEnvironment (NewEnvironmentOptions ())
487+ Expect (err ).NotTo (HaveOccurred ())
488+ streamName := uuid .New ().String ()
489+ Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
490+ const consumerName = "my_consumer"
491+ Expect (env .StoreOffset (consumerName , streamName , 123 )).NotTo (HaveOccurred ())
492+ off , err := env .QueryOffset (consumerName , streamName )
493+ Expect (err ).NotTo (HaveOccurred ())
494+ Expect (off ).To (Equal (int64 (123 )))
495+ Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
496+ Expect (env .Close ()).NotTo (HaveOccurred ())
497+ })
491498 })
492499
493500 // PR:https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388
494501 Describe ("QueryOffset DeclareStream StoreOffset should reconnect the locator" , func () {
495- env , err := NewEnvironment (NewEnvironmentOptions ())
496- Expect (err ).NotTo (HaveOccurred ())
497- streamName := uuid .New ().String ()
498- // here we force the client closing
499- env .locator .client .Close ()
500- Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
501- Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
502- const consumerName = "my_consumer_1"
503- // here we force the client closing
504- env .locator .client .Close ()
505- Expect (env .StoreOffset (consumerName , streamName , 123 )).NotTo (HaveOccurred ())
506- Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
507- // here we force the client closing
508- env .locator .client .Close ()
509- off , err := env .QueryOffset (consumerName , streamName )
510- Expect (err ).NotTo (HaveOccurred ())
511- Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
512- Expect (off ).To (Equal (int64 (123 )))
513- // here we force the client closing
514- env .locator .client .Close ()
515- Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
516- Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
517- Expect (env .Close ()).NotTo (HaveOccurred ())
502+ It ("should reconnect locator when needed" , func () {
503+ env , err := NewEnvironment (NewEnvironmentOptions ())
504+ Expect (err ).NotTo (HaveOccurred ())
505+ streamName := uuid .New ().String ()
506+ // here we force the client closing
507+ env .locator .client .Close ()
508+ Expect (env .DeclareStream (streamName , nil )).NotTo (HaveOccurred ())
509+ Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
510+ const consumerName = "my_consumer_1"
511+ // here we force the client closing
512+ env .locator .client .Close ()
513+ Expect (env .StoreOffset (consumerName , streamName , 123 )).NotTo (HaveOccurred ())
514+ Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
515+ // here we force the client closing
516+ env .locator .client .Close ()
517+ off , err := env .QueryOffset (consumerName , streamName )
518+ Expect (err ).NotTo (HaveOccurred ())
519+ Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
520+ Expect (off ).To (Equal (int64 (123 )))
521+ // here we force the client closing
522+ env .locator .client .Close ()
523+ Expect (env .DeleteStream (streamName )).NotTo (HaveOccurred ())
524+ Expect (env .locator .client .socket .isOpen ()).To (BeTrue ())
525+ Expect (env .Close ()).NotTo (HaveOccurred ())
526+ })
518527 })
519528
520529})
0 commit comments