Skip to content

Commit

Permalink
Support hostname waypoints
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn committed Jul 29, 2024
1 parent ea1be70 commit f851be1
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 72 deletions.
4 changes: 2 additions & 2 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl OutboundConnection {
{
// if we have a waypoint for this svc, use it; otherwise route traffic normally
if let Some(waypoint) = state
.fetch_service_waypoint(&target_service, &source_workload)
.fetch_service_waypoint(&target_service, &source_workload, target)
.await?
{
let upstream_sans = waypoint.workload_and_services_san();
Expand Down Expand Up @@ -373,7 +373,7 @@ impl OutboundConnection {
if !from_waypoint && !svc_addressed {
// For case upstream server has enabled waypoint
let waypoint = state
.fetch_workload_waypoint(&us.workload, &source_workload)
.fetch_workload_waypoint(&us.workload, &source_workload, target)
.await?;
if let Some(waypoint) = waypoint {
let actual_destination = waypoint.workload_socket_addr();
Expand Down
186 changes: 116 additions & 70 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,35 +254,12 @@ impl ProxyState {
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, addr, resolution_mode)
else {
debug!("VIP {} has no healthy endpoints", addr);
return None;
};

let svc_port = svc.ports.get(&addr.port()).copied().unwrap_or_default();
let target_port = if let Some(&port) = ep.port.get(&addr.port()) {
// prefer endpoint port mapping
port
} else if svc_port > 0 {
// otherwise, see if the service has this port
svc_port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
addr.port()
} else {
// no app tunnel or port mapping, error
debug!(
"found VIP {}, but port {} was unknown",
addr.ip(),
addr.port()
);
return None;
};

return Some((wl, target_port, Some(svc)));
return self.find_upstream_from_service(
source_workload,
addr.port(),
resolution_mode,
svc,
);
}
if let Some(wl) = self
.workloads
Expand All @@ -293,22 +270,55 @@ impl ProxyState {
None
}

fn find_upstream_from_service(
&self,
source_workload: &Workload,
svc_port: u16,
resolution_mode: ServiceResolutionMode,
svc: Arc<Service>,
) -> Option<(Arc<Workload>, u16, Option<Arc<Service>>)> {
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode)
else {
debug!("Service {} has no healthy endpoints", svc.hostname);
return None;
};

let svc_target_port = svc.ports.get(&svc_port).copied().unwrap_or_default();
let target_port = if let Some(&ep_target_port) = ep.port.get(&svc_port) {
// prefer endpoint port mapping
ep_target_port
} else if svc_target_port > 0 {
// otherwise, see if the service has this port
svc_target_port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
svc_port
} else {
// no app tunnel or port mapping, error
debug!(
"found service {}, but port {} was unknown",
svc.hostname, svc_port
);
return None;
};

Some((wl, target_port, Some(svc)))
}

fn load_balance<'a>(
&self,
src: &Workload,
svc: &'a Service,
svc_addr: SocketAddr,
svc_port: u16,
resolution_mode: ServiceResolutionMode,
) -> Option<(&'a Endpoint, Arc<Workload>)> {
let target_port = svc.ports.get(&svc_addr.port()).copied();
let target_port = svc.ports.get(&svc_port).copied();

if resolution_mode == ServiceResolutionMode::Standard && target_port.is_none() {
// Port doesn't exist on the service at all, this is invalid
debug!(
"service {} does not have port {}",
svc.hostname,
svc_addr.port()
);
debug!("service {} does not have port {}", svc.hostname, svc_port);
return None;
};

Expand All @@ -319,14 +329,12 @@ impl ProxyState {
};
match resolution_mode {
ServiceResolutionMode::Standard => {
if target_port.unwrap_or_default() == 0
&& !ep.port.contains_key(&svc_addr.port())
{
if target_port.unwrap_or_default() == 0 && !ep.port.contains_key(&svc_port) {
// Filter workload out, it doesn't have a matching port
trace!(
"filter endpoint {}, it does not have service port {}",
ep.workload_uid,
svc_addr.port()
svc_port
);
return None;
}
Expand Down Expand Up @@ -698,18 +706,37 @@ impl DemandProxyState {
) -> Result<Option<Upstream>, Error> {
self.fetch_address(&network_addr(network.clone(), addr.ip()))
.await;
let Some((wl, port, svc)) = self.state.read().unwrap().find_upstream(
network,
source_workload,
addr,
resolution_mode,
) else {
let upstream = {
self.state.read().unwrap().find_upstream(
network,
source_workload,
addr,
resolution_mode,
)
// Drop the lock
};
self.finalize_upstream(source_workload, addr, upstream)
.await
}

async fn finalize_upstream(
&self,
source_workload: &Workload,
original_target_address: SocketAddr,
upstream: Option<(Arc<Workload>, u16, Option<Arc<Service>>)>,
) -> Result<Option<Upstream>, Error> {
let Some((wl, port, svc)) = upstream else {
return Ok(None);
};
let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref()));
let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families);
let selected_workload_ip = self
.pick_workload_destination_or_resolve(&wl, source_workload, addr, ip_family_restriction)
.pick_workload_destination_or_resolve(
&wl,
source_workload,
original_target_address,
ip_family_restriction,
)
.await?; // if we can't load balance just return the error
Ok(Some(Upstream {
workload: wl,
Expand All @@ -724,36 +751,59 @@ impl DemandProxyState {
&self,
gw_address: &GatewayAddress,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Upstream, Error> {
let wp_nw_addr = match &gw_address.destination {
Destination::Address(ip) => ip,
Destination::Hostname(_) => {
return Err(Error::UnsupportedFeature(
"hostname lookup not supported yet".to_string(),
));
// Waypoint can be referred to by an IP or Hostname.
// Hostname is preferred as it is a more stable identifier.
let res = match &gw_address.destination {
Destination::Address(ip) => {
let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port);
self.state.read().unwrap().find_upstream(
ip.network.clone(),
source_workload,
addr,
ServiceResolutionMode::Waypoint,
)
}
Destination::Hostname(host) => {
let state = self.state.read().unwrap();
match state.find_hostname(host) {
Some(Address::Service(s)) => state.find_upstream_from_service(
source_workload,
gw_address.hbone_mtls_port,
ServiceResolutionMode::Waypoint,
s,
),
Some(_) => {
return Err(Error::UnsupportedFeature(
"waypoint must be a service, not a workload".to_string(),
))
}
None => {
return Err(Error::UnknownWaypoint(format!(
"waypoint {} not found",
host.hostname
)))
}
}
}
};
let wp_socket_addr = SocketAddr::new(wp_nw_addr.address, gw_address.hbone_mtls_port);
self.fetch_upstream(
wp_nw_addr.network.clone(),
source_workload,
wp_socket_addr,
ServiceResolutionMode::Waypoint,
)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {} not found", wp_nw_addr.address)))
self.finalize_upstream(source_workload, original_destination_address, res)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address)))
}

pub async fn fetch_service_waypoint(
&self,
service: &Service,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &service.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand All @@ -762,12 +812,13 @@ impl DemandProxyState {
&self,
wl: &Workload,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &wl.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand Down Expand Up @@ -1424,12 +1475,7 @@ mod tests {

let assert_endpoint = |src: &Workload, svc: &Service, ips: Vec<&str>, desc: &str| {
let got = state
.load_balance(
src,
svc,
"0.0.0.0:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
.load_balance(src, svc, 80, ServiceResolutionMode::Standard)
.and_then(|(ep, _)| ep.address.clone())
.map(|addr| addr.address.to_string());
if ips.is_empty() {
Expand Down

0 comments on commit f851be1

Please sign in to comment.