Skip to content

Commit

Permalink
feat: introduce timeout protection
Browse files Browse the repository at this point in the history
Ensure a policy cannot run more than a given amount of time.

This is required to fix kubewarden#254

Signed-off-by: Flavio Castelli <[email protected]>
  • Loading branch information
flavio committed Dec 19, 2022
1 parent bc89879 commit 3538b31
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 30 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ lazy_static = "1.4.0"
num_cpus = "1.13.1"
opentelemetry-otlp = { version = "0.10.0", features = ["metrics", "tonic"] }
opentelemetry = { version = "0.17", default-features = false, features = ["metrics", "trace", "rt-tokio", "serialize"] }
policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.5.0" }
#policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.5.0" }
policy-evaluator = { git = "https://github.com/flavio/policy-evaluator", branch = "timeout-protection" }
rayon = "1.6"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
15 changes: 15 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ pub(crate) fn build_cli() -> Command {
.required(false)
.help("Always accept AdmissionReviews that target the given namespace"),
)
.arg(
Arg::new("disable-timeout-protection")
.long("disable-timeout-protection")
.env("KUBEWARDEN_DISABLE_TIMEOUT_PROTECTION")
.required(false)
.help("Disable policy timeout protection"),
)
.arg(
Arg::new("policy-timeout")
.long("policy-timeout")
.env("KUBEWARDEN_POLICY_TIMEOUT")
.value_name("MAXIMUM_EXECUTION_TIME_SECONDS")
.default_value("2")
.help("Interrupt policy evaluation after the given time"),
)
.long_version(VERSION_AND_BUILTINS.as_str())
}

