Skip to content

Commit 7f0105e

Browse files
committed
support RoutingMetadata.
1 parent b4154b3 commit 7f0105e

File tree

4 files changed

+110
-23
lines changed

4 files changed

+110
-23
lines changed

src/extension.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,94 @@ use crate::result::RSocketResult;
55
use bytes::{Buf, BufMut, Bytes, BytesMut};
66

77
const MAX_MIME_LEN: usize = 0x7F;
8+
const MAX_ROUTING_TAG_LEN: usize = 0xFF;
89

910
#[derive(Debug, Clone, Eq, PartialEq)]
1011
pub struct CompositeMetadata {
1112
mime: String,
1213
payload: Bytes,
1314
}
1415

16+
#[derive(Debug, Clone)]
17+
pub struct RoutingMetadata {
18+
tags: Vec<String>,
19+
}
20+
21+
pub struct RoutingMetadataBuilder {
22+
inner: RoutingMetadata,
23+
}
24+
25+
impl RoutingMetadataBuilder {
26+
pub fn push_str(self, tag: &str) -> Self {
27+
self.push(String::from(tag))
28+
}
29+
30+
pub fn push(mut self, tag: String) -> Self {
31+
if tag.len() > MAX_ROUTING_TAG_LEN {
32+
panic!("exceeded maximum routing tag length!");
33+
}
34+
self.inner.tags.push(tag);
35+
self
36+
}
37+
pub fn build(self) -> RoutingMetadata {
38+
self.inner
39+
}
40+
}
41+
42+
impl RoutingMetadata {
43+
pub fn builder() -> RoutingMetadataBuilder {
44+
RoutingMetadataBuilder {
45+
inner: RoutingMetadata { tags: vec![] },
46+
}
47+
}
48+
49+
pub fn decode(bf: &mut BytesMut) -> RSocketResult<RoutingMetadata> {
50+
let mut bu = RoutingMetadata::builder();
51+
loop {
52+
match Self::decode_once(bf) {
53+
Ok(v) => match v {
54+
Some(tag) => bu = bu.push(tag),
55+
None => break,
56+
},
57+
Err(e) => return Err(e),
58+
}
59+
}
60+
Ok(bu.build())
61+
}
62+
63+
fn decode_once(bf: &mut BytesMut) -> RSocketResult<Option<String>> {
64+
if bf.is_empty() {
65+
return Ok(None);
66+
}
67+
let size = bf.get_u8() as usize;
68+
if bf.len() < size {
69+
return Err(RSocketError::from(ErrorKind::WithDescription(
70+
"require more bytes!",
71+
)));
72+
}
73+
let tag = String::from_utf8(bf.split_to(size).to_vec()).unwrap();
74+
Ok(Some(tag))
75+
}
76+
77+
pub fn get_tags(&self) -> &Vec<String> {
78+
&self.tags
79+
}
80+
81+
pub fn write_to(&self, bf: &mut BytesMut) {
82+
for tag in &self.tags {
83+
let size = tag.len() as u8;
84+
bf.put_u8(size);
85+
bf.put_slice(tag.as_bytes());
86+
}
87+
}
88+
89+
pub fn bytes(&self) -> Bytes {
90+
let mut bf = BytesMut::new();
91+
self.write_to(&mut bf);
92+
bf.freeze()
93+
}
94+
}
95+
1596
impl CompositeMetadata {
1697
pub fn new(mime: String, payload: Bytes) -> CompositeMetadata {
1798
if mime.len() > MAX_MIME_LEN {

src/payload/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod default;
22
mod setup;
33

4-
pub use default::{Payload,PayloadBuilder};
5-
pub use setup::{SetupPayload,SetupPayloadBuilder};
4+
pub use default::{Payload, PayloadBuilder};
5+
pub use setup::{SetupPayload, SetupPayloadBuilder};

tests/clients.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,8 @@
11
extern crate rsocket_rust;
2-
2+
#[macro_use]
3+
extern crate log;
34
use rsocket_rust::prelude::*;
45

5-
#[tokio::main]
6-
#[test]
7-
async fn test() {
8-
let cli = RSocketFactory::connect()
9-
.acceptor(|| Box::new(EchoRSocket))
10-
.transport(URI::Tcp("127.0.0.1:7878".to_string()))
11-
.setup(Payload::from("READY!"))
12-
.mime_type("text/plain", "text/plain")
13-
.start()
14-
.await
15-
.unwrap();
16-
let req = Payload::builder()
17-
.set_data_utf8("Hello World!")
18-
.set_metadata_utf8("Rust")
19-
.build();
20-
let res = cli.request_response(req).await.unwrap();
21-
println!("got: {:?}", res);
22-
cli.close();
23-
}
24-
256
#[tokio::main]
267
#[test]
278
async fn test_client() {

tests/routing_metadata_test.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
extern crate rsocket_rust;
2+
3+
use bytes::{Bytes, BytesMut};
4+
use rsocket_rust::extension::RoutingMetadata;
5+
6+
#[test]
7+
fn routing_metadata_codec() {
8+
let m = RoutingMetadata::builder()
9+
.push_str("/orders")
10+
.push_str("/orders/77778888")
11+
.push_str("/users")
12+
.push_str("/users/1234")
13+
.build();
14+
15+
let mut bf = BytesMut::new();
16+
m.write_to(&mut bf);
17+
println!("encode routing metadata: {}", hex::encode(bf.to_vec()));
18+
let m2 = RoutingMetadata::decode(&mut bf).unwrap();
19+
let tags = m2.get_tags();
20+
for tag in tags {
21+
println!("decode tag: {}", tag);
22+
}
23+
assert_eq!(4, tags.len());
24+
assert_eq!(m.get_tags(), tags);
25+
}

0 commit comments

Comments
 (0)