@@ -19,6 +19,12 @@ use crate::protocol::list_offset::ListOffsetVersion;
1919pub use crate :: utils:: PartitionOffset ;
2020use crate :: utils:: TimestampedPartitionOffset ;
2121
22+ #[ cfg( feature = "producer_timestamp" ) ]
23+ pub use crate :: protocol:: produce:: ProducerTimestamp ;
24+
25+ #[ cfg( not( feature = "producer_timestamp" ) ) ]
26+ use crate :: protocol:: produce:: ProducerTimestamp ;
27+
2228#[ cfg( feature = "security" ) ]
2329pub use self :: network:: SecurityConfig ;
2430
@@ -78,6 +84,9 @@ pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME
7884/// The default value for `KafkaClient::set_connection_idle_timeout(..)`
7985pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS : u64 = 540_000 ;
8086
87+ /// The default value for `KafkaClient::set_producer_timestamp(..)`
88+ pub ( crate ) const DEFAULT_PRODUCER_TIMESTAMP : Option < ProducerTimestamp > = None ;
89+
8190/// Client struct keeping track of brokers and topic metadata.
8291///
8392/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
@@ -122,6 +131,9 @@ struct ClientConfig {
122131 // ~ the number of repeated retry attempts; prevents endless
123132 // repetition of a retry attempt
124133 retry_max_attempts : u32 ,
134+ // ~ producer's message timestamp option CreateTime/LogAppendTime
135+ #[ allow( unused) ]
136+ producer_timestamp : Option < ProducerTimestamp > ,
125137}
126138
127139// --------------------------------------------------------------------
@@ -408,6 +420,7 @@ impl KafkaClient {
408420 offset_storage : DEFAULT_GROUP_OFFSET_STORAGE ,
409421 retry_backoff_time : Duration :: from_millis ( DEFAULT_RETRY_BACKOFF_TIME_MILLIS ) ,
410422 retry_max_attempts : DEFAULT_RETRY_MAX_ATTEMPTS ,
423+ producer_timestamp : DEFAULT_PRODUCER_TIMESTAMP ,
411424 } ,
412425 conn_pool : network:: Connections :: new (
413426 default_conn_rw_timeout ( ) ,
@@ -477,6 +490,7 @@ impl KafkaClient {
477490 offset_storage : DEFAULT_GROUP_OFFSET_STORAGE ,
478491 retry_backoff_time : Duration :: from_millis ( DEFAULT_RETRY_BACKOFF_TIME_MILLIS ) ,
479492 retry_max_attempts : DEFAULT_RETRY_MAX_ATTEMPTS ,
493+ producer_timestamp : DEFAULT_PRODUCER_TIMESTAMP ,
480494 } ,
481495 conn_pool : network:: Connections :: new_with_security (
482496 default_conn_rw_timeout ( ) ,
@@ -722,6 +736,31 @@ impl KafkaClient {
722736 self . conn_pool . idle_timeout ( )
723737 }
724738
739+ #[ cfg( feature = "producer_timestamp" ) ]
740+ /// Sets the compression algorithm to use when sending out messages.
741+ ///
742+ /// # Example
743+ ///
744+ /// ```no_run
745+ /// use kafka::client::{Compression, KafkaClient};
746+ ///
747+ /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
748+ /// client.load_metadata_all().unwrap();
749+ /// client.set_producer_timestamp(Timestamp::CreateTime);
750+ /// ```
751+ #[ inline]
752+ pub fn set_producer_timestamp ( & mut self , producer_timestamp : Option < ProducerTimestamp > ) {
753+ self . config . producer_timestamp = producer_timestamp;
754+ }
755+
756+ #[ cfg( feature = "producer_timestamp" ) ]
757+ /// Retrieves the current `KafkaClient::producer_timestamp` setting.
758+ #[ inline]
759+ #[ must_use]
760+ pub fn producer_timestamp ( & self ) -> Option < ProducerTimestamp > {
761+ self . config . producer_timestamp
762+ }
763+
725764 /// Provides a view onto the currently loaded metadata of known .
726765 ///
727766 /// # Examples
@@ -1455,6 +1494,8 @@ impl KafkaClientInternals for KafkaClient {
14551494 correlation,
14561495 & config. client_id ,
14571496 config. compression ,
1497+ #[ cfg( feature = "producer_timestamp" ) ]
1498+ config. producer_timestamp ,
14581499 )
14591500 } )
14601501 . add ( msg. topic , msg. partition , msg. key , msg. value ) ,
0 commit comments