Skip to content

Commit 13b78f2

Browse files
feat: add environment ID support (#291)
**Requirements** - [X] I have added test coverage for new or changed functionality - [X] I have followed the repository's [pull request submission guidelines](../blob/v5/CONTRIBUTING.md#submitting-pull-requests) - [X] I have validated my changes against all supported platform versions **Describe the solution you've provided** Added support for environment ID. This also bumps eventsource to v1.10.0 and go-test-helpers to v3.1.0.
1 parent 19e81d9 commit 13b78f2

31 files changed

+445
-183
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ require (
66
github.com/fsnotify/fsnotify v1.4.7
77
github.com/gregjones/httpcache v0.0.0-20171119193500-2bcd89a1743f
88
github.com/launchdarkly/ccache v1.1.0
9-
github.com/launchdarkly/eventsource v1.9.1
9+
github.com/launchdarkly/eventsource v1.10.0
1010
github.com/launchdarkly/go-jsonstream/v3 v3.1.0
1111
github.com/launchdarkly/go-ntlm-proxy-auth v1.0.2
1212
github.com/launchdarkly/go-sdk-common/v3 v3.1.0
1313
github.com/launchdarkly/go-sdk-events/v3 v3.5.0
1414
github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1
15-
github.com/launchdarkly/go-test-helpers/v3 v3.0.2
15+
github.com/launchdarkly/go-test-helpers/v3 v3.1.0
1616
github.com/patrickmn/go-cache v2.1.0+incompatible
1717
github.com/stretchr/testify v1.9.0
1818
golang.org/x/sync v0.8.0

go.sum

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
1717
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1818
github.com/launchdarkly/ccache v1.1.0 h1:voD1M+ZJXR3MREOKtBwgTF9hYHl1jg+vFKS/+VAkR2k=
1919
github.com/launchdarkly/ccache v1.1.0/go.mod h1:TlxzrlnzvYeXiLHmesMuvoZetu4Z97cV1SsdqqBJi1Q=
20-
github.com/launchdarkly/eventsource v1.9.1 h1:Ov89TFuSrKZewotRvnh6CuqXqlxJhk4JMPWdR3l1DJQ=
21-
github.com/launchdarkly/eventsource v1.9.1/go.mod h1:IBckHy1VOjJGqSg07EJJLiUnk5DPunX9LKD9vbcgeHo=
20+
github.com/launchdarkly/eventsource v1.10.0 h1:H9Tp6AfGu/G2qzBJC26iperrvwhzdbiA/gx7qE2nDFI=
21+
github.com/launchdarkly/eventsource v1.10.0/go.mod h1:J3oa50bPvJesZqNAJtb5btSIo5N6roDWhiAS3IpsKck=
2222
github.com/launchdarkly/go-jsonstream/v3 v3.1.0 h1:U/7/LplZO72XefBQ+FzHf6o4FwLHVqBE+4V58Ornu/E=
2323
github.com/launchdarkly/go-jsonstream/v3 v3.1.0/go.mod h1:2Pt4BR5AwWgsuVTCcIpB6Os04JFIKWfoA+7faKkZB5E=
2424
github.com/launchdarkly/go-ntlm-proxy-auth v1.0.2 h1:LnChqC/CulrA+N26DF4rjbDjAARc9eTwmpEdgGsRjrY=
@@ -33,10 +33,8 @@ github.com/launchdarkly/go-semver v1.0.3 h1:agIy/RN3SqeQDIfKkl+oFslEdeIs7pgsJBs3
3333
github.com/launchdarkly/go-semver v1.0.3/go.mod h1:xFmMwXba5Mb+3h72Z+VeSs9ahCvKo2QFUTHRNHVqR28=
3434
github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1 h1:rTgcYAFraGFj7sBMB2b7JCYCm0b9kph4FaMX02t4osQ=
3535
github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1/go.mod h1:fPS5d+zOsgFnMunj+Ki6jjlZtFvo4h9iNbtNXxzYn58=
36-
github.com/launchdarkly/go-test-helpers/v2 v2.2.0 h1:L3kGILP/6ewikhzhdNkHy1b5y4zs50LueWenVF0sBbs=
37-
github.com/launchdarkly/go-test-helpers/v2 v2.2.0/go.mod h1:L7+th5govYp5oKU9iN7To5PgznBuIjBPn+ejqKR0avw=
38-
github.com/launchdarkly/go-test-helpers/v3 v3.0.2 h1:rh0085g1rVJM5qIukdaQ8z1XTWZztbJ49vRZuveqiuU=
39-
github.com/launchdarkly/go-test-helpers/v3 v3.0.2/go.mod h1:u2ZvJlc/DDJTFrshWW50tWMZHLVYXofuSHUfTU/eIwM=
36+
github.com/launchdarkly/go-test-helpers/v3 v3.1.0 h1:E3bxJMzMoA+cJSF3xxtk2/chr1zshl1ZWa0/oR+8bvg=
37+
github.com/launchdarkly/go-test-helpers/v3 v3.1.0/go.mod h1:Ake5+hZFS/DmIGKx/cizhn5W9pGA7pplcR7xCxWiLIo=
4038
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
4139
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
4240
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

internal/datasource/data_source_status_provider_impl_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,14 @@ func dataSourceStatusProviderImplTest(action func(dataSourceStatusProviderImplTe
2727
defer flagBroadcaster.Close()
2828
store := datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())
2929
dataStoreStatusProvider := datastore.NewDataStoreStatusProviderImpl(store, nil)
30-
p.dataSourceUpdates = NewDataSourceUpdateSinkImpl(store, dataStoreStatusProvider, statusBroadcaster, flagBroadcaster,
31-
0, sharedtest.NewTestLoggers())
30+
p.dataSourceUpdates = NewDataSourceUpdateSinkImpl(
31+
store,
32+
dataStoreStatusProvider,
33+
statusBroadcaster,
34+
flagBroadcaster,
35+
0,
36+
sharedtest.NewTestLoggers(),
37+
)
3238
p.dataSourceStatusProvider = NewDataSourceStatusProviderImpl(statusBroadcaster, p.dataSourceUpdates)
3339

3440
action(p)

internal/datasource/data_source_update_sink_impl.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/launchdarkly/go-server-sdk/v7/internal/toposort"
99

1010
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
11+
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
1112
intf "github.com/launchdarkly/go-server-sdk/v7/interfaces"
1213
"github.com/launchdarkly/go-server-sdk/v7/internal"
1314
"github.com/launchdarkly/go-server-sdk/v7/internal/datakinds"
@@ -29,6 +30,7 @@ type DataSourceUpdateSinkImpl struct {
2930
currentStatus intf.DataSourceStatus
3031
lastStoreUpdateFailed bool
3132
lock sync.Mutex
33+
environmentID ldvalue.OptionalString
3234
}
3335

3436
// NewDataSourceUpdateSinkImpl creates the internal implementation of DataSourceUpdateSink.
@@ -296,6 +298,16 @@ func (d *DataSourceUpdateSinkImpl) computeChangedItemsForFullDataSet(
296298
return affectedItems
297299
}
298300

301+
//nolint:revive // EnvironmentIDProvider method.
302+
func (d *DataSourceUpdateSinkImpl) GetEnvironmentID() ldvalue.OptionalString {
303+
return d.environmentID
304+
}
305+
306+
//nolint:revive // DataSourceUpdateSinkWithEnvironmentID method.
307+
func (d *DataSourceUpdateSinkImpl) SetEnvironmentID(environmentID ldvalue.OptionalString) {
308+
d.environmentID = environmentID
309+
}
310+
299311
type outageTracker struct {
300312
outageLoggingTimeout time.Duration
301313
loggers ldlog.Loggers

internal/datasource/polling_data_source.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datasource
22

33
import (
4+
"net/http"
45
"sync"
56
"time"
67

@@ -28,7 +29,7 @@ type PollingConfig struct {
2829
// Requester allows PollingProcessor to delegate fetching data to another component.
2930
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
3031
type Requester interface {
31-
Request() (data []ldstoretypes.Collection, cached bool, err error)
32+
Request() (data []ldstoretypes.Collection, cached bool, headers http.Header, err error)
3233
BaseURI() string
3334
FilterKey() string
3435
}
@@ -145,7 +146,7 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
145146
}
146147

147148
func (pp *PollingProcessor) poll() error {
148-
allData, cached, err := pp.requester.Request()
149+
allData, cached, headers, err := pp.requester.Request()
149150

150151
if err != nil {
151152
return err
@@ -154,6 +155,10 @@ func (pp *PollingProcessor) poll() error {
154155
// We initialize the store only if the request wasn't cached
155156
if !cached {
156157
pp.dataSourceUpdates.Init(allData)
158+
if dataSourceWithInitMetadata, ok := pp.dataSourceUpdates.(subsystems.DataSourceUpdateSinkWithEnvironmentID); ok {
159+
initMetadata := internal.NewInitMetadataFromHeaders(headers)
160+
dataSourceWithInitMetadata.SetEnvironmentID(initMetadata.GetEnvironmentID())
161+
}
157162
}
158163
return nil
159164
}

internal/datasource/polling_http_request.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,35 @@ func (r *PollingRequester) FilterKey() string {
7777
// It returns the data in the form of a slice of ldstoretypes.Collection.
7878
// The boolean return value indicates whether the data was served from the cache.
7979
// If an error occurs, it will be returned as the third return value.
80-
func (r *PollingRequester) Request() ([]ldstoretypes.Collection, bool, error) {
80+
func (r *PollingRequester) Request() ([]ldstoretypes.Collection, bool, http.Header, error) {
8181
if r.loggers.IsDebugEnabled() {
8282
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
8383
}
8484

85-
body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
85+
body, cached, headers, err := r.makeRequest(endpoints.PollingRequestPath)
8686
if err != nil {
87-
return nil, false, err
87+
return nil, false, headers, err
8888
}
8989
if cached {
90-
return nil, true, nil
90+
return nil, true, headers, nil
9191
}
9292

9393
reader := jreader.NewReader(body)
9494
data := parseAllStoreDataFromJSONReader(&reader)
9595
if err := reader.Error(); err != nil {
96-
return nil, false, malformedJSONError{err}
96+
return nil, false, headers, malformedJSONError{err}
9797
}
98-
return data, cached, nil
98+
return data, cached, headers, nil
9999
}
100100

101-
func (r *PollingRequester) makeRequest(resource string) ([]byte, bool, error) {
101+
func (r *PollingRequester) makeRequest(resource string) ([]byte, bool, http.Header, error) {
102102
req, reqErr := http.NewRequest("GET", endpoints.AddPath(r.baseURI, resource), nil)
103103
if reqErr != nil {
104104
reqErr = fmt.Errorf(
105105
"unable to create a poll request; this is not a network problem, most likely a bad base URI: %w",
106106
reqErr,
107107
)
108-
return nil, false, reqErr
108+
return nil, false, nil, reqErr
109109
}
110110
if r.filterKey != "" {
111111
req.URL.RawQuery = url.Values{
@@ -120,7 +120,7 @@ func (r *PollingRequester) makeRequest(resource string) ([]byte, bool, error) {
120120
res, resErr := r.httpClient.Do(req)
121121

122122
if resErr != nil {
123-
return nil, false, resErr
123+
return nil, false, nil, resErr
124124
}
125125

126126
defer func() {
@@ -129,15 +129,15 @@ func (r *PollingRequester) makeRequest(resource string) ([]byte, bool, error) {
129129
}()
130130

131131
if err := checkForHTTPError(res.StatusCode, url); err != nil {
132-
return nil, false, err
132+
return nil, false, res.Header, err
133133
}
134134

135135
cached := res.Header.Get(httpcache.XFromCache) != ""
136136

137137
body, ioErr := io.ReadAll(res.Body)
138138

139139
if ioErr != nil {
140-
return nil, false, ioErr // COVERAGE: there is no way to simulate this condition in unit tests
140+
return nil, false, res.Header, ioErr // COVERAGE: there is no way to simulate this condition in unit tests
141141
}
142-
return body, cached, nil
142+
return body, cached, res.Header, nil
143143
}

internal/datasource/polling_http_request_test.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ func TestRequestorImplRequestAll(t *testing.T) {
3131
httphelpers.WithServer(handler, func(ts *httptest.Server) {
3232
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, ts.URL, filter.key)
3333

34-
data, cached, err := r.Request()
34+
data, cached, headers, err := r.Request()
3535

3636
assert.NoError(t, err)
3737
assert.False(t, cached)
38+
assert.NotEmpty(t, headers)
3839

3940
assert.Equal(t, sharedtest.NormalizeDataSet(expectedData.Build()), sharedtest.NormalizeDataSet(data))
4041

@@ -49,14 +50,15 @@ func TestRequestorImplRequestAll(t *testing.T) {
4950
httphelpers.WithServer(handler, func(ts *httptest.Server) {
5051
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, ts.URL, filter.key)
5152

52-
data, cached, err := r.Request()
53+
data, cached, headers, err := r.Request()
5354

5455
assert.Error(t, err)
5556
if he, ok := err.(httpStatusError); assert.True(t, ok) {
5657
assert.Equal(t, 500, he.Code)
5758
}
5859
assert.False(t, cached)
5960
assert.Nil(t, data)
61+
assert.NotEmpty(t, headers)
6062
})
6163
})
6264

@@ -68,37 +70,40 @@ func TestRequestorImplRequestAll(t *testing.T) {
6870
})
6971
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, closedServerURL, filter.key)
7072

71-
data, cached, err := r.Request()
73+
data, cached, headers, err := r.Request()
7274

7375
assert.Error(t, err)
7476
assert.False(t, cached)
7577
assert.Nil(t, data)
78+
assert.Nil(t, headers)
7679
})
7780

7881
t.Run("malformed data", func(t *testing.T) {
7982
handler := httphelpers.HandlerWithResponse(200, nil, []byte("{"))
8083
httphelpers.WithServer(handler, func(ts *httptest.Server) {
8184
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, ts.URL, filter.key)
8285

83-
data, cached, err := r.Request()
86+
data, cached, headers, err := r.Request()
8487

8588
require.Error(t, err)
8689
_, ok := err.(malformedJSONError)
8790
assert.True(t, ok)
8891
assert.False(t, cached)
8992
assert.Nil(t, data)
93+
assert.NotEmpty(t, headers)
9094
})
9195
})
9296

9397
t.Run("malformed base URI", func(t *testing.T) {
9498
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, "::::", filter.key)
9599

96-
data, cached, err := r.Request()
100+
data, cached, headers, err := r.Request()
97101

98102
require.Error(t, err)
99103
assert.Contains(t, err.Error(), "missing protocol scheme")
100104
assert.False(t, cached)
101105
assert.Nil(t, data)
106+
assert.Nil(t, headers)
102107
})
103108

104109
t.Run("sends configured headers", func(t *testing.T) {
@@ -113,7 +118,7 @@ func TestRequestorImplRequestAll(t *testing.T) {
113118
httphelpers.WithServer(handler, func(ts *httptest.Server) {
114119
r := NewPollingRequester(context, nil, ts.URL, filter.key)
115120

116-
_, _, err := r.Request()
121+
_, _, _, err := r.Request()
117122
assert.NoError(t, err)
118123

119124
req := <-requestsCh
@@ -130,7 +135,7 @@ func TestRequestorImplRequestAll(t *testing.T) {
130135
httphelpers.WithServer(handler, func(ts *httptest.Server) {
131136
r := NewPollingRequester(context, nil, ts.URL, filter.key)
132137

133-
_, _, err := r.Request()
138+
_, _, _, err := r.Request()
134139
assert.NoError(t, err)
135140

136141
assert.Equal(t, []string{"Polling LaunchDarkly for feature flag updates"},
@@ -159,10 +164,11 @@ func TestRequestorImplCaching(t *testing.T) {
159164
httphelpers.WithServer(handler, func(ts *httptest.Server) {
160165
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, ts.URL, filter.key)
161166

162-
data1, cached1, err1 := r.Request()
167+
data1, cached1, headers1, err1 := r.Request()
163168

164169
assert.NoError(t, err1)
165170
assert.False(t, cached1)
171+
assert.NotEmpty(t, headers1)
166172
assert.Equal(t, sharedtest.NormalizeDataSet(expectedData.Build()), sharedtest.NormalizeDataSet(data1))
167173

168174
req1 := <-requestsCh
@@ -171,10 +177,11 @@ func TestRequestorImplCaching(t *testing.T) {
171177

172178
assert.Equal(t, "", req1.Request.Header.Get("If-None-Match"))
173179

174-
data2, cached2, err2 := r.Request()
180+
data2, cached2, headers2, err2 := r.Request()
175181

176182
assert.NoError(t, err2)
177183
assert.True(t, cached2)
184+
assert.NotEmpty(t, headers2)
178185
assert.Nil(t, data2) // for cached data, requestAll doesn't bother parsing the body
179186

180187
req2 := <-requestsCh
@@ -196,7 +203,7 @@ func TestRequestorImplCanUseCustomHTTPClientFactory(t *testing.T) {
196203
httphelpers.WithServer(pollHandler, func(ts *httptest.Server) {
197204
r := NewPollingRequester(context, nil, ts.URL, "")
198205

199-
_, _, _ = r.Request()
206+
_, _, _, _ = r.Request()
200207

201208
req := <-requestsCh
202209

@@ -212,7 +219,7 @@ func TestRequestorImplCanAppendsFilterParameter(t *testing.T) {
212219
httphelpers.WithServer(pollHandler, func(ts *httptest.Server) {
213220
r := NewPollingRequester(sharedtest.BasicClientContext(), nil, ts.URL, filter.key)
214221

215-
_, _, _ = r.Request()
222+
_, _, _, _ = r.Request()
216223

217224
req := <-requestsCh
218225

internal/datasource/streaming_data_source.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
191191
break
192192
}
193193
if sp.dataSourceUpdates.Init(put.Data) {
194+
if eventWithHeaders, ok := event.(es.EventWithHeaders); ok {
195+
if dataSourceUpdatesWithInitMetadata, ok :=
196+
sp.dataSourceUpdates.(subsystems.DataSourceUpdateSinkWithEnvironmentID); ok {
197+
initMetadata := internal.NewInitMetadataFromHeaders(eventWithHeaders.Headers())
198+
dataSourceUpdatesWithInitMetadata.SetEnvironmentID(initMetadata.GetEnvironmentID())
199+
}
200+
}
194201
sp.setInitializedAndNotifyClient(true, closeWhenReady)
195202
} else {
196203
storeUpdateFailed("initial streaming data")

internal/datasourcev2/fdv1_polling_data_source.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package datasourcev2
33
import (
44
"context"
55
"encoding/json"
6+
"net/http"
67

78
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
89
"github.com/launchdarkly/go-server-sdk/v7/internal/datakinds"
@@ -18,10 +19,10 @@ type fdv1ToFDv2Requester struct {
1819
func (r *fdv1ToFDv2Requester) Request(
1920
ctx context.Context,
2021
selector subsystems.Selector,
21-
) (*subsystems.ChangeSet, error) {
22-
data, cached, err := r.requester.Request()
22+
) (*subsystems.ChangeSet, http.Header, error) {
23+
data, cached, headers, err := r.requester.Request()
2324
if err != nil {
24-
return nil, err
25+
return nil, headers, err
2526
}
2627

2728
code := subsystems.IntentTransferFull
@@ -42,7 +43,7 @@ func (r *fdv1ToFDv2Requester) Request(
4243
},
4344
})
4445
if err != nil {
45-
return nil, err
46+
return nil, headers, err
4647
}
4748

4849
for _, item := range data {
@@ -59,14 +60,15 @@ func (r *fdv1ToFDv2Requester) Request(
5960
bytes, err := json.Marshal(keyedItem.Item.Item)
6061
if err != nil {
6162
r.loggers.Warn("Error marshalling v1 item to JSON: %s", err)
62-
return nil, err
63+
return nil, headers, err
6364
}
6465

6566
changeSetBuilder.AddPut(kind, keyedItem.Key, keyedItem.Item.Version, bytes)
6667
}
6768
}
6869

69-
return changeSetBuilder.Finish(subsystems.NewSelector("state", 1))
70+
changeset, err := changeSetBuilder.Finish(subsystems.NewSelector("state", 1))
71+
return changeset, headers, err
7072
}
7173

7274
func (r *fdv1ToFDv2Requester) BaseURI() string {

0 commit comments

Comments
 (0)