Skip to content

Commit 003ef7a

Browse files
Merge pull request #1546 from hyperledger/fix_fabric_listeners
fix: fabric check existence of listener
2 parents ebf2349 + e479668 commit 003ef7a

File tree

3 files changed

+65
-6
lines changed

3 files changed

+65
-6
lines changed

internal/blockchain/fabric/eventstream.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,22 +156,25 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript
156156
return subs, nil
157157
}
158158

159-
func (s *streamManager) getSubscription(ctx context.Context, subID string) (sub *subscription, err error) {
159+
func (s *streamManager) getSubscription(ctx context.Context, subID string, okNotFound bool) (sub *subscription, err error) {
160160
res, err := s.client.R().
161161
SetContext(ctx).
162162
SetResult(&sub).
163163
Get(fmt.Sprintf("/subscriptions/%s", subID))
164164
if err != nil || !res.IsSuccess() {
165+
if okNotFound && res.StatusCode() == 404 {
166+
return nil, nil
167+
}
165168
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr)
166169
}
167170
return sub, nil
168171
}
169172

170-
func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) {
173+
func (s *streamManager) getSubscriptionName(ctx context.Context, subID string, okNotFound bool) (string, error) {
171174
if cachedValue := s.cache.GetString("sub:" + subID); cachedValue != "" {
172175
return cachedValue, nil
173176
}
174-
sub, err := s.getSubscription(ctx, subID)
177+
sub, err := s.getSubscription(ctx, subID, okNotFound)
175178
if err != nil {
176179
return "", err
177180
}

internal/blockchain/fabric/fabric.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func (f *Fabric) buildEventLocationString(chaincode string) string {
396396

397397
func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsToDispatch, msgJSON fftypes.JSONObject) (err error) {
398398
subID := msgJSON.GetString("subId")
399-
subName, err := f.streams.getSubscriptionName(ctx, subID)
399+
subName, err := f.streams.getSubscriptionName(ctx, subID, false)
400400
if err != nil {
401401
return err // this is a problem - we should be able to find the listener that dispatched this to us
402402
}
@@ -998,7 +998,13 @@ func (f *Fabric) DeleteContractListener(ctx context.Context, subscription *core.
998998

999999
func (f *Fabric) GetContractListenerStatus(ctx context.Context, namespace, subID string, okNotFound bool) (bool, interface{}, core.ContractListenerStatus, error) {
10001000
// Fabconnect does not currently provide any additional status info for listener subscriptions.
1001-
return true, nil, core.ContractListenerStatusUnknown, nil
1001+
// But we check for existence of the subscription
1002+
sub, err := f.streams.getSubscription(ctx, subID, okNotFound)
1003+
if err != nil || sub == nil {
1004+
return false, nil, core.ContractListenerStatusUnknown, err
1005+
}
1006+
1007+
return true, nil, core.ContractListenerStatusUnknown, err
10021008
}
10031009

10041010
func (f *Fabric) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamValidator, error) {

internal/blockchain/fabric/fabric_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3360,12 +3360,62 @@ func TestGetContractListenerStatus(t *testing.T) {
33603360
httpmock.ActivateNonDefault(e.client.GetClient())
33613361
defer httpmock.DeactivateAndReset()
33623362

3363-
_, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
3363+
e.streams = &streamManager{
3364+
client: e.client,
3365+
}
3366+
3367+
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
3368+
httpmock.NewJsonResponderOrPanic(200, subscription{
3369+
ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312",
3370+
}))
3371+
3372+
found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
3373+
assert.True(t, found)
3374+
assert.Nil(t, detail)
3375+
assert.Equal(t, core.ContractListenerStatusUnknown, status)
3376+
assert.NoError(t, err)
3377+
}
3378+
3379+
func TestGetContractListenerStatusNotFound(t *testing.T) {
3380+
e, cancel := newTestFabric()
3381+
defer cancel()
3382+
httpmock.ActivateNonDefault(e.client.GetClient())
3383+
defer httpmock.DeactivateAndReset()
3384+
3385+
e.streams = &streamManager{
3386+
client: e.client,
3387+
}
3388+
3389+
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
3390+
httpmock.NewJsonResponderOrPanic(404, nil))
3391+
3392+
found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
3393+
assert.False(t, found)
33643394
assert.Nil(t, detail)
33653395
assert.Equal(t, core.ContractListenerStatusUnknown, status)
33663396
assert.NoError(t, err)
33673397
}
33683398

3399+
func TestGetContractListenerStatusError(t *testing.T) {
3400+
e, cancel := newTestFabric()
3401+
defer cancel()
3402+
httpmock.ActivateNonDefault(e.client.GetClient())
3403+
defer httpmock.DeactivateAndReset()
3404+
3405+
e.streams = &streamManager{
3406+
client: e.client,
3407+
}
3408+
3409+
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
3410+
httpmock.NewJsonResponderOrPanic(500, nil))
3411+
3412+
found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
3413+
assert.False(t, found)
3414+
assert.Nil(t, detail)
3415+
assert.Equal(t, core.ContractListenerStatusUnknown, status)
3416+
assert.Error(t, err)
3417+
}
3418+
33693419
func TestGetTransactionStatus(t *testing.T) {
33703420
e, cancel := newTestFabric()
33713421
defer cancel()

0 commit comments

Comments
 (0)