diff --git a/dune-project b/dune-project index 5559fd9..622daf9 100644 --- a/dune-project +++ b/dune-project @@ -2,6 +2,7 @@ (name ocsipersist) (generate_opam_files true) +(cram enable) (maintainers "Jan Rochel ") (authors "Ocsigen team ") @@ -14,7 +15,7 @@ (description "This is an virtual library defining a unified frontend for a number of key-value storage implementations. Implementations of the following backends currently exist: DBM, PostgreSQL, SQLite.") (depends (ocsipersist-lib (= :version)) - (lwt (>= 4.2.0))) + eio) (conflicts (ocsipersist-dbm (<> :version)) (ocsipersist-pgsql (<> :version)) @@ -27,7 +28,7 @@ (depends (ocsipersist (= :version)) (lwt (>= 4.2.0)) - lwt_log + logs dbm)) (package @@ -44,7 +45,7 @@ (synopsis "Persistent key/value storage for OCaml - support library") (description "This library defines signatures and auxiliary tools for defining backends for the Ocsipersist frontent. Ocsipersist is used pervasively in Eliom/Ocsigen to handle sessions and references. It can be used as an extension for ocsigenserver or as a library. Implementations of the following backends currently exist: DBM, PostgreSQL, SQLite.") (depends - (lwt (>= 4.2.0)))) + eio)) (package (name ocsipersist-pgsql) @@ -54,7 +55,7 @@ (ocaml (>= 4.08)) (ocsipersist (= :version)) (lwt (>= 4.2.0)) - lwt_log + logs pgocaml)) (package @@ -71,8 +72,8 @@ (synopsis "Persistent key/value storage for OCaml using SQLite") (description "This library provides a SQLite backend for the unified key/value storage frontend as defined in the ocsipersist package.") (depends - (lwt (>= 4.2.0)) - lwt_log + eio + logs ocsipersist sqlite3)) diff --git a/ocsipersist-dbm.opam b/ocsipersist-dbm.opam index 4830e34..a322fc9 100644 --- a/ocsipersist-dbm.opam +++ b/ocsipersist-dbm.opam @@ -12,7 +12,7 @@ depends: [ "dune" {>= "2.8"} "ocsipersist" {= version} "lwt" {>= "4.2.0"} - "lwt_log" + "logs" "dbm" "odoc" {with-doc} ] diff --git a/ocsipersist-lib.opam b/ocsipersist-lib.opam index f78a76e..f3ce9fd 100644 --- a/ocsipersist-lib.opam +++ b/ocsipersist-lib.opam @@ -10,7 +10,7 @@ homepage: "https://github.com/ocsigen/ocsipersist" bug-reports: "https://github.com/ocsigen/ocsipersist/issues" depends: [ "dune" {>= "2.8"} - "lwt" {>= "4.2.0"} + "eio" "odoc" {with-doc} ] build: [ diff --git a/ocsipersist-pgsql.opam b/ocsipersist-pgsql.opam index b5131d6..046d620 100644 --- a/ocsipersist-pgsql.opam +++ b/ocsipersist-pgsql.opam @@ -13,7 +13,7 @@ depends: [ "ocaml" {>= "4.08"} "ocsipersist" {= version} "lwt" {>= "4.2.0"} - "lwt_log" + "logs" "pgocaml" "odoc" {with-doc} ] diff --git a/ocsipersist-sqlite.opam b/ocsipersist-sqlite.opam index 6beaceb..c91fc3b 100644 --- a/ocsipersist-sqlite.opam +++ b/ocsipersist-sqlite.opam @@ -10,8 +10,8 @@ homepage: "https://github.com/ocsigen/ocsipersist" bug-reports: "https://github.com/ocsigen/ocsipersist/issues" depends: [ "dune" {>= "2.8"} - "lwt" {>= "4.2.0"} - "lwt_log" + "eio" + "logs" "ocsipersist" "sqlite3" "odoc" {with-doc} diff --git a/ocsipersist.opam b/ocsipersist.opam index 1a8538b..8add02b 100644 --- a/ocsipersist.opam +++ b/ocsipersist.opam @@ -11,7 +11,7 @@ bug-reports: "https://github.com/ocsigen/ocsipersist/issues" depends: [ "dune" {>= "2.8"} "ocsipersist-lib" {= version} - "lwt" {>= "4.2.0"} + "eio" "odoc" {with-doc} ] conflicts: [ diff --git a/src/dbm/dune b/src/dbm/dune index 4129756..e1b0e6a 100644 --- a/src/dbm/dune +++ b/src/dbm/dune @@ -18,7 +18,7 @@ ocsipersist_settings) (libraries dbm - lwt_log + logs ocsipersist_dbmtypes ocsipersist_lib ocsipersist_dbm_settings)) diff --git a/src/dbm/ocsidbm.ml b/src/dbm/ocsidbm.ml index ab90276..b5587ab 100644 --- a/src/dbm/ocsidbm.ml +++ b/src/dbm/ocsidbm.ml @@ -1,3 +1,5 @@ +open Eio.Std + (* Ocsigen * http://www.ocsigen.org * Module ocsidbm.ml @@ -22,7 +24,6 @@ open Dbm open Ocsidbmtypes -open Lwt.Infix let directory = Sys.argv.(1) @@ -47,10 +48,10 @@ let errlog s = (** Internal functions: storage in files using DBM *) module Tableoftables = Map.Make (struct - type t = string + type t = string - let compare = compare - end) + let compare = compare +end) let tableoftables = ref Tableoftables.empty @@ -65,25 +66,27 @@ let list_tables () = let rec aux () = try let n = Unix.readdir d in - if Filename.check_suffix n suffix - then Filename.chop_extension n :: aux () - else if Filename.check_suffix n (suffix ^ ".pag") - (* depending on the version of dbm, there may be a .pag suffix *) + if Filename.check_suffix n suffix then Filename.chop_extension n :: aux () + else if + Filename.check_suffix n (suffix ^ ".pag") + (* depending on the version of dbm, there may be a .pag suffix *) then Filename.chop_extension (Filename.chop_extension n) :: aux () else aux () - with End_of_file -> Unix.closedir d; [] + with End_of_file -> + Unix.closedir d; + [] in aux () (* try to create the directory if it does not exist *) -let _ = - try Unix.access directory [Unix.R_OK; Unix.W_OK; Unix.X_OK; Unix.F_OK] with +let () = + try Unix.access directory [ Unix.R_OK; Unix.W_OK; Unix.X_OK; Unix.F_OK ] with | Unix.Unix_error (Unix.ENOENT, _, _) -> ( - try Unix.mkdir directory 0o750 - with Unix.Unix_error (error, _, _) -> - failwith - (Printf.sprintf "Ocsidbm: can't create directory %s: %s" directory - (Unix.error_message error))) + try Unix.mkdir directory 0o750 + with Unix.Unix_error (error, _, _) -> + failwith + (Printf.sprintf "Ocsidbm: can't create directory %s: %s" directory + (Unix.error_message error))) | Unix.Unix_error (error, _, _) -> failwith (Printf.sprintf "Ocsidbm: can't access directory %s: %s" directory @@ -91,14 +94,14 @@ let _ = let open_db name = let t = - opendbm (directory ^ "/" ^ name ^ suffix) [Dbm_rdwr; Dbm_create] 0o640 + opendbm (directory ^ "/" ^ name ^ suffix) [ Dbm_rdwr; Dbm_create ] 0o640 in tableoftables := Tableoftables.add name t !tableoftables; t let open_db_if_exists name = try - let t = opendbm (directory ^ "/" ^ name ^ suffix) [Dbm_rdwr] 0o640 in + let t = opendbm (directory ^ "/" ^ name ^ suffix) [ Dbm_rdwr ] 0o640 in tableoftables := Tableoftables.add name t !tableoftables; t with Unix.Unix_error (Unix.ENOENT, _, _) | Dbm.Dbm_error _ -> @@ -136,11 +139,13 @@ let db_nextkey t = Dbm.nextkey (find_dont_create_table t) let db_length t = let table = find_dont_create_table t in let rec aux f n = - Lwt.catch - (fun () -> - ignore (f table); - Lwt.pause () >>= fun () -> aux Dbm.nextkey (n + 1)) - (function Not_found -> Lwt.return n | e -> Lwt.fail e) + try + ignore (f table : string); + Fiber.yield (); + aux Dbm.nextkey (n + 1) + with + | Not_found -> n + | e -> raise e in aux Dbm.firstkey 0 (* Because of Dbm implementation, the result may be less than the expected @@ -158,113 +163,155 @@ let the_end i = exit i open Sys let sigs = - [ sigabrt - ; sigalrm - ; sigfpe - ; sighup - ; sigill - ; sigint - ; sigquit - ; sigsegv - ; sigterm - ; sigusr1 - ; sigusr2 - ; sigchld - ; sigttin - ; sigttou - ; sigvtalrm - ; sigprof ] - -let _ = List.iter (fun s -> Sys.set_signal s (Signal_handle (close_all 0))) sigs -let _ = Sys.set_signal Sys.sigpipe Sys.Signal_ignore -let _ = Unix.setsid () + [ + sigabrt; + sigalrm; + sigfpe; + sighup; + sigill; + sigint; + sigquit; + sigsegv; + sigterm; + sigusr1; + sigusr2; + sigchld; + sigttin; + sigttou; + sigvtalrm; + sigprof; + ] + +let () = + List.iter (fun s -> Sys.set_signal s (Signal_handle (close_all 0))) sigs + +let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore +let _ : int = Unix.setsid () (*****************************************************************************) (** Communication functions: *) -let send outch v = Lwt_io.write_value outch v >>= fun () -> Lwt_io.flush outch +let send outch v = + Marshal.to_channel outch v; + Eio.Buf_write.flush outch let execute outch = let handle_errors f = try f () with e -> send outch (Error e) in function | Get (t, k) -> handle_errors (fun () -> - try send outch (Value (db_get t k)) - with Not_found -> send outch Dbm_not_found) - | Remove (t, k) -> handle_errors (fun () -> db_remove t k; send outch Ok) + try send outch (Value (db_get t k)) + with Not_found -> send outch Dbm_not_found) + | Remove (t, k) -> + handle_errors (fun () -> + db_remove t k; + send outch Ok) | Replace (t, k, v) -> - handle_errors (fun () -> db_replace t k v; send outch Ok) - | Replace_if_exists (t, k, v) -> handle_errors (fun () -> - try - ignore (db_get t k); db_replace t k v; - send outch Ok - with Not_found -> send outch Dbm_not_found) + send outch Ok) + | Replace_if_exists (t, k, v) -> + handle_errors (fun () -> + try + ignore (db_get t k : string); + db_replace t k v; + send outch Ok + with Not_found -> send outch Dbm_not_found) | Firstkey t -> handle_errors (fun () -> - try send outch (Key (db_firstkey t)) with Not_found -> send outch End) + try send outch (Key (db_firstkey t)) + with Not_found -> send outch End) | Nextkey t -> handle_errors (fun () -> - try send outch (Key (db_nextkey t)) with Not_found -> send outch End) + try send outch (Key (db_nextkey t)) with Not_found -> send outch End) | Length t -> handle_errors (fun () -> - Lwt.catch - (fun () -> - db_length t >>= fun i -> - send outch (Value (Marshal.to_string i []))) - (function - | Not_found -> send outch Dbm_not_found | e -> send outch (Error e))) + try + let i = db_length t in + send outch (Value (Marshal.to_string i [])) + with + | Not_found -> send outch Dbm_not_found + | e -> send outch (Error e)) let nb_clients = ref 0 let rec listen_client inch outch = - Lwt_io.read_value inch >>= fun v -> - execute outch v >>= fun () -> listen_client inch outch + let v = Marshal.from_channel inch in + execute outch v; + listen_client inch outch let finish _ = nb_clients := !nb_clients - 1; - if !nb_clients = 0 then close_all 0 (); - Lwt.return () + if !nb_clients = 0 then close_all 0 () let b = ref false let rec loop socket = - Lwt_unix.accept socket >>= fun (indescr, _) -> - ignore - (b := true; - nb_clients := !nb_clients + 1; - let inch = Lwt_io.of_fd ~mode:Lwt_io.input indescr in - let outch = Lwt_io.of_fd ~mode:Lwt_io.output indescr in - Lwt.catch (fun () -> listen_client inch outch >>= finish) finish); + let indescr, _ = + Unix.accept + (* TODO: lwt-to-direct-style: This call to [Unix.accept] was [Lwt_unix.accept] before. It's now blocking. *) + socket + in + Fiber.fork + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + (fun () -> + b := true; + nb_clients := !nb_clients + 1; + let inch = + Eio.Buf_read.of_flow ~max_size:1_000_000 + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true indescr + : [ `R | `Flow | `Close ] r) + in + let outch = + Eio.Buf_write.with_flow + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true + (* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *) + indescr + : [ `W | `Flow | `Close ] r) + (fun outbuf -> `Move_writing_code_here) + in + try finish (listen_client inch outch) with v -> finish v); loop socket -let _ = - Lwt_main.run - (let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - Lwt.catch - (fun () -> - Lwt_unix.bind socket (Unix.ADDR_UNIX (directory ^ "/" ^ socketname))) - (fun _exn -> - errlog - ("Please make sure that the directory " ^ directory - ^ " exists, writable for ocsidbm, and no other ocsidbm process is running on the same directory. If not, remove the file " - ^ directory ^ "/" ^ socketname); - the_end 1) - >>= fun () -> - Lwt_unix.listen socket 20; - (* Done in ocsipersist.ml +let () = + Eio_main.run (fun env -> + Fiber.with_binding Ocsipersist_lib.env env (fun () -> + Switch.run (fun sw -> + Fiber.with_binding Ocsipersist_lib.current_switch sw (fun () -> + (* TODO: lwt-to-direct-style: [Eio_main.run] argument used to be a [Lwt] promise and is now a [fun]. Make sure no asynchronous or IO calls are done outside of this [fun]. *) + let socket = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + (try + Unix.bind + (* TODO: lwt-to-direct-style: This call to [Unix.bind] was [Lwt_unix.bind] before. It's now blocking. *) + socket + (Unix.ADDR_UNIX (directory ^ "/" ^ socketname)) + with _exn -> + errlog + ("Please make sure that the directory " ^ directory + ^ " exists, writable for ocsidbm, and no other ocsidbm \ + process is running on the same directory. If not, \ + remove the file " ^ directory ^ "/" ^ socketname); + the_end 1); + (* Done in ocsipersist.ml let devnull = Unix.openfile "/dev/null" [Unix.O_WRONLY] 0 in Unix.dup2 devnull Unix.stdout; Unix.dup2 devnull Unix.stderr; Unix.close devnull; Unix.close Unix.stdin; *) - ignore - ( Lwt_unix.sleep 4.1 >>= fun () -> - if not !b then close_all 0 (); - Lwt.return () ); - (* If nothing happened during 5 seconds, I quit *) - loop socket) + Unix.listen socket 20; + Fiber.fork + ~sw: + (Stdlib.Option.get + (Fiber.get Ocsipersist_lib.current_switch)) + (fun () -> + Eio_unix.sleep 4.1; + if not !b then close_all 0 ()); + (* If nothing happened during 5 seconds, I quit *) + loop socket)))) (*****************************************************************************) (** Garbage collection of expired data *) diff --git a/src/dbm/ocsipersist.ml b/src/dbm/ocsipersist.ml index c03ecb7..e120f11 100644 --- a/src/dbm/ocsipersist.ml +++ b/src/dbm/ocsipersist.ml @@ -1,11 +1,12 @@ +open Eio.Std + (* FIX: the log file is never reopened *) open Ocsidbmtypes -open Lwt.Infix module type TABLE = Ocsipersist_lib.Sigs.TABLE -let section = Lwt_log.Section.make "ocsigen:ocsipersist:dbm" +let section = Logs.Src.create "ocsigen:ocsipersist:dbm" exception Ocsipersist_error @@ -19,116 +20,123 @@ end module Db = struct let try_connect sname = - Lwt.catch - (fun () -> - let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - Lwt_unix.connect socket (Unix.ADDR_UNIX sname) >>= fun () -> - Lwt.return socket) - (fun _ -> - Lwt_log.ign_warning_f ~section - "Launching a new Ocsidbm process: %s on directory %s." - !Config.ocsidbm !Config.directory; - let param = [|!Config.ocsidbm; !Config.directory|] in - let child () = - let log = - Unix.openfile !Config.error_log_path - [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_APPEND] - 0o640 - in - Unix.dup2 log Unix.stderr; - Unix.close log; - let devnull = Unix.openfile "/dev/null" [Unix.O_WRONLY] 0 in - Unix.dup2 devnull Unix.stdout; - Unix.close devnull; - Unix.close Unix.stdin; - Unix.execvp !Config.ocsidbm param - in - let pid = Lwt_unix.fork () in - if pid = 0 - then - if (* double fork *) - Lwt_unix.fork () = 0 - then child () - else Aux.sys_exit 0 - else - Lwt_unix.waitpid [] pid >>= fun _ -> - Lwt_unix.sleep 1.1 >>= fun () -> - let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - Lwt_unix.connect socket (Unix.ADDR_UNIX sname) >>= fun () -> - Lwt.return socket) + try + let socket = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + Unix.connect + (* TODO: lwt-to-direct-style: This call to [Unix.connect] was [Lwt_unix.connect] before. It's now blocking. *) + socket (Unix.ADDR_UNIX sname); + socket + with _ -> + Logs.warn ~src:section (fun fmt -> + fmt "Launching a new Ocsidbm process: %s on directory %s." + !Config.ocsidbm !Config.directory); + let param = [| !Config.ocsidbm; !Config.directory |] in + let child () = + let log = + Unix.openfile !Config.error_log_path + [ Unix.O_WRONLY; Unix.O_CREAT; Unix.O_APPEND ] + 0o640 + in + Unix.dup2 log Unix.stderr; + Unix.close log; + let devnull = Unix.openfile "/dev/null" [ Unix.O_WRONLY ] 0 in + Unix.dup2 devnull Unix.stdout; + Unix.close devnull; + Unix.close Unix.stdin; + Unix.execvp !Config.ocsidbm param + in + let pid = Lwt_unix.fork () in + if pid = 0 then + if + (* double fork *) + Lwt_unix.fork () = 0 + then child () + else Aux.sys_exit 0 + else + let _ = Unix.waitpid [] pid in + Eio_unix.sleep 1.1; + let socket = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + Unix.connect + (* TODO: lwt-to-direct-style: This call to [Unix.connect] was [Lwt_unix.connect] before. It's now blocking. *) + socket (Unix.ADDR_UNIX sname); + socket let rec get_indescr i = - Lwt.catch - (fun () -> try_connect (!Config.directory ^ "/" ^ socketname)) - (fun e -> - if i = 0 - then ( - Lwt_log.ign_error_f ~section - "Cannot connect to Ocsidbm. Will continue without persistent session support. Error message is: %s .Have a look at the logs to see if there is an error message from the Ocsidbm process." - (match e with - | Unix.Unix_error (a, b, c) -> - Printf.sprintf "%a in %s(%s)" - (fun () -> Unix.error_message) - a b c - | _ -> Printexc.to_string e); - Lwt.fail e) - else Lwt_unix.sleep 2.1 >>= fun () -> get_indescr (i - 1)) + try try_connect (!Config.directory ^ "/" ^ socketname) + with e -> + if i = 0 then ( + Logs.err ~src:section (fun fmt -> + fmt + "Cannot connect to Ocsidbm. Will continue without persistent \ + session support. Error message is: %s .Have a look at the logs \ + to see if there is an error message from the Ocsidbm process." + (match e with + | Unix.Unix_error (a, b, c) -> + Printf.sprintf "%a in %s(%s)" + (fun () -> Unix.error_message) + a b c + | _ -> Printexc.to_string e)); + raise e) + else ( + Eio_unix.sleep 2.1; + get_indescr (i - 1)) let send = - let previous = ref (Lwt.return Ok) in + let previous = ref Ok in fun v -> - Lwt.catch (fun () -> !previous) (fun _ -> Lwt.return Ok) >>= fun _ -> - !Config.inch >>= fun inch -> - !Config.outch >>= fun outch -> - (previous := - Lwt_io.write_value outch v >>= fun () -> - Lwt_io.flush outch >>= fun () -> Lwt_io.read_value inch); + let _ = try !previous with _ -> Ok in + let inch = !Config.inch in + let outch = !Config.outch in + previous := + (Marshal.to_channel outch v; + Eio.Buf_write.flush outch; + Marshal.from_channel inch); !previous let get (store, name) = - send (Get (store, name)) >>= function - | Value v -> Lwt.return v - | Dbm_not_found -> Lwt.fail Not_found - | Error e -> Lwt.fail e - | _ -> Lwt.fail Ocsipersist_error + match send (Get (store, name)) with + | Value v -> v + | Dbm_not_found -> raise Not_found + | Error e -> raise e + | _ -> raise Ocsipersist_error let remove (store, name) = - send (Remove (store, name)) >>= function - | Ok -> Lwt.return () - | Error e -> Lwt.fail e - | _ -> Lwt.fail Ocsipersist_error + match send (Remove (store, name)) with + | Ok -> () + | Error e -> raise e + | _ -> raise Ocsipersist_error let replace (store, name) value = - send (Replace (store, name, value)) >>= function - | Ok -> Lwt.return () - | Error e -> Lwt.fail e - | _ -> Lwt.fail Ocsipersist_error + match send (Replace (store, name, value)) with + | Ok -> () + | Error e -> raise e + | _ -> raise Ocsipersist_error let replace_if_exists (store, name) value = - send (Replace_if_exists (store, name, value)) >>= function - | Ok -> Lwt.return () - | Dbm_not_found -> Lwt.fail Not_found - | Error e -> Lwt.fail e - | _ -> Lwt.fail Ocsipersist_error + match send (Replace_if_exists (store, name, value)) with + | Ok -> () + | Dbm_not_found -> raise Not_found + | Error e -> raise e + | _ -> raise Ocsipersist_error let firstkey store = - send (Firstkey store) >>= function - | Key k -> Lwt.return (Some k) - | Error e -> Lwt.fail e - | _ -> Lwt.return None + match send (Firstkey store) with + | Key k -> Some k + | Error e -> raise e + | _ -> None let nextkey store = - send (Nextkey store) >>= function - | Key k -> Lwt.return (Some k) - | Error e -> Lwt.fail e - | _ -> Lwt.return None + match send (Nextkey store) with + | Key k -> Some k + | Error e -> raise e + | _ -> None let length store = - send (Length store) >>= function - | Value v -> Lwt.return (Marshal.from_string v 0) - | Dbm_not_found -> Lwt.return 0 - | Error e -> Lwt.fail e - | _ -> Lwt.fail Ocsipersist_error + match send (Length store) with + | Value v -> Marshal.from_string v 0 + | Dbm_not_found -> 0 + | Error e -> raise e + | _ -> raise Ocsipersist_error end module Store = struct @@ -137,28 +145,30 @@ module Store = struct type 'a t = store * string (** Type of persistent data *) - let open_store name = Lwt.return name + let open_store name = name let make_persistent_lazy_lwt ~store ~name ~default = - let pvname = store, name in - Lwt.catch - (fun () -> Db.get pvname >>= fun _ -> Lwt.return ()) - (function - | Not_found -> - default () >>= fun def -> - Db.replace pvname (Marshal.to_string def []) - | e -> Lwt.fail e) - >>= fun () -> Lwt.return pvname + let pvname = (store, name) in + (try + let _ = Db.get pvname in + () + with + | Not_found -> + let def = default () in + Db.replace pvname (Marshal.to_string def []) + | e -> raise e); + pvname let make_persistent_lazy ~store ~name ~default = - let default () = Lwt.wrap default in + let default () = default () in make_persistent_lazy_lwt ~store ~name ~default let make_persistent ~store ~name ~default = make_persistent_lazy ~store ~name ~default:(fun () -> default) let get (pvname : 'a t) : 'a = - Db.get pvname >>= fun r -> Lwt.return (Marshal.from_string r 0) + let r = Db.get pvname in + Marshal.from_string r 0 let set pvname v = let data = Marshal.to_string v [] in @@ -181,8 +191,8 @@ module Functorial = struct module Table (T : sig - val name : string - end) + val name : string + end) (Key : COLUMN) (Value : COLUMN) : Ocsipersist_lib.Sigs.TABLE with type key = Key.t and type value = Value.t = @@ -191,7 +201,7 @@ module Functorial = struct type value = Value.t let name = T.name - let find key = Lwt.map Value.decode @@ Db.get (name, Key.encode key) + let find key = Value.decode (Db.get (name, Key.encode key)) let add key value = Db.replace (name, Key.encode key) (Value.encode value) let replace_if_exists key value = @@ -203,20 +213,21 @@ module Functorial = struct let i = ref 0L in let rec aux nextkey beg = match count with - | Some c when !i >= c -> Lwt.return beg + | Some c when !i >= c -> beg | _ -> ( - nextkey name >>= function - | None -> Lwt.return beg + match nextkey name with + | None -> beg | Some k -> ( let k = Key.decode k in - match gt, geq, lt, leq with - | _, _, Some lt, _ when k >= lt -> Lwt.return beg - | _, _, _, Some le when k > le -> Lwt.return beg + match (gt, geq, lt, leq) with + | _, _, Some lt, _ when k >= lt -> beg + | _, _, _, Some le when k > le -> beg | Some gt, _, _, _ when k <= gt -> aux Db.nextkey beg | _, Some ge, _, _ when k < ge -> aux Db.nextkey beg | _ -> i := Int64.succ !i; - find k >>= fun r -> f k r beg >>= aux Db.nextkey)) + let r = find k in + (aux Db.nextkey) (f k r beg))) in aux Db.firstkey beg @@ -231,10 +242,14 @@ module Functorial = struct "iter_block not implemented for DBM. Please use Ocsipersist with sqlite" let modify_opt key f = - Lwt.catch - (fun () -> find key >>= fun v -> Lwt.return_some v) - (function Not_found -> Lwt.return_none | _ -> assert false) - >>= fun old_value -> + let old_value = + try + let v = find key in + Some v + with + | Not_found -> None + | _ -> assert false + in match f old_value with | None -> remove key | Some new_value -> replace_if_exists key new_value @@ -244,12 +259,12 @@ module Functorial = struct Db.length name module Variable = Ocsipersist_lib.Variable (struct - type k = key - type v = value + type k = key + type v = value - let find = find - let add = add - end) + let find = find + let add = add + end) end module Column = struct @@ -270,8 +285,8 @@ module Functorial = struct end module Marshal (C : sig - type t - end) : COLUMN with type t = C.t = struct + type t + end) : COLUMN with type t = C.t = struct type t = C.t let column_type = "_" @@ -325,20 +340,53 @@ type 'value table = 'value Polymorphic.table *) let init () = - if !Ocsipersist_settings.delay_loading - then - Lwt_log.ign_warning ~section "Asynchronuous initialization (may fail later)" - else Lwt_log.ign_warning ~section "Initializing ..."; + if !Ocsipersist_settings.delay_loading then + Logs.warn ~src:section (fun fmt -> + fmt "Asynchronuous initialization (may fail later)") + else Logs.warn ~src:section (fun fmt -> fmt "Initializing ..."); let indescr = Db.get_indescr 2 in - if !Ocsipersist_settings.delay_loading - then ( + if !Ocsipersist_settings.delay_loading then ( Ocsipersist_settings.inch := - Lwt.map (Lwt_io.of_fd ~mode:Lwt_io.input) indescr; + (fun x1 -> + Eio.Buf_read.of_flow ~max_size:1_000_000 + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true x1 + : [ `R | `Flow | `Close ] r)) + indescr; Ocsipersist_settings.outch := - Lwt.map (Lwt_io.of_fd ~mode:Lwt_io.output) indescr) + (fun x1 -> + Eio.Buf_write.with_flow + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true x1 + : [ `W | `Flow | `Close ] r) + (fun outbuf -> `Move_writing_code_here)) + (* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *) + indescr) else - let r = Lwt_main.run indescr in - Ocsipersist_settings.inch := Lwt.return (Lwt_io.of_fd ~mode:Lwt_io.input r); + let r = + Eio_main.run (fun env -> + Fiber.with_binding Ocsipersist_lib.env env (fun () -> + Switch.run (fun sw -> + Fiber.with_binding Ocsipersist_lib.current_switch sw + (fun () -> + (* TODO: lwt-to-direct-style: [Eio_main.run] argument used to be a [Lwt] promise and is now a [fun]. Make sure no asynchronous or IO calls are done outside of this [fun]. *) + indescr)))) + in + Ocsipersist_settings.inch := + Eio.Buf_read.of_flow ~max_size:1_000_000 + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true r + : [ `R | `Flow | `Close ] r); Ocsipersist_settings.outch := - Lwt.return (Lwt_io.of_fd ~mode:Lwt_io.output r); - Lwt_log.ign_warning ~section "...Initialization complete" + Eio.Buf_write.with_flow + (Eio_unix.Net.import_socket_stream + ~sw:(Stdlib.Option.get (Fiber.get Ocsipersist_lib.current_switch)) + ~close_unix:true + (* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *) + r + : [ `W | `Flow | `Close ] r) + (fun outbuf -> `Move_writing_code_here); + Logs.warn ~src:section (fun fmt -> fmt "...Initialization complete") diff --git a/src/dbm/ocsipersist_config.ml b/src/dbm/ocsipersist_config.ml index e59df9b..7a313d6 100644 --- a/src/dbm/ocsipersist_config.ml +++ b/src/dbm/ocsipersist_config.ml @@ -34,4 +34,4 @@ let init_fun config = | Some d -> Ocsipersist_settings.ocsidbm := d); Ocsipersist.init () -let _ = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun () +let () = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun () diff --git a/src/dbm/ocsipersist_settings.ml b/src/dbm/ocsipersist_settings.ml index 2f40448..71572c0 100644 --- a/src/dbm/ocsipersist_settings.ml +++ b/src/dbm/ocsipersist_settings.ml @@ -1,6 +1,6 @@ -let directory, ocsidbm = ref "ocsipersist-store", ref "ocsidbm" -let inch = ref (Lwt.fail (Failure "Ocsipersist not initialised")) -let outch = ref (Lwt.fail (Failure "Ocsipersist not initialised")) +let directory, ocsidbm = (ref "ocsipersist-store", ref "ocsidbm") +let inch = ref (raise (Failure "Ocsipersist not initialised")) +let outch = ref (raise (Failure "Ocsipersist not initialised")) let delay_loading = ref false let error_log_path = ref "ocsipersist-errors" let set_error_log_path s = error_log_path := s diff --git a/src/dbm/ocsipersist_settings.mli b/src/dbm/ocsipersist_settings.mli index e12140b..b40c4fb 100644 --- a/src/dbm/ocsipersist_settings.mli +++ b/src/dbm/ocsipersist_settings.mli @@ -1,3 +1,5 @@ +open Eio.Std + val set_store : string -> unit val set_delay_loading : bool -> unit val set_ocsidbm : string -> unit @@ -9,5 +11,5 @@ val directory : string ref val ocsidbm : string ref val delay_loading : bool ref val error_log_path : string ref -val inch : Lwt_io.input_channel Lwt.t ref -val outch : Lwt_io.output_channel Lwt.t ref +val inch : Eio.Buf_read.t Promise.t ref +val outch : Eio.Buf_write.t Promise.t ref diff --git a/src/dune b/src/dune index 09bd611..4e0a74a 100644 --- a/src/dune +++ b/src/dune @@ -3,14 +3,14 @@ (public_name ocsipersist-lib) (modules ocsipersist_lib) (wrapped false) - (libraries lwt)) + (libraries eio eio.unix)) (library (name ocsipersist) (public_name ocsipersist) (virtual_modules ocsipersist) (modules ocsipersist) - (libraries ocsipersist_lib lwt)) + (libraries ocsipersist_lib eio)) (env (_ diff --git a/src/ocsipersist.mli b/src/ocsipersist.mli index 1912d65..741609f 100644 --- a/src/ocsipersist.mli +++ b/src/ocsipersist.mli @@ -65,4 +65,4 @@ module Store : Ocsipersist_lib.Sigs.STORE type store = Store.store type 'a variable = 'a Store.t -val init : unit -> unit +val init : env:Eio_unix.Stdenv.base -> unit diff --git a/src/ocsipersist_lib.ml b/src/ocsipersist_lib.ml index 9363f4c..c85656c 100644 --- a/src/ocsipersist_lib.ml +++ b/src/ocsipersist_lib.ml @@ -1,65 +1,67 @@ -open Lwt.Syntax +open Eio.Std (** This modules provides tools for creating more implementations of the {!Ocsipersist} virtual module. *) +let current_switch : Switch.t Fiber.key = Fiber.create_key () + module Sigs = struct module type TABLE = sig type key type value val name : string - val find : key -> value Lwt.t - val add : key -> value -> unit Lwt.t - val replace_if_exists : key -> value -> unit Lwt.t - val remove : key -> unit Lwt.t - val modify_opt : key -> (value option -> value option) -> unit Lwt.t - val length : unit -> int Lwt.t + val find : key -> value + val add : key -> value -> unit + val replace_if_exists : key -> value -> unit + val remove : key -> unit + val modify_opt : key -> (value option -> value option) -> unit + val length : unit -> int val iter : - ?count:int64 - -> ?gt:key - -> ?geq:key - -> ?lt:key - -> ?leq:key - -> (key -> value -> unit Lwt.t) - -> unit Lwt.t + ?count:int64 -> + ?gt:key -> + ?geq:key -> + ?lt:key -> + ?leq:key -> + (key -> value -> unit) -> + unit val fold : - ?count:int64 - -> ?gt:key - -> ?geq:key - -> ?lt:key - -> ?leq:key - -> (key -> value -> 'a -> 'a Lwt.t) - -> 'a - -> 'a Lwt.t + ?count:int64 -> + ?gt:key -> + ?geq:key -> + ?lt:key -> + ?leq:key -> + (key -> value -> 'a -> 'a) -> + 'a -> + 'a val iter_block : - ?count:int64 - -> ?gt:key - -> ?geq:key - -> ?lt:key - -> ?leq:key - -> (key -> value -> unit) - -> unit Lwt.t + ?count:int64 -> + ?gt:key -> + ?geq:key -> + ?lt:key -> + ?leq:key -> + (key -> value -> unit) -> + unit val iter_batch : - ?count:int64 - -> ?gt:key - -> ?geq:key - -> ?lt:key - -> ?leq:key - -> ((key * value) list -> unit Lwt.t) - -> unit Lwt.t + ?count:int64 -> + ?gt:key -> + ?geq:key -> + ?lt:key -> + ?leq:key -> + ((key * value) list -> unit) -> + unit module Variable : sig type t val make : name:key -> default:value -> t val make_lazy : name:key -> default:(unit -> value) -> t - val make_lazy_lwt : name:key -> default:(unit -> value Lwt.t) -> t - val get : t -> value Lwt.t - val set : t -> value -> unit Lwt.t + val make_lazy_lwt : name:key -> default:(unit -> value) -> t + val get : t -> value + val set : t -> value -> unit end end @@ -76,8 +78,8 @@ module Sigs = struct module Table (T : sig - val name : string - end) + val name : string + end) (Key : COLUMN) (Value : COLUMN) : TABLE with type key = Key.t and type value = Value.t @@ -86,8 +88,8 @@ module Sigs = struct module Float : COLUMN with type t = float module Marshal (C : sig - type t - end) : COLUMN with type t = C.t + type t + end) : COLUMN with type t = C.t end end @@ -95,54 +97,50 @@ module Sigs = struct type 'value table (** Type of persistent table *) - val table_name : 'value table -> string Lwt.t + val table_name : 'value table -> string (** returns the name of the table *) - val open_table : string -> 'value table Lwt.t + val open_table : string -> 'value table (** Open a table (and create it if it does not exist) *) - val find : 'value table -> string -> 'value Lwt.t + val find : 'value table -> string -> 'value (** [find table key] gives the value associated to [key]. Fails with [Not_found] if not found. *) - val add : 'value table -> string -> 'value -> unit Lwt.t + val add : 'value table -> string -> 'value -> unit (** [add table key value] associates [value] to [key]. If the database already contains data associated with [key], that data is discarded and silently replaced by the new data. *) - val replace_if_exists : 'value table -> string -> 'value -> unit Lwt.t + val replace_if_exists : 'value table -> string -> 'value -> unit (** [replace_if_exists table key value] associates [value] to [key] only if [key] is already bound. If the database does not contain any data associated with [key], fails with [Not_found]. *) - val remove : 'value table -> string -> unit Lwt.t + val remove : 'value table -> string -> unit (** [remove table key] removes the entry in the table if it exists *) - val length : 'value table -> int Lwt.t + val length : 'value table -> int (** Size of a table. *) - val iter_step : (string -> 'a -> unit Lwt.t) -> 'a table -> unit Lwt.t + val iter_step : (string -> 'a -> unit) -> 'a table -> unit (** Important warning: this iterator may not iter on all data of the table if another thread is modifying it in the same time. Nonetheless, it should not miss more than a very few data from time to time, except if the table is very old (at least 9 223 372 036 854 775 807 insertions). *) - val fold_step : - (string -> 'a -> 'b -> 'b Lwt.t) - -> 'a table - -> 'b - -> 'b Lwt.t + val fold_step : (string -> 'a -> 'b -> 'b) -> 'a table -> 'b -> 'b (** Important warning: this iterator may not iter on all data of the table if another thread is modifying it in the same time. Nonetheless, it should not miss more than a very few data from time to time, except if the table is very old (at least 9 223 372 036 854 775 807 insertions). *) - val iter_block : (string -> 'a -> unit) -> 'a table -> unit Lwt.t + val iter_block : (string -> 'a -> unit) -> 'a table -> unit (** MAJOR WARNING: Unlike iter_step, this iterator won't miss any entry and will run in one shot. It is therefore more efficient, BUT: it will lock the WHOLE database during its execution, @@ -169,10 +167,10 @@ module Sigs = struct Be careful to change this name every time you change the type of the value. *) - val get : 'a t -> 'a Lwt.t + val get : 'a t -> 'a (** Get the value of a reference *) - val set : 'a t -> 'a -> unit Lwt.t + val set : 'a t -> 'a -> unit (** Set the value of a reference *) end @@ -184,42 +182,35 @@ module Sigs = struct (** Data are divided into stores. Create one store for your project, where you will save all your data. *) - val open_store : string -> store Lwt.t + val open_store : string -> store (** Open a store (and create it if it does not exist) *) - val make_persistent : store:store -> name:string -> default:'a -> 'a t Lwt.t + val make_persistent : store:store -> name:string -> default:'a -> 'a t (** [make_persistent store name default] find a persistent value named [name] in store [store] from database, or create it with the default value [default] if it does not exist. *) val make_persistent_lazy : - store:store - -> name:string - -> default:(unit -> 'a) - -> 'a t Lwt.t + store:store -> name:string -> default:(unit -> 'a) -> 'a t (** Same as make_persistent but the default value is evaluated only if needed *) val make_persistent_lazy_lwt : - store:store - -> name:string - -> default:(unit -> 'a Lwt.t) - -> 'a t Lwt.t + store:store -> name:string -> default:(unit -> 'a) -> 'a t (** Lwt version of make_persistent_lazy. *) - val get : 'a t -> 'a Lwt.t + val get : 'a t -> 'a (** [get pv] gives the value of [pv] *) - val set : 'a t -> 'a -> unit Lwt.t + val set : 'a t -> 'a -> unit (** [set pv value] sets a persistent value [pv] to [value] *) end end open Sigs -open Lwt.Infix (** deriving polymorphic interface from the functorial one *) module Polymorphic (Functorial : FUNCTORIAL) : POLYMORPHIC = struct @@ -236,14 +227,12 @@ module Polymorphic (Functorial : FUNCTORIAL) : POLYMORPHIC = struct end) (Column.String) (Column.Marshal (struct - type t = a - end)) + type t = a + end)) in - Lwt.return (module T : POLYMORPHIC with type value = a) - - let table_name (type a) (module T : POLYMORPHIC with type value = a) = - Lwt.return T.name + (module T : POLYMORPHIC with type value = a) + let table_name (type a) (module T : POLYMORPHIC with type value = a) = T.name let find (type a) (module T : POLYMORPHIC with type value = a) = T.find let add (type a) (module T : POLYMORPHIC with type value = a) = T.add @@ -264,59 +253,53 @@ module Polymorphic (Functorial : FUNCTORIAL) : POLYMORPHIC = struct end module Variable (T : sig - type k - type v + type k + type v - val find : k -> v Lwt.t - val add : k -> v -> unit Lwt.t - end) = + val find : k -> v + val add : k -> v -> unit +end) = struct - type t = {name : T.k; default : unit -> T.v Lwt.t} - - let make_lazy_lwt ~name ~default = {name; default} - - let make_lazy ~name ~default = - {name; default = (fun () -> Lwt.return @@ default ())} + type t = { name : T.k; default : unit -> T.v } - let make ~name ~default = {name; default = (fun () -> Lwt.return default)} + let make_lazy_lwt ~name ~default = { name; default } + let make_lazy ~name ~default = { name; default = (fun () -> default ()) } + let make ~name ~default = { name; default = (fun () -> default) } - let get {name; default} = - Lwt.catch - (fun () -> T.find name) - (function - | Not_found -> - default () >>= fun d -> - T.add name d >>= fun () -> Lwt.return d - | exc -> Lwt.reraise exc) + let get { name; default } = + try T.find name + with Not_found -> + let d = default () in + T.add name d; + d - let set {name} = T.add name + let set { name } = T.add name end module Ref (Store : STORE) = struct let store = lazy (Store.open_store "__ocsipersist_ref_store__") - type 'a t = Ref of 'a ref | Per of 'a Store.t Lwt.t + type 'a t = Ref of 'a ref | Per of 'a Store.t Lazy.t let ref ?persistent v = match persistent with | None -> Ref (ref v) | Some name -> Per - (let* store = Lazy.force store in - Store.make_persistent ~store ~name ~default:v) + (lazy + (let store = Lazy.force store in + Store.make_persistent ~store ~name ~default:v)) let get = function - | Ref r -> Lwt.return !r + | Ref r -> !r | Per r -> - let* r = r in + let (lazy r) = r in Store.get r let set r v = match r with - | Ref r -> - r := v; - Lwt.return_unit + | Ref r -> r := v | Per r -> - let* r = r in + let (lazy r) = r in Store.set r v end diff --git a/src/pgsql/dune b/src/pgsql/dune index a795067..85eebec 100644 --- a/src/pgsql/dune +++ b/src/pgsql/dune @@ -3,7 +3,7 @@ (public_name ocsipersist-pgsql) (implements ocsipersist) (modules :standard \ ocsipersist_config ocsipersist_settings) - (libraries pgocaml lwt_log ocsipersist_lib ocsipersist_pgsql_settings)) + (libraries pgocaml lwt.unix logs ocsipersist_lib ocsipersist_pgsql_settings)) ; Configuration functions (part of ocsipersist-pgsql package): diff --git a/src/pgsql/ocsipersist.ml b/src/pgsql/ocsipersist.ml index cc4f178..0c0984f 100644 --- a/src/pgsql/ocsipersist.ml +++ b/src/pgsql/ocsipersist.ml @@ -2,27 +2,32 @@ module type TABLE = Ocsipersist_lib.Sigs.TABLE -let section = Lwt_log.Section.make "ocsigen:ocsipersist:pgsql" +let section = Logs.Src.create "ocsigen:ocsipersist:pgsql" module Lwt_thread = struct - include Lwt + let close_in = fun x1 -> Eio.Resource.close x1 + + let really_input + (* TODO: lwt-to-direct-style: [x2] should be a [Cstruct.t]. *) + (* TODO: lwt-to-direct-style: [Eio.Flow.single_read] operates on a [Flow.source] but [x1] is likely of type [Eio.Buf_read.t]. Rewrite this code to use [Buf_read] (which contains an internal buffer) or change the call to [Eio.Buf_read.of_flow] used to create the buffer. *) + (* TODO: lwt-to-direct-style: Dropped expression (buffer offset): [x3]. This will behave as if it was [0]. *) + (* TODO: lwt-to-direct-style: Dropped expression (buffer length): [x4]. This will behave as if it was [Cstruct.length buffer]. *) + = + fun x1 x2 x3 x4 -> Eio.Flow.read_exact x1 x2 - let close_in = Lwt_io.close - let really_input = Lwt_io.read_into_exactly let input_binary_int = Lwt_io.BE.read_int let input_char = Lwt_io.read_char - let output_string = Lwt_io.write + let output_string = fun x1 x2 -> Eio.Buf_write.string x1 x2 let output_binary_int = Lwt_io.BE.write_int let output_char = Lwt_io.write_char - let flush = Lwt_io.flush + let flush = fun x1 -> Eio.Buf_write.flush x1 let open_connection x = Lwt_io.open_connection x - type out_channel = Lwt_io.output_channel - type in_channel = Lwt_io.input_channel + type out_channel = Eio.Buf_write.t + type in_channel = Eio.Buf_read.t end module PGOCaml = PGOCaml_generic.Make (Lwt_thread) -open Lwt.Infix open Printf exception Ocsipersist_error @@ -30,37 +35,38 @@ exception Ocsipersist_error module Config = Ocsipersist_settings let connect () = - PGOCaml.connect ?host:!Config.host ?port:!Config.port ?user:!Config.user - ?password:!Config.password ?database:(Some !Config.database) - ?unix_domain_socket_dir:!Config.unix_domain_socket_dir - () - >>= fun dbhandle -> + let dbhandle = + PGOCaml.connect ?host:!Config.host ?port:!Config.port ?user:!Config.user + ?password:!Config.password ?database:(Some !Config.database) + ?unix_domain_socket_dir:!Config.unix_domain_socket_dir + () + in PGOCaml.set_private_data dbhandle @@ Hashtbl.create 8; - Lwt.return dbhandle + dbhandle -let ( >> ) f g = f >>= fun _ -> g +let ( >> ) f g = + let _ = f in + g let conn_pool : (string, unit) Hashtbl.t PGOCaml.t Lwt_pool.t ref = - let dispose db = - Lwt.catch (fun () -> PGOCaml.close db) (fun _ -> Lwt.return_unit) - in + let dispose db = try PGOCaml.close db with _ -> () in (* This connection pool will be overwritten by init_fun! *) ref (Lwt_pool.create !Config.size_conn_pool ~validate:PGOCaml.alive ~dispose - (fun () -> Lwt.fail (Failure "Ocsipersist db not initialised"))) + (fun () -> raise (Failure "Ocsipersist db not initialised"))) let use_pool f = Lwt_pool.use !conn_pool @@ fun db -> - Lwt.catch - (fun () -> f db) - (function - | PGOCaml.Error msg as e -> - Lwt_log.ign_error_f ~section "postgresql protocol error: %s" msg; - PGOCaml.close db >>= fun () -> Lwt.fail e - | Lwt.Canceled as e -> - Lwt_log.ign_error ~section "thread canceled"; - PGOCaml.close db >>= fun () -> Lwt.fail e - | e -> Lwt.fail e) + try f db with + | PGOCaml.Error msg as e -> + Logs.err ~src:section (fun fmt -> fmt "postgresql protocol error: %s" msg); + PGOCaml.close db; + raise e + | Lwt.Canceled as e -> + Logs.err ~src:section (fun fmt -> fmt "thread canceled"); + PGOCaml.close db; + raise e + | e -> raise e (* escapes characters that are not in the range of 0x20..0x7e; this is to meet PostgreSQL's format requirements for text fields @@ -71,10 +77,9 @@ let escape_string s = for i = 0 to len - 1 do let c = s.[i] in let cc = Char.code c in - if cc < 0x20 || cc > 0x7e - then Buffer.add_string buf (sprintf "\\%03o" cc) (* non-print -> \ooo *) - else if c = '\\' - then Buffer.add_string buf "\\\\" (* \ -> \\ *) + if cc < 0x20 || cc > 0x7e then Buffer.add_string buf (sprintf "\\%03o" cc) + (* non-print -> \ooo *) + else if c = '\\' then Buffer.add_string buf "\\\\" (* \ -> \\ *) else Buffer.add_char buf c done; Buffer.contents buf @@ -88,15 +93,16 @@ let unescape_string str = let i = ref 0 in while !i < len do let c = str.[!i] in - if c = '\\' - then ( + if c = '\\' then ( incr i; - if !i < len && str.[!i] = '\\' - then (Buffer.add_char buf '\\'; incr i) - else if !i + 2 < len - && is_first_oct_digit str.[!i] - && is_oct_digit str.[!i + 1] - && is_oct_digit str.[!i + 2] + if !i < len && str.[!i] = '\\' then ( + Buffer.add_char buf '\\'; + incr i) + else if + !i + 2 < len + && is_first_oct_digit str.[!i] + && is_oct_digit str.[!i + 1] + && is_oct_digit str.[!i + 2] then ( let byte = oct_val str.[!i] in incr i; @@ -105,7 +111,9 @@ let unescape_string str = let byte = (byte lsl 3) + oct_val str.[!i] in incr i; Buffer.add_char buf (Char.chr byte))) - else (incr i; Buffer.add_char buf c) + else ( + incr i; + Buffer.add_char buf c) done; Buffer.contents buf @@ -118,11 +126,11 @@ let pack = function let unpack_value value = Marshal.from_string (PGOCaml.bytea_of_string value) 0 let rec list_last l = - match l with [x] -> x | _ :: r -> list_last r | [] -> raise Not_found + match l with [ x ] -> x | _ :: r -> list_last r | [] -> raise Not_found (* get one value from the result of a query *) let one_value = function - | [Some value] :: _xs -> unpack_value value + | [ Some value ] :: _xs -> unpack_value value | _ -> raise Not_found let prepare db query = @@ -131,19 +139,16 @@ let prepare db query = let name = Digest.to_hex (Digest.string query) in (* Have we prepared this statement already? If not, do so. *) let is_prepared = Hashtbl.mem hashtbl name in - (if is_prepared - then Lwt.return () - else - PGOCaml.prepare db ~name ~query () - >> Lwt.return @@ Hashtbl.add hashtbl name ()) - >>= fun () -> Lwt.return name + if is_prepared then () + else PGOCaml.prepare db ~name ~query () >> Hashtbl.add hashtbl name (); + name let exec db query params = - prepare db query >>= fun name -> + let name = prepare db query in let params = List.map (fun x -> Some (pack x)) params in PGOCaml.execute db ~name ~params () -let exec_ db query params = exec db query params >> Lwt.return_unit +let exec_ db query params = exec db query params >> () module Functorial = struct type internal = string @@ -158,8 +163,8 @@ module Functorial = struct module Table (T : sig - val name : string - end) + val name : string + end) (Key : COLUMN) (Value : COLUMN) : TABLE with type key = Key.t and type value = Value.t = struct @@ -170,43 +175,48 @@ module Functorial = struct module Aux = struct let exec_opt db query params = - prepare db query >>= fun name -> PGOCaml.execute db ~name ~params () + let name = prepare db query in + PGOCaml.execute db ~name ~params () let exec db query params = - prepare db query >>= fun name -> + let name = prepare db query in let params = List.map (fun x -> Some x) params in PGOCaml.execute db ~name ~params () - let exec_ db query params = exec db query params >> Lwt.return_unit - let encode_pair key value = [Key.encode key; Value.encode value] + let exec_ db query params = exec db query params >> () + let encode_pair key value = [ Key.encode key; Value.encode value ] end let init = let create_table table db = let query = sprintf - "CREATE TABLE IF NOT EXISTS %s (key %s, value %s, PRIMARY KEY (key))" + "CREATE TABLE IF NOT EXISTS %s (key %s, value %s, PRIMARY KEY \ + (key))" table Key.column_type Value.column_type in Aux.exec_ db query [] in lazy (use_pool @@ create_table T.name) - let with_table f = Lazy.force init >>= fun () -> use_pool f + let with_table f = + Lazy.force init; + use_pool f let find key = with_table @@ fun db -> let query = sprintf "SELECT value FROM %s WHERE key = $1 " name in - Aux.exec db query [Key.encode key] >>= function - | [Some value] :: _ -> Lwt.return (Value.decode value) - | _ -> Lwt.fail Not_found + match Aux.exec db query [ Key.encode key ] with + | [ Some value ] :: _ -> Value.decode value + | _ -> raise Not_found let add key value = with_table @@ fun db -> let query = sprintf - "INSERT INTO %s VALUES ($1, $2) - ON CONFLICT (key) DO UPDATE SET value = $2" + "INSERT INTO %s VALUES ($1, $2)\n\ + \ ON CONFLICT (key) DO UPDATE SET value = \ + $2" name in Aux.exec_ db query @@ Aux.encode_pair key value @@ -216,61 +226,62 @@ module Functorial = struct let query = sprintf "UPDATE %s SET value = $2 WHERE key = $1 RETURNING 0" name in - Aux.exec db query (Aux.encode_pair key value) >>= function + match Aux.exec db query (Aux.encode_pair key value) with | [] -> raise Not_found - | _ -> Lwt.return_unit + | _ -> () let remove key = with_table @@ fun db -> let query = sprintf "DELETE FROM %s WHERE key = $1" name in - Aux.exec_ db query [Key.encode key] + Aux.exec_ db query [ Key.encode key ] let modify_opt key f = with_table @@ fun db -> let query = sprintf "SELECT value FROM %s WHERE key = $1" name in - Aux.exec db query [Key.encode key] >>= fun value -> + let value = Aux.exec db query [ Key.encode key ] in let old_value = - match value with [Some v] :: _ -> Some (Value.decode v) | _ -> None + match value with [ Some v ] :: _ -> Some (Value.decode v) | _ -> None in let new_value = f old_value in - match new_value = old_value, new_value with - | true, _ -> Lwt.return_unit + match (new_value = old_value, new_value) with + | true, _ -> () | false, Some new_value -> let query = sprintf - "INSERT INTO %s VALUES ($1, $2) - ON CONFLICT (key) DO UPDATE SET value = $2" + "INSERT INTO %s VALUES ($1, $2)\n\ + \ ON CONFLICT (key) DO UPDATE SET \ + value = $2" name in Aux.exec_ db query @@ Aux.encode_pair key new_value | false, None -> let query = sprintf "DELETE FROM %s WHERE key = $1" name in - Aux.exec_ db query [Key.encode key] + Aux.exec_ db query [ Key.encode key ] let length () = with_table @@ fun db -> let query = sprintf "SELECT count (1) FROM %s" name in - Lwt.map one_value @@ Aux.exec db query [] + one_value (Aux.exec db query []) let max_iter_block_size = 1000L let rec iter_rec last ?count ?gt ?geq ?lt ?leq f = match count with - | Some c when c <= 0L -> Lwt.return_unit + | Some c when c <= 0L -> () | _ -> let key_value_of_row = function - | [Some key; Some value] -> Key.decode key, Value.decode value + | [ Some key; Some value ] -> (Key.decode key, Value.decode value) | _ -> raise Ocsipersist_error in let query = sprintf - "SELECT * FROM %s - WHERE ($1 :: %s IS NULL OR key > $1) - AND ($2 :: %s IS NULL OR key > $2) - AND ($3 :: %s IS NULL OR key >= $3) - AND ($4 :: %s IS NULL OR key < $4) - AND ($5 :: %s IS NULL OR key <= $5) - ORDER BY key LIMIT $6" + "SELECT * FROM %s\n\ + \ WHERE ($1 :: %s IS NULL OR key > $1)\n\ + \ AND ($2 :: %s IS NULL OR key > $2)\n\ + \ AND ($3 :: %s IS NULL OR key >= $3)\n\ + \ AND ($4 :: %s IS NULL OR key < $4)\n\ + \ AND ($5 :: %s IS NULL OR key <= $5)\n\ + \ ORDER BY key LIMIT $6" name Key.column_type Key.column_type Key.column_type Key.column_type Key.column_type in @@ -280,20 +291,21 @@ module Functorial = struct | _ -> max_iter_block_size in let args = - [ Option.map Key.encode last - ; Option.map Key.encode gt - ; Option.map Key.encode geq - ; Option.map Key.encode lt - ; Option.map Key.encode leq - ; Some (Int64.to_string limit) ] + [ + Option.map Key.encode last; + Option.map Key.encode gt; + Option.map Key.encode geq; + Option.map Key.encode lt; + Option.map Key.encode leq; + Some (Int64.to_string limit); + ] in - with_table (fun db -> Aux.exec_opt db query args) >>= fun l -> + let l = with_table (fun db -> Aux.exec_opt db query args) in let key_values = List.map key_value_of_row l in - f key_values >>= fun () -> - if Int64.of_int (List.length l) < limit - then Lwt.return_unit + f key_values; + if Int64.of_int (List.length l) < limit then () else - let last, _ = list_last key_values in + let last, (_ : value) = list_last key_values in let count = Option.map Int64.(fun c -> sub c @@ of_int @@ List.length key_values) @@ -304,28 +316,27 @@ module Functorial = struct let iter_batch = iter_rec None let iter ?count ?gt ?geq ?lt ?leq f = - let f key_values = Lwt_list.iter_s (fun (k, v) -> f k v) key_values in + let f key_values = List.iter (fun (k, v) -> f k v) key_values in iter_rec None ?count ?gt ?geq ?lt ?leq f let fold ?count ?gt ?geq ?lt ?leq f x = let res = ref x in let g key value = - f key value !res >>= fun res' -> - res := res'; - Lwt.return_unit + let res' = f key value !res in + res := res' in - iter ?count ?gt ?geq ?lt ?leq g >> Lwt.return !res + iter ?count ?gt ?geq ?lt ?leq g >> !res let iter_block ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ = failwith "Ocsipersist.iter_block: not implemented" module Variable = Ocsipersist_lib.Variable (struct - type k = key - type v = value + type k = key + type v = value - let find = find - let add = add - end) + let find = find + let add = add + end) end module Column = struct @@ -346,8 +357,8 @@ module Functorial = struct end module Marshal (C : sig - type t - end) : COLUMN with type t = C.t = struct + type t + end) : COLUMN with type t = C.t = struct type t = C.t let column_type = "bytea" @@ -363,29 +374,30 @@ type 'value table = 'value Polymorphic.table module Store = struct type store = string - type 'a t = {store : string; name : string} + type 'a t = { store : string; name : string } let open_store store = use_pool @@ fun db -> let create_table db table = let query = sprintf - "CREATE TABLE IF NOT EXISTS %s (key TEXT, value BYTEA, PRIMARY KEY(key))" + "CREATE TABLE IF NOT EXISTS %s (key TEXT, value BYTEA, PRIMARY \ + KEY(key))" table in exec_ db query [] in - create_table db store >> Lwt.return store + create_table db store >> store let make_persistent_worker ~store ~name ~default db = let query = sprintf - "INSERT INTO %s VALUES ( $1 , $2 ) - ON CONFLICT ( key ) DO NOTHING" + "INSERT INTO %s VALUES ( $1 , $2 )\n\ + \ ON CONFLICT ( key ) DO NOTHING" store in (* NOTE: incompatible with < 9.5 *) - exec_ db query [Key name; Value default] >> Lwt.return {store; name} + exec_ db query [ Key name; Value default ] >> { store; name } let make_persistent ~store ~name ~default = use_pool @@ fun db -> make_persistent_worker ~store ~name ~default db @@ -393,25 +405,25 @@ module Store = struct let make_persistent_lazy_lwt ~store ~name ~default = use_pool @@ fun db -> let query = sprintf "SELECT 1 FROM %s WHERE key = $1 " store in - exec db query [Key name] >>= function + match exec db query [ Key name ] with | [] -> - default () >>= fun default -> + let default = default () in make_persistent_worker ~store ~name ~default db - | _ -> Lwt.return {store; name} + | _ -> { store; name } let make_persistent_lazy ~store ~name ~default = - let default () = Lwt.wrap default in + let default () = default () in make_persistent_lazy_lwt ~store ~name ~default let get p = use_pool @@ fun db -> let query = sprintf "SELECT value FROM %s WHERE key = $1 " p.store in - Lwt.map one_value (exec db query [Key p.name]) + one_value (exec db query [ Key p.name ]) let set p v = use_pool @@ fun db -> let query = sprintf "UPDATE %s SET value = $2 WHERE key = $1 " p.store in - exec db query [Key p.name; Value v] >> Lwt.return () + exec db query [ Key p.name; Value v ] >> () end module Ref = Ocsipersist_lib.Ref (Store) diff --git a/src/pgsql/ocsipersist_config.ml b/src/pgsql/ocsipersist_config.ml index 7563e6a..5dab77c 100644 --- a/src/pgsql/ocsipersist_config.ml +++ b/src/pgsql/ocsipersist_config.ml @@ -1,5 +1,7 @@ -let section = Lwt_log.Section.make "ocsigen:ocsipersist:pgsql:config" -let _ = Lwt_log.ign_info ~section "Init for Ocsigen Server config file" +let section = Logs.Src.create "ocsigen:ocsipersist:pgsql:config" + +let () = + Logs.info ~src:section (fun fmt -> fmt "Init for Ocsigen Server config file") let parse_global_config = function | [] -> () @@ -35,4 +37,4 @@ let parse_global_config = function "Unexpected content inside Ocsipersist config" let init_fun config = parse_global_config config; Ocsipersist.init () -let _ = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun () +let () = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun () diff --git a/src/sqlite/dune b/src/sqlite/dune index 4ef220a..c6884ab 100644 --- a/src/sqlite/dune +++ b/src/sqlite/dune @@ -3,7 +3,13 @@ (public_name ocsipersist-sqlite) (implements ocsipersist) (modules ocsipersist) - (libraries sqlite3 lwt_log ocsipersist_lib ocsipersist_sqlite_settings)) + (libraries + sqlite3 + eio + eio_main + logs + ocsipersist_lib + ocsipersist_sqlite_settings)) ; Configuration functions (part of ocsipersist-sqlite package): diff --git a/src/sqlite/ocsipersist.ml b/src/sqlite/ocsipersist.ml index 5d1040a..0f84915 100644 --- a/src/sqlite/ocsipersist.ml +++ b/src/sqlite/ocsipersist.ml @@ -1,8 +1,9 @@ +open Eio.Std + module type TABLE = Ocsipersist_lib.Sigs.TABLE -let section = Lwt_log.Section.make "ocsigen:ocsipersist:sqlite" +let section = Logs.Src.create "ocsigen:ocsipersist:sqlite" -open Lwt.Infix open Sqlite3 open Printf @@ -10,21 +11,25 @@ module Aux = struct (* This reference is overwritten when the init function (at the end of the file) is run, which occurs when the extension is loaded *) let db_file = Ocsipersist_settings.db_file - let yield () = Thread.yield () + let yield () = Fiber.yield () + let domain_mgr = ref None + let get_domain_mgr () : _ Eio.Domain_manager.t = Option.get !domain_mgr let rec bind_safely stmt = function | [] -> stmt | (value, name) :: q as l -> ( - match Sqlite3.bind stmt (bind_parameter_index stmt name) value with - | Rc.OK -> bind_safely stmt q - | Rc.BUSY | Rc.LOCKED -> yield (); bind_safely stmt l - | rc -> - ignore (finalize stmt); - failwith (Rc.to_string rc)) + match Sqlite3.bind stmt (bind_parameter_index stmt name) value with + | Rc.OK -> bind_safely stmt q + | Rc.BUSY | Rc.LOCKED -> + yield (); + bind_safely stmt l + | rc -> + ignore (finalize stmt : Rc.t); + failwith (Rc.to_string rc)) let close_safely db = - if not (db_close db) - then Lwt_log.ign_error ~section "Couldn't close database" + if not (db_close db) then + Logs.err ~src:section (fun fmt -> fmt "Couldn't close database") let m = Mutex.create () @@ -32,14 +37,22 @@ module Aux = struct let aux () = let db = Mutex.lock m; - try db_open !db_file with e -> Mutex.unlock m; raise e + try db_open !db_file + with e -> + Mutex.unlock m; + raise e in try let r = f db in - close_safely db; Mutex.unlock m; r - with e -> close_safely db; Mutex.unlock m; raise e + close_safely db; + Mutex.unlock m; + r + with e -> + close_safely db; + Mutex.unlock m; + raise e in - Lwt_preemptive.detach aux () + Eio.Domain_manager.run (get_domain_mgr ()) aux (* Référence indispensable pour les codes de retours et leur signification : * http://sqlite.org/capi3ref.html @@ -49,41 +62,47 @@ module Aux = struct let db_create table = let sql = sprintf - "CREATE TABLE IF NOT EXISTS %s (key TEXT, value BLOB, PRIMARY KEY(key) ON CONFLICT REPLACE)" + "CREATE TABLE IF NOT EXISTS %s (key TEXT, value BLOB, PRIMARY \ + KEY(key) ON CONFLICT REPLACE)" table in let create db = let stmt = prepare db sql in let rec aux () = match step stmt with - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () in - exec_safely create >>= fun () -> Lwt.return table + exec_safely create; + table let db_get, db_replace = let get (table, key) db = let sqlget = sprintf "SELECT value FROM %s WHERE key = :key " table in - let stmt = bind_safely (prepare db sqlget) [Data.TEXT key, ":key"] in + let stmt = bind_safely (prepare db sqlget) [ (Data.TEXT key, ":key") ] in let rec aux () = match step stmt with | Rc.ROW -> let value = match column stmt 0 with Data.BLOB s -> s | _ -> assert false in - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); value | Rc.DONE -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); raise Not_found - | Rc.BUSY | Rc.LOCKED -> yield (); aux () + | Rc.BUSY | Rc.LOCKED -> + yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () @@ -94,20 +113,22 @@ module Aux = struct in let stmt = bind_safely (prepare db sqlreplace) - [Data.TEXT key, ":key"; Data.BLOB value, ":value"] + [ (Data.TEXT key, ":key"); (Data.BLOB value, ":value") ] in let rec aux () = match step stmt with - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () in - ( (fun tablekey -> exec_safely (get tablekey)) - , fun tablekey value -> exec_safely (replace tablekey value) ) + ( (fun tablekey -> exec_safely (get tablekey)), + fun tablekey value -> exec_safely (replace tablekey value) ) end module Store = struct @@ -119,25 +140,26 @@ module Store = struct Aux.db_create s let make_persistent_lazy_lwt ~store ~name ~default = - let pvname = store, name in - Lwt.catch - (fun () -> Aux.db_get pvname >>= fun _ -> Lwt.return ()) - (function - | Not_found -> - default () >>= fun def -> - Aux.db_replace pvname (Marshal.to_string def []) - | e -> Lwt.fail e) - >>= fun () -> Lwt.return pvname + let pvname = (store, name) in + (try + let _ = Aux.db_get pvname in + () + with + | Not_found -> + let def = default () in + Aux.db_replace pvname (Marshal.to_string def []) + | e -> raise e); + pvname let make_persistent_lazy ~store ~name ~default = - let default () = Lwt.wrap default in make_persistent_lazy_lwt ~store ~name ~default let make_persistent ~store ~name ~default = make_persistent_lazy ~store ~name ~default:(fun () -> default) let get (pvname : 'a t) : 'a = - Aux.db_get pvname >>= fun r -> Lwt.return (Marshal.from_string r 0) + let r = Aux.db_get pvname in + Marshal.from_string r 0 let set pvname v = let data = Marshal.to_string v [] in @@ -160,8 +182,8 @@ module Functorial = struct module Table (T : sig - val name : string - end) + val name : string + end) (Key : COLUMN) (Value : COLUMN) : Ocsipersist_lib.Sigs.TABLE with type key = Key.t and type value = Value.t = @@ -175,40 +197,49 @@ module Functorial = struct let create db = let sql = sprintf - "CREATE TABLE IF NOT EXISTS %s - (key %s, value %s, PRIMARY KEY (key) ON CONFLICT REPLACE)" + "CREATE TABLE IF NOT EXISTS %s\n\ + \ (key %s, value %s, PRIMARY KEY (key) ON CONFLICT \ + REPLACE)" name Key.column_type Value.column_type in let stmt = prepare db sql in let rec aux () = match step stmt with - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () in lazy (Aux.exec_safely create) - let with_table f = Lazy.force init >>= fun () -> Aux.exec_safely f + let with_table f = + Lazy.force init; + Aux.exec_safely f let db_get key db = let sqlget = sprintf "SELECT value FROM %s WHERE key = :key" name in - let stmt = Aux.bind_safely (prepare db sqlget) [Key.encode key, ":key"] in + let stmt = + Aux.bind_safely (prepare db sqlget) [ (Key.encode key, ":key") ] + in let rec aux () = match step stmt with | Rc.ROW -> let value = column stmt 0 in - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); value | Rc.DONE -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); raise Not_found - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in Value.decode @@ aux () @@ -217,27 +248,33 @@ module Functorial = struct let sqlreplace = sprintf "INSERT INTO %s VALUES (:key, :value)" name in let stmt = Aux.bind_safely (prepare db sqlreplace) - [Key.encode key, ":key"; Value.encode value, ":value"] + [ (Key.encode key, ":key"); (Value.encode value, ":value") ] in let rec aux () = match step stmt with - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () let db_remove key db = let sql = sprintf "DELETE FROM %s WHERE key = :key " name in - let stmt = Aux.bind_safely (prepare db sql) [Key.encode key, ":key"] in + let stmt = + Aux.bind_safely (prepare db sql) [ (Key.encode key, ":key") ] + in let rec aux () = match step stmt with - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () @@ -253,14 +290,16 @@ module Functorial = struct | Data.INT s -> Int64.to_int s | _ -> assert false in - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); value | Rc.DONE -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); raise Not_found - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () @@ -268,12 +307,12 @@ module Functorial = struct let db_iter ?gt ?geq ?lt ?leq table rowid db = let sql = sprintf - "SELECT key, value, ROWID FROM %s - WHERE ROWID > :rowid - AND coalesce (key > :gt, true) - AND coalesce (key >= :geq, true) - AND coalesce (key < :lt, true) - AND coalesce (key <= :leq, true)" + "SELECT key, value, ROWID FROM %s\n\ + \ WHERE ROWID > :rowid\n\ + \ AND coalesce (key > :gt, true)\n\ + \ AND coalesce (key >= :geq, true)\n\ + \ AND coalesce (key < :lt, true)\n\ + \ AND coalesce (key <= :leq, true)" table in let encode_key_opt = function @@ -286,26 +325,30 @@ module Functorial = struct and leq_sql = encode_key_opt leq in let stmt = Aux.bind_safely (prepare db sql) - [ Data.INT rowid, ":rowid" - ; gt_sql, ":gt" - ; geq_sql, ":geq" - ; lt_sql, ":lt" - ; leq_sql, ":leq" ] + [ + (Data.INT rowid, ":rowid"); + (gt_sql, ":gt"); + (geq_sql, ":geq"); + (lt_sql, ":lt"); + (leq_sql, ":leq"); + ] in let rec aux () = match step stmt with | Rc.ROW -> ( - match column stmt 0, column stmt 1, column stmt 2 with - | k, v, Data.INT rowid -> - ignore (finalize stmt); - Some (k, v, rowid) - | _ -> assert false) + match (column stmt 0, column stmt 1, column stmt 2) with + | k, v, Data.INT rowid -> + ignore (finalize stmt : Rc.t); + Some (k, v, rowid) + | _ -> assert false) | Rc.DONE -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); None - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () @@ -315,7 +358,7 @@ module Functorial = struct let replace_if_exists k v = with_table @@ fun db -> - ignore (db_get k db); + ignore (db_get k db : value); db_replace k v db let remove key = with_table @@ db_remove key @@ -331,13 +374,13 @@ module Functorial = struct let i = ref 0L in let rec aux rowid beg = match count with - | Some c when !i >= c -> Lwt.return beg + | Some c when !i >= c -> beg | _ -> ( i := Int64.succ !i; - with_table (db_iter ?gt ?geq ?lt ?leq name rowid) >>= function - | None -> Lwt.return beg + match with_table (db_iter ?gt ?geq ?lt ?leq name rowid) with + | None -> beg | Some (k, v, rowid') -> - f (Key.decode k) (Value.decode v) beg >>= aux rowid') + (aux rowid') (f (Key.decode k) (Value.decode v) beg)) in aux Int64.zero beg @@ -350,12 +393,12 @@ module Functorial = struct let iter_block ?count ?gt ?geq ?lt ?leq f = let sql = sprintf - "SELECT key, value FROM %s - WHERE coalesce (key > :gt, true) - AND coalesce (key >= :geq, true) - AND coalesce (key < :lt, true) - AND coalesce (key <= :leq, true) - LIMIT coalesce (:count, -1)" + "SELECT key, value FROM %s\n\ + \ WHERE coalesce (key > :gt, true)\n\ + \ AND coalesce (key >= :geq, true)\n\ + \ AND coalesce (key < :lt, true)\n\ + \ AND coalesce (key <= :leq, true)\n\ + \ LIMIT coalesce (:count, -1)" name in let encode_key_opt = function @@ -372,21 +415,25 @@ module Functorial = struct let iter db = let stmt = Aux.bind_safely (prepare db sql) - [ gt_sql, ":gt" - ; geq_sql, ":geq" - ; lt_sql, ":lt" - ; leq_sql, ":leq" - ; count_sql, ":count" ] + [ + (gt_sql, ":gt"); + (geq_sql, ":geq"); + (lt_sql, ":lt"); + (leq_sql, ":leq"); + (count_sql, ":count"); + ] in let rec aux () = match step stmt with | Rc.ROW -> f (Key.decode @@ column stmt 0) (Value.decode @@ column stmt 1); aux () - | Rc.DONE -> ignore (finalize stmt) - | Rc.BUSY | Rc.LOCKED -> Aux.yield (); aux () + | Rc.DONE -> ignore (finalize stmt : Rc.t) + | Rc.BUSY | Rc.LOCKED -> + Aux.yield (); + aux () | rc -> - ignore (finalize stmt); + ignore (finalize stmt : Rc.t); failwith (Rc.to_string rc) in aux () @@ -396,12 +443,12 @@ module Functorial = struct let length () = with_table @@ db_length name module Variable = Ocsipersist_lib.Variable (struct - type k = key - type v = value + type k = key + type v = value - let find = find - let add = add - end) + let find = find + let add = add + end) end module Column = struct @@ -422,8 +469,8 @@ module Functorial = struct end module Marshal (C : sig - type t - end) : COLUMN with type t = C.t = struct + type t + end) : COLUMN with type t = C.t = struct type t = C.t let column_type = "blob" @@ -441,6 +488,10 @@ module Ref = Ocsipersist_lib.Ref (Store) type 'value table = 'value Polymorphic.table -let init () = - (* We check that we can access the database *) - Lwt_main.run (Aux.exec_safely (fun _ -> ())) +let init ~env = + Switch.run (fun sw -> + Aux.domain_mgr := Some env#domain_mgr; + Fiber.with_binding Ocsipersist_lib.current_switch sw (fun () -> + (* We check that we can access the database *) + (* TODO: lwt-to-direct-style: [Eio_main.run] argument used to be a [Lwt] promise and is now a [fun]. Make sure no asynchronous or IO calls are done outside of this [fun]. *) + Aux.exec_safely (fun _ -> ()))) diff --git a/src/sqlite/ocsipersist_config.ml b/src/sqlite/ocsipersist_config.ml index a3f70a4..37b638b 100644 --- a/src/sqlite/ocsipersist_config.ml +++ b/src/sqlite/ocsipersist_config.ml @@ -1,3 +1,5 @@ +open Eio.Std + let parse_global_config = function | [] -> None | [Xml.Element ("database", [("file", s)], [])] -> Some s @@ -7,11 +9,12 @@ let parse_global_config = function "Unexpected content inside Ocsipersist config") let init config = + let env = Option.get (Fiber.get Ocsigen_lib.env) in Ocsipersist_settings.db_file := Ocsigen_config.get_datadir () ^ "/ocsidb"; (match parse_global_config config with | None -> () | Some d -> Ocsipersist_settings.db_file := d); - try Ocsipersist.init () + try Ocsipersist.init ~env with e -> Ocsigen_messages.errlog (Printf.sprintf @@ -19,4 +22,4 @@ let init config = !Ocsipersist_settings.db_file); raise e -let _ = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun:init () +let () = Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun:init () diff --git a/test/dune b/test/dune new file mode 100644 index 0000000..ba8030a --- /dev/null +++ b/test/dune @@ -0,0 +1,6 @@ +(subdir + sqlite + (cram + (package ocsipersist-sqlite) + (deps + (package ocsipersist-sqlite)))) diff --git a/test/sqlite/sqlite.t/dune b/test/sqlite/sqlite.t/dune new file mode 100644 index 0000000..ef58579 --- /dev/null +++ b/test/sqlite/sqlite.t/dune @@ -0,0 +1,3 @@ +(executable + (name test) + (libraries ocsipersist ocsipersist-sqlite)) diff --git a/test/sqlite/sqlite.t/dune-project b/test/sqlite/sqlite.t/dune-project new file mode 100644 index 0000000..c2e4660 --- /dev/null +++ b/test/sqlite/sqlite.t/dune-project @@ -0,0 +1 @@ +(lang dune 2.8) diff --git a/test/sqlite/sqlite.t/run.t b/test/sqlite/sqlite.t/run.t new file mode 100644 index 0000000..03f01b8 --- /dev/null +++ b/test/sqlite/sqlite.t/run.t @@ -0,0 +1,8 @@ +This program will print an incremented number each time: + + $ dune exec -- ./test.exe + 1 + $ dune exec -- ./test.exe + 2 + $ dune exec -- ./test.exe + 3 diff --git a/test/sqlite/sqlite.t/test.ml b/test/sqlite/sqlite.t/test.ml new file mode 100644 index 0000000..3440e36 --- /dev/null +++ b/test/sqlite/sqlite.t/test.ml @@ -0,0 +1,13 @@ +let r = Ocsipersist.Ref.ref ~persistent:"r" 0 + +let main () = + let v = Ocsipersist.Ref.get r in + let v = v + 1 in + let () = Ocsipersist.Ref.set r v in + Printf.printf "%d\n%!" v + +let () = + Eio_main.run (fun env -> + Ocsipersist.init ~env; + main () + )