diff --git a/proto/workload.proto b/proto/workload.proto
index 8645c78da..8acba4fc8 100644
--- a/proto/workload.proto
+++ b/proto/workload.proto
@@ -137,12 +137,20 @@ message LoadBalancing {
     // 4. Any endpoints
     FAILOVER = 2;
   }
+  enum HealthPolicy {
+    // Only select healthy endpoints
+    ONLY_HEALTHY = 0;
+    // Include all endpoints, even if they are unhealthy.
+    ALLOW_ALL = 1;
+  }
 
   // routing_preference defines what scopes we want to keep traffic within.
   // The `mode` determines how these routing preferences are handled
   repeated Scope routing_preference = 1;
   // mode defines how we should handle the routing preferences.
   Mode mode = 2;
+  // health_policy defines how we should filter endpoints
+  HealthPolicy health_policy = 3;
 }
 
 // Workload represents a workload - an endpoint (or collection behind a hostname).
diff --git a/src/admin.rs b/src/admin.rs
index 7cb4717a2..3efe17da4 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -685,6 +685,7 @@ mod tests {
             load_balancing: Some(XdsLoadBalancing {
                 routing_preference: vec![1, 2],
                 mode: 1,
+                health_policy: 1,
             }), // ..Default::default() // intentionally don't default. we want all fields populated
             ip_families: 0,
         };
diff --git a/src/proxy.rs b/src/proxy.rs
index bccbcdc99..92cbdfc8f 100644
--- a/src/proxy.rs
+++ b/src/proxy.rs
@@ -874,6 +874,7 @@ mod tests {
                 },
                 address: addr,
                 port: ports.clone(),
+                status: state::workload::HealthStatus::Healthy,
             },
         );
         Service {
diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs
index 23229618f..e286b304b 100644
--- a/src/proxy/inbound.rs
+++ b/src/proxy/inbound.rs
@@ -515,6 +515,7 @@ mod tests {
         test_helpers,
     };
 
+    use crate::state::workload::HealthStatus;
    use hickory_resolver::config::{ResolverConfig, ResolverOpts};
    use prometheus_client::registry::Registry;
    use test_case::test_case;
@@ -621,6 +622,7 @@ mod tests {
                 },
                 address: ep_addr,
                 port: std::collections::HashMap::new(),
+                status: HealthStatus::Healthy,
             },
         )]
         .into_iter()
diff --git a/src/state.rs b/src/state.rs
index 6faa6f902..5cc80fcac 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -349,8 +349,7 @@ impl ProxyState {
             });
 
         match svc.load_balancer {
-            None => endpoints.choose(&mut rand::thread_rng()),
-            Some(ref lb) => {
+            Some(ref lb) if lb.mode != LoadBalancerMode::Standard => {
                 let ranks = endpoints
                     .filter_map(|(ep, wl)| {
                         // Load balancer will define N targets we want to match
@@ -394,6 +393,7 @@ impl ProxyState {
                     .map(|(_, ep, wl)| (ep, wl))
                     .choose(&mut rand::thread_rng())
             }
+            _ => endpoints.choose(&mut rand::thread_rng()),
         }
     }
 }
@@ -908,8 +908,8 @@ impl ProxyStateManager {
 
 #[cfg(test)]
 mod tests {
-    use crate::state::service::LoadBalancer;
-    use crate::state::workload::Locality;
+    use crate::state::service::{LoadBalancer, LoadBalancerHealthPolicy};
+    use crate::state::workload::{HealthStatus, Locality};
     use prometheus_client::registry::Registry;
     use std::{net::Ipv4Addr, net::SocketAddrV4, time::Duration};
 
@@ -1180,6 +1180,7 @@ mod tests {
                         network: "".into(),
                     }),
                     port: tc.endpoint_mapping(),
+                    status: HealthStatus::Healthy,
                 },
             )]),
             ports: tc.service_mapping(),
