diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 2440901be..8cc4db046 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -28,35 +28,16 @@ let section = Logs.Src.create "eliom:bus" module Ecb = Eliom_comet_base type ('a, 'b) t = - { channel : 'b Ecb.wrapped_channel - ; stream : 'b Lwt_stream.t Lazy.t + { channel : 'b Eliom_comet.Channel.t + ; mutable channel_awake : bool + (** Whether [Eliom_comet.Channel.wake] was called before. *) ; queue : 'a Queue.t ; mutable max_size : int ; write : 'a list -> unit Lwt.t ; mutable waiter : unit -> unit Lwt.t - ; mutable last_wait : unit Lwt.t - ; mutable original_stream_available : bool - ; error_h : 'b option Lwt.t * exn Lwt.u } + ; mutable last_wait : unit Lwt.t } -(* clone streams such that each clone of the original stream raise the same exceptions *) -let consume (t, u) s = - let t' = - Lwt.catch - (fun () -> Lwt_stream.iter (fun _ -> ()) s) - (fun e -> - (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); - Lwt.fail e) - in - Lwt.choose [Lwt.bind t (fun _ -> Lwt.return_unit); t'] - -let clone_exn (t, u) s = - let s' = Lwt_stream.clone s in - Lwt_stream.from (fun () -> - Lwt.catch - (fun () -> Lwt.choose [Lwt_stream.get s'; t]) - (fun e -> - (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); - Lwt.fail e)) +type callback_id = Eliom_comet.Channel.callback_id type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = ( unit @@ -72,7 +53,7 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = , Eliom_registration.Action.return ) Eliom_service.t -let create service channel waiter = +let create service wrapped_channel waiter = let write x = Lwt.catch (fun () -> @@ -86,42 +67,14 @@ let create service channel waiter = | Eliom_request.Failed_request 204 -> Lwt.return_unit | exc -> Lwt.reraise exc) in - let error_h = - let t, u = Lwt.wait () in - ( Lwt.catch - (fun () -> - let* _ = t in - assert false) - (fun e -> Lwt.fail e) - , u ) - in - let stream = - lazy - (let stream = Eliom_comet.register channel in - (* iterate on the stream to consume messages: avoid memory leak *) - let _ = consume error_h stream in - stream) - in - let t = - { channel - ; stream - ; queue = Queue.create () - ; max_size = 20 - ; write - ; waiter - ; last_wait = Lwt.return_unit - ; original_stream_available = true - ; error_h } - in - (* the comet channel start receiving after the load phase, so the - original channel (i.e. without message lost) is only available in - the first loading phase. *) - let _ = - let* () = Eliom_client.wait_load_end () in - t.original_stream_available <- false; - Lwt.return_unit - in - t + let channel = Eliom_comet.register ~wake:false wrapped_channel in + { channel + ; channel_awake = false + ; queue = Queue.create () + ; max_size = 20 + ; write + ; waiter + ; last_wait = Lwt.return_unit } let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) = let waiter () = Js_of_ocaml_lwt.Lwt_js.sleep 0.05 in @@ -131,14 +84,22 @@ let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) = let () = Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap -let stream t = clone_exn t.error_h (Lazy.force t.stream) +let register t callback = + let callback_id = Eliom_comet.Channel.register t.channel callback in + if not t.channel_awake + then ( + Eliom_comet.Channel.wake t.channel; + t.channel_awake <- true); + callback_id -let original_stream t = - if Eliom_client_core.in_onload () && t.original_stream_available - then stream t - else - raise_error ~section - "original_stream: the original stream is not available anymore" +let unregister t id = Eliom_comet.Channel.unregister t.channel id + +let stream t = + let stream, push = Lwt_stream.create () in + let _ = register t (fun data -> push data; Lwt.return_unit) in + stream + +let original_stream = stream let flush t = let l = List.rev (Queue.fold (fun l v -> v :: l) [] t.queue) in @@ -155,7 +116,7 @@ let try_flush t = Lwt.return_unit let write t v = Queue.add v t.queue; try_flush t -let close {channel; _} = Eliom_comet.close channel +let close {channel; _} = Eliom_comet.Channel.close channel let set_queue_size b s = b.max_size <- s let set_time_before_flush b t = diff --git a/src/lib/eliom_bus.client.mli b/src/lib/eliom_bus.client.mli index 06950d839..f3b57506e 100644 --- a/src/lib/eliom_bus.client.mli +++ b/src/lib/eliom_bus.client.mli @@ -25,19 +25,26 @@ type ('a, 'b) t +type callback_id +(** Handler returned by {!register} that allows unregistering a callback later + with {!unregister}. *) + +val register : ('a, 'b) t -> ('b option -> unit Lwt.t) -> callback_id +(** Register a callback that will get called on every messages from the server. + Messages received before the call to [register] are lost. The callback is + called with [Some data] when receiving a message from the server or with + [None] when no more data will be received. *) + +val unregister : ('a, 'b) t -> callback_id -> unit +(** Unregister a callback previously registered with {!register}, which will + stop receiving new messages. No-op if the callback was unregistered before. *) + val stream : ('a, 'b) t -> 'b Lwt_stream.t -(** [stream b] returns the stream of data sent to bus [b]. A new - stream is created each time this function is called. Some messages - from the bus can be lost if they were sent before the call to - [stream]. If you need to receive every message, use original stream - instead. *) +(** Create a new stream from the messages from the server. This has the same + behavior as {!register}. *) val original_stream : ('a, 'b) t -> 'b Lwt_stream.t -(** [stream b] returns the stream of data sent to bus [b]. A new - stream is created each time this function is called. Every - messages sent to the bus after the generation of the page are - received. This function can be called only in the onload event - handler, if called outside, it will raise a Failure. *) +(** @deprecated Deprecated alias to [stream]. *) val write : ('a, 'b) t -> 'a -> unit Lwt.t (** [write b v] send [v] to the bus [b]. Every participant of the bus diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 119d65407..ce51852d9 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -635,18 +635,117 @@ end = struct handle_visibility hd; hd end +module Position = struct + type relation = + | Equal + (* stateless after channels *) + | Greater + (* stateless newest channels *) + + type t = + | No_position (* stateful channels*) + | Position of relation * int option ref + (* stateless channels *) + + let of_kind = function + | Ecb.After_kind i -> Position (Equal, ref (Some i)) + | Ecb.Newest_kind i -> Position (Greater, ref (Some i)) + | Ecb.Last_kind None -> Position (Greater, ref None) + | Ecb.Last_kind (Some _) -> Position (Equal, ref None) + + let check_and_update position msg_pos data = + match position, msg_pos, data with + | No_position, None, _ -> true + | No_position, Some _, _ | Position _, None, Ecb.Data _ -> + raise_error ~section + "check_position: channel kind and message do not match" + | Position _, None, (Ecb.Full | Ecb.Closed) -> true + | Position (relation, r), Some j, _ -> ( + match !r with + | None -> + r := Some (j + 1); + true + | Some i -> + if match relation with Equal -> j = i | Greater -> j >= i + then ( + r := Some (j + 1); + true) + else false) +end + +let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 + +module StringTbl = Hashtbl.Make (struct + type t = string + + let equal = ( = ) + let hash = Hashtbl.hash + end) + +type callback = + | Cb : + { c_id : int (** Unique identifier used for unregistration. *) + ; c_pos : Position.t + ; c_callback : 'a option -> unit Lwt.t } + -> callback + type 'a handler = { hd_service_handler : 'a Service_handler.t - ; hd_stream : (string * int option * string Ecb.channel_data) Lwt_stream.t } + ; hd_callbacks : callback list StringTbl.t + (** Callbacks are grouped by their channel ID. *) } + +let wait_data_daemon hd = + let on_error _ = + (* Notify callbacks of an error and release callbacks to avoid memory leaks. *) + let callbacks = + StringTbl.fold + (fun _k cs acc -> List.rev_append cs acc) + hd.hd_callbacks [] + in + StringTbl.reset hd.hd_callbacks; + Lwt_list.iter_s (fun (Cb c) -> c.c_callback None) callbacks + in + let handle_message (id, pos, data) = + match StringTbl.find hd.hd_callbacks id with + | exception Not_found -> Lwt.return_unit + | cs -> + Lwt_list.iter_s + (fun (Cb c) -> + if Position.check_and_update c.c_pos pos data + then + match data with + | Ecb.Full | Closed -> c.c_callback None + | Data x -> c.c_callback (Some (unmarshal x)) + else Lwt.return_unit) + cs + in + let rec wait_data_loop () = + Lwt.try_bind + (fun () -> Service_handler.wait_data hd.hd_service_handler) + (fun messages -> + let* () = Lwt_list.iter_s handle_message messages in + wait_data_loop ()) + on_error + in + Lwt.async wait_data_loop + +let register_callback hd chan_id callback = + let cs = + match StringTbl.find_opt hd.hd_callbacks chan_id with + | Some cs -> cs + | None -> [] + in + StringTbl.replace hd.hd_callbacks chan_id (callback :: cs) + +let unregister_callback hd chan_id id = + try + StringTbl.find hd.hd_callbacks chan_id + |> List.filter (fun (Cb c) -> c.c_id <> id) + |> StringTbl.replace hd.hd_callbacks chan_id + with Not_found -> () -let handler_stream hd = - Lwt_stream.map_list - (fun x -> x) - (Lwt_stream.from (fun () -> - Lwt.try_bind - (fun () -> Service_handler.wait_data hd) - (fun s -> Lwt.return_some s) - (fun _ -> Lwt.return_none))) +let unregister_all_callbacks hd chan_id = + StringTbl.remove hd.hd_callbacks chan_id let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t @@ -660,8 +759,9 @@ let stateless_handler_table : let init (service : Ecb.comet_service) kind table = let hd_service_handler = Service_handler.make service kind in - let hd_stream = handler_stream hd_service_handler in - let hd = {hd_service_handler; hd_stream} in + let hd_callbacks = StringTbl.create 16 in + let hd = {hd_service_handler; hd_callbacks} in + wait_data_daemon hd; Hashtbl.add table service hd; hd @@ -693,104 +793,59 @@ let restart () = Hashtbl.iter f stateless_handler_table; Hashtbl.iter f stateful_handler_table -let close = function - | Ecb.Stateful_channel (chan_service, chan_id) -> - let {hd_service_handler; _} = get_stateful_hd chan_service in - Service_handler.close hd_service_handler (Ecb.string_of_chan_id chan_id) - | Ecb.Stateless_channel (chan_service, chan_id, _kind) -> - let {hd_service_handler; _} = get_stateless_hd chan_service in - Service_handler.close hd_service_handler (Ecb.string_of_chan_id chan_id) +module Channel = struct + type 'a t = + | C : {hd : _ handler; chan_pos : Position.t; chan_id : string} -> 'a t -let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 + type callback_id = int -type position_relation = - | Equal - (* stateless after channels *) - | Greater -(* stateless newest channels *) - -type position = - | No_position (* stateful channels*) - | Position of position_relation * int option ref -(* stateless channels *) - -let position_of_kind = function - | Ecb.After_kind i -> Position (Equal, ref (Some i)) - | Ecb.Newest_kind i -> Position (Greater, ref (Some i)) - | Ecb.Last_kind None -> Position (Greater, ref None) - | Ecb.Last_kind (Some _) -> Position (Equal, ref None) - -let check_and_update_position position msg_pos data = - match position, msg_pos, data with - | No_position, None, _ -> true - | No_position, Some _, _ | Position _, None, Ecb.Data _ -> - raise_error ~section - "check_position: channel kind and message do not match" - | Position _, None, (Ecb.Full | Ecb.Closed) -> true - | Position (relation, r), Some j, _ -> ( - match !r with - | None -> - r := Some (j + 1); - true - | Some i -> - if match relation with Equal -> j = i | Greater -> j >= i - then ( - r := Some (j + 1); - true) - else false) + let next_callback_id = + let i = ref 0 in + fun () -> incr i; !i -(* stateless channels are registered with a position: when a channel - is registered more than one time, it is possible to receive old - messages: the position is used to filter them out. *) -let register' hd position (_ : Ecb.comet_service) (chan_id : 'a Ecb.chan_id) = - let chan_id = Ecb.string_of_chan_id chan_id in - let stream = - Lwt_stream.filter_map_s - (function - | id, pos, data - when id = chan_id && check_and_update_position position pos data -> ( - match data with - | Ecb.Full -> Lwt.fail Channel_full - | Ecb.Closed -> Lwt.fail Channel_closed - | Ecb.Data x -> Lwt.return_some (unmarshal x : 'a)) - | _ -> Lwt.return_none) - (Lwt_stream.clone hd.hd_stream) - in - let protect_and_close t = - let t' = Lwt.protected t in - Lwt.on_cancel t' (fun () -> - Service_handler.close hd.hd_service_handler chan_id); - t' - in - (* protect the stream from cancels *) - Lwt_stream.from (fun () -> protect_and_close (Lwt_stream.get stream)) + let make hd chan_pos chan_id = C {hd; chan_pos; chan_id} + let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler -let register_stateful ?(wake = true) service chan_id = + let close (C {hd; chan_id; _}) = + Service_handler.close hd.hd_service_handler chan_id; + unregister_all_callbacks hd chan_id + + (* stateless channels are registered with a position: when a channel is + registered more than one time, it is possible to receive old messages: the + position is used to filter them out. *) + let register (C {hd; chan_pos = c_pos; chan_id}) c_callback = + let c_id = next_callback_id () in + register_callback hd chan_id (Cb {c_id; c_pos; c_callback}); + c_id + + let unregister (C {hd; chan_id; _}) id = unregister_callback hd chan_id id +end + +let register_stateful service chan_id = let hd = get_stateful_hd service in - let stream = register' hd No_position service chan_id in let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateful hd.hd_service_handler chan_id; - if wake then Service_handler.activate hd.hd_service_handler; - stream + Channel.make hd No_position chan_id -let register_stateless ?(wake = true) service chan_id kind = +let register_stateless service chan_id kind = let hd = get_stateless_hd service in - let stream = register' hd (position_of_kind kind) service chan_id in let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateless hd.hd_service_handler chan_id kind; - if wake then Service_handler.activate hd.hd_service_handler; - stream + Channel.make hd (Position.of_kind kind) chan_id -let register ?(wake = true) (wrapped_chan : 'a Ecb.wrapped_channel) = +let unwrap (wrapped_chan : 'a Ecb.wrapped_channel) : 'a Channel.t = match wrapped_chan with - | Ecb.Stateful_channel (s, c) -> register_stateful ~wake s c - | Ecb.Stateless_channel (s, c, kind) -> register_stateless ~wake s c kind + | Ecb.Stateful_channel (s, c) -> register_stateful s c + | Ecb.Stateless_channel (s, c, kind) -> register_stateless s c kind -let internal_unwrap (wrapped_chan, _unwrapper) = register wrapped_chan +let register ?(wake = true) wrapped_chan = + let chan = unwrap wrapped_chan in + if wake then Channel.wake chan; + chan let () = Eliom_unwrap.register_unwrapper Eliom_common.comet_channel_unwrap_id - internal_unwrap + (fun (wrapped_chan, _unwrapper) -> register wrapped_chan) let is_active () = (*VVV Check. Isn't it the contrary? (fold from `Inactive?) *) @@ -808,8 +863,4 @@ let is_active () = max (Hashtbl.fold f stateless_handler_table `Active) (fun () -> Hashtbl.fold f stateful_handler_table `Active) -module Channel = struct - type 'a t = 'a Lwt_stream.t -end - let force_link = () diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index bd12ee987..563fbb90e 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -35,15 +35,10 @@ will close the channel. *) exception Channel_full -(** [Channel_full] is raised when trying to read on a channel marked - full by the server. It is not possible to read anything else from a - full channel. *) +(** @deprecated Not raised anymore. *) exception Channel_closed -(** [Channel_closed] is raised when reading on a channel and the - server side of the application closed channel ( the server was restarted, - a session was closed, or a stateless channel was garbage collected). - *) +(** @deprecated Not raised anymore. *) val is_active : unit -> [`Active | `Idle | `Inactive] (** [is_active ()] returns the current activity state *) @@ -115,17 +110,40 @@ module Configuration : sig end module Channel : sig - type 'a t = 'a Lwt_stream.t + type 'a t + + type callback_id + (** Handler returned by {!register} that allows unregistering a callback + later with {!unregister}. *) + + val register : 'a t -> ('a option -> unit Lwt.t) -> callback_id + (** [register chan callback] registers a callback to be called for new messages + from the server. The callback receives [Some data] for each new messages + from the server and [None] when the server closes the channel or an error + occurs. Not thread-safe. *) + + val unregister : 'a t -> callback_id -> unit + (** Unregister a callback previously registered with {!register}, which will + stop receiving new messages. No-op if the callback was unregistered before. *) + + val close : 'a t -> unit + (** Unregister all callbacks associated to the given channel and close it. + The channel will not receive any more messages. *) + + (**/**) + + val wake : 'a t -> unit + (** Activate the handling loop, making sure the channel can receive messages. + No request will be sent. *) + + (**/**) end (**/**) -val register : - ?wake:bool - -> 'a Eliom_comet_base.wrapped_channel - -> 'a Lwt_stream.t -(** if wake is false, the registration of the channel won't - activate the handling loop ( no request will be sent ). Default is true *) +val register : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel -> 'a Channel.t +(** The [~wake] argument controls whether [Channel.wake] is called on the + channel. Default is [true]. *) val restart : unit -> unit (** [restart ()] Restarts the loop waiting for server messages. It is @@ -135,11 +153,6 @@ val restart : unit -> unit case, preventing client code from receiving the failure notification. This shouldn't be used by average user. *) -val close : 'a Eliom_comet_base.wrapped_channel -> unit -(** [close c] closes the channel c. This function should be only use - internally. The normal way to close a channel is to cancel a thread - waiting on inputs. *) - val force_link : unit val handle_exn : ?exn:exn -> unit -> unit Lwt.t