From 2636c4189223b2fde935e2b131a9cbc810db0ef8 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Thu, 10 Jul 2025 12:36:39 +0200 Subject: [PATCH 01/10] client: Eliom_comet: Remove usage of `Lwt_stream` `Lwt_stream` is difficult to migrate to other concurrency libraries because of its vast API and its many usecases. Its usage in `Eliom_comet` can easily be changed to a callback-based approach. This might increase performances as well. --- src/lib/eliom_bus.client.ml | 25 +++------- src/lib/eliom_comet.client.ml | 87 +++++++++++++++++----------------- src/lib/eliom_comet.client.mli | 10 ++-- 3 files changed, 57 insertions(+), 65 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 2440901be..49002280b 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -38,17 +38,6 @@ type ('a, 'b) t = ; mutable original_stream_available : bool ; error_h : 'b option Lwt.t * exn Lwt.u } -(* clone streams such that each clone of the original stream raise the same exceptions *) -let consume (t, u) s = - let t' = - Lwt.catch - (fun () -> Lwt_stream.iter (fun _ -> ()) s) - (fun e -> - (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); - Lwt.fail e) - in - Lwt.choose [Lwt.bind t (fun _ -> Lwt.return_unit); t'] - let clone_exn (t, u) s = let s' = Lwt_stream.clone s in Lwt_stream.from (fun () -> @@ -72,6 +61,12 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = , Eliom_registration.Action.return ) Eliom_service.t +(** Wrap [Eliom_comet.register] into a [Lwt_stream]. *) +let comet_register_stream chan = + let stream, push = Lwt_stream.create () in + Eliom_comet.register chan (fun x -> push (Some x); Lwt.return_unit); + stream + let create service channel waiter = let write x = Lwt.catch @@ -95,13 +90,7 @@ let create service channel waiter = (fun e -> Lwt.fail e) , u ) in - let stream = - lazy - (let stream = Eliom_comet.register channel in - (* iterate on the stream to consume messages: avoid memory leak *) - let _ = consume error_h stream in - stream) - in + let stream = lazy (comet_register_stream channel) in let t = { channel ; stream diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 119d65407..edfe259b1 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -637,16 +637,22 @@ end type 'a handler = { hd_service_handler : 'a Service_handler.t - ; hd_stream : (string * int option * string Ecb.channel_data) Lwt_stream.t } + ; mutable hd_callbacks : + (string * int option * string Ecb.channel_data -> unit Lwt.t) list } -let handler_stream hd = - Lwt_stream.map_list - (fun x -> x) - (Lwt_stream.from (fun () -> - Lwt.try_bind - (fun () -> Service_handler.wait_data hd) - (fun s -> Lwt.return_some s) - (fun _ -> Lwt.return_none))) +let wait_data_daemon hd = + let notify_callbacks data = + Lwt_list.iter_s (fun callback -> callback data) hd.hd_callbacks + in + let rec wait_data_loop () = + let* data = Service_handler.wait_data hd.hd_service_handler in + let* () = Lwt_list.iter_s notify_callbacks data in + wait_data_loop () + in + Lwt.async wait_data_loop + +let register_callback hd callback = + hd.hd_callbacks <- callback :: hd.hd_callbacks let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t @@ -660,8 +666,8 @@ let stateless_handler_table : let init (service : Ecb.comet_service) kind table = let hd_service_handler = Service_handler.make service kind in - let hd_stream = handler_stream hd_service_handler in - let hd = {hd_service_handler; hd_stream} in + let hd = {hd_service_handler; hd_callbacks = []} in + wait_data_daemon hd; Hashtbl.add table service hd; hd @@ -742,49 +748,42 @@ let check_and_update_position position msg_pos data = (* stateless channels are registered with a position: when a channel is registered more than one time, it is possible to receive old messages: the position is used to filter them out. *) -let register' hd position (_ : Ecb.comet_service) (chan_id : 'a Ecb.chan_id) = +let register' + hd + position + (_ : Ecb.comet_service) + (chan_id : 'a Ecb.chan_id) + callback + = let chan_id = Ecb.string_of_chan_id chan_id in - let stream = - Lwt_stream.filter_map_s - (function - | id, pos, data - when id = chan_id && check_and_update_position position pos data -> ( - match data with - | Ecb.Full -> Lwt.fail Channel_full - | Ecb.Closed -> Lwt.fail Channel_closed - | Ecb.Data x -> Lwt.return_some (unmarshal x : 'a)) - | _ -> Lwt.return_none) - (Lwt_stream.clone hd.hd_stream) - in - let protect_and_close t = - let t' = Lwt.protected t in - Lwt.on_cancel t' (fun () -> - Service_handler.close hd.hd_service_handler chan_id); - t' - in - (* protect the stream from cancels *) - Lwt_stream.from (fun () -> protect_and_close (Lwt_stream.get stream)) - -let register_stateful ?(wake = true) service chan_id = + register_callback hd (function + | id, pos, data + when id = chan_id && check_and_update_position position pos data -> ( + match data with + | Ecb.Full -> Lwt.return_unit + | Ecb.Closed -> (* TODO: Notify callback and unregister *) Lwt.return_unit + | Ecb.Data x -> callback (unmarshal x : 'a)) + | _ -> Lwt.return_unit) + +let register_stateful ?(wake = true) service chan_id callback = let hd = get_stateful_hd service in - let stream = register' hd No_position service chan_id in + register' hd No_position service chan_id callback; let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateful hd.hd_service_handler chan_id; - if wake then Service_handler.activate hd.hd_service_handler; - stream + if wake then Service_handler.activate hd.hd_service_handler -let register_stateless ?(wake = true) service chan_id kind = +let register_stateless ?(wake = true) service chan_id kind callback = let hd = get_stateless_hd service in - let stream = register' hd (position_of_kind kind) service chan_id in + register' hd (position_of_kind kind) service chan_id callback; let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateless hd.hd_service_handler chan_id kind; - if wake then Service_handler.activate hd.hd_service_handler; - stream + if wake then Service_handler.activate hd.hd_service_handler -let register ?(wake = true) (wrapped_chan : 'a Ecb.wrapped_channel) = +let register ?(wake = true) (wrapped_chan : 'a Ecb.wrapped_channel) callback = match wrapped_chan with - | Ecb.Stateful_channel (s, c) -> register_stateful ~wake s c - | Ecb.Stateless_channel (s, c, kind) -> register_stateless ~wake s c kind + | Ecb.Stateful_channel (s, c) -> register_stateful ~wake s c callback + | Ecb.Stateless_channel (s, c, kind) -> + register_stateless ~wake s c kind callback let internal_unwrap (wrapped_chan, _unwrapper) = register wrapped_chan diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index bd12ee987..704a7412e 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -123,9 +123,13 @@ end val register : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel - -> 'a Lwt_stream.t -(** if wake is false, the registration of the channel won't - activate the handling loop ( no request will be sent ). Default is true *) + -> ('a -> unit Lwt.t) + -> unit +(** [register ~wake chan callback] registers a callback to be called for new + messages from the server. If wake is false, the registration of the channel + won't activate the handling loop ( no request will be sent ). Default is + true. + Not thread-safe. *) val restart : unit -> unit (** [restart ()] Restarts the loop waiting for server messages. It is From 79167dc1f5c48883f032b9326964743c849fd778 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Thu, 10 Jul 2025 15:50:29 +0200 Subject: [PATCH 02/10] client: Eliom_bus: Remove usage of `Lwt_stream` Add a new callback-based API, `Eliom_bus.register`, which avoids using `Lwt_stream` internally. `Eliom_bus.stream` is re-implemented against the register API. `Eliom_bus.original_stream` is not re-implemented yet. --- src/lib/eliom_bus.client.ml | 75 ++++++++++++++---------------------- src/lib/eliom_bus.client.mli | 17 ++++---- 2 files changed, 36 insertions(+), 56 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 49002280b..685843e0e 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -27,25 +27,16 @@ let section = Logs.Src.create "eliom:bus" module Ecb = Eliom_comet_base +type 'a consumers = {mutable consumers : ('a -> unit Lwt.t) list} + type ('a, 'b) t = { channel : 'b Ecb.wrapped_channel - ; stream : 'b Lwt_stream.t Lazy.t + ; consumers : 'b consumers Lazy.t ; queue : 'a Queue.t ; mutable max_size : int ; write : 'a list -> unit Lwt.t ; mutable waiter : unit -> unit Lwt.t - ; mutable last_wait : unit Lwt.t - ; mutable original_stream_available : bool - ; error_h : 'b option Lwt.t * exn Lwt.u } - -let clone_exn (t, u) s = - let s' = Lwt_stream.clone s in - Lwt_stream.from (fun () -> - Lwt.catch - (fun () -> Lwt.choose [Lwt_stream.get s'; t]) - (fun e -> - (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); - Lwt.fail e)) + ; mutable last_wait : unit Lwt.t } type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = ( unit @@ -61,11 +52,14 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = , Eliom_registration.Action.return ) Eliom_service.t -(** Wrap [Eliom_comet.register] into a [Lwt_stream]. *) -let comet_register_stream chan = - let stream, push = Lwt_stream.create () in - Eliom_comet.register chan (fun x -> push (Some x); Lwt.return_unit); - stream +(** Register a callback in the underlying comet. *) +let comet_register ~error_h:_ chan = + let t = {consumers = []} in + Eliom_comet.register chan (fun x -> + (* TODO: On error, [Lwt.wakeup_exn] on [error_h]. *) + (* TODO: Propagate errors from [Eliom_comet]. *) + Lwt_list.iter_s (fun callback -> callback x) t.consumers); + t let create service channel waiter = let write x = @@ -90,27 +84,14 @@ let create service channel waiter = (fun e -> Lwt.fail e) , u ) in - let stream = lazy (comet_register_stream channel) in - let t = - { channel - ; stream - ; queue = Queue.create () - ; max_size = 20 - ; write - ; waiter - ; last_wait = Lwt.return_unit - ; original_stream_available = true - ; error_h } - in - (* the comet channel start receiving after the load phase, so the - original channel (i.e. without message lost) is only available in - the first loading phase. *) - let _ = - let* () = Eliom_client.wait_load_end () in - t.original_stream_available <- false; - Lwt.return_unit - in - t + let consumers = lazy (comet_register ~error_h channel) in + { channel + ; consumers + ; queue = Queue.create () + ; max_size = 20 + ; write + ; waiter + ; last_wait = Lwt.return_unit } let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) = let waiter () = Js_of_ocaml_lwt.Lwt_js.sleep 0.05 in @@ -120,14 +101,16 @@ let internal_unwrap ((wrapped_bus : ('a, 'b) Ecb.wrapped_bus), _unwrapper) = let () = Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap -let stream t = clone_exn t.error_h (Lazy.force t.stream) +let register t callback = + let (lazy c) = t.consumers in + c.consumers <- callback :: c.consumers -let original_stream t = - if Eliom_client_core.in_onload () && t.original_stream_available - then stream t - else - raise_error ~section - "original_stream: the original stream is not available anymore" +let stream t = + let stream, push = Lwt_stream.create () in + register t (fun data -> push (Some data); Lwt.return_unit); + stream + +let original_stream = stream let flush t = let l = List.rev (Queue.fold (fun l v -> v :: l) [] t.queue) in diff --git a/src/lib/eliom_bus.client.mli b/src/lib/eliom_bus.client.mli index 06950d839..708499717 100644 --- a/src/lib/eliom_bus.client.mli +++ b/src/lib/eliom_bus.client.mli @@ -25,19 +25,16 @@ type ('a, 'b) t +val register : ('a, 'b) t -> ('b -> unit Lwt.t) -> unit +(** Register a callback that will get called on every messages from the server. + Messages received before the call to [register] are lost. *) + val stream : ('a, 'b) t -> 'b Lwt_stream.t -(** [stream b] returns the stream of data sent to bus [b]. A new - stream is created each time this function is called. Some messages - from the bus can be lost if they were sent before the call to - [stream]. If you need to receive every message, use original stream - instead. *) +(** Create a new stream from the messages from the server. This has the same + behavior as {!register}. *) val original_stream : ('a, 'b) t -> 'b Lwt_stream.t -(** [stream b] returns the stream of data sent to bus [b]. A new - stream is created each time this function is called. Every - messages sent to the bus after the generation of the page are - received. This function can be called only in the onload event - handler, if called outside, it will raise a Failure. *) +(** @deprecated Deprecated alias to [stream]. *) val write : ('a, 'b) t -> 'a -> unit Lwt.t (** [write b v] send [v] to the bus [b]. Every participant of the bus From e21152691028fa1c5e753d8ed86cbd2bdc298692 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Fri, 11 Jul 2025 17:40:36 +0200 Subject: [PATCH 03/10] client: Eliom_bus: Propagate channel errors from Eliom_comet `Eliom_comet.register` and `Eliom_bus.register` propagate errors from the server using an `option` type. `Eliom_bus` ensures that the callback won't be called again after being called with `None`. This "end of stream or error" signal was present before through `Lwt_stream`, which could propagate the `Closed` exception. In both `Eliom_comet` and `Eliom_bus`, the callbacks are released when the channel closes to allow memory to be collected. --- src/lib/eliom_bus.client.ml | 34 ++++++++++++------------ src/lib/eliom_bus.client.mli | 6 +++-- src/lib/eliom_comet.client.ml | 48 +++++++++++++++++++++++----------- src/lib/eliom_comet.client.mli | 16 +++++------- 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 685843e0e..6f72c0944 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -27,7 +27,7 @@ let section = Logs.Src.create "eliom:bus" module Ecb = Eliom_comet_base -type 'a consumers = {mutable consumers : ('a -> unit Lwt.t) list} +type 'a consumers = {mutable consumers : ('a option -> unit Lwt.t) list} type ('a, 'b) t = { channel : 'b Ecb.wrapped_channel @@ -53,12 +53,21 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = Eliom_service.t (** Register a callback in the underlying comet. *) -let comet_register ~error_h:_ chan = +let comet_register chan = let t = {consumers = []} in - Eliom_comet.register chan (fun x -> - (* TODO: On error, [Lwt.wakeup_exn] on [error_h]. *) - (* TODO: Propagate errors from [Eliom_comet]. *) - Lwt_list.iter_s (fun callback -> callback x) t.consumers); + let notify data = + Lwt_list.iter_s (fun callback -> callback data) t.consumers + in + let teardown () = + (* Notify that the channel reached its end. Clear the [consumers] list to + avoid memory leaks. *) + let* () = notify None in + t.consumers <- []; + Lwt.return_unit + in + Eliom_comet.register chan (function + | Some data -> notify (Some data) + | None -> teardown ()); t let create service channel waiter = @@ -75,16 +84,7 @@ let create service channel waiter = | Eliom_request.Failed_request 204 -> Lwt.return_unit | exc -> Lwt.reraise exc) in - let error_h = - let t, u = Lwt.wait () in - ( Lwt.catch - (fun () -> - let* _ = t in - assert false) - (fun e -> Lwt.fail e) - , u ) - in - let consumers = lazy (comet_register ~error_h channel) in + let consumers = lazy (comet_register channel) in { channel ; consumers ; queue = Queue.create () @@ -107,7 +107,7 @@ let register t callback = let stream t = let stream, push = Lwt_stream.create () in - register t (fun data -> push (Some data); Lwt.return_unit); + register t (fun data -> push data; Lwt.return_unit); stream let original_stream = stream diff --git a/src/lib/eliom_bus.client.mli b/src/lib/eliom_bus.client.mli index 708499717..40f5c8ba6 100644 --- a/src/lib/eliom_bus.client.mli +++ b/src/lib/eliom_bus.client.mli @@ -25,9 +25,11 @@ type ('a, 'b) t -val register : ('a, 'b) t -> ('b -> unit Lwt.t) -> unit +val register : ('a, 'b) t -> ('b option -> unit Lwt.t) -> unit (** Register a callback that will get called on every messages from the server. - Messages received before the call to [register] are lost. *) + Messages received before the call to [register] are lost. The callback is + called with [Some data] when receiving a message from the server or with + [None] when no more data will be received. *) val stream : ('a, 'b) t -> 'b Lwt_stream.t (** Create a new stream from the messages from the server. This has the same diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index edfe259b1..ac1ae55a3 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -638,22 +638,40 @@ end type 'a handler = { hd_service_handler : 'a Service_handler.t ; mutable hd_callbacks : - (string * int option * string Ecb.channel_data -> unit Lwt.t) list } + (string * int option * string Ecb.channel_data -> unit Lwt.t) list + ; mutable hd_error_callbacks : (unit -> unit Lwt.t) list } let wait_data_daemon hd = + let on_error _ = + (* Notify callbacks of an error and release callbacks to avoid memory leaks. *) + hd.hd_callbacks <- []; + let* () = + Lwt_list.iter_s (fun callback -> callback ()) hd.hd_error_callbacks + in + hd.hd_error_callbacks <- []; + Lwt.return_unit + in let notify_callbacks data = Lwt_list.iter_s (fun callback -> callback data) hd.hd_callbacks in let rec wait_data_loop () = - let* data = Service_handler.wait_data hd.hd_service_handler in - let* () = Lwt_list.iter_s notify_callbacks data in - wait_data_loop () + Lwt.try_bind + (fun () -> Service_handler.wait_data hd.hd_service_handler) + (fun data -> + let* () = Lwt_list.iter_s notify_callbacks data in + wait_data_loop ()) + on_error in Lwt.async wait_data_loop let register_callback hd callback = hd.hd_callbacks <- callback :: hd.hd_callbacks +(** Register a callback for when the channel closes and no more message will be + received for every channel ids. *) +let register_error_callback hd callback = + hd.hd_error_callbacks <- callback :: hd.hd_error_callbacks + let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t = @@ -666,7 +684,7 @@ let stateless_handler_table : let init (service : Ecb.comet_service) kind table = let hd_service_handler = Service_handler.make service kind in - let hd = {hd_service_handler; hd_callbacks = []} in + let hd = {hd_service_handler; hd_callbacks = []; hd_error_callbacks = []} in wait_data_daemon hd; Hashtbl.add table service hd; hd @@ -745,9 +763,9 @@ let check_and_update_position position msg_pos data = true) else false) -(* stateless channels are registered with a position: when a channel - is registered more than one time, it is possible to receive old - messages: the position is used to filter them out. *) +(* stateless channels are registered with a position: when a channel is + registered more than one time, it is possible to receive old messages: the + position is used to filter them out. *) let register' hd position @@ -756,14 +774,14 @@ let register' callback = let chan_id = Ecb.string_of_chan_id chan_id in - register_callback hd (function - | id, pos, data - when id = chan_id && check_and_update_position position pos data -> ( + register_callback hd (fun (id, pos, data) -> + if id = chan_id && check_and_update_position position pos data + then match data with - | Ecb.Full -> Lwt.return_unit - | Ecb.Closed -> (* TODO: Notify callback and unregister *) Lwt.return_unit - | Ecb.Data x -> callback (unmarshal x : 'a)) - | _ -> Lwt.return_unit) + | Ecb.Full | Closed -> callback None + | Data x -> callback (Some (unmarshal x : 'a)) + else Lwt.return_unit); + register_error_callback hd (fun () -> callback None) let register_stateful ?(wake = true) service chan_id callback = let hd = get_stateful_hd service in diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index 704a7412e..596fe73cf 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -35,15 +35,10 @@ will close the channel. *) exception Channel_full -(** [Channel_full] is raised when trying to read on a channel marked - full by the server. It is not possible to read anything else from a - full channel. *) +(** @deprecated Not raised anymore. *) exception Channel_closed -(** [Channel_closed] is raised when reading on a channel and the - server side of the application closed channel ( the server was restarted, - a session was closed, or a stateless channel was garbage collected). - *) +(** @deprecated Not raised anymore. *) val is_active : unit -> [`Active | `Idle | `Inactive] (** [is_active ()] returns the current activity state *) @@ -123,13 +118,14 @@ end val register : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel - -> ('a -> unit Lwt.t) + -> ('a option -> unit Lwt.t) -> unit (** [register ~wake chan callback] registers a callback to be called for new messages from the server. If wake is false, the registration of the channel won't activate the handling loop ( no request will be sent ). Default is - true. - Not thread-safe. *) + true. The callback receives [Some data] for each new messages from the + server and [None] when the server closes the channel or an error occurs. Not + thread-safe. *) val restart : unit -> unit (** [restart ()] Restarts the loop waiting for server messages. It is From e7552ec45f223cf3d1eef19ad44bd1fccbac98ab Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Tue, 15 Jul 2025 17:03:42 +0200 Subject: [PATCH 04/10] client: Eliom_comet.Channel: Callback-based API Implement the callback-based API that can be used with values of type [Eliom_comet.Channel.t] that can be passed from the server. --- src/lib/eliom_bus.client.ml | 8 +++-- src/lib/eliom_comet.client.ml | 66 +++++++++++++++++----------------- src/lib/eliom_comet.client.mli | 22 +++++++----- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 6f72c0944..3e4c1f834 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -65,9 +65,11 @@ let comet_register chan = t.consumers <- []; Lwt.return_unit in - Eliom_comet.register chan (function - | Some data -> notify (Some data) - | None -> teardown ()); + let _chan = + Eliom_comet.register_wrapped chan (function + | Some data -> notify (Some data) + | None -> teardown ()) + in t let create service channel waiter = diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index ac1ae55a3..6f3e62f44 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -763,47 +763,53 @@ let check_and_update_position position msg_pos data = true) else false) -(* stateless channels are registered with a position: when a channel is +module Channel = struct + type 'a t = + | C : {hd : _ handler; chan_pos : position; chan_id : string} -> 'a t + + let make hd chan_pos chan_id = C {hd; chan_pos; chan_id} + let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler + + (* stateless channels are registered with a position: when a channel is registered more than one time, it is possible to receive old messages: the position is used to filter them out. *) -let register' - hd - position - (_ : Ecb.comet_service) - (chan_id : 'a Ecb.chan_id) - callback - = - let chan_id = Ecb.string_of_chan_id chan_id in - register_callback hd (fun (id, pos, data) -> - if id = chan_id && check_and_update_position position pos data - then - match data with - | Ecb.Full | Closed -> callback None - | Data x -> callback (Some (unmarshal x : 'a)) - else Lwt.return_unit); - register_error_callback hd (fun () -> callback None) + let register (C {hd; chan_pos; chan_id}) callback = + register_callback hd (fun (id, pos, data) -> + if id = chan_id && check_and_update_position chan_pos pos data + then + match data with + | Ecb.Full | Closed -> callback None + | Data x -> callback (Some (unmarshal x : 'a)) + else Lwt.return_unit); + register_error_callback hd (fun () -> callback None) +end -let register_stateful ?(wake = true) service chan_id callback = +let register_stateful service chan_id = let hd = get_stateful_hd service in - register' hd No_position service chan_id callback; let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateful hd.hd_service_handler chan_id; - if wake then Service_handler.activate hd.hd_service_handler + Channel.make hd No_position chan_id -let register_stateless ?(wake = true) service chan_id kind callback = +let register_stateless service chan_id kind = let hd = get_stateless_hd service in - register' hd (position_of_kind kind) service chan_id callback; let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateless hd.hd_service_handler chan_id kind; - if wake then Service_handler.activate hd.hd_service_handler + Channel.make hd (position_of_kind kind) chan_id -let register ?(wake = true) (wrapped_chan : 'a Ecb.wrapped_channel) callback = +let unwrap (wrapped_chan : 'a Ecb.wrapped_channel) : 'a Channel.t = match wrapped_chan with - | Ecb.Stateful_channel (s, c) -> register_stateful ~wake s c callback - | Ecb.Stateless_channel (s, c, kind) -> - register_stateless ~wake s c kind callback + | Ecb.Stateful_channel (s, c) -> register_stateful s c + | Ecb.Stateless_channel (s, c, kind) -> register_stateless s c kind + +let register_wrapped ?(wake = true) wrapped_chan callback = + let chan = unwrap wrapped_chan in + Channel.register chan callback; + if wake then Channel.wake chan; + chan -let internal_unwrap (wrapped_chan, _unwrapper) = register wrapped_chan +let internal_unwrap (wrapped_chan, _unwrapper) = + let chan = unwrap wrapped_chan in + Channel.wake chan; chan let () = Eliom_unwrap.register_unwrapper Eliom_common.comet_channel_unwrap_id @@ -825,8 +831,4 @@ let is_active () = max (Hashtbl.fold f stateless_handler_table `Active) (fun () -> Hashtbl.fold f stateful_handler_table `Active) -module Channel = struct - type 'a t = 'a Lwt_stream.t -end - let force_link = () diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index 596fe73cf..123559595 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -110,22 +110,26 @@ module Configuration : sig end module Channel : sig - type 'a t = 'a Lwt_stream.t + type 'a t + + val register : 'a t -> ('a option -> unit Lwt.t) -> unit + (** [register chan callback] registers a callback to be called for new messages + from the server. The callback receives [Some data] for each new messages + from the server and [None] when the server closes the channel or an error + occurs. Not thread-safe. *) end (**/**) -val register : +val register_wrapped : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel -> ('a option -> unit Lwt.t) - -> unit -(** [register ~wake chan callback] registers a callback to be called for new - messages from the server. If wake is false, the registration of the channel - won't activate the handling loop ( no request will be sent ). Default is - true. The callback receives [Some data] for each new messages from the - server and [None] when the server closes the channel or an error occurs. Not - thread-safe. *) + -> 'a Channel.t +(** [register_wrapped ~wake chan callback] registers a callback to a wrapped + channel and return a [Channel.t]. If wake is false, the registration of the + channel won't activate the handling loop ( no request will be sent ), + default is true. *) val restart : unit -> unit (** [restart ()] Restarts the loop waiting for server messages. It is From ddcb6bda657e6db8e87426dd7810b184fa72d466 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Tue, 22 Jul 2025 15:22:35 +0200 Subject: [PATCH 05/10] Eliom_bus.client: Remove the intermediate callback list This simplifies the handling of messages in `Eliom_bus` by registering all callbacks directly into the underlying `Eliom_comet`. `Eliom_comet.register_wrapped` is removed in favor of `Eliom_comet.register` in the previous API. This changes requires signaling when the channel should be aware from `Eliom_bus`. --- src/lib/eliom_bus.client.ml | 44 +++++++++++----------------------- src/lib/eliom_comet.client.ml | 9 ++----- src/lib/eliom_comet.client.mli | 20 +++++++++------- 3 files changed, 27 insertions(+), 46 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 3e4c1f834..e3f89efde 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -27,11 +27,11 @@ let section = Logs.Src.create "eliom:bus" module Ecb = Eliom_comet_base -type 'a consumers = {mutable consumers : ('a option -> unit Lwt.t) list} - type ('a, 'b) t = - { channel : 'b Ecb.wrapped_channel - ; consumers : 'b consumers Lazy.t + { channel : 'b Eliom_comet.Channel.t + ; mutable channel_awake : bool + (** Whether [Eliom_comet.Channel.wake] was called before. *) + ; wrapped_channel : 'b Ecb.wrapped_channel ; queue : 'a Queue.t ; mutable max_size : int ; write : 'a list -> unit Lwt.t @@ -52,27 +52,7 @@ type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = , Eliom_registration.Action.return ) Eliom_service.t -(** Register a callback in the underlying comet. *) -let comet_register chan = - let t = {consumers = []} in - let notify data = - Lwt_list.iter_s (fun callback -> callback data) t.consumers - in - let teardown () = - (* Notify that the channel reached its end. Clear the [consumers] list to - avoid memory leaks. *) - let* () = notify None in - t.consumers <- []; - Lwt.return_unit - in - let _chan = - Eliom_comet.register_wrapped chan (function - | Some data -> notify (Some data) - | None -> teardown ()) - in - t - -let create service channel waiter = +let create service wrapped_channel waiter = let write x = Lwt.catch (fun () -> @@ -86,9 +66,10 @@ let create service channel waiter = | Eliom_request.Failed_request 204 -> Lwt.return_unit | exc -> Lwt.reraise exc) in - let consumers = lazy (comet_register channel) in + let channel = Eliom_comet.register ~wake:false wrapped_channel in { channel - ; consumers + ; channel_awake = false + ; wrapped_channel ; queue = Queue.create () ; max_size = 20 ; write @@ -104,8 +85,11 @@ let () = Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap let register t callback = - let (lazy c) = t.consumers in - c.consumers <- callback :: c.consumers + Eliom_comet.Channel.register t.channel callback; + if not t.channel_awake + then ( + Eliom_comet.Channel.wake t.channel; + t.channel_awake <- true) let stream t = let stream, push = Lwt_stream.create () in @@ -129,7 +113,7 @@ let try_flush t = Lwt.return_unit let write t v = Queue.add v t.queue; try_flush t -let close {channel; _} = Eliom_comet.close channel +let close {wrapped_channel; _} = Eliom_comet.close wrapped_channel let set_queue_size b s = b.max_size <- s let set_time_before_flush b t = diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 6f3e62f44..e8a9b37e1 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -801,19 +801,14 @@ let unwrap (wrapped_chan : 'a Ecb.wrapped_channel) : 'a Channel.t = | Ecb.Stateful_channel (s, c) -> register_stateful s c | Ecb.Stateless_channel (s, c, kind) -> register_stateless s c kind -let register_wrapped ?(wake = true) wrapped_chan callback = +let register ?(wake = true) wrapped_chan = let chan = unwrap wrapped_chan in - Channel.register chan callback; if wake then Channel.wake chan; chan -let internal_unwrap (wrapped_chan, _unwrapper) = - let chan = unwrap wrapped_chan in - Channel.wake chan; chan - let () = Eliom_unwrap.register_unwrapper Eliom_common.comet_channel_unwrap_id - internal_unwrap + (fun (wrapped_chan, _unwrapper) -> register wrapped_chan) let is_active () = (*VVV Check. Isn't it the contrary? (fold from `Inactive?) *) diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index 123559595..522e2122e 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -117,19 +117,21 @@ module Channel : sig from the server. The callback receives [Some data] for each new messages from the server and [None] when the server closes the channel or an error occurs. Not thread-safe. *) + + (**/**) + + val wake : 'a t -> unit + (** Activate the handling loop, making sure the channel can receive messages. + No request will be sent. *) + + (**/**) end (**/**) -val register_wrapped : - ?wake:bool - -> 'a Eliom_comet_base.wrapped_channel - -> ('a option -> unit Lwt.t) - -> 'a Channel.t -(** [register_wrapped ~wake chan callback] registers a callback to a wrapped - channel and return a [Channel.t]. If wake is false, the registration of the - channel won't activate the handling loop ( no request will be sent ), - default is true. *) +val register : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel -> 'a Channel.t +(** The [~wake] argument controls whether [Channel.wake] is called on the + channel. Default is [true]. *) val restart : unit -> unit (** [restart ()] Restarts the loop waiting for server messages. It is From 5bc98a0983d85e7f4d0141ad991bd9b90280cd97 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Tue, 22 Jul 2025 15:28:23 +0200 Subject: [PATCH 06/10] Eliom_bus.client: Simplify closing With the same intention as the previous commit, this simplifies the implementation of `Eliom_bus`. This removes the internal function `Eliom_comet.close` in favor of `Eliom_comet.Channel.close`, which is much easier to implement. --- src/lib/eliom_bus.client.ml | 4 +--- src/lib/eliom_comet.client.ml | 11 +++-------- src/lib/eliom_comet.client.mli | 10 +++++----- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index e3f89efde..9982b755d 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -31,7 +31,6 @@ type ('a, 'b) t = { channel : 'b Eliom_comet.Channel.t ; mutable channel_awake : bool (** Whether [Eliom_comet.Channel.wake] was called before. *) - ; wrapped_channel : 'b Ecb.wrapped_channel ; queue : 'a Queue.t ; mutable max_size : int ; write : 'a list -> unit Lwt.t @@ -69,7 +68,6 @@ let create service wrapped_channel waiter = let channel = Eliom_comet.register ~wake:false wrapped_channel in { channel ; channel_awake = false - ; wrapped_channel ; queue = Queue.create () ; max_size = 20 ; write @@ -113,7 +111,7 @@ let try_flush t = Lwt.return_unit let write t v = Queue.add v t.queue; try_flush t -let close {wrapped_channel; _} = Eliom_comet.close wrapped_channel +let close {channel; _} = Eliom_comet.Channel.close channel let set_queue_size b s = b.max_size <- s let set_time_before_flush b t = diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index e8a9b37e1..838372b81 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -717,14 +717,6 @@ let restart () = Hashtbl.iter f stateless_handler_table; Hashtbl.iter f stateful_handler_table -let close = function - | Ecb.Stateful_channel (chan_service, chan_id) -> - let {hd_service_handler; _} = get_stateful_hd chan_service in - Service_handler.close hd_service_handler (Ecb.string_of_chan_id chan_id) - | Ecb.Stateless_channel (chan_service, chan_id, _kind) -> - let {hd_service_handler; _} = get_stateless_hd chan_service in - Service_handler.close hd_service_handler (Ecb.string_of_chan_id chan_id) - let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 type position_relation = @@ -770,6 +762,9 @@ module Channel = struct let make hd chan_pos chan_id = C {hd; chan_pos; chan_id} let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler + let close (C {hd; chan_id; _}) = + Service_handler.close hd.hd_service_handler chan_id + (* stateless channels are registered with a position: when a channel is registered more than one time, it is possible to receive old messages: the position is used to filter them out. *) diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index 522e2122e..faf574977 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -124,6 +124,11 @@ module Channel : sig (** Activate the handling loop, making sure the channel can receive messages. No request will be sent. *) + val close : 'a t -> unit + (** [close c] closes the channel c. This function should be only use + internally. The normal way to close a channel is to cancel a thread + waiting on inputs. *) + (**/**) end @@ -141,11 +146,6 @@ val restart : unit -> unit case, preventing client code from receiving the failure notification. This shouldn't be used by average user. *) -val close : 'a Eliom_comet_base.wrapped_channel -> unit -(** [close c] closes the channel c. This function should be only use - internally. The normal way to close a channel is to cancel a thread - waiting on inputs. *) - val force_link : unit val handle_exn : ?exn:exn -> unit -> unit Lwt.t From c43c15a699c516ce4b42a96ccc472145710aa445 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Tue, 22 Jul 2025 16:48:13 +0200 Subject: [PATCH 07/10] Eliom_comet.client: Faster and simpler callback handling Callbacks are no longer an ever-growing list of functions. The changes are: - Callbacks are grouped by channel ID in a Hashtbl. This dramatically reduces the number of callback that need to be considered. - The channel position is stored along side the callback instead of in a closure. This will make writing a `unregister` function easier. - `hd_error_callbacks` is removed. This was needed only to workaround the internal complexity. Some code is moved for ordering. The `position` type and related code are wrapped in a `Position` module to improve readability. --- src/lib/eliom_comet.client.ml | 148 ++++++++++++++++++---------------- 1 file changed, 80 insertions(+), 68 deletions(-) diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 838372b81..d3762f1db 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -635,42 +635,98 @@ end = struct handle_visibility hd; hd end +module Position = struct + type relation = + | Equal + (* stateless after channels *) + | Greater + (* stateless newest channels *) + + type t = + | No_position (* stateful channels*) + | Position of relation * int option ref + (* stateless channels *) + + let of_kind = function + | Ecb.After_kind i -> Position (Equal, ref (Some i)) + | Ecb.Newest_kind i -> Position (Greater, ref (Some i)) + | Ecb.Last_kind None -> Position (Greater, ref None) + | Ecb.Last_kind (Some _) -> Position (Equal, ref None) + + let check_and_update position msg_pos data = + match position, msg_pos, data with + | No_position, None, _ -> true + | No_position, Some _, _ | Position _, None, Ecb.Data _ -> + raise_error ~section + "check_position: channel kind and message do not match" + | Position _, None, (Ecb.Full | Ecb.Closed) -> true + | Position (relation, r), Some j, _ -> ( + match !r with + | None -> + r := Some (j + 1); + true + | Some i -> + if match relation with Equal -> j = i | Greater -> j >= i + then ( + r := Some (j + 1); + true) + else false) +end + +let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 + +module StringTbl = Hashtbl.Make (String) + +type callback = + | Cb : {c_pos : Position.t; c_callback : 'a option -> unit Lwt.t} -> callback + type 'a handler = { hd_service_handler : 'a Service_handler.t - ; mutable hd_callbacks : - (string * int option * string Ecb.channel_data -> unit Lwt.t) list - ; mutable hd_error_callbacks : (unit -> unit Lwt.t) list } + ; hd_callbacks : callback list StringTbl.t + (** Callbacks are grouped by their channel ID. *) } let wait_data_daemon hd = let on_error _ = (* Notify callbacks of an error and release callbacks to avoid memory leaks. *) - hd.hd_callbacks <- []; - let* () = - Lwt_list.iter_s (fun callback -> callback ()) hd.hd_error_callbacks + let callbacks = + StringTbl.fold + (fun _k cs acc -> List.rev_append cs acc) + hd.hd_callbacks [] in - hd.hd_error_callbacks <- []; - Lwt.return_unit + StringTbl.reset hd.hd_callbacks; + Lwt_list.iter_s (fun (Cb c) -> c.c_callback None) callbacks in - let notify_callbacks data = - Lwt_list.iter_s (fun callback -> callback data) hd.hd_callbacks + let handle_message (id, pos, data) = + match StringTbl.find hd.hd_callbacks id with + | exception Not_found -> Lwt.return_unit + | cs -> + Lwt_list.iter_s + (fun (Cb c) -> + if Position.check_and_update c.c_pos pos data + then + match data with + | Ecb.Full | Closed -> c.c_callback None + | Data x -> c.c_callback (Some (unmarshal x)) + else Lwt.return_unit) + cs in let rec wait_data_loop () = Lwt.try_bind (fun () -> Service_handler.wait_data hd.hd_service_handler) - (fun data -> - let* () = Lwt_list.iter_s notify_callbacks data in + (fun messages -> + let* () = Lwt_list.iter_s handle_message messages in wait_data_loop ()) on_error in Lwt.async wait_data_loop -let register_callback hd callback = - hd.hd_callbacks <- callback :: hd.hd_callbacks - -(** Register a callback for when the channel closes and no more message will be - received for every channel ids. *) -let register_error_callback hd callback = - hd.hd_error_callbacks <- callback :: hd.hd_error_callbacks +let register_callback hd chan_id callback = + let cs = + match StringTbl.find_opt hd.hd_callbacks chan_id with + | Some cs -> cs + | None -> [] + in + StringTbl.replace hd.hd_callbacks chan_id (callback :: cs) let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t @@ -684,7 +740,8 @@ let stateless_handler_table : let init (service : Ecb.comet_service) kind table = let hd_service_handler = Service_handler.make service kind in - let hd = {hd_service_handler; hd_callbacks = []; hd_error_callbacks = []} in + let hd_callbacks = StringTbl.create 16 in + let hd = {hd_service_handler; hd_callbacks} in wait_data_daemon hd; Hashtbl.add table service hd; hd @@ -717,47 +774,9 @@ let restart () = Hashtbl.iter f stateless_handler_table; Hashtbl.iter f stateful_handler_table -let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 - -type position_relation = - | Equal - (* stateless after channels *) - | Greater -(* stateless newest channels *) - -type position = - | No_position (* stateful channels*) - | Position of position_relation * int option ref -(* stateless channels *) - -let position_of_kind = function - | Ecb.After_kind i -> Position (Equal, ref (Some i)) - | Ecb.Newest_kind i -> Position (Greater, ref (Some i)) - | Ecb.Last_kind None -> Position (Greater, ref None) - | Ecb.Last_kind (Some _) -> Position (Equal, ref None) - -let check_and_update_position position msg_pos data = - match position, msg_pos, data with - | No_position, None, _ -> true - | No_position, Some _, _ | Position _, None, Ecb.Data _ -> - raise_error ~section - "check_position: channel kind and message do not match" - | Position _, None, (Ecb.Full | Ecb.Closed) -> true - | Position (relation, r), Some j, _ -> ( - match !r with - | None -> - r := Some (j + 1); - true - | Some i -> - if match relation with Equal -> j = i | Greater -> j >= i - then ( - r := Some (j + 1); - true) - else false) - module Channel = struct type 'a t = - | C : {hd : _ handler; chan_pos : position; chan_id : string} -> 'a t + | C : {hd : _ handler; chan_pos : Position.t; chan_id : string} -> 'a t let make hd chan_pos chan_id = C {hd; chan_pos; chan_id} let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler @@ -769,14 +788,7 @@ module Channel = struct registered more than one time, it is possible to receive old messages: the position is used to filter them out. *) let register (C {hd; chan_pos; chan_id}) callback = - register_callback hd (fun (id, pos, data) -> - if id = chan_id && check_and_update_position chan_pos pos data - then - match data with - | Ecb.Full | Closed -> callback None - | Data x -> callback (Some (unmarshal x : 'a)) - else Lwt.return_unit); - register_error_callback hd (fun () -> callback None) + register_callback hd chan_id (Cb {c_pos = chan_pos; c_callback = callback}) end let register_stateful service chan_id = @@ -789,7 +801,7 @@ let register_stateless service chan_id kind = let hd = get_stateless_hd service in let chan_id = Ecb.string_of_chan_id chan_id in Service_handler.add_channel_stateless hd.hd_service_handler chan_id kind; - Channel.make hd (position_of_kind kind) chan_id + Channel.make hd (Position.of_kind kind) chan_id let unwrap (wrapped_chan : 'a Ecb.wrapped_channel) : 'a Channel.t = match wrapped_chan with From 65507235195f228cfa23e89b48256c0339656fe9 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Tue, 22 Jul 2025 17:15:02 +0200 Subject: [PATCH 08/10] Callback unregistration in Eliom_comet.client and Eliom_bus.client Add `Eliom_comet.Channel.unregister` and `Eliom_bus.unregister` to unregister callbacks. --- src/lib/eliom_bus.client.ml | 11 ++++++++--- src/lib/eliom_bus.client.mli | 10 +++++++++- src/lib/eliom_comet.client.ml | 27 ++++++++++++++++++++++++--- src/lib/eliom_comet.client.mli | 10 +++++++++- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 9982b755d..8cc4db046 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -37,6 +37,8 @@ type ('a, 'b) t = ; mutable waiter : unit -> unit Lwt.t ; mutable last_wait : unit Lwt.t } +type callback_id = Eliom_comet.Channel.callback_id + type ('a, 'att, 'co, 'ext, 'reg) callable_bus_service = ( unit , 'a list @@ -83,15 +85,18 @@ let () = Eliom_unwrap.register_unwrapper Eliom_common.bus_unwrap_id internal_unwrap let register t callback = - Eliom_comet.Channel.register t.channel callback; + let callback_id = Eliom_comet.Channel.register t.channel callback in if not t.channel_awake then ( Eliom_comet.Channel.wake t.channel; - t.channel_awake <- true) + t.channel_awake <- true); + callback_id + +let unregister t id = Eliom_comet.Channel.unregister t.channel id let stream t = let stream, push = Lwt_stream.create () in - register t (fun data -> push data; Lwt.return_unit); + let _ = register t (fun data -> push data; Lwt.return_unit) in stream let original_stream = stream diff --git a/src/lib/eliom_bus.client.mli b/src/lib/eliom_bus.client.mli index 40f5c8ba6..f3b57506e 100644 --- a/src/lib/eliom_bus.client.mli +++ b/src/lib/eliom_bus.client.mli @@ -25,12 +25,20 @@ type ('a, 'b) t -val register : ('a, 'b) t -> ('b option -> unit Lwt.t) -> unit +type callback_id +(** Handler returned by {!register} that allows unregistering a callback later + with {!unregister}. *) + +val register : ('a, 'b) t -> ('b option -> unit Lwt.t) -> callback_id (** Register a callback that will get called on every messages from the server. Messages received before the call to [register] are lost. The callback is called with [Some data] when receiving a message from the server or with [None] when no more data will be received. *) +val unregister : ('a, 'b) t -> callback_id -> unit +(** Unregister a callback previously registered with {!register}, which will + stop receiving new messages. No-op if the callback was unregistered before. *) + val stream : ('a, 'b) t -> 'b Lwt_stream.t (** Create a new stream from the messages from the server. This has the same behavior as {!register}. *) diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index d3762f1db..0c4a31ed0 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -678,7 +678,11 @@ let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 module StringTbl = Hashtbl.Make (String) type callback = - | Cb : {c_pos : Position.t; c_callback : 'a option -> unit Lwt.t} -> callback + | Cb : + { c_id : int (** Unique identifier used for unregistration. *) + ; c_pos : Position.t + ; c_callback : 'a option -> unit Lwt.t } + -> callback type 'a handler = { hd_service_handler : 'a Service_handler.t @@ -728,6 +732,13 @@ let register_callback hd chan_id callback = in StringTbl.replace hd.hd_callbacks chan_id (callback :: cs) +let unregister_callback hd chan_id id = + try + StringTbl.find hd.hd_callbacks chan_id + |> List.filter (fun (Cb c) -> c.c_id <> id) + |> StringTbl.replace hd.hd_callbacks chan_id + with Not_found -> () + let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t = @@ -778,6 +789,12 @@ module Channel = struct type 'a t = | C : {hd : _ handler; chan_pos : Position.t; chan_id : string} -> 'a t + type callback_id = int + + let next_callback_id = + let i = ref 0 in + fun () -> incr i; !i + let make hd chan_pos chan_id = C {hd; chan_pos; chan_id} let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler @@ -787,8 +804,12 @@ module Channel = struct (* stateless channels are registered with a position: when a channel is registered more than one time, it is possible to receive old messages: the position is used to filter them out. *) - let register (C {hd; chan_pos; chan_id}) callback = - register_callback hd chan_id (Cb {c_pos = chan_pos; c_callback = callback}) + let register (C {hd; chan_pos = c_pos; chan_id}) c_callback = + let c_id = next_callback_id () in + register_callback hd chan_id (Cb {c_id; c_pos; c_callback}); + c_id + + let unregister (C {hd; chan_id; _}) id = unregister_callback hd chan_id id end let register_stateful service chan_id = diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index faf574977..2fcb0fb1c 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -112,12 +112,20 @@ end module Channel : sig type 'a t - val register : 'a t -> ('a option -> unit Lwt.t) -> unit + type callback_id + (** Handler returned by {!register} that allows unregistering a callback + later with {!unregister}. *) + + val register : 'a t -> ('a option -> unit Lwt.t) -> callback_id (** [register chan callback] registers a callback to be called for new messages from the server. The callback receives [Some data] for each new messages from the server and [None] when the server closes the channel or an error occurs. Not thread-safe. *) + val unregister : 'a t -> callback_id -> unit + (** Unregister a callback previously registered with {!register}, which will + stop receiving new messages. No-op if the callback was unregistered before. *) + (**/**) val wake : 'a t -> unit From ba7f62e10669fb0e8bc80b52001770b8f8e1faf2 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Thu, 24 Jul 2025 11:34:24 +0200 Subject: [PATCH 09/10] Eliom_comet.client: Expose `Channel.close` There were no way to close a channel when using `Eliom_comet` alone. This exposes the `close` function, which was internally used by `Eliom_bus`. It's also changed to make sure that registered callbacks are unregistered to avoid memory leaks. --- src/lib/eliom_comet.client.ml | 6 +++++- src/lib/eliom_comet.client.mli | 9 ++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 0c4a31ed0..65754654d 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -739,6 +739,9 @@ let unregister_callback hd chan_id id = |> StringTbl.replace hd.hd_callbacks chan_id with Not_found -> () +let unregister_all_callbacks hd chan_id = + StringTbl.remove hd.hd_callbacks chan_id + let stateful_handler_table : (Ecb.comet_service, Service_handler.stateful handler) Hashtbl.t = @@ -799,7 +802,8 @@ module Channel = struct let wake (C {hd; _}) = Service_handler.activate hd.hd_service_handler let close (C {hd; chan_id; _}) = - Service_handler.close hd.hd_service_handler chan_id + Service_handler.close hd.hd_service_handler chan_id; + unregister_all_callbacks hd chan_id (* stateless channels are registered with a position: when a channel is registered more than one time, it is possible to receive old messages: the diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index 2fcb0fb1c..563fbb90e 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -126,17 +126,16 @@ module Channel : sig (** Unregister a callback previously registered with {!register}, which will stop receiving new messages. No-op if the callback was unregistered before. *) + val close : 'a t -> unit + (** Unregister all callbacks associated to the given channel and close it. + The channel will not receive any more messages. *) + (**/**) val wake : 'a t -> unit (** Activate the handling loop, making sure the channel can receive messages. No request will be sent. *) - val close : 'a t -> unit - (** [close c] closes the channel c. This function should be only use - internally. The normal way to close a channel is to cancel a thread - waiting on inputs. *) - (**/**) end From 37b86bfe6006116cc425b9e7f0565dd7af589ca3 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Thu, 24 Jul 2025 11:49:48 +0200 Subject: [PATCH 10/10] Compatibility with OCaml 4 --- src/lib/eliom_comet.client.ml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 65754654d..ce51852d9 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -675,7 +675,12 @@ end let unmarshal s : 'a = Eliom_unwrap.unwrap (Eliom_lib.Url.decode s) 0 -module StringTbl = Hashtbl.Make (String) +module StringTbl = Hashtbl.Make (struct + type t = string + + let equal = ( = ) + let hash = Hashtbl.hash + end) type callback = | Cb :