diff --git a/ocaml/xapi/xapi_vm_snapshot.ml b/ocaml/xapi/xapi_vm_snapshot.ml index 9790d2725f2..b533aef5da4 100644 --- a/ocaml/xapi/xapi_vm_snapshot.ml +++ b/ocaml/xapi/xapi_vm_snapshot.ml @@ -104,23 +104,15 @@ let checkpoint ~__context ~vm ~new_name = in (* Check if SR has snapshot feature *) let sr_has_snapshot_feature sr = - if - not - Smint.( - has_capability Vdi_snapshot - (Xapi_sr_operations.features_of_sr ~__context sr) - ) - then - false - else - true + Smint.has_capability Vdi_snapshot + (Xapi_sr_operations.features_of_sr ~__context sr) in List.iter (fun sr -> if not (sr_has_snapshot_feature sr) then raise - (Api_errors.Server_error - (Api_errors.sr_operation_not_supported, [Ref.string_of vm]) + Api_errors.( + Server_error (sr_operation_not_supported, [Ref.string_of vm]) ) ) sr_records ; @@ -211,8 +203,8 @@ let safe_destroy_vusb ~__context ~rpc ~session_id vusb = if Db.is_valid_ref __context vusb then Client.VUSB.destroy ~rpc ~session_id ~self:vusb -(* Copy the VBDs and VIFs from a source VM to a dest VM and then delete the old disks. *) -(* This operation destroys the data of the dest VM. *) +(* Copy the VBDs and VIFs from a source VM to a dest VM and then delete the old + disks. This operation destroys the data of the dest VM. *) let update_vifs_vbds_vgpus_and_vusbs ~__context ~snapshot ~vm = let snap_VBDs = Db.VM.get_VBDs ~__context ~self:snapshot in let snap_VBDs_disk, snap_VBDs_CD = @@ -240,7 +232,7 @@ let update_vifs_vbds_vgpus_and_vusbs ~__context ~snapshot ~vm = List.map (fun vbd -> Db.VBD.get_VDI ~__context ~self:vbd) vm_VBDs_disk in (* Filter out VM disks for which the snapshot does not have a corresponding - * disk - these disks will be left unattached after the revert is complete. *) + disk - these disks will be left unattached after the revert is complete. *) let vm_disks_with_snapshot = List.filter (fun vdi -> List.mem vdi snap_disks_snapshot_of) vm_disks in @@ -271,10 +263,10 @@ let update_vifs_vbds_vgpus_and_vusbs ~__context ~snapshot ~vm = List.iter2 (fun snap_disk (_, cloned_disk, _) -> (* For each snapshot disk which was just cloned: - * 1) Find the value of snapshot_of - * 2) Find all snapshots with the same snapshot_of - * 3) Update each of these snapshots so that their snapshot_of points - * to the new cloned disk. *) + 1) Find the value of snapshot_of + 2) Find all snapshots with the same snapshot_of + 3) Update each of these snapshots so that their snapshot_of points + to the new cloned disk. *) let open Db_filter_types in let snapshot_of = Db.VDI.get_snapshot_of ~__context ~self:snap_disk in let all_snaps_in_tree = diff --git a/ocaml/xapi/xapi_xenops.ml b/ocaml/xapi/xapi_xenops.ml index 2c776a887cd..9e8f35c4b98 100644 --- a/ocaml/xapi/xapi_xenops.ml +++ b/ocaml/xapi/xapi_xenops.ml @@ -1122,10 +1122,10 @@ module MD = struct let open Vm in let scheduler_params = (* vcpu <-> pcpu affinity settings are stored here. - Format is either: - 1,2,3 :: all vCPUs receive this mask - 1,2,3; 4,5,6 :: vCPU n receives mask n. Unlisted vCPUs - receive first mask *) + Format is either: + 1,2,3 :: all vCPUs receive this mask + 1,2,3; 4,5,6 :: vCPU n receives mask n. 
Unlisted vCPUs + receive first mask *) let affinity = try List.map @@ -1811,21 +1811,19 @@ module Events_from_xenopsd = struct let module Client = (val make_client queue_name : XENOPS) in Client.UPDATES.remove_barrier dbg id ; let t = - with_lock active_m (fun () -> - if not (Hashtbl.mem active id) then ( - warn "Events_from_xenopsd.wakeup: unknown id %d" id ; - None - ) else - let t = Hashtbl.find active id in - Hashtbl.remove active id ; Some t - ) + with_lock active_m @@ fun () -> + if not (Hashtbl.mem active id) then ( + warn "Events_from_xenopsd.wakeup: unknown id %d" id ; + None + ) else + let t = Hashtbl.find active id in + Hashtbl.remove active id ; Some t in Option.iter (fun t -> - with_lock t.m (fun () -> - t.finished <- true ; - Condition.signal t.c - ) + with_lock t.m @@ fun () -> + t.finished <- true ; + Condition.signal t.c ) t @@ -1892,12 +1890,7 @@ let update_vm ~__context id = a <> b in (* Helpers to create and update guest metrics when needed *) - let lookup state key = - if List.mem_assoc key state.Vm.guest_agent then - Some (List.assoc key state.Vm.guest_agent) - else - None - in + let lookup state key = List.assoc_opt key state.Vm.guest_agent in let list state dir = let dir = if dir.[0] = '/' then @@ -3923,58 +3916,54 @@ let resume ~__context ~self ~start_paused ~force:_ = let dbg = Context.string_of_task_and_tracing __context in let queue_name = queue_of_vm ~__context ~self in let vm_id = id_of_vm ~__context ~self in - transform_xenops_exn ~__context ~vm:self queue_name (fun () -> - maybe_refresh_vm ~__context ~self ; - let vdi = Db.VM.get_suspend_VDI ~__context ~self in - if vdi = Ref.null then ( - info "VM suspend VDI not found; Performing VM hard_shutdown" ; - Xapi_vm_lifecycle.force_state_reset ~__context ~self ~value:`Halted ; - raise - Api_errors.( - Server_error (vm_has_no_suspend_vdi, ["VM"; Ref.string_of self]) - ) - ) ; - let d = disk_of_vdi ~__context ~self:vdi |> Option.get in - let module Client = (val make_client queue_name : XENOPS) in - (* NB we don't set resident_on because we don't want to - modify the VM.power_state, {VBD,VIF}.currently_attached in the - failures cases. This means we must remove the metadata from - xenopsd on failure. 
*) - ( try - Events_from_xenopsd.with_suppressed queue_name dbg vm_id (fun () -> - debug "Sending VM %s configuration to xenopsd" (Ref.string_of self) ; - let id = Xenopsd_metadata.push ~__context ~self in - Xapi_network.with_networks_attached_for_vm ~__context ~vm:self - (fun () -> - info "xenops: VM.resume %s from %s" id - (d |> rpc_of disk |> Jsonrpc.to_string) ; - Client.VM.resume dbg id d - |> sync_with_task __context ~cancellable:false queue_name ; - if not start_paused then ( - info "xenops: VM.unpause %s" id ; - Client.VM.unpause dbg id - |> sync_with_task __context ~cancellable:false queue_name - ) - ) - ) - with e -> - error "Caught exception resuming VM: %s" (string_of_exn e) ; - let id = id_of_vm ~__context ~self in - Xenopsd_metadata.delete ~__context id ; - raise e - ) ; - set_resident_on ~__context ~self ; - Db.VM.set_suspend_VDI ~__context ~self ~value:Ref.null ; - (* Clearing vGPU metadata should happen as late as possible - * to make sure we only do it on a successful resume - *) - Xapi_gpumon.clear_vgpu_metadata ~__context ~vm:self ; - Helpers.call_api_functions ~__context (fun rpc session_id -> - XenAPI.VDI.destroy ~rpc ~session_id ~self:vdi - ) ; - check_power_state_is ~__context ~self - ~expected:(if start_paused then `Paused else `Running) - ) + transform_xenops_exn ~__context ~vm:self queue_name @@ fun () -> + maybe_refresh_vm ~__context ~self ; + let vdi = Db.VM.get_suspend_VDI ~__context ~self in + if vdi = Ref.null then ( + info "VM suspend VDI not found; Performing VM hard_shutdown" ; + Xapi_vm_lifecycle.force_state_reset ~__context ~self ~value:`Halted ; + let err_content = ["VM"; Ref.string_of self] in + raise Api_errors.(Server_error (vm_has_no_suspend_vdi, err_content)) + ) ; + let d = disk_of_vdi ~__context ~self:vdi |> Option.get in + let module Client = (val make_client queue_name : XENOPS) in + (* NB we don't set resident_on because we don't want to + modify the VM.power_state, {VBD,VIF}.currently_attached in the + failures cases. This means we must remove the metadata from + xenopsd on failure. 
*) + ( try + Events_from_xenopsd.with_suppressed queue_name dbg vm_id @@ fun () -> + debug "Sending VM %s configuration to xenopsd" (Ref.string_of self) ; + let id = Xenopsd_metadata.push ~__context ~self in + + Xapi_network.with_networks_attached_for_vm ~__context ~vm:self + @@ fun () -> + info "%s: VM.resume %s from %s" __FUNCTION__ id + (d |> rpc_of disk |> Jsonrpc.to_string) ; + Client.VM.resume dbg id d + |> sync_with_task __context ~cancellable:false queue_name ; + if not start_paused then ( + info "%s: VM.unpause %s" __FUNCTION__ id ; + Client.VM.unpause dbg id + |> sync_with_task __context ~cancellable:false queue_name + ) + with e -> + error "Caught exception resuming VM: %s" (string_of_exn e) ; + let id = id_of_vm ~__context ~self in + Xenopsd_metadata.delete ~__context id ; + raise e + ) ; + set_resident_on ~__context ~self ; + Db.VM.set_suspend_VDI ~__context ~self ~value:Ref.null ; + (* Clearing vGPU metadata should happen as late as possible + * to make sure we only do it on a successful resume + *) + Xapi_gpumon.clear_vgpu_metadata ~__context ~vm:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + XenAPI.VDI.destroy ~rpc ~session_id ~self:vdi + ) ; + check_power_state_is ~__context ~self + ~expected:(if start_paused then `Paused else `Running) let s3suspend ~__context ~self = let queue_name = queue_of_vm ~__context ~self in diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml index 6887a13684b..85d58a84a6d 100644 --- a/ocaml/xenopsd/lib/xenops_server.ml +++ b/ocaml/xenopsd/lib/xenops_server.ml @@ -760,11 +760,7 @@ module VUSB_DB = struct remove id end -module StringMap = Map.Make (struct - type t = string - - let compare = compare -end) +module StringMap = Map.Make (String) let push_with_coalesce should_keep item queue = (* [filter_with_memory p xs] returns elements [x \in xs] where [p (x_i, @@ -2523,663 +2519,646 @@ and perform_exn ?subtask ?result (op : operation) (t : Xenops_task.task_handle) : unit = let module B = (val get_backend () : S) in with_tracing ~name:(name_of_operation op) ~task:t @@ fun () -> - let one = function - | VM_start (id, force) -> ( - debug "VM.start %s (force=%b)" id force ; - let power = (B.VM.get_state (VM_DB.read_exn id)).Vm.power_state in - match power with - | Running -> - info "VM %s is already running" id - | _ -> - perform_atomics (atomics_of_operation op) t ; - VM_DB.signal id - ) - | VM_poweroff (id, _timeout) -> - debug "VM.poweroff %s" id ; - perform_atomics (atomics_of_operation op) t ; - VM_DB.signal id - | VM_reboot (id, _timeout) -> - debug "VM.reboot %s" id ; - rebooting id (fun () -> perform_atomics (atomics_of_operation op) t) ; - VM_DB.signal id - | VM_shutdown (id, _timeout) -> - debug "VM.shutdown %s" id ; - perform_atomics (atomics_of_operation op) t ; - VM_DB.signal id - | VM_suspend (id, _data) -> - debug "VM.suspend %s" id ; - perform_atomics (atomics_of_operation op) t ; - VM_DB.signal id - | VM_restore_vifs id -> - debug "VM_restore_vifs %s" id ; - perform_atomics (atomics_of_operation op) t - | VM_restore_devices (id, restore_vifs) -> - (* XXX: this is delayed due to the 'attach'/'activate' behaviour *) - debug "VM_restore_devices %s %b" id restore_vifs ; - perform_atomics (atomics_of_operation op) t - | VM_resume (id, _data) -> - debug "VM.resume %s" id ; - perform_atomics (atomics_of_operation op) t ; - VM_DB.signal id - | VBD_hotplug id -> - debug "VBD_hotplug %s.%s" (fst id) (snd id) ; - perform_atomics (atomics_of_operation op) t - | VBD_hotunplug (id, force) -> - 
debug "VBD_hotplug %s.%s %b" (fst id) (snd id) force ; - perform_atomics (atomics_of_operation op) t - | VIF_hotplug id -> - debug "VIF_hotplug %s.%s" (fst id) (snd id) ; - perform_atomics (atomics_of_operation op) t - | VIF_hotunplug (id, force) -> - debug "VIF_hotplug %s.%s %b" (fst id) (snd id) force ; - perform_atomics (atomics_of_operation op) t - | VM_migrate vmm -> - debug "VM.migrate %s -> %s" vmm.vmm_id vmm.vmm_url ; - let id = vmm.vmm_id in - let new_src_id = vmm.vmm_tmp_src_id in - let new_dest_id = vmm.vmm_tmp_dest_id in - let vm = VM_DB.read_exn id in - let dbg = (Xenops_task.to_interface_task t).Task.dbg in - let url = Uri.of_string vmm.vmm_url in - let compress_memory = vmm.vmm_compress in - let compress = - match compress_memory with - | true -> - Zstd.Fast.compress - | false -> - fun fd fn -> fn fd - (* do nothing *) - in - let compress_vgpu vgpu_fd f = - match vgpu_fd with - | Some (FD fd) when compress_memory -> - compress fd (fun fd -> f (Some (FD fd))) - | vgpu_fd -> - f vgpu_fd - in - debug "%s compress memory: %b" __FUNCTION__ compress_memory ; - let verify_cert = - (* Stunnel_client.pool (which xapi normally uses) is not right here, - because xenopsd does not get notified if certificate checking is - turned on or off in xapi. Xapi takes the global on/off switch into - account when setting `verify_dest`. *) - if vmm.vmm_verify_dest then Some Stunnel.pool else None - in - (* We need to perform version exchange here *) - let module B = (val get_backend () : S) in - B.VM.assert_can_save vm ; - let extra_args = B.VM.get_hook_args id in - Xenops_hooks.vm ~script:Xenops_hooks.VM_pre_migrate - ~reason:Xenops_hooks.reason__migrate_source ~id ~extra_args ; - let module Remote = - Xenops_interface.XenopsAPI (Idl.Exn.GenClient (struct - let rpc = - Xcp_client.xml_http_rpc ~srcstr:"xenops" ~dststr:"dst_xenops" - ~verify_cert (fun () -> vmm.vmm_url - ) - end)) in - let regexp = Re.Pcre.regexp id in - debug "Destination domain will be built with uuid=%s" new_dest_id ; - debug "Original domain will be moved to uuid=%s" new_src_id ; - (* Redirect operations on new_src_id to our worker thread. *) - (* This is the id our domain will have when we've streamed its memory *) - (* image to the destination. *) - Redirector.alias ~tag:id ~alias:new_src_id ; - let id' = - let dbg = dbg_with_traceparent_of_task t in - Remote.VM.import_metadata dbg - (Re.replace_string regexp ~by:new_dest_id - (export_metadata vmm.vmm_vdi_map vmm.vmm_vif_map - vmm.vmm_vgpu_pci_map id - ) - ) - in - debug "Received vm-id = %s" id' ; - let make_url snippet id_str = - Uri.make ?scheme:(Uri.scheme url) ?host:(Uri.host url) - ?port:(Uri.port url) - ~path:(Uri.path url ^ snippet ^ id_str) - ~query:(Uri.query url) () - in - (* CA-78365: set the memory dynamic range to a single value to stop - ballooning. *) - let atomic = - VM_set_memory_dynamic_range - (id, vm.Vm.memory_dynamic_min, vm.Vm.memory_dynamic_min) - in - let (_ : unit) = - perform_atomic ~subtask:(string_of_atomic atomic) - ~progress_callback:(fun _ -> ()) - atomic t - in - (* Waiting here is not essential but adds a degree of safety and - reducess unnecessary memory copying. 
*) - ( try B.VM.wait_ballooning t vm - with Xenopsd_error Ballooning_timeout_before_migration -> () - ) ; - (* Find out the VM's current memory_limit: this will be used to allocate - memory on the receiver *) - let state = B.VM.get_state vm in - info "VM %s has memory_limit = %Ld" id state.Vm.memory_limit ; - let url = make_url "/migrate/vm/" new_dest_id in - let https = Uri.scheme url = Some "https" in - Open_uri.with_open_uri ~verify_cert url (fun vm_fd -> - let module Handshake = Xenops_migrate.Handshake in - let do_request fd extra_cookies url = - if not https then - Sockopt.set_sock_keepalives fd ; - let module Request = - Cohttp.Request.Make (Cohttp_posix_io.Unbuffered_IO) in - let cookies = - List.concat + match op with + | VM_start (id, force) -> ( + debug "VM.start %s (force=%b)" id force ; + let power = (B.VM.get_state (VM_DB.read_exn id)).Vm.power_state in + match power with + | Running -> + info "VM %s is already running" id + | _ -> + perform_atomics (atomics_of_operation op) t ; + VM_DB.signal id + ) + | VM_poweroff (id, _timeout) -> + debug "VM.poweroff %s" id ; + perform_atomics (atomics_of_operation op) t ; + VM_DB.signal id + | VM_reboot (id, _timeout) -> + debug "VM.reboot %s" id ; + rebooting id (fun () -> perform_atomics (atomics_of_operation op) t) ; + VM_DB.signal id + | VM_shutdown (id, _timeout) -> + debug "VM.shutdown %s" id ; + perform_atomics (atomics_of_operation op) t ; + VM_DB.signal id + | VM_suspend (id, _data) -> + debug "VM.suspend %s" id ; + perform_atomics (atomics_of_operation op) t ; + VM_DB.signal id + | VM_restore_vifs id -> + debug "VM_restore_vifs %s" id ; + perform_atomics (atomics_of_operation op) t + | VM_restore_devices (id, restore_vifs) -> + (* XXX: this is delayed due to the 'attach'/'activate' behaviour *) + debug "VM_restore_devices %s %b" id restore_vifs ; + perform_atomics (atomics_of_operation op) t + | VM_resume (id, _data) -> + debug "VM.resume %s" id ; + perform_atomics (atomics_of_operation op) t ; + VM_DB.signal id + | VBD_hotplug id -> + debug "VBD_hotplug %s.%s" (fst id) (snd id) ; + perform_atomics (atomics_of_operation op) t + | VBD_hotunplug (id, force) -> + debug "VBD_hotplug %s.%s %b" (fst id) (snd id) force ; + perform_atomics (atomics_of_operation op) t + | VIF_hotplug id -> + debug "VIF_hotplug %s.%s" (fst id) (snd id) ; + perform_atomics (atomics_of_operation op) t + | VIF_hotunplug (id, force) -> + debug "VIF_hotplug %s.%s %b" (fst id) (snd id) force ; + perform_atomics (atomics_of_operation op) t + | VM_migrate vmm -> + debug "VM.migrate %s -> %s" vmm.vmm_id vmm.vmm_url ; + let id = vmm.vmm_id in + let new_src_id = vmm.vmm_tmp_src_id in + let new_dest_id = vmm.vmm_tmp_dest_id in + let vm = VM_DB.read_exn id in + let dbg = (Xenops_task.to_interface_task t).Task.dbg in + let url = Uri.of_string vmm.vmm_url in + let compress_memory = vmm.vmm_compress in + let compress = + match compress_memory with + | true -> + Zstd.Fast.compress + | false -> + fun fd fn -> fn fd + (* do nothing *) + in + let compress_vgpu vgpu_fd f = + match vgpu_fd with + | Some (FD fd) when compress_memory -> + compress fd (fun fd -> f (Some (FD fd))) + | vgpu_fd -> + f vgpu_fd + in + debug "%s compress memory: %b" __FUNCTION__ compress_memory ; + let verify_cert = + (* Stunnel_client.pool (which xapi normally uses) is not right here, + because xenopsd does not get notified if certificate checking is + turned on or off in xapi. Xapi takes the global on/off switch into + account when setting `verify_dest`. 
*) + if vmm.vmm_verify_dest then Some Stunnel.pool else None + in + (* We need to perform version exchange here *) + let module B = (val get_backend () : S) in + B.VM.assert_can_save vm ; + let extra_args = B.VM.get_hook_args id in + Xenops_hooks.vm ~script:Xenops_hooks.VM_pre_migrate + ~reason:Xenops_hooks.reason__migrate_source ~id ~extra_args ; + let module Remote = Xenops_interface.XenopsAPI (Idl.Exn.GenClient (struct + let rpc = + Xcp_client.xml_http_rpc ~srcstr:"xenops" ~dststr:"dst_xenops" + ~verify_cert (fun () -> vmm.vmm_url + ) + end)) in + let regexp = Re.Pcre.regexp id in + debug "Destination domain will be built with uuid=%s" new_dest_id ; + debug "Original domain will be moved to uuid=%s" new_src_id ; + (* Redirect operations on new_src_id to our worker thread. *) + (* This is the id our domain will have when we've streamed its memory *) + (* image to the destination. *) + Redirector.alias ~tag:id ~alias:new_src_id ; + let id' = + let dbg = dbg_with_traceparent_of_task t in + Remote.VM.import_metadata dbg + (Re.replace_string regexp ~by:new_dest_id + (export_metadata vmm.vmm_vdi_map vmm.vmm_vif_map + vmm.vmm_vgpu_pci_map id + ) + ) + in + debug "Received vm-id = %s" id' ; + let make_url snippet id_str = + Uri.make ?scheme:(Uri.scheme url) ?host:(Uri.host url) + ?port:(Uri.port url) + ~path:(Uri.path url ^ snippet ^ id_str) + ~query:(Uri.query url) () + in + (* CA-78365: set the memory dynamic range to a single value to stop + ballooning. *) + let atomic = + VM_set_memory_dynamic_range + (id, vm.Vm.memory_dynamic_min, vm.Vm.memory_dynamic_min) + in + let (_ : unit) = + perform_atomic ~subtask:(string_of_atomic atomic) + ~progress_callback:(fun _ -> ()) + atomic t + in + (* Waiting here is not essential but adds a degree of safety and + reducess unnecessary memory copying. 
*) + ( try B.VM.wait_ballooning t vm + with Xenopsd_error Ballooning_timeout_before_migration -> () + ) ; + (* Find out the VM's current memory_limit: this will be used to allocate + memory on the receiver *) + let state = B.VM.get_state vm in + info "VM %s has memory_limit = %Ld" id state.Vm.memory_limit ; + let url = make_url "/migrate/vm/" new_dest_id in + let https = Uri.scheme url = Some "https" in + Open_uri.with_open_uri ~verify_cert url (fun vm_fd -> + let module Handshake = Xenops_migrate.Handshake in + let do_request fd extra_cookies url = + if not https then + Sockopt.set_sock_keepalives fd ; + let module Request = + Cohttp.Request.Make (Cohttp_posix_io.Unbuffered_IO) in + let cookies = + List.concat + [ [ - [ - ("instance_id", instance_id) - ; ("final_id", id) - ; ("dbg", dbg) - ; (cookie_mem_migration, cookie_mem_migration_value) - ] - ; ( if compress_memory then - [(cookie_mem_compression, cookie_mem_compression_value)] - else - [] - ) - ; extra_cookies + ("instance_id", instance_id) + ; ("final_id", id) + ; ("dbg", dbg) + ; (cookie_mem_migration, cookie_mem_migration_value) ] - in - let headers = - Cohttp.Header.of_list - ([ - Cohttp.Cookie.Cookie_hdr.serialize cookies - ; ("Connection", "keep-alive") - ; ("User-agent", "xenopsd") - ] - @ traceparent_header_of_task t + ; ( if compress_memory then + [(cookie_mem_compression, cookie_mem_compression_value)] + else + [] ) - in - let request = - Cohttp.Request.make ~meth:`PUT ~version:`HTTP_1_1 ~headers url - in - Request.write (fun _ -> ()) request fd + ; extra_cookies + ] in - do_request vm_fd - [("memory_limit", Int64.to_string state.Vm.memory_limit)] - url ; - let first_handshake () = - ( match Handshake.recv vm_fd with - | Handshake.Success -> - () - | Handshake.Error msg -> ( - error "cannot transmit vm to host: %s" msg ; - match - Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty - (Jsonrpc.of_string msg) - with - | Ok e -> - raise (Xenopsd_error e) - | Error _ -> - raise (Xenopsd_error (Internal_error msg)) - | exception _ -> - raise (Xenopsd_error (Internal_error msg)) + let headers = + Cohttp.Header.of_list + ([ + Cohttp.Cookie.Cookie_hdr.serialize cookies + ; ("Connection", "keep-alive") + ; ("User-agent", "xenopsd") + ] + @ traceparent_header_of_task t ) - ) ; - debug "VM.migrate: Synchronisation point 1" in - let final_handshake () = - Handshake.send vm_fd Handshake.Success ; - debug "VM.migrate: Synchronisation point 3" ; - match Handshake.recv vm_fd with - | Success -> - debug "VM.migrate: Synchronisation point 4" - | Error msg -> - (* at this point, the VM has already been transferred to - the destination host. even though the destination host - failed to respond successfully to our handshake, the VM - should still be running correctly *) - error - "VM.migrate: Failed during Synchronisation point 4. 
msg: %s" - msg ; - raise (Xenopsd_error (Internal_error msg)) + let request = + Cohttp.Request.make ~meth:`PUT ~version:`HTTP_1_1 ~headers url in - let save ?vgpu_fd () = - let url = make_url "/migrate/mem/" new_dest_id in - Open_uri.with_open_uri ~verify_cert url (fun mem_fd -> - (* vm_fd: signaling channel, mem_fd: memory stream *) - do_request mem_fd [] url ; - Handshake.recv_success mem_fd ; - debug "VM.migrate: Synchronisation point 1-mem" ; - Handshake.send vm_fd Handshake.Success ; - debug "VM.migrate: Synchronisation point 1-mem ACK" ; - - compress mem_fd @@ fun mem_fd -> - compress_vgpu vgpu_fd @@ fun vgpu_fd -> - perform_atomics - [ - VM_save (id, [Live], FD mem_fd, vgpu_fd) - ; VM_rename (id, new_src_id, Pre_migration) - ] - t ; - debug "VM.migrate: Synchronisation point 2" + Request.write (fun _ -> ()) request fd + in + do_request vm_fd + [("memory_limit", Int64.to_string state.Vm.memory_limit)] + url ; + let first_handshake () = + ( match Handshake.recv vm_fd with + | Handshake.Success -> + () + | Handshake.Error msg -> ( + error "cannot transmit vm to host: %s" msg ; + match + Rpcmarshal.unmarshal Errors.error.Rpc.Types.ty + (Jsonrpc.of_string msg) + with + | Ok e -> + raise (Xenopsd_error e) + | Error _ -> + raise (Xenopsd_error (Internal_error msg)) + | exception _ -> + raise (Xenopsd_error (Internal_error msg)) ) - in - (* If we have a vGPU, kick off its migration process before starting - the main VM migration sequence. *) - match VGPU_DB.ids id with - | [] -> - first_handshake () ; save () ; final_handshake () - | (_vm_id, dev_id) :: _ -> - let url = - make_url "/migrate/vgpu/" - (VGPU_DB.string_of_id (new_dest_id, dev_id)) - in - Open_uri.with_open_uri ~verify_cert url (fun vgpu_fd -> - if not https then - Sockopt.set_sock_keepalives vgpu_fd ; - do_request vgpu_fd [(cookie_vgpu_migration, "")] url ; - Handshake.recv_success vgpu_fd ; - debug "VM.migrate: Synchronisation point 1-vgpu" ; - Handshake.send vm_fd Handshake.Success ; - debug "VM.migrate: Synchronisation point 1-vgpu ACK" ; - first_handshake () ; - save ~vgpu_fd:(FD vgpu_fd) () - ) ; - final_handshake () - ) ; - (* cleanup tmp src VM *) - let atomics = - [ + ) ; + debug "VM.migrate: Synchronisation point 1" + in + let final_handshake () = + Handshake.send vm_fd Handshake.Success ; + debug "VM.migrate: Synchronisation point 3" ; + match Handshake.recv vm_fd with + | Success -> + debug "VM.migrate: Synchronisation point 4" + | Error msg -> + (* at this point, the VM has already been transferred to + the destination host. even though the destination host + failed to respond successfully to our handshake, the VM + should still be running correctly *) + error + "VM.migrate: Failed during Synchronisation point 4. 
msg: %s" + msg ; + raise (Xenopsd_error (Internal_error msg)) + in + let save ?vgpu_fd () = + let url = make_url "/migrate/mem/" new_dest_id in + Open_uri.with_open_uri ~verify_cert url (fun mem_fd -> + (* vm_fd: signaling channel, mem_fd: memory stream *) + do_request mem_fd [] url ; + Handshake.recv_success mem_fd ; + debug "VM.migrate: Synchronisation point 1-mem" ; + Handshake.send vm_fd Handshake.Success ; + debug "VM.migrate: Synchronisation point 1-mem ACK" ; + + compress mem_fd @@ fun mem_fd -> + compress_vgpu vgpu_fd @@ fun vgpu_fd -> + perform_atomics + [ + VM_save (id, [Live], FD mem_fd, vgpu_fd) + ; VM_rename (id, new_src_id, Pre_migration) + ] + t ; + debug "VM.migrate: Synchronisation point 2" + ) + in + (* If we have a vGPU, kick off its migration process before starting + the main VM migration sequence. *) + match VGPU_DB.ids id with + | [] -> + first_handshake () ; save () ; final_handshake () + | (_vm_id, dev_id) :: _ -> + let url = + make_url "/migrate/vgpu/" + (VGPU_DB.string_of_id (new_dest_id, dev_id)) + in + Open_uri.with_open_uri ~verify_cert url (fun vgpu_fd -> + if not https then + Sockopt.set_sock_keepalives vgpu_fd ; + do_request vgpu_fd [(cookie_vgpu_migration, "")] url ; + Handshake.recv_success vgpu_fd ; + debug "VM.migrate: Synchronisation point 1-vgpu" ; + Handshake.send vm_fd Handshake.Success ; + debug "VM.migrate: Synchronisation point 1-vgpu ACK" ; + first_handshake () ; + save ~vgpu_fd:(FD vgpu_fd) () + ) ; + final_handshake () + ) ; + (* cleanup tmp src VM *) + let atomics = + [ + VM_hook_script_stable + ( id + , Xenops_hooks.VM_pre_destroy + , Xenops_hooks.reason__suspend + , new_src_id + ) + ] + @ atomics_of_operation (VM_shutdown (new_src_id, None)) + @ [ VM_hook_script_stable ( id - , Xenops_hooks.VM_pre_destroy + , Xenops_hooks.VM_post_destroy , Xenops_hooks.reason__suspend , new_src_id ) + ; VM_remove new_src_id ] - @ atomics_of_operation (VM_shutdown (new_src_id, None)) - @ [ - VM_hook_script_stable - ( id - , Xenops_hooks.VM_post_destroy - , Xenops_hooks.reason__suspend - , new_src_id - ) - ; VM_remove new_src_id - ] - in - perform_atomics atomics t - | VM_receive_memory - { - vmr_id= id - ; vmr_final_id= final_id - ; vmr_memory_limit= memory_limit - ; vmr_socket= s - ; vmr_handshake= handshake - ; vmr_compressed - } -> ( - let decompress = - match vmr_compressed with - | true -> - Zstd.Fast.decompress_passive - | false -> - fun fd fn -> fn fd - in - let decompress_vgpu vgpu_info f = - match vgpu_info with - | Some info when vmr_compressed -> - decompress info.vgpu_fd (fun fd -> f (Some (FD fd))) - | Some info -> - f (Some (FD info.vgpu_fd)) - | None -> - f None - in + in + perform_atomics atomics t + | VM_receive_memory + { + vmr_id= id + ; vmr_final_id= final_id + ; vmr_memory_limit= memory_limit + ; vmr_socket= s + ; vmr_handshake= handshake + ; vmr_compressed + } -> ( + let decompress = + match vmr_compressed with + | true -> + Zstd.Fast.decompress_passive + | false -> + fun fd fn -> fn fd + in + let decompress_vgpu vgpu_info f = + match vgpu_info with + | Some info when vmr_compressed -> + decompress info.vgpu_fd (fun fd -> f (Some (FD fd))) + | Some info -> + f (Some (FD info.vgpu_fd)) + | None -> + f None + in - if final_id <> id then - (* Note: In the localhost case, there are necessarily two worker - threads operating on the same VM. 
The first one is using tag - xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx and has tag - xxxxxxxxxxxx-xxxx-xxxx-xxxx-00000000 aliased to it (so actions - being queued against either will get queued on that worker thread). - The second worker thread has just started up at this point and has - tag xxxxxxxxxxxx-xxxx-xxxx-xxxx-00000001. The following line will - add a new alias of the original id to this second worker thread so - we end up with a situation where there are two worker threads that - can conceivably queue actions related to the original uuid - xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx. However, the alias is always - resolved first and hence in practice any further actions related to - the real uuid will be queued up on this worker thread's queue. *) - Redirector.alias ~tag:id ~alias:final_id ; - debug "VM.receive_memory: %s (compressed=%b)" id vmr_compressed ; - Sockopt.set_sock_keepalives s ; - let open Xenops_migrate in - (* set up the destination domain *) - debug "VM.receive_memory: creating domain and restoring VIFs" ; - finally - (fun () -> - (* If we have a vGPU, wait for the vgpu-1 ACK, which indicates that - the vgpu_receiver_sync entry for this vm id has already been - initialised by the parallel receive_vgpu thread in this receiving - host *) - ( match VGPU_DB.ids id with - | [] -> - () - | _ -> - Handshake.recv_success s ; - debug "VM.receive_memory: Synchronisation point 1-vgpu ACK" - (* After this point, vgpu_receiver_sync is initialised by the - corresponding receive_vgpu thread and therefore can be used - by this VM_receive_memory thread *) - ) ; - ( try - let no_sharept = - VGPU_DB.vgpus id |> List.exists is_no_sharept - in - debug "VM %s no_sharept=%b (%s)" id no_sharept __LOC__ ; - perform_atomics - ([ - VM_create (id, Some memory_limit, Some final_id, no_sharept) - ] - @ (* Perform as many operations as possible on the destination - domain before pausing the original domain *) - atomics_of_operation (VM_restore_vifs id) - ) - t ; - Handshake.send s Handshake.Success - with e -> - Backtrace.is_important e ; - let msg = - match e with - | Xenopsd_error error -> - Rpcmarshal.marshal Errors.error.Rpc.Types.ty error - |> Jsonrpc.to_string - | _ -> - Printexc.to_string e - in - Handshake.send s (Handshake.Error msg) ; - raise e - ) ; - debug "VM.receive_memory: Synchronisation point 1" ; - debug "VM.receive_memory: restoring VM" ; - try - (* Check if there is a separate vGPU data channel *) - let vgpu_info = - with_lock vgpu_receiver_sync_m (fun () -> - Hashtbl.find_opt vgpu_receiver_sync id - ) - in - let pcis = PCI_DB.pcis id |> pci_plug_order in - let receive_mem_fd id s = - debug - "VM.receive_memory: using new handshake protocol for VM %s" id ; - Handshake.recv_success s ; - debug "VM.receive_memory: Synchronisation point 1-mem ACK" ; - with_lock mem_receiver_sync_m @@ fun () -> - match Hashtbl.find_opt mem_receiver_sync id with - | Some fd -> - fd.mem_fd - | None -> - error - "VM.receive_memory: Failed to receive FD for VM memory \ - (id=%s)" - id ; - failwith __FUNCTION__ - in - let vgpu_start_operations () = - match VGPU_DB.ids id with - | [] -> - [] - | vgpus -> - let vgpus' = VGPU_DB.vgpus id in - let pcis_sriov = - List.filter (is_nvidia_sriov vgpus') pcis - in - List.concat - [ - dequarantine_ops vgpus' - ; List.map - (fun pci -> PCI_plug (pci.Pci.id, false)) - pcis_sriov - ; [VGPU_start (vgpus, true)] - ] - in - let mem_fd = - match handshake with - | Some _ -> - receive_mem_fd id s (* new handshake protocol *) - | None -> - s - (* receiving memory on this 
connection *) - in - Sockopt.set_sock_keepalives mem_fd ; - decompress mem_fd @@ fun mem_fd -> - decompress_vgpu vgpu_info @@ fun vgpu_info -> + if final_id <> id then + (* Note: In the localhost case, there are necessarily two worker + threads operating on the same VM. The first one is using tag + xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx and has tag + xxxxxxxxxxxx-xxxx-xxxx-xxxx-00000000 aliased to it (so actions + being queued against either will get queued on that worker thread). + The second worker thread has just started up at this point and has + tag xxxxxxxxxxxx-xxxx-xxxx-xxxx-00000001. The following line will + add a new alias of the original id to this second worker thread so + we end up with a situation where there are two worker threads that + can conceivably queue actions related to the original uuid + xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx. However, the alias is always + resolved first and hence in practice any further actions related to + the real uuid will be queued up on this worker thread's queue. *) + Redirector.alias ~tag:id ~alias:final_id ; + debug "VM.receive_memory: %s (compressed=%b)" id vmr_compressed ; + Sockopt.set_sock_keepalives s ; + let open Xenops_migrate in + (* set up the destination domain *) + debug "VM.receive_memory: creating domain and restoring VIFs" ; + finally + (fun () -> + (* If we have a vGPU, wait for the vgpu-1 ACK, which indicates that + the vgpu_receiver_sync entry for this vm id has already been + initialised by the parallel receive_vgpu thread in this receiving + host *) + ( match VGPU_DB.ids id with + | [] -> + () + | _ -> + Handshake.recv_success s ; + debug "VM.receive_memory: Synchronisation point 1-vgpu ACK" + (* After this point, vgpu_receiver_sync is initialised by the + corresponding receive_vgpu thread and therefore can be used + by this VM_receive_memory thread *) + ) ; + ( try + let no_sharept = VGPU_DB.vgpus id |> List.exists is_no_sharept in + debug "VM %s no_sharept=%b (%s)" id no_sharept __LOC__ ; perform_atomics - (List.concat - [ - vgpu_start_operations () - ; [VM_restore (id, FD mem_fd, vgpu_info)] - ] + ([VM_create (id, Some memory_limit, Some final_id, no_sharept)] + @ (* Perform as many operations as possible on the destination + domain before pausing the original domain *) + atomics_of_operation (VM_restore_vifs id) ) t ; - debug "VM.receive_memory: restore complete" + Handshake.send s Handshake.Success with e -> Backtrace.is_important e ; - Debug.log_backtrace e (Backtrace.get e) ; - debug "Caught %s during VM_restore: cleaning up VM state" - (Printexc.to_string e) ; - perform_atomics [VM_destroy id; VM_remove id] t - ) - (fun () -> - (* Inform the vgpu and mem handler threads we are done *) + let msg = + match e with + | Xenopsd_error error -> + Rpcmarshal.marshal Errors.error.Rpc.Types.ty error + |> Jsonrpc.to_string + | _ -> + Printexc.to_string e + in + Handshake.send s (Handshake.Error msg) ; + raise e + ) ; + debug "VM.receive_memory: Synchronisation point 1" ; + debug "VM.receive_memory: restoring VM" ; + try + (* Check if there is a separate vGPU data channel *) let vgpu_info = with_lock vgpu_receiver_sync_m (fun () -> Hashtbl.find_opt vgpu_receiver_sync id ) in - let mem_info = - with_lock mem_receiver_sync_m (fun () -> - Hashtbl.find_opt mem_receiver_sync id - ) + let pcis = PCI_DB.pcis id |> pci_plug_order in + let receive_mem_fd id s = + debug "VM.receive_memory: using new handshake protocol for VM %s" + id ; + Handshake.recv_success s ; + debug "VM.receive_memory: Synchronisation point 1-mem ACK" ; + 
with_lock mem_receiver_sync_m @@ fun () -> + match Hashtbl.find_opt mem_receiver_sync id with + | Some fd -> + fd.mem_fd + | None -> + error + "VM.receive_memory: Failed to receive FD for VM memory \ + (id=%s)" + id ; + failwith __FUNCTION__ in - Option.iter - (fun x -> Event.send x.vgpu_channel () |> Event.sync) - vgpu_info ; - Option.iter - (fun x -> Event.send x.mem_channel () |> Event.sync) - mem_info - ) ; - debug "VM.receive_memory: Synchronisation point 2" ; - try - (* Receive the all-clear to unpause *) - Handshake.recv_success s ; - debug "VM.receive_memory: Synchronisation point 3" ; - if final_id <> id then ( - debug "VM.receive_memory: Renaming domain" ; - perform_atomics [VM_rename (id, final_id, Post_migration)] t - ) ; - debug "VM.receive_memory: restoring remaining devices and unpausing" ; - perform_atomics - (atomics_of_operation (VM_restore_devices (final_id, false)) - @ [ - VM_unpause final_id - ; VM_set_domain_action_request (final_id, None) - ; VM_hook_script - ( final_id - , Xenops_hooks.VM_post_migrate - , Xenops_hooks.reason__migrate_dest - ) - ] + let vgpu_start_operations () = + match VGPU_DB.ids id with + | [] -> + [] + | vgpus -> + let vgpus' = VGPU_DB.vgpus id in + let pcis_sriov = List.filter (is_nvidia_sriov vgpus') pcis in + List.concat + [ + dequarantine_ops vgpus' + ; List.map + (fun pci -> PCI_plug (pci.Pci.id, false)) + pcis_sriov + ; [VGPU_start (vgpus, true)] + ] + in + let mem_fd = + match handshake with + | Some _ -> + receive_mem_fd id s (* new handshake protocol *) + | None -> + s + (* receiving memory on this connection *) + in + Sockopt.set_sock_keepalives mem_fd ; + decompress mem_fd @@ fun mem_fd -> + decompress_vgpu vgpu_info @@ fun vgpu_info -> + perform_atomics + (List.concat + [ + vgpu_start_operations () + ; [VM_restore (id, FD mem_fd, vgpu_info)] + ] + ) + t ; + debug "VM.receive_memory: restore complete" + with e -> + Backtrace.is_important e ; + Debug.log_backtrace e (Backtrace.get e) ; + debug "Caught %s during VM_restore: cleaning up VM state" + (Printexc.to_string e) ; + perform_atomics [VM_destroy id; VM_remove id] t + ) + (fun () -> + (* Inform the vgpu and mem handler threads we are done *) + let vgpu_info = + with_lock vgpu_receiver_sync_m (fun () -> + Hashtbl.find_opt vgpu_receiver_sync id ) - t ; - Handshake.send s Handshake.Success ; - debug "VM.receive_memory: Synchronisation point 4" - with e -> - finally - (fun () -> - Backtrace.is_important e ; - Debug.log_backtrace e (Backtrace.get e) ; - debug "Caught %s: cleaning up VM state" (Printexc.to_string e) ; - perform_atomics - (atomics_of_operation (VM_shutdown (id, None)) @ [VM_remove id]) - t + in + let mem_info = + with_lock mem_receiver_sync_m (fun () -> + Hashtbl.find_opt mem_receiver_sync id ) - (fun () -> Handshake.send s (Handshake.Error (Printexc.to_string e))) - ) - | VM_check_state id -> - let vm = VM_DB.read_exn id in - let state = B.VM.get_state vm in - let run_time = Unix.gettimeofday () -. state.Vm.last_start_time in - let actions = - match B.VM.get_domain_action_request vm with - | Some Needs_reboot -> - vm.Vm.on_reboot - | Some Needs_poweroff -> - vm.Vm.on_shutdown - | Some Needs_crashdump -> - (* A VM which crashes too quickly should be shutdown *) - if run_time < 120. 
then ( - warn - "VM %s crashed too quickly after start (%.2f seconds); \ - shutting down" - id run_time ; - vm.Vm.on_crash - |> List.map (function Vm.Start -> Vm.Shutdown | other -> other) - ) else - vm.Vm.on_crash - | Some Needs_suspend -> - warn "VM %s has unexpectedly suspended" id ; - [Vm.Shutdown] - | Some Needs_softreset -> - vm.Vm.on_softreboot - | None -> - debug "VM %s is not requesting any attention" id ; - [] - in - let operations_of_action = function - | Vm.Coredump -> - [] - | Vm.Shutdown -> - [VM_shutdown (id, None)] - | Vm.Start -> - let delay = - if run_time < B.VM.minimum_reboot_delay then ( - debug "VM %s rebooted too quickly; inserting delay" id ; - [Atomic (VM_delay (id, 15.))] - ) else - [] - in - delay @ [VM_reboot (id, None)] - | Vm.Pause -> - [Atomic (VM_pause id)] - | Vm.Softreboot -> - [Atomic (VM_softreboot id)] - in - let operations = List.concat (List.map operations_of_action actions) in - List.iter (fun x -> perform_exn x t) operations ; - VM_DB.signal id - | PCI_check_state id -> - debug "PCI.check_state %s" (PCI_DB.string_of_id id) ; - let vif_t = PCI_DB.read_exn id in - let vm_state = B.VM.get_state (VM_DB.read_exn (PCI_DB.vm_of id)) in - let request = - if - vm_state.Vm.power_state = Running - || vm_state.Vm.power_state = Paused - then - B.PCI.get_device_action_request (VIF_DB.vm_of id) vif_t - else - Some Needs_unplug - in - let operations_of_request = function - | Needs_unplug -> - Some (Atomic (PCI_unplug id)) - | Needs_set_qos -> - None - in - let operations = - List.filter_map operations_of_request (Option.to_list request) - in - List.iter (fun x -> perform_exn x t) operations - | VBD_check_state id -> - debug "VBD.check_state %s" (VBD_DB.string_of_id id) ; - let vbd_t = VBD_DB.read_exn id in - let vm_state = B.VM.get_state (VM_DB.read_exn (VBD_DB.vm_of id)) in - let request = - if - vm_state.Vm.power_state = Running - || vm_state.Vm.power_state = Paused - then - B.VBD.get_device_action_request (VBD_DB.vm_of id) vbd_t - else ( - debug "VM %s is not running: VBD_unplug needed" (VBD_DB.vm_of id) ; - Some Needs_unplug + in + Option.iter + (fun x -> Event.send x.vgpu_channel () |> Event.sync) + vgpu_info ; + Option.iter + (fun x -> Event.send x.mem_channel () |> Event.sync) + mem_info + ) ; + debug "VM.receive_memory: Synchronisation point 2" ; + try + (* Receive the all-clear to unpause *) + Handshake.recv_success s ; + debug "VM.receive_memory: Synchronisation point 3" ; + if final_id <> id then ( + debug "VM.receive_memory: Renaming domain" ; + perform_atomics [VM_rename (id, final_id, Post_migration)] t + ) ; + debug "VM.receive_memory: restoring remaining devices and unpausing" ; + perform_atomics + (atomics_of_operation (VM_restore_devices (final_id, false)) + @ [ + VM_unpause final_id + ; VM_set_domain_action_request (final_id, None) + ; VM_hook_script + ( final_id + , Xenops_hooks.VM_post_migrate + , Xenops_hooks.reason__migrate_dest + ) + ] ) - in - let operations_of_request = function - | Needs_unplug -> - Some (Atomic (VBD_unplug (id, true))) - | Needs_set_qos -> - Some (Atomic (VBD_set_qos id)) - in - let operations = - List.filter_map operations_of_request (Option.to_list request) - in - List.iter (fun x -> perform_exn x t) operations ; - (* Needed (eg) to reflect a spontaneously-ejected CD *) - VBD_DB.signal id - | VIF_check_state id -> - debug "VIF.check_state %s" (VIF_DB.string_of_id id) ; - let vif_t = VIF_DB.read_exn id in - let vm_state = B.VM.get_state (VM_DB.read_exn (VIF_DB.vm_of id)) in - let request = - if - vm_state.Vm.power_state 
= Running - || vm_state.Vm.power_state = Paused - then - B.VIF.get_device_action_request (VIF_DB.vm_of id) vif_t - else - Some Needs_unplug - in - let operations_of_request = function - | Needs_unplug -> - Some (Atomic (VIF_unplug (id, true))) - | Needs_set_qos -> - None - in - let operations = - List.filter_map operations_of_request (Option.to_list request) - in - List.iter (fun x -> perform_exn x t) operations - | VUSB_check_state id -> - debug "VUSB.check_state %s" (VUSB_DB.string_of_id id) ; - let vusb_t = VUSB_DB.read_exn id in - let vm_state = B.VM.get_state (VM_DB.read_exn (VUSB_DB.vm_of id)) in - let request = - if - vm_state.Vm.power_state = Running - || vm_state.Vm.power_state = Paused - then - B.VUSB.get_device_action_request (VUSB_DB.vm_of id) vusb_t - else ( - debug "VM %s is not running: VUSB_unplug needed" (VUSB_DB.vm_of id) ; - Some Needs_unplug + t ; + Handshake.send s Handshake.Success ; + debug "VM.receive_memory: Synchronisation point 4" + with e -> + finally + (fun () -> + Backtrace.is_important e ; + Debug.log_backtrace e (Backtrace.get e) ; + debug "Caught %s: cleaning up VM state" (Printexc.to_string e) ; + perform_atomics + (atomics_of_operation (VM_shutdown (id, None)) @ [VM_remove id]) + t ) - in - let operations_of_request = function - | Needs_unplug -> - Some (Atomic (VUSB_unplug id)) - | Needs_set_qos -> - None - in - let operations = - List.filter_map operations_of_request (Option.to_list request) - in - List.iter (fun x -> perform_exn x t) operations ; - VUSB_DB.signal id - | Atomic op -> - let progress_callback = progress_callback 0. 1. t in - perform_atomic ~progress_callback ?subtask ?result op t - in - one op + (fun () -> Handshake.send s (Handshake.Error (Printexc.to_string e))) + ) + | VM_check_state id -> + let vm = VM_DB.read_exn id in + let state = B.VM.get_state vm in + let run_time = Unix.gettimeofday () -. state.Vm.last_start_time in + let actions = + match B.VM.get_domain_action_request vm with + | Some Needs_reboot -> + vm.Vm.on_reboot + | Some Needs_poweroff -> + vm.Vm.on_shutdown + | Some Needs_crashdump -> + (* A VM which crashes too quickly should be shutdown *) + if run_time < 120. 
then ( + warn + "VM %s crashed too quickly after start (%.2f seconds); \ + shutting down" + id run_time ; + vm.Vm.on_crash + |> List.map (function Vm.Start -> Vm.Shutdown | other -> other) + ) else + vm.Vm.on_crash + | Some Needs_suspend -> + warn "VM %s has unexpectedly suspended" id ; + [Vm.Shutdown] + | Some Needs_softreset -> + vm.Vm.on_softreboot + | None -> + debug "VM %s is not requesting any attention" id ; + [] + in + let operations_of_action = function + | Vm.Coredump -> + [] + | Vm.Shutdown -> + [VM_shutdown (id, None)] + | Vm.Start -> + let delay = + if run_time < B.VM.minimum_reboot_delay then ( + debug "VM %s rebooted too quickly; inserting delay" id ; + [Atomic (VM_delay (id, 15.))] + ) else + [] + in + delay @ [VM_reboot (id, None)] + | Vm.Pause -> + [Atomic (VM_pause id)] + | Vm.Softreboot -> + [Atomic (VM_softreboot id)] + in + let operations = List.concat (List.map operations_of_action actions) in + List.iter (fun x -> perform_exn x t) operations ; + VM_DB.signal id + | PCI_check_state id -> + debug "PCI.check_state %s" (PCI_DB.string_of_id id) ; + let vif_t = PCI_DB.read_exn id in + let vm_state = B.VM.get_state (VM_DB.read_exn (PCI_DB.vm_of id)) in + let request = + if vm_state.Vm.power_state = Running || vm_state.Vm.power_state = Paused + then + B.PCI.get_device_action_request (VIF_DB.vm_of id) vif_t + else + Some Needs_unplug + in + let operations_of_request = function + | Needs_unplug -> + Some (Atomic (PCI_unplug id)) + | Needs_set_qos -> + None + in + let operations = + List.filter_map operations_of_request (Option.to_list request) + in + List.iter (fun x -> perform_exn x t) operations + | VBD_check_state id -> + debug "VBD.check_state %s" (VBD_DB.string_of_id id) ; + let vbd_t = VBD_DB.read_exn id in + let vm_state = B.VM.get_state (VM_DB.read_exn (VBD_DB.vm_of id)) in + let request = + if vm_state.Vm.power_state = Running || vm_state.Vm.power_state = Paused + then + B.VBD.get_device_action_request (VBD_DB.vm_of id) vbd_t + else ( + debug "VM %s is not running: VBD_unplug needed" (VBD_DB.vm_of id) ; + Some Needs_unplug + ) + in + let operations_of_request = function + | Needs_unplug -> + Some (Atomic (VBD_unplug (id, true))) + | Needs_set_qos -> + Some (Atomic (VBD_set_qos id)) + in + let operations = + List.filter_map operations_of_request (Option.to_list request) + in + List.iter (fun x -> perform_exn x t) operations ; + (* Needed (eg) to reflect a spontaneously-ejected CD *) + VBD_DB.signal id + | VIF_check_state id -> + debug "VIF.check_state %s" (VIF_DB.string_of_id id) ; + let vif_t = VIF_DB.read_exn id in + let vm_state = B.VM.get_state (VM_DB.read_exn (VIF_DB.vm_of id)) in + let request = + if vm_state.Vm.power_state = Running || vm_state.Vm.power_state = Paused + then + B.VIF.get_device_action_request (VIF_DB.vm_of id) vif_t + else + Some Needs_unplug + in + let operations_of_request = function + | Needs_unplug -> + Some (Atomic (VIF_unplug (id, true))) + | Needs_set_qos -> + None + in + let operations = + List.filter_map operations_of_request (Option.to_list request) + in + List.iter (fun x -> perform_exn x t) operations + | VUSB_check_state id -> + debug "VUSB.check_state %s" (VUSB_DB.string_of_id id) ; + let vusb_t = VUSB_DB.read_exn id in + let vm_state = B.VM.get_state (VM_DB.read_exn (VUSB_DB.vm_of id)) in + let request = + if vm_state.Vm.power_state = Running || vm_state.Vm.power_state = Paused + then + B.VUSB.get_device_action_request (VUSB_DB.vm_of id) vusb_t + else ( + debug "VM %s is not running: VUSB_unplug needed" (VUSB_DB.vm_of id) ; + Some 
Needs_unplug + ) + in + let operations_of_request = function + | Needs_unplug -> + Some (Atomic (VUSB_unplug id)) + | Needs_set_qos -> + None + in + let operations = + List.filter_map operations_of_request (Option.to_list request) + in + List.iter (fun x -> perform_exn x t) operations ; + VUSB_DB.signal id + | Atomic op -> + let progress_callback = progress_callback 0. 1. t in + perform_atomic ~progress_callback ?subtask ?result op t and verify_power_state op = let module B = (val get_backend () : S) in diff --git a/ocaml/xenopsd/lib/xenops_utils.ml b/ocaml/xenopsd/lib/xenops_utils.ml index 4993cb44df1..a7a374393cd 100644 --- a/ocaml/xenopsd/lib/xenops_utils.ml +++ b/ocaml/xenopsd/lib/xenops_utils.ml @@ -61,10 +61,6 @@ module Socket = struct end end -let all = List.fold_left ( && ) true - -let any = List.fold_left ( || ) false - module type ITEM = sig type t @@ -88,11 +84,7 @@ let set_root service_name = let get_root () = match !root with None -> failwith "Xenops_utils.root not set" | Some x -> x -module StringMap = Map.Make (struct - type t = string - - let compare = compare -end) +module StringMap = Map.Make (String) type 'a fs = Dir of 'a fs StringMap.t ref | Leaf of 'a @@ -154,7 +146,7 @@ let dropnone x = List.filter_map (Option.map Fun.id) x module FileFS = struct (** A directory tree containiign files, each of which contain strings *) - let filename_of k = Printf.sprintf "%s/%s" (get_root ()) (String.concat "/" k) + let filename_of k = String.concat Filename.dir_sep (get_root () :: k) let paths_of k = List.map filename_of (prefixes_of k) @@ -378,18 +370,21 @@ functor let get_path k = k |> I.key |> of_key + let path_to_string = String.concat Filename.dir_sep + (* Non-thread-safe functions: avoid calling these directly *) let write (k : I.key) (x : t) = let module FS = (val get_fs_backend () : FS) in let path = get_path k in - debug "TypedTable: Writing %s" (String.concat "/" path) ; + debug "TypedTable: Writing %s" (path_to_string path) ; FS.write path (Rpcmarshal.marshal t.Rpc.Types.ty x) let delete (k : I.key) = - debug "TypedTable: Deleting %s" (k |> I.key |> of_key |> String.concat "/") ; let module FS = (val get_fs_backend () : FS) in - FS.rm (get_path k) + let path = get_path k in + debug "TypedTable: Deleting %s" (path_to_string path) ; + FS.rm path (* Thread-safe functions *) @@ -412,11 +407,8 @@ functor | Some x -> x | None -> - raise - (Xenopsd_error - (Errors.Does_not_exist (I.namespace, I.key k |> String.concat "/") - ) - ) + let key = (I.namespace, I.key k |> path_to_string) in + raise (Xenopsd_error (Errors.Does_not_exist key)) let exists (k : I.key) = let module FS = (val get_fs_backend () : FS) in diff --git a/ocaml/xenopsd/xc/device.ml b/ocaml/xenopsd/xc/device.ml index 425a61a6bc4..6727f81502c 100644 --- a/ocaml/xenopsd/xc/device.ml +++ b/ocaml/xenopsd/xc/device.ml @@ -3687,6 +3687,8 @@ module Dm = struct type action = Start | Restore + let action_to_string = function Start -> "Start" | Restore -> "Restore" + let __start (task : Xenops_task.task_handle) ~xc ~xs ~dm ?(timeout = !Xenopsd.qemu_dm_ready_timeout) action info domid = let args = @@ -3696,7 +3698,8 @@ module Dm = struct | Restore -> qemu_args ~xs ~dm info true domid in - debug "Device.Dm.start domid=%d args: [%s]" domid + debug "Device.Dm.start domid=%d action=%s qemu args: [%s]" domid + (action_to_string action) (String.concat " " args.argv) ; (* start vgpu emulation if appropriate *) let () = @@ -3717,19 +3720,22 @@ module Dm = struct (* start swtpm-wrapper if appropriate and modify QEMU arguments as 
needed *) let tpmargs = + let mk_args vtpm_uuid = + let tpm_socket_path = + Service.Swtpm.start ~xs task domid ~vtpm_uuid ~index:0 + in + [ + "-chardev" + ; Printf.sprintf "socket,id=chrtpm,path=%s" tpm_socket_path + ; "-tpmdev" + ; "emulator,id=tpm0,chardev=chrtpm" + ; "-device" + ; "tpm-crb,tpmdev=tpm0" + ] + in match info.tpm with | Some (Vtpm vtpm_uuid) -> - let tpm_socket_path = - Service.Swtpm.start ~xs task domid ~vtpm_uuid ~index:0 - in - [ - "-chardev" - ; Printf.sprintf "socket,id=chrtpm,path=%s" tpm_socket_path - ; "-tpmdev" - ; "emulator,id=tpm0,chardev=chrtpm" - ; "-device" - ; "tpm-crb,tpmdev=tpm0" - ] + mk_args vtpm_uuid | None -> D.debug "VM domid %d has no vTPM" domid ; [] diff --git a/ocaml/xenopsd/xc/xenops_helpers.ml b/ocaml/xenopsd/xc/xenops_helpers.ml index f76865848ba..81e15f09d07 100644 --- a/ocaml/xenopsd/xc/xenops_helpers.ml +++ b/ocaml/xenopsd/xc/xenops_helpers.ml @@ -58,3 +58,9 @@ let domains_of_uuid ~xc uuid = ) ) (Xenctrl.domain_getinfolist xc 0) + +let domain_exists ~xc di = + try + ignore (Xenctrl.domain_getinfo xc di.Xenctrl.domid) ; + true + with _ -> false diff --git a/ocaml/xenopsd/xc/xenops_server_xen.ml b/ocaml/xenopsd/xc/xenops_server_xen.ml index 2aca5aac632..f76acd083da 100644 --- a/ocaml/xenopsd/xc/xenops_server_xen.ml +++ b/ocaml/xenopsd/xc/xenops_server_xen.ml @@ -236,30 +236,23 @@ let uuid_of_string x = let uuid_of_vm vm = uuid_of_string vm.Vm.id +let domid_of_di di = string_of_int di.Xenctrl.domid + let di_of_uuid ~xc uuid = - let open Xenctrl in match Xenops_helpers.domains_of_uuid ~xc uuid with | [] -> None | [x] -> Some x | possible -> - let domid_list = - String.concat ", " (List.map (fun x -> string_of_int x.domid) possible) - in + let domid_list = String.concat ", " (List.map domid_of_di possible) in let uuid' = Uuidx.to_string uuid in - error - "VM %s: there are %d domains (%s) with the same uuid: one or more have \ - leaked" - uuid' (List.length possible) domid_list ; - raise - (Xenopsd_error - (Internal_error - (Printf.sprintf "More than one domain with uuid (%s): %s" uuid' - domid_list - ) - ) - ) + let err_msg = + Printf.sprintf "More than one domain with uuid %s: (%s)" uuid' + domid_list + in + error "%s: %s" __FUNCTION__ err_msg ; + raise (Xenopsd_error (Internal_error err_msg)) | exception Failure r -> raise (Xenopsd_error (Internal_error r)) @@ -268,28 +261,20 @@ let domid_of_uuid ~xs uuid = actually destroy a domain on suspend. Therefore we only rely on state in xenstore *) let dir = Printf.sprintf "/vm/%s/domains" (Uuidx.to_string uuid) in - try - match - xs.Xs.directory dir |> List.map int_of_string |> List.sort compare - with - | [] -> - None - | [x] -> - Some x - | xs -> - let domid_list = String.concat ", " (List.map string_of_int xs) in - error "More than 1 domain associated with a VM. This is no longer OK!" ; - raise - (Xenopsd_error - (Internal_error - (Printf.sprintf "More than one domain with uuid (%s): %s" - (Uuidx.to_string uuid) domid_list - ) - ) - ) - with _ -> - error "Failed to read %s: has this domain already been cleaned up?" dir ; - None + match xs.Xs.directory dir |> List.map int_of_string |> List.sort compare with + | [] -> + None + | [x] -> + Some x + | xs -> + let domid_list = String.concat ", " (List.map string_of_int xs) in + let uuid' = Uuidx.to_string uuid in + error "%s: More than one domain with uuid %s: (%s)" __FUNCTION__ uuid' + domid_list ; + None + | exception _ -> + warn "Failed to read %s: has this domain already been cleaned up?" 
dir ; + None let get_uuid ~xc domid = Domain.get_uuid ~xc domid @@ -311,16 +296,11 @@ let params_of_backend backend = else ("backend-kind", backend_kind) :: xenstore_data | [] -> - raise - (Xenopsd_error - (Internal_error - ("Could not find XenDisk implementation: " - ^ (Storage_interface.(rpc_of backend) backend - |> Jsonrpc.to_string - ) - ) - ) - ) + let err_msg = + Printf.sprintf "Could not find XenDisk implementation: %s" + (Storage_interface.(rpc_of backend) backend |> Jsonrpc.to_string) + in + raise (Xenopsd_error (Internal_error err_msg)) in let params, extra_keys = match (blockdevs, files, nbds, xendisks) with @@ -331,16 +311,12 @@ let params_of_backend backend = | _, _, _, xendisk :: _ -> ("", [("qemu-params", xendisk.Storage_interface.params)]) | _ -> - raise - (Xenopsd_error - (Internal_error - ("Could not find BlockDevice, File, or Nbd implementation: " - ^ (Storage_interface.(rpc_of backend) backend - |> Jsonrpc.to_string - ) - ) - ) - ) + let err_msg = + Printf.sprintf + "Could not find BlockDevice, File, or Nbd implementation: %s" + (Storage_interface.(rpc_of backend) backend |> Jsonrpc.to_string) + in + raise (Xenopsd_error (Internal_error err_msg)) in (params, xenstore_data, extra_keys) @@ -364,16 +340,14 @@ let create_vbd_frontend ~xc ~xs task frontend_domid vdi = | _, _, nbd :: _ -> Nbd nbd | [], [], [] -> - raise - (Xenopsd_error - (Internal_error - ("Could not find File, BlockDevice, or Nbd implementation: " - ^ (Storage_interface.(rpc_of backend) vdi.attach_info - |> Jsonrpc.to_string - ) - ) - ) - ) + let err_msg = + Printf.sprintf + "Could not find BlockDevice, File, or Nbd implementation: %s" + (Storage_interface.(rpc_of backend) vdi.attach_info + |> Jsonrpc.to_string + ) + in + raise (Xenopsd_error (Internal_error err_msg)) ) | Some backend_domid -> let params, xenstore_data, extra_keys = @@ -1617,7 +1591,7 @@ module VM = struct let create = create_exn - let on_domain f (task : Xenops_task.task_handle) vm = + let on_domain (task : Xenops_task.task_handle) vm f = let uuid = uuid_of_vm vm in with_xc_and_xs (fun xc xs -> match di_of_uuid ~xc uuid with @@ -1628,7 +1602,7 @@ module VM = struct ) let on_domain_if_exists f (task : Xenops_task.task_handle) vm = - try on_domain f task vm + try on_domain task vm f with Xenopsd_error (Does_not_exist ("domain", _)) -> debug "Domain for VM %s does not exist: ignoring" vm.Vm.id @@ -1798,28 +1772,27 @@ module VM = struct ) ) - let pause = - on_domain (fun xc _ _ _ di -> + let pause t vm = + on_domain t vm (fun xc _ _ _ di -> if di.Xenctrl.total_memory_pages = 0n then raise (Xenopsd_error Domain_not_built) ; Domain.pause ~xc di.Xenctrl.domid ) - let unpause = - on_domain (fun xc _ _ _ di -> + let unpause t vm = + on_domain t vm (fun xc _ _ _ di -> if di.Xenctrl.total_memory_pages = 0n then raise (Xenopsd_error Domain_not_built) ; Domain.unpause ~xc di.Xenctrl.domid ) let set_xsdata task vm xsdata = - on_domain - (fun _ xs _ _ di -> Domain.set_xsdata ~xs di.Xenctrl.domid xsdata) - task vm + on_domain task vm (fun _ xs _ _ di -> + Domain.set_xsdata ~xs di.Xenctrl.domid xsdata + ) let set_vcpus task vm target = - on_domain - (fun _ xs _ _ di -> + on_domain task vm (fun _ xs _ _ di -> let domid = di.Xenctrl.domid in (* Returns the instantaneous CPU number from xenstore *) let current = @@ -1839,12 +1812,10 @@ module VM = struct i = current to target - 1 do Device.Vcpu.set ~xs ~dm:(dm_of ~vm) ~devid:i domid true done - ) - task vm + ) let set_shadow_multiplier task vm target = - on_domain - (fun xc xs _ _ di -> + on_domain task vm 
(fun xc xs _ _ di -> if get_domain_type ~xs di = Vm.Domain_PV then raise (Xenopsd_error (Unimplemented "shadow_multiplier for PV domains")) ; @@ -1879,20 +1850,17 @@ module VM = struct debug "VM = %s; domid = %d; shadow_allocation_setto %d MiB" vm.Vm.id domid newshadow ; Xenctrl.shadow_allocation_set xc domid newshadow - ) - task vm + ) let set_memory_dynamic_range task vm min max = - on_domain - (fun xc xs _ _ di -> + on_domain task vm (fun xc xs _ _ di -> let domid = di.Xenctrl.domid in Domain.set_memory_dynamic_range ~xc ~xs ~min:(Int64.to_int (Int64.div min 1024L)) ~max:(Int64.to_int (Int64.div max 1024L)) domid ; Mem.balance_memory (Xenops_task.get_dbg task) - ) - task vm + ) let qemu_device_of_vbd_frontend = function | Empty -> @@ -2036,6 +2004,10 @@ module VM = struct Device.Dm.Disabled in let parallel = List.assoc_opt "parallel" vm.Vm.platformdata in + (* During snapshot restores only the uuid in ~vm is up to date *) + let tpm = + match vtpm_of ~vm with None -> hvm_info.tpm | vtpm -> vtpm + in Some (make ~video_mib:hvm_info.video_mib ~firmware:hvm_info.firmware ~video:hvm_info.video ~acpi:hvm_info.acpi @@ -2043,8 +2015,7 @@ module VM = struct ?vnc_ip:hvm_info.vnc_ip ~usb ~parallel ~pci_emulations:hvm_info.pci_emulations ~pci_passthrough:hvm_info.pci_passthrough - ~boot_order:hvm_info.boot_order ~nics ~disks ~vgpus - ~tpm:hvm_info.tpm () + ~boot_order:hvm_info.boot_order ~nics ~disks ~vgpus ~tpm () ) ) @@ -2338,7 +2309,7 @@ module VM = struct (fun () -> clean_memory_reservation task di.Xenctrl.domid) let build ?restore_fd:_ task vm vbds vifs vgpus vusbs extras force = - on_domain (build_domain vm vbds vifs vgpus vusbs extras force) task vm + on_domain task vm (build_domain vm vbds vifs vgpus vusbs extras force) let create_device_model_exn vbds vifs vgpus vusbs saved_state vmextra xc xs task vm di = @@ -2373,37 +2344,34 @@ module VM = struct let create_device_model task vm vbds vifs vgpus vusbs saved_state = let _ = - DB.update_exn vm.Vm.id (fun d -> - let vmextra = - (* Fill in the xen-platform data, if it is not yet set *) - match d.VmExtra.persistent.xen_platform with - | None -> - VmExtra. + DB.update_exn vm.Vm.id @@ fun d -> + let vmextra = + (* Fill in the xen-platform data, if it is not yet set *) + match d.VmExtra.persistent.xen_platform with + | None -> + VmExtra. 
+ { + persistent= { - persistent= - { - d.persistent with - xen_platform= Some (xen_platform_of ~vm ~vmextra:d) - } + d.persistent with + xen_platform= Some (xen_platform_of ~vm ~vmextra:d) } - | _ -> - d - in - let () = - on_domain - (create_device_model_exn vbds vifs vgpus vusbs saved_state vmextra) - task vm - in - (* Ensure that the updated vmextra is written back to the DB *) - Some vmextra - ) + } + | _ -> + d + in + let () = + on_domain task vm + (create_device_model_exn vbds vifs vgpus vusbs saved_state vmextra) + in + (* Ensure that the updated vmextra is written back to the DB *) + Some vmextra in () let request_shutdown task vm reason ack_delay = let reason = shutdown_reason reason in - on_domain - (fun xc xs task _vm di -> + on_domain task vm (fun xc xs task _vm di -> let domain_type = match get_domain_type ~xs di with | Vm.Domain_HVM -> @@ -2422,8 +2390,7 @@ module VM = struct domain_type reason ; true with Watch.Timeout _ -> false - ) - task vm + ) let wait_shutdown task vm _reason timeout = let is_vm_event = function @@ -2437,7 +2404,7 @@ module VM = struct debug "OTHER EVENT" ; None in let vm_has_shutdown () = - on_domain (fun _ _ _ _ di -> di.Xenctrl.shutdown) task vm + on_domain task vm (fun _ _ _ _ di -> di.Xenctrl.shutdown) in Option.is_some (event_wait internal_updates task timeout is_vm_event vm_has_shutdown) @@ -2528,8 +2495,7 @@ module VM = struct ) let wait_ballooning task vm = - on_domain - (fun _ xs _ _ di -> + on_domain task vm (fun _ xs _ _ di -> let domid = di.Xenctrl.domid in let balloon_active_path = xs.Xs.getdomainpath domid ^ "/control/balloon-active" @@ -2561,8 +2527,7 @@ module VM = struct Ballooning_timeout_before_migration ) ) - ) - task vm + ) let assert_can_save vm = with_xs (fun xs -> @@ -2576,8 +2541,7 @@ module VM = struct let save task progress_callback vm flags data vgpu_data pre_suspend_callback = let flags' = List.map (function Live -> Domain.Live) flags in - on_domain - (fun xc xs (task : Xenops_task.task_handle) vm di -> + on_domain task vm (fun xc xs (task : Xenops_task.task_handle) vm di -> let domain_type = match get_domain_type ~xs di with | Vm.Domain_HVM -> @@ -2697,8 +2661,7 @@ module VM = struct in () ) - ) - task vm + ) let inject_igmp_query domid vifs = let vif_names = @@ -2719,107 +2682,91 @@ module VM = struct Forkhelpers.dontwaitpid pid let restore task _progress_callback vm _vbds vifs data vgpu_data extras = - on_domain - (fun xc xs task vm di -> - finally - (fun () -> - let domid = di.Xenctrl.domid in - let qemu_domid = this_domid ~xs in - let k = vm.Vm.id in - let vmextra = DB.read_exn k in - let build_info, timeoffset = - match vmextra.VmExtra.persistent with - | {VmExtra.build_info= None; _} -> - error "VM = %s; No stored build_info: cannot safely restore" - vm.Vm.id ; - raise (Xenopsd_error (Does_not_exist ("build_info", vm.Vm.id))) - | {VmExtra.build_info= Some x; VmExtra.ty; _} -> - let initial_target = get_initial_target ~xs domid in - let timeoffset = - match ty with - | Some x -> ( - match x with - | HVM hvm_info -> - hvm_info.timeoffset - | _ -> - "" - ) - | _ -> - "" - in - ({x with Domain.memory_target= initial_target}, timeoffset) + on_domain task vm @@ fun xc xs task vm di -> + let do_restore () = + let domid = di.Xenctrl.domid in + let qemu_domid = this_domid ~xs in + let k = vm.Vm.id in + let vmextra = DB.read_exn k in + let build_info, timeoffset = + match vmextra.VmExtra.persistent with + | {VmExtra.build_info= None; _} -> + error "VM = %s; No stored build_info: cannot safely restore" + vm.Vm.id ; + raise 
(Xenopsd_error (Does_not_exist ("build_info", vm.Vm.id))) + | {VmExtra.build_info= Some x; VmExtra.ty; _} -> + let initial_target = get_initial_target ~xs domid in + let timeoffset = + match ty with + | Some (HVM hvm_info) -> + hvm_info.timeoffset + | _ -> + "" in - let no_incr_generationid = false in - ( try - with_data ~xc ~xs task data false (fun fd -> - let vgpu_fd = - match vgpu_data with - | Some (FD vgpu_fd) -> - Some vgpu_fd - | Some disk when disk = data -> - Some fd (* Don't open the file twice *) - | Some _other_disk -> - None (* We don't support this *) - | None -> - None - in - let manager_path = choose_emu_manager vm.Vm.platformdata in - Domain.restore task ~xc ~xs ~dm:(dm_of ~vm) ~store_domid - ~console_domid - ~no_incr_generationid (* XXX progress_callback *) - ~timeoffset ~extras build_info ~manager_path - ~vtpm:(vtpm_of ~vm) domid fd vgpu_fd - ) - with e -> - error "VM %s: restore failed: %s" vm.Vm.id (Printexc.to_string e) ; - (* As of xen-unstable.hg 779c0ef9682 libxenguest will destroy - the domain on failure *) - ( if - try - ignore (Xenctrl.domain_getinfo xc di.Xenctrl.domid) ; - false - with _ -> true - then - try - debug - "VM %s: libxenguest has destroyed domid %d; cleaning \ - up xenstore for consistency" - vm.Vm.id di.Xenctrl.domid ; - Domain.destroy task ~xc ~xs ~qemu_domid - ~vtpm:(vtpm_of ~vm) ~dm:(dm_of ~vm) di.Xenctrl.domid - with _ -> - debug "Domain.destroy failed. Re-raising original error." - ) ; - raise e - ) ; - Int64.( - let min = to_int (div vm.Vm.memory_dynamic_min 1024L) - and max = to_int (div vm.Vm.memory_dynamic_max 1024L) in - Domain.set_memory_dynamic_range ~xc ~xs ~min ~max domid - ) ; - try inject_igmp_query domid vifs |> ignore - with e -> - error "VM %s: inject IGMP query failed: %s" vm.Vm.id - (Printexc.to_string e) - ) - (fun () -> clean_memory_reservation task di.Xenctrl.domid) - ) - task vm + ({x with Domain.memory_target= initial_target}, timeoffset) + in + let no_incr_generationid = false in + let vtpm = vtpm_of ~vm in + ( try + with_data ~xc ~xs task data false @@ fun fd -> + let vgpu_fd = + match vgpu_data with + | Some (FD vgpu_fd) -> + Some vgpu_fd + | Some disk when disk = data -> + Some fd (* Don't open the file twice *) + | Some _other_disk -> + None (* We don't support this *) + | None -> + None + in + let manager_path = choose_emu_manager vm.Vm.platformdata in + Domain.restore task ~xc ~xs ~dm:(dm_of ~vm) ~store_domid + ~console_domid ~no_incr_generationid (* XXX progress_callback *) + ~timeoffset ~extras build_info ~manager_path ~vtpm domid fd vgpu_fd + with e -> + error "VM %s: restore failed: %s" vm.Vm.id (Printexc.to_string e) ; + (* As of xen-unstable.hg 779c0ef9682 libxenguest will destroy + the domain on failure *) + ( if not (Xenops_helpers.domain_exists ~xc di) then + try + debug + "VM %s: libxenguest has destroyed domid %d; cleaning up \ + xenstore for consistency" + vm.Vm.id di.Xenctrl.domid ; + Domain.destroy task ~xc ~xs ~qemu_domid ~vtpm ~dm:(dm_of ~vm) + di.Xenctrl.domid + with _ -> + debug "Domain.destroy failed. Re-raising original error." 
+ ) ; + raise e + ) ; + Int64.( + let min = to_int (div vm.Vm.memory_dynamic_min 1024L) + and max = to_int (div vm.Vm.memory_dynamic_max 1024L) in + Domain.set_memory_dynamic_range ~xc ~xs ~min ~max domid + ) ; + try inject_igmp_query domid vifs |> ignore + with e -> + error "VM %s: inject IGMP query failed: %s" vm.Vm.id + (Printexc.to_string e) + in + finally do_restore (fun () -> clean_memory_reservation task di.Xenctrl.domid) - let s3suspend = + let s3suspend t vm = (* XXX: TODO: monitor the guest's response; track the s3 state *) - on_domain (fun xc xs _task _vm di -> + on_domain t vm (fun xc xs _task _vm di -> Domain.shutdown ~xc ~xs di.Xenctrl.domid Domain.S3Suspend ) - let s3resume = + let s3resume t vm = (* XXX: TODO: monitor the guest's response; track the s3 state *) - on_domain (fun xc _xs _task _vm di -> + on_domain t vm (fun xc _xs _task _vm di -> Domain.send_s3resume ~xc di.Xenctrl.domid ) - let soft_reset = - on_domain (fun xc xs _task _vm di -> + let soft_reset t vm = + on_domain t vm (fun xc xs _task _vm di -> Domain.soft_reset ~xc ~xs di.Xenctrl.domid )
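
The change running through xenops_server_xen.ml above flips on_domain's argument order so the callback comes last; call sites such as pause, set_vcpus, save and restore can then use `on_domain task vm @@ fun ... ->` instead of closing the callback with `) task vm`. A minimal sketch of the shape of that change, using hypothetical stand-in types and names (task, vm, di, on_domain_old/new, pause_old/new) rather than the real Xenops_task / Xenctrl ones:

(* A minimal sketch, not the real xenopsd code: task, vm and di stand in for
   Xenops_task.task_handle, Vm.t and Xenctrl.domaininfo. *)
type task = string

type vm = string

type di = {domid: int}

(* Callback first, as before the refactor: every call site has to close with
   `) task vm` after the callback body. *)
let on_domain_old (f : task -> vm -> di -> 'a) (task : task) (vm : vm) : 'a =
  f task vm {domid= 0 (* placeholder for the real domain lookup *)}

(* Callback last, as after the refactor. *)
let on_domain_new (task : task) (vm : vm) (f : task -> vm -> di -> 'a) : 'a =
  f task vm {domid= 0}

(* Old-style call site. *)
let pause_old task vm =
  on_domain_old (fun _task _vm di -> Printf.printf "pause domid %d\n" di.domid) task vm

(* New-style call site: the same body, but it reads top to bottom and the
   trailing `) task vm` noise disappears. *)
let pause_new task vm =
  on_domain_new task vm @@ fun _task _vm di ->
  Printf.printf "pause domid %d\n" di.domid

The reordering is purely syntactic; the behaviour of each call site is unchanged.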
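
The restore failure path above now calls the new Xenops_helpers.domain_exists instead of an inline `try ignore (Xenctrl.domain_getinfo ...) ; false with _ -> true`, so the condition reads as "the domain is gone" rather than an inverted try/with. A small sketch of that helper and its call site, with Xenctrl stubbed out (Fake_xenctrl and cleanup_if_destroyed are illustrative names only):

(* Sketch only: Fake_xenctrl stands in for Xenctrl so the example is self-contained. *)
module Fake_xenctrl = struct
  type domaininfo = {domid: int}

  exception Gone

  (* Pretend only domid 1 still exists. *)
  let domain_getinfo _xc domid = if domid = 1 then {domid} else raise Gone
end

(* Mirrors Xenops_helpers.domain_exists: true iff domain_getinfo succeeds. *)
let domain_exists ~xc di =
  try ignore (Fake_xenctrl.domain_getinfo xc di.Fake_xenctrl.domid) ; true
  with _ -> false

(* The restore cleanup then becomes a plain negative test. *)
let cleanup_if_destroyed ~xc di =
  if not (domain_exists ~xc di) then
    print_endline "libxenguest destroyed the domain; cleaning up xenstore"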
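
Several of the rewritten error paths above (di_of_uuid, params_of_backend, create_vbd_frontend) build the message once in an err_msg binding, log it, then raise it, instead of repeating the Printf.sprintf inside a deeply nested raise. A sketch of that pattern, assuming a local Internal_error exception and a ksprintf-based logger standing in for Xenopsd_error and the Debug module:

(* Illustrative stand-ins; the real code raises Xenopsd_error (Internal_error _)
   and logs through the Debug module. *)
exception Internal_error of string

let error fmt = Printf.ksprintf prerr_endline fmt

let fail_duplicate_domains uuid domids =
  let err_msg =
    Printf.sprintf "More than one domain with uuid %s: (%s)" uuid
      (String.concat ", " (List.map string_of_int domids))
  in
  error "%s: %s" __FUNCTION__ err_msg ;
  raise (Internal_error err_msg)

Building the string once keeps the log line and the raised error identical, which is what the patch relies on when it tags the message with __FUNCTION__.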