@@ -8,23 +8,99 @@ use std::rc::Weak;
88use serde:: { Deserialize , Serialize } ;
99
1010use super :: handler_id:: HandlerId ;
11+ use super :: messages:: FromWorker ;
1112use super :: messages:: ToWorker ;
13+ use super :: native_worker:: DedicatedWorker ;
1214use super :: native_worker:: NativeWorkerExt ;
1315use super :: traits:: Worker ;
14- use super :: { Callback , Shared } ;
16+ use super :: Callback ;
1517use crate :: codec:: Codec ;
1618
1719pub ( crate ) type ToWorkerQueue < W > = Vec < ToWorker < W > > ;
1820pub ( crate ) type CallbackMap < W > = HashMap < HandlerId , Weak < dyn Fn ( <W as Worker >:: Output ) > > ;
1921
20- struct WorkerBridgeInner < W >
22+ pub ( crate ) struct WorkerBridgeInner < W >
2123where
2224 W : Worker ,
2325{
2426 // When worker is loaded, queue becomes None.
25- pending_queue : Shared < Option < ToWorkerQueue < W > > > ,
26- callbacks : Shared < CallbackMap < W > > ,
27- post_msg : Rc < dyn Fn ( ToWorker < W > ) > ,
27+ pending_queue : RefCell < Option < ToWorkerQueue < W > > > ,
28+ callbacks : RefCell < CallbackMap < W > > ,
29+ native_worker : RefCell < Option < DedicatedWorker > > ,
30+ post_msg : Box < dyn Fn ( & DedicatedWorker , ToWorker < W > ) > ,
31+ }
32+
33+ impl < W > WorkerBridgeInner < W >
34+ where
35+ W : Worker + ' static ,
36+ {
37+ pub ( crate ) fn new < CODEC > (
38+ native_worker : DedicatedWorker ,
39+ callbacks : CallbackMap < W > ,
40+ ) -> Rc < Self >
41+ where
42+ CODEC : Codec ,
43+ W :: Input : Serialize + for < ' de > Deserialize < ' de > ,
44+ W :: Output : Serialize + for < ' de > Deserialize < ' de > ,
45+ {
46+ let worker = native_worker. clone ( ) ;
47+
48+ let pending_queue = RefCell :: new ( Some ( Vec :: new ( ) ) ) ;
49+ let callbacks = RefCell :: new ( callbacks) ;
50+ let native_worker = RefCell :: new ( Some ( native_worker) ) ;
51+ let post_msg = move |worker : & DedicatedWorker , msg : ToWorker < W > |
52+ worker. post_packed_message :: < _ , CODEC > ( msg) ;
53+
54+ let self_ = Self {
55+ pending_queue,
56+ callbacks,
57+ native_worker,
58+ post_msg : Box :: new ( post_msg) ,
59+ } ;
60+ let self_ = Rc :: new ( self_) ;
61+
62+ let handler = {
63+ let bridge_inner = Rc :: downgrade ( & self_) ;
64+ // If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`.
65+ move |msg : FromWorker < W > | if let Some ( bridge_inner) = Weak :: upgrade ( & bridge_inner) {
66+ match msg {
67+ FromWorker :: WorkerLoaded => {
68+ // Set pending queue to `None`. Unless `WorkerLoaded` is
69+ // sent twice, this will always be `Some`.
70+ if let Some ( pending_queue) = bridge_inner. take_queue ( ) {
71+ // Will be `None` if the worker has been terminated.
72+ if let Some ( worker) = bridge_inner. native_worker . borrow_mut ( ) . as_ref ( ) {
73+ // Send all pending messages.
74+ for to_worker in pending_queue. into_iter ( ) {
75+ ( bridge_inner. post_msg ) ( worker, to_worker) ;
76+ }
77+ }
78+ }
79+ }
80+ FromWorker :: ProcessOutput ( id, output) => {
81+ let mut callbacks = bridge_inner. callbacks . borrow_mut ( ) ;
82+
83+ if let Some ( m) = callbacks. get ( & id) {
84+ if let Some ( m) = Weak :: upgrade ( m) {
85+ m ( output) ;
86+ } else {
87+ // The bridge has been dropped.
88+ callbacks. remove ( & id) ;
89+ }
90+ }
91+ }
92+ }
93+ }
94+ } ;
95+
96+ worker. set_on_packed_message :: < _ , CODEC , _ > ( handler) ;
97+
98+ self_
99+ }
100+
101+ fn take_queue ( & self ) -> Option < ToWorkerQueue < W > > {
102+ self . pending_queue . borrow_mut ( ) . take ( )
103+ }
28104}
29105
30106impl < W > fmt:: Debug for WorkerBridgeInner < W >
@@ -49,10 +125,24 @@ where
49125 m. push ( msg) ;
50126 }
51127 None => {
52- ( self . post_msg ) ( msg) ;
128+ if let Some ( worker) = self . native_worker . borrow ( ) . as_ref ( ) {
129+ ( self . post_msg ) ( worker, msg) ;
130+ }
53131 }
54132 }
55133 }
134+
135+ /// Terminate the worker, no more messages can be sent after this.
136+ fn terminate ( & self ) {
137+ if let Some ( worker) = self . native_worker . borrow_mut ( ) . take ( ) {
138+ worker. terminate ( ) ;
139+ }
140+ }
141+
142+ /// Returns true if the worker is terminated.
143+ fn is_terminated ( & self ) -> bool {
144+ self . native_worker . borrow ( ) . is_none ( )
145+ }
56146}
57147
58148impl < W > Drop for WorkerBridgeInner < W >
@@ -66,6 +156,15 @@ where
66156}
67157
68158/// A connection manager for components interaction with workers.
159+ ///
160+ /// Dropping this object will send a disconnect message to the worker and drop
161+ /// the callback if set, but will have no effect on forked bridges. Note that
162+ /// the worker will still receive and process any messages sent over the bridge
163+ /// up to that point, but the reply will not trigger a callback. If all forked
164+ /// bridges for a worker are dropped, the worker will be sent a destroy message.
165+ ///
166+ /// To terminate the worker and stop execution immediately, use
167+ /// [`terminate`](#method.terminate).
69168pub struct WorkerBridge < W >
70169where
71170 W : Worker ,
@@ -84,26 +183,16 @@ where
84183 self . inner . send_message ( ToWorker :: Connected ( self . id ) ) ;
85184 }
86185
87- pub ( crate ) fn new < CODEC > (
186+ pub ( crate ) fn new (
88187 id : HandlerId ,
89- native_worker : web_sys:: Worker ,
90- pending_queue : Rc < RefCell < Option < ToWorkerQueue < W > > > > ,
91- callbacks : Rc < RefCell < CallbackMap < W > > > ,
188+ inner : Rc < WorkerBridgeInner < W > > ,
92189 callback : Option < Callback < W :: Output > > ,
93190 ) -> Self
94191 where
95- CODEC : Codec ,
96192 W :: Input : Serialize + for < ' de > Deserialize < ' de > ,
97193 {
98- let post_msg = move |msg : ToWorker < W > | native_worker. post_packed_message :: < _ , CODEC > ( msg) ;
99-
100194 let self_ = Self {
101- inner : WorkerBridgeInner {
102- pending_queue,
103- callbacks,
104- post_msg : Rc :: new ( post_msg) ,
105- }
106- . into ( ) ,
195+ inner,
107196 id,
108197 _worker : PhantomData ,
109198 _cb : callback,
@@ -146,6 +235,23 @@ where
146235
147236 self_
148237 }
238+
239+ /// Immediately terminates the worker and stops any execution in progress,
240+ /// for this and all forked bridges. All messages will be dropped without
241+ /// the worker receiving them. No disconnect or destroy message is sent. Any
242+ /// messages sent after this point are dropped (from this bridge or any
243+ /// forks).
244+ ///
245+ /// For more details see
246+ /// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate).
247+ pub fn terminate ( & self ) {
248+ self . inner . terminate ( )
249+ }
250+
251+ /// Returns true if the worker is terminated.
252+ pub fn is_terminated ( & self ) -> bool {
253+ self . inner . is_terminated ( )
254+ }
149255}
150256
151257impl < W > Drop for WorkerBridge < W >
0 commit comments