From b5bcfec62d6d217c09b7d9674bb9e82faba15281 Mon Sep 17 00:00:00 2001 From: John Howard Date: Thu, 1 Aug 2024 14:46:13 -0700 Subject: [PATCH] Support hostname waypoints (#1224) * Improve XDS error diagnostics Based on user feedback. Before/after dns outage ``` 2024-07-29T17:10:59.111431Z warn xds::client:xds{id=14} XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution, retrying in 15s 2024-07-29T17:22:14.958433Z warn xds::client:xds{id=3} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution (hint: is the DNS server reachable?), retrying in 80ms ``` wrong dns name ``` 2024-07-29T17:22:47.910253Z warn xds::client:xds{id=10} XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s 2024-07-29T17:22:47.910253Z warn xds::client:xds{id=10} XDS client connection error: gRPC connection error connecting to https://istiodx.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s ``` Bad auth (ztunnel) ``` 2024-07-29T17:25:29.137815Z warn xds::client:xds{id=11} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure", retrying in 15s 2024-07-29T17:35:00.273104Z warn xds::client:xds{id=9} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure" (hint: check the control 
plane logs for more information), retrying in 5.12s ``` * Support hostname waypoints * add test and drop unwrap --- src/proxy/outbound.rs | 4 +- src/state.rs | 184 +++++++++++++++++++++++--------------- src/test_helpers/linux.rs | 24 +++++ tests/namespaced.rs | 76 ++++++++++++++++ 4 files changed, 215 insertions(+), 73 deletions(-) diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index b13dcee43..2e2239611 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -312,7 +312,7 @@ impl OutboundConnection { { // if we have a waypoint for this svc, use it; otherwise route traffic normally if let Some(waypoint) = state - .fetch_service_waypoint(&target_service, &source_workload) + .fetch_service_waypoint(&target_service, &source_workload, target) .await? { let upstream_sans = waypoint.workload_and_services_san(); @@ -373,7 +373,7 @@ impl OutboundConnection { if !from_waypoint && !svc_addressed { // For case upstream server has enabled waypoint let waypoint = state - .fetch_workload_waypoint(&us.workload, &source_workload) + .fetch_workload_waypoint(&us.workload, &source_workload, target) .await?; if let Some(waypoint) = waypoint { let actual_destination = waypoint.workload_socket_addr(); diff --git a/src/state.rs b/src/state.rs index 5cc80fcac..78610c86a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -254,35 +254,12 @@ impl ProxyState { .services .get_by_vip(&network_addr(network.clone(), addr.ip())) { - // Randomly pick an upstream - // TODO: do this more efficiently, and not just randomly - let Some((ep, wl)) = self.load_balance(source_workload, &svc, addr, resolution_mode) - else { - debug!("VIP {} has no healthy endpoints", addr); - return None; - }; - - let svc_port = svc.ports.get(&addr.port()).copied().unwrap_or_default(); - let target_port = if let Some(&port) = ep.port.get(&addr.port()) { - // prefer endpoint port mapping - port - } else if svc_port > 0 { - // otherwise, see if the service has this port - svc_port - } else if let 
Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel { - // when using app tunnel, we don't require the port to be found on the service - addr.port() - } else { - // no app tunnel or port mapping, error - debug!( - "found VIP {}, but port {} was unknown", - addr.ip(), - addr.port() - ); - return None; - }; - - return Some((wl, target_port, Some(svc))); + return self.find_upstream_from_service( + source_workload, + addr.port(), + resolution_mode, + svc, + ); } if let Some(wl) = self .workloads @@ -293,22 +270,55 @@ impl ProxyState { None } + fn find_upstream_from_service( + &self, + source_workload: &Workload, + svc_port: u16, + resolution_mode: ServiceResolutionMode, + svc: Arc, + ) -> Option<(Arc, u16, Option>)> { + // Randomly pick an upstream + // TODO: do this more efficiently, and not just randomly + let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode) + else { + debug!("Service {} has no healthy endpoints", svc.hostname); + return None; + }; + + let svc_target_port = svc.ports.get(&svc_port).copied().unwrap_or_default(); + let target_port = if let Some(&ep_target_port) = ep.port.get(&svc_port) { + // prefer endpoint port mapping + ep_target_port + } else if svc_target_port > 0 { + // otherwise, see if the service has this port + svc_target_port + } else if let Some(ApplicationTunnel { port: Some(_), .. 
}) = &wl.application_tunnel { + // when using app tunnel, we don't require the port to be found on the service + svc_port + } else { + // no app tunnel or port mapping, error + debug!( + "found service {}, but port {} was unknown", + svc.hostname, svc_port + ); + return None; + }; + + Some((wl, target_port, Some(svc))) + } + fn load_balance<'a>( &self, src: &Workload, svc: &'a Service, - svc_addr: SocketAddr, + svc_port: u16, resolution_mode: ServiceResolutionMode, ) -> Option<(&'a Endpoint, Arc)> { - let target_port = svc.ports.get(&svc_addr.port()).copied(); + let target_port = svc.ports.get(&svc_port).copied(); if resolution_mode == ServiceResolutionMode::Standard && target_port.is_none() { // Port doesn't exist on the service at all, this is invalid - debug!( - "service {} does not have port {}", - svc.hostname, - svc_addr.port() - ); + debug!("service {} does not have port {}", svc.hostname, svc_port); return None; }; @@ -319,14 +329,12 @@ impl ProxyState { }; match resolution_mode { ServiceResolutionMode::Standard => { - if target_port.unwrap_or_default() == 0 - && !ep.port.contains_key(&svc_addr.port()) - { + if target_port.unwrap_or_default() == 0 && !ep.port.contains_key(&svc_port) { // Filter workload out, it doesn't have a matching port trace!( "filter endpoint {}, it does not have service port {}", ep.workload_uid, - svc_addr.port() + svc_port ); return None; } @@ -686,7 +694,7 @@ impl DemandProxyState { return None; } self.fetch_on_demand(uid.clone()).await; - self.state.read().unwrap().workloads.find_uid(uid) + self.read().workloads.find_uid(uid) } pub async fn fetch_upstream( @@ -698,18 +706,33 @@ impl DemandProxyState { ) -> Result, Error> { self.fetch_address(&network_addr(network.clone(), addr.ip())) .await; - let Some((wl, port, svc)) = self.state.read().unwrap().find_upstream( - network, - source_workload, - addr, - resolution_mode, - ) else { + let upstream = { + self.read() + .find_upstream(network, source_workload, addr, resolution_mode) + // 
Drop the lock + }; + self.finalize_upstream(source_workload, addr, upstream) + .await + } + + async fn finalize_upstream( + &self, + source_workload: &Workload, + original_target_address: SocketAddr, + upstream: Option<(Arc, u16, Option>)>, + ) -> Result, Error> { + let Some((wl, port, svc)) = upstream else { return Ok(None); }; let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref())); let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families); let selected_workload_ip = self - .pick_workload_destination_or_resolve(&wl, source_workload, addr, ip_family_restriction) + .pick_workload_destination_or_resolve( + &wl, + source_workload, + original_target_address, + ip_family_restriction, + ) .await?; // if we can't load balance just return the error Ok(Some(Upstream { workload: wl, @@ -724,36 +747,59 @@ impl DemandProxyState { &self, gw_address: &GatewayAddress, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result { - let wp_nw_addr = match &gw_address.destination { - Destination::Address(ip) => ip, - Destination::Hostname(_) => { - return Err(Error::UnsupportedFeature( - "hostname lookup not supported yet".to_string(), - )); + // Waypoint can be referred to by an IP or Hostname. + // Hostname is preferred as it is a more stable identifier. 
+ let res = match &gw_address.destination { + Destination::Address(ip) => { + let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); + self.state.read().unwrap().find_upstream( + ip.network.clone(), + source_workload, + addr, + ServiceResolutionMode::Waypoint, + ) + } + Destination::Hostname(host) => { + let state = self.read(); + match state.find_hostname(host) { + Some(Address::Service(s)) => state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Waypoint, + s, + ), + Some(_) => { + return Err(Error::UnsupportedFeature( + "waypoint must be a service, not a workload".to_string(), + )) + } + None => { + return Err(Error::UnknownWaypoint(format!( + "waypoint {} not found", + host.hostname + ))) + } + } } }; - let wp_socket_addr = SocketAddr::new(wp_nw_addr.address, gw_address.hbone_mtls_port); - self.fetch_upstream( - wp_nw_addr.network.clone(), - source_workload, - wp_socket_addr, - ServiceResolutionMode::Waypoint, - ) - .await? - .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {} not found", wp_nw_addr.address))) + self.finalize_upstream(source_workload, original_destination_address, res) + .await? 
+ .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address))) } pub async fn fetch_service_waypoint( &self, service: &Service, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result, Error> { let Some(gw_address) = &service.waypoint else { // no waypoint return Ok(None); }; - self.fetch_waypoint(gw_address, source_workload) + self.fetch_waypoint(gw_address, source_workload, original_destination_address) .await .map(Some) } @@ -762,12 +808,13 @@ impl DemandProxyState { &self, wl: &Workload, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result, Error> { let Some(gw_address) = &wl.waypoint else { // no waypoint return Ok(None); }; - self.fetch_waypoint(gw_address, source_workload) + self.fetch_waypoint(gw_address, source_workload, original_destination_address) .await .map(Some) } @@ -1430,12 +1477,7 @@ mod tests { let assert_endpoint = |src: &Workload, svc: &Service, ips: Vec<&str>, desc: &str| { let got = state - .load_balance( - src, - svc, - "0.0.0.0:80".parse().unwrap(), - ServiceResolutionMode::Standard, - ) + .load_balance(src, svc, 80, ServiceResolutionMode::Standard) .and_then(|(ep, _)| ep.address.clone()) .map(|addr| addr.address.to_string()); if ips.is_empty() { diff --git a/src/test_helpers/linux.rs b/src/test_helpers/linux.rs index 61e6be2e8..aae431931 100644 --- a/src/test_helpers/linux.rs +++ b/src/test_helpers/linux.rs @@ -344,6 +344,18 @@ impl<'a> TestServiceBuilder<'a> { self } + /// Set the service waypoint by hostname + pub fn waypoint_hostname(mut self, hostname: &str) -> Self { + self.s.waypoint = Some(GatewayAddress { + destination: gatewayaddress::Destination::Hostname(NamespacedHostname { + namespace: strng::literal!("default"), + hostname: hostname.into(), + }), + hbone_mtls_port: 15008, + }); + self + } + /// Finish building the service. 
pub async fn register(self) -> anyhow::Result<()> { self.manager @@ -411,6 +423,18 @@ impl<'a> TestWorkloadBuilder<'a> { self } + /// Set the workload waypoint by hostname + pub fn waypoint_hostname(mut self, hostname: &str) -> Self { + self.w.workload.waypoint = Some(GatewayAddress { + destination: gatewayaddress::Destination::Hostname(NamespacedHostname { + namespace: strng::literal!("default"), + hostname: hostname.into(), + }), + hbone_mtls_port: 15008, + }); + self + } + /// Set a waypoint to the workload pub fn mutate_workload(mut self, f: impl FnOnce(&mut Workload)) -> Self { f(&mut self.w.workload); diff --git a/tests/namespaced.rs b/tests/namespaced.rs index 32571e29b..5ce1d81e3 100644 --- a/tests/namespaced.rs +++ b/tests/namespaced.rs @@ -249,6 +249,82 @@ mod namespaced { Ok(()) } + #[tokio::test] + async fn service_waypoint_hostname() -> anyhow::Result<()> { + let mut manager = setup_netns_test!(Shared); + + let zt = manager.deploy_ztunnel(DEFAULT_NODE).await?; + + manager + .service_builder("waypoint") + .addresses(vec![NetworkAddress { + network: strng::EMPTY, + address: TEST_VIP.parse::()?, + }]) + .ports(HashMap::from([(15008u16, 15008u16)])) + .register() + .await?; + let waypoint = manager + .workload_builder("waypoint", DEFAULT_NODE) + .uncaptured() + .service( + "default/waypoint.default.svc.cluster.local", + 80, + SERVER_PORT, + ) + .register() + .await?; + run_hbone_server(waypoint, "waypoint")?; + + manager + .workload_builder("server", DEFAULT_NODE) + .waypoint_hostname("waypoint.default.svc.cluster.local") + .register() + .await?; + let client = manager + .workload_builder("client", DEFAULT_NODE) + .register() + .await?; + + let server_ip = manager.resolver().resolve("server")?; + let waypoint_pod_ip = manager.resolver().resolve("waypoint")?; + run_tcp_to_hbone_client(client, manager.resolver(), "server")?; + + let metrics = [ + (CONNECTIONS_OPENED, 1), + (CONNECTIONS_CLOSED, 1), + (BYTES_RECV, REQ_SIZE), + (BYTES_SENT, HBONE_REQ_SIZE), + ];
+ verify_metrics(&zt, &metrics, &source_labels()).await; + + let sent = format!("{REQ_SIZE}"); + let recv = format!("{HBONE_REQ_SIZE}"); + let hbone_addr = format!("{server_ip}:8080"); + let dst_addr = format!("{waypoint_pod_ip}:15008"); + let want = HashMap::from([ + ("scope", "access"), + ("src.workload", "client"), + ("dst.workload", "waypoint"), + ("dst.hbone_addr", &hbone_addr), + ("dst.addr", &dst_addr), + ("bytes_sent", &sent), + ("bytes_recv", &recv), + ("direction", "outbound"), + ("message", "connection complete"), + ( + "src.identity", + "spiffe://cluster.local/ns/default/sa/client", + ), + ( + "dst.identity", + "spiffe://cluster.local/ns/default/sa/waypoint", + ), + ]); + telemetry::testing::assert_contains(want); + Ok(()) + } + #[tokio::test] async fn sandwich_waypoint_plain() -> anyhow::Result<()> { let mut manager = setup_netns_test!(Shared);