Skip to content

Commit bbe648c

Browse files
Fix ack for non-fastack, non-reply case
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 789f4a6 commit bbe648c

File tree

2 files changed

+42
-20
lines changed

2 files changed

+42
-20
lines changed

internal/events/webhooks/webhooks.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
271271
return req, res, nil
272272
}
273273

274-
func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
274+
func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray, fastAck bool) error {
275275
req, res, gwErr := wh.attemptRequest(sub, event, data)
276276
if gwErr != nil {
277277
// Generate a bad-gateway error response - we always want to send something back,
@@ -298,7 +298,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
298298
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
299299
}
300300
if cb, ok := wh.callbacks[sub.Namespace]; ok {
301-
log.L(wh.ctx).Tracef("Sending reply message in response to webhook message %s", event.Message.Header.ID)
301+
log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
302302
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
303303
ID: event.ID,
304304
Rejected: false,
@@ -320,6 +320,14 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
320320
},
321321
})
322322
}
323+
} else if !fastAck {
324+
if cb, ok := wh.callbacks[sub.Namespace]; ok {
325+
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
326+
ID: event.ID,
327+
Rejected: false,
328+
Subscription: event.Subscription,
329+
})
330+
}
323331
}
324332
return nil
325333
}
@@ -347,13 +355,22 @@ func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event
347355
}
348356

349357
// In fastack mode we drive calls in parallel to the backend, immediately acknowledging the event
350-
if sub.Options.TransportOptions().GetBool("fastack") {
358+
// NOTE: We cannot use this with reply mode, as when we're sending a reply the `DeliveryResponse`
359+
// callback must include the reply in-line.
360+
if !reply && sub.Options.TransportOptions().GetBool("fastack") {
361+
if cb, ok := wh.callbacks[sub.Namespace]; ok {
362+
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
363+
ID: event.ID,
364+
Rejected: false,
365+
Subscription: event.Subscription,
366+
})
367+
}
351368
go func() {
352-
err := wh.doDelivery(connID, reply, sub, event, data)
369+
err := wh.doDelivery(connID, reply, sub, event, data, true)
353370
log.L(wh.ctx).Warnf("Webhook delivery failed in fastack mode for event '%s': %s", event.ID, err)
354371
}()
355372
return nil
356373
}
357374

358-
return wh.doDelivery(connID, reply, sub, event, data)
375+
return wh.doDelivery(connID, reply, sub, event, data, false)
359376
}

internal/events/webhooks/webhooks_test.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,11 @@ func TestRequestNoBodyNoReply(t *testing.T) {
360360

361361
dataID := fftypes.NewUUID()
362362
groupHash := fftypes.NewRandB32()
363-
sub := &core.Subscription{}
363+
sub := &core.Subscription{
364+
SubscriptionRef: core.SubscriptionRef{
365+
Namespace: "ns1",
366+
},
367+
}
364368
to := sub.Options.TransportOptions()
365369
to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr())
366370
event := &core.EventDelivery{
@@ -380,7 +384,8 @@ func TestRequestNoBodyNoReply(t *testing.T) {
380384
},
381385
},
382386
Subscription: core.SubscriptionRef{
383-
ID: sub.ID,
387+
ID: sub.ID,
388+
Namespace: "ns1",
384389
},
385390
}
386391
data := &core.Data{
@@ -390,9 +395,16 @@ func TestRequestNoBodyNoReply(t *testing.T) {
390395
}`),
391396
}
392397

398+
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
399+
mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
400+
return !response.Rejected
401+
})).Return(nil)
402+
393403
err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{data})
394404
assert.NoError(t, err)
395405
assert.True(t, called)
406+
407+
mcb.AssertExpectations(t)
396408
}
397409

398410
func TestRequestReplyEmptyData(t *testing.T) {
@@ -638,7 +650,7 @@ func TestRequestReplyDataArrayError(t *testing.T) {
638650
mcb.AssertExpectations(t)
639651
}
640652

641-
func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
653+
func TestWebhookFailFastAsk(t *testing.T) {
642654
wh, cancel := newTestWebHooks(t)
643655
defer cancel()
644656

@@ -652,7 +664,6 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
652664
Namespace: "ns1",
653665
},
654666
}
655-
sub.Options.TransportOptions()["reply"] = true
656667
sub.Options.TransportOptions()["fastack"] = true
657668
event := &core.EventDelivery{
658669
EnrichedEvent: core.EnrichedEvent{
@@ -673,17 +684,11 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
673684

674685
waiter := make(chan struct{})
675686
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
676-
dr := mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
677-
assert.Equal(t, *msgID, *response.Reply.Message.Header.CID)
678-
assert.Nil(t, response.Reply.Message.Header.Group)
679-
assert.Equal(t, core.MessageTypeBroadcast, response.Reply.Message.Header.Type)
680-
assert.Equal(t, float64(502), response.Reply.InlineData[0].Value.JSONObject()["status"])
681-
assert.Regexp(t, "FF10242", response.Reply.InlineData[0].Value.JSONObject().GetObject("body")["error"])
682-
return true
683-
})).Return(nil)
684-
dr.RunFn = func(a mock.Arguments) {
685-
close(waiter)
686-
}
687+
mcb.On("DeliveryResponse", mock.Anything, mock.Anything).
688+
Return(nil).
689+
Run(func(a mock.Arguments) {
690+
close(waiter)
691+
})
687692

688693
err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{
689694
{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)},

0 commit comments

Comments
 (0)