diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff1a4b2ce3..a1b40c7b2dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777 +* [ENHANCEMENT] Parquet Converter: Allow 2h blocks conversion if the block has a no-compact-mark.json file. #6865 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index dd957d264ce..39fd7531d11 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -45,6 +45,7 @@ type BlocksCleanerConfig struct { ShardingStrategy string CompactionStrategy string BlockRanges []int64 + NoCompactMarkCheckAfter time.Duration } type BlocksCleaner struct { @@ -752,7 +753,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us } level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) } - c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction)) + + noCompactMarkCheckFunc := func(blockID ulid.ULID) bool { + return cortex_parquet.ExistBlockNoCompact(ctx, userBucket, userLogger, blockID) + } + + c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction), 
noCompactMarkCheckFunc) if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { begin = time.Now() @@ -762,7 +768,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us return nil } -func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) { +func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc cortex_parquet.NoCompactMarkCheckFunc) { c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction) @@ -772,7 +778,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks()))) remainingBlocksToConvert := 0 for _, b := range idx.NonParquetBlocks() { - if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) { + if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) { remainingBlocksToConvert++ } } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 787d377d63b..4f8da94babe 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -353,7 +353,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions cortex_bucket_parquet_blocks_count{user="user-6"} 1 # HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. 
Blocks marked for deletion are included. # TYPE cortex_bucket_parquet_unconverted_blocks_count gauge - cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 0 + cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 1 cortex_bucket_parquet_unconverted_blocks_count{user="user-6"} 0 `), "cortex_bucket_blocks_count", @@ -1109,8 +1109,12 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) { }, } + mockNoCompactMarkCheckFunc := func(blockID ulid.ULID) bool { + return false + } + // Update metrics - cleaner.updateBucketMetrics("user1", true, idx, 0, 0) + cleaner.updateBucketMetrics("user1", true, idx, 0, 0, mockNoCompactMarkCheckFunc) // Verify metrics require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e9ac396cf2a..38a0b48ac73 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -304,6 +304,9 @@ type Config struct { AcceptMalformedIndex bool `yaml:"accept_malformed_index"` CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"` + + // Injected internally + NoCompactMarkCheckAfter time.Duration `yaml:"-"` } // RegisterFlags registers the Compactor flags. 
@@ -753,6 +756,7 @@ func (c *Compactor) starting(ctx context.Context) error { ShardingStrategy: c.compactorCfg.ShardingStrategy, CompactionStrategy: c.compactorCfg.CompactionStrategy, BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(), + NoCompactMarkCheckAfter: c.compactorCfg.NoCompactMarkCheckAfter, }, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5f4679b9d4d..a9724beab77 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -737,6 +737,7 @@ func (t *Cortex) initParquetConverter() (serv services.Service, err error) { func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor + t.Cfg.Compactor.NoCompactMarkCheckAfter = t.Cfg.ParquetConverter.NoCompactMarkCheckAfter t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides, ingestionReplicationFactor) if err != nil { diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 4eca20ac0a5..d662a04f81d 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid/v2" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus-community/parquet-common/convert" @@ -53,11 +54,14 @@ const ( var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) +type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool + type 
Config struct { - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConversionInterval time.Duration `yaml:"conversion_interval"` - MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` - FileBufferEnabled bool `yaml:"file_buffer_enabled"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConversionInterval time.Duration `yaml:"conversion_interval"` + MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + FileBufferEnabled bool `yaml:"file_buffer_enabled"` + NoCompactMarkCheckAfter time.Duration `yaml:"no_compact_mark_check_after"` DataDir string `yaml:"data_dir"` @@ -109,6 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.") + f.DurationVar(&cfg.NoCompactMarkCheckAfter, "parquet-converter.no-compact-mark-check-after", time.Hour*13, "The time after which a `no-compact-mark.json` file should be checked.") } func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { @@ -392,7 +397,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } - if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) { + noCompactMarkCheckFunc := func(bId ulid.ULID) bool { + return cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, bId) + } + + if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.blockRanges, b.ULID, noCompactMarkCheckFunc) { continue } diff 
--git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index fc8f6e99805..ecb455fd893 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -14,10 +14,12 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid/v2" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" @@ -232,6 +234,76 @@ func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) { assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID))) } +func TestConverter_ShouldConvertNoCompactMarkBlocks(t *testing.T) { + cfg := prepareConfig() + userID := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, userID) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits) + c.ringLifecycler = &ring.Lifecycler{ + Addr: "1.2.3.4", + } + + ctx := context.Background() + + lbls := labels.Labels{labels.Label{ + Name: "__name__", + Value: "test", + }} + + blocks := []ulid.ULID{} + numBlocks := 2 + mint := time.Hour * 13 + maxt := time.Hour * 15 + // Create 2h blocks + for i := 0; i < 
numBlocks; i++ { + rnd := rand.New(rand.NewSource(time.Now().Unix())) + id, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, mint.Milliseconds(), maxt.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + blocks = append(blocks, id) + } + + for _, bId := range blocks { + blockDir := fmt.Sprintf("%s/%s", dir, bId.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + // upload block + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + // upload no-compact-mark.json + err = block.MarkForNoCompact(ctx, logger, userBucket, bId, metadata.ManualNoCompactReason, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})) + require.NoError(t, err) + } + + ringMock := &ring.RingMock{} + ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + { + Addr: "1.2.3.4", + }, + }, + }, nil) + + err = c.convertUser(ctx, log.NewNopLogger(), ringMock, userID) + require.NoError(t, err) + + // Verify the converted metric was incremented + assert.Equal(t, float64(numBlocks), testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(userID))) +} + func TestConverter_BlockConversionFailure(t *testing.T) { // Create a new registry for testing reg := prometheus.NewRegistry() @@ -283,7 +355,16 @@ func TestConverter_BlockConversionFailure(t *testing.T) { Addr: "1.2.3.4", } - err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID) + ringMock := &ring.RingMock{} + ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + { + Addr: "1.2.3.4", + }, + }, + }, nil) + + err = converter.convertUser(context.Background(), logger, ringMock, userID) require.NoError(t, err) // Verify the failure metric was incremented @@ -352,7 +433,16 @@ 
func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) { Addr: "1.2.3.4", } - err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID) + ringMock := &ring.RingMock{} + ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + { + Addr: "1.2.3.4", + }, + }, + }, nil) + + err = converter.convertUser(context.Background(), logger, ringMock, userID) require.Error(t, err) // Verify the failure metric was not incremented @@ -379,17 +469,3 @@ func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error } return m.Bucket.Get(ctx, name) } - -type RingMock struct { - ring.ReadRing -} - -func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts []string, bufZones map[string]int) (ring.ReplicationSet, error) { - return ring.ReplicationSet{ - Instances: []ring.InstanceDesc{ - { - Addr: "1.2.3.4", - }, - }, - }, nil -} diff --git a/pkg/storage/parquet/converter_marker.go b/pkg/storage/parquet/converter_marker.go index f53b4f53792..49762e9353e 100644 --- a/pkg/storage/parquet/converter_marker.go +++ b/pkg/storage/parquet/converter_marker.go @@ -9,8 +9,10 @@ import ( "github.com/efficientgo/core/errors" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/oklog/ulid/v2" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -26,6 +28,15 @@ type ConverterMark struct { Version int `json:"version"` } +func ExistBlockNoCompact(ctx context.Context, userBkt objstore.InstrumentedBucket, logger log.Logger, blockID ulid.ULID) bool { + noCompactMarkerExists, err := userBkt.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename)) + if err != nil { + level.Warn(logger).Log("msg", "unable to get stats of no-compact-mark.json for block", 
"block", blockID.String(), "err", err) + return false + } + return noCompactMarkerExists +} + func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) { markerPath := path.Join(id.String(), ConverterMarkerFileName) reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath) diff --git a/pkg/storage/parquet/util.go b/pkg/storage/parquet/util.go index 1e08ab301fa..f664393643d 100644 --- a/pkg/storage/parquet/util.go +++ b/pkg/storage/parquet/util.go @@ -1,8 +1,28 @@ package parquet -func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64) bool { +import ( + "github.com/oklog/ulid/v2" +) + +type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool + +func ShouldConvertBlockToParquet(mint, maxt, noCompactMarkCheckAfter int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool { // We assume timeRanges[0] is the TSDB block duration (2h), and we don't convert them. - return getBlockTimeRange(mint, maxt, timeRanges) > timeRanges[0] + blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges) + if blockTimeRange > timeRanges[0] { + return true + } + + // We should check if 2h blocks have a `no-compact-mark.json` file + // since these will never be compacted to 12h block. + // We check if the `no-compact-mark.json` file exists only for blocks + // after the noCompactMarkCheckAfter to reduce object storage calls. 
+ if mint >= noCompactMarkCheckAfter { + if blockTimeRange == timeRanges[0] && checkFunc(bId) { + return true + } + } + return false } func getBlockTimeRange(mint, maxt int64, timeRanges []int64) int64 { diff --git a/pkg/storage/parquet/util_test.go b/pkg/storage/parquet/util_test.go index db0b3423366..52ac412c01a 100644 --- a/pkg/storage/parquet/util_test.go +++ b/pkg/storage/parquet/util_test.go @@ -4,6 +4,9 @@ import ( "testing" "time" + "crypto/rand" + + "github.com/oklog/ulid/v2" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -11,10 +14,12 @@ import ( func TestShouldConvertBlockToParquet(t *testing.T) { for _, tc := range []struct { - name string - mint, maxt int64 - durations tsdb.DurationList - expected bool + name string + mint, maxt int64 + noCompactMarkCheckAfter int64 + durations tsdb.DurationList + expected bool + checkFunc NoCompactMarkCheckFunc }{ { name: "2h block. Don't convert", @@ -22,6 +27,9 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 2 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: false, + checkFunc: func(bId ulid.ULID) bool { + return false + }, }, { name: "1h block. Don't convert", @@ -29,6 +37,40 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 1 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: false, + checkFunc: func(bId ulid.ULID) bool { + return false + }, + }, + { + name: "2h block. Exist NoCompactMark. Convert", + mint: 0, + maxt: 2 * time.Hour.Milliseconds(), + durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, + expected: true, + checkFunc: func(bId ulid.ULID) bool { + return true + }, + }, + { + name: "2h block. Exist NoCompactMark. noCompactMarkCheckAfter is one hour. 
Not Convert", + mint: 0, + maxt: 2 * time.Hour.Milliseconds(), + noCompactMarkCheckAfter: 1 * time.Hour.Milliseconds(), + durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, + expected: false, + checkFunc: func(bId ulid.ULID) bool { + return true + }, + }, + { + name: "1h block. Exist NoCompactMark. Convert", + mint: 0, + maxt: 1 * time.Hour.Milliseconds(), + durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, + expected: true, + checkFunc: func(bId ulid.ULID) bool { + return true + }, }, { name: "3h block. Convert", @@ -36,6 +78,9 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 3 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: true, + checkFunc: func(bId ulid.ULID) bool { + return false + }, }, { name: "12h block. Convert", @@ -43,6 +88,9 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 12 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: true, + checkFunc: func(bId ulid.ULID) bool { + return false + }, }, { name: "12h block with 1h offset. Convert", @@ -50,6 +98,9 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 13 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: true, + checkFunc: func(bId ulid.ULID) bool { + return false + }, }, { name: "24h block. 
Convert", @@ -57,10 +108,15 @@ func TestShouldConvertBlockToParquet(t *testing.T) { maxt: 24 * time.Hour.Milliseconds(), durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour}, expected: true, + checkFunc: func(bId ulid.ULID) bool { + return false + }, }, } { t.Run(tc.name, func(t *testing.T) { - res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds()) + id, err := ulid.New(ulid.Now(), rand.Reader) + require.NoError(t, err) + res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, tc.noCompactMarkCheckAfter, (&tc.durations).ToMilliseconds(), id, tc.checkFunc) require.Equal(t, tc.expected, res) }) }