Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

[WIP] Add a Namer interface #80

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{admin, resolver, router, server};
use super::balancer::BalancerFactory;
use super::connector::{ConfigError as ConnectorConfigError, ConnectorFactoryConfig};
use super::resolver::{ConfigError as ResolverConfigError, NamerdConfig, Namer, WithHandle};
use super::resolver::{ConfigError as ResolverConfigError, ConstantConfig, NamerdConfig, Namer};
use super::server::ConfigError as ServerConfigError;
use futures::{Future, Stream, sync};
use hyper::server::Http;
Expand Down Expand Up @@ -200,7 +200,11 @@ impl RouterConfig {
let (resolver, resolver_exec) = match self.interpreter {
InterpreterConfig::NamerdHttp(config) => {
let namerd = config.into_namerd(&metrics).map_err(Error::Interpreter)?;
resolver::new(namerd)
resolver::new(Namer::Namerd(namerd))
},
InterpreterConfig::Constant(config) => {
let constant = config.into_namer().map_err(Error::Interpreter)?;
resolver::new(Namer::Constant(constant))
}
};

Expand Down Expand Up @@ -263,6 +267,9 @@ pub enum InterpreterConfig {
/// Polls namerd for updates.
#[serde(rename = "io.l5d.namerd.http")]
NamerdHttp(NamerdConfig),
/// Static list of addresses
#[serde(rename = "io.l5d.static")]
Constant(ConstantConfig),
}

/// Configures the admin server.
Expand Down
45 changes: 44 additions & 1 deletion src/resolver/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::namerd::Namerd;
use super::WeightedAddr;
use super::namer::namerd::Namerd;
use std::time::Duration;
use tacho;
use url::{self, Url};
use std::net;
use std::collections::HashMap;
use std::iter::FromIterator;

pub type Result<T> = ::std::result::Result<T, Error>;

Expand Down Expand Up @@ -38,3 +42,42 @@ impl NamerdConfig {
Ok(namerd)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct ConstantConfig {
addrs: HashMap<String, Vec<Addr>>
}

impl ConstantConfig {
pub fn into_namer(mut self) -> Result<HashMap<String, Vec<WeightedAddr>>> {
let weighted_iter = self.addrs.drain().map(|(path, addrs)| (path, to_weighted_addrs(&addrs)));
let weighted_addrs = HashMap::from_iter(weighted_iter);
Ok(weighted_addrs)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct Addr {
ip: String,
port: u16,
weight: Option<f64>,
}

fn to_weighted_addrs(addrs: &[Addr]) -> Vec<WeightedAddr> {
// We never intentionally clear the EndpointMap.
let mut dsts: Vec<WeightedAddr> = Vec::new();
let mut sum = 0.0;
for na in addrs {
let addr = net::SocketAddr::new(na.ip.parse().unwrap(), na.port);
let w = na.weight.unwrap_or(1.0);
sum += w;
dsts.push(WeightedAddr::new(addr, w));
}
// Normalize weights on [0.0, 0.1].
for dst in &mut dsts {
dst.weight /= sum;
}
dsts
}

18 changes: 8 additions & 10 deletions src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ use tokio_timer::{Timer, TimerError};

mod config;
mod namer;
mod namerd;
pub use self::config::{Error as ConfigError, NamerdConfig};
pub use self::namerd::{Namerd, Addrs};
pub use self::namer::{Namer, WithHandle};
pub use self::config::{Error as ConfigError, ConstantConfig, NamerdConfig};
pub use self::namer::Namer;

#[derive(Debug)]
pub enum Error {
Expand All @@ -31,14 +29,14 @@ pub type Result<T> = ::std::result::Result<T, Error>;

/// Creates a multithreaded resolver.
///
/// The `Resolver` side is a client of the `Executor`. Namerd work is performed on
/// The `Resolver` side is a client of the `Executor`. Namer work is performed on
/// whatever thread the executor is spawned on.
pub fn new<N: Namer + 'static + Send>(namer: N) -> (Resolver, Executor) {
pub fn new(namer: Namer) -> (Resolver, Executor) {
let (tx, rx) = mpsc::unbounded();
let res = Resolver { requests: tx };
let exe = Executor {
requests: rx,
namer: Box::new(namer),
namer: namer,
};
(res, exe)
}
Expand Down Expand Up @@ -79,15 +77,15 @@ impl Stream for Resolve {
/// Serves resolutions from `Resolver`s.
pub struct Executor {
requests: mpsc::UnboundedReceiver<(Path, mpsc::UnboundedSender<Result<Vec<WeightedAddr>>>)>,
namer: Box<Namer + Send>,
namer: Namer,
}

impl Executor where {
impl Executor {
pub fn execute(self, handle: &Handle, timer: &Timer) -> Execute {
let handle = handle.clone();
let namer = self.namer.with_handle(&handle, timer);
let f = self.requests.for_each(move |(path, rsp_tx)| {
// Stream namerd resolutions to the response channel.
// Stream namer resolutions to the response channel.
let resolve = namer.resolve(path.as_str());
let respond = resolve.forward(rsp_tx).map_err(|_| {}).map(|_| {});
// Do all of this work in another task so that we can receive
Expand Down
13 changes: 0 additions & 13 deletions src/resolver/namer.rs

This file was deleted.

63 changes: 63 additions & 0 deletions src/resolver/namer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use super::{Result, Error, WeightedAddr};
use tokio_core::reactor::Handle;
use tokio_timer::Timer;
use futures::Stream;
use std::collections::HashMap;
use futures::stream;
use futures::Poll;

pub mod namerd;

use self::namerd::{Addrs, WithClient, Namerd};

pub enum StreamEither<A, B> {
A(A),
B(B),
}

impl<A, B> Stream for StreamEither<A, B>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cute - I bet we could easily generalize this to make an Either that implements any trait implemented by both A and B?

where A: Stream,
B: Stream<Item = A::Item, Error = A::Error>
{
type Item = A::Item;
type Error = A::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match *self {
StreamEither::A(ref mut a) => a.poll(),
StreamEither::B(ref mut b) => b.poll(),
}
}
}

pub enum Namer {
Constant(HashMap<String, Vec<WeightedAddr>>),
Namerd(Namerd)
}

impl Namer {
pub fn with_handle(self, handle: &Handle, timer: &Timer) -> WithHandle {
match self {
Namer::Constant(addrs) => WithHandle::Constant(addrs),
Namer::Namerd(namerd) => WithHandle::WithClient(namerd.with_handle(handle, timer)),
}
}
}

pub enum WithHandle {
Constant(HashMap<String, Vec<WeightedAddr>>),
WithClient(WithClient),
}

impl WithHandle {
pub fn resolve(&self, target: &str) -> StreamEither<stream::Once<Result<Vec<WeightedAddr>>, Error>, Addrs> {
match self {
&WithHandle::Constant(ref addrs) => match addrs.get(target) {
Some(addrs) => StreamEither::A(stream::once(Ok(Ok(addrs.clone())))),
None => StreamEither::A(stream::once(Ok(Err(Error::NotBound)))),
},
&WithHandle::WithClient(ref with_client) =>
StreamEither::B(with_client.resolve(target))
}
}
}
49 changes: 17 additions & 32 deletions src/resolver/namerd.rs → src/resolver/namer/namerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// balancers can be shared across logical names. In the meantime, it's sufficient to have
// a balancer per logical name.

use super::{WeightedAddr, Result, Error, Namer, WithHandle};
use super::{WeightedAddr, Result, Error};
use bytes::{Buf, BufMut, IntoBuf, Bytes, BytesMut};
use futures::{Async, Future, IntoFuture, Poll, Stream};
use hyper::{Body, Chunk, Client, StatusCode, Uri};
Expand All @@ -18,24 +18,11 @@ use tokio_core::reactor::Handle;
use tokio_timer::{Timer, Interval};
use url::Url;

type HttpConnectorFactory = Client<HttpConnector>;
pub type HttpConnectorFactory = Client<HttpConnector>;

type AddrsFuture = Box<Future<Item = Vec<WeightedAddr>, Error = Error>>;

// pub struct Addrs(Box<Stream<Item = Result<Vec<WeightedAddr>>, Error = ()>>);
// impl Stream for Addrs {
// type Item = Result<Vec<WeightedAddr>>;
// type Error = ();
// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// self.0.poll()
// }
// }

#[derive(Clone)]
pub struct Namerd {
base_url: String,
period: time::Duration,
namespace: String,
stats: Stats,
}

Expand All @@ -49,46 +36,45 @@ impl Namerd {
Namerd {
base_url: format!("{}/api/1/resolve/{}", base_url, namespace),
stats: Stats::new(metrics),
namespace,
period,
}
}
}

impl Namer for Namerd {
fn with_handle(self: Box<Namerd>, handle: &Handle, timer: &Timer) -> Box<WithHandle> {
Box::new(WithClient {
pub fn with_handle(self, handle: &Handle, timer: &Timer) -> WithClient {
WithClient {
namerd: self,
client: Rc::new(Client::new(handle)),
timer: timer.clone(),
})
}
}
}

/// A name
pub struct WithClient {
namerd: Box<Namerd>,
client: Rc<HttpConnectorFactory>,
timer: Timer,
namerd: Namerd,
client: Rc<HttpConnectorFactory>,
timer: Timer
}
impl WithHandle for WithClient {
fn resolve(&self, target: &str) -> Box<Stream<Item = Result<Vec<WeightedAddr>>, Error = Error>> {
let uri = Url::parse_with_params(&self.namerd.base_url, &[("path", &target)])

impl WithClient {
pub fn resolve(&self, target: &str) -> Addrs {
let uri = Url::parse_with_params(self.namerd.base_url.as_ref(), &[("path", &target)])
.expect("invalid namerd url")
.as_str()
.parse::<Uri>()
.expect("Could not parse namerd URI");
let init = request(self.client.clone(), uri.clone(), self.namerd.stats.clone());
let interval = self.timer.interval(self.namerd.period);
Box::new(Addrs {
Addrs {
client: self.client.clone(),
stats: self.namerd.stats.clone(),
state: Some(State::Pending(init, interval)),
uri,
})
}
}
}

type AddrsFuture = Box<Future<Item = Vec<WeightedAddr>, Error = Error>>;

/// Streams
pub struct Addrs {
state: Option<State>,
Expand Down Expand Up @@ -270,7 +256,6 @@ struct Meta {
endpoint_addr_weight: Option<f64>,
}


#[derive(Clone)]
pub struct Stats {
request_latency: tacho::Timer,
Expand All @@ -285,4 +270,4 @@ impl Stats {
failure_count: metrics.counter("failure_count".into()),
}
}
}
}