Skip to content

Commit

Permalink
Merge pull request #4851 from robhoes/event-logout
Browse files Browse the repository at this point in the history
Eliminate unnecessarily scary log lines
  • Loading branch information
robhoes authored Nov 18, 2022
2 parents 34ec94f + 65b6012 commit 8f4c4e5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
26 changes: 23 additions & 3 deletions ocaml/libs/log/debug.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ let get_thread_id () = try Thread.id (Thread.self ()) with _ -> -1
module IntMap = Map.Make (Int)

module ThreadLocalTable = struct
type 'a t = {mutable tbl: 'a IntMap.t; m: Mutex.t}
(* The map values behave like stacks here, with shadowing as in Hashtbl.
A Hashtbl is not used here, in order to avoid taking the lock in `find`. *)
type 'a t = {mutable tbl: 'a list IntMap.t; m: Mutex.t}

let make () =
let tbl = IntMap.empty in
Expand All @@ -36,15 +38,33 @@ module ThreadLocalTable = struct

let add t v =
let id = get_thread_id () in
Mutex.execute t.m (fun () -> t.tbl <- IntMap.add id v t.tbl)
Mutex.execute t.m (fun () ->
t.tbl <-
IntMap.update id
(function Some v' -> Some (v :: v') | None -> Some [v])
t.tbl
)

let remove t =
let id = get_thread_id () in
Mutex.execute t.m (fun () -> t.tbl <- IntMap.remove id t.tbl)
Mutex.execute t.m (fun () ->
t.tbl <-
IntMap.update id
(function
| Some [_] ->
None
| Some (_hd :: tl) ->
Some tl
| Some [] | None ->
None
)
t.tbl
)

let find t =
let id = get_thread_id () in
IntMap.find_opt id t.tbl
|> Option.fold ~none:None ~some:(function v :: _ -> Some v | [] -> None)
end

type task = {desc: string; client: string option}
Expand Down
53 changes: 33 additions & 20 deletions ocaml/xapi/xapi_xenops.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3207,7 +3207,9 @@ end
(* XXX: PR-1255: we also want to only listen for events on VMs and fields we care about *)
let events_from_xapi () =
let open Event_types in
Server_helpers.exec_with_new_task "xapi events" (fun __context ->
Server_helpers.exec_with_new_task ~task_in_database:true "xapi events"
(fun __context ->
let task = Context.get_task_id __context |> Ref.string_of in
let localhost = Helpers.get_localhost ~__context in
let token = ref "" in
let stop = ref false in
Expand All @@ -3216,19 +3218,18 @@ let events_from_xapi () =
Helpers.call_api_functions ~__context (fun rpc session_id ->
(trigger_xenapi_reregister :=
fun () ->
(* This causes Event.next () and Event.from () to return SESSION_INVALID *)
debug
"triggering xapi event thread to re-register via event \
injection" ;
try
debug
"triggering xapi event thread to re-register via \
session.logout" ;
XenAPI.Session.logout ~rpc ~session_id
with
| Api_errors.Server_error (code, _)
when code = Api_errors.session_invalid ->
debug "Event thread has already woken up"
| e ->
error "Waking up the xapi event thread: %s"
(string_of_exn e)
let _ =
XenAPI.Event.inject ~rpc ~session_id ~_class:"task"
~_ref:task
in
()
with e ->
error "Waking up the xapi event thread: %s"
(string_of_exn e)
) ;
(* We register for events on resident_VMs only *)
let resident_VMs =
Expand All @@ -3251,13 +3252,15 @@ let events_from_xapi () =
error "events_from_xapi: extra items in the cache: [ %s ]"
(String.concat "; " (StringSet.elements extra_in_cache)) ;
let classes =
List.map
(fun x -> Printf.sprintf "VM/%s" (Ref.string_of x))
resident_VMs
Printf.sprintf "task/%s" task
:: List.map
(fun x -> Printf.sprintf "VM/%s" (Ref.string_of x))
resident_VMs
in
(* NB we re-use the old token so we don't get events we've already
received BUT we will not necessarily receive events for the new VMs *)
while true do
received BUT we will not necessarily receive events for the new VMs *)
let reregister = ref false in
while not !reregister do
let api_timeout = 60. in
let timeout =
30.
Expand Down Expand Up @@ -3307,6 +3310,11 @@ let events_from_xapi () =
raise e
)
)
| {ty= "task"; reference= t; _} when t = task ->
debug
"Woken event thread: updating list of event \
subscriptions" ;
reregister := true
| _ ->
warn
"Received event for something we didn't register for!"
Expand All @@ -3319,15 +3327,20 @@ let events_from_xapi () =
with
| Api_errors.Server_error (code, _)
when code = Api_errors.session_invalid ->
debug "Woken event thread: updating list of event subscriptions"
debug
"Caught SESSION_INVALID listening to events from xapi. \
Restarting thread immediately."
| Api_errors.Server_error (code, _)
when code = Api_errors.xen_incompatible ->
warn
"Stopping events-from-xapi thread due to Xen/libxenctrl \
incompatibility" ;
stop := true
| e ->
debug "Caught %s listening to events from xapi" (string_of_exn e) ;
debug
"Caught %s listening to events from xapi. Restarting thread \
after 15 seconds."
(string_of_exn e) ;
(* Start from scratch *)
token := "" ;
Thread.delay 15.
Expand Down

0 comments on commit 8f4c4e5

Please sign in to comment.