@@ -1355,6 +1356,7 @@ mod tests {
                         network: "".into(),
                     }),
                     port: HashMap::from([(80u16, 80u16)]),
+                    status: HealthStatus::Healthy,
                 },
             ),
             (
@@ -1370,6 +1372,7 @@ mod tests {
                         network: "".into(),
                     }),
                     port: HashMap::from([(80u16, 80u16)]),
+                    status: HealthStatus::Healthy,
                 },
             ),
             (
@@ -1385,6 +1388,7 @@ mod tests {
                         network: "".into(),
                     }),
                     port: HashMap::from([(80u16, 80u16)]),
+                    status: HealthStatus::Healthy,
                 },
             ),
         ]);
@@ -1397,6 +1401,7 @@ mod tests {
                     LoadBalancerScopes::Region,
                     LoadBalancerScopes::Zone,
                 ],
+                health_policy: LoadBalancerHealthPolicy::OnlyHealthy,
             }),
             ports: HashMap::from([(80u16, 80u16)]),
             ..test_helpers::mock_default_service()
@@ -1410,6 +1415,7 @@ mod tests {
                     LoadBalancerScopes::Region,
                     LoadBalancerScopes::Zone,
                 ],
+                health_policy: LoadBalancerHealthPolicy::OnlyHealthy,
             }),
             ports: HashMap::from([(80u16, 80u16)]),
             ..test_helpers::mock_default_service()
diff --git a/src/state/service.rs b/src/state/service.rs
index f02f4fe6e..3d878a490 100644
--- a/src/state/service.rs
+++ b/src/state/service.rs
@@ -22,11 +22,11 @@ use tracing::trace;
 
 use xds::istio::workload::Service as XdsService;
 
-use crate::state::workload::is_default;
 use crate::state::workload::{
     byte_to_ip, network_addr, GatewayAddress, NamespacedHostname, NetworkAddress, Workload,
     WorkloadError,
 };
+use crate::state::workload::{is_default, HealthStatus};
 use crate::strng::Strng;
 use crate::xds::istio::workload::load_balancing::Scope as XdsScope;
 use crate::xds::istio::workload::{IpFamilies, PortList};
