Skip to content

Commit c52f662

Browse files
committed
Refactor streams
1 parent 2694c22 commit c52f662

File tree

3 files changed

+31
-57
lines changed

3 files changed

+31
-57
lines changed

src/client.rs

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
//! This module contains definitions of all the complex data structures that are returned by calls
44
55
use std::collections::{BTreeMap, VecDeque};
6-
#[cfg(test)]
7-
use std::fs::File;
86
use std::io::{self, BufRead, BufReader, Read, Write};
97
use std::net::{TcpStream, ToSocketAddrs};
108

@@ -31,8 +29,6 @@ use socks::{Socks5Stream, ToTargetAddr};
3129
use stream::ClonableStream;
3230

3331
use batch::Batch;
34-
#[cfg(test)]
35-
use test_stream::TestStream;
3632
use types::*;
3733

3834
macro_rules! impl_batch_call {
@@ -72,8 +68,8 @@ pub struct Client<S>
7268
where
7369
S: Read + Write,
7470
{
75-
stream: S,
76-
buf_reader: BufReader<S>,
71+
stream: ClonableStream<S>,
72+
buf_reader: BufReader<ClonableStream<S>>,
7773

7874
headers: VecDeque<HeaderNotification>,
7975
script_notifications: BTreeMap<ScriptHash, VecDeque<ScriptStatus>>,
@@ -82,7 +78,7 @@ where
8278
calls: usize,
8379
}
8480

85-
impl<S> From<S> for Client<ClonableStream<S>>
81+
impl<S> From<S> for Client<S>
8682
where
8783
S: Read + Write,
8884
{
@@ -107,23 +103,14 @@ impl Client<ElectrumPlaintextStream> {
107103
/// Creates a new plaintext client and tries to connect to `socket_addr`.
108104
pub fn new<A: ToSocketAddrs>(socket_addr: A) -> Result<Self, Error> {
109105
let stream = TcpStream::connect(socket_addr)?;
110-
let buf_reader = BufReader::new(stream.try_clone()?);
111-
112-
Ok(Self {
113-
stream,
114-
buf_reader,
115-
headers: VecDeque::new(),
116-
script_notifications: BTreeMap::new(),
117106

118-
#[cfg(feature = "debug-calls")]
119-
calls: 0,
120-
})
107+
Ok(stream.into())
121108
}
122109
}
123110

124111
#[cfg(feature = "use-openssl")]
125112
/// Transport type used to establish an OpenSSL TLS encrypted/authenticated connection with the server
126-
pub type ElectrumSslStream = ClonableStream<SslStream<TcpStream>>;
113+
pub type ElectrumSslStream = SslStream<TcpStream>;
127114
#[cfg(feature = "use-openssl")]
128115
impl Client<ElectrumSslStream> {
129116
/// Creates a new SSL client and tries to connect to `socket_addr`. Optionally, if `domain` is not
@@ -174,7 +161,7 @@ mod danger {
174161
not(feature = "use-openssl")
175162
))]
176163
/// Transport type used to establish a Rustls TLS encrypted/authenticated connection with the server
177-
pub type ElectrumSslStream = ClonableStream<StreamOwned<ClientSession, TcpStream>>;
164+
pub type ElectrumSslStream = StreamOwned<ClientSession, TcpStream>;
178165
#[cfg(all(
179166
any(feature = "default", feature = "use-rustls"),
180167
not(feature = "use-openssl")
@@ -209,7 +196,7 @@ impl Client<ElectrumSslStream> {
209196

210197
#[cfg(any(feature = "default", feature = "proxy"))]
211198
/// Transport type used to establish a connection to a server through a socks proxy
212-
pub type ElectrumProxyStream = ClonableStream<Socks5Stream>;
199+
pub type ElectrumProxyStream = Socks5Stream;
213200
#[cfg(any(feature = "default", feature = "proxy"))]
214201
impl Client<ElectrumProxyStream> {
215202
/// Creates a new socks client and tries to connect to `target_addr` using `proxy_addr` as an
@@ -226,21 +213,6 @@ impl Client<ElectrumProxyStream> {
226213
}
227214
}
228215

229-
#[cfg(test)]
230-
impl Client<TestStream> {
231-
pub fn new_test(file: File) -> Self {
232-
let stream = TestStream::new_out();
233-
let buf_reader = BufReader::new(TestStream::new_in(file));
234-
235-
Self {
236-
stream,
237-
buf_reader,
238-
headers: VecDeque::new(),
239-
script_notifications: BTreeMap::new(),
240-
}
241-
}
242-
}
243-
244216
impl<S: Read + Write> Client<S> {
245217
fn call(&mut self, req: Request) -> Result<serde_json::Value, Error> {
246218
let mut raw = serde_json::to_vec(&req)?;
@@ -650,9 +622,16 @@ impl<S: Read + Write> Client<S> {
650622
mod test {
651623
use std::fs::File;
652624
use std::io::Read;
625+
use test_stream::TestStream;
653626

654627
use client::Client;
655628

629+
impl Client<TestStream> {
630+
pub fn new_test(file: File) -> Self {
631+
TestStream::new(file).into()
632+
}
633+
}
634+
656635
macro_rules! impl_test_prelude {
657636
( $testcase:expr ) => {{
658637
let data_in = File::open(format!("./test_data/{}.in", $testcase)).unwrap();
@@ -665,7 +644,7 @@ mod test {
665644
let mut data_out = File::open(format!("./test_data/{}.out", $testcase)).unwrap();
666645
let mut buffer = Vec::new();
667646
data_out.read_to_end(&mut buffer).unwrap();
668-
let stream_buffer: Vec<u8> = $stream.into();
647+
let stream_buffer = $stream.stream().lock().unwrap().buffer.clone();
669648

670649
assert_eq!(
671650
stream_buffer,

src/stream.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::io::{self, Read, Write};
22
use std::sync::{Arc, Mutex};
33

4+
#[derive(Debug)]
45
pub struct ClonableStream<T: Read + Write>(Arc<Mutex<T>>);
56

67
impl<T: Read + Write> Read for ClonableStream<T> {
@@ -30,3 +31,10 @@ impl<T: Read + Write> Clone for ClonableStream<T> {
3031
ClonableStream(Arc::clone(&self.0))
3132
}
3233
}
34+
35+
#[cfg(test)]
36+
impl<T: Read + Write> ClonableStream<T> {
37+
pub fn stream(&self) -> Arc<Mutex<T>> {
38+
Arc::clone(&self.0)
39+
}
40+
}

src/test_stream.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,31 @@ use std::io::{Read, Result, Write};
33
use std::fs::File;
44

55
pub struct TestStream {
6-
file: Option<File>,
7-
buffer: Option<Vec<u8>>,
6+
pub file: File,
7+
pub buffer: Vec<u8>,
88
}
99

1010
impl TestStream {
11-
pub fn new_in(file: File) -> Self {
11+
pub fn new(file: File) -> Self {
1212
TestStream {
13-
file: Some(file),
14-
buffer: None,
15-
}
16-
}
17-
18-
pub fn new_out() -> Self {
19-
TestStream {
20-
file: None,
21-
buffer: Some(Vec::new()),
13+
file,
14+
buffer: Vec::new(),
2215
}
2316
}
2417
}
2518

2619
impl Read for TestStream {
2720
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
28-
self.file.as_ref().unwrap().read(buf)
21+
self.file.read(buf)
2922
}
3023
}
3124

3225
impl Write for TestStream {
3326
fn write(&mut self, buf: &[u8]) -> Result<usize> {
34-
self.buffer.as_mut().unwrap().write(buf)
27+
self.buffer.write(buf)
3528
}
3629

3730
fn flush(&mut self) -> Result<()> {
38-
self.buffer.as_mut().unwrap().flush()
39-
}
40-
}
41-
42-
impl Into<Vec<u8>> for TestStream {
43-
fn into(self) -> Vec<u8> {
44-
self.buffer.unwrap()
31+
self.buffer.flush()
4532
}
4633
}

0 commit comments

Comments
 (0)