7 changes: 3 additions & 4 deletions enterprise/server/backends/migration_cache/migration_cache.go
@@ -1025,14 +1025,13 @@ func (mc *MigrationCache) copy(c *copyData) {
 		log.CtxDebugf(ctx, "Migration already copied on dequeue, returning early: digest %v", c.d)
 		return
 	}
-	if c.d.GetDigest().GetSizeBytes() > 100 && c.conf.src.SupportsCompressor(repb.Compressor_ZSTD) && c.conf.dest.SupportsCompressor(repb.Compressor_ZSTD) {
+	if c.d.GetDigest().GetSizeBytes() >= *compression.MinBytesAutoZstdCompression && c.conf.src.SupportsCompressor(repb.Compressor_ZSTD) && c.conf.dest.SupportsCompressor(repb.Compressor_ZSTD) {
 		// Use compression if both caches support it. This will usually mean
 		// that at src cache doesn't need to decompress and the dest cache
 		// doesn't need to compress, which saves CPU and speeds up the copy.
 		// Ideally, we would only do this when the destination cache would
-		// automatically compress, but since there's no way to check that, rely
-		// on the fact that `cache.pebble.min_bytes_auto_zstd_compression` is
-		// 100.
+		// automatically compress, but since there's no way to check that, check
+		// the same flag that the destination cache would use.
 		c.d = c.d.CloneVT()
 		c.d.Compressor = repb.Compressor_ZSTD
 	}
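All of the call sites touched below now share one size-and-support gate. A minimal, self-contained sketch of that check, assuming a hypothetical `Cache` interface and `shouldCompress` helper (neither is part of this PR):

```go
package main

import (
	"flag"
	"fmt"
)

// Stand-in for the shared flag defined in server/util/compression.
var minBytesAutoZstd = flag.Int64("compression.min_bytes_auto_zstd_compression", 100,
	"Blobs larger than this will be zstd compressed before written to disk.")

// Cache is a hypothetical stand-in for the src/dest cache configs.
type Cache interface {
	SupportsZstd() bool
}

type fakeCache struct{ zstd bool }

func (f fakeCache) SupportsZstd() bool { return f.zstd }

// shouldCompress mirrors the migration copier's gate: compress only when the
// blob meets the size threshold and both caches speak zstd.
func shouldCompress(sizeBytes int64, src, dest Cache) bool {
	return sizeBytes >= *minBytesAutoZstd && src.SupportsZstd() && dest.SupportsZstd()
}

func main() {
	flag.Parse()
	src, dest := fakeCache{zstd: true}, fakeCache{zstd: true}
	fmt.Println(shouldCompress(50, src, dest))  // false: below the 100-byte default
	fmt.Println(shouldCompress(200, src, dest)) // true: both sides support zstd
}
```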
5 changes: 1 addition & 4 deletions enterprise/server/backends/pebble_cache/pebble_cache.go
@@ -95,9 +95,6 @@ var (
 	activeKeyVersion = flag.Int64("cache.pebble.active_key_version", int64(filestore.UnspecifiedKeyVersion), "The key version new data will be written with. If negative, will write to the highest existing version in the database, or the highest known version if a new database is created.")
 	migrationQPSLimit = flag.Int("cache.pebble.migration_qps_limit", 50, "QPS limit for data version migration")
 
-	// Compression related flags
-	minBytesAutoZstdCompression = flag.Int64("cache.pebble.min_bytes_auto_zstd_compression", 100, "Blobs larger than this will be zstd compressed before written to disk.")
-
 	// Chunking related flags
 	averageChunkSizeBytes = flag.Int("cache.pebble.average_chunk_size_bytes", 0, "Average size of chunks that's stored in the cache. Disabled if 0.")
 
@@ -342,7 +339,7 @@ func Register(env *real_environment.RealEnv) error {
 		BlockCacheSizeBytes:         *blockCacheSizeBytesFlag,
 		MaxSizeBytes:                cache_config.MaxSizeBytes(),
 		MaxInlineFileSizeBytes:      *maxInlineFileSizeBytesFlag,
-		MinBytesAutoZstdCompression: *minBytesAutoZstdCompression,
+		MinBytesAutoZstdCompression: *compression.MinBytesAutoZstdCompression,
 		AtimeUpdateThreshold:        atimeUpdateThresholdFlag,
 		AtimeBufferSize:             atimeBufferSizeFlag,
 		NumAtimeUpdateWorkers:       numAtimeUpdateWorkers,
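With the flag deleted from this package, Register dereferences the shared compression.MinBytesAutoZstdCompression when building its options. A small sketch of the cross-package pattern, with an illustrative `Options` struct standing in for the pebble cache's real one; note the dereference happens after flags are parsed, so command-line overrides take effect:

```go
package main

import (
	"flag"
	"fmt"
)

// In the real code this flag lives in server/util/compression.
var MinBytesAutoZstdCompression = flag.Int64(
	"compression.min_bytes_auto_zstd_compression", 100,
	"Blobs larger than this will be zstd compressed before written to disk.")

// Options is an illustrative stand-in for the pebble cache's options struct.
type Options struct {
	MinBytesAutoZstdCompression int64
}

func main() {
	flag.Parse()
	// Snapshot the flag value at registration time, after parsing.
	opts := &Options{MinBytesAutoZstdCompression: *MinBytesAutoZstdCompression}
	fmt.Println(opts.MinBytesAutoZstdCompression)
}
```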
@@ -285,7 +285,7 @@ func (s *ByteStreamServer) beginWrite(ctx context.Context, req *bspb.WriteReques
 		casRN.SetCompressor(r.GetCompressor())
 	}
 	compressData := false
-	if casRN.GetCompressor() == repb.Compressor_IDENTITY && s.cache.SupportsCompressor(repb.Compressor_ZSTD) && r.GetDigest().GetSizeBytes() >= 100 {
+	if casRN.GetCompressor() == repb.Compressor_IDENTITY && s.cache.SupportsCompressor(repb.Compressor_ZSTD) && r.GetDigest().GetSizeBytes() >= *compression.MinBytesAutoZstdCompression {
 		casRN.SetCompressor(repb.Compressor_ZSTD)
 		compressData = true
 	}
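On the write path, an IDENTITY upload that meets the threshold is compressed by the server before storage. A hedged sketch of that step using the klauspost/compress encoder (the same constructor compression.go wraps in mustGetZstdEncoder); `compressIfLarge` is an illustrative helper, not this PR's code:

```go
package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// Mirrors the flag's default value of 100 bytes.
const minBytesAutoZstd = 100

var encoder = func() *zstd.Encoder {
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	return enc
}()

// compressIfLarge zstd-compresses blobs at or above the threshold and
// reports whether compression was applied.
func compressIfLarge(blob []byte) ([]byte, bool) {
	if int64(len(blob)) < minBytesAutoZstd {
		return blob, false
	}
	return encoder.EncodeAll(blob, nil), true
}

func main() {
	_, compressedSmall := compressIfLarge([]byte("tiny"))
	_, compressedLarge := compressIfLarge(make([]byte, 4096))
	fmt.Println(compressedSmall, compressedLarge) // false true
}
```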
5 changes: 5 additions & 0 deletions server/util/compression/compression.go
@@ -2,6 +2,7 @@ package compression
 
 import (
 	"errors"
+	"flag"
 	"io"
 	"runtime"
 	"sync"
@@ -31,6 +32,10 @@ var (
 	zstdDecompressedBytesMetric = metrics.BytesDecompressed.With(prometheus.Labels{metrics.CompressionType: "zstd"})
 )
 
+var (
+	MinBytesAutoZstdCompression = flag.Int64("compression.min_bytes_auto_zstd_compression", 100, "Blobs larger than this will be zstd compressed before written to disk.")
+)
+
 func mustGetZstdEncoder() *zstd.Encoder {
 	enc, err := zstd.NewWriter(nil)
 	if err != nil {
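Because every call site now reads this one flag, a single override moves the threshold everywhere. A sketch of setting it programmatically, e.g. from a test, assuming the flag is registered as above:

```go
package main

import (
	"flag"
	"fmt"
)

var minBytesAutoZstd = flag.Int64("compression.min_bytes_auto_zstd_compression", 100,
	"Blobs larger than this will be zstd compressed before written to disk.")

func main() {
	// Equivalent to passing --compression.min_bytes_auto_zstd_compression=1024.
	if err := flag.Set("compression.min_bytes_auto_zstd_compression", "1024"); err != nil {
		panic(err)
	}
	flag.Parse()
	fmt.Println(*minBytesAutoZstd) // 1024
}
```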