@@ -25,6 +25,7 @@ use quickwit_proto::types::Position;
2525use quickwit_storage:: { OwnedBytes , StorageResolver } ;
2626use serde_json:: Value ;
2727use thiserror:: Error ;
28+ use time:: { OffsetDateTime , format_description:: well_known:: Rfc3339 } ;
2829use tracing:: info;
2930
3031use super :: visibility:: VisibilityTaskHandle ;
@@ -82,18 +83,23 @@ impl RawMessage {
8283 self ,
8384 message_type : MessageType ,
8485 ) -> Result < PreProcessedMessage , PreProcessingError > {
85- let payload = match message_type {
86- MessageType :: S3Notification => PreProcessedPayload :: ObjectUri (
87- uri_from_s3_notification ( & self . payload , & self . metadata . ack_id ) ?,
88- ) ,
86+ let ( payload, timestamp_opt) = match message_type {
87+ MessageType :: S3Notification => {
88+ let ( uri, timestamp) = parse_s3_notification ( & self . payload , & self . metadata . ack_id ) ?;
89+ ( PreProcessedPayload :: ObjectUri ( uri) , Some ( timestamp) )
90+ }
8991 MessageType :: RawUri => {
9092 let payload_str = read_to_string ( self . payload ) . context ( "failed to read payload" ) ?;
91- PreProcessedPayload :: ObjectUri ( Uri :: from_str ( & payload_str) ?)
93+ (
94+ PreProcessedPayload :: ObjectUri ( Uri :: from_str ( & payload_str) ?) ,
95+ None ,
96+ )
9297 }
9398 } ;
9499 Ok ( PreProcessedMessage {
95100 metadata : self . metadata ,
96101 payload,
102+ timestamp_opt,
97103 } )
98104 }
99105}
@@ -122,6 +128,7 @@ impl PreProcessedPayload {
122128pub struct PreProcessedMessage {
123129 pub metadata : MessageMetadata ,
124130 pub payload : PreProcessedPayload ,
131+ pub timestamp_opt : Option < OffsetDateTime > ,
125132}
126133
127134impl PreProcessedMessage {
@@ -130,7 +137,10 @@ impl PreProcessedMessage {
130137 }
131138}
132139
133- fn uri_from_s3_notification ( message : & [ u8 ] , ack_id : & str ) -> Result < Uri , PreProcessingError > {
140+ fn parse_s3_notification (
141+ message : & [ u8 ] ,
142+ ack_id : & str ,
143+ ) -> Result < ( Uri , OffsetDateTime ) , PreProcessingError > {
134144 let value: Value = serde_json:: from_slice ( message) . context ( "invalid JSON message" ) ?;
135145 if matches ! ( value[ "Event" ] . as_str( ) , Some ( "s3:TestEvent" ) ) {
136146 info ! ( "discarding S3 test event" ) ;
@@ -151,6 +161,13 @@ fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProc
151161 ack_id : ack_id. to_string ( ) ,
152162 } ) ;
153163 }
164+
165+ let event_time = value[ "Records" ] [ 0 ] [ "eventTime" ]
166+ . as_str ( )
167+ . context ( "invalid S3 notification: Records[0].eventTime not found" ) ?;
168+ let timestamp = OffsetDateTime :: parse ( event_time, & Rfc3339 )
169+ . context ( "invalid S3 notification: Records[0].eventTime not in rfc3339" ) ?;
170+
154171 let key = value[ "Records" ] [ 0 ] [ "s3" ] [ "object" ] [ "key" ]
155172 . as_str ( )
156173 . context ( "invalid S3 notification: Records[0].s3.object.key not found" ) ?;
@@ -160,7 +177,9 @@ fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProc
160177 let encoded_key = percent_encoding:: percent_decode ( key. as_bytes ( ) )
161178 . decode_utf8 ( )
162179 . context ( "invalid S3 notification: Records[0].s3.object.key could not be url decoded" ) ?;
163- Uri :: from_str ( & format ! ( "s3://{}/{}" , bucket, encoded_key) ) . map_err ( |e| e. into ( ) )
180+ let uri = Uri :: from_str ( & format ! ( "s3://{}/{}" , bucket, encoded_key) ) ?;
181+
182+ Ok ( ( uri, timestamp) )
164183}
165184
166185/// A message for which we know as much of the global processing status as
@@ -193,6 +212,7 @@ impl ReadyMessage {
193212 batch_reader,
194213 partition_id,
195214 visibility_handle : self . visibility_handle ,
215+ timestamp_opt : self . content . timestamp_opt ,
196216 } ) )
197217 }
198218 }
@@ -209,6 +229,7 @@ pub struct InProgressMessage {
209229 pub partition_id : PartitionId ,
210230 pub visibility_handle : VisibilityTaskHandle ,
211231 pub batch_reader : ObjectUriBatchReader ,
232+ pub timestamp_opt : Option < OffsetDateTime > ,
212233}
213234
214235#[ cfg( test) ]
@@ -257,9 +278,13 @@ mod tests {
257278 }
258279 ]
259280 }"# ;
260- let actual_uri = uri_from_s3_notification ( test_message. as_bytes ( ) , "myackid" ) . unwrap ( ) ;
281+ let ( actual_uri, actual_timestamp) =
282+ parse_s3_notification ( test_message. as_bytes ( ) , "myackid" ) . unwrap ( ) ;
261283 let expected_uri = Uri :: from_str ( "s3://mybucket/logs.json" ) . unwrap ( ) ;
284+ let expected_timestamp =
285+ OffsetDateTime :: parse ( "2021-05-22T09:22:41.789Z" , & Rfc3339 ) . unwrap ( ) ;
262286 assert_eq ! ( actual_uri, expected_uri) ;
287+ assert_eq ! ( actual_timestamp, expected_timestamp) ;
263288 }
264289
265290 #[ test]
@@ -275,8 +300,7 @@ mod tests {
275300 }
276301 ]
277302 }"# ;
278- let result =
279- uri_from_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
303+ let result = parse_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
280304 assert ! ( matches!(
281305 result,
282306 Err ( PreProcessingError :: UnexpectedFormat ( _) )
@@ -321,8 +345,7 @@ mod tests {
321345 }
322346 ]
323347 }"# ;
324- let result =
325- uri_from_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
348+ let result = parse_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
326349 assert ! ( matches!(
327350 result,
328351 Err ( PreProcessingError :: Discardable { .. } )
@@ -339,8 +362,7 @@ mod tests {
339362 "RequestId":"5582815E1AEA5ADF",
340363 "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
341364 }"# ;
342- let result =
343- uri_from_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
365+ let result = parse_s3_notification ( & OwnedBytes :: new ( invalid_message. as_bytes ( ) ) , "myackid" ) ;
344366 if let Err ( PreProcessingError :: Discardable { ack_id } ) = result {
345367 assert_eq ! ( ack_id, "myackid" ) ;
346368 } else {
@@ -390,8 +412,12 @@ mod tests {
390412 }
391413 ]
392414 }"# ;
393- let actual_uri = uri_from_s3_notification ( test_message. as_bytes ( ) , "myackid" ) . unwrap ( ) ;
415+ let ( actual_uri, actual_timestamp) =
416+ parse_s3_notification ( test_message. as_bytes ( ) , "myackid" ) . unwrap ( ) ;
394417 let expected_uri = Uri :: from_str ( "s3://mybucket/hello::world::logs.json" ) . unwrap ( ) ;
418+ let expected_timestamp =
419+ OffsetDateTime :: parse ( "2021-05-22T09:22:41.789Z" , & Rfc3339 ) . unwrap ( ) ;
395420 assert_eq ! ( actual_uri, expected_uri) ;
421+ assert_eq ! ( actual_timestamp, expected_timestamp) ;
396422 }
397423}
0 commit comments