Skip to content

Commit a26e220

Browse files
authored
Merge pull request #1072 from kaleido-io/wh-logging
Fix acknowledgement for webhooks in non-reply, non-fastack cases
2 parents 1b7dc9b + 96f3e76 commit a26e220

File tree

7 files changed

+113
-24
lines changed

7 files changed

+113
-24
lines changed

docs/reference/config.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,47 @@ nav_order: 2
531531
|default|The default event transport for new subscriptions|`string`|`<nil>`
532532
|enabled|Which event interface plugins are enabled|`boolean`|`<nil>`
533533

534+
## events.webhooks
535+
536+
|Key|Description|Type|Default Value|
537+
|---|-----------|----|-------------|
538+
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
539+
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
540+
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
541+
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
542+
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
543+
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
544+
|tlsHandshakeTimeout|The maximum amount of time to wait for a successful TLS handshake|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
545+
546+
## events.webhooks.auth
547+
548+
|Key|Description|Type|Default Value|
549+
|---|-----------|----|-------------|
550+
|password|Password|`string`|`<nil>`
551+
|username|Username|`string`|`<nil>`
552+
553+
## events.webhooks.proxy
554+
555+
|Key|Description|Type|Default Value|
556+
|---|-----------|----|-------------|
557+
|url|Optional HTTP proxy server to connect through|`string`|`<nil>`
558+
559+
## events.webhooks.retry
560+
561+
|Key|Description|Type|Default Value|
562+
|---|-----------|----|-------------|
563+
|count|The maximum number of times to retry|`int`|`5`
564+
|enabled|Enables retries|`boolean`|`false`
565+
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
566+
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
567+
568+
## events.websockets
569+
570+
|Key|Description|Type|Default Value|
571+
|---|-----------|----|-------------|
572+
|readBufferSize|WebSocket read buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
573+
|writeBufferSize|WebSocket write buffer size|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
574+
534575
## histograms
535576

536577
|Key|Description|Type|Default Value|

docs/reference/types/_includes/subscription_description.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,12 @@ allowing you to customize your HTTP requests as follows:
167167

