Skip to content

Commit

Permalink
Merge pull request #5052 from edwintorok/private/edvint/redo-log-space
Browse files Browse the repository at this point in the history
CA-378222: assert_sr_can_host_statefile has to take available space i…
  • Loading branch information
edwintorok authored Jun 16, 2023
2 parents 420074c + aed3f8c commit 2e2e9df
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 120 deletions.
60 changes: 24 additions & 36 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ let redo_log_sm_config = [("type", "raw")]
(* ---------------------------------------------------- *)
(* Encapsulate the state of a single redo_log instance. *)

type redo_log = {
type redo_log_conf = {
name: string
; marker: string
; read_only: bool
Expand All @@ -65,8 +65,10 @@ type redo_log = {
; num_dying_processes: int ref
}

type 'a redo_log = redo_log_conf

module RedoLogSet = Set.Make (struct
type t = redo_log
type t = redo_log_conf

let compare log1 log2 = compare log1.marker log2.marker
end)
Expand All @@ -83,12 +85,12 @@ let ready_to_write = ref true

let is_enabled log = !(log.enabled)

let enable log vdi_reason =
let enable_existing log vdi_reason =
R.info "Enabling use of redo log" ;
log.device := get_static_device vdi_reason ;
log.enabled := true

let enable_block log path =
let enable_block_existing log path =
R.info "Enabling use of redo log" ;
log.device := Some path ;
log.enabled := true
Expand Down Expand Up @@ -365,34 +367,6 @@ let rec read_read_response sock fn_db fn_delta expected_gen_count
raise
(CommunicationsProblem ("unrecognised read response prefix [" ^ e ^ "]"))

let action_empty sock _datasockpath =
R.debug "Performing empty" ;
(* Compute desired response time *)
let latest_response_time =
get_latest_response_time !Db_globs.redo_log_max_block_time_empty
in
(* Empty *)
let str = Bytes.of_string "empty_____" in
Unixext.time_limited_write sock (Bytes.length str) str latest_response_time ;
(* Read response *)
let response_length = 10 in
let response =
Unixext.time_limited_read sock response_length latest_response_time
in
match response with
| "empty|ack_" ->
()
| "empty|nack" ->
(* Read the error message *)
let error = read_length_and_string sock latest_response_time in
R.warn "Emptying was unsuccessful: [%s]" error ;
if error = Block_device_io_errors.timeout_error_msg then
raise Unixext.Timeout
else
raise (RedoLogFailure error)
| e ->
raise (CommunicationsProblem ("unrecognised empty response [" ^ e ^ "]"))

let action_read fn_db fn_delta sock datasockpath =
R.debug "Performing read" ;
(* Compute desired response time *)
Expand Down Expand Up @@ -800,6 +774,10 @@ let create ~name ~state_change_callback ~read_only =
) ;
instance

let create_rw = create ~read_only:false

let create_ro = create ~read_only:true

let delete log =
shutdown log ;
disable log ;
Expand Down Expand Up @@ -871,10 +849,6 @@ let apply fn_db fn_delta log =
(fun () -> ready_to_write := true)
)

let empty log =
if is_enabled log then
connect_and_perform_action action_empty "invalidate the redo log" log

(** ------------------------------------------------ *)

(** Functions which operate on all active redo_logs. *)
Expand All @@ -888,6 +862,20 @@ let flush_db_to_redo_log db log =
write_db_to_fd log ;
!(log.currently_accessible)

let flush_db_exn db log =
assert (not log.read_only) ;
(* phantom type parameter ensures this only gets called with RW *)
R.debug "Flushing database on redo log enable" ;
if not (flush_db_to_redo_log db log) then
raise (RedoLogFailure "Cannot connect to redo log")

let enable_and_flush db log reason =
enable_existing log reason ; flush_db_exn db log

let enable_block_and_flush db log path =
enable_block_existing log path ;
flush_db_exn db log

(* Write the given database to all active redo_logs *)
let flush_db_to_all_active_redo_logs db =
R.info "Flushing database to all active redo-logs" ;
Expand Down
81 changes: 30 additions & 51 deletions ocaml/database/redo_log.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,30 @@ val redo_log_sm_config : (string * string) list
(** SM config for redo log VDI *)

