Skip to content

Commit

Permalink
Merge pull request #291 from microsoft/mitchzhu/add_retry
Browse files Browse the repository at this point in the history
Add retry logic to image layer fetching and decompression
  • Loading branch information
miz060 authored Jan 16, 2025
2 parents 8b742d9 + fdfa224 commit 66d2248
Showing 1 changed file with 113 additions and 55 deletions.
168 changes: 113 additions & 55 deletions src/tardev-snapshotter/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,93 @@ impl TarDevSnapshotter {
}
}

/// Fetches an image layer and prepares it for use by the snapshotter.
///
/// The work is done in this order:
/// 1. The layer is downloaded with `get_layer_image`, using `upstream_name` as the
///    target path and `digest_str` to identify the layer.
/// 2. On a blocking worker thread the uncompressed tar is materialised at `base_name`:
///    a `TAR_EXTENSION` layer is renamed into place, while a `TAR_GZ_EXTENSION` layer
///    is decompressed into a freshly truncated file.
/// 3. A tar index and then a SHA-256 dm-verity tree (seeded with `salt`) are appended
///    to the file at `base_name`.
///
/// # Arguments
/// * `upstream_name` - path handed to `get_layer_image` for the downloaded layer.
/// * `base_name` - path of the resulting uncompressed, indexed layer file.
/// * `salt` - salt passed to `verity::append_tree`.
/// * `digest_str` - forwarded to `get_layer_image` (presumably the layer digest;
///   confirm against that helper).
/// * `layer_type` - compared against `TAR_EXTENSION` / `TAR_GZ_EXTENSION` to choose
///   between rename and decompression.
///
/// # Errors
/// Returns `Status::unknown` when the download fails, when the worker task cannot be
/// joined, or when any processing step (rename, open, decompress, index, verity) fails.
///
/// # Note
/// The generated root hash is only logged at trace level; it is neither returned nor
/// compared against an expected value inside this function.
async fn fetch_and_process_layer(
    &self,
    upstream_name: PathBuf,
    base_name: PathBuf,
    salt: Vec<u8>,
    digest_str: &str,
    layer_type: &str,
) -> Result<(), Status> {
    info!("Fetching {} layer image to {:?}", layer_type, upstream_name);

    // Fetch the layer image
    self.get_layer_image(&upstream_name, digest_str)
        .await
        .map_err(|download_err| {
            error!("Failed to fetch layer image: {:?}", download_err);
            Status::unknown(format!("Failed to fetch layer image: {:?}", download_err))
        })?;

    // Decide the layer kind once, up front. The worker closure must be `'static`, so it
    // captures these two `Copy` flags instead of an owned copy of `layer_type`.
    let is_tar = layer_type == TAR_EXTENSION;
    let is_tar_gz = layer_type == TAR_GZ_EXTENSION;

    // Process the layer. `upstream_name`, `base_name` and `salt` are owned and not needed
    // afterwards, so they are moved into the blocking task rather than cloned.
    let process_result = tokio::task::spawn_blocking(move || -> Result<_> {
        if is_tar {
            info!("Renaming {:?} to {:?}", &upstream_name, &base_name);
            std::fs::rename(&upstream_name, &base_name)?;
        }

        // Only a gzip layer starts from an empty file; a plain tar was just renamed into
        // place and its contents must be preserved.
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(is_tar_gz)
            .open(&base_name)?;

        if is_tar_gz {
            info!("Decompressing {:?} to {:?}", &upstream_name, &base_name);
            let compressed = fs::File::open(&upstream_name).map_err(|e| {
                let file_error = format!(
                    "Failed to open file {:?} for decompression: {:?}",
                    &upstream_name, e
                );
                error!("{}", file_error);
                anyhow::anyhow!(file_error)
            })?;
            let mut gz_decoder = flate2::read::GzDecoder::new(compressed);

            std::io::copy(&mut gz_decoder, &mut file).map_err(|e| {
                let copy_error = format!("failed to copy payload from gz decoder {:?}", e);
                error!("{}", copy_error);
                anyhow::anyhow!(copy_error)
            })?;
        }

        trace!("Appending index to {:?}", &base_name);
        // The cursor sits at EOF after decompression; the indexer reads from the start.
        file.rewind().context("failed to rewind the file handle")?;
        tarindex::append_index(&mut file).context("failed to append tar index")?;

        trace!("Appending dm-verity tree to {:?}", &base_name);
        let root_hash = verity::append_tree::<Sha256>(&mut file, &salt)
            .context("failed to append verity tree")?;

        trace!("Root hash for {:?} is {:x}", &base_name, root_hash);
        Ok(root_hash)
    })
    .await
    .map_err(|e| Status::unknown(format!("Error in worker task: {:?}", e)))?;

    match process_result {
        Ok(generated_root_hash) => {
            // Format inside the macro so no `String` is built when tracing is disabled.
            trace!("Generated root hash: {:x}", generated_root_hash);
            Ok(())
        }
        Err(process_err) => {
            error!("Failed to process layer: {:?}", process_err);
            Err(Status::unknown(format!(
                "Failed to process layer: {:?}",
                process_err
            )))
        }
    }
}

