From 9a92c1f07f07fd348677ba4f7b42f4ffd82900d2 Mon Sep 17 00:00:00 2001 From: Teng Yanxi Date: Sun, 22 Mar 2026 04:54:41 +0800 Subject: [PATCH] fix: use configurable timezone for daily stats keys (#671) ProcessedKey() and FailedKey() in base.go previously hardcoded .UTC() conversion, causing tasks processed before 8 AM in UTC+8 to be counted towards the previous day's stats. This change: - Removes .UTC() from ProcessedKey/FailedKey (caller decides timezone) - Adds location field to RDB struct (defaults to time.UTC) - Adds SetLocation() method for timezone configuration - Updates all stats-recording methods to use configured location - Adds timezone-specific test cases for Asia/Shanghai --- internal/base/base.go | 6 ++++-- internal/base/base_test.go | 16 ++++++++++++++++ internal/rdb/inspect.go | 4 ++-- internal/rdb/rdb.go | 31 +++++++++++++++++++++++-------- 4 files changed, 45 insertions(+), 12 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index 390e24db5..a7ac11dfc 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -167,13 +167,15 @@ func FailedTotalKey(qname string) string { } // ProcessedKey returns a redis key for processed count for the given day for the queue. +// The caller is responsible for converting t to the desired timezone before calling this function. func ProcessedKey(qname string, t time.Time) string { - return QueueKeyPrefix(qname) + "processed:" + t.UTC().Format("2006-01-02") + return QueueKeyPrefix(qname) + "processed:" + t.Format("2006-01-02") } // FailedKey returns a redis key for failure count for the given day for the queue. +// The caller is responsible for converting t to the desired timezone before calling this function. func FailedKey(qname string, t time.Time) string { - return QueueKeyPrefix(qname) + "failed:" + t.UTC().Format("2006-01-02") + return QueueKeyPrefix(qname) + "failed:" + t.Format("2006-01-02") } // ServerInfoKey returns a redis key for process info. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 3ac23802a..c22796b09 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -209,6 +209,10 @@ func TestFailedTotalKey(t *testing.T) { } func TestProcessedKey(t *testing.T) { + shanghaiLoc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + t.Skip("could not load Asia/Shanghai timezone, skipping") + } tests := []struct { qname string input time.Time @@ -217,6 +221,11 @@ func TestProcessedKey(t *testing.T) { {"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:processed:2019-11-14"}, {"critical", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{critical}:processed:2020-12-01"}, {"default", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{default}:processed:2020-01-06"}, + // Timezone-aware test: 2024-03-22 07:30 CST = 2024-03-21 23:30 UTC + // With the fix, the key should use the local date (03-22), not UTC date (03-21). + {"default", time.Date(2024, 3, 22, 7, 30, 0, 0, shanghaiLoc), "asynq:{default}:processed:2024-03-22"}, + // Edge case: exactly midnight CST = 16:00 previous day UTC + {"default", time.Date(2024, 3, 22, 0, 0, 0, 0, shanghaiLoc), "asynq:{default}:processed:2024-03-22"}, } for _, tc := range tests { @@ -228,6 +237,10 @@ func TestProcessedKey(t *testing.T) { } func TestFailedKey(t *testing.T) { + shanghaiLoc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + t.Skip("could not load Asia/Shanghai timezone, skipping") + } tests := []struct { qname string input time.Time @@ -236,6 +249,9 @@ func TestFailedKey(t *testing.T) { {"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:failed:2019-11-14"}, {"custom", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{custom}:failed:2020-12-01"}, {"low", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{low}:failed:2020-01-06"}, + // Timezone-aware test: 2024-03-22 07:30 CST = 2024-03-21 23:30 UTC + // With the fix, the key should use the local date (03-22), not UTC date (03-21). + {"default", time.Date(2024, 3, 22, 7, 30, 0, 0, shanghaiLoc), "asynq:{default}:failed:2024-03-22"}, } for _, tc := range tests { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 4bd797555..5cf37e9c6 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -146,7 +146,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - now := r.clock.Now() + now := r.clock.Now().In(r.location) keys := []string{ base.PendingKey(qname), base.ActiveKey(qname), @@ -373,7 +373,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } const day = 24 * time.Hour - now := r.clock.Now().UTC() + now := r.clock.Now().In(r.location) var days []time.Time var keys []string for i := 0; i < n; i++ { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df5060e..11ab3ea7e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -29,14 +29,16 @@ const LeaseDuration = 30 * time.Second type RDB struct { client redis.UniversalClient clock timeutil.Clock + location *time.Location queuesPublished sync.Map } // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { return &RDB{ - client: client, - clock: timeutil.NewRealClock(), + client: client, + clock: timeutil.NewRealClock(), + location: time.UTC, } } @@ -57,6 +59,19 @@ func (r *RDB) SetClock(c timeutil.Clock) { r.clock = c } +// SetLocation sets the time zone location used by RDB to determine +// the date boundary for daily processed/failed stats keys. +// +// By default, RDB uses time.UTC. Setting this to a different location +// (e.g., time.Local, or a location loaded via time.LoadLocation("Asia/Shanghai")) +// will cause daily stats to be aggregated according to that timezone's date boundary. +func (r *RDB) SetLocation(loc *time.Location) { + if loc == nil { + loc = time.UTC + } + r.location = loc +} + // Ping checks the connection with redis server. func (r *RDB) Ping() error { return r.client.Ping(context.Background()).Err() @@ -351,7 +366,7 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error { base.ActiveKey(msg.Queue), base.LeaseKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), - base.ProcessedKey(msg.Queue, now), + base.ProcessedKey(msg.Queue, now.In(r.location)), base.ProcessedTotalKey(msg.Queue), } argv := []interface{}{ @@ -459,7 +474,7 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error { base.LeaseKey(msg.Queue), base.CompletedKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), - base.ProcessedKey(msg.Queue, now), + base.ProcessedKey(msg.Queue, now.In(r.location)), base.ProcessedTotalKey(msg.Queue), } argv := []interface{}{ @@ -820,8 +835,8 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T base.ActiveKey(msg.Queue), base.LeaseKey(msg.Queue), base.RetryKey(msg.Queue), - base.ProcessedKey(msg.Queue, now), - base.FailedKey(msg.Queue, now), + base.ProcessedKey(msg.Queue, now.In(r.location)), + base.FailedKey(msg.Queue, now.In(r.location)), base.ProcessedTotalKey(msg.Queue), base.FailedTotalKey(msg.Queue), } @@ -920,8 +935,8 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) base.ActiveKey(msg.Queue), base.LeaseKey(msg.Queue), base.ArchivedKey(msg.Queue), - base.ProcessedKey(msg.Queue, now), - base.FailedKey(msg.Queue, now), + base.ProcessedKey(msg.Queue, now.In(r.location)), + base.FailedKey(msg.Queue, now.In(r.location)), base.ProcessedTotalKey(msg.Queue), base.FailedTotalKey(msg.Queue), base.TaskKeyPrefix(msg.Queue),