|  | 
|  | 1 | +package cache | 
|  | 2 | + | 
|  | 3 | +import ( | 
|  | 4 | +	"context" | 
|  | 5 | +	"flag" | 
|  | 6 | +	"time" | 
|  | 7 | + | 
|  | 8 | +	"github.com/cortexproject/cortex/pkg/util" | 
|  | 9 | +	"github.com/go-kit/kit/log/level" | 
|  | 10 | +	"github.com/gomodule/redigo/redis" | 
|  | 11 | +) | 
|  | 12 | + | 
|  | 13 | +// RedisCache type caches chunks in redis | 
|  | 14 | +type RedisCache struct { | 
|  | 15 | +	name       string | 
|  | 16 | +	expiration int | 
|  | 17 | +	timeout    time.Duration | 
|  | 18 | +	pool       *redis.Pool | 
|  | 19 | +} | 
|  | 20 | + | 
|  | 21 | +// RedisConfig defines how a RedisCache should be constructed. | 
|  | 22 | +type RedisConfig struct { | 
|  | 23 | +	Endpoint       string        `yaml:"endpoint,omitempty"` | 
|  | 24 | +	Timeout        time.Duration `yaml:"timeout,omitempty"` | 
|  | 25 | +	Expiration     time.Duration `yaml:"expiration,omitempty"` | 
|  | 26 | +	MaxIdleConns   int           `yaml:"max_idle_conns,omitempty"` | 
|  | 27 | +	MaxActiveConns int           `yaml:"max_active_conns,omitempty"` | 
|  | 28 | +} | 
|  | 29 | + | 
|  | 30 | +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet | 
|  | 31 | +func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { | 
|  | 32 | +	f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.") | 
|  | 33 | +	f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.") | 
|  | 34 | +	f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.") | 
|  | 35 | +	f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.") | 
|  | 36 | +	f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.") | 
|  | 37 | +} | 
|  | 38 | + | 
|  | 39 | +// NewRedisCache creates a new RedisCache | 
|  | 40 | +func NewRedisCache(cfg RedisConfig, name string, pool *redis.Pool) *RedisCache { | 
|  | 41 | +	// pool != nil only in unit tests | 
|  | 42 | +	if pool == nil { | 
|  | 43 | +		pool = &redis.Pool{ | 
|  | 44 | +			MaxIdle:   cfg.MaxIdleConns, | 
|  | 45 | +			MaxActive: cfg.MaxActiveConns, | 
|  | 46 | +			Dial: func() (redis.Conn, error) { | 
|  | 47 | +				c, err := redis.Dial("tcp", cfg.Endpoint) | 
|  | 48 | +				if err != nil { | 
|  | 49 | +					return nil, err | 
|  | 50 | +				} | 
|  | 51 | +				return c, err | 
|  | 52 | +			}, | 
|  | 53 | +		} | 
|  | 54 | +	} | 
|  | 55 | + | 
|  | 56 | +	cache := &RedisCache{ | 
|  | 57 | +		expiration: int(cfg.Expiration.Seconds()), | 
|  | 58 | +		timeout:    cfg.Timeout, | 
|  | 59 | +		name:       name, | 
|  | 60 | +		pool:       pool, | 
|  | 61 | +	} | 
|  | 62 | + | 
|  | 63 | +	if err := cache.ping(context.Background()); err != nil { | 
|  | 64 | +		level.Error(util.Logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err) | 
|  | 65 | +	} | 
|  | 66 | + | 
|  | 67 | +	return cache | 
|  | 68 | +} | 
|  | 69 | + | 
|  | 70 | +// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. | 
|  | 71 | +func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { | 
|  | 72 | +	data, err := c.mget(ctx, keys) | 
|  | 73 | + | 
|  | 74 | +	if err != nil { | 
|  | 75 | +		level.Error(util.Logger).Log("msg", "failed to get from redis", "name", c.name, "err", err) | 
|  | 76 | +		missed = make([]string, len(keys)) | 
|  | 77 | +		copy(missed, keys) | 
|  | 78 | +		return | 
|  | 79 | +	} | 
|  | 80 | +	for i, key := range keys { | 
|  | 81 | +		if data[i] != nil { | 
|  | 82 | +			found = append(found, key) | 
|  | 83 | +			bufs = append(bufs, data[i]) | 
|  | 84 | +		} else { | 
|  | 85 | +			missed = append(missed, key) | 
|  | 86 | +		} | 
|  | 87 | +	} | 
|  | 88 | +	return | 
|  | 89 | +} | 
|  | 90 | + | 
|  | 91 | +// Store stores the key in the cache. | 
|  | 92 | +func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { | 
|  | 93 | +	err := c.mset(ctx, keys, bufs, c.expiration) | 
|  | 94 | +	if err != nil { | 
|  | 95 | +		level.Error(util.Logger).Log("msg", "failed to put to redis", "name", c.name, "err", err) | 
|  | 96 | +	} | 
|  | 97 | +} | 
|  | 98 | + | 
|  | 99 | +// Stop stops the redis client. | 
|  | 100 | +func (c *RedisCache) Stop() error { | 
|  | 101 | +	return c.pool.Close() | 
|  | 102 | +} | 
|  | 103 | + | 
|  | 104 | +// mset adds key-value pairs to the cache. | 
|  | 105 | +func (c *RedisCache) mset(ctx context.Context, keys []string, bufs [][]byte, ttl int) error { | 
|  | 106 | +	conn := c.pool.Get() | 
|  | 107 | +	defer conn.Close() | 
|  | 108 | + | 
|  | 109 | +	if err := conn.Send("MULTI"); err != nil { | 
|  | 110 | +		return err | 
|  | 111 | +	} | 
|  | 112 | +	for i := range keys { | 
|  | 113 | +		if err := conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil { | 
|  | 114 | +			return err | 
|  | 115 | +		} | 
|  | 116 | +	} | 
|  | 117 | +	_, err := redis.DoWithTimeout(conn, c.timeout, "EXEC") | 
|  | 118 | +	return err | 
|  | 119 | +} | 
|  | 120 | + | 
|  | 121 | +// mget retrieves values from the cache. | 
|  | 122 | +func (c *RedisCache) mget(ctx context.Context, keys []string) ([][]byte, error) { | 
|  | 123 | +	intf := make([]interface{}, len(keys)) | 
|  | 124 | +	for i, key := range keys { | 
|  | 125 | +		intf[i] = key | 
|  | 126 | +	} | 
|  | 127 | + | 
|  | 128 | +	conn := c.pool.Get() | 
|  | 129 | +	defer conn.Close() | 
|  | 130 | + | 
|  | 131 | +	return redis.ByteSlices(redis.DoWithTimeout(conn, c.timeout, "MGET", intf...)) | 
|  | 132 | +} | 
|  | 133 | + | 
|  | 134 | +func (c *RedisCache) ping(ctx context.Context) error { | 
|  | 135 | +	conn := c.pool.Get() | 
|  | 136 | +	defer conn.Close() | 
|  | 137 | + | 
|  | 138 | +	pong, err := redis.DoWithTimeout(conn, c.timeout, "PING") | 
|  | 139 | +	if err == nil { | 
|  | 140 | +		_, err = redis.String(pong, err) | 
|  | 141 | +	} | 
|  | 142 | +	return err | 
|  | 143 | +} | 
|  | 144 | + | 
|  | 145 | +func redisStatusCode(err error) string { | 
|  | 146 | +	switch err { | 
|  | 147 | +	case nil: | 
|  | 148 | +		return "200" | 
|  | 149 | +	case redis.ErrNil: | 
|  | 150 | +		return "404" | 
|  | 151 | +	default: | 
|  | 152 | +		return "500" | 
|  | 153 | +	} | 
|  | 154 | +} | 
0 commit comments