@@ -5,14 +5,15 @@ use hstreamdb_pb::{
55 GetSubscriptionRequest , ListConsumersRequest , ListStreamsRequest , ListSubscriptionsRequest ,
66 LookupSubscriptionRequest , NodeState ,
77} ;
8- use tonic:: transport:: { Channel , ClientTlsConfig } ;
8+ use tonic:: transport:: { Channel , Endpoint } ;
99use tonic:: Request ;
1010use url:: Url ;
1111
1212use crate :: appender:: Appender ;
1313use crate :: channel_provider:: { new_channel_provider, ChannelProviderSettings , Channels } ;
1414use crate :: common:: Error :: PBUnwrapError ;
1515use crate :: producer:: { FlushCallback , FlushSettings , Producer } ;
16+ use crate :: tls:: ClientTlsConfig ;
1617use crate :: { common, flow_controller, format_url, producer} ;
1718
1819pub struct Client {
@@ -29,41 +30,29 @@ impl Client {
2930 where
3031 Destination : std:: convert:: Into < String > ,
3132 {
32- const HSTREAM_PREFIX : & str = "hstream" ;
33- let server_url = server_url. into ( ) ;
34- let ( url_scheme, url) = {
35- let url = {
36- let mut url = Url :: parse ( & server_url) ?;
37- if url. port ( ) . is_none ( ) {
38- url. set_port ( Some ( 6570 ) )
39- . map_err ( |( ) | common:: Error :: SetPortError ( server_url. to_string ( ) ) ) ?;
40- }
41- url
42- } ;
43-
44- if url. scheme ( ) == HSTREAM_PREFIX {
45- let url_scheme = if channel_provider_settings. client_tls_config . is_none ( ) {
46- "http"
47- } else {
48- "https"
49- } ;
50- let server_url = & server_url[ 7 ..] ;
51- (
52- url_scheme. to_string ( ) ,
53- Url :: parse ( format ! ( "{url_scheme}{server_url}" ) . as_str ( ) ) ?,
54- )
55- } else if url. scheme ( ) == "hstreams" {
56- let url_scheme = "https" ;
57- (
58- url_scheme. to_string ( ) ,
59- Url :: parse ( format ! ( "{url_scheme}{server_url}" ) . as_str ( ) ) ?,
60- )
61- } else {
62- ( url. scheme ( ) . to_string ( ) , url)
33+ let server_url: String = server_url. into ( ) ;
34+ Url :: parse ( & server_url) ?;
35+ let server_url = set_scheme ( & server_url) . ok_or ( common:: Error :: InvalidUrl ( server_url) ) ?;
36+ let ( url_scheme, server_url) = {
37+ let mut server_url = Url :: parse ( & server_url) ?;
38+ let port = server_url. port ( ) ;
39+ if port. is_none ( ) {
40+ server_url
41+ . set_port ( Some ( 6570 ) )
42+ . map_err ( |( ) | common:: Error :: InvalidUrl ( server_url. to_string ( ) ) ) ?;
6343 }
44+ ( server_url. scheme ( ) . to_string ( ) , server_url)
6445 } ;
65- let mut hstream_api_client = HStreamApiClient :: connect ( String :: from ( url) ) . await ?;
46+
47+ log:: debug!( "client init connect: scheme = {url_scheme}, url = {server_url}" ) ;
6648 let tls_config = channel_provider_settings. client_tls_config . clone ( ) ;
49+ let mut hstream_api_client = HStreamApiClient :: new ( {
50+ let mut endpoint = Endpoint :: new ( server_url. to_string ( ) ) ?;
51+ if let Some ( tls_config) = tls_config. clone ( ) {
52+ endpoint = endpoint. tls_config ( tls_config) ?;
53+ }
54+ endpoint. connect ( ) . await ?
55+ } ) ;
6756 let channels = new_channel_provider (
6857 & url_scheme,
6958 & mut hstream_api_client,
@@ -78,6 +67,13 @@ impl Client {
7867 }
7968}
8069
70+ fn set_scheme ( url : & str ) -> Option < String > {
71+ Some (
72+ url. replace ( "hstream://" , "http://" )
73+ . replace ( "hstreams://" , "https://" ) ,
74+ )
75+ }
76+
8177impl Client {
8278 async fn new_channel_provider (
8379 & self ,
0 commit comments