(** {redo_log data type} *)
type redo_log = {
name: string
; marker: string
; read_only: bool
; enabled: bool ref
; device: string option ref
; currently_accessible: bool ref
; state_change_callback: (bool -> unit) option
; time_of_last_failure: float ref
; backoff_delay: int ref
; sock: Unix.file_descr option ref
; pid: (Forkhelpers.pidty * string * string) option ref
; dying_processes_mutex: Mutex.t
; num_dying_processes: int ref
}
type 'a redo_log

(** {2 Enabling and disabling writing} *)

val is_enabled : redo_log -> bool
val is_enabled : _ redo_log -> bool
(** Returns [true] iff writing deltas to the block device is enabled. *)

val enable : redo_log -> string -> unit
(** Enables writing deltas to the block device. Subsequent modifications to the database will be persisted to the block device. Takes a static-VDI reason as argument to select the device to use. *)
val enable_existing : [< `RO | `RW] redo_log -> string -> unit
(** Enables writing deltas to the block device. Subsequent modifications to the database will be persisted to the block device. Takes a static-VDI reason as argument to select the device to use.
The redo log is expected to contain some data to be played back.
*)

val enable_and_flush :
Db_cache_types.Database.t -> [< `RW] redo_log -> string -> unit
(** Like {!enable_existing} but the redo log is freshly created and will trigger an immediate database flush *)

val enable_block : redo_log -> string -> unit
val enable_block_existing : [< `RO] redo_log -> string -> unit
(** Enables writing deltas to the block device. Subsequent modifications to the database will be persisted to the block device. Takes a path as argument to select the device to use. *)

val disable : redo_log -> unit
val enable_block_and_flush :
Db_cache_types.Database.t -> [< `RW] redo_log -> string -> unit
(** Like {!enable_block_existing} but the redo log is freshly created and will trigger an immediate database flush *)

val disable : _ redo_log -> unit
(** Disables writing deltas to the block device. Subsequent modifications to the database will not be persisted to the block device. *)

(** Communication with other threads. *)
Expand All @@ -64,32 +60,31 @@ val redo_log_events : (string * bool) Event.channel

(** {2 Lifecycle of I/O process} *)

val startup : redo_log -> unit
val startup : _ redo_log -> unit
(** Start the I/O process. Will do nothing if it's already started. *)

val shutdown : redo_log -> unit

val switch : redo_log -> string -> unit
val shutdown : _ redo_log -> unit
(** Stop the I/O process. Will do nothing if it's not already started. *)

val switch : _ redo_log -> string -> unit
(** Start using the VDI with the given reason as redo-log, discarding the current one. *)

val create :
name:string
-> state_change_callback:(bool -> unit) option
-> read_only:bool
-> redo_log
(** {Keeping track of existing redo_log instances} *)

(* Create a redo log instance and add it to the set. *)
val create_ro :
name:string -> state_change_callback:(bool -> unit) option -> [> `RO] redo_log
(** Create a RO redo log instance and add it to the set. *)

