diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 2e2239611..14169fe11 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -539,7 +539,19 @@ mod tests { service_account: "waypoint-sa".to_string(), ..Default::default() }; - let mut workloads = vec![source, waypoint]; + let waypoint_dual = XdsWorkload { + uid: "cluster1//v1/Pod/ns/waypoint-workload-dual".to_string(), + name: "waypoint-workload-dual".to_string(), + namespace: "ns".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 11]), + Bytes::copy_from_slice("ff06::c5".parse::().unwrap().octets().as_slice()), + ], + node: "local-node".to_string(), + service_account: "waypoint-sa".to_string(), + ..Default::default() + }; + let mut workloads = vec![source, waypoint, waypoint_dual]; let mut services = vec![]; for x in xds { match x { @@ -745,6 +757,7 @@ mod tests { ) .await; } + #[tokio::test] async fn build_request_destination_waypoint() { run_build_request( @@ -774,6 +787,40 @@ mod tests { .await; } + #[tokio::test] + async fn build_request_destination_waypoint_mismatch_ip() { + run_build_request( + "127.0.0.1", + "[ff06::c3]:80", + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/my-pod".to_string(), + addresses: vec![ + Bytes::copy_from_slice(&[127, 0, 0, 2]), + Bytes::copy_from_slice( + "ff06::c3".parse::().unwrap().octets().as_slice(), + ), + ], + waypoint: Some(xds::istio::workload::GatewayAddress { + destination: Some(xds::istio::workload::gateway_address::Destination::Address( + XdsNetworkAddress { + network: "".to_string(), + address: [127, 0, 0, 11].to_vec(), + }, + )), + hbone_mtls_port: 15008, + }), + ..Default::default() + }), + // Should use the waypoint + Some(ExpectedRequest { + protocol: Protocol::HBONE, + hbone_destination: "[ff06::c3]:80", + destination: "127.0.0.11:15008", + }), + ) + .await; + } + #[tokio::test] async fn build_request_destination_svc_waypoint() { run_build_request( diff --git a/src/state.rs b/src/state.rs index 78610c86a..503c7471e 100644 --- a/src/state.rs +++ b/src/state.rs @@ -711,6 +711,7 @@ impl DemandProxyState { .find_upstream(network, source_workload, addr, resolution_mode) // Drop the lock }; + tracing::trace!(%addr, ?upstream, "fetch_upstream"); self.finalize_upstream(source_workload, addr, upstream) .await } @@ -734,13 +735,15 @@ impl DemandProxyState { ip_family_restriction, ) .await?; // if we can't load balance just return the error - Ok(Some(Upstream { + let res = Upstream { workload: wl, selected_workload_ip, port, service_sans: svc.map(|s| s.subject_alt_names.clone()).unwrap_or_default(), destination_service: svc_desc, - })) + }; + tracing::trace!(?res, "finalize_upstream"); + Ok(Some(res)) } async fn fetch_waypoint( @@ -751,25 +754,35 @@ impl DemandProxyState { ) -> Result { // Waypoint can be referred to by an IP or Hostname. // Hostname is preferred as it is a more stable identifier. - let res = match &gw_address.destination { + let (res, target_address) = match &gw_address.destination { Destination::Address(ip) => { let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); - self.state.read().unwrap().find_upstream( + let us = self.state.read().unwrap().find_upstream( ip.network.clone(), source_workload, addr, ServiceResolutionMode::Waypoint, - ) + ); + // If they referenced a waypoint by IP, use that IP as the destination. + // Note this means that an IPv6 call may be translated to IPv4 if the waypoint is specified + // as an IPv4 address. + // For this reason, the Hostname method is preferred which can adapt to the callers IP family. + (us, addr) } Destination::Hostname(host) => { let state = self.read(); match state.find_hostname(host) { - Some(Address::Service(s)) => state.find_upstream_from_service( - source_workload, - gw_address.hbone_mtls_port, - ServiceResolutionMode::Waypoint, - s, - ), + Some(Address::Service(s)) => { + let us = state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Waypoint, + s, + ); + // For hostname, use the original_destination_address as the target so we can + // adapt to the callers IP family. + (us, original_destination_address) + } Some(_) => { return Err(Error::UnsupportedFeature( "waypoint must be a service, not a workload".to_string(), @@ -784,7 +797,7 @@ impl DemandProxyState { } } }; - self.finalize_upstream(source_workload, original_destination_address, res) + self.finalize_upstream(source_workload, target_address, res) .await? .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address))) }