Skip to content

Commit

Permalink
Support hostname waypoints (#1224)
Browse files Browse the repository at this point in the history
* Improve XDS error diagnostics

Based on user feedback.

Before/after

dns outage
```
2024-07-29T17:10:59.111431Z     warn    xds::client:xds{id=14}  XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution, retrying in 15s
2024-07-29T17:22:14.958433Z     warn    xds::client:xds{id=3}   XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution (hint: is the DNS server reachable?), retrying in 80ms
```

wrong dns name
```
2024-07-29T17:22:47.910253Z     warn    xds::client:xds{id=10}  XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s
2024-07-29T17:22:47.910253Z     warn    xds::client:xds{id=10}  XDS client connection error: gRPC connection error connecting to https://istiodx.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s
```

Bad auth (ztunnel)
```
2024-07-29T17:25:29.137815Z     warn    xds::client:xds{id=11}  XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure", retrying in 15s
2024-07-29T17:35:00.273104Z     warn    xds::client:xds{id=9}   XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure" (hint: check the control plane logs for more information), retrying in 5.12s
```

* Support hostname waypoints

* add test and drop unwrap
  • Loading branch information
howardjohn authored Aug 1, 2024
1 parent cb56b5b commit 189df4c
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 73 deletions.
4 changes: 2 additions & 2 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl OutboundConnection {
{
// if we have a waypoint for this svc, use it; otherwise route traffic normally
if let Some(waypoint) = state
.fetch_service_waypoint(&target_service, &source_workload)
.fetch_service_waypoint(&target_service, &source_workload, target)
.await?
{
let upstream_sans = waypoint.workload_and_services_san();
Expand Down Expand Up @@ -373,7 +373,7 @@ impl OutboundConnection {
if !from_waypoint && !svc_addressed {
// For case upstream server has enabled waypoint
let waypoint = state
.fetch_workload_waypoint(&us.workload, &source_workload)
.fetch_workload_waypoint(&us.workload, &source_workload, target)
.await?;
if let Some(waypoint) = waypoint {
let actual_destination = waypoint.workload_socket_addr();
Expand Down
184 changes: 113 additions & 71 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,35 +254,12 @@ impl ProxyState {
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, addr, resolution_mode)
else {
debug!("VIP {} has no healthy endpoints", addr);
return None;
};

let svc_port = svc.ports.get(&addr.port()).copied().unwrap_or_default();
let target_port = if let Some(&port) = ep.port.get(&addr.port()) {
// prefer endpoint port mapping
port
} else if svc_port > 0 {
// otherwise, see if the service has this port
svc_port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
addr.port()
} else {
// no app tunnel or port mapping, error
debug!(
"found VIP {}, but port {} was unknown",
addr.ip(),
addr.port()
);
return None;
};

return Some((wl, target_port, Some(svc)));
return self.find_upstream_from_service(
source_workload,
addr.port(),
resolution_mode,
svc,
);
}
if let Some(wl) = self
.workloads
Expand All @@ -293,22 +270,55 @@ impl ProxyState {
None
}

fn find_upstream_from_service(
&self,
source_workload: &Workload,
svc_port: u16,
resolution_mode: ServiceResolutionMode,
svc: Arc<Service>,
) -> Option<(Arc<Workload>, u16, Option<Arc<Service>>)> {
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode)
else {
debug!("Service {} has no healthy endpoints", svc.hostname);
return None;
};

let svc_target_port = svc.ports.get(&svc_port).copied().unwrap_or_default();
let target_port = if let Some(&ep_target_port) = ep.port.get(&svc_port) {
// prefer endpoint port mapping
ep_target_port
} else if svc_target_port > 0 {
// otherwise, see if the service has this port
svc_target_port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
svc_port
} else {
// no app tunnel or port mapping, error
debug!(
"found service {}, but port {} was unknown",
svc.hostname, svc_port
);
return None;
};

Some((wl, target_port, Some(svc)))
}

fn load_balance<'a>(
&self,
src: &Workload,
svc: &'a Service,
svc_addr: SocketAddr,
svc_port: u16,
resolution_mode: ServiceResolutionMode,
) -> Option<(&'a Endpoint, Arc<Workload>)> {
let target_port = svc.ports.get(&svc_addr.port()).copied();
let target_port = svc.ports.get(&svc_port).copied();

if resolution_mode == ServiceResolutionMode::Standard && target_port.is_none() {
// Port doesn't exist on the service at all, this is invalid
debug!(
"service {} does not have port {}",
svc.hostname,
svc_addr.port()
);
debug!("service {} does not have port {}", svc.hostname, svc_port);
return None;
};

Expand All @@ -319,14 +329,12 @@ impl ProxyState {
};
match resolution_mode {
ServiceResolutionMode::Standard => {
if target_port.unwrap_or_default() == 0
&& !ep.port.contains_key(&svc_addr.port())
{
if target_port.unwrap_or_default() == 0 && !ep.port.contains_key(&svc_port) {
// Filter workload out, it doesn't have a matching port
trace!(
"filter endpoint {}, it does not have service port {}",
ep.workload_uid,
svc_addr.port()
svc_port
);
return None;
}
Expand Down Expand Up @@ -686,7 +694,7 @@ impl DemandProxyState {
return None;
}
self.fetch_on_demand(uid.clone()).await;
self.state.read().unwrap().workloads.find_uid(uid)
self.read().workloads.find_uid(uid)
}

pub async fn fetch_upstream(
Expand All @@ -698,18 +706,33 @@ impl DemandProxyState {
) -> Result<Option<Upstream>, Error> {
self.fetch_address(&network_addr(network.clone(), addr.ip()))
.await;
let Some((wl, port, svc)) = self.state.read().unwrap().find_upstream(
network,
source_workload,
addr,
resolution_mode,
) else {
let upstream = {
self.read()
.find_upstream(network, source_workload, addr, resolution_mode)
// Drop the lock
};
self.finalize_upstream(source_workload, addr, upstream)
.await
}

async fn finalize_upstream(
&self,
source_workload: &Workload,
original_target_address: SocketAddr,
upstream: Option<(Arc<Workload>, u16, Option<Arc<Service>>)>,
) -> Result<Option<Upstream>, Error> {
let Some((wl, port, svc)) = upstream else {
return Ok(None);
};
let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref()));
let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families);
let selected_workload_ip = self
.pick_workload_destination_or_resolve(&wl, source_workload, addr, ip_family_restriction)
.pick_workload_destination_or_resolve(
&wl,
source_workload,
original_target_address,
ip_family_restriction,
)
.await?; // if we can't load balance just return the error
Ok(Some(Upstream {
workload: wl,
Expand All @@ -724,36 +747,59 @@ impl DemandProxyState {
&self,
gw_address: &GatewayAddress,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Upstream, Error> {
let wp_nw_addr = match &gw_address.destination {
Destination::Address(ip) => ip,
Destination::Hostname(_) => {
return Err(Error::UnsupportedFeature(
"hostname lookup not supported yet".to_string(),
));
// Waypoint can be referred to by an IP or Hostname.
// Hostname is preferred as it is a more stable identifier.
let res = match &gw_address.destination {
Destination::Address(ip) => {
let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port);
self.state.read().unwrap().find_upstream(
ip.network.clone(),
source_workload,
addr,
ServiceResolutionMode::Waypoint,
)
}
Destination::Hostname(host) => {
let state = self.read();
match state.find_hostname(host) {
Some(Address::Service(s)) => state.find_upstream_from_service(
source_workload,
gw_address.hbone_mtls_port,
ServiceResolutionMode::Waypoint,
s,
),
Some(_) => {
return Err(Error::UnsupportedFeature(
"waypoint must be a service, not a workload".to_string(),
))
}
None => {
return Err(Error::UnknownWaypoint(format!(
"waypoint {} not found",
host.hostname
)))
}
}
}
};
let wp_socket_addr = SocketAddr::new(wp_nw_addr.address, gw_address.hbone_mtls_port);
self.fetch_upstream(
wp_nw_addr.network.clone(),
source_workload,
wp_socket_addr,
ServiceResolutionMode::Waypoint,
)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {} not found", wp_nw_addr.address)))
self.finalize_upstream(source_workload, original_destination_address, res)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address)))
}

pub async fn fetch_service_waypoint(
&self,
service: &Service,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &service.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand All @@ -762,12 +808,13 @@ impl DemandProxyState {
&self,
wl: &Workload,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &wl.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand Down Expand Up @@ -1430,12 +1477,7 @@ mod tests {

let assert_endpoint = |src: &Workload, svc: &Service, ips: Vec<&str>, desc: &str| {
let got = state
.load_balance(
src,
svc,
"0.0.0.0:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
.load_balance(src, svc, 80, ServiceResolutionMode::Standard)
.and_then(|(ep, _)| ep.address.clone())
.map(|addr| addr.address.to_string());
if ips.is_empty() {
Expand Down
24 changes: 24 additions & 0 deletions src/test_helpers/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ impl<'a> TestServiceBuilder<'a> {
self
}

/// Set the service waypoint by hostname
pub fn waypoint_hostname(mut self, hostname: &str) -> Self {
self.s.waypoint = Some(GatewayAddress {
destination: gatewayaddress::Destination::Hostname(NamespacedHostname {
namespace: strng::literal!("default"),
hostname: hostname.into(),
}),
hbone_mtls_port: 15008,
});
self
}

/// Finish building the service.
pub async fn register(self) -> anyhow::Result<()> {
self.manager
Expand Down Expand Up @@ -411,6 +423,18 @@ impl<'a> TestWorkloadBuilder<'a> {
self
}

/// Set the service waypoint by hostname
pub fn waypoint_hostname(mut self, hostname: &str) -> Self {
self.w.workload.waypoint = Some(GatewayAddress {
destination: gatewayaddress::Destination::Hostname(NamespacedHostname {
namespace: strng::literal!("default"),
hostname: hostname.into(),
}),
hbone_mtls_port: 15008,
});
self
}

/// Set a waypoint to the workload
pub fn mutate_workload(mut self, f: impl FnOnce(&mut Workload)) -> Self {
f(&mut self.w.workload);
Expand Down
Loading

0 comments on commit 189df4c

Please sign in to comment.