Skip to content

Commit

Permalink
Merge pull request #5309 from Vincent-lau/private/shul2/multi-lwt
Browse files Browse the repository at this point in the history
CP-47033: Allow `protocol_lwt` server to handle multiple requests at the same time
  • Loading branch information
robhoes authored Jan 9, 2024
2 parents 999c263 + 1221764 commit 18212e6
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
3 changes: 2 additions & 1 deletion ocaml/message-switch/async/protocol_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ module M = struct

let map f t = Deferred.map ~f t

let iter f t = Deferred.List.iter t ~f
let iter_dontwait f t =
Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f

let any = Deferred.any

Expand Down
39 changes: 27 additions & 12 deletions ocaml/message-switch/core/make.ml
Original file line number Diff line number Diff line change
Expand Up @@ -352,33 +352,42 @@ functor

let listen ~process ~switch:port ~queue:name () =
let token = Printf.sprintf "%d" (Unix.getpid ()) in
M.connect port >>= fun c ->
Connection.rpc c (In.Login token) >>|= fun (_ : string) ->
Connection.rpc c (In.CreatePersistent name) >>|= fun _ ->
let reconnect () =
M.connect port >>= fun request_conn ->
Connection.rpc request_conn (In.Login token) >>|= fun (_ : string) ->
M.connect port >>= fun reply_conn ->
Connection.rpc reply_conn (In.Login token) >>|= fun (_ : string) ->
return (Ok (request_conn, reply_conn))
in
reconnect () >>|= fun ((request_conn, reply_conn) as c) ->
let request_shutdown = M.Ivar.create () in
let on_shutdown = M.Ivar.create () in
let mutex = M.Mutex.create () in
Connection.rpc request_conn (In.CreatePersistent name) >>|= fun _ ->
let t = {request_shutdown; on_shutdown} in
let rec loop c from =
let transfer = {In.from; timeout; queues= [name]} in
let frame = In.Transfer transfer in
let message = Connection.rpc c frame in
let message = Connection.rpc request_conn frame in
any [map (fun _ -> ()) message; M.Ivar.read request_shutdown]
>>= fun () ->
if is_determined (M.Ivar.read request_shutdown) then (
M.Ivar.fill on_shutdown () ; return (Ok ())
) else
message >>= function
| Error _e ->
M.connect port >>= fun c ->
Connection.rpc c (In.Login token) >>|= fun (_ : string) ->
loop c from
M.Mutex.with_lock mutex (fun () ->
M.disconnect request_conn >>= fun () ->
M.disconnect reply_conn >>= fun () -> reconnect ()
)
>>|= fun c -> loop c from
| Ok raw -> (
let transfer = Out.transfer_of_rpc (Jsonrpc.of_string raw) in
match transfer.Out.messages with
| [] ->
loop c from
| _ :: _ ->
iter
iter_dontwait
(fun (i, m) ->
process m.Message.payload >>= fun response ->
( match m.Message.kind with
Expand All @@ -394,14 +403,20 @@ functor
}
)
in
Connection.rpc c request >>= fun _ -> return ()
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= fun _ -> return ()
)
>>= fun () ->
let request = In.Ack i in
Connection.rpc c request >>= fun _ -> return ()
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= fun _ -> return ()
)
transfer.Out.messages
>>= fun () -> loop c (Some transfer.Out.next)
transfer.Out.messages ;
loop c (Some transfer.Out.next)
)
in
let _ = loop c None in
Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/core/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module type BACKEND = sig

val map : ('a -> 'b) -> 'a t -> 'b t

val iter : ('a -> unit t) -> 'a list -> unit t
val iter_dontwait : ('a -> unit t) -> 'a list -> unit

val any : 'a t list -> 'a t

Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/lwt/protocol_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module M = struct

let map = Lwt.map

let iter = Lwt_list.iter_s
let iter_dontwait f lst = Lwt.async (fun () -> Lwt_list.iter_p f lst)

let any = Lwt.choose

Expand Down
7 changes: 7 additions & 0 deletions ocaml/message-switch/switch/switch_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ module Lwt_result = struct
let ( >>= ) m f = m >>= fun x -> f (Stdlib.Result.get_ok x)
end

let exn_hook e =
let bt = Printexc.get_raw_backtrace () in
error "Caught exception in Lwt.async: %s" (Printexc.to_string e) ;
error "backtrace: %s" (Printexc.raw_backtrace_to_string bt)

let () = Lwt.async_exception_hook := exn_hook

let make_server config trace_config =
let open Config in
info "Started server on %s" config.path ;
Expand Down

0 comments on commit 18212e6

Please sign in to comment.