Add support for publishNotReadyAddresses (istio#1231)
* Add support for `publishNotReadyAddresses`

* rename
howardjohn authored and antonioberben committed Oct 1, 2024
1 parent d82ade3 commit 0b8a784
Showing 10 changed files with 377 additions and 54 deletions.
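Read at a high level, the change adds a per-service `HealthPolicy` to the workload API so that a service publishing not-ready addresses can keep unhealthy endpoints selectable: the default `ONLY_HEALTHY` filters them out, while `ALLOW_ALL` keeps them. A minimal, self-contained Rust sketch of that filtering rule (the types here are illustrative, not the crate's actual API):

#[derive(PartialEq, Clone, Copy)]
enum HealthStatus {
    Healthy,
    Unhealthy,
}

#[derive(PartialEq, Clone, Copy)]
enum HealthPolicy {
    OnlyHealthy, // default: drop unhealthy endpoints
    AllowAll,    // publishNotReadyAddresses: keep them
}

// An endpoint stays selectable if it is healthy, or if the service's
// load-balancing policy explicitly allows unhealthy endpoints.
fn should_include(policy: HealthPolicy, health: HealthStatus) -> bool {
    health == HealthStatus::Healthy || policy == HealthPolicy::AllowAll
}

fn main() {
    assert!(should_include(HealthPolicy::OnlyHealthy, HealthStatus::Healthy));
    assert!(!should_include(HealthPolicy::OnlyHealthy, HealthStatus::Unhealthy));
    assert!(should_include(HealthPolicy::AllowAll, HealthStatus::Unhealthy));
}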
8 changes: 8 additions & 0 deletions proto/workload.proto
@@ -137,12 +137,20 @@ message LoadBalancing {
// 4. Any endpoints
FAILOVER = 2;
}
enum HealthPolicy {
// Only select healthy endpoints
ONLY_HEALTHY = 0;
// Include all endpoints, even if they are unhealthy.
ALLOW_ALL = 1;
}

// routing_preference defines what scopes we want to keep traffic within.
// The `mode` determines how these routing preferences are handled
repeated Scope routing_preference = 1;
// mode defines how we should handle the routing preferences.
Mode mode = 2;
// health_policy defines how we should filter endpoints
HealthPolicy health_policy = 3;
}

// Workload represents a workload - an endpoint (or collection behind a hostname).
1 change: 1 addition & 0 deletions src/admin.rs
@@ -685,6 +685,7 @@ mod tests {
load_balancing: Some(XdsLoadBalancing {
routing_preference: vec![1, 2],
mode: 1,
health_policy: 1,
}), // ..Default::default() // intentionally don't default. we want all fields populated
ip_families: 0,
};
1 change: 1 addition & 0 deletions src/proxy.rs
@@ -874,6 +874,7 @@ mod tests {
},
address: addr,
port: ports.clone(),
status: state::workload::HealthStatus::Healthy,
},
);
Service {
2 changes: 2 additions & 0 deletions src/proxy/inbound.rs
@@ -515,6 +515,7 @@ mod tests {
test_helpers,
};

use crate::state::workload::HealthStatus;
use hickory_resolver::config::{ResolverConfig, ResolverOpts};
use prometheus_client::registry::Registry;
use test_case::test_case;
@@ -621,6 +622,7 @@ mod tests {
},
address: ep_addr,
port: std::collections::HashMap::new(),
status: HealthStatus::Healthy,
},
)]
.into_iter()
14 changes: 10 additions & 4 deletions src/state.rs
@@ -349,8 +349,7 @@ impl ProxyState {
});

match svc.load_balancer {
Some(ref lb) if lb.mode != LoadBalancerMode::Standard => {
let ranks = endpoints
.filter_map(|(ep, wl)| {
// Load balancer will define N targets we want to match
@@ -394,6 +393,7 @@ impl ProxyState {
.map(|(_, ep, wl)| (ep, wl))
.choose(&mut rand::thread_rng())
}
_ => endpoints.choose(&mut rand::thread_rng()),
}
}
}
@@ -908,8 +908,8 @@ impl ProxyStateManager {

#[cfg(test)]
mod tests {
use crate::state::service::{LoadBalancer, LoadBalancerHealthPolicy};
use crate::state::workload::{HealthStatus, Locality};
use prometheus_client::registry::Registry;
use std::{net::Ipv4Addr, net::SocketAddrV4, time::Duration};

@@ -1180,6 +1180,7 @@ mod tests {
network: "".into(),
}),
port: tc.endpoint_mapping(),
status: HealthStatus::Healthy,
},
)]),
ports: tc.service_mapping(),
@@ -1355,6 +1356,7 @@ mod tests {
network: "".into(),
}),
port: HashMap::from([(80u16, 80u16)]),
status: HealthStatus::Healthy,
},
),
(
@@ -1370,6 +1372,7 @@ mod tests {
network: "".into(),
}),
port: HashMap::from([(80u16, 80u16)]),
status: HealthStatus::Healthy,
},
),
(
@@ -1385,6 +1388,7 @@ mod tests {
network: "".into(),
}),
port: HashMap::from([(80u16, 80u16)]),
status: HealthStatus::Healthy,
},
),
]);
@@ -1397,6 +1401,7 @@ mod tests {
LoadBalancerScopes::Region,
LoadBalancerScopes::Zone,
],
health_policy: LoadBalancerHealthPolicy::OnlyHealthy,
}),
ports: HashMap::from([(80u16, 80u16)]),
..test_helpers::mock_default_service()
@@ -1410,6 +1415,7 @@ mod tests {
LoadBalancerScopes::Region,
LoadBalancerScopes::Zone,
],
health_policy: LoadBalancerHealthPolicy::OnlyHealthy,
}),
ports: HashMap::from([(80u16, 80u16)]),
..test_helpers::mock_default_service()
56 changes: 52 additions & 4 deletions src/state/service.rs
@@ -22,11 +22,11 @@ use tracing::trace;

use xds::istio::workload::Service as XdsService;

use crate::state::workload::{
byte_to_ip, network_addr, GatewayAddress, NamespacedHostname, NetworkAddress, Workload,
WorkloadError,
};
use crate::state::workload::{is_default, HealthStatus};
use crate::strng::Strng;
use crate::xds::istio::workload::load_balancing::Scope as XdsScope;
use crate::xds::istio::workload::{IpFamilies, PortList};
@@ -59,18 +59,42 @@ pub struct Service {

#[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
pub enum LoadBalancerMode {
// Do not consider LoadBalancerScopes when picking endpoints
Standard,
// Only select endpoints matching all LoadBalancerScopes when picking endpoints; otherwise, fail.
Strict,
// Prefer endpoints matching all LoadBalancerScopes when picking endpoints, but allow mismatches
Failover,
}

impl From<xds::istio::workload::load_balancing::Mode> for LoadBalancerMode {
fn from(value: xds::istio::workload::load_balancing::Mode) -> Self {
match value {
xds::istio::workload::load_balancing::Mode::Strict => LoadBalancerMode::Strict,
xds::istio::workload::load_balancing::Mode::Failover => LoadBalancerMode::Failover,
xds::istio::workload::load_balancing::Mode::UnspecifiedMode => {
LoadBalancerMode::Standard
}
}
}
}

#[derive(Default, Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
pub enum LoadBalancerHealthPolicy {
#[default]
OnlyHealthy,
AllowAll,
}

impl From<xds::istio::workload::load_balancing::HealthPolicy> for LoadBalancerHealthPolicy {
fn from(value: xds::istio::workload::load_balancing::HealthPolicy) -> Self {
match value {
xds::istio::workload::load_balancing::HealthPolicy::OnlyHealthy => {
LoadBalancerHealthPolicy::OnlyHealthy
}
xds::istio::workload::load_balancing::HealthPolicy::AllowAll => {
LoadBalancerHealthPolicy::AllowAll
}
}
}
}
@@ -105,6 +129,7 @@ impl TryFrom<XdsScope> for LoadBalancerScopes {
pub struct LoadBalancer {
pub routing_preferences: Vec<LoadBalancerScopes>,
pub mode: LoadBalancerMode,
pub health_policy: LoadBalancerHealthPolicy,
}

impl From<xds::istio::workload::IpFamilies> for Option<IpFamily> {
Expand Down Expand Up @@ -147,6 +172,15 @@ impl Service {
pub fn contains_endpoint(&self, wl: &Workload, addr: Option<&NetworkAddress>) -> bool {
self.endpoints.contains_key(&endpoint_uid(&wl.uid, addr))
}

pub fn should_include_endpoint(&self, ep_health: HealthStatus) -> bool {
ep_health == HealthStatus::Healthy
|| self
.load_balancer
.as_ref()
.map(|lb| lb.health_policy == LoadBalancerHealthPolicy::AllowAll)
.unwrap_or(false)
}
}
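For orientation, a hedged reading of the new helper: `should_include_endpoint` always admits healthy endpoints and admits unhealthy ones only when the service's load balancer is configured with `LoadBalancerHealthPolicy::AllowAll`; with no load-balancer config at all it falls back to healthy-only. A hypothetical call site, mirroring how the insertion paths further down use it:

// Hypothetical usage: `svc` is a state::service::Service, `ep` an Endpoint.
if svc.should_include_endpoint(ep.status) {
    // keep the endpoint; otherwise it never enters svc.endpoints
}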

#[derive(Debug, Hash, Eq, PartialEq, Clone, serde::Serialize)]
Expand Down Expand Up @@ -181,6 +215,9 @@ pub struct Endpoint {

/// The port mapping.
pub port: HashMap<u16, u16>,

/// Health status for the endpoint
pub status: HealthStatus,
}

pub fn endpoint_uid(workload_uid: &str, address: Option<&NetworkAddress>) -> Strng {
Expand Down Expand Up @@ -220,6 +257,10 @@ impl TryFrom<&XdsService> for Service {
})
.collect::<Result<Vec<LoadBalancerScopes>, WorkloadError>>()?,
mode: xds::istio::workload::load_balancing::Mode::try_from(lb.mode)?.into(),
health_policy: xds::istio::workload::load_balancing::HealthPolicy::try_from(
lb.health_policy,
)?
.into(),
})
} else {
None
@@ -324,7 +365,12 @@ impl ServiceStore {
pub fn insert_endpoint(&mut self, ep: Endpoint) {
let ep_uid = endpoint_uid(&ep.workload_uid, ep.address.as_ref());
if let Some(svc) = self.get_by_namespaced_host(&ep.service) {
// We may or may not accept the endpoint based on its health
if !svc.should_include_endpoint(ep.status) {
return;
}
let mut svc = Arc::unwrap_or_clone(svc);

// Clone the service and add the endpoint.
svc.endpoints.insert(ep_uid, ep);

@@ -386,7 +432,9 @@ impl ServiceStore {
let namespaced_hostname = service.namespaced_hostname();
if let Some(endpoints) = self.staged_services.remove(&namespaced_hostname) {
for (wip, ep) in endpoints {
service.endpoints.insert(wip.clone(), ep);
if service.should_include_endpoint(ep.status) {
service.endpoints.insert(wip.clone(), ep);
}
}
}

Expand Down
Loading

0 comments on commit 0b8a784

Please sign in to comment.