@@ -59,7 +59,11 @@ pub struct Service {
 
 #[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
 pub enum LoadBalancerMode {
+    // Do not consider LoadBalancerScopes when picking endpoints
+    Standard,
+    // Only select endpoints matching all LoadBalancerScopes when picking endpoints; otherwise, fail.
     Strict,
+    // Prefer endpoints matching all LoadBalancerScopes when picking endpoints, but allow mismatches
     Failover,
 }
 
@@ -67,10 +71,30 @@ impl From<xds::istio::workload::load_balancing::Mode> for LoadBalancerMode {
     fn from(value: xds::istio::workload::load_balancing::Mode) -> Self {
         match value {
             xds::istio::workload::load_balancing::Mode::Strict => LoadBalancerMode::Strict,
+            xds::istio::workload::load_balancing::Mode::Failover => LoadBalancerMode::Failover,
             xds::istio::workload::load_balancing::Mode::UnspecifiedMode => {
-                LoadBalancerMode::Failover
+                LoadBalancerMode::Standard
+            }
+        }
+    }
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
+pub enum LoadBalancerHealthPolicy {
+    #[default]
+    OnlyHealthy,
+    AllowAll,
+}
+
+impl From<xds::istio::workload::load_balancing::HealthPolicy> for LoadBalancerHealthPolicy {
+    fn from(value: xds::istio::workload::load_balancing::HealthPolicy) -> Self {
+        match value {
+            xds::istio::workload::load_balancing::HealthPolicy::OnlyHealthy => {
+                LoadBalancerHealthPolicy::OnlyHealthy
+            }
+            xds::istio::workload::load_balancing::HealthPolicy::AllowAll => {
+                LoadBalancerHealthPolicy::AllowAll
             }
-            xds::istio::workload::load_balancing::Mode::Failover => LoadBalancerMode::Failover,
         }
     }
 }
@@ -105,6 +129,7 @@ impl TryFrom<&XdsScope> for LoadBalancerScopes {
 pub struct LoadBalancer {
     pub routing_preferences: Vec<LoadBalancerScopes>,
     pub mode: LoadBalancerMode,
+    pub health_policy: LoadBalancerHealthPolicy,
 }
 
 impl From<IpFamilies> for Option<IpFamily> {
@@ -147,6 +172,15 @@ impl Service {
     pub fn contains_endpoint(&self, wl: &Workload, addr: Option<&NetworkAddress>) -> bool {
         self.endpoints.contains_key(&endpoint_uid(&wl.uid, addr))
     }
+
+    pub fn should_include_endpoint(&self, ep_health: HealthStatus) -> bool {
+        ep_health == HealthStatus::Healthy
+            || self
+                .load_balancer
+                .as_ref()
+                .map(|lb| lb.health_policy == LoadBalancerHealthPolicy::AllowAll)
+                .unwrap_or(false)
+    }
 }
 
 #[derive(Debug, Hash, Eq, PartialEq, Clone, serde::Serialize)]
@@ -181,6 +215,9 @@ pub struct Endpoint {
 
     /// The port mapping.
     pub port: HashMap<u16, u16>,
+
+    /// Health status for the endpoint
+    pub status: HealthStatus,
 }
 
 pub fn endpoint_uid(workload_uid: &str, address: Option<&NetworkAddress>) -> Strng {
@@ -220,6 +257,10 @@ impl TryFrom<&XdsService> for Service {
                     })
                     .collect::<Result<Vec<LoadBalancerScopes>, WorkloadError>>()?,
                 mode: xds::istio::workload::load_balancing::Mode::try_from(lb.mode)?.into(),
+                health_policy: xds::istio::workload::load_balancing::HealthPolicy::try_from(
+                    lb.health_policy,
+                )?
+                .into(),
             })
         } else {
             None
@@ -324,7 +365,12 @@ impl ServiceStore {
 
     pub fn insert_endpoint(&mut self, ep: Endpoint) {
         let ep_uid = endpoint_uid(&ep.workload_uid, ep.address.as_ref());
         if let Some(svc) = self.get_by_namespaced_host(&ep.service) {
+            // We may or may not accept the endpoint based on its health
+            if !svc.should_include_endpoint(ep.status) {
+                return;
+            }
             let mut svc = Arc::unwrap_or_clone(svc);
+
             // Clone the service and add the endpoint.
             svc.endpoints.insert(ep_uid, ep);
@@ -386,7 +432,9 @@ impl ServiceStore {
         let namespaced_hostname = service.namespaced_hostname();
         if let Some(endpoints) = self.staged_services.remove(&namespaced_hostname) {
             for (wip, ep) in endpoints {
-                service.endpoints.insert(wip.clone(), ep);
+                if service.should_include_endpoint(ep.status) {
+                    service.endpoints.insert(wip.clone(), ep);
+                }
             }
         }
 
diff --git a/src/state/workload.rs b/src/state/workload.rs
index 57bc01a5e..501b0a85c 100644
--- a/src/state/workload.rs
+++ b/src/state/workload.rs
@@ -722,10 +722,12 @@ mod tests {
     use crate::config::ConfigSource;
     use crate::state::{DemandProxyState, ProxyState, ServiceResolutionMode};
     use crate::test_helpers::helpers::initialize_telemetry;
-    use crate::xds::istio::workload::Port as XdsPort;
+    use crate::xds::istio::workload::load_balancing::HealthPolicy;
     use crate::xds::istio::workload::PortList as XdsPortList;
     use crate::xds::istio::workload::Service as XdsService;
     use crate::xds::istio::workload::WorkloadStatus as XdsStatus;
+    use crate::xds::istio::workload::WorkloadStatus;
+    use crate::xds::istio::workload::{LoadBalancing, Port as XdsPort};
     use crate::xds::{LocalClient, ProxyStateUpdateMutator};
     use crate::{cert_fetcher, test_helpers};
     use bytes::Bytes;
@@ -817,19 +819,7 @@ mod tests {
 
     #[test]
     fn workload_information() {
-        initialize_telemetry();
-        let state = Arc::new(RwLock::new(ProxyState::default()));
-
-        let mut registry = Registry::default();
-        let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
-        let demand = DemandProxyState::new(
-            state.clone(),
-            None,
-            ResolverConfig::default(),
-            ResolverOpts::default(),
-            metrics,
-        );
-        let updater = ProxyStateUpdateMutator::new_no_fetch();
+        let (state, demand, updater) = setup_test();
 
         let ip1 = Ipv4Addr::new(127, 0, 0, 1);
         let ip2 = Ipv4Addr::new(127, 0, 0, 2);
@@ -1181,20 +1171,275 @@ mod tests {
         assert_eq!((state.read().unwrap().services.num_services()), 0);
     }
 
+    #[test]
+    fn unhealthy_workloads_staged() {
+        let (state, _, updater) = setup_test();
+        let services = HashMap::from([
+            (
+                "ns/svc-normal".to_string(),
+                XdsPortList {
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 8080,
+                    }],
+                },
+            ),
+            (
+                "ns/svc-allow-unhealthy".to_string(),
+                XdsPortList {
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 8080,
+                    }],
+                },
+            ),
+        ]);
+        updater
+            .insert_workload(
+                &mut state.write().unwrap(),
+                XdsWorkload {
+                    uid: "uid1".to_owned(),
+                    name: "unhealthy".to_string(),
+                    addresses: vec![],
+                    services: services.clone(),
+                    status: WorkloadStatus::Unhealthy as i32,
+                    ..Default::default()
+                },
+            )
+            .unwrap();
+        updater
+            .insert_workload(
+                &mut state.write().unwrap(),
+                XdsWorkload {
+                    uid: "uid2".to_owned(),
+                    name: "healthy".to_string(),
+                    addresses: vec![],
+                    services: services.clone(),
+                    status: WorkloadStatus::Healthy as i32,
+                    ..Default::default()
+                },
+            )
+            .unwrap();
+        assert_eq!(state.read().unwrap().services.num_staged_services(), 2);
+
+        let vip2 = Ipv4Addr::new(127, 0, 1, 2);
+        let vip1 = Ipv4Addr::new(127, 0, 1, 1);
+        updater
+            .insert_service(
+                &mut state.write().unwrap(),
+                XdsService {
+                    name: "svc1".to_string(),
+                    namespace: "ns".to_string(),
+                    hostname: "svc-normal".to_string(),
+                    addresses: vec![XdsNetworkAddress {
+                        network: "".to_string(),
+                        address: vip1.octets().to_vec(),
+                    }],
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 80,
+                    }],
+                    subject_alt_names: vec![],
+                    waypoint: None,
+                    load_balancing: None,
+                    ip_families: 0,
+                },
+            )
+            .unwrap();
+        updater
+            .insert_service(
+                &mut state.write().unwrap(),
+                XdsService {
+                    name: "svc1".to_string(),
+                    namespace: "ns".to_string(),
+                    hostname: "svc-allow-unhealthy".to_string(),
+                    addresses: vec![XdsNetworkAddress {
+                        network: "".to_string(),
+                        address: vip2.octets().to_vec(),
+                    }],
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 80,
+                    }],
+                    subject_alt_names: vec![],
+                    waypoint: None,
+                    load_balancing: Some(LoadBalancing {
+                        routing_preference: vec![],
+                        mode: 0,
+                        health_policy: HealthPolicy::AllowAll as i32,
+                    }),
+                    ip_families: 0,
+                },
+            )
+            .unwrap();
+
+        let svc = state
+            .read()
+            .unwrap()
+            .services
+            .get_by_namespaced_host(&NamespacedHostname {
+                namespace: "ns".into(),
+                hostname: "svc-allow-unhealthy".into(),
+            });
+        assert_eq!(svc.unwrap().endpoints.len(), 2);
+        let svc = state
+            .read()
+            .unwrap()
+            .services
+            .get_by_namespaced_host(&NamespacedHostname {
+                namespace: "ns".into(),
+                hostname: "svc-normal".into(),
+            });
+        assert_eq!(svc.unwrap().endpoints.len(), 1);
+    }
+
+    #[test]
+    fn unhealthy_workloads() {
+        let (state, _, updater) = setup_test();
+
+        let vip2 = Ipv4Addr::new(127, 0, 1, 2);
+        let vip1 = Ipv4Addr::new(127, 0, 1, 1);
+        let svc = XdsService {
+            name: "svc1".to_string(),
+            namespace: "ns".to_string(),
+            hostname: "svc-allow-unhealthy".to_string(),
+            addresses: vec![XdsNetworkAddress {
+                network: "".to_string(),
+                address: vip2.octets().to_vec(),
+            }],
+            ports: vec![XdsPort {
+                service_port: 80,
+                target_port: 80,
+            }],
+            subject_alt_names: vec![],
+            waypoint: None,
+            load_balancing: Some(LoadBalancing {
+                routing_preference: vec![],
+                mode: 0,
+                health_policy: HealthPolicy::AllowAll as i32,
+            }),
+            ip_families: 0,
+        };
+        updater
+            .insert_service(
+                &mut state.write().unwrap(),
+                XdsService {
+                    name: "svc1".to_string(),
+                    namespace: "ns".to_string(),
+                    hostname: "svc-normal".to_string(),
+                    addresses: vec![XdsNetworkAddress {
+                        network: "".to_string(),
+                        address: vip1.octets().to_vec(),
+                    }],
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 80,
+                    }],
+                    subject_alt_names: vec![],
+                    waypoint: None,
+                    load_balancing: None,
+                    ip_families: 0,
+                },
+            )
+            .unwrap();
+        updater
+            .insert_service(&mut state.write().unwrap(), svc.clone())
+            .unwrap();
+
+        let services = HashMap::from([
+            (
+                "ns/svc-normal".to_string(),
+                XdsPortList {
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 8080,
+                    }],
+                },
+            ),
+            (
+                "ns/svc-allow-unhealthy".to_string(),
+                XdsPortList {
+                    ports: vec![XdsPort {
+                        service_port: 80,
+                        target_port: 8080,
+                    }],
+                },
+            ),
+        ]);
+        updater
+            .insert_workload(
+                &mut state.write().unwrap(),
+                XdsWorkload {
+                    uid: "uid1".to_owned(),
+                    name: "unhealthy".to_string(),
+                    addresses: vec![],
+                    services: services.clone(),
+                    status: WorkloadStatus::Unhealthy as i32,
+                    ..Default::default()
+                },
+            )
+            .unwrap();
+        updater
+            .insert_workload(
+                &mut state.write().unwrap(),
+                XdsWorkload {
+                    uid: "uid2".to_owned(),
+                    name: "healthy".to_string(),
+                    addresses: vec![],
+                    services: services.clone(),
+                    status: WorkloadStatus::Healthy as i32,
+                    ..Default::default()
+                },
+            )
+            .unwrap();
+
+        let assert = |host: &str, want: usize| {
+            let s = state
+                .read()
+                .unwrap()
+                .services
+                .get_by_namespaced_host(&NamespacedHostname {
+                    namespace: "ns".into(),
+                    hostname: host.into(),
+                });
+            assert_eq!(
+                s.unwrap().endpoints.len(),
+                want,
+                "host {host} wanted {want}"
+            );
+        };
+        assert("svc-allow-unhealthy", 2);
+        assert("svc-normal", 1);
+
+        // Switch the service to not allow unhealthy
+        let mut swapped = svc.clone();
+        swapped.load_balancing = None;
+        updater
+            .insert_service(&mut state.write().unwrap(), swapped)
+            .unwrap();
+        assert("svc-allow-unhealthy", 1);
+        assert("svc-normal", 1);
+
+        // Switch the service to allow unhealthy again
+        let mut swapped = svc.clone();
+        swapped.load_balancing = Some(LoadBalancing {
+            routing_preference: vec![],
+            mode: 0,
+            health_policy: HealthPolicy::AllowAll as i32,
+        });
+        updater
+            .insert_service(&mut state.write().unwrap(), swapped)
+            .unwrap();
+        // TODO: this is not currently supported. The endpoint set on services is not reconciled, but rather
+        // incrementally updated on adds/updates/removes. Since we don't store unhealthy endpoints,
+        // we cannot add them back.
+        // A fix for this would be to always store endpoints and make sure we filter them out where needed.
+        assert("svc-allow-unhealthy", 1);
+        assert("svc-normal", 1);
+    }
 
     #[test]
     fn staged_services_cleanup() {
-        initialize_telemetry();
-        let state = Arc::new(RwLock::new(ProxyState::default()));
-        let mut registry = Registry::default();
-        let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
-        let demand = DemandProxyState::new(
-            state.clone(),
-            None,
-            ResolverConfig::default(),
-            ResolverOpts::default(),
-            metrics,
-        );
-        let updater = ProxyStateUpdateMutator::new_no_fetch();
+        let (state, demand, updater) = setup_test();
         assert_eq!((state.read().unwrap().workloads.by_addr.len()), 0);
         assert_eq!((state.read().unwrap().workloads.by_uid.len()), 0);
         assert_eq!((state.read().unwrap().services.num_vips()), 0);
@@ -1264,6 +1509,26 @@ mod tests {
         assert_eq!((state.read().unwrap().services.num_staged_services()), 0); // should remove the VIP if no longer needed
     }
 
+    fn setup_test() -> (
+        Arc<RwLock<ProxyState>>,
+        DemandProxyState,
+        ProxyStateUpdateMutator,
+    ) {
+        initialize_telemetry();
+        let state = Arc::new(RwLock::new(ProxyState::default()));
+        let mut registry = Registry::default();
+        let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
+        let demand = DemandProxyState::new(
+            state.clone(),
+            None,
+            ResolverConfig::default(),
+            ResolverOpts::default(),
+            metrics,
+        );
+        let updater = ProxyStateUpdateMutator::new_no_fetch();
+        (state, demand, updater)
+    }
+
     #[track_caller]
     fn assert_vips(state: &DemandProxyState, want: Vec<&str>) {
         let mut wants: HashSet<String> = HashSet::from_iter(want.iter().map(|x| x.to_string()));
@@ -1303,17 +1568,7 @@ mod tests {
             .join("examples")
             .join("localhost.yaml"),
         );
-        let state = Arc::new(RwLock::new(ProxyState::default()));
-
-        let mut registry = Registry::default();
-        let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
-        let demand = DemandProxyState::new(
-            state.clone(),
-            None,
-            ResolverConfig::default(),
-            ResolverOpts::default(),
-            metrics,
-        );
+        let (state, demand, _) = setup_test();
         let local_client = LocalClient {
             cfg,
             state: state.clone(),
diff --git a/src/test_helpers.rs b/src/test_helpers.rs
index b78acd6c9..b7538f7c4 100644
--- a/src/test_helpers.rs
+++ b/src/test_helpers.rs
@@ -15,11 +15,11 @@
 use crate::config::ConfigSource;
 use crate::config::{self, RootCert};
 use crate::state::service::{Endpoint, Service};
-use crate::state::workload::Protocol;
 use crate::state::workload::Protocol::{HBONE, TCP};
 use crate::state::workload::{
     gatewayaddress, GatewayAddress, NamespacedHostname, NetworkAddress, Workload,
 };
+use crate::state::workload::{HealthStatus, Protocol};
 use crate::state::{DemandProxyState, ProxyState};
 use crate::xds::istio::security::Authorization as XdsAuthorization;
 use crate::xds::istio::workload::address;
@@ -292,6 +292,7 @@ fn test_custom_svc(
             },
             address: addr,
             port: HashMap::from([(80u16, echo_port)]),
+            status: HealthStatus::Healthy,
         },
     )]),
     subject_alt_names: vec!["spiffe://cluster.local/ns/default/sa/default".into()],
