Skip to content

Commit

Permalink
Introduced handling of Authorization http header. For now "Bearer" to…
Browse files Browse the repository at this point in the history
…ken authentication is being handled. For now tokens and usernames are defined in the config file of sidecar
  • Loading branch information
Jakub Zajkowski committed Nov 20, 2023
1 parent 6db42f7 commit 2c613a1
Show file tree
Hide file tree
Showing 9 changed files with 613 additions and 71 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ utoipa-swagger-ui = { version = "3.1.5" }
warp = { version = "0.3.6", features = ["compression"] }
wheelbuf = "0.2.0"
once_cell = { workspace = true }
tower-http = { version = "0.4.4", features = ["auth"] }
futures-util = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5"
Expand Down
163 changes: 163 additions & 0 deletions sidecar/src/authorization/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use futures_util::future::BoxFuture;
use http::{header::AUTHORIZATION, StatusCode};
use hyper::{Body, Request, Response};
use std::sync::Arc;
use tower_http::auth::AsyncAuthorizeRequest;

use super::{Authorizer, UserId, BEARER};

#[derive(Clone)]
pub struct HeaderBasedAuthorizeRequest<T>
where
T: Authorizer + Send + Sync + 'static,
{
api_token_authorizer: Arc<T>,
}

impl<T> HeaderBasedAuthorizeRequest<T>
where
T: Authorizer + Send + Sync + 'static,
{
pub(crate) fn new(api_token_authorizer: T) -> Self {
HeaderBasedAuthorizeRequest {
api_token_authorizer: Arc::new(api_token_authorizer),
}
}
}

impl<B, T> AsyncAuthorizeRequest<B> for HeaderBasedAuthorizeRequest<T>
where
B: Send + Sync + 'static,
T: Authorizer + Send + Sync + 'static,
{
type RequestBody = B;
type ResponseBody = Body;
type Future = BoxFuture<'static, Result<Request<B>, Response<Self::ResponseBody>>>;

fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
let authorizer = self.api_token_authorizer.clone();
Box::pin(async {
if let Some(user_id) = check_auth(authorizer, &request).await {
request.extensions_mut().insert(user_id);
Ok(request)
} else {
Err(unauthorized_response())
}
})
}
}

async fn check_auth<T: Authorizer, B>(
api_token_authorizer: Arc<T>,
request: &Request<B>,
) -> Option<UserId> {
let maybe_token = request
.headers()
.get(AUTHORIZATION)
.and_then(|header: &http::HeaderValue| header.to_str().ok());
if let Some(token) = maybe_token {
let parts = token.split(' ').collect::<Vec<&str>>();
if parts.len() != 2 {
return None;
}
if parts[0].to_lowercase() != BEARER {
return None;
}
api_token_authorizer.authorize(parts[1].to_string()).await
} else {
None
}
}

#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;

struct MockAuthorizer {
expected_token: Option<String>,
user_id: Option<UserId>,
}

impl MockAuthorizer {
fn new(expected_token: Option<String>, user_id: Option<UserId>) -> Self {
MockAuthorizer {
expected_token,
user_id,
}
}
}

#[async_trait]
impl Authorizer for MockAuthorizer {
async fn authorize(&self, token: String) -> Option<UserId> {
if let Some(expected_token) = self.expected_token.clone() {
if expected_token != token {
unreachable!("Expected token {} but got {}", expected_token, token);
}
}
self.user_id.clone()
}
}