Expand Down
20 changes: 20 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ fn main() -> Result<()> {
.map(PathBuf::from)
.expect("This should not happen, there's a default value for sigstore-cache-dir");

let policy_evaluation_limit = if matches.contains_id("disable-timeout-protection") {
None
} else {
match matches
.get_one::<String>("policy-timeout")
.expect("policy-timeout should always be set")
.parse::<u64>()
{
Ok(v) => Some(v),
Err(e) => {
fatal_error(format!(
"'policy-timeout' value cannot be converted to unsigned int: {}",
e
));
unreachable!()
}
}
};

////////////////////////////////////////////////////////////////////////////
// //
// Phase 1: setup the CallbackHandler. This is used by the synchronous //
Expand Down Expand Up @@ -135,6 +154,7 @@ fn main() -> Result<()> {
api_rx,
callback_sender_channel,
always_accept_admission_reviews_on_namespace,
policy_evaluation_limit,
);
worker_pool.run();
});
Expand Down
22 changes: 4 additions & 18 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use policy_evaluator::callback_requests::CallbackRequest;
use policy_evaluator::wasmtime;
use policy_evaluator::{
admission_response::{AdmissionResponse, AdmissionResponseStatus},
policy_evaluator::Evaluator,
policy_evaluator::ValidateRequest,
policy_evaluator::{Evaluator, ValidateRequest},
};
use std::{collections::HashMap, fmt, time::Instant};
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -26,11 +25,6 @@ struct PolicyEvaluatorWithSettings {
pub(crate) struct Worker {
evaluators: HashMap<String, PolicyEvaluatorWithSettings>,
channel_rx: Receiver<EvalRequest>,

// TODO: remove clippy's exception. This is going to be used to
// implement the epoch handling
#[allow(dead_code)]
engine: wasmtime::Engine,
}

pub struct PolicyErrors(HashMap<String, String>);
Expand All @@ -55,22 +49,14 @@ impl Worker {
rx: Receiver<EvalRequest>,
policies: &HashMap<String, Policy>,
precompiled_policies: &PrecompiledPolicies,
wasmtime_config: &wasmtime::Config,
engine: wasmtime::Engine,
callback_handler_tx: Sender<CallbackRequest>,
always_accept_admission_reviews_on_namespace: Option<String>,
policy_evaluation_limit_seconds: Option<u64>,
) -> Result<Worker, PolicyErrors> {
let mut evs_errors = HashMap::new();
let mut evs = HashMap::new();

let engine = wasmtime::Engine::new(wasmtime_config).map_err(|e| {
let mut errors = HashMap::new();
errors.insert(
"*".to_string(),
format!("Cannot create wasmtime::Engine: {:?}", e),
);
PolicyErrors(errors)
})?;

for (id, policy) in policies.iter() {
// It's safe to clone the outer engine. This creates a shallow copy
let inner_engine = engine.clone();
Expand All @@ -80,6 +66,7 @@ impl Worker {
&inner_engine,
precompiled_policies,
callback_handler_tx.clone(),
policy_evaluation_limit_seconds,
) {
Ok(pe) => Box::new(pe),
Err(e) => {
Expand Down Expand Up @@ -109,7 +96,6 @@ impl Worker {
Ok(Worker {
evaluators: evs,
channel_rx: rx,
engine,
})
}

Expand Down
76 changes: 69 additions & 7 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use core::time;
use policy_evaluator::{
callback_requests::CallbackRequest,
policy_evaluator::{Evaluator, PolicyEvaluator, PolicyExecutionMode},
Expand All @@ -20,7 +21,7 @@ use std::{
vec::Vec,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::communication::{EvalRequest, WorkerPoolBootRequest};
use crate::policy_downloader::FetchedPolicies;
Expand Down Expand Up @@ -70,6 +71,7 @@ pub(crate) struct WorkerPool {
bootstrap_rx: oneshot::Receiver<WorkerPoolBootRequest>,
callback_handler_tx: mpsc::Sender<CallbackRequest>,
always_accept_admission_reviews_on_namespace: Option<String>,
policy_evaluation_limit_seconds: Option<u64>,
}

impl WorkerPool {
Expand All @@ -78,17 +80,20 @@ impl WorkerPool {
api_rx: mpsc::Receiver<EvalRequest>,
callback_handler_tx: mpsc::Sender<CallbackRequest>,
always_accept_admission_reviews_on_namespace: Option<String>,
policy_evaluation_limit_seconds: Option<u64>,
) -> WorkerPool {
WorkerPool {
api_rx,
bootstrap_rx,
callback_handler_tx,
always_accept_admission_reviews_on_namespace,
policy_evaluation_limit_seconds,
}
}

pub(crate) fn run(mut self) {
let mut worker_tx_chans = Vec::<mpsc::Sender<EvalRequest>>::new();
let mut worker_engines = Vec::<wasmtime::Engine>::new();
let mut join_handles = Vec::<JoinHandle<Result<()>>>::new();

// Phase 1: wait for bootstrap data to be received by the main
Expand All @@ -108,8 +113,10 @@ impl WorkerPool {

// To reduce bootstrap time, we will precompile all the WebAssembly
// modules we are going to use.
let wasmtime_config = wasmtime::Config::new();
// TODO: enable epoch deadlines
let mut wasmtime_config = wasmtime::Config::new();
if self.policy_evaluation_limit_seconds.is_some() {
wasmtime_config.epoch_interruption(true);
}

let engine = match wasmtime::Engine::new(&wasmtime_config) {
Ok(e) => e,
Expand All @@ -136,6 +143,7 @@ impl WorkerPool {
&bootstrap_data.policies,
&precompiled_policies,
self.callback_handler_tx.clone(),
self.policy_evaluation_limit_seconds,
) {
error!(?error, "cannot validate policy settings");
match bootstrap_data.resp_chan.send(Err(error)) {
Expand All @@ -151,13 +159,43 @@ impl WorkerPool {
let barrier = Arc::new(Barrier::new(pool_size + 1));
let boot_canary = Arc::new(AtomicBool::new(true));

if let Some(limit) = self.policy_evaluation_limit_seconds {
info!(
execution_limit_seconds = limit,
"policy timeout protection is enabled"
);
} else {
warn!("policy timeout protection is disabled");
}

for n in 1..=pool_size {
let (tx, rx) = mpsc::channel::<EvalRequest>(32);
worker_tx_chans.push(tx);

let engine = match wasmtime::Engine::new(&wasmtime_config) {
Ok(e) => e,
Err(e) => {
if bootstrap_data
.resp_chan
.send(Err(anyhow!(
"cannot create wasmtime engine for one of the workers: {}",
e
)))
.is_err()
{
eprint!(
"cannot create wasmtime engine for one of the workers: {}",
e
);
std::process::exit(1);
};
return;
}
};
worker_engines.push(engine.clone());

let policies = bootstrap_data.policies.clone();
let modules = precompiled_policies.clone();
let wasmtime_config = wasmtime_config.clone();
let b = barrier.clone();
let canary = boot_canary.clone();
let callback_handler_tx = self.callback_handler_tx.clone();
Expand All @@ -170,9 +208,10 @@ impl WorkerPool {
rx,
&policies,
&modules,
&wasmtime_config,
engine,
callback_handler_tx,
always_accept_admission_reviews_on_namespace,
self.policy_evaluation_limit_seconds,
) {
Ok(w) => w,
Err(e) => {
Expand Down Expand Up @@ -217,6 +256,21 @@ impl WorkerPool {
// We can start waiting for admission review requests to be evaluated
let mut next_worker_id = 0;

if self.policy_evaluation_limit_seconds.is_some() {
// start a dedicated thread that send tick events to all
// the workers. This is used by the wasmtime's epoch_interruption
// to keep track of the execution time of each wasm module
thread::spawn(move || {
let one_second = time::Duration::from_secs(1);
loop {
thread::sleep(one_second);
for engine in &worker_engines {
engine.increment_epoch();
}
}
});
}

while let Some(req) = self.api_rx.blocking_recv() {
let _ = worker_tx_chans[next_worker_id].blocking_send(req);
next_worker_id += 1;
Expand All @@ -237,6 +291,7 @@ pub(crate) fn build_policy_evaluator(
engine: &wasmtime::Engine,
policy_modules: &PrecompiledPolicies,
callback_handler_tx: mpsc::Sender<CallbackRequest>,
policy_evaluation_limit_seconds: Option<u64>,
) -> Result<PolicyEvaluator> {
let policy_module = policy_modules.get(policy.url.as_str()).ok_or_else(|| {
anyhow!(
Expand All @@ -248,7 +303,7 @@ pub(crate) fn build_policy_evaluator(
// See `wasmtime::Module::deserialize` to know why this method is `unsafe`.
// However, in our context, nothing bad will happen because we have
// full control of the precompiled module. This is generated by the
// WorkerPool thred
// WorkerPool thread
let module =
unsafe { wasmtime::Module::deserialize(engine, &policy_module.precompiled_module) }
.map_err(|e| {
Expand All @@ -259,13 +314,18 @@ pub(crate) fn build_policy_evaluator(
)
})?;

let policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string())
let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string())
.engine(engine.clone())
.policy_module(module)
.settings(policy.settings_to_json()?)
.callback_channel(callback_handler_tx)
.execution_mode(policy_module.execution_mode);

if let Some(limit) = policy_evaluation_limit_seconds {
policy_evaluator_builder =
policy_evaluator_builder.enable_epoch_interruptions(limit, limit);
}

policy_evaluator_builder.build()
}

Expand Down Expand Up @@ -318,6 +378,7 @@ fn verify_policy_settings(
policies: &HashMap<String, crate::settings::Policy>,
policy_modules: &HashMap<String, PrecompiledPolicy>,
callback_handler_tx: mpsc::Sender<CallbackRequest>,
policy_evaluation_limit_seconds: Option<u64>,
) -> Result<()> {
let mut errors = vec![];
for (id, policy) in policies.iter() {
Expand All @@ -327,6 +388,7 @@ fn verify_policy_settings(
engine,
policy_modules,
callback_handler_tx.clone(),
policy_evaluation_limit_seconds,
) {
Ok(pe) => pe,
Err(e) => {
Expand Down

0 comments on commit 3538b31

Please sign in to comment.