diff --git a/disque/disque.go b/disque/disque.go index c27d736..ed5289c 100644 --- a/disque/disque.go +++ b/disque/disque.go @@ -76,6 +76,9 @@ type Client interface { // difference between ACK and FASTACK FastAck(jobIds ...string) error + // Nack sends and NACK command with the given job ids + Nack(jobIds ...string) error + // Qlen returns the length of a given queue Qlen(qname string) (int, error) @@ -239,6 +242,16 @@ func (c *RedisClient) FastAck(jobIds ...string) error { return nil } +// Nack sends a NACK command with the given job ids +func (c *RedisClient) Nack(jobIds ...string) error { + args := make(redis.Args, 0, len(jobIds)) + args = args.AddFlat(jobIds) + if _, err := c.conn.Do("NACK", args...); err != nil { + return fmt.Errorf("disque: error sending NACK: %s", err) + } + return nil +} + // Qlen returns the length of a given queue func (c *RedisClient) Qlen(qname string) (int, error) { diff --git a/disque/disque_test.go b/disque/disque_test.go index 0dd8f96..6a8382a 100644 --- a/disque/disque_test.go +++ b/disque/disque_test.go @@ -262,6 +262,34 @@ func TestClient(t *testing.T) { } +func TestNack(t *testing.T) { + pool := NewPool(DialFunc(dial), addr) + + client, err := pool.Get() + if err != nil || client == nil { + panic("could not get client" + err.Error()) + } + defer client.Close() + qname := "test1" + ja := AddRequest{ + Job: Job{ + Queue: qname, + Data: []byte("foo"), + }, + Timeout: time.Millisecond * 100, + Replicate: pool.Size(), + } + + id, err := client.Add(ja) + if err != nil { + t.Fatal(err) + } + + if err = client.Nack(id); err != nil { + t.Fatal(err) + } +} + func BenchmarkAdd(b *testing.B) { pool := NewPool(DialFunc(dial), addr)