1+ import logging
12from time import sleep
23from unittest import SkipTest
34from unittest .case import skipIf
45
56from parameterized import parameterized
6- from tenacity import retry , retry_if_exception , stop_after_attempt
7+ from tenacity import (
8+ after_log ,
9+ retry ,
10+ retry_if_exception ,
11+ stop_after_attempt ,
12+ wait_exponential ,
13+ wait_random ,
14+ )
715
816from integration .config .service_names import SCHEDULE_EVENT
917from integration .conftest import clean_bucket
1018from integration .helpers .base_test import S3_BUCKET_PREFIX , BaseTest , nonblocking
1119from integration .helpers .resource import current_region_does_not_support , generate_suffix
1220
13- retry_once = retry (
14- stop = stop_after_attempt (2 ),
15- # unittest raises SkipTest for skipping tests
21+ LOG = logging .getLogger (__name__ )
22+
23+ retry_with_backoff = retry (
24+ stop = stop_after_attempt (5 ),
25+ wait = wait_exponential (multiplier = 1 , min = 4 , max = 16 ) + wait_random (0 , 1 ),
1626 retry = retry_if_exception (lambda e : not isinstance (e , SkipTest )),
27+ after = after_log (LOG , logging .WARNING ),
28+ reraise = True ,
1729)
1830
1931
@@ -31,19 +43,18 @@ class TestConnectorsWithEventBus(BaseTest):
3143 ("combination/connector_function_to_eventbus_write" ,),
3244 ]
3345 )
34- @retry_once
3546 def test_connector_by_invoking_a_function_with_eventbus (self , template_file_path ):
3647 self .create_and_verify_stack (template_file_path )
3748
3849 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
39- lambda_client = self .client_provider . lambda_client
40-
41- request_params = {
42- "FunctionName" : lambda_function_name ,
43- "InvocationType" : "RequestResponse" ,
44- "Payload" : "{}" ,
45- }
46- response = lambda_client . invoke ( ** request_params )
50+ self .verify_lambda_invocation ( lambda_function_name )
51+
52+ @ retry_with_backoff
53+ def verify_lambda_invocation ( self , lambda_function_name ):
54+ """Verify Lambda function invocation with retry logic."""
55+ response = self . client_provider . lambda_client . invoke (
56+ FunctionName = lambda_function_name , InvocationType = "RequestResponse" , Payload = "{}"
57+ )
4758 self .assertEqual (response .get ("StatusCode" ), 200 )
4859 self .assertEqual (response .get ("FunctionError" ), None )
4960
@@ -57,6 +68,15 @@ def tearDown(self):
5768 clean_bucket (bucket_name , self .client_provider .s3_client )
5869 super ().tearDown ()
5970
71+ @retry_with_backoff
72+ def verify_lambda_invocation (self , lambda_function_name ):
73+ """Verify Lambda function invocation with retry logic."""
74+ response = self .client_provider .lambda_client .invoke (
75+ FunctionName = lambda_function_name , InvocationType = "RequestResponse" , Payload = "{}"
76+ )
77+ self .assertEqual (response .get ("StatusCode" ), 200 )
78+ self .assertEqual (response .get ("FunctionError" ), None )
79+
6080 @parameterized .expand (
6181 [
6282 ("combination/connector_appsync_api_to_lambda" ,),
@@ -88,46 +108,26 @@ def tearDown(self):
88108 ("combination/embedded_connector" ,),
89109 ]
90110 )
91- @retry_once
92111 def test_connector_by_invoking_a_function (self , template_file_path ):
93112 self .skip_using_service_detector (template_file_path )
94113 self .create_and_verify_stack (template_file_path )
95114
96115 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
97- lambda_client = self .client_provider .lambda_client
98-
99- request_params = {
100- "FunctionName" : lambda_function_name ,
101- "InvocationType" : "RequestResponse" ,
102- "Payload" : "{}" ,
103- }
104- response = lambda_client .invoke (** request_params )
105- self .assertEqual (response .get ("StatusCode" ), 200 )
106- self .assertEqual (response .get ("FunctionError" ), None )
116+ self .verify_lambda_invocation (lambda_function_name )
107117
108118 @parameterized .expand (
109119 [
110120 ("combination/connector_function_to_location_place_index" ,),
111121 ]
112122 )
113- @retry_once
114123 def test_connector_by_invoking_a_function_with_parameters (self , template_file_path ):
115124 parameters = []
116125 parameters .append (self .generate_parameter ("IndexName" , f"PlaceIndex-{ generate_suffix ()} " ))
117126 self .skip_using_service_detector (template_file_path )
118127 self .create_and_verify_stack (template_file_path , parameters )
119128
120129 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
121- lambda_client = self .client_provider .lambda_client
122-
123- request_params = {
124- "FunctionName" : lambda_function_name ,
125- "InvocationType" : "RequestResponse" ,
126- "Payload" : "{}" ,
127- }
128- response = lambda_client .invoke (** request_params )
129- self .assertEqual (response .get ("StatusCode" ), 200 )
130- self .assertEqual (response .get ("FunctionError" ), None )
130+ self .verify_lambda_invocation (lambda_function_name )
131131
132132 @parameterized .expand (
133133 [
@@ -144,17 +144,17 @@ def test_connector_by_invoking_a_function_with_parameters(self, template_file_pa
144144 ("combination/connector_sfn_to_eb_custom_write" ,),
145145 ]
146146 )
147- @retry_once
148147 def test_connector_by_sync_execute_an_state_machine (self , template_file_path ):
149148 self .skip_using_service_detector (template_file_path )
150149 self .create_and_verify_stack (template_file_path )
151150
152151 state_machine_arn = self .get_physical_id_by_logical_id ("TriggerStateMachine" )
153- sfn_client = self .client_provider . sfn_client
152+ self .verify_sync_step_function_execution ( state_machine_arn )
154153
155- response = sfn_client .start_sync_execution (
156- stateMachineArn = state_machine_arn ,
157- )
154+ @retry_with_backoff
155+ def verify_sync_step_function_execution (self , state_machine_arn ):
156+ """Verify synchronous Step Function execution with retry logic."""
157+ response = self .client_provider .sfn_client .start_sync_execution (stateMachineArn = state_machine_arn )
158158 # Without permission, it will be "FAILED"
159159 self .assertEqual (response .get ("status" ), "SUCCEEDED" )
160160
@@ -163,17 +163,18 @@ def test_connector_by_sync_execute_an_state_machine(self, template_file_path):
163163 ("combination/connector_sfn_to_sfn_sync" ,),
164164 ]
165165 )
166- @retry_once
167166 def test_connector_by_async_execute_an_state_machine (self , template_file_path ):
168167 self .skip_using_service_detector (template_file_path )
169168 self .create_and_verify_stack (template_file_path )
170169
171170 state_machine_arn = self .get_physical_id_by_logical_id ("TriggerStateMachine" )
172- sfn_client = self .client_provider . sfn_client
171+ self .verify_async_step_function_execution ( state_machine_arn )
173172
174- response = sfn_client .start_execution (
175- stateMachineArn = state_machine_arn ,
176- )
173+ @retry_with_backoff
174+ def verify_async_step_function_execution (self , state_machine_arn ):
175+ """Verify asynchronous Step Function execution with retry logic."""
176+ sfn_client = self .client_provider .sfn_client
177+ response = sfn_client .start_execution (stateMachineArn = state_machine_arn )
177178 execution_arn = response ["executionArn" ]
178179
179180 status = None
@@ -196,7 +197,6 @@ def test_connector_by_async_execute_an_state_machine(self, template_file_path):
196197 ("combination/connector_bucket_to_function_write" ,),
197198 ]
198199 )
199- @retry_once
200200 def test_connector_by_execute_a_s3_bucket (self , template_file_path ):
201201 self .skip_using_service_detector (template_file_path )
202202 bucket_name = S3_BUCKET_PREFIX + "connector" + generate_suffix ()
@@ -205,13 +205,4 @@ def test_connector_by_execute_a_s3_bucket(self, template_file_path):
205205 )
206206
207207 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
208- lambda_client = self .client_provider .lambda_client
209-
210- request_params = {
211- "FunctionName" : lambda_function_name ,
212- "InvocationType" : "RequestResponse" ,
213- "Payload" : "{}" ,
214- }
215- response = lambda_client .invoke (** request_params )
216- self .assertEqual (response .get ("StatusCode" ), 200 )
217- self .assertEqual (response .get ("FunctionError" ), None )
208+ self .verify_lambda_invocation (lambda_function_name )
0 commit comments