diff --git a/src/test_helpers/linux.rs b/src/test_helpers/linux.rs
index 29a801d74..61e6be2e8 100644
--- a/src/test_helpers/linux.rs
+++ b/src/test_helpers/linux.rs
@@ -15,7 +15,7 @@
 use crate::config::{ConfigSource, ProxyMode};
 use crate::rbac::Authorization;
 use crate::state::service::{endpoint_uid, Endpoint, Service};
-use crate::state::workload::{gatewayaddress, Workload};
+use crate::state::workload::{gatewayaddress, HealthStatus, Workload};
 use crate::test_helpers::app::TestApp;
 use crate::test_helpers::netns::{Namespace, Resolver};
 use crate::test_helpers::*;
@@ -482,6 +482,7 @@ impl<'a> TestWorkloadBuilder<'a> {
             service: service_name.clone(),
             address: Some(ep_network_addr.clone()),
             port: ports.to_owned(),
+            status: HealthStatus::Healthy,
         };
         let mut svc = self.manager.services.get(&service_name).unwrap().clone();
         let ep_uid = endpoint_uid(&self.w.workload.uid, Some(&ep_network_addr));
diff --git a/src/xds.rs b/src/xds.rs
index 3e2ef8bcb..c9ebefe92 100644
--- a/src/xds.rs
+++ b/src/xds.rs
@@ -37,7 +37,7 @@ use crate::cert_fetcher::{CertFetcher, NoCertFetcher};
 use crate::config::ConfigSource;
 use crate::rbac::Authorization;
 use crate::state::service::{endpoint_uid, Endpoint, Service, ServiceStore};
