Skip to content

Commit 6ac014d

Browse files
feat: adjust to flagd metadata toggle (#287)
* feat: adjust to flagd metadata toggle Signed-off-by: Konvalinka <[email protected]> * fix: implementation, update schema Signed-off-by: Konvalinka <[email protected]> * fix: type issues Signed-off-by: Konvalinka <[email protected]> * fix: pr feedback Signed-off-by: Konvalinka <[email protected]> * fix: tests Signed-off-by: Konvalinka <[email protected]> * fix: type issues Signed-off-by: Konvalinka <[email protected]> * Add grpc_watcher unit tests Signed-off-by: Konvalinka <[email protected]> * pr feedback Signed-off-by: Konvalinka <[email protected]> * pr feedback Signed-off-by: Konvalinka <[email protected]> * up test time limit Signed-off-by: Konvalinka <[email protected]> --------- Signed-off-by: Konvalinka <[email protected]> Co-authored-by: Simon Schrottner <[email protected]>
1 parent 6f3793a commit 6ac014d

File tree

6 files changed

+169
-17
lines changed

6 files changed

+169
-17
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import grpc
88
from google.protobuf.json_format import MessageToDict
9-
from google.protobuf.struct_pb2 import Struct
9+
from grpc import StatusCode
1010

1111
from openfeature.evaluation_context import EvaluationContext
1212
from openfeature.event import ProviderEventDetails
@@ -210,6 +210,23 @@ def _create_request_args(self) -> dict:
210210

211211
return request_args
212212

213+
def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
214+
if self.config.sync_metadata_disabled:
215+
return None
216+
217+
context_values_request = sync_pb2.GetMetadataRequest()
218+
try:
219+
context_values_response: sync_pb2.GetMetadataResponse = (
220+
self.stub.GetMetadata(context_values_request, wait_for_ready=True)
221+
)
222+
return context_values_response
223+
except grpc.RpcError as e:
224+
if e.code() == StatusCode.UNIMPLEMENTED:
225+
logger.debug(f"Error getting sync metadata: {e}")
226+
return None
227+
else:
228+
raise e
229+
213230
def listen(self) -> None:
214231
call_args = (
215232
{"timeout": self.streamline_deadline_seconds}
@@ -220,18 +237,7 @@ def listen(self) -> None:
220237

221238
while self.active:
222239
try:
223-
context_values_response: sync_pb2.GetMetadataResponse
224-
if self.config.sync_metadata_disabled:
225-
context_values_response = sync_pb2.GetMetadataResponse(
226-
metadata=Struct()
227-
)
228-
else:
229-
context_values_request = sync_pb2.GetMetadataRequest()
230-
context_values_response = self.stub.GetMetadata(
231-
context_values_request, wait_for_ready=True
232-
)
233-
234-
context_values = MessageToDict(context_values_response)
240+
context_values_response = self._fetch_metadata()
235241

236242
request = sync_pb2.SyncFlagsRequest(**request_args)
237243

@@ -245,12 +251,20 @@ def listen(self) -> None:
245251
)
246252
self.flag_store.update(json.loads(flag_str))
247253

254+
context_values = {}
255+
if flag_rsp.sync_context:
256+
context_values = MessageToDict(flag_rsp.sync_context)
257+
elif context_values_response:
258+
context_values = MessageToDict(context_values_response)[
259+
"metadata"
260+
]
261+
248262
if not self.connected:
249263
self.emit_provider_ready(
250264
ProviderEventDetails(
251265
message="gRPC sync connection established"
252266
),
253-
context_values["metadata"],
267+
context_values,
254268
)
255269
self.connected = True
256270

providers/openfeature-provider-flagd/tests/e2e/step/provider_steps.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class TestProviderType(Enum):
3232
SSL = "ssl"
3333
SOCKET = "socket"
3434
METADATA = "metadata"
35+
SYNCPAYLOAD = "syncpayload"
3536

3637

3738
@given("a provider is registered", target_fixture="client")
@@ -71,6 +72,8 @@ def get_default_options_for_provider(
7172
return options, True
7273
elif t == TestProviderType.METADATA:
7374
launchpad = "metadata"
75+
elif t == TestProviderType.SYNCPAYLOAD:
76+
launchpad = "sync-payload"
7477

7578
if resolver_type == ResolverType.FILE:
7679
if "selector" in option_values:

providers/openfeature-provider-flagd/tests/test_errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,5 +138,5 @@ def fail(*args, **kwargs):
138138
)
139139

140140
elapsed = time.time() - t
141-
assert abs(elapsed - wait * 0.001) < 0.15
141+
assert abs(elapsed - wait * 0.001) < 0.17
142142
assert init_failed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import threading
2+
import time
3+
import typing
4+
import unittest
5+
from unittest.mock import MagicMock, Mock, patch
6+
7+
from google.protobuf.json_format import MessageToDict
8+
from google.protobuf.struct_pb2 import Struct
9+
from grpc import Channel
10+
11+
from openfeature.contrib.provider.flagd.config import Config
12+
from openfeature.contrib.provider.flagd.resolvers.process.connector.grpc_watcher import (
13+
GrpcWatcher,
14+
)
15+
from openfeature.contrib.provider.flagd.resolvers.process.flags import FlagStore
16+
from openfeature.event import ProviderEventDetails
17+
from openfeature.schemas.protobuf.flagd.sync.v1.sync_pb2 import (
18+
GetMetadataResponse,
19+
SyncFlagsResponse,
20+
)
21+
from openfeature.schemas.protobuf.flagd.sync.v1.sync_pb2_grpc import FlagSyncServiceStub
22+
23+
24+
class TestGrpcWatcher(unittest.TestCase):
25+
def setUp(self):
26+
config = Mock(spec=Config)
27+
config.retry_backoff_ms = 1000
28+
config.retry_backoff_max_ms = 5000
29+
config.retry_grace_period = 5
30+
config.stream_deadline_ms = 1000
31+
config.deadline_ms = 5000
32+
config.selector = None
33+
config.provider_id = None
34+
config.tls = False
35+
config.cert_path = None
36+
config.channel_credentials = None
37+
config.host = "localhost"
38+
config.port = 5000
39+
config.sync_metadata_disabled = False
40+
41+
flag_store = Mock(spec=FlagStore)
42+
flag_store.update.return_value = None
43+
emit_provider_error = Mock()
44+
emit_provider_stale = Mock()
45+
channel = Mock(spec=Channel)
46+
self.provider_done = False
47+
self.provider_details: typing.Optional[ProviderEventDetails] = None
48+
self.context: typing.Optional[dict] = None
49+
50+
with patch(
51+
"openfeature.contrib.provider.flagd.resolvers.process.connector.grpc_watcher.GrpcWatcher._generate_channel",
52+
return_value=channel,
53+
):
54+
self.grpc_watcher = GrpcWatcher(
55+
config=config,
56+
flag_store=flag_store,
57+
emit_provider_ready=self.provider_ready,
58+
emit_provider_error=emit_provider_error,
59+
emit_provider_stale=emit_provider_stale,
60+
)
61+
self.mock_stub = MagicMock(spec=FlagSyncServiceStub)
62+
self.mock_metadata = GetMetadataResponse(metadata={"attribute": "value1"})
63+
self.mock_stub.GetMetadata = Mock(return_value=self.mock_metadata)
64+
self.grpc_watcher.stub = self.mock_stub
65+
self.grpc_watcher.active = True
66+
67+
def provider_ready(self, details: ProviderEventDetails, context: dict):
68+
self.provider_done = True
69+
self.provider_details = details
70+
self.context = context
71+
72+
def run_listen_and_shutdown_after(self):
73+
listener = threading.Thread(target=self.grpc_watcher.listen)
74+
listener.start()
75+
for _i in range(0, 100):
76+
if self.provider_done:
77+
break
78+
time.sleep(0.001)
79+
80+
self.assertTrue(self.provider_done)
81+
self.grpc_watcher.shutdown()
82+
listener.join(timeout=0.5)
83+
84+
def test_listen_with_sync_metadata_and_sync_context(self):
85+
sync_context = Struct()
86+
sync_context.update({"attribute": "value"})
87+
mock_stream_with_sync_context = iter(
88+
[
89+
SyncFlagsResponse(
90+
flag_configuration='{"flag_key": "flag_value"}',
91+
sync_context=sync_context,
92+
),
93+
]
94+
)
95+
self.mock_stub.SyncFlags = Mock(return_value=mock_stream_with_sync_context)
96+
97+
self.run_listen_and_shutdown_after()
98+
99+
self.assertEqual(
100+
self.provider_details.message, "gRPC sync connection established"
101+
)
102+
self.assertEqual(self.context, MessageToDict(sync_context))
103+
104+
def test_listen_with_sync_metadata_only(self):
105+
mock_stream_no_sync_context = iter(
106+
[
107+
SyncFlagsResponse(flag_configuration='{"flag_key": "flag_value"}'),
108+
]
109+
)
110+
self.mock_stub.SyncFlags = Mock(return_value=mock_stream_no_sync_context)
111+
112+
self.run_listen_and_shutdown_after()
113+
114+
self.assertEqual(
115+
self.provider_details.message, "gRPC sync connection established"
116+
)
117+
self.assertEqual(self.context, MessageToDict(self.mock_metadata.metadata))
118+
119+
def test_listen_with_sync_metadata_disabled_in_config(self):
120+
self.grpc_watcher.config.sync_metadata_disabled = True
121+
mock_stream_no_sync_context = iter(
122+
[
123+
SyncFlagsResponse(flag_configuration='{"flag_key": "flag_value"}'),
124+
]
125+
)
126+
self.mock_stub.SyncFlags = Mock(return_value=mock_stream_no_sync_context)
127+
128+
self.run_listen_and_shutdown_after()
129+
130+
self.mock_stub.GetMetadata.assert_not_called()
131+
132+
self.assertEqual(
133+
self.provider_details.message, "gRPC sync connection established"
134+
)
135+
self.assertEqual(self.context, {})

0 commit comments

Comments
 (0)