Skip to content

Commit 7435394

Browse files
authored
add parquet labels cache (#6835)
* add parquet labels cache

Signed-off-by: yeya24 <[email protected]>

* changelog

Signed-off-by: yeya24 <[email protected]>

---------

Signed-off-by: yeya24 <[email protected]>
1 parent 03c5e49 commit 7435394

File tree

7 files changed

+91
-21
lines changed

7 files changed

+91
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
* [ENHANCEMENT] Metadata Cache: Support inmemory and multi level cache backend. #6829
5353
* [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
5454
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
55+
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
5556
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
5657
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
5758
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

integration/parquet_querier_test.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/thanos-io/thanos/pkg/block/metadata"
2121

2222
"github.com/cortexproject/cortex/integration/e2e"
23+
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
2324
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
2425
"github.com/cortexproject/cortex/integration/e2ecortex"
2526
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -35,7 +36,8 @@ func TestParquetFuzz(t *testing.T) {
3536
defer s.Close()
3637

3738
consul := e2edb.NewConsulWithName("consul")
38-
require.NoError(t, s.StartAndWaitReady(consul))
39+
memcached := e2ecache.NewMemcached()
40+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
3941

4042
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
4143
flags := mergeFlags(
@@ -72,6 +74,11 @@ func TestParquetFuzz(t *testing.T) {
7274
"-parquet-converter.enabled": "true",
7375
// Querier
7476
"-querier.enable-parquet-queryable": "true",
77+
// Enable cache for parquet labels and chunks
78+
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
79+
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
80+
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
81+
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
7582
},
7683
)
7784

pkg/querier/bucket.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ func createCachingBucketClient(ctx context.Context, storageCfg cortex_tsdb.Block
2222

2323
// Blocks finder doesn't use chunks, but we pass config for consistency.
2424
matchers := cortex_tsdb.NewMatchers()
25-
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
25+
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
2626
if err != nil {
2727
return nil, errors.Wrap(err, "create caching bucket")
2828
}

pkg/storage/tsdb/caching_bucket.go

Lines changed: 54 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -188,7 +188,39 @@ func (cfg *MetadataCacheConfig) Validate() error {
188188
return cfg.BucketCacheBackend.Validate()
189189
}
190190

191-
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
191+
type ParquetLabelsCacheConfig struct {
192+
BucketCacheBackend `yaml:",inline"`
193+
194+
SubrangeSize int64 `yaml:"subrange_size"`
195+
MaxGetRangeRequests int `yaml:"max_get_range_requests"`
196+
AttributesTTL time.Duration `yaml:"attributes_ttl"`
197+
SubrangeTTL time.Duration `yaml:"subrange_ttl"`
198+
}
199+
200+
func (cfg *ParquetLabelsCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
201+
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The parquet labels cache backend type. Single or Multiple cache backend can be provided. "+
202+
"Supported values in single cache: %s, %s, %s, and '' (disable). "+
203+
"Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedBucketCacheBackends, ", ")))
204+
205+
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
206+
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
207+
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-labels")
208+
cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
209+
210+
f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
211+
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching parquet labels file. Zero or negative value = unlimited number of sub-requests.")
212+
f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for parquet labels file.")
213+
f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual subranges.")
214+
215+
// In the multi level parquet labels cache, backfill TTL follows subrange TTL
216+
cfg.MultiLevel.BackFillTTL = cfg.SubrangeTTL
217+
}
218+
219+
func (cfg *ParquetLabelsCacheConfig) Validate() error {
220+
return cfg.BucketCacheBackend.Validate()
221+
}
222+
223+
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
192224
cfg := cache.NewCachingBucketConfig()
193225
cachingConfigured := false
194226

@@ -223,6 +255,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
223255
cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec, "")
224256
}
225257

258+
parquetLabelsCache, err := createBucketCache("parquet-labels-cache", &parquetLabelsConfig.BucketCacheBackend, logger, reg)
259+
if err != nil {
260+
return nil, errors.Wrapf(err, "parquet-labels-cache")
261+
}
262+
if parquetLabelsCache != nil {
263+
cachingConfigured = true
264+
parquetLabelsCache = cache.NewTracingCache(parquetLabelsCache)
265+
cfg.CacheGetRange("parquet-labels", parquetLabelsCache, matchers.GetParquetLabelsMatcher(), parquetLabelsConfig.SubrangeSize, parquetLabelsConfig.AttributesTTL, parquetLabelsConfig.SubrangeTTL, parquetLabelsConfig.MaxGetRangeRequests)
266+
}
267+
226268
if !cachingConfigured {
227269
// No caching is configured.
228270
return bkt, nil
@@ -323,6 +365,7 @@ func NewMatchers() Matchers {
323365
matcherMap := make(map[string]func(string) bool)
324366
matcherMap["chunks"] = isTSDBChunkFile
325367
matcherMap["parquet-chunks"] = isParquetChunkFile
368+
matcherMap["parquet-labels"] = isParquetLabelsFile
326369
matcherMap["metafile"] = isMetaFile
327370
matcherMap["block-index"] = isBlockIndexFile
328371
matcherMap["bucket-index"] = isBucketIndexFiles
@@ -347,6 +390,10 @@ func (m *Matchers) SetParquetChunksMatcher(f func(string) bool) {
347390
m.matcherMap["parquet-chunks"] = f
348391
}
349392

393+
func (m *Matchers) SetParquetLabelsMatcher(f func(string) bool) {
394+
m.matcherMap["parquet-labels"] = f
395+
}
396+
350397
func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
351398
m.matcherMap["block-index"] = f
352399
}
@@ -379,6 +426,10 @@ func (m *Matchers) GetParquetChunksMatcher() func(string) bool {
379426
return m.matcherMap["parquet-chunks"]
380427
}
381428

429+
func (m *Matchers) GetParquetLabelsMatcher() func(string) bool {
430+
return m.matcherMap["parquet-labels"]
431+
}
432+
382433
func (m *Matchers) GetMetafileMatcher() func(string) bool {
383434
return m.matcherMap["metafile"]
384435
}
@@ -413,6 +464,8 @@ func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name)
413464

414465
func isParquetChunkFile(name string) bool { return strings.HasSuffix(name, "chunks.parquet") }
415466

467+
func isParquetLabelsFile(name string) bool { return strings.HasSuffix(name, "labels.parquet") }
468+
416469
func isMetaFile(name string) bool {
417470
return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile)
418471
}

pkg/storage/tsdb/config.go

Lines changed: 23 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -274,23 +274,24 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool {
274274

275275
// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
276276
type BucketStoreConfig struct {
277-
SyncDir string `yaml:"sync_dir"`
278-
SyncInterval time.Duration `yaml:"sync_interval"`
279-
MaxConcurrent int `yaml:"max_concurrent"`
280-
MaxInflightRequests int `yaml:"max_inflight_requests"`
281-
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
282-
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
283-
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
284-
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
285-
IndexCache IndexCacheConfig `yaml:"index_cache"`
286-
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
287-
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
288-
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
289-
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
290-
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
291-
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
292-
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
293-
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
277+
SyncDir string `yaml:"sync_dir"`
278+
SyncInterval time.Duration `yaml:"sync_interval"`
279+
MaxConcurrent int `yaml:"max_concurrent"`
280+
MaxInflightRequests int `yaml:"max_inflight_requests"`
281+
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
282+
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
283+
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
284+
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
285+
IndexCache IndexCacheConfig `yaml:"index_cache"`
286+
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
287+
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
288+
ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache" doc:"hidden"`
289+
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
290+
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
291+
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
292+
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
293+
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
294+
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
294295

295296
// Chunk pool.
296297
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
@@ -348,6 +349,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
348349
cfg.IndexCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-cache.")
349350
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
350351
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
352+
cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.")
351353
cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")
352354

353355
f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.")
@@ -403,6 +405,10 @@ func (cfg *BucketStoreConfig) Validate() error {
403405
if err != nil {
404406
return errors.Wrap(err, "metadata-cache configuration")
405407
}
408+
err = cfg.ParquetLabelsCache.Validate()
409+
if err != nil {
410+
return errors.Wrap(err, "parquet-labels-cache configuration")
411+
}
406412
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
407413
return ErrInvalidBucketIndexBlockDiscoveryStrategy
408414
}

pkg/storage/tsdb/multilevel_bucket_cache.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg
6767
case "metadata-cache":
6868
itemName = "metadata_cache"
6969
metricHelpText = "metadata cache"
70+
case "parquet-labels-cache":
71+
itemName = "parquet_labels_cache"
72+
metricHelpText = "parquet labels cache"
7073
default:
7174
itemName = name
7275
}

pkg/storegateway/bucket_stores.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -101,7 +101,7 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many
101101
// NewBucketStores makes a new BucketStores.
102102
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
103103
matchers := tsdb.NewMatchers()
104-
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg)
104+
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
105105
if err != nil {
106106
return nil, errors.Wrapf(err, "create caching bucket")
107107
}

0 commit comments

Comments (0)