val delete : redo_log -> unit
val create_rw :
name:string -> state_change_callback:(bool -> unit) option -> [> `RW] redo_log
(** Create a RW redo log instance and add it to the set. *)

(* Shutdown a redo_log instance and remove it from the set. *)
val delete : _ redo_log -> unit
(** Shutdown a redo_log instance and remove it from the set. *)

val with_active_redo_logs : (redo_log -> unit) -> unit
(** {Finding active redo_log instances} *)

val with_active_redo_logs : (_ redo_log -> unit) -> unit
(* Apply the supplied function to all active redo_logs. *)

(** {2 Interacting with the block device} *)
Expand All @@ -106,22 +101,10 @@ type t =
(** [WriteField (tblname, objref, fldname, newval)]
represents the write to the field with name [fldname] of a row in table [tblname] with key [objref], overwriting its value with [newval]. *)

val write_db : Generation.t -> (Unix.file_descr -> unit) -> redo_log -> unit
(** Write a database.
This function is best-effort only and does not raise any exceptions in the case of error.
[write_db gen_count f] is used to write a database with generation count [gen_count] to the block device.
A file descriptor is passed to [f] which is expected to write the contents of the database to it. *)

val write_delta : Generation.t -> t -> (unit -> unit) -> redo_log -> unit
(** Write a database delta.
This function is best-effort only and does not raise any exceptions in the case of error.
[write_delta gen_count delta db_flush_fn] writes a delta [delta] with generation count [gen_count] to the block device.
If the redo log wishes to flush the database before writing the delta, it will invoke [db_flush_fn]. It is expected that this function implicitly attempts to reconnect to the block device I/O process if not already connected. *)

val apply :
(Generation.t -> Unix.file_descr -> int -> float -> unit)
-> (Generation.t -> t -> unit)
-> redo_log
-> [< `RO | `RW] redo_log
-> unit
(** Read from the block device.
This function is best-effort only and does not raise any exceptions in the case of error.
Expand All @@ -130,11 +113,7 @@ val apply :
For each database, [db_fn] is invoked with the database's generation count, a file descriptor from which to read the database's contents, the length of the database in bytes and the latest response time. The [db_fn] function may raise {!Unixext.Timeout} if the transfer is not complete by the latest response time.
For each database delta, [delta_fn] is invoked with the delta's generation count and the value of the delta. *)

val empty : redo_log -> unit
(** Invalidate the block device. This means that subsequent attempts to read from the block device will not find anything.
This function is best-effort only and does not raise any exceptions in the case of error. *)

val flush_db_to_redo_log : Db_cache_types.Database.t -> redo_log -> bool
val flush_db_to_redo_log : Db_cache_types.Database.t -> [< `RW] redo_log -> bool
(** Immediately write the given database to the given redo_log instance *)

val flush_db_to_all_active_redo_logs : Db_cache_types.Database.t -> unit
Expand Down
5 changes: 3 additions & 2 deletions ocaml/xapi/redo_log_usage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* @group Redo-log
*)

val read_from_redo_log : Redo_log.redo_log -> string -> Db_ref.t -> unit
val read_from_redo_log :
[< `RO | `RW] Redo_log.redo_log -> string -> Db_ref.t -> unit
(** Connect to the block device and write the latest version of the database
* on it to a file with a given name. *)

