@@ -2,10 +2,12 @@ package parquetconverter
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
6
7
"io"
7
8
"math/rand"
8
9
"path"
10
+ "strings"
9
11
"testing"
10
12
"time"
11
13
@@ -21,6 +23,8 @@ import (
21
23
"github.com/thanos-io/objstore/providers/filesystem"
22
24
"github.com/thanos-io/thanos/pkg/block"
23
25
"github.com/thanos-io/thanos/pkg/block/metadata"
26
+ "google.golang.org/grpc/codes"
27
+ "google.golang.org/grpc/status"
24
28
25
29
"github.com/cortexproject/cortex/integration/e2e"
26
30
"github.com/cortexproject/cortex/pkg/ring"
@@ -31,6 +35,7 @@ import (
31
35
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
32
36
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
33
37
"github.com/cortexproject/cortex/pkg/util/concurrency"
38
+ cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
34
39
"github.com/cortexproject/cortex/pkg/util/flagext"
35
40
"github.com/cortexproject/cortex/pkg/util/services"
36
41
"github.com/cortexproject/cortex/pkg/util/test"
@@ -269,7 +274,8 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
269
274
270
275
// Create a mock bucket that wraps the filesystem bucket but fails uploads
271
276
mockBucket := & mockBucket {
272
- Bucket : fsBucket ,
277
+ Bucket : fsBucket ,
278
+ uploadFailure : fmt .Errorf ("mock upload failure" ),
273
279
}
274
280
275
281
converter := newConverter (cfg , objstore .WithNoopInstr (mockBucket ), storageCfg , []int64 {3600000 , 7200000 }, nil , reg , overrides , nil )
@@ -284,13 +290,94 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
284
290
assert .Equal (t , 1.0 , testutil .ToFloat64 (converter .metrics .convertBlockFailures .WithLabelValues (userID )))
285
291
}
286
292
293
+ func TestConverter_ShouldNotFailOnAccessDenyError (t * testing.T ) {
294
+ // Create a new registry for testing
295
+ reg := prometheus .NewRegistry ()
296
+
297
+ // Create a new converter with test configuration
298
+ cfg := Config {
299
+ MetaSyncConcurrency : 1 ,
300
+ DataDir : t .TempDir (),
301
+ }
302
+ logger := log .NewNopLogger ()
303
+ storageCfg := cortex_tsdb.BlocksStorageConfig {}
304
+ flagext .DefaultValues (& storageCfg )
305
+ limits := & validation.Limits {}
306
+ flagext .DefaultValues (limits )
307
+ overrides := validation .NewOverrides (* limits , nil )
308
+ limits .ParquetConverterEnabled = true
309
+
310
+ // Create a filesystem bucket for initial block upload
311
+ fsBucket , err := filesystem .NewBucket (t .TempDir ())
312
+ require .NoError (t , err )
313
+
314
+ // Create test labels
315
+ lbls := labels.Labels {labels.Label {
316
+ Name : "__name__" ,
317
+ Value : "test" ,
318
+ }}
319
+
320
+ // Create a real TSDB block
321
+ dir := t .TempDir ()
322
+ rnd := rand .New (rand .NewSource (time .Now ().Unix ()))
323
+ blockID , err := e2e .CreateBlock (context .Background (), rnd , dir , []labels.Labels {lbls }, 2 , 0 , 2 * time .Hour .Milliseconds (), time .Minute .Milliseconds (), 10 )
324
+ require .NoError (t , err )
325
+ bdir := path .Join (dir , blockID .String ())
326
+
327
+ userID := "test-user"
328
+
329
+ // Upload the block to filesystem bucket
330
+ err = block .Upload (context .Background (), logger , bucket .NewPrefixedBucketClient (fsBucket , userID ), bdir , metadata .NoneFunc )
331
+ require .NoError (t , err )
332
+
333
+ var mb * mockBucket
334
+ t .Run ("get failure" , func (t * testing.T ) {
335
+ // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
336
+ mb = & mockBucket {
337
+ Bucket : fsBucket ,
338
+ getFailure : cortex_errors .WithCause (errors .New ("dummy error" ), status .Error (codes .PermissionDenied , "dummy" )),
339
+ }
340
+ })
341
+
342
+ t .Run ("upload failure" , func (t * testing.T ) {
343
+ // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
344
+ mb = & mockBucket {
345
+ Bucket : fsBucket ,
346
+ uploadFailure : cortex_errors .WithCause (errors .New ("dummy error" ), status .Error (codes .PermissionDenied , "dummy" )),
347
+ }
348
+ })
349
+
350
+ converter := newConverter (cfg , objstore .WithNoopInstr (mb ), storageCfg , []int64 {3600000 , 7200000 }, nil , reg , overrides , nil )
351
+ converter .ringLifecycler = & ring.Lifecycler {
352
+ Addr : "1.2.3.4" ,
353
+ }
354
+
355
+ err = converter .convertUser (context .Background (), logger , & RingMock {ReadRing : & ring.Ring {}}, userID )
356
+ require .Error (t , err )
357
+
358
+ // Verify the failure metric was not incremented
359
+ assert .Equal (t , 0.0 , testutil .ToFloat64 (converter .metrics .convertBlockFailures .WithLabelValues (userID )))
360
+ }
361
+
287
362
// mockBucket implements objstore.Bucket for testing
288
363
type mockBucket struct {
289
364
objstore.Bucket
365
+ uploadFailure error
366
+ getFailure error
290
367
}
291
368
292
369
func (m * mockBucket ) Upload (ctx context.Context , name string , r io.Reader ) error {
293
- return fmt .Errorf ("mock upload failure" )
370
+ if m .uploadFailure != nil {
371
+ return m .uploadFailure
372
+ }
373
+ return m .Bucket .Upload (ctx , name , r )
374
+ }
375
+
376
+ func (m * mockBucket ) Get (ctx context.Context , name string ) (io.ReadCloser , error ) {
377
+ if m .getFailure != nil && strings .Contains (name , "index" ) {
378
+ return nil , m .getFailure
379
+ }
380
+ return m .Bucket .Get (ctx , name )
294
381
}
295
382
296
383
type RingMock struct {
0 commit comments