168168
- Set the HTTP request details:
169169
- Method, URL, query, headers and input body
170-
- Wait for a successful `2xx` HTTP code from the back-end service, before
171-
acknowledging (default).
170+
- Wait for a invocation of the back-end service, before acknowledging
171+
- To retry requests to your Webhook on a non-`2xx` HTTP status code
172+
or other error, then you should enable and configure
173+
[events.webhooks.retry](../../config.html#eventswebhooksretry)
174+
- The event is acknowledged once the request (with any retries), is
175+
completed - regardless of whether the outcome was a success or failure.
172176
- Use `fastack` to acknowledge against FireFly immediately and make multiple
173177
parallel calls to the HTTP API in a fire-and-forget fashion.
174178
- Set the HTTP request details dynamically from `message_confirmed` events:

internal/coremsgs/en_config_descriptions.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,9 @@ var (
351351
ConfigPluginsAuth = ffc("config.plugins.auth", "Authorization plugin configuration", i18n.MapStringStringType)
352352
ConfigPluginsAuthName = ffc("config.plugins.auth[].name", "The name of the auth plugin to use", i18n.StringType)
353353
ConfigPluginsAuthType = ffc("config.plugins.auth[].type", "The type of the auth plugin to use", i18n.StringType)
354+
355+
ConfigPluginsEventSystemReadAhead = ffc("config.events.system.readAhead", "", i18n.IgnoredType)
356+
ConfigPluginsEventWebhooksURL = ffc("config.events.webhooks.url", "", i18n.IgnoredType)
357+
ConfigPluginsEventWebSocketsReadBufferSize = ffc("config.events.websockets.readBufferSize", "WebSocket read buffer size", i18n.ByteSizeType)
358+
ConfigPluginsEventWebSocketsWriteBufferSize = ffc("config.events.websockets.writeBufferSize", "WebSocket write buffer size", i18n.ByteSizeType)
354359
)

internal/events/eifactory/factory.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package eifactory
1919
import (
2020
"context"
2121

22+
"github.com/hyperledger/firefly-common/pkg/config"
2223
"github.com/hyperledger/firefly-common/pkg/i18n"
2324
"github.com/hyperledger/firefly/internal/coremsgs"
2425
"github.com/hyperledger/firefly/internal/events/system"
@@ -41,6 +42,12 @@ func init() {
4142
}
4243
}
4344

45+
func InitConfig(config config.Section) {
46+
for name, plugin := range pluginsByName {
47+
plugin.InitConfig(config.SubSection(name))
48+
}
49+
}
50+
4451
func GetPlugin(ctx context.Context, pluginType string) (events.Plugin, error) {
4552
plugin, ok := pluginsByName[pluginType]
4653
if !ok {

internal/events/webhooks/webhooks.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,13 @@ type whResponse struct {
6363
func (wh *WebHooks) Name() string { return "webhooks" }
6464

6565
func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) {
66+
connID := fftypes.ShortID()
6667
*wh = WebHooks{
67-
ctx: ctx,
68+
ctx: log.WithLogField(ctx, "webhook", wh.connID),
6869
capabilities: &events.Capabilities{},
6970
callbacks: make(map[string]events.Callbacks),
7071
client: ffresty.New(ctx, config),
71-
connID: fftypes.ShortID(),
72+
connID: connID,
7273
}
7374
return nil
7475
}
@@ -85,7 +86,9 @@ func (wh *WebHooks) Capabilities() *events.Capabilities {
8586

8687
func (wh *WebHooks) buildRequest(options fftypes.JSONObject, firstData fftypes.JSONObject) (req *whRequest, err error) {
8788
req = &whRequest{
88-
r: wh.client.R().SetDoNotParseResponse(true),
89+
r: wh.client.R().
90+
SetDoNotParseResponse(true).
91+
SetContext(wh.ctx),
8992
url: options.GetString("url"),
9093
method: options.GetString("method"),
9194
forceJSON: options.GetBool("json"),
@@ -223,8 +226,10 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
223226
}
224227
}
225228

229+
log.L(wh.ctx).Debugf("Webhook-> %s %s event %s on subscription %s", req.method, req.url, event.ID, sub.ID)
226230
resp, err := req.r.Execute(req.method, req.url)
227231
if err != nil {
232+
log.L(wh.ctx).Errorf("Webhook<- %s %s event %s on subscription %s failed: %s", req.method, req.url, event.ID, sub.ID, err)
228233
return nil, nil, err
229234
}
230235
defer func() { _ = resp.RawBody().Close() }()
@@ -233,6 +238,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
233238
Status: resp.StatusCode(),
234239
Headers: fftypes.JSONObject{},
235240
}
241+
log.L(wh.ctx).Infof("Webhook<- %s %s event %s on subscription %s returned %d", req.method, req.url, event.ID, sub.ID, res.Status)
236242
header := resp.Header()
237243
for h := range header {
238244
res.Headers[h] = header.Get(h)
@@ -265,7 +271,7 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli
265271
return req, res, nil
266272
}
267273

268-
func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
274+
func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray, fastAck bool) error {
269275
req, res, gwErr := wh.attemptRequest(sub, event, data)
270276
if gwErr != nil {
271277
// Generate a bad-gateway error response - we always want to send something back,
@@ -292,6 +298,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
292298
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
293299
}
294300
if cb, ok := wh.callbacks[sub.Namespace]; ok {
301+
log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
295302
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
296303
ID: event.ID,
297304
Rejected: false,
@@ -313,6 +320,14 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
313320
},
314321
})
315322
}
323+
} else if !fastAck {
324+
if cb, ok := wh.callbacks[sub.Namespace]; ok {
325+
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
326+
ID: event.ID,
327+
Rejected: false,
328+
Subscription: event.Subscription,
329+
})
330+
}
316331
}
317332
return nil
318333
}
@@ -340,13 +355,22 @@ func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event
340355
}
341356

342357
// In fastack mode we drive calls in parallel to the backend, immediately acknowledging the event
343-
if sub.Options.TransportOptions().GetBool("fastack") {
358+
// NOTE: We cannot use this with reply mode, as when we're sending a reply the `DeliveryResponse`
359+
// callback must include the reply in-line.
360+
if !reply && sub.Options.TransportOptions().GetBool("fastack") {
361+
if cb, ok := wh.callbacks[sub.Namespace]; ok {
362+
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
363+
ID: event.ID,
364+
Rejected: false,
365+
Subscription: event.Subscription,
366+
})
367+
}
344368
go func() {
345-
err := wh.doDelivery(connID, reply, sub, event, data)
369+
err := wh.doDelivery(connID, reply, sub, event, data, true)
346370
log.L(wh.ctx).Warnf("Webhook delivery failed in fastack mode for event '%s': %s", event.ID, err)
347371
}()
348372
return nil
349373
}
350374

351-
return wh.doDelivery(connID, reply, sub, event, data)
375+
return wh.doDelivery(connID, reply, sub, event, data, false)
352376
}

internal/events/webhooks/webhooks_test.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,11 @@ func TestRequestNoBodyNoReply(t *testing.T) {
360360

361361
dataID := fftypes.NewUUID()
362362
groupHash := fftypes.NewRandB32()
363-
sub := &core.Subscription{}
363+
sub := &core.Subscription{
364+
SubscriptionRef: core.SubscriptionRef{
365+
Namespace: "ns1",
366+
},
367+
}
364368
to := sub.Options.TransportOptions()
365369
to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr())
366370
event := &core.EventDelivery{
@@ -380,7 +384,8 @@ func TestRequestNoBodyNoReply(t *testing.T) {
380384
},
381385
},
382386
Subscription: core.SubscriptionRef{
383-
ID: sub.ID,
387+
ID: sub.ID,
388+
Namespace: "ns1",
384389
},
385390
}
386391
data := &core.Data{
@@ -390,9 +395,16 @@ func TestRequestNoBodyNoReply(t *testing.T) {
390395
}`),
391396
}
392397

398+
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
399+
mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
400+
return !response.Rejected
401+
})).Return(nil)
402+
393403
err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{data})
394404
assert.NoError(t, err)
395405
assert.True(t, called)
406+
407+
mcb.AssertExpectations(t)
396408
}
397409

398410
func TestRequestReplyEmptyData(t *testing.T) {
@@ -638,7 +650,7 @@ func TestRequestReplyDataArrayError(t *testing.T) {
638650
mcb.AssertExpectations(t)
639651
}
640652

641-
func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
653+
func TestWebhookFailFastAsk(t *testing.T) {
642654
wh, cancel := newTestWebHooks(t)
643655
defer cancel()
644656

@@ -652,7 +664,6 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
652664
Namespace: "ns1",
653665
},
654666
}
655-
sub.Options.TransportOptions()["reply"] = true
656667
sub.Options.TransportOptions()["fastack"] = true
657668
event := &core.EventDelivery{
658669
EnrichedEvent: core.EnrichedEvent{
@@ -673,17 +684,11 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) {
673684

674685
waiter := make(chan struct{})
675686
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
676-
dr := mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool {
677-
assert.Equal(t, *msgID, *response.Reply.Message.Header.CID)
678-
assert.Nil(t, response.Reply.Message.Header.Group)
679-
assert.Equal(t, core.MessageTypeBroadcast, response.Reply.Message.Header.Type)
680-
assert.Equal(t, float64(502), response.Reply.InlineData[0].Value.JSONObject()["status"])
681-
assert.Regexp(t, "FF10242", response.Reply.InlineData[0].Value.JSONObject().GetObject("body")["error"])
682-
return true
683-
})).Return(nil)
684-
dr.RunFn = func(a mock.Arguments) {
685-
close(waiter)
686-
}
687+
mcb.On("DeliveryResponse", mock.Anything, mock.Anything).
688+
Return(nil).
689+
Run(func(a mock.Arguments) {
690+
close(waiter)
691+
})
687692

688693
err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{
689694
{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)},

internal/namespace/manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ func NewNamespaceManager(withDefaults bool) Manager {
190190
tifactory.InitConfig(tokensConfig)
191191
authfactory.InitConfigArray(authConfig)
192192

193+
// Events still live at the root of the config
194+
eifactory.InitConfig(config.RootSection("events"))
195+
193196
return nm
194197
}
195198

0 commit comments

Comments
 (0)