Skip to content

Commit 5a32555

Browse files
authored
Eventstream Refactor (#816)
This PR is a substantial rewrite of the eventstream RPC bindings as well as the code-generated clients for eventstream-based services. This refactor was necessitated by a variety of deadlock and race condition problems with the original implementation. The complexity of the original implementation made targeted fixes nearly impossible to apply. ## Refactor Goals * No blocking in destructors. In order to try and maintain behavioral compatibility with the previous implementation, we try and synchronously simulate the asynchronous events that would happen during a blocking destroy. * Simplified synchronization model where * * User callbacks are never invoked inside a lock * * C APIs are never invoked inside a lock ## Public API Changes The original implementation exposed a large amount of unnecessary details in the public API. As part of the refactor, we make a number of publicly visible changes that, while technically breaking, we believe should not be user-impacting. We consider a change to be user-impacting if it is a breaking change to a type that is used during service client interaction. We detail each change below as well as the reasoning why we think making this change is safe. Obviously, if you were mocking out any of these changed type contracts, then they will be breaking. * All OperationModelContext subclasses have been made private. These types were used internally by the service model and there is no reason to expose them. * ContinuationCallbackData removed. Was not user-facing. Unneeded in refactor * ClientContinuationHandler - Public functions that were only for internal use have been removed. Class now useless but has been retained in case users were tracking operations by it. * ClientContinuation - Internal type that has been re-implemented as the private type ClientContinuationImpl * ClientOperation * * Constructor type signature has changed - This type is only constructed internally by generated code * * GetOperationResult API removed - This function could not be called externally without triggering exceptions by multi-consuming a promise's future * * WithLaunchMode - This function persists but no longer does anything useful. Launch mode is no longer relevant to the processing of operations and was a mistake to include originally. * ClientConnection - This is an internal type used by generated service clients. * * Constructor signature changed. * * SendPing and SendPingResponse removed. * * Connect and NewStream signatures changed. * * bool operator removed ## Additional Changes We now launch an EchoTest RPC server in CI and run a much larger suite of tests against it. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent c31daf7 commit 5a32555

File tree

16 files changed

+6627
-3761
lines changed

16 files changed

+6627
-3761
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import atexit
2+
import Builder
3+
import os
4+
import shutil
5+
import subprocess
6+
import sys
7+
import time
8+
9+
class SetupEventstreamServer(Builder.Action):
10+
11+
def _build_and_run_eventstream_echo_server(self, env):
12+
java_sdk_dir = None
13+
14+
try:
15+
env.shell.exec(["mvn", "--version"], check=True)
16+
17+
# maven is installed, so this is a configuration we can start an event stream echo server
18+
java_sdk_dir = env.shell.mktemp()
19+
20+
env.shell.exec(["git", "clone", "https://github.com/aws/aws-iot-device-sdk-java-v2"], working_dir=java_sdk_dir, check=True)
21+
22+
sdk_dir = os.path.join(java_sdk_dir, "aws-iot-device-sdk-java-v2", "sdk")
23+
env.shell.pushd(sdk_dir)
24+
25+
try:
26+
# The EchoTest server is in test-only code
27+
env.shell.exec(["mvn", "test-compile"], check=True)
28+
env.shell.exec(["mvn", "dependency:build-classpath", "-Dmdep.outputFile=classpath.txt"], check=True)
29+
30+
time.sleep(1)
31+
with open('classpath.txt', 'r') as file:
32+
classpath = file.read()
33+
34+
test_class_path = os.path.join(sdk_dir, "target", "test-classes")
35+
target_class_path = os.path.join(sdk_dir, "target", "classes")
36+
directory_separator = os.pathsep
37+
38+
echo_server_probe_command = [
39+
"java",
40+
"-classpath",
41+
f"{test_class_path}{directory_separator}{target_class_path}{directory_separator}{classpath}",
42+
"software.amazon.awssdk.eventstreamrpc.echotest.EchoTestServiceRunner"]
43+
44+
probe_command_flat = " ".join(echo_server_probe_command)
45+
print(f'Echo probe command: {probe_command_flat}')
46+
47+
"""
48+
Try to run the echo server in the foreground without required arguments. This always fails, but
49+
the output can tell us whether or not the Java CRT is available on the platform (we have SDK CI
50+
that runs on platforms that the Java CRT does not support).
51+
52+
If the CRT supports the platform, we fail with an out-of-index exception (referencing non-existent
53+
command line arguments)
54+
55+
If the CRT does not support the platform, we fail with
56+
'java.io.IOException: Unable to open library in jar for AWS CRT'
57+
"""
58+
probe_output = ""
59+
probe = subprocess.Popen(
60+
echo_server_probe_command,
61+
stderr=subprocess.PIPE)
62+
with probe:
63+
64+
# Convert all output to strings, which makes it much easier to both print
65+
# and process, since all known uses of parsing output want strings anyway
66+
line = probe.stderr.readline()
67+
while (line):
68+
# ignore weird characters coming back from the shell (colors, etc)
69+
if not isinstance(line, str):
70+
line = line.decode('ascii', 'ignore')
71+
# We're reading in binary mode, so no automatic newline translation
72+
if sys.platform == 'win32':
73+
line = line.replace('\r\n', '\n')
74+
probe_output += line
75+
line = probe.stderr.readline()
76+
probe.wait()
77+
78+
print("Probe stderr:\n\n")
79+
print(probe_output)
80+
81+
if "java.io.IOException" in probe_output:
82+
print("Skipping eventstream server unsupported platform")
83+
raise Exception("Java CRT not supported by this platform")
84+
85+
echo_server_command = [
86+
"java",
87+
"-classpath",
88+
f"{test_class_path}{directory_separator}{target_class_path}{directory_separator}{classpath}",
89+
"software.amazon.awssdk.eventstreamrpc.echotest.EchoTestServiceRunner",
90+
"127.0.0.1",
91+
"8033"]
92+
93+
print(f'Echo server command: {echo_server_command}')
94+
95+
# bypass builder's exec wrapper since it doesn't allow for background execution
96+
proc = subprocess.Popen(echo_server_command)
97+
98+
@atexit.register
99+
def _terminate_echo_server():
100+
proc.terminate()
101+
proc.wait()
102+
shutil.rmtree(java_sdk_dir)
103+
104+
env.shell.setenv("AWS_TEST_EVENT_STREAM_ECHO_SERVER_HOST", "127.0.0.1", quiet=False)
105+
env.shell.setenv("AWS_TEST_EVENT_STREAM_ECHO_SERVER_PORT", "8033", quiet=False)
106+
finally:
107+
env.shell.popd()
108+
109+
except Exception as ex:
110+
print('Failed to set up event stream server. Eventstream CI tests will not be run.')
111+
print(ex)
112+
113+
return
114+
115+
def run(self, env):
116+
117+
actions = []
118+
119+
try:
120+
self._build_and_run_eventstream_echo_server(env)
121+
except:
122+
pass
123+
124+
return Builder.Script(actions, name='setup-eventstream-server')

0 commit comments

Comments
 (0)