From 94e4823811763705faaa5f06030af84a34897a5f Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 7 Aug 2024 11:56:23 -0700 Subject: [PATCH] Fix regression in waypoints to use original IP (#1253) Regression from https://github.com/istio/ztunnel/pull/1224 Before that PR (and now the behavior returns), we would use the IP family of the IP stuck in the workload.waypoint or service.waypoint field. This was always IPv4 due to istiod's implementation. With the PR, we started to use the original IP family of the request. Since waypoints had IPv6 addresses, we would use them. Due to another bug in Istiod (fixed in https://github.com/istio/istio/pull/52555), the waypoints didn't actually listen on IPv6, though, so would drop the requests. With this PR, we will continue to send to the IP actually specified. In the near future we will move to hostname references, where we can correctly pick the best IP family based on the request. --- src/proxy/outbound.rs | 49 ++++++++++++++++++++++++++++++++++++++++++- src/state.rs | 37 +++++++++++++++++++++----------- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 2e2239611..14169fe11 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -539,7 +539,19 @@ mod tests { service_account: "waypoint-sa".to_string(), ..Default::default() }; - let mut workloads = vec![source, waypoint]; + let waypoint_dual = XdsWorkload { + uid: "cluster1//v1/Pod/ns/waypoint-workload-dual".to_string(), + name: "waypoint-workload-dual".to_string(), + namespace: "ns".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 11]), + Bytes::copy_from_slice("ff06::c5".parse::().unwrap().octets().as_slice()), + ], + node: "local-node".to_string(), + service_account: "waypoint-sa".to_string(), + ..Default::default() + }; + let mut workloads = vec![source, waypoint, waypoint_dual]; let mut services = vec![]; for x in xds { match x { @@ -745,6 +757,7 @@ mod tests { ) .await; } + #[tokio::test] async fn build_request_destination_waypoint() { run_build_request( @@ -774,6 +787,40 @@ mod tests { .await; } + #[tokio::test] + async fn build_request_destination_waypoint_mismatch_ip() { + run_build_request( + "127.0.0.1", + "[ff06::c3]:80", + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/my-pod".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 2]), + Bytes::copy_from_slice( + "ff06::c3".parse::().unwrap().octets().as_slice(), + ), + ], + waypoint: Some(xds::istio::workload::GatewayAddress { + destination: Some(xds::istio::workload::gateway_address::Destination::Address( + XdsNetworkAddress { + network: "".to_string(), + address: [127, 0, 0, 11].to_vec(), + }, + )), + hbone_mtls_port: 15008, + }), + ..Default::default() + }), + // Should use the waypoint + Some(ExpectedRequest { + protocol: Protocol::HBONE, + hbone_destination: "[ff06::c3]:80", + destination: "127.0.0.11:15008", + }), + ) + .await; + } + #[tokio::test] async fn build_request_destination_svc_waypoint() { run_build_request( diff --git a/src/state.rs b/src/state.rs index 78610c86a..503c7471e 100644 --- a/src/state.rs +++ b/src/state.rs @@ -711,6 +711,7 @@ impl DemandProxyState { .find_upstream(network, source_workload, addr, resolution_mode) // Drop the lock }; + tracing::trace!(%addr, ?upstream, "fetch_upstream"); self.finalize_upstream(source_workload, addr, upstream) .await } @@ -734,13 +735,15 @@ impl DemandProxyState { ip_family_restriction, ) .await?; // if we can't load balance just return the error - Ok(Some(Upstream { + let res = Upstream { workload: wl, selected_workload_ip, port, service_sans: svc.map(|s| s.subject_alt_names.clone()).unwrap_or_default(), destination_service: svc_desc, - })) + }; + tracing::trace!(?res, "finalize_upstream"); + Ok(Some(res)) } async fn fetch_waypoint( @@ -751,25 +754,35 @@ impl DemandProxyState { ) -> Result { // Waypoint can be referred to by an IP or Hostname. // Hostname is preferred as it is a more stable identifier. - let res = match &gw_address.destination { + let (res, target_address) = match &gw_address.destination { Destination::Address(ip) => { let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); - self.state.read().unwrap().find_upstream( + let us = self.state.read().unwrap().find_upstream( ip.network.clone(), source_workload, addr, ServiceResolutionMode::Waypoint, - ) + ); + // If they referenced a waypoint by IP, use that IP as the destination. + // Note this means that an IPv6 call may be translated to IPv4 if the waypoint is specified + // as an IPv4 address. + // For this reason, the Hostname method is preferred which can adapt to the callers IP family. + (us, addr) } Destination::Hostname(host) => { let state = self.read(); match state.find_hostname(host) { - Some(Address::Service(s)) => state.find_upstream_from_service( - source_workload, - gw_address.hbone_mtls_port, - ServiceResolutionMode::Waypoint, - s, - ), + Some(Address::Service(s)) => { + let us = state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Waypoint, + s, + ); + // For hostname, use the original_destination_address as the target so we can + // adapt to the callers IP family. + (us, original_destination_address) + } Some(_) => { return Err(Error::UnsupportedFeature( "waypoint must be a service, not a workload".to_string(), @@ -784,7 +797,7 @@ impl DemandProxyState { } } }; - self.finalize_upstream(source_workload, original_destination_address, res) + self.finalize_upstream(source_workload, target_address, res) .await? .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address))) }