Skip to content

Commit 36e9114

Browse files
committed
fix(buckets): Wait for buckets to become active after creation
As buckets can not be used in other configurations until they have become active, but the API returns immediately with the bucket still in 'creating' state, the client is changed to await the bucket actually becoming active and usable before Create() returns.
1 parent 89c3954 commit 36e9114

File tree

2 files changed

+159
-22
lines changed

2 files changed

+159
-22
lines changed

api/clients/buckets/bucket.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,48 @@ func (c Client) create(ctx context.Context, bucketName string, data []byte) (res
354354
if err != nil {
355355
return rest.Response{}, fmt.Errorf("failed to create object with bucketName %q: %w", bucketName, err)
356356
}
357-
return r, nil
357+
if !r.IsSuccess() {
358+
return r, nil
359+
}
360+
361+
timoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
362+
defer cancel() // cancel deadline if awaitBucketReady returns before deadline
363+
return c.awaitBucketReady(timoutCtx, bucketName)
364+
}
365+
366+
func (c Client) awaitBucketReady(ctx context.Context, bucketName string) (rest.Response, error) {
367+
logger := logr.FromContextOrDiscard(ctx)
368+
369+
for {
370+
select {
371+
case <-ctx.Done():
372+
return rest.Response{}, fmt.Errorf("context cancelled before bucket with bucktName %q became available", bucketName)
373+
default:
374+
// query bucket
375+
r, err := c.get(ctx, bucketName)
376+
if err != nil {
377+
return rest.Response{}, err
378+
}
379+
if !r.IsSuccess() && r.StatusCode != http.StatusNotFound { // if API returns 404 right after creation we want to wait
380+
return r, nil
381+
}
382+
383+
// try to unmarshal into internal struct
384+
res, err := unmarshalJSON(&r)
385+
if err != nil {
386+
return r, err
387+
}
388+
389+
if res.Status == "active" {
390+
logger.V(1).Info("Created bucket became active and ready to use")
391+
r.StatusCode = http.StatusCreated // return 'created' instead of the GET APIs 'ok'
392+
return r, nil
393+
}
394+
395+
logger.V(1).Info("Waiting for bucket to become active after creation...")
396+
time.Sleep(c.retrySettings.waitDuration)
397+
}
398+
}
358399
}
359400