/// Creates a new snapshot for an image layer.
///
/// It downloads, decompresses, and creates the index for the layer before writing the new
Expand All @@ -849,7 +936,6 @@ impl TarDevSnapshotter {
salt_str: String, // base64 encoded
) -> Result<(), Status> {
let dir = self.store.read().await.staging_dir()?;

let base_name = dir.path().join(name_to_hash(&key));
let snapshot_name = base_name.clone();

Expand Down Expand Up @@ -887,63 +973,35 @@ impl TarDevSnapshotter {

let upstream_name = base_name.with_extension(layer_type);

info!("Fetching {} layer image to {:?}", layer_type, upstream_name);
self.get_layer_image(&upstream_name, digest_str).await?;

// Process the layer
let generated_root_hash = tokio::task::spawn_blocking(move || -> Result<_> {
if layer_type == TAR_EXTENSION {
info!("Renaming {:?} to {:?}", &upstream_name, &base_name);
std::fs::rename(&upstream_name, &base_name)?;
}
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(layer_type == TAR_GZ_EXTENSION)
.open(&base_name)?;
if layer_type == TAR_GZ_EXTENSION {
info!("Decompressing {:?} to {:?}", &upstream_name, &base_name);
let compressed = fs::File::open(&upstream_name).map_err(|e| {
let file_error = format!(
"Failed to open file {:?} for decompression: {:?}",
&upstream_name, e
// Retry logic to handle occasional transient errors during image layer extraction,
// such as "UnexpectedEof" errors observed in gzip decompression. These issues may
// be due to incomplete reads or temporary network failures. While the root cause
// is unclear, this provides resilience. Be mindful of containerd's operation deadlines.
const MAX_RETRIES: usize = 3;
const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500); // 500ms delay
let mut retries = 0;
while retries < MAX_RETRIES {
match self
.fetch_and_process_layer(upstream_name.clone(), base_name.clone(), salt.clone(), digest_str, layer_type)
.await
{
Ok(_) => break, // Success; exit loop
Err(err) => {
retries += 1;
error!(
"Failed to fetch/process layer (attempt {}/{}): {:?}",
retries, MAX_RETRIES, err
);
error!("{}", file_error);
anyhow::anyhow!(file_error)
})?;

let mut gz_decoder = flate2::read::GzDecoder::new(compressed);

if let Err(e) = std::io::copy(&mut gz_decoder, &mut file) {
let copy_error = format!("failed to copy payload from gz decoder {:?}", e);
error!("{}", copy_error);
return Err(anyhow::anyhow!(copy_error));
if retries >= MAX_RETRIES {
return Err(Status::unknown(format!(
"Failed to fetch/process layer after {} attempts: {:?}",
MAX_RETRIES, err
)));
}
warn!("Retrying fetch/process layer...");
tokio::time::sleep(RETRY_DELAY).await; // Sleep before retrying
}
}

trace!("Appending index to {:?}", &base_name);
file.rewind().context("failed to rewind the file handle")?;
tarindex::append_index(&mut file).context("failed to append tar index")?;

trace!("Appending dm-verity tree to {:?}", &base_name);
let root_hash = verity::append_tree::<Sha256>(&mut file, &salt)
.context("failed to append verity tree")?;

trace!("Root hash for {:?} is {:x}", &base_name, root_hash);
Ok(root_hash)
})
.await
.map_err(|e| Status::unknown(format!("error in worker task: {e}")))?
.map_err(|e| Status::unknown(format!("failed to extract image layer: {e}")))?;

let generated_root_hash = format!("{:x}", generated_root_hash);

if root_hash != generated_root_hash {
return Err(Status::internal(format!(
"root hash mismatch: expected {}, got {}",
root_hash, generated_root_hash
)));
}

// Store a label with the root hash so that we can recall it later when mounting.
Expand Down

0 comments on commit 66d2248

Please sign in to comment.