-use crate::state::workload::{network_addr, HealthStatus, NamespacedHostname, Workload};
+use crate::state::workload::{network_addr, NamespacedHostname, Workload};
 use crate::state::ProxyState;
 use crate::strng::Strng;
 use crate::{rbac, strng};
@@ -160,11 +160,7 @@ impl ProxyStateUpdateMutator {
             .cert_fetcher
             .should_track_certificates_for_removal(&workload);
         state.workloads.insert(workload.clone(), track);
-        // Unhealthy workloads are always inserted, as we may get or receive traffic to them.
-        // But we shouldn't include them in load balancing we do to Services.
-        if workload.status == HealthStatus::Healthy {
-            insert_service_endpoints(&workload, &services, &mut state.services)?;
-        }
+        insert_service_endpoints(&workload, &services, &mut state.services)?;
 
         Ok(())
     }
@@ -264,7 +260,9 @@ impl ProxyStateUpdateMutator {
             .get_by_namespaced_host(&service.namespaced_hostname())
         {
             for (wip, ep) in prev.endpoints.iter() {
-                service.endpoints.insert(wip.clone(), ep.clone());
+                if service.should_include_endpoint(ep.status) {
+                    service.endpoints.insert(wip.clone(), ep.clone());
+                }
             }
         }
 
@@ -358,6 +356,7 @@ fn insert_service_endpoints(
                 service: namespaced_host.clone(),
                 address: Some(network_addr(workload.network.clone(), *wip)),
                 port: ports.into(),
+                status: workload.status,
             })
         }
         if workload.workload_ips.is_empty() {
@@ -366,6 +365,7 @@
                 service: namespaced_host.clone(),
                 address: None,
                 port: ports.into(),
+                status: workload.status,
             })
         }
     }