360401
func (c Client) get(ctx context.Context, bucketName string) (rest.Response, error) {

api/clients/buckets/bucket_test.go

Lines changed: 117 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,17 @@ func TestList(t *testing.T) {
184184

185185
func TestUpsert(t *testing.T) {
186186

187-
const someBucketResponse = `{
187+
const creatingBucketResponse = `{
188+
"bucketName": "bucket name",
189+
"table": "metrics",
190+
"displayName": "Default metrics (15 months)",
191+
"status": "creating",
192+
"retentionDays": 462,
193+
"metricInterval": "PT1M",
194+
"version": 1
195+
}`
196+
197+
const activeBucketResponse = `{
188198
"bucketName": "bucket name",
189199
"table": "metrics",
190200
"displayName": "Default metrics (15 months)",
@@ -198,7 +208,7 @@ func TestUpsert(t *testing.T) {
198208
responses := testutils.ServerResponses{
199209
http.MethodPost: {
200210
ResponseCode: http.StatusOK,
201-
ResponseBody: someBucketResponse,
211+
ResponseBody: creatingBucketResponse,
202212
ValidateRequestFunc: func(req *http.Request) {
203213
data, err := io.ReadAll(req.Body)
204214
assert.NoError(t, err)
@@ -210,6 +220,10 @@ func TestUpsert(t *testing.T) {
210220
assert.Equal(t, "bucket name", m["bucketName"])
211221
},
212222
},
223+
http.MethodGet: {
224+
ResponseCode: http.StatusOK,
225+
ResponseBody: activeBucketResponse,
226+
},
213227
}
214228
server := testutils.NewHTTPTestServer(t, responses)
215229
defer server.Close()
@@ -229,6 +243,42 @@ func TestUpsert(t *testing.T) {
229243
assert.Equal(t, "bucket name", m["bucketName"])
230244
})
231245

246+
t.Run("create bucket - awaits bucket becoming ready", func(t *testing.T) {
247+
getRequests := 0
248+
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
249+
switch req.Method {
250+
case http.MethodPost:
251+
rw.WriteHeader(http.StatusCreated)
252+
rw.Write([]byte(creatingBucketResponse))
253+
case http.MethodGet:
254+
rw.WriteHeader(http.StatusOK)
255+
if getRequests < 5 {
256+
rw.Write([]byte(creatingBucketResponse))
257+
getRequests++
258+
} else {
259+
rw.Write([]byte(activeBucketResponse))
260+
}
261+
default:
262+
t.Fatalf("unexpected %s request", req.Method)
263+
264+
}
265+
}))
266+
267+
defer server.Close()
268+
269+
url, _ := url.Parse(server.URL) //nolint:errcheck
270+
271+
client := buckets.NewClient(rest.NewClient(url, server.Client()))
272+
273+
ctx := testutils.ContextWithLogger(t)
274+
275+
resp, err := client.Create(ctx, "bucket name", []byte("{}"))
276+
assert.NoError(t, err)
277+
assert.Equal(t, http.StatusCreated, resp.StatusCode)
278+
assert.Equal(t, activeBucketResponse, string(resp.Data))
279+
assert.Equal(t, 5, getRequests)
280+
})
281+
232282
t.Run("create fails", func(t *testing.T) {
233283
responses := testutils.ServerResponses{
234284
http.MethodPost: {
@@ -237,11 +287,11 @@ func TestUpsert(t *testing.T) {
237287
},
238288
http.MethodGet: {
239289
ResponseCode: http.StatusNotFound,
240-
ResponseBody: someBucketResponse,
290+
ResponseBody: "{}",
241291
},
242292
http.MethodPut: {
243293
ResponseCode: http.StatusOK,
244-
ResponseBody: someBucketResponse,
294+
ResponseBody: activeBucketResponse,
245295
},
246296
}
247297
server := testutils.NewHTTPTestServer(t, responses)
@@ -265,14 +315,14 @@ func TestUpsert(t *testing.T) {
265315
},
266316
http.MethodGet: {
267317
ResponseCode: http.StatusOK,
268-
ResponseBody: someBucketResponse,
318+
ResponseBody: activeBucketResponse,
269319
ValidateRequestFunc: func(req *http.Request) {
270320
assert.Contains(t, req.URL.String(), url.PathEscape("bucket name"))
271321
},
272322
},
273323
http.MethodPut: {
274324
ResponseCode: http.StatusOK,
275-
ResponseBody: someBucketResponse,
325+
ResponseBody: activeBucketResponse,
276326
ValidateRequestFunc: func(req *http.Request) {
277327
data, err := io.ReadAll(req.Body)
278328
assert.NoError(t, err)
@@ -311,7 +361,7 @@ func TestUpsert(t *testing.T) {
311361
},
312362
http.MethodGet: {
313363
ResponseCode: http.StatusOK,
314-
ResponseBody: someBucketResponse,
364+
ResponseBody: activeBucketResponse,
315365
},
316366
http.MethodPut: {
317367
ResponseCode: http.StatusForbidden,
@@ -339,7 +389,7 @@ func TestUpsert(t *testing.T) {
339389
},
340390
http.MethodGet: {
341391
ResponseCode: http.StatusOK,
342-
ResponseBody: someBucketResponse,
392+
ResponseBody: activeBucketResponse,
343393
},
344394
http.MethodPut: {
345395
ResponseCode: http.StatusConflict,
@@ -396,15 +446,15 @@ func TestUpsert(t *testing.T) {
396446
rw.WriteHeader(http.StatusConflict)
397447
rw.Write([]byte("no, this is an error"))
398448
case http.MethodGet:
399-
rw.Write([]byte(someBucketResponse))
449+
rw.Write([]byte(activeBucketResponse))
400450
case http.MethodPut:
401451
if firstTry {
402452
rw.WriteHeader(http.StatusConflict)
403453
rw.Write([]byte("conflict"))
404454
firstTry = false
405455
} else {
406456
rw.WriteHeader(http.StatusOK)
407-
rw.Write([]byte(someBucketResponse))
457+
rw.Write([]byte(activeBucketResponse))
408458
}
409459
default:
410460
assert.Failf(t, "unexpected method %q", req.Method)
@@ -436,7 +486,7 @@ func TestUpsert(t *testing.T) {
436486
},
437487
http.MethodGet: {
438488
ResponseCode: http.StatusOK,
439-
ResponseBody: someBucketResponse,
489+
ResponseBody: activeBucketResponse,
440490
ValidateRequestFunc: func(req *http.Request) {
441491
assert.Contains(t, req.URL.String(), url.PathEscape("bucket name"))
442492
},
@@ -446,7 +496,7 @@ func TestUpsert(t *testing.T) {
446496
defer server.Close()
447497

448498
client := buckets.NewClient(rest.NewClient(server.URL(), server.Client()))
449-
data := []byte(someBucketResponse)
499+
data := []byte(activeBucketResponse)
450500

451501
ctx := testutils.ContextWithLogger(t)
452502

@@ -536,7 +586,7 @@ func TestCreate(t *testing.T) {
536586
"version": 1
537587
}`
538588

539-
const someBucketResponse = `{
589+
const creatingBucketResponse = `{
540590
"bucketName": "bucket name",
541591
"table": "metrics",
542592
"displayName": "Default metrics (15 months)",
@@ -546,11 +596,25 @@ func TestCreate(t *testing.T) {
546596
"version": 1
547597
}`
548598

599+
const activeBucketResponse = `{
600+
"bucketName": "bucket name",
601+
"table": "metrics",
602+
"displayName": "Default metrics (15 months)",
603+
"status": "active",
604+
"retentionDays": 462,
605+
"metricInterval": "PT1M",
606+
"version": 1
607+
}`
608+
549609
t.Run("create bucket - OK", func(t *testing.T) {
550610
responses := testutils.ServerResponses{
551611
http.MethodPost: {
552612
ResponseCode: http.StatusCreated,
553-
ResponseBody: someBucketResponse,
613+
ResponseBody: creatingBucketResponse,
614+
},
615+
http.MethodGet: {
616+
ResponseCode: http.StatusOK,
617+
ResponseBody: activeBucketResponse,
554618
},
555619
}
556620
server := testutils.NewHTTPTestServer(t, responses)
@@ -563,14 +627,48 @@ func TestCreate(t *testing.T) {
563627
resp, err := client.Create(ctx, "bucket name", []byte(someBucketData))
564628
assert.NoError(t, err)
565629
assert.Equal(t, http.StatusCreated, resp.StatusCode)
566-
assert.Equal(t, someBucketResponse, string(resp.Data))
630+
assert.Equal(t, activeBucketResponse, string(resp.Data))
631+
})
632+
633+
t.Run("create bucket - awaits bucket becoming ready", func(t *testing.T) {
634+
getRequests := 0
635+
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
636+
switch req.Method {
637+
case http.MethodPost:
638+
rw.WriteHeader(http.StatusCreated)
639+
rw.Write([]byte(creatingBucketResponse))
640+
case http.MethodGet:
641+
rw.WriteHeader(http.StatusOK)
642+
if getRequests < 5 {
643+
rw.Write([]byte(creatingBucketResponse))
644+
getRequests++
645+
} else {
646+
rw.Write([]byte(activeBucketResponse))
647+
}
648+
default:
649+
t.Fatalf("unexpected %s request", req.Method)
650+
651+
}
652+
}))
653+
654+
defer server.Close()
655+
656+
url, _ := url.Parse(server.URL) //nolint:errcheck
657+
658+
client := buckets.NewClient(rest.NewClient(url, server.Client()))
659+
660+
ctx := testutils.ContextWithLogger(t)
661+
662+
resp, err := client.Create(ctx, "bucket name", []byte(someBucketData))
663+
assert.NoError(t, err)
664+
assert.Equal(t, http.StatusCreated, resp.StatusCode)
665+
assert.Equal(t, activeBucketResponse, string(resp.Data))
666+
assert.Equal(t, 5, getRequests)
567667
})
568668

569669
t.Run("create bucket - network error", func(t *testing.T) {
570670
responses := testutils.ServerResponses{
571-
http.MethodDelete: {
572-
ResponseCode: http.StatusNotFound,
573-
},
671+
// no request should reach test server
574672
}
575673
server := testutils.NewHTTPTestServer(t, responses)
576674
defer server.Close()
@@ -586,9 +684,7 @@ func TestCreate(t *testing.T) {
586684

587685
t.Run("create bucket - invalid data", func(t *testing.T) {
588686
responses := testutils.ServerResponses{
589-
http.MethodDelete: {
590-
ResponseCode: http.StatusNotFound,
591-
},
687+
// no request should reach test server
592688
}
593689
server := testutils.NewHTTPTestServer(t, responses)
594690
defer server.Close()

0 commit comments

Comments
 (0)