From fce2b8b4b22d8f761456e6d1b66ad96832e05439 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 24 Jan 2024 17:21:12 +0000 Subject: [PATCH 1/2] Revert "Add mutex for concurrent processing of messages" This reverts commit 1221764db2d37524f44fde255b25af9779a4b756. Signed-off-by: Vincent Liu --- ocaml/message-switch/core/make.ml | 33 +++++++++---------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/ocaml/message-switch/core/make.ml b/ocaml/message-switch/core/make.ml index e803e1fd2f5..6d1a94cba39 100644 --- a/ocaml/message-switch/core/make.ml +++ b/ocaml/message-switch/core/make.ml @@ -352,23 +352,16 @@ functor let listen ~process ~switch:port ~queue:name () = let token = Printf.sprintf "%d" (Unix.getpid ()) in - let reconnect () = - M.connect port >>= fun request_conn -> - Connection.rpc request_conn (In.Login token) >>|= fun (_ : string) -> - M.connect port >>= fun reply_conn -> - Connection.rpc reply_conn (In.Login token) >>|= fun (_ : string) -> - return (Ok (request_conn, reply_conn)) - in - reconnect () >>|= fun ((request_conn, reply_conn) as c) -> + M.connect port >>= fun c -> + Connection.rpc c (In.Login token) >>|= fun (_ : string) -> + Connection.rpc c (In.CreatePersistent name) >>|= fun _ -> let request_shutdown = M.Ivar.create () in let on_shutdown = M.Ivar.create () in - let mutex = M.Mutex.create () in - Connection.rpc request_conn (In.CreatePersistent name) >>|= fun _ -> let t = {request_shutdown; on_shutdown} in let rec loop c from = let transfer = {In.from; timeout; queues= [name]} in let frame = In.Transfer transfer in - let message = Connection.rpc request_conn frame in + let message = Connection.rpc c frame in any [map (fun _ -> ()) message; M.Ivar.read request_shutdown] >>= fun () -> if is_determined (M.Ivar.read request_shutdown) then ( @@ -376,11 +369,9 @@ functor ) else message >>= function | Error _e -> - M.Mutex.with_lock mutex (fun () -> - M.disconnect request_conn >>= fun () -> - M.disconnect reply_conn >>= fun () -> reconnect () - ) - >>|= fun c -> loop c from + M.connect port >>= fun c -> + Connection.rpc c (In.Login token) >>|= fun (_ : string) -> + loop c from | Ok raw -> ( let transfer = Out.transfer_of_rpc (Jsonrpc.of_string raw) in match transfer.Out.messages with @@ -403,17 +394,11 @@ functor } ) in - M.Mutex.with_lock mutex (fun () -> - Connection.rpc reply_conn request - ) - >>= fun _ -> return () + Connection.rpc c request >>= fun _ -> return () ) >>= fun () -> let request = In.Ack i in - M.Mutex.with_lock mutex (fun () -> - Connection.rpc reply_conn request - ) - >>= fun _ -> return () + Connection.rpc c request >>= fun _ -> return () ) transfer.Out.messages ; loop c (Some transfer.Out.next) From 3e84ef264b45e326603532ff7b4f00f9b909f633 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 24 Jan 2024 17:21:29 +0000 Subject: [PATCH 2/2] CA-388064: Revert "Protocol_{lwt,async}: process requests concurrently" This reverts commit e9923e366112f1513f5dc864a0ce48e0c3dc79f9. Bugs seen in timeout of unplugging 2TB VBD. Reverting this concurrent processing until this is tested more extensively. Signed-off-by: Vincent Liu --- ocaml/message-switch/async/protocol_async.ml | 3 +-- ocaml/message-switch/core/make.ml | 6 +++--- ocaml/message-switch/core/s.ml | 2 +- ocaml/message-switch/lwt/protocol_lwt.ml | 2 +- ocaml/message-switch/switch/switch_main.ml | 7 ------- 5 files changed, 6 insertions(+), 14 deletions(-) diff --git a/ocaml/message-switch/async/protocol_async.ml b/ocaml/message-switch/async/protocol_async.ml index ac0f2e0dfb3..9ec9cc42b74 100644 --- a/ocaml/message-switch/async/protocol_async.ml +++ b/ocaml/message-switch/async/protocol_async.ml @@ -28,8 +28,7 @@ module M = struct let map f t = Deferred.map ~f t - let iter_dontwait f t = - Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f + let iter f t = Deferred.List.iter t ~f let any = Deferred.any diff --git a/ocaml/message-switch/core/make.ml b/ocaml/message-switch/core/make.ml index 6d1a94cba39..54e8904e1a9 100644 --- a/ocaml/message-switch/core/make.ml +++ b/ocaml/message-switch/core/make.ml @@ -378,7 +378,7 @@ functor | [] -> loop c from | _ :: _ -> - iter_dontwait + iter (fun (i, m) -> process m.Message.payload >>= fun response -> ( match m.Message.kind with @@ -400,8 +400,8 @@ functor let request = In.Ack i in Connection.rpc c request >>= fun _ -> return () ) - transfer.Out.messages ; - loop c (Some transfer.Out.next) + transfer.Out.messages + >>= fun () -> loop c (Some transfer.Out.next) ) in let _ = loop c None in diff --git a/ocaml/message-switch/core/s.ml b/ocaml/message-switch/core/s.ml index f7acf915c5d..f99e0582687 100644 --- a/ocaml/message-switch/core/s.ml +++ b/ocaml/message-switch/core/s.ml @@ -27,7 +27,7 @@ module type BACKEND = sig val map : ('a -> 'b) -> 'a t -> 'b t - val iter_dontwait : ('a -> unit t) -> 'a list -> unit + val iter : ('a -> unit t) -> 'a list -> unit t val any : 'a t list -> 'a t diff --git a/ocaml/message-switch/lwt/protocol_lwt.ml b/ocaml/message-switch/lwt/protocol_lwt.ml index 70f9ab6df52..6da59eb3212 100644 --- a/ocaml/message-switch/lwt/protocol_lwt.ml +++ b/ocaml/message-switch/lwt/protocol_lwt.ml @@ -25,7 +25,7 @@ module M = struct let map = Lwt.map - let iter_dontwait f lst = Lwt.async (fun () -> Lwt_list.iter_p f lst) + let iter = Lwt_list.iter_s let any = Lwt.choose diff --git a/ocaml/message-switch/switch/switch_main.ml b/ocaml/message-switch/switch/switch_main.ml index 583baf6e594..9bf78973a85 100644 --- a/ocaml/message-switch/switch/switch_main.ml +++ b/ocaml/message-switch/switch/switch_main.ml @@ -75,13 +75,6 @@ module Lwt_result = struct let ( >>= ) m f = m >>= fun x -> f (Stdlib.Result.get_ok x) end -let exn_hook e = - let bt = Printexc.get_raw_backtrace () in - error "Caught exception in Lwt.async: %s" (Printexc.to_string e) ; - error "backtrace: %s" (Printexc.raw_backtrace_to_string bt) - -let () = Lwt.async_exception_hook := exn_hook - let make_server config trace_config = let open Config in info "Started server on %s" config.path ;