Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

[WIP] Add a Namer interface #80

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 62 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{admin, resolver, router, server};
use super::balancer::BalancerFactory;
use super::connector::{ConfigError as ConnectorConfigError, ConnectorFactoryConfig};
use super::resolver::{ConfigError as ResolverConfigError, NamerdConfig};
use super::resolver::{ConfigError as ResolverConfigError, ConstantConfig, NamerdConfig, Namer};
use super::server::ConfigError as ServerConfigError;
use futures::{Future, Stream, sync};
use hyper::server::Http;
Expand Down Expand Up @@ -200,7 +200,11 @@ impl RouterConfig {
let (resolver, resolver_exec) = match self.interpreter {
InterpreterConfig::NamerdHttp(config) => {
let namerd = config.into_namerd(&metrics).map_err(Error::Interpreter)?;
resolver::new(namerd)
resolver::new(Namer::Namerd(namerd))
},
InterpreterConfig::Constant(config) => {
let constant = config.into_namer().map_err(Error::Interpreter)?;
resolver::new(Namer::Constant(constant))
}
};

Expand Down Expand Up @@ -263,6 +267,9 @@ pub enum InterpreterConfig {
/// Polls namerd for updates.
#[serde(rename = "io.l5d.namerd.http")]
NamerdHttp(NamerdConfig),
/// Static list of addresses
#[serde(rename = "io.l5d.static")]
Constant(ConstantConfig),
}

/// Configures the admin server.
Expand Down
45 changes: 44 additions & 1 deletion src/resolver/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::namerd::Namerd;
use super::WeightedAddr;
use super::namer::namerd::Namerd;
use std::time::Duration;
use tacho;
use url::{self, Url};
use std::net;
use std::collections::HashMap;
use std::iter::FromIterator;

pub type Result<T> = ::std::result::Result<T, Error>;

Expand Down Expand Up @@ -38,3 +42,42 @@ impl NamerdConfig {
Ok(namerd)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct ConstantConfig {
addrs: HashMap<String, Vec<Addr>>
}

impl ConstantConfig {
pub fn into_namer(mut self) -> Result<HashMap<String, Vec<WeightedAddr>>> {
let weighted_iter = self.addrs.drain().map(|(path, addrs)| (path, to_weighted_addrs(&addrs)));
let weighted_addrs = HashMap::from_iter(weighted_iter);
Ok(weighted_addrs)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct Addr {
ip: String,
port: u16,
weight: Option<f64>,
}

fn to_weighted_addrs(addrs: &[Addr]) -> Vec<WeightedAddr> {
// We never intentionally clear the EndpointMap.
let mut dsts: Vec<WeightedAddr> = Vec::new();
let mut sum = 0.0;
for na in addrs {
let addr = net::SocketAddr::new(na.ip.parse().unwrap(), na.port);
let w = na.weight.unwrap_or(1.0);
sum += w;
dsts.push(WeightedAddr::new(addr, w));
}
// Normalize weights on [0.0, 0.1].
for dst in &mut dsts {
dst.weight /= sum;
}
dsts
}

20 changes: 10 additions & 10 deletions src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use tokio_core::reactor::Handle;
use tokio_timer::{Timer, TimerError};

mod config;
mod namerd;
pub use self::config::{Error as ConfigError, NamerdConfig};
pub use self::namerd::{Namerd, Addrs};
mod namer;
pub use self::config::{Error as ConfigError, ConstantConfig, NamerdConfig};
pub use self::namer::Namer;

#[derive(Debug)]
pub enum Error {
Expand All @@ -29,14 +29,14 @@ pub type Result<T> = ::std::result::Result<T, Error>;

/// Creates a multithreaded resolver.
///
/// The `Resolver` side is a client of the `Executor`. Namerd work is performed on
/// The `Resolver` side is a client of the `Executor`. Namer work is performed on
/// whatever thread the executor is spawned on.
pub fn new(namerd: Namerd) -> (Resolver, Executor) {
pub fn new(namer: Namer) -> (Resolver, Executor) {
let (tx, rx) = mpsc::unbounded();
let res = Resolver { requests: tx };
let exe = Executor {
requests: rx,
namerd: namerd,
namer: namer,
};
(res, exe)
}
Expand Down Expand Up @@ -77,16 +77,16 @@ impl Stream for Resolve {
/// Serves resolutions from `Resolver`s.
pub struct Executor {
requests: mpsc::UnboundedReceiver<(Path, mpsc::UnboundedSender<Result<Vec<WeightedAddr>>>)>,
namerd: Namerd,
namer: Namer,
}

impl Executor {
pub fn execute(self, handle: &Handle, timer: &Timer) -> Execute {
let handle = handle.clone();
let namerd = self.namerd.with_client(&handle, timer);
let namer = self.namer.with_handle(&handle, timer);
let f = self.requests.for_each(move |(path, rsp_tx)| {
// Stream namerd resolutions to the response channel.
let resolve = namerd.resolve(path.as_str());
// Stream namer resolutions to the response channel.
let resolve = namer.resolve(path.as_str());
let respond = resolve.forward(rsp_tx).map_err(|_| {}).map(|_| {});
// Do all of this work in another task so that we can receive
// additional requests.
Expand Down
63 changes: 63 additions & 0 deletions src/resolver/namer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use super::{Result, Error, WeightedAddr};
use tokio_core::reactor::Handle;
use tokio_timer::Timer;
use futures::Stream;
use std::collections::HashMap;
use futures::stream;
use futures::Poll;

pub mod namerd;

use self::namerd::{Addrs, WithClient, Namerd};

pub enum StreamEither<A, B> {
A(A),
B(B),
}

impl<A, B> Stream for StreamEither<A, B>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cute - I bet we could easily generalize this to make an Either that implements any trait implemented by both A and B?

where A: Stream,
B: Stream<Item = A::Item, Error = A::Error>
{
type Item = A::Item;
type Error = A::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match *self {
StreamEither::A(ref mut a) => a.poll(),
StreamEither::B(ref mut b) => b.poll(),
}
}
}

pub enum Namer {
Constant(HashMap<String, Vec<WeightedAddr>>),
Namerd(Namerd)
}

impl Namer {
pub fn with_handle(self, handle: &Handle, timer: &Timer) -> WithHandle {
match self {
Namer::Constant(addrs) => WithHandle::Constant(addrs),
Namer::Namerd(namerd) => WithHandle::WithClient(namerd.with_handle(handle, timer)),
}
}
}

pub enum WithHandle {
Constant(HashMap<String, Vec<WeightedAddr>>),
WithClient(WithClient),
}

impl WithHandle {
pub fn resolve(&self, target: &str) -> StreamEither<stream::Once<Result<Vec<WeightedAddr>>, Error>, Addrs> {
match self {
&WithHandle::Constant(ref addrs) => match addrs.get(target) {
Some(addrs) => StreamEither::A(stream::once(Ok(Ok(addrs.clone())))),
None => StreamEither::A(stream::once(Ok(Err(Error::NotBound)))),
},
&WithHandle::WithClient(ref with_client) =>
StreamEither::B(with_client.resolve(target))
}
}
}
35 changes: 10 additions & 25 deletions src/resolver/namerd.rs → src/resolver/namer/namerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,11 @@ use tokio_core::reactor::Handle;
use tokio_timer::{Timer, Interval};
use url::Url;

type HttpConnectorFactory = Client<HttpConnector>;
pub type HttpConnectorFactory = Client<HttpConnector>;

type AddrsFuture = Box<Future<Item = Vec<WeightedAddr>, Error = Error>>;

// pub struct Addrs(Box<Stream<Item = Result<Vec<WeightedAddr>>, Error = ()>>);
// impl Stream for Addrs {
// type Item = Result<Vec<WeightedAddr>>;
// type Error = ();
// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// self.0.poll()
// }
// }

#[derive(Clone)]
pub struct Namerd {
base_url: String,
period: time::Duration,
namespace: String,
stats: Stats,
}

Expand All @@ -49,14 +36,11 @@ impl Namerd {
Namerd {
base_url: format!("{}/api/1/resolve/{}", base_url, namespace),
stats: Stats::new(metrics),
namespace,
period,
}
}
}

impl Namerd {
pub fn with_client(self, handle: &Handle, timer: &Timer) -> WithClient {
pub fn with_handle(self, handle: &Handle, timer: &Timer) -> WithClient {
WithClient {
namerd: self,
client: Rc::new(Client::new(handle)),
Expand All @@ -65,15 +49,15 @@ impl Namerd {
}
}

/// A name
pub struct WithClient {
namerd: Namerd,
client: Rc<HttpConnectorFactory>,
timer: Timer,
namerd: Namerd,
client: Rc<HttpConnectorFactory>,
timer: Timer
}

impl WithClient {
pub fn resolve(&self, target: &str) -> Addrs {
let uri = Url::parse_with_params(&self.namerd.base_url, &[("path", &target)])
let uri = Url::parse_with_params(self.namerd.base_url.as_ref(), &[("path", &target)])
.expect("invalid namerd url")
.as_str()
.parse::<Uri>()
Expand All @@ -89,6 +73,8 @@ impl WithClient {
}
}

type AddrsFuture = Box<Future<Item = Vec<WeightedAddr>, Error = Error>>;

/// Streams
pub struct Addrs {
state: Option<State>,
Expand Down Expand Up @@ -270,7 +256,6 @@ struct Meta {
endpoint_addr_weight: Option<f64>,
}


#[derive(Clone)]
pub struct Stats {
request_latency: tacho::Timer,
Expand All @@ -285,4 +270,4 @@ impl Stats {
failure_count: metrics.counter("failure_count".into()),
}
}
}
}