@@ -68,10 +68,10 @@ pub fn partition_key_to_shard_id(
6868 } ;
6969
7070 for shard in shards {
71- let start = BigInt :: from_str_radix ( & shard. start_hash_range_key , 16 ) . map_err ( |err| {
71+ let start = BigInt :: from_str_radix ( & shard. start_hash_range_key , 10 ) . map_err ( |err| {
7272 common:: Error :: PartitionKeyError ( common:: PartitionKeyError :: ParseBigIntError ( err) )
7373 } ) ?;
74- let end = BigInt :: from_str_radix ( & shard. end_hash_range_key , 16 ) . map_err ( |err| {
74+ let end = BigInt :: from_str_radix ( & shard. end_hash_range_key , 10 ) . map_err ( |err| {
7575 common:: Error :: PartitionKeyError ( common:: PartitionKeyError :: ParseBigIntError ( err) )
7676 } ) ?;
7777
@@ -85,9 +85,66 @@ pub fn partition_key_to_shard_id(
8585 ) )
8686}
8787
88+ #[ doc( hidden) ]
8889#[ macro_export]
8990macro_rules! format_url {
9091 ( $scheme: expr, $host: expr, $port: expr) => {
9192 format!( "{}://{}:{}" , $scheme, $host, $port)
9293 } ;
9394}
95+
96+ #[ cfg( test) ]
97+ mod tests {
98+ use std:: collections:: HashMap ;
99+ use std:: env;
100+
101+ use hstreamdb_pb:: { ListShardsRequest , Stream } ;
102+ use hstreamdb_test_utils:: rand_alphanumeric;
103+
104+ use super :: partition_key_to_shard_id;
105+ use crate :: client:: Client ;
106+ use crate :: ChannelProviderSettings ;
107+
108+ #[ tokio:: test( flavor = "multi_thread" ) ]
109+ async fn test_partition_key_to_shard_id ( ) {
110+ let addr = env:: var ( "TEST_SERVER_ADDR" ) . unwrap ( ) ;
111+ let mut client = Client :: new (
112+ addr,
113+ ChannelProviderSettings {
114+ concurrency_limit : None ,
115+ } ,
116+ )
117+ . await
118+ . unwrap ( ) ;
119+
120+ let stream_name = rand_alphanumeric ( 20 ) ;
121+
122+ client
123+ . create_stream ( Stream {
124+ stream_name : stream_name. clone ( ) ,
125+ replication_factor : 1 ,
126+ backlog_duration : 10 * 60 ,
127+ shard_count : 200 ,
128+ } )
129+ . await
130+ . unwrap ( ) ;
131+
132+ let shards = client
133+ . channels
134+ . channel ( )
135+ . await
136+ . list_shards ( ListShardsRequest { stream_name } )
137+ . await
138+ . unwrap ( )
139+ . into_inner ( )
140+ . shards ;
141+
142+ let mut result = HashMap :: new ( ) ;
143+ for _ in 0 ..400 {
144+ let shard_id = partition_key_to_shard_id ( & shards, rand_alphanumeric ( 20 ) ) . unwrap ( ) ;
145+ result. insert ( shard_id, ( ) ) ;
146+ }
147+ println ! ( "result.len() = {}" , result. len( ) ) ;
148+ assert ! ( result. len( ) > 100 )
149+ }
150+ }
0 commit comments