Skip to content

Commit f20dc5f

Browse files
authored
Merge pull request #51 from skrater/fix-producer
Don't log errors in consumer event
2 parents 399e6f3 + c0b8bd1 commit f20dc5f

File tree

1 file changed

+40
-48
lines changed

1 file changed

+40
-48
lines changed

amqp/consumer.go

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ import (
1212
"time"
1313

1414
"github.com/eventials/goevents/messaging"
15-
log "github.com/sirupsen/logrus"
15+
"github.com/sirupsen/logrus"
1616
amqplib "github.com/streadway/amqp"
1717
)
1818

1919
var (
20-
logger = log.WithFields(log.Fields{
20+
logger = logrus.WithFields(logrus.Fields{
2121
"type": "goevents",
2222
"sub_type": "consumer",
2323
})
@@ -115,7 +115,7 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
115115
delay, isRetry := getXRetryDelayHeader(msg)
116116

117117
if isRetry {
118-
logger.WithFields(log.Fields{
118+
logger.WithFields(logrus.Fields{
119119
"delay": delay.String(),
120120
"message_id": msg.MessageId,
121121
}).Debug("Delaying message.")
@@ -131,7 +131,7 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
131131
err := msg.Nack(false, true)
132132

133133
if err != nil {
134-
logger.WithFields(log.Fields{
134+
logger.WithFields(logrus.Fields{
135135
"error": err,
136136
"message_id": msg.MessageId,
137137
}).Error("Failed to nack message.")
@@ -174,48 +174,42 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
174174
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
175175
err := c.callAndHandlePanic(msg, h)
176176

177+
log := logger.WithFields(logrus.Fields{
178+
"action": h.action,
179+
"body": string(msg.Body),
180+
"message_id": msg.MessageId,
181+
})
182+
177183
if err == nil {
178-
logger.WithFields(log.Fields{
179-
"action": h.action,
180-
"body": string(msg.Body),
181-
"message_id": msg.MessageId,
182-
}).Debug("Message handled successfully.")
184+
log.Debug("Message handled successfully.")
183185

184186
if !c.autoAck {
185187
msg.Ack(false)
186188
}
187-
} else {
188-
if h.maxRetries > 0 {
189-
if retryCount >= h.maxRetries {
190-
logger.WithFields(log.Fields{
191-
"action": h.action,
192-
"body": string(msg.Body),
193-
"error": err,
194-
"message_id": msg.MessageId,
195-
}).Error("Maximum retries reached. Giving up.")
196-
197-
if !c.autoAck {
198-
msg.Ack(false)
199-
}
200-
} else {
201-
logger.WithFields(log.Fields{
202-
"action": h.action,
203-
"body": string(msg.Body),
204-
"error": err,
205-
"message_id": msg.MessageId,
206-
}).Error("Failed to process event. Retrying...")
207189

208-
c.retryMessage(msg, h, retryCount, delay)
209-
}
210-
} else {
211-
logger.WithFields(log.Fields{
212-
"action": h.action,
213-
"body": string(msg.Body),
214-
"error": err,
215-
"message_id": msg.MessageId,
216-
}).Error("Failed to process event.")
190+
return
191+
}
192+
193+
log = log.WithError(err)
194+
195+
if h.maxRetries == 0 {
196+
log.Warn("Failed to process event.")
197+
return
198+
}
199+
200+
if retryCount >= h.maxRetries {
201+
log.Error("Maximum retries reached. Giving up.")
202+
203+
if !c.autoAck {
204+
msg.Ack(false)
217205
}
206+
207+
return
218208
}
209+
210+
log.Debug("Failed to process event. Retrying...")
211+
212+
c.retryMessage(msg, h, retryCount, delay)
219213
}
220214

221215
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
@@ -271,9 +265,7 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
271265
err := c.publishMessage(retryMsg, c.queueName)
272266

273267
if err != nil {
274-
logger.WithFields(log.Fields{
275-
"error": err,
276-
}).Error("Failed to retry.")
268+
logger.WithError(err).Error("Failed to retry.")
277269

278270
if !c.autoAck {
279271
msg.Nack(false, true)
@@ -468,9 +460,11 @@ func (c *consumer) setupTopology(channel *amqplib.Channel) (err error) {
468460
}
469461

470462
func (c *consumer) doConsume() error {
471-
logger.WithFields(log.Fields{
463+
log := logger.WithFields(logrus.Fields{
472464
"queue": c.queueName,
473-
}).Debug("Setting up consumer channel...")
465+
})
466+
467+
log.Debug("Setting up consumer channel...")
474468

475469
channel, err := c.conn.openChannel()
476470

@@ -500,9 +494,7 @@ func (c *consumer) doConsume() error {
500494
return err
501495
}
502496

503-
logger.WithFields(log.Fields{
504-
"queue": c.queueName,
505-
}).Info("Consuming messages...")
497+
log.Info("Consuming messages...")
506498

507499
for m := range msgs {
508500
c.wg.Add(1)
@@ -545,12 +537,12 @@ func (c *consumer) Consume() {
545537
err := c.doConsume()
546538

547539
if err == nil {
548-
logger.WithFields(log.Fields{
540+
logger.WithFields(logrus.Fields{
549541
"queue": c.queueName,
550542
"closed": c.closed,
551543
}).Info("Consumption finished.")
552544
} else {
553-
logger.WithFields(log.Fields{
545+
logger.WithFields(logrus.Fields{
554546
"queue": c.queueName,
555547
"error": err,
556548
}).Error("Error consuming events.")

0 commit comments

Comments
 (0)