diff --git a/ocaml/message-switch/async/protocol_async.ml b/ocaml/message-switch/async/protocol_async.ml index ac0f2e0dfb3..9ec9cc42b74 100644 --- a/ocaml/message-switch/async/protocol_async.ml +++ b/ocaml/message-switch/async/protocol_async.ml @@ -28,8 +28,7 @@ module M = struct let map f t = Deferred.map ~f t - let iter_dontwait f t = - Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f + let iter f t = Deferred.List.iter t ~f let any = Deferred.any diff --git a/ocaml/message-switch/core/make.ml b/ocaml/message-switch/core/make.ml index e803e1fd2f5..54e8904e1a9 100644 --- a/ocaml/message-switch/core/make.ml +++ b/ocaml/message-switch/core/make.ml @@ -352,23 +352,16 @@ functor let listen ~process ~switch:port ~queue:name () = let token = Printf.sprintf "%d" (Unix.getpid ()) in - let reconnect () = - M.connect port >>= fun request_conn -> - Connection.rpc request_conn (In.Login token) >>|= fun (_ : string) -> - M.connect port >>= fun reply_conn -> - Connection.rpc reply_conn (In.Login token) >>|= fun (_ : string) -> - return (Ok (request_conn, reply_conn)) - in - reconnect () >>|= fun ((request_conn, reply_conn) as c) -> + M.connect port >>= fun c -> + Connection.rpc c (In.Login token) >>|= fun (_ : string) -> + Connection.rpc c (In.CreatePersistent name) >>|= fun _ -> let request_shutdown = M.Ivar.create () in let on_shutdown = M.Ivar.create () in - let mutex = M.Mutex.create () in - Connection.rpc request_conn (In.CreatePersistent name) >>|= fun _ -> let t = {request_shutdown; on_shutdown} in let rec loop c from = let transfer = {In.from; timeout; queues= [name]} in let frame = In.Transfer transfer in - let message = Connection.rpc request_conn frame in + let message = Connection.rpc c frame in any [map (fun _ -> ()) message; M.Ivar.read request_shutdown] >>= fun () -> if is_determined (M.Ivar.read request_shutdown) then ( @@ -376,18 +369,16 @@ functor ) else message >>= function | Error _e -> - M.Mutex.with_lock mutex (fun () -> - M.disconnect request_conn >>= fun () -> - M.disconnect reply_conn >>= fun () -> reconnect () - ) - >>|= fun c -> loop c from + M.connect port >>= fun c -> + Connection.rpc c (In.Login token) >>|= fun (_ : string) -> + loop c from | Ok raw -> ( let transfer = Out.transfer_of_rpc (Jsonrpc.of_string raw) in match transfer.Out.messages with | [] -> loop c from | _ :: _ -> - iter_dontwait + iter (fun (i, m) -> process m.Message.payload >>= fun response -> ( match m.Message.kind with @@ -403,20 +394,14 @@ functor } ) in - M.Mutex.with_lock mutex (fun () -> - Connection.rpc reply_conn request - ) - >>= fun _ -> return () + Connection.rpc c request >>= fun _ -> return () ) >>= fun () -> let request = In.Ack i in - M.Mutex.with_lock mutex (fun () -> - Connection.rpc reply_conn request - ) - >>= fun _ -> return () + Connection.rpc c request >>= fun _ -> return () ) - transfer.Out.messages ; - loop c (Some transfer.Out.next) + transfer.Out.messages + >>= fun () -> loop c (Some transfer.Out.next) ) in let _ = loop c None in diff --git a/ocaml/message-switch/core/s.ml b/ocaml/message-switch/core/s.ml index f7acf915c5d..f99e0582687 100644 --- a/ocaml/message-switch/core/s.ml +++ b/ocaml/message-switch/core/s.ml @@ -27,7 +27,7 @@ module type BACKEND = sig val map : ('a -> 'b) -> 'a t -> 'b t - val iter_dontwait : ('a -> unit t) -> 'a list -> unit + val iter : ('a -> unit t) -> 'a list -> unit t val any : 'a t list -> 'a t diff --git a/ocaml/message-switch/lwt/protocol_lwt.ml b/ocaml/message-switch/lwt/protocol_lwt.ml index 70f9ab6df52..6da59eb3212 100644 --- a/ocaml/message-switch/lwt/protocol_lwt.ml +++ b/ocaml/message-switch/lwt/protocol_lwt.ml @@ -25,7 +25,7 @@ module M = struct let map = Lwt.map - let iter_dontwait f lst = Lwt.async (fun () -> Lwt_list.iter_p f lst) + let iter = Lwt_list.iter_s let any = Lwt.choose diff --git a/ocaml/message-switch/switch/switch_main.ml b/ocaml/message-switch/switch/switch_main.ml index 583baf6e594..9bf78973a85 100644 --- a/ocaml/message-switch/switch/switch_main.ml +++ b/ocaml/message-switch/switch/switch_main.ml @@ -75,13 +75,6 @@ module Lwt_result = struct let ( >>= ) m f = m >>= fun x -> f (Stdlib.Result.get_ok x) end -let exn_hook e = - let bt = Printexc.get_raw_backtrace () in - error "Caught exception in Lwt.async: %s" (Printexc.to_string e) ; - error "backtrace: %s" (Printexc.raw_backtrace_to_string bt) - -let () = Lwt.async_exception_hook := exn_hook - let make_server config trace_config = let open Config in info "Started server on %s" config.path ;