diff --git a/Cargo.toml b/Cargo.toml index 445699b..df84d06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ pin-utils = "0.1.0-alpha.4" slab = "0.4.2" streaming-iterator = "0.1.4" objekt-clonable = "0.2.2" -multiqueue = "0.3.2" +multiqueue2 = "0.1.6" crossbeam-channel = "0.3.9" bus = "2.2.2" diff --git a/examples/single_directed.rs b/examples/single_directed.rs new file mode 100644 index 0000000..7351736 --- /dev/null +++ b/examples/single_directed.rs @@ -0,0 +1,23 @@ +use bastion_streams::stream::flow::map::Map; +use bastion_streams::stream::sink::ignore::Ignore; +use bastion_streams::stream::source::single::Single; + +use bastion_streams::stream::topology::architect::Architect; +use bastion_streams::stream::topology::container::Container; + +fn main() { + let single = Single::::new(999); + let mapper = Map::::new(Box::new(|x: i32| x + 1)); + let sink = Ignore::::new(); + + let stages = vec![ + Container::Source(Box::new(single)), + Container::Transform(Box::new(mapper)), + Container::Sink(Box::new(sink)), + ]; + + let mut architect = Architect::graph(stages); + architect.check_bounds(); + architect.visit_stages(); + architect.run(); +} diff --git a/src/lib.rs b/src/lib.rs index 8dd59d6..d71bd41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(clone_closures)] +//#![feature(clone_closures)] #[macro_use] pub mod stream; diff --git a/src/stream/flow/map.rs b/src/stream/flow/map.rs index 0004312..bd1f6d8 100644 --- a/src/stream/flow/map.rs +++ b/src/stream/flow/map.rs @@ -5,20 +5,44 @@ use objekt_clonable::clonable; #[clonable] pub trait MapClosure: Fn(I) -> O + Clone + Send + Sync + 'static {} +impl MapClosure for T where T: Fn(I) -> O + Clone + Send + Sync + 'static {} + type MapFn = Box>; pub struct Map { - pub shape: FlowShape<'static, I, O>, - pub stage_id: usize, + pub shape: Option>, + pub stage_id: Option, pub map_fn: MapFn, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, +} + +impl Map +where + I: Clone, + O: Clone, +{ + pub fn new(map_fn: MapFn) -> Self { + Self { + shape: None, + stage_id: None, + + map_fn, + + demand_rx: None, + demand_tx: None, + + in_handler: None, + out_handler: None, + logic: None, + } + } } ///// Map handler @@ -39,16 +63,16 @@ struct MapHandler { } impl OutHandler for MapHandler - where - I: Clone + Send + Sync + 'static, - O: Clone + Send + Sync + 'static, +where + I: Clone + Send + Sync + 'static, + O: Clone + Send + Sync + 'static, { fn name(&self) -> String { String::from("map-flow-out") } fn on_pull(&self) { - unimplemented!() + if let Ok(_elem) = self.out_rx.as_ref().unwrap().try_recv() {} } fn on_downstream_finish(&self) { @@ -61,9 +85,9 @@ impl OutHandler for MapHandler } impl InHandler for MapHandler - where - I: Clone + Send + Sync, - O: Clone + Send + Sync, +where + I: Clone + Send + Sync, + O: Clone + Send + Sync, { fn name(&self) -> String { String::from("map-flow-in") @@ -78,7 +102,7 @@ impl InHandler for MapHandler // todo: on_pull make demand from the upper let demand = Demand { stage_id: self.stage_id, - style: DemandStyle::DemandFull(100) + style: DemandStyle::DemandFull(100), }; self.demand_tx.as_ref().unwrap().try_send(demand).unwrap(); } @@ -111,7 +135,7 @@ impl Default for MapHandler { } } -impl<'a, I, O> GraphStage<'a> for Map +impl GraphStage for Map where I: Clone + Send + Sync + 'static, O: Clone + Send + Sync + 'static, @@ -120,47 +144,62 @@ where let map_flow_inlet = Inlet::::new(0, "Map.in"); let map_flow_outlet = Outlet::::new(0, "Map.out"); - self.shape = FlowShape { + self.shape = Some(FlowShape { inlet: map_flow_inlet, outlet: map_flow_outlet, - }; + }); } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (in_tx, in_rx) = unbounded::(); let (out_tx, out_rx) = unbounded::(); - let handler = Box::new(MapHandler { + let handler = Some(Box::new(MapHandler { map_fn: Some(self.map_fn.clone()), in_tx: Some(in_tx), in_rx: Some(in_rx), out_rx: Some(out_rx), out_tx: Some(out_tx), - demand_rx: Some(self.demand_rx.clone()), - demand_tx: Some(self.demand_tx.clone()), - stage_id: self.stage_id - }); + demand_rx: Some(self.demand_rx.as_ref().unwrap().clone()), + demand_tx: Some(self.demand_tx.as_ref().unwrap().clone()), + stage_id, + })); + + self.stage_id = Some(stage_id); - self.in_handler = handler.clone(); - self.out_handler = handler.clone(); + self.in_handler = Some(handler.as_ref().unwrap().clone()); + self.out_handler = Some(handler.as_ref().unwrap().clone()); - let shape = Box::new(self.shape.clone()); + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_inlet_handler(self.shape.inlet.clone(), self.in_handler.clone()); - gsl.set_outlet_handler(self.shape.outlet.clone(), self.out_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_inlet_handler( + self.shape.as_ref().unwrap().inlet.clone(), + self.in_handler.as_ref().unwrap().clone(), + ); + gsl.set_outlet_handler( + self.shape.as_ref().unwrap().outlet.clone(), + self.out_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); + } + + fn get_shape(&self) -> ShapeType { + self.shape.as_ref().unwrap().shape_type() + } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() } - fn get_shape(&'a self) -> ShapeType { - self.shape.shape_type() + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() } } diff --git a/src/stream/sink/ignore.rs b/src/stream/sink/ignore.rs index 4784537..7183d04 100644 --- a/src/stream/sink/ignore.rs +++ b/src/stream/sink/ignore.rs @@ -3,15 +3,34 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; pub struct Ignore { - pub shape: SinkShape<'static, I>, - pub stage_id: usize, + pub shape: Option>, + pub stage_id: Option, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, +} + +impl Ignore +where + I: Clone, +{ + pub fn new() -> Self { + Self { + shape: None, + stage_id: None, + + demand_rx: None, + demand_tx: None, + + in_handler: None, + out_handler: None, + logic: None, + } + } } #[derive(Clone)] @@ -26,8 +45,8 @@ struct IgnoreHandler { } impl InHandler for IgnoreHandler - where - I: Clone + 'static, +where + I: Clone + 'static, { fn name(&self) -> String { String::from("ignore-sink-in") @@ -37,11 +56,12 @@ impl InHandler for IgnoreHandler if let Ok(_elem) = self.in_rx.as_ref().unwrap().try_recv() { println!("Ignored"); } else { + println!("Demanding"); // todo: handle error case of try_recv // todo: on_pull make demand from the upper let demand = Demand { stage_id: self.stage_id, - style: DemandStyle::DemandFull(100) + style: DemandStyle::DemandFull(100), }; self.demand_tx.as_ref().unwrap().try_send(demand).unwrap(); } @@ -56,45 +76,57 @@ impl InHandler for IgnoreHandler } } -impl<'a, I> GraphStage<'a> for Ignore - where - I: Clone + 'static, +impl GraphStage for Ignore +where + I: Clone + 'static, { fn build_shape(&mut self) { let ignore_sink_inlet = Inlet::::new(0, "Sink.out"); - self.shape = SinkShape { + self.shape = Some(SinkShape { inlet: ignore_sink_inlet, - }; + }); } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (tx, rx) = unbounded::(); - self.in_handler = Box::new(IgnoreHandler { + self.in_handler = Some(Box::new(IgnoreHandler { in_tx: Some(tx), in_rx: Some(rx), - demand_rx: Some(self.demand_rx.clone()), - demand_tx: Some(self.demand_tx.clone()), - stage_id: self.stage_id - }); + demand_rx: Some(self.demand_rx.as_ref().unwrap().clone()), + demand_tx: Some(self.demand_tx.as_ref().unwrap().clone()), + stage_id, + })); + + self.stage_id = Some(stage_id); - let shape = Box::new(self.shape.clone()); + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_inlet_handler(self.shape.inlet.clone(), self.in_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_inlet_handler( + self.shape.as_ref().unwrap().inlet.clone(), + self.in_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); } - fn get_shape(&'a self) -> ShapeType { - let shape: &dyn Shape = &self.shape; + fn get_shape(&self) -> ShapeType { + let shape: &dyn Shape = self.shape.as_ref().unwrap(); shape.shape_type() } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() + } + + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() + } } diff --git a/src/stream/source/single.rs b/src/stream/source/single.rs index 698f056..b6617f7 100644 --- a/src/stream/source/single.rs +++ b/src/stream/source/single.rs @@ -1,31 +1,53 @@ - use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; pub struct Single { - pub shape: SourceShape<'static, O>, + pub shape: Option>, + pub stage_id: Option, + + pub element: O, - pub elem: O, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, +} - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, +impl Single +where + O: Clone, +{ + pub fn new(element: O) -> Self { + Self { + shape: None, + stage_id: None, + + element, + + demand_rx: None, + demand_tx: None, + + in_handler: None, + out_handler: None, + logic: None, + } + } } #[derive(Clone, Debug)] struct SingleHandler { + pub stage_id: usize, elem: O, pub rx: Receiver, pub tx: Sender, } impl OutHandler for SingleHandler - where - O: Clone + 'static, +where + O: Clone + 'static, { fn name(&self) -> String { String::from("single-source-out") @@ -45,43 +67,56 @@ impl OutHandler for SingleHandler } } -impl<'a, O> GraphStage<'a> for Single +impl GraphStage for Single where - O: Clone + 'static, + O: Clone + 'static, { fn build_shape(&mut self) { let single_source_outlet = Outlet::::new(0, "Single.out"); - self.shape = SourceShape { + self.shape = Some(SourceShape { outlet: single_source_outlet, - }; + }); } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (tx, rx) = unbounded(); - self.out_handler = Box::new(SingleHandler { - elem: self.elem.clone(), + self.out_handler = Some(Box::new(SingleHandler { + elem: self.element.clone(), tx, rx, - }); + stage_id, + })); + + self.stage_id = Some(stage_id); - let shape = Box::new(self.shape.clone()); + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_outlet_handler(self.shape.outlet.clone(), self.out_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_outlet_handler( + self.shape.as_ref().unwrap().outlet.clone(), + self.out_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); } - fn get_shape(&'a self) -> ShapeType { - let shape: &dyn Shape = &self.shape; + fn get_shape(&self) -> ShapeType { + let shape: &dyn Shape = self.shape.as_ref().unwrap(); shape.shape_type() } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() + } + + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() + } } diff --git a/src/stream/stage/demand.rs b/src/stream/stage/demand.rs index 78d7cce..2227ada 100644 --- a/src/stream/stage/demand.rs +++ b/src/stream/stage/demand.rs @@ -1,20 +1,26 @@ -use crossbeam_channel::{Sender, Receiver}; +use crossbeam_channel::{Receiver, Sender}; #[derive(Clone, Debug)] pub enum DemandStyle { DemandFull(usize), - DemandPartial(usize, usize) + DemandPartial(usize, usize), } #[derive(Clone, Debug)] pub struct Demand { pub stage_id: usize, - pub style: DemandStyle + pub style: DemandStyle, +} + +impl Demand { + pub fn new(stage_id: usize, style: DemandStyle) -> Self { + Demand { stage_id, style } + } } // Demand endpoint struct #[derive(Clone, Debug)] pub struct Demander { pub tx: Sender, - pub rx: Receiver + pub rx: Receiver, } diff --git a/src/stream/stage/error.rs b/src/stream/stage/error.rs index e69de29..8b13789 100644 --- a/src/stream/stage/error.rs +++ b/src/stream/stage/error.rs @@ -0,0 +1 @@ + diff --git a/src/stream/stage/graph.rs b/src/stream/stage/graph.rs index c21d5b6..5312fe1 100644 --- a/src/stream/stage/graph.rs +++ b/src/stream/stage/graph.rs @@ -1,19 +1,19 @@ - - use crate::stream::stage::attributes::Attributes; use crate::stream::stage::handlers::*; use crate::stream::stage::lets::{Inlet, Outlet}; use crate::stream::stage::shape::{Shape, ShapeType}; -use crate::stream::stage::demand::{Demand}; -use multiqueue::{BroadcastSender, BroadcastReceiver}; +use crate::stream::stage::demand::Demand; +use multiqueue2::{BroadcastReceiver, BroadcastSender}; -pub trait GraphStage<'a> { - fn build_shape(&'a mut self); - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver); - fn create_logic(&'a mut self, attributes: Attributes) -> GraphStageLogic; +pub trait GraphStage { + fn build_shape(&mut self); + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver); + fn create_logic(&mut self, stage_id: usize, attributes: Attributes); - fn get_shape(&'a self) -> ShapeType; + fn get_shape(&self) -> ShapeType; + fn get_stage_id(&self) -> usize; + fn get_logic(&self) -> &GraphStageLogic; } /////////////// diff --git a/src/stream/stage/handlers.rs b/src/stream/stage/handlers.rs index d562ffc..a1ea901 100644 --- a/src/stream/stage/handlers.rs +++ b/src/stream/stage/handlers.rs @@ -1,10 +1,6 @@ use futures::io; use objekt_clonable::*; - - - - //#[clonable] //pub trait Handler: Clone {} ////impl Handler for T where T: Clone {} diff --git a/src/stream/stage/mod.rs b/src/stream/stage/mod.rs index 24abccc..c93ca37 100644 --- a/src/stream/stage/mod.rs +++ b/src/stream/stage/mod.rs @@ -1,20 +1,20 @@ pub mod attributes; +pub mod demand; +pub mod error; pub mod graph; pub mod handlers; pub mod lets; pub mod shape; pub mod types; -pub mod demand; -pub mod error; pub mod prelude { - pub use multiqueue::{BroadcastReceiver, BroadcastSender}; pub use super::attributes::*; + pub use super::demand::*; + pub use super::error::*; pub use super::graph::*; pub use super::handlers::*; pub use super::lets::*; pub use super::shape::*; pub use super::types::*; - pub use super::demand::*; - pub use super::error::*; + pub use multiqueue2::{BroadcastReceiver, BroadcastSender}; } diff --git a/src/stream/stage/shape.rs b/src/stream/stage/shape.rs index 0fe2943..c8329a2 100644 --- a/src/stream/stage/shape.rs +++ b/src/stream/stage/shape.rs @@ -4,10 +4,9 @@ use crate::stream::stage::lets::{Inlet, Outlet}; pub enum ShapeType { Source, Flow, - Sink + Sink, } - pub trait Shape<'a, I, O> { fn shape_type(&self) -> ShapeType; fn inlets(&self) -> Vec>; diff --git a/src/stream/stage/types.rs b/src/stream/stage/types.rs index a1f1de2..4b702c5 100644 --- a/src/stream/stage/types.rs +++ b/src/stream/stage/types.rs @@ -1,7 +1,4 @@ - - pub struct NotUsed(); /// Exhaust pub(crate) struct Exhaust(); - diff --git a/src/stream/topology/architect.rs b/src/stream/topology/architect.rs index c7dcfd8..1eb106a 100644 --- a/src/stream/topology/architect.rs +++ b/src/stream/topology/architect.rs @@ -1,48 +1,109 @@ -use crate::stream::stage::demand::{Demand}; +use crate::stream::stage::demand::{Demand, DemandStyle}; +use crate::stream::stage::attributes::Attributes; use crate::stream::stage::graph::GraphStage; -use crate::stream::stage::shape::ShapeType; -use multiqueue::{broadcast_queue, BroadcastReceiver, BroadcastSender}; -pub struct Architect<'a> { +use crate::stream::topology::container::Container; +use multiqueue2::{broadcast_queue, BroadcastReceiver, BroadcastSender}; + +pub struct Architect { demand_tx: BroadcastSender, demand_rx: BroadcastReceiver, - stages: Vec>> + stages: Vec, } - -impl<'a> Architect<'a> { - pub fn graph(stages: Vec>>) -> Architect { - let stage_count = stages.len(); - let (demand_tx, demand_rx) = - broadcast_queue(stage_count as u64); +impl Architect { + pub fn graph(stages: Vec) -> Architect { + let stage_count = stages.len() * 2; + let (demand_tx, demand_rx) = broadcast_queue(stage_count as u64); Architect { demand_rx, demand_tx, - stages + stages, } } - pub fn run() { - unimplemented!() + pub fn run(&mut self) { + if let Some(Container::Sink(sink)) = self.stages.last() { + let stage_id = (**sink).get_stage_id(); + let demand = Demand::new(stage_id, DemandStyle::DemandFull(100)); + self.demand_tx.try_send(demand); + self.stages.iter_mut().rev().for_each(|c| { + use Container::*; + + match c { + Source(s) | Transform(s) | Sink(s) => { + for x in (**s).get_logic().in_handlers.iter() { + dbg!("on_push"); + x.on_push(); + } + + for x in (**s).get_logic().out_handlers.iter() { + dbg!("on_pull"); + x.on_pull(); + } + } + } + }); + } else { + panic!("Run failed"); + } } - fn check_bounds(&'a self) { - if let Some(root) = self.stages.first() { - if root.get_shape() != ShapeType::Source { - unimplemented!() + pub fn check_bounds(&self) { + if let Some(first_stage) = self.stages.first() { + use Container::*; + + match first_stage { + Transform(_) | Sink(_) => { + panic!("Stage traversal failed. Stream graph starts with a Source.") + } + _ => (), + } + } + + if let Some(last_stage) = self.stages.last() { + use Container::*; + + match last_stage { + Source(_) | Transform(_) => { + panic!("Stage traversal failed. Stream graph ends with a Sink.") + } + _ => (), } } } - fn visit_stages(&'a mut self) { + pub fn visit_stages(&mut self) { let tx = self.demand_tx.clone(); let rx = self.demand_rx.add_stream(); self.stages.iter_mut().for_each(|stage| { - stage.build_demand(tx.clone(), rx.clone()) + use Container::*; + + match stage { + Source(s) | Transform(s) | Sink(s) => { + s.build_demand(tx.clone(), rx.clone()); + } + _ => (), + } }); + + self.stages + .iter_mut() + .rev() + .enumerate() + .for_each(|(stage_id, stage)| { + use Container::*; + + match stage { + Source(s) | Transform(s) | Sink(s) => { + s.create_logic(stage_id, Attributes {}); + } + _ => (), + } + }); } } diff --git a/src/stream/topology/container.rs b/src/stream/topology/container.rs new file mode 100644 index 0000000..745b400 --- /dev/null +++ b/src/stream/topology/container.rs @@ -0,0 +1,7 @@ +use crate::stream::stage::graph::GraphStage; + +pub enum Container { + Source(Box), + Transform(Box), + Sink(Box), +} diff --git a/src/stream/topology/macros.rs b/src/stream/topology/macros.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/stream/topology/macros.rs @@ -0,0 +1 @@ + diff --git a/src/stream/topology/mod.rs b/src/stream/topology/mod.rs index 6917e6c..23c9b59 100644 --- a/src/stream/topology/mod.rs +++ b/src/stream/topology/mod.rs @@ -1 +1,4 @@ pub mod architect; +pub mod container; +#[macro_use] +pub mod macros; diff --git a/tests/test_graph_stage_logic.rs b/tests/test_graph_stage_logic.rs index dcacd31..7d45d3e 100644 --- a/tests/test_graph_stage_logic.rs +++ b/tests/test_graph_stage_logic.rs @@ -2,12 +2,11 @@ mod tests { use bastion_streams::stream::stage::graph::GraphStageLogic; use bastion_streams::stream::stage::handlers::{InHandler, OutHandler}; - use bastion_streams::stream::stage::lets::{Outlet}; + use bastion_streams::stream::stage::lets::Outlet; use bastion_streams::stream::stage::shape::SourceShape; - + use bastion_streams::stream::stage::types::NotUsed; use futures::io::Error; - // let inlet0 = Inlet::::new(0, "in0"); // let inlet1 = Inlet::::new(1, "in1");