From 565b7ae2740df9796dd5e6cba701ee1755344d9f Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 7 Aug 2024 09:01:45 -0700 Subject: [PATCH] Fix regression in waypoints to use original IP Regression from https://github.com/istio/ztunnel/pull/1224 Before that PR (and now the behavior returns), we would use the IP family of the IP stuck in the workload.waypoint or service.waypoint field. This was always IPv4 due to istiod's implementation. With the PR, we started to use the original IP family of the request. Since waypoints had IPv6 addresses, we would use them. Due to another bug in Istiod (fixed in https://github.com/istio/istio/pull/52555), the waypoints didn't actually listen on IPv6, though, so would drop the requests. With this PR, we will continue to send to the IP actually specified. In the near future we will move to hostname references, where we can correctly pick the best IP family based on the request. --- src/proxy/outbound.rs | 49 ++++++++++++++++++++++++++++++++++++++++++- src/state.rs | 37 +++++++++++++++++++++----------- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 6ca6c27b0..32322f3e4 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -538,7 +538,19 @@ mod tests { service_account: "waypoint-sa".to_string(), ..Default::default() }; - let mut workloads = vec![source, waypoint]; + let waypoint_dual = XdsWorkload { + uid: "cluster1//v1/Pod/ns/waypoint-workload-dual".to_string(), + name: "waypoint-workload-dual".to_string(), + namespace: "ns".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 11]), + Bytes::copy_from_slice("ff06::c5".parse::().unwrap().octets().as_slice()), + ], + node: "local-node".to_string(), + service_account: "waypoint-sa".to_string(), + ..Default::default() + }; + let mut workloads = vec![source, waypoint, waypoint_dual]; let mut services = vec![]; for x in xds { match x { @@ -743,6 +755,7 @@ mod tests { ) .await; } + #[tokio::test] async fn build_request_destination_waypoint() { run_build_request( @@ -772,6 +785,40 @@ mod tests { .await; } + #[tokio::test] + async fn build_request_destination_waypoint_mismatch_ip() { + run_build_request( + "127.0.0.1", + "[ff06::c3]:80", + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/my-pod".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 2]), + Bytes::copy_from_slice( + "ff06::c3".parse::().unwrap().octets().as_slice(), + ), + ], + waypoint: Some(xds::istio::workload::GatewayAddress { + destination: Some(xds::istio::workload::gateway_address::Destination::Address( + XdsNetworkAddress { + network: "".to_string(), + address: [127, 0, 0, 11].to_vec(), + }, + )), + hbone_mtls_port: 15008, + }), + ..Default::default() + }), + // Should use the waypoint + Some(ExpectedRequest { + protocol: Protocol::HBONE, + hbone_destination: "[ff06::c3]:80", + destination: "127.0.0.11:15008", + }), + ) + .await; + } + #[tokio::test] async fn build_request_destination_svc_waypoint() { run_build_request( diff --git a/src/state.rs b/src/state.rs index 6e2b004bd..5626914e5 100644 --- a/src/state.rs +++ b/src/state.rs @@ -711,6 +711,7 @@ impl DemandProxyState { .find_upstream(network, source_workload, addr, resolution_mode) // Drop the lock }; + tracing::trace!(%addr, ?upstream, "fetch_upstream"); self.finalize_upstream(source_workload, addr, upstream) .await } @@ -734,13 +735,15 @@ impl DemandProxyState { ip_family_restriction, ) .await?; // if we can't load balance just return the error - Ok(Some(Upstream { + let res = Upstream { workload: wl, selected_workload_ip, port, service_sans: svc.map(|s| s.subject_alt_names.clone()).unwrap_or_default(), destination_service: svc_desc, - })) + }; + tracing::trace!(?res, "finalize_upstream"); + Ok(Some(res)) } async fn fetch_waypoint( @@ -751,25 +754,35 @@ impl DemandProxyState { ) -> Result { // Waypoint can be referred to by an IP or Hostname. // Hostname is preferred as it is a more stable identifier. - let res = match &gw_address.destination { + let (res, target_address) = match &gw_address.destination { Destination::Address(ip) => { let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); - self.state.read().unwrap().find_upstream( + let us = self.state.read().unwrap().find_upstream( ip.network.clone(), source_workload, addr, ServiceResolutionMode::Waypoint, - ) + ); + // If they referenced a waypoint by IP, use that IP as the destination. + // Note this means that an IPv6 call may be translated to IPv4 if the waypoint is specified + // as an IPv4 address. + // For this reason, the Hostname method is preferred which can adapt to the callers IP family. + (us, addr) } Destination::Hostname(host) => { let state = self.read(); match state.find_hostname(host) { - Some(Address::Service(s)) => state.find_upstream_from_service( - source_workload, - gw_address.hbone_mtls_port, - ServiceResolutionMode::Waypoint, - s, - ), + Some(Address::Service(s)) => { + let us = state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Waypoint, + s, + ); + // For hostname, use the original_destination_address as the target so we can + // adapt to the callers IP family. + (us, original_destination_address) + } Some(_) => { return Err(Error::UnsupportedFeature( "waypoint must be a service, not a workload".to_string(), @@ -784,7 +797,7 @@ impl DemandProxyState { } } }; - self.finalize_upstream(source_workload, original_destination_address, res) + self.finalize_upstream(source_workload, target_address, res) .await? .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address))) }