#[tokio::test]
async fn should_fail_authorization_if_no_token_given() {
let result = prepare_and_authorize(None, None, None).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_no_user_for_token() {
let result = prepare_and_authorize(Some("xyz"), None, Some("Bearer xyz")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_bearer_prefix_missing() {
let result = prepare_and_authorize(Some("xyz"), Some("user_1"), Some("xyz")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_correct_token_under_wrong_method() {
let result = prepare_and_authorize(Some("abc"), Some("user_1"), Some("Basic abc")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_pass() {
let result = prepare_and_authorize(Some("abc"), Some("user_1"), Some("Bearer abc")).await;
let user_id = result
.unwrap()
.extensions()
.get::<UserId>()
.unwrap()
.clone();
assert_eq!(user_id, UserId("user_1".into()));
}

async fn prepare_and_authorize(
expected_token: Option<&str>,
user_id: Option<&str>,
authorization_header: Option<&str>,
) -> Result<Request<Body>, Response<Body>> {
let authorizer = MockAuthorizer::new(
expected_token.map(|token| token.to_string()),
user_id.map(|id| UserId(id.to_string())),
);
let mut auth = HeaderBasedAuthorizeRequest::new(authorizer);
let mut builder = Request::builder();
if let Some(header) = authorization_header {
builder = builder.header("Authorization", header);
}
let request = builder.body(Body::empty()).unwrap();
auth.authorize(request).await
}
}

fn unauthorized_response() -> Response<Body> {
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(b"Unauthorized".to_vec().into())
.unwrap()
}
59 changes: 59 additions & 0 deletions sidecar/src/authorization/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::types::config::AuthConfig;
use async_trait::async_trait;
pub use layer::HeaderBasedAuthorizeRequest;
use std::collections::HashMap;
mod layer;

const BEARER: &str = "bearer";

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UserId(String);

#[async_trait]
pub trait Authorizer {
async fn authorize(&self, token: String) -> Option<UserId>;
}

#[derive(Clone)]
pub struct BearerAuthorizer {
api_key_to_username: HashMap<String, String>,
}

#[async_trait]
impl Authorizer for BearerAuthorizer {
async fn authorize(&self, token: String) -> Option<UserId> {
self.api_key_to_username
.get(&token)
.map(|username| UserId(username.clone()))
}
}

impl BearerAuthorizer {
pub fn new(token_user_map: HashMap<String, String>) -> Self {
BearerAuthorizer {
api_key_to_username: token_user_map,
}
}
}

pub enum AuthorizationValidator {
SimpleAuthorization(BearerAuthorizer),
NoAuthorization,
}

pub fn build_authorization_validator(auth_config: &Option<AuthConfig>) -> AuthorizationValidator {
if let Some(auth) = auth_config {
if auth.enabled {
let token_to_user = auth
.users
.iter()
.map(|user| (user.api_key.clone(), user.username.clone()))
.collect();
AuthorizationValidator::SimpleAuthorization(BearerAuthorizer::new(token_to_user))
} else {
AuthorizationValidator::NoAuthorization
}
} else {
AuthorizationValidator::NoAuthorization
}
}
25 changes: 6 additions & 19 deletions sidecar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
extern crate core;
mod admin_server;
mod api_version_manager;
mod authorization;
mod database;
mod event_stream_server;
pub mod rest_server;
Expand All @@ -29,7 +30,6 @@ use crate::{
admin_server::run_server as start_admin_server,
database::sqlite_database::SqliteDatabase,
event_stream_server::{Config as SseConfig, EventStreamServer},
rest_server::run_server as start_rest_server,
types::{
config::{read_config, Config},
database::{DatabaseWriteError, DatabaseWriter},
Expand All @@ -46,6 +46,7 @@ use clap::Parser;
use database::postgresql_database::PostgreSqlDatabase;
use futures::future::join_all;
use hex_fmt::HexFmt;
use rest_server::build_and_start_rest_server;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
use tokio::{
Expand All @@ -54,6 +55,7 @@ use tokio::{
time::sleep,
};
use tracing::{debug, error, info, trace, warn};
use types::config::validate_auth_config;
use types::{
config::StorageConfig,
database::{Database, DatabaseReader},
Expand Down Expand Up @@ -101,7 +103,8 @@ async fn run(config: Config) -> Result<(), Error> {
let connection_configs = config.connections.clone();
let storage_config = config.storage.clone();
let database = build_database(&storage_config).await?;
let rest_server_handle = build_and_start_rest_server(&config, database.clone());
let rest_server_handle =
build_and_start_rest_server(&config.rest_server, &config.auth, database.clone());

// Task to manage incoming events from all three filters
let listening_task_handle = start_sse_processors(
Expand Down Expand Up @@ -229,23 +232,6 @@ fn spawn_sse_processor(
}
}

fn build_and_start_rest_server(
config: &Config,
database: Database,
) -> JoinHandle<Result<(), Error>> {
let rest_server_config = config.rest_server.clone();
tokio::spawn(async move {
match database {
Database::SqliteDatabaseWrapper(db) => {
start_rest_server(rest_server_config, db.clone()).await
}
Database::PostgreSqlDatabaseWrapper(db) => {
start_rest_server(rest_server_config, db.clone()).await
}
}
})
}

fn build_and_start_admin_server(config: &Config) -> JoinHandle<Result<(), Error>> {
let admin_server_config = config.admin_server.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -333,6 +319,7 @@ fn validate_config(config: &Config) -> Result<(), Error> {
"Unable to run: max_attempts setting must be above 0 for the sidecar to attempt connection"
));
}
validate_auth_config(&config.auth)?;
Ok(())
}

Expand Down
Loading

0 comments on commit 2c613a1

Please sign in to comment.