@@ -3,7 +3,7 @@ use std::env;
33use hstreamdb:: appender:: Appender ;
44use hstreamdb:: common:: { CompressionType , SpecialOffset , Stream } ;
55use hstreamdb:: producer:: { FlushSettings , Producer } ;
6- use hstreamdb:: { ChannelProviderSettings , Record , Subscription } ;
6+ use hstreamdb:: { ChannelProviderSettings , ConsumerStream , Record , Subscription , SubscriptionId } ;
77use hstreamdb_test_utils:: rand_alphanumeric;
88
99pub async fn init_client ( ) -> anyhow:: Result < Client > {
@@ -15,7 +15,15 @@ pub async fn init_client() -> anyhow::Result<Client> {
1515
1616#[ tokio:: test( flavor = "multi_thread" ) ]
1717async fn make_ci_happy ( ) {
18- init_client ( ) . await . unwrap ( ) . 0 . list_streams ( ) . await . unwrap ( ) ;
18+ let client = init_client ( ) . await . unwrap ( ) ;
19+ let ( stream, sub) = client. new_stream_subscription ( ) . await . unwrap ( ) ;
20+ let mut consumer = client. new_consumer ( sub. subscription_id ) . await . unwrap ( ) ;
21+ let ( appender, producer) = client. new_sync_producer ( stream. stream_name ) . await . unwrap ( ) ;
22+ appender. append ( rand_raw_record ( 4500 ) ) . await . unwrap ( ) ;
23+ producer. start ( ) . await ;
24+ while let Some ( x) = consumer. next ( ) . await {
25+ x. 1 . ack ( ) . unwrap ( ) ;
26+ }
1927}
2028
2129pub struct Client ( pub hstreamdb:: Client ) ;
@@ -110,4 +118,23 @@ impl Client {
110118 let subscription = self . new_subscription ( stream_name) . await ?;
111119 Ok ( ( stream, subscription) )
112120 }
121+
122+ pub async fn new_consumer (
123+ & self ,
124+ subscription_id : SubscriptionId ,
125+ ) -> anyhow:: Result < ConsumerStream > {
126+ let fetching_stream = self
127+ . 0
128+ . streaming_fetch ( rand_alphanumeric ( 20 ) , subscription_id)
129+ . await
130+ . unwrap ( ) ;
131+ Ok ( fetching_stream)
132+ }
133+ }
134+
135+ pub fn rand_raw_record ( len : usize ) -> Record {
136+ Record {
137+ partition_key : rand_alphanumeric ( 10 ) ,
138+ payload : hstreamdb:: Payload :: RawRecord ( rand_alphanumeric ( len) . into_bytes ( ) ) ,
139+ }
113140}
0 commit comments