val stop_using_redo_log : Redo_log.redo_log -> unit
val stop_using_redo_log : _ Redo_log.redo_log -> unit
(** Disconnect from the block device. May be safely called even when not currently connected. *)
3 changes: 2 additions & 1 deletion ocaml/xapi/xapi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ let start_redo_log () =
then (
debug "Redo log was enabled when shutting down, so restarting it" ;
(* enable the use of the redo log *)
Redo_log.enable Xapi_ha.ha_redo_log Xapi_globs.gen_metadata_vdi_reason ;
Redo_log.enable_existing Xapi_ha.ha_redo_log
Xapi_globs.gen_metadata_vdi_reason ;
debug "Attempting to extract a database from a metadata VDI" ;
(* read from redo log and store results in a staging file for use in the
* next step; best effort only: does not raise any exceptions *)
Expand Down
9 changes: 5 additions & 4 deletions ocaml/xapi/xapi_ha.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ open Xha_scripts

(* Create a redo_log instance to use for HA. *)
let ha_redo_log =
Redo_log.create ~name:"HA redo log" ~state_change_callback:None
~read_only:false
Redo_log.create_rw ~name:"HA redo log" ~state_change_callback:None

(*********************************************************************************************)
(* Interface with the low-level HA subsystem *)
Expand Down Expand Up @@ -926,7 +925,9 @@ let redo_log_ha_enabled_during_runtime __context =
Redo_log.switch ha_redo_log Xapi_globs.ha_metadata_vdi_reason
) else (
info "Switching on HA redo log." ;
Redo_log.enable ha_redo_log Xapi_globs.ha_metadata_vdi_reason
Redo_log.enable_and_flush
(Context.database_of __context |> Db_ref.get_database)
ha_redo_log Xapi_globs.ha_metadata_vdi_reason
(* upon the first attempt to write a delta, it will realise that a DB flush
* is necessary as the I/O process will not be running *)
)
Expand All @@ -952,7 +953,7 @@ let redo_log_ha_enabled_at_startup () =
(* If we are still the master, extract any HA metadata database so we can consider population from it *)
if Pool_role.is_master () then (
debug "HA is enabled, so enabling writing to redo-log" ;
Redo_log.enable ha_redo_log Xapi_globs.ha_metadata_vdi_reason ;
Redo_log.enable_existing ha_redo_log Xapi_globs.ha_metadata_vdi_reason ;
(* enable the use of the redo log *)
debug
"This node is a master; attempting to extract a database from a metadata \
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_ha.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
(** Functions for implementing 'High Availability' (HA).
@group High Availability (HA) *)

val ha_redo_log : Redo_log.redo_log
val ha_redo_log : [`RW] Redo_log.redo_log
(** The redo log instance used for HA *)

(******************************************************************************)
Expand Down
4 changes: 3 additions & 1 deletion ocaml/xapi/xapi_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2924,7 +2924,9 @@ let enable_redo_log ~__context ~sr =
(* enable the new redo log, unless HA is enabled (which means a redo log
* is already in use) *)
if not (Db.Pool.get_ha_enabled ~__context ~self:pool) then (
Redo_log.enable Xapi_ha.ha_redo_log Xapi_globs.gen_metadata_vdi_reason ;
Redo_log.enable_and_flush
(Context.database_of __context |> Db_ref.get_database)
Xapi_ha.ha_redo_log Xapi_globs.gen_metadata_vdi_reason ;
Localdb.put Constants.redo_log_enabled "true"
) ;
info "The redo log is now enabled"
Expand Down
16 changes: 8 additions & 8 deletions ocaml/xapi/xapi_vdi_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ let assert_managed ~__context ~vdi =
let redo_log_lifecycle_mutex = Mutex.create ()

let metadata_replication :
(API.ref_VDI, API.ref_VBD * Redo_log.redo_log) Hashtbl.t =
(API.ref_VDI, API.ref_VBD * [`RW] Redo_log.redo_log) Hashtbl.t =
Hashtbl.create Xapi_globs.redo_log_max_instances

let get_master_dom0 ~__context =
Expand Down Expand Up @@ -140,12 +140,12 @@ let enable_database_replication ~__context ~get_vdi_callback =
in
(* Enable redo_log and point it at the new device *)
let log_name = Printf.sprintf "DR redo log for VDI %s" vdi_uuid in
let log =
Redo_log.create ~name:log_name ~state_change_callback ~read_only:false
in
let log = Redo_log.create_rw ~name:log_name ~state_change_callback in
let device = Db.VBD.get_device ~__context ~self:vbd in
try
Redo_log.enable_block log ("/dev/" ^ device) ;
Redo_log.enable_block_and_flush
(Context.database_of __context |> Db_ref.get_database)
log ("/dev/" ^ device) ;
Hashtbl.add metadata_replication vdi (vbd, log) ;
let vbd_uuid = Db.VBD.get_uuid ~__context ~self:vbd in
Db.VDI.set_metadata_latest ~__context ~self:vdi ~value:true ;
Expand Down Expand Up @@ -196,11 +196,11 @@ let database_open_mutex = Mutex.create ()
let database_ref_of_vdi ~__context ~vdi =
let database_ref_of_device device =
let log =
Redo_log.create ~name:"Foreign database redo log"
~state_change_callback:None ~read_only:true
Redo_log.create_ro ~name:"Foreign database redo log"
~state_change_callback:None
in
debug "Enabling redo_log with device reason [%s]" device ;
Redo_log.enable_block log device ;
Redo_log.enable_block_existing log device ;
let db = Database.make (Datamodel_schema.of_datamodel ()) in
let db_ref = Db_ref.in_memory (ref (ref db)) in
Redo_log_usage.read_from_redo_log log Xapi_globs.foreign_metadata_db db_ref ;
Expand Down
Loading

0 comments on commit 2e2e9df

Please sign in to comment.