1
1
use async_trait:: async_trait;
2
+ use chrono:: { DateTime , Duration , Utc } ;
3
+ use once_cell:: sync:: Lazy ;
2
4
use open_feature:: provider:: { FeatureProvider , ProviderMetadata , ResolutionDetails } ;
3
5
use open_feature:: {
4
6
EvaluationContext , EvaluationContextFieldValue , EvaluationError , EvaluationErrorCode ,
5
7
EvaluationResult , StructValue , Value ,
6
8
} ;
7
9
use reqwest:: Client ;
8
10
use reqwest:: StatusCode ;
11
+ use reqwest:: header:: RETRY_AFTER ;
9
12
use std:: any;
13
+ use tokio:: sync:: Mutex ;
10
14
use tracing:: { debug, error, instrument} ;
11
15
12
16
use crate :: OfrepOptions ;
13
17
18
+ static CURRENT_RETRY_AFTER : Lazy < Mutex < DateTime < Utc > > > = Lazy :: new ( || Mutex :: new ( Utc :: now ( ) ) ) ;
19
+
14
20
#[ derive( Debug ) ]
15
21
pub struct Resolver {
16
22
base_url : String ,
@@ -31,13 +37,51 @@ impl Resolver {
31
37
}
32
38
}
33
39
40
+ async fn parse_retry_after ( retry_after : & str ) -> DateTime < Utc > {
41
+ let now = Utc :: now ( ) ;
42
+
43
+ if retry_after. trim ( ) . is_empty ( ) {
44
+ return now;
45
+ }
46
+
47
+ if let Ok ( seconds) = retry_after. trim ( ) . parse :: < i64 > ( ) {
48
+ return now + Duration :: seconds ( seconds) ;
49
+ }
50
+
51
+ if let Ok ( parsed_date) = retry_after. trim ( ) . parse :: < DateTime < Utc > > ( ) {
52
+ return parsed_date. with_timezone ( & Utc ) ;
53
+ }
54
+
55
+ debug ! ( "Failed to parse Retry-After header : {}" , retry_after) ;
56
+ now
57
+ }
58
+
59
+ async fn update_retry_after ( new_retry_after : DateTime < Utc > ) {
60
+ let mut retry_after = CURRENT_RETRY_AFTER . lock ( ) . await ;
61
+ * retry_after = new_retry_after;
62
+ }
63
+
64
+ async fn is_rate_limit_exceeded ( ) -> bool {
65
+ let retry_after = CURRENT_RETRY_AFTER . lock ( ) . await ;
66
+ Utc :: now ( ) < * retry_after
67
+ }
68
+
34
69
#[ instrument( skip( self , evaluation_context) , fields( flag_key = %flag_key) ) ]
35
70
async fn resolve_value < T : std:: fmt:: Debug > (
36
71
& self ,
37
72
flag_key : & str ,
38
73
evaluation_context : & EvaluationContext ,
39
74
convertor : fn ( serde_json:: Value ) -> Option < T > ,
40
75
) -> EvaluationResult < ResolutionDetails < T > > {
76
+ if Resolver :: is_rate_limit_exceeded ( ) . await {
77
+ return Err ( EvaluationError {
78
+ code : EvaluationErrorCode :: General ( "Rate limit exceeded" . to_string ( ) ) ,
79
+ message : Some (
80
+ "Rate limit exceeded. Please wait before making another request." . to_string ( ) ,
81
+ ) ,
82
+ } ) ;
83
+ }
84
+
41
85
debug ! ( "Resolving {} flag" , std:: any:: type_name:: <T >( ) ) ;
42
86
let payload = serde_json:: json!( {
43
87
"context" : context_to_json( evaluation_context)
@@ -87,6 +131,28 @@ impl Resolver {
87
131
message : Some ( format ! ( "Flag: {flag_key} not found" ) ) ,
88
132
} ) ;
89
133
}
134
+ StatusCode :: TOO_MANY_REQUESTS => {
135
+ let header_retry_after: Option < & str > = response
136
+ . headers ( )
137
+ . get ( RETRY_AFTER )
138
+ . and_then ( |value| value. to_str ( ) . ok ( ) ) ;
139
+
140
+ if let Some ( header_retry_after) = header_retry_after {
141
+ let new_retry_after: DateTime < Utc > =
142
+ Resolver :: parse_retry_after ( header_retry_after) . await ;
143
+ Resolver :: update_retry_after ( new_retry_after) . await ;
144
+ } else {
145
+ debug ! ( "Couldn't parse the retry-after header." ) ;
146
+ let mut retry_after = CURRENT_RETRY_AFTER . lock ( ) . await ;
147
+ * retry_after = Utc :: now ( ) ;
148
+ }
149
+
150
+ let retry_after = CURRENT_RETRY_AFTER . lock ( ) . await ;
151
+ return Err ( EvaluationError {
152
+ code : EvaluationErrorCode :: General ( "Rate limit exceeded" . to_string ( ) ) ,
153
+ message : Some ( format ! ( "Rate limit exceeded. Retry after {}" , * retry_after) ) ,
154
+ } ) ;
155
+ }
90
156
_ => {
91
157
let result = response. json :: < serde_json:: Value > ( ) . await . map_err ( |e| {
92
158
error ! ( error = %e, "Failed to parse {} response" , any:: type_name:: <T >( ) ) ;
@@ -242,9 +308,15 @@ mod tests {
242
308
use super :: * ;
243
309
use serde_json:: json;
244
310
use test_log:: test;
311
+ use tokio:: time:: { Duration , sleep} ;
245
312
use wiremock:: matchers:: { method, path} ;
246
313
use wiremock:: { Mock , MockServer , ResponseTemplate } ;
247
314
315
+ async fn reset_states ( ) {
316
+ let mut retry_after = CURRENT_RETRY_AFTER . lock ( ) . await ;
317
+ * retry_after = Utc :: now ( ) ;
318
+ }
319
+
248
320
async fn setup_mock_server ( ) -> ( MockServer , Resolver ) {
249
321
let mock_server = MockServer :: start ( ) . await ;
250
322
let options = OfrepOptions {
@@ -256,7 +328,9 @@ mod tests {
256
328
}
257
329
258
330
#[ test( tokio:: test) ]
331
+ #[ serial_test:: serial]
259
332
async fn test_resolve_bool_value ( ) {
333
+ reset_states ( ) . await ;
260
334
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
261
335
262
336
Mock :: given ( method ( "POST" ) )
@@ -281,7 +355,9 @@ mod tests {
281
355
}
282
356
283
357
#[ test( tokio:: test) ]
358
+ #[ serial_test:: serial]
284
359
async fn test_resolve_string_value ( ) {
360
+ reset_states ( ) . await ;
285
361
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
286
362
287
363
Mock :: given ( method ( "POST" ) )
@@ -306,7 +382,9 @@ mod tests {
306
382
}
307
383
308
384
#[ test( tokio:: test) ]
385
+ #[ serial_test:: serial]
309
386
async fn test_resolve_float_value ( ) {
387
+ reset_states ( ) . await ;
310
388
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
311
389
312
390
Mock :: given ( method ( "POST" ) )
@@ -331,7 +409,9 @@ mod tests {
331
409
}
332
410
333
411
#[ test( tokio:: test) ]
412
+ #[ serial_test:: serial]
334
413
async fn test_resolve_int_value ( ) {
414
+ reset_states ( ) . await ;
335
415
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
336
416
337
417
Mock :: given ( method ( "POST" ) )
@@ -356,7 +436,9 @@ mod tests {
356
436
}
357
437
358
438
#[ test( tokio:: test) ]
439
+ #[ serial_test:: serial]
359
440
async fn test_resolve_struct_value ( ) {
441
+ reset_states ( ) . await ;
360
442
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
361
443
362
444
Mock :: given ( method ( "POST" ) )
@@ -401,7 +483,9 @@ mod tests {
401
483
}
402
484
403
485
#[ test( tokio:: test) ]
486
+ #[ serial_test:: serial]
404
487
async fn test_error_400 ( ) {
488
+ reset_states ( ) . await ;
405
489
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
406
490
407
491
Mock :: given ( method ( "POST" ) )
@@ -446,7 +530,9 @@ mod tests {
446
530
}
447
531
448
532
#[ test( tokio:: test) ]
533
+ #[ serial_test:: serial]
449
534
async fn test_error_401 ( ) {
535
+ reset_states ( ) . await ;
450
536
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
451
537
452
538
Mock :: given ( method ( "POST" ) )
@@ -492,7 +578,9 @@ mod tests {
492
578
}
493
579
494
580
#[ test( tokio:: test) ]
581
+ #[ serial_test:: serial]
495
582
async fn test_error_403 ( ) {
583
+ reset_states ( ) . await ;
496
584
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
497
585
498
586
Mock :: given ( method ( "POST" ) )
@@ -538,7 +626,9 @@ mod tests {
538
626
}
539
627
540
628
#[ test( tokio:: test) ]
629
+ #[ serial_test:: serial]
541
630
async fn test_error_404 ( ) {
631
+ reset_states ( ) . await ;
542
632
let ( mock_server, resolver) = setup_mock_server ( ) . await ;
543
633
544
634
Mock :: given ( method ( "POST" ) )
@@ -596,4 +686,67 @@ mod tests {
596
686
"Flag: test-flag not found"
597
687
) ;
598
688
}
689
+
690
+ #[ test( tokio:: test) ]
691
+ #[ serial_test:: serial]
692
+ async fn test_error_429 ( ) {
693
+ reset_states ( ) . await ;
694
+ let ( mock_server, resolver) = setup_mock_server ( ) . await ;
695
+
696
+ Mock :: given ( method ( "POST" ) )
697
+ . and ( path ( "/ofrep/v1/evaluate/flags/test-flag" ) )
698
+ . respond_with (
699
+ ResponseTemplate :: new ( 429 )
700
+ . insert_header ( "Retry-After" , "3" )
701
+ . set_body_json ( json ! ( { } ) ) ,
702
+ )
703
+ . mount ( & mock_server)
704
+ . await ;
705
+
706
+ let context = EvaluationContext :: default ( ) ;
707
+
708
+ let result_bool = resolver. resolve_bool_value ( "test-flag" , & context) . await ;
709
+ let result_bool_2 = resolver. resolve_bool_value ( "test-flag" , & context) . await ;
710
+
711
+ assert ! ( result_bool. is_err( ) ) ;
712
+ let result_bool_error = result_bool. unwrap_err ( ) ;
713
+ assert_eq ! (
714
+ result_bool_error. code,
715
+ EvaluationErrorCode :: General ( "Rate limit exceeded" . to_string( ) )
716
+ ) ;
717
+ assert ! (
718
+ result_bool_error
719
+ . message
720
+ . unwrap( )
721
+ . starts_with( "Rate limit exceeded. Retry after" )
722
+ ) ;
723
+
724
+ assert ! ( result_bool_2. is_err( ) ) ;
725
+ let result_bool_error_2 = result_bool_2. unwrap_err ( ) ;
726
+ assert_eq ! (
727
+ result_bool_error_2. code,
728
+ EvaluationErrorCode :: General ( "Rate limit exceeded" . to_string( ) )
729
+ ) ;
730
+ assert_eq ! (
731
+ result_bool_error_2. message. unwrap( ) ,
732
+ "Rate limit exceeded. Please wait before making another request."
733
+ ) ;
734
+
735
+ sleep ( Duration :: from_secs ( 3 ) ) . await ;
736
+
737
+ let result_bool_3 = resolver. resolve_bool_value ( "test-flag" , & context) . await ;
738
+ assert ! ( result_bool_3. is_err( ) ) ;
739
+
740
+ let result_bool_error_3 = result_bool_3. unwrap_err ( ) ;
741
+ assert_eq ! (
742
+ result_bool_error_3. code,
743
+ EvaluationErrorCode :: General ( "Rate limit exceeded" . to_string( ) )
744
+ ) ;
745
+ assert ! (
746
+ result_bool_error_3
747
+ . message
748
+ . unwrap( )
749
+ . starts_with( "Rate limit exceeded. Retry after" )
750
+ ) ;
751
+ }
599
752
}
0 commit comments