Skip to content

Commit e486b2e

Browse files
Merge pull request #1652 from kaleido-io/certmisatch-gauages
[metrics] [dataexchange] [networkmap] DXConnect Callbacks for Node Identity Check Metrics
2 parents 7f9364d + a5cb23a commit e486b2e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1465
-148
lines changed

.gitignore

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ __debug*
1212
!deploy/charts/firefly
1313
containerlogs
1414
.vscode/*.log
15-
.idea
16-
doc-site/site
15+
.idea/
16+
doc-site/site
17+
*.iml

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ GOGC=30
1414

1515
all: build test go-mod-tidy
1616
test: deps lint
17-
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=30s ${TEST_ARGS}
17+
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=45s ${TEST_ARGS}
1818
coverage.html:
1919
$(VGO) tool cover -html=coverage.txt
2020
coverage: test coverage.html

go.work.sum

Lines changed: 202 additions & 6 deletions
Large diffs are not rendered by default.

internal/apiserver/route_get_subscription_events_filtered_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,4 @@ func TestGetSubscriptionEventsFilteredNoSequenceIDsProvided(t *testing.T) {
8282

8383
r.ServeHTTP(res, req)
8484
assert.Equal(t, 200, res.Result().StatusCode)
85-
}
85+
}

internal/coremsgs/en_error_messages.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,4 +318,7 @@ var (
318318
MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400)
319319
MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'")
320320
MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'")
321+
MsgInvalidIdentityPatch = ffe("FF10480", "A profile must be provided when updating an identity", 400)
322+
MsgNodeNotProvidedForCheck = ffe("FF10481", "Node not provided for check", 500)
323+
MsgNodeMissingProfile = ffe("FF10482", "Node provided for check does not have a profile", 500)
321324
)

internal/database/sqlcommon/event_sql_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func TestGetEventsInSequenceRangeE2EWithDB(t *testing.T) {
107107
Type: core.EventTypeMessageConfirmed,
108108
Reference: fftypes.NewUUID(),
109109
Correlator: fftypes.NewUUID(),
110-
Topic: fmt.Sprintf("topic%d", i % 2),
110+
Topic: fmt.Sprintf("topic%d", i%2),
111111
Created: fftypes.Now(),
112112
}
113113
err := s.InsertEvent(ctx, event)
@@ -322,10 +322,9 @@ func TestGetEventsInSequenceRangeBuildQueryFail(t *testing.T) {
322322

323323
func TestGetEventsInSequenceRangeShouldCallGetEventsWhenNoSequencedProvidedAndThrowAnError(t *testing.T) {
324324
s, mock := newMockProvider().init()
325-
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id", }).AddRow("only one"))
325+
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("only one"))
326326
f := database.EventQueryFactory.NewFilter(context.Background()).And()
327327
_, _, err := s.GetEventsInSequenceRange(context.Background(), "ns1", f, -1, -1)
328328
assert.NotNil(t, err)
329329
assert.NoError(t, mock.ExpectationsWereMet())
330330
}
331-

internal/dataexchange/ffdx/ffdx.go

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ package ffdx
1818

1919
import (
2020
"context"
21+
"crypto/x509"
2122
"encoding/json"
23+
"encoding/pem"
24+
"errors"
2225
"fmt"
2326
"io"
2427
"strings"
2528
"sync"
29+
"time"
30+
31+
"github.com/hyperledger/firefly/internal/metrics"
2632

2733
"github.com/go-resty/resty/v2"
2834
"github.com/hyperledger/firefly-common/pkg/config"
@@ -54,6 +60,8 @@ type FFDX struct {
5460
retry *retry.Retry
5561
backgroundStart bool
5662
backgroundRetry *retry.Retry
63+
64+
metrics metrics.Manager // optional
5765
}
5866

5967
type dxNode struct {
@@ -168,7 +176,7 @@ func (h *FFDX) Name() string {
168176
return "ffdx"
169177
}
170178

171-
func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section) (err error) {
179+
func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section, metrics metrics.Manager) (err error) {
172180
h.ctx = log.WithLogField(ctx, "dx", "https")
173181
h.cancelCtx = cancelCtx
174182
h.ackChannel = make(chan *ack)
@@ -179,6 +187,7 @@ func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config co
179187
}
180188
h.needsInit = config.GetBool(DataExchangeInitEnabled)
181189
h.nodes = make(map[string]*dxNode)
190+
h.metrics = metrics
182191

183192
if config.GetString(ffresty.HTTPConfigURL) == "" {
184193
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "dataexchange.ffdx")
@@ -295,6 +304,11 @@ func (h *FFDX) beforeConnect(ctx context.Context, w wsclient.WSClient) error {
295304
return fmt.Errorf("DX returned non-ready status: %s", status.Status)
296305
}
297306
}
307+
308+
for _, cb := range h.callbacks.handlers {
309+
cb.DXConnect(h)
310+
}
311+
298312
h.initialized = true
299313
return nil
300314
}
@@ -448,6 +462,95 @@ func (h *FFDX) TransferBlob(ctx context.Context, nsOpID string, peer, sender fft
448462
return nil
449463
}
450464

465+
func (h *FFDX) CheckNodeIdentityStatus(ctx context.Context, node *core.Identity) error {
466+
if err := h.checkInitialized(ctx); err != nil {
467+
return err
468+
}
469+
470+
if node == nil {
471+
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
472+
}
473+
474+
var mismatchState = metrics.NodeIdentityDXCertMismatchStatusUnknown
475+
defer func() {
476+
if h.metrics != nil && h.metrics.IsMetricsEnabled() {
477+
h.metrics.NodeIdentityDXCertMismatch(node.Namespace, mismatchState)
478+
}
479+
log.L(ctx).Debugf("Identity status checked against DX node='%s' mismatch_state='%s'", node.Name, mismatchState)
480+
}()
481+
482+
dxPeer, err := h.GetEndpointInfo(ctx, node.Name) // should be the same as the local node
483+
if err != nil {
484+
return err
485+
}
486+
487+
dxPeerCert := dxPeer.GetString("cert")
488+
// if this occurs, it is either a misconfigured / broken DX or likely a DX that is compatible from an API perspective
489+
// but does not have the same peer info as the HTTPS mTLS DX
490+
if dxPeerCert == "" {
491+
log.L(ctx).Debugf("DX peer does not have a 'cert', DX plugin may be unsupported")
492+
return nil
493+
}
494+
495+
expiry, err := extractSoonestExpiryFromCertBundle(strings.ReplaceAll(dxPeerCert, `\n`, "\n"))
496+
if err == nil {
497+
if expiry.Before(time.Now()) {
498+
log.L(ctx).Warnf("DX certificate for node '%s' has expired", node.Name)
499+
}
500+
501+
if h.metrics != nil && h.metrics.IsMetricsEnabled() {
502+
h.metrics.NodeIdentityDXCertExpiry(node.Namespace, expiry)
503+
}
504+
} else {
505+
log.L(ctx).Errorf("Failed to find x509 cert within DX cert bundle node='%s' namespace='%s'", node.Name, node.Namespace)
506+
}
507+
508+
if node.Profile == nil {
509+
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
510+
}
511+
512+
nodeCert := node.Profile.GetString("cert")
513+
if nodeCert != "" {
514+
mismatchState = metrics.NodeIdentityDXCertMismatchStatusHealthy
515+
if dxPeerCert != nodeCert {
516+
log.L(ctx).Warnf("DX certificate for node '%s' is out-of-sync with on-chain identity", node.Name)
517+
mismatchState = metrics.NodeIdentityDXCertMismatchStatusMismatched
518+
}
519+
}
520+
521+
return nil
522+
}
523+
524+
// extractSoonestExpiryFromCertBundle decodes every PEM block in certBundle and
// returns the earliest NotAfter time, in UTC, among the certificates found.
//
// We assume the cert with the soonest expiry is the leaf cert, but even if it's the CA,
// that's what will invalidate the leaf anyways, so really we only care about the soonest expiry.
// So we loop through the bundle finding the soonest expiry, not necessarily the leaf.
//
// An error is returned if any PEM block fails to parse as an x509 certificate
// (the parse error is wrapped so callers can inspect it), or if no PEM block
// is found in the bundle at all.
func extractSoonestExpiryFromCertBundle(certBundle string) (time.Time, error) {
	var expiringCert *x509.Certificate
	var block *pem.Block
	var rest = []byte(certBundle)

	for {
		// pem.Decode returns a nil block once no further PEM data is found.
		block, rest = pem.Decode(rest)
		if block == nil {
			break
		}

		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			return time.Time{}, fmt.Errorf("failed to parse non-certificate within bundle: %w", err)
		}
		if expiringCert == nil || cert.NotAfter.Before(expiringCert.NotAfter) {
			expiringCert = cert
		}
	}

	if expiringCert == nil {
		return time.Time{}, errors.New("no valid certificate found")
	}

	return expiringCert.NotAfter.UTC(), nil
}
553+
451554
func (h *FFDX) ackLoop() {
452555
for {
453556
select {

0 commit comments

Comments
 (0)