Skip to content
99 changes: 30 additions & 69 deletions src/lib/eliom_bus.client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,16 @@ let section = Logs.Src.create "eliom:bus"
module Ecb = Eliom_comet_base

type ('a, 'b) t =
{ channel : 'b Ecb.wrapped_channel
; stream : 'b Lwt_stream.t Lazy.t
{ channel : 'b Eliom_comet.Channel.t
; mutable channel_awake : bool
(** Whether [Eliom_comet.Channel.wake] was called before. *)
; queue : 'a Queue.t
; mutable max_size : int
; write : 'a list -> unit Lwt.t
; mutable waiter : unit -> unit Lwt.t
; mutable last_wait : unit Lwt.t
; mutable original_stream_available : bool
; error_h : 'b option Lwt.t * exn Lwt.u }
; mutable last_wait : unit Lwt.t }

(* clone streams such that each clone of the original stream raise the same exceptions *)
let consume (t, u) s =
let t' =
Lwt.catch
(fun () -> Lwt_stream.iter (fun _ -> ()) s)
(fun e ->
(match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ());
Lwt.fail e)
in
Lwt.choose [Lwt.bind t (fun _ -> Lwt.return_unit); t']

let clone_exn (t, u) s =
let s' = Lwt_stream.clone s in
Lwt_stream.from (fun () ->
Lwt.catch
(fun () -> Lwt.choose [Lwt_stream.get s'; t])
(fun e ->
(match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ());
Lwt.fail e))
type callback_id = Eliom_comet.Channel.callback_id

type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service =
( unit
Expand All @@ -72,7 +53,7 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service =
, Eliom_registration.Action.return )
Eliom_service.t

let create service channel waiter =
let create service wrapped_channel waiter =
let write x =
Lwt.catch
(fun () ->
Expand All @@ -86,42 +67,14 @@ let create service channel waiter =
| Eliom_request.Failed_request 204 -> Lwt.return_unit
| exc -> Lwt.reraise exc)
in
let error_h =
let t, u = Lwt.wait () in
( Lwt.catch
(fun () ->
let* _ = t in
assert false)
(fun e -> Lwt.fail e)
, u )
in
let stream =
lazy
(let stream = Eliom_comet.register channel in
(* iterate on the stream to consume messages: avoid memory leak *)
let _ = consume error_h stream in
stream)
in
let t =
{ channel
; stream
; queue = Queue.create ()
; max_size = 20
; write
; waiter
; last_wait = Lwt.return_unit
; original_stream_available = true
; error_h }
in
(* the comet channel start receiving after the load phase, so the
original channel (i.e. without message lost) is only available in
the first loading phase. *)
let _ =
let* () = Eliom_client.wait_load_end () in
t.original_stream_available <- false;
Lwt.return_unit
in
t
let channel = Eliom_comet.register ~wake:false wrapped_channel in
{ channel
; channel_awake = false
; queue = Queue.create ()
; max_size = 20
; write
; waiter
; last_wait = Lwt.return_unit }

let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) =
let waiter () = Js_of_ocaml_lwt.Lwt_js.sleep 0.05 in
Expand All @@ -131,14 +84,22 @@ let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) =
let () =
Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap

let stream t = clone_exn t.error_h (Lazy.force t.stream)
let register t callback =
let callback_id = Eliom_comet.Channel.register t.channel callback in
if not t.channel_awake
then (
Eliom_comet.Channel.wake t.channel;
t.channel_awake <- true);
callback_id

let original_stream t =
if Eliom_client_core.in_onload () && t.original_stream_available
then stream t
else
raise_error ~section
"original_stream: the original stream is not available anymore"
let unregister t id = Eliom_comet.Channel.unregister t.channel id

let stream t =
let stream, push = Lwt_stream.create () in
let _ = register t (fun data -> push data; Lwt.return_unit) in
stream

let original_stream = stream

let flush t =
let l = List.rev (Queue.fold (fun l v -> v :: l) [] t.queue) in
Expand All @@ -155,7 +116,7 @@ let try_flush t =
Lwt.return_unit

let write t v = Queue.add v t.queue; try_flush t
let close {channel; _} = Eliom_comet.close channel
let close {channel; _} = Eliom_comet.Channel.close channel
let set_queue_size b s = b.max_size <- s

let set_time_before_flush b t =
Expand Down
27 changes: 17 additions & 10 deletions src/lib/eliom_bus.client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,26 @@

type ('a, 'b) t

type callback_id
(** Handler returned by {!register} that allows unregistering a callback later
with {!unregister}. *)

val register : ('a, 'b) t -> ('b option -> unit Lwt.t) -> callback_id
(** Register a callback that will get called on every messages from the server.
Messages received before the call to [register] are lost. The callback is
called with [Some data] when receiving a message from the server or with
[None] when no more data will be received. *)

val unregister : ('a, 'b) t -> callback_id -> unit
(** Unregister a callback previously registered with {!register}, which will
stop receiving new messages. No-op if the callback was unregistered before. *)

val stream : ('a, 'b) t -> 'b Lwt_stream.t
(** [stream b] returns the stream of data sent to bus [b]. A new
stream is created each time this function is called. Some messages
from the bus can be lost if they were sent before the call to
[stream]. If you need to receive every message, use original stream
instead. *)
(** Create a new stream from the messages from the server. This has the same
behavior as {!register}. *)

val original_stream : ('a, 'b) t -> 'b Lwt_stream.t
(** [stream b] returns the stream of data sent to bus [b]. A new
stream is created each time this function is called. Every
messages sent to the bus after the generation of the page are
received. This function can be called only in the onload event
handler, if called outside, it will raise a Failure. *)
(** @deprecated Deprecated alias to [stream]. *)

val write : ('a, 'b) t -> 'a -> unit Lwt.t
(** [write b v] send [v] to the bus [b]. Every participant of the bus
Expand Down
Loading
Loading