Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced handling of Authorization http header. For now "Bearer" to… #220

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ utoipa-swagger-ui = { version = "3.1.5" }
warp = { version = "0.3.6", features = ["compression"] }
wheelbuf = "0.2.0"
once_cell = { workspace = true }
tower-http = { version = "0.4.4", features = ["auth"] }
futures-util = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5"
Expand Down
163 changes: 163 additions & 0 deletions sidecar/src/authorization/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use futures_util::future::BoxFuture;
use http::{header::AUTHORIZATION, StatusCode};
use hyper::{Body, Request, Response};
use std::sync::Arc;
use tower_http::auth::AsyncAuthorizeRequest;

use super::{Authorizer, UserId, BEARER};

#[derive(Clone)]
pub struct HeaderBasedAuthorizeRequest<T>
where
T: Authorizer + Send + Sync + 'static,
{
api_token_authorizer: Arc<T>,
}

impl<T> HeaderBasedAuthorizeRequest<T>
where
T: Authorizer + Send + Sync + 'static,
{
pub(crate) fn new(api_token_authorizer: T) -> Self {
HeaderBasedAuthorizeRequest {
api_token_authorizer: Arc::new(api_token_authorizer),
}
}
}

impl<B, T> AsyncAuthorizeRequest<B> for HeaderBasedAuthorizeRequest<T>
where
B: Send + Sync + 'static,
T: Authorizer + Send + Sync + 'static,
{
type RequestBody = B;
type ResponseBody = Body;
type Future = BoxFuture<'static, Result<Request<B>, Response<Self::ResponseBody>>>;

fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
let authorizer = self.api_token_authorizer.clone();
Box::pin(async {
if let Some(user_id) = check_auth(authorizer, &request).await {
request.extensions_mut().insert(user_id);
Ok(request)
} else {
Err(unauthorized_response())
}
})
}
}

async fn check_auth<T: Authorizer, B>(
api_token_authorizer: Arc<T>,
request: &Request<B>,
) -> Option<UserId> {
let maybe_token = request
.headers()
.get(AUTHORIZATION)
.and_then(|header: &http::HeaderValue| header.to_str().ok());
if let Some(token) = maybe_token {
let parts = token.split(' ').collect::<Vec<&str>>();
if parts.len() != 2 {
return None;
}
if parts[0].to_lowercase() != BEARER {
return None;
}
api_token_authorizer.authorize(parts[1].to_string()).await
} else {
None
}
}

#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;

struct MockAuthorizer {
expected_token: Option<String>,
user_id: Option<UserId>,
}

impl MockAuthorizer {
fn new(expected_token: Option<String>, user_id: Option<UserId>) -> Self {
MockAuthorizer {
expected_token,
user_id,
}
}
}

#[async_trait]
impl Authorizer for MockAuthorizer {
async fn authorize(&self, token: String) -> Option<UserId> {
if let Some(expected_token) = self.expected_token.clone() {
if expected_token != token {
unreachable!("Expected token {} but got {}", expected_token, token);
}
}
self.user_id.clone()
}
}

#[tokio::test]
async fn should_fail_authorization_if_no_token_given() {
let result = prepare_and_authorize(None, None, None).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_no_user_for_token() {
let result = prepare_and_authorize(Some("xyz"), None, Some("Bearer xyz")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_bearer_prefix_missing() {
let result = prepare_and_authorize(Some("xyz"), Some("user_1"), Some("xyz")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_fail_authorization_if_correct_token_under_wrong_method() {
let result = prepare_and_authorize(Some("abc"), Some("user_1"), Some("Basic abc")).await;
assert!(result.is_err());
}

#[tokio::test]
async fn should_pass() {
let result = prepare_and_authorize(Some("abc"), Some("user_1"), Some("Bearer abc")).await;
let user_id = result
.unwrap()
.extensions()
.get::<UserId>()
.unwrap()
.clone();
assert_eq!(user_id, UserId("user_1".into()));
}

async fn prepare_and_authorize(
expected_token: Option<&str>,
user_id: Option<&str>,
authorization_header: Option<&str>,
) -> Result<Request<Body>, Response<Body>> {
let authorizer = MockAuthorizer::new(
expected_token.map(|token| token.to_string()),
user_id.map(|id| UserId(id.to_string())),
);
let mut auth = HeaderBasedAuthorizeRequest::new(authorizer);
let mut builder = Request::builder();
if let Some(header) = authorization_header {
builder = builder.header("Authorization", header);
}
let request = builder.body(Body::empty()).unwrap();
auth.authorize(request).await
}
}

fn unauthorized_response() -> Response<Body> {
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(b"Unauthorized".to_vec().into())
.unwrap()
}
59 changes: 59 additions & 0 deletions sidecar/src/authorization/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::types::config::AuthConfig;
use async_trait::async_trait;
pub use layer::HeaderBasedAuthorizeRequest;
use std::collections::HashMap;
mod layer;

const BEARER: &str = "bearer";

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UserId(String);

#[async_trait]
pub trait Authorizer {
async fn authorize(&self, token: String) -> Option<UserId>;
}

#[derive(Clone)]
pub struct BearerAuthorizer {
api_key_to_username: HashMap<String, String>,
}

#[async_trait]
impl Authorizer for BearerAuthorizer {
async fn authorize(&self, token: String) -> Option<UserId> {
self.api_key_to_username
.get(&token)
.map(|username| UserId(username.clone()))
}
}

impl BearerAuthorizer {
pub fn new(token_user_map: HashMap<String, String>) -> Self {
BearerAuthorizer {
api_key_to_username: token_user_map,
}
}
}

pub enum AuthorizationValidator {
SimpleAuthorization(BearerAuthorizer),
NoAuthorization,
}

pub fn build_authorization_validator(auth_config: &Option<AuthConfig>) -> AuthorizationValidator {
if let Some(auth) = auth_config {
if auth.enabled {
let token_to_user = auth
.users
.iter()
.map(|user| (user.api_key.clone(), user.username.clone()))
.collect();
AuthorizationValidator::SimpleAuthorization(BearerAuthorizer::new(token_to_user))
} else {
AuthorizationValidator::NoAuthorization
}
} else {
AuthorizationValidator::NoAuthorization
}
}
25 changes: 6 additions & 19 deletions sidecar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
extern crate core;
mod admin_server;
mod api_version_manager;
mod authorization;
mod database;
mod event_stream_server;
pub mod rest_server;
Expand All @@ -29,7 +30,6 @@ use crate::{
admin_server::run_server as start_admin_server,
database::sqlite_database::SqliteDatabase,
event_stream_server::{Config as SseConfig, EventStreamServer},
rest_server::run_server as start_rest_server,
types::{
config::{read_config, Config},
database::{DatabaseWriteError, DatabaseWriter},
Expand All @@ -46,6 +46,7 @@ use clap::Parser;
use database::postgresql_database::PostgreSqlDatabase;
use futures::future::join_all;
use hex_fmt::HexFmt;
use rest_server::build_and_start_rest_server;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
use tokio::{
Expand All @@ -54,6 +55,7 @@ use tokio::{
time::sleep,
};
use tracing::{debug, error, info, trace, warn};
use types::config::validate_auth_config;
use types::{
config::StorageConfig,
database::{Database, DatabaseReader},
Expand Down Expand Up @@ -101,7 +103,8 @@ async fn run(config: Config) -> Result<(), Error> {
let connection_configs = config.connections.clone();
let storage_config = config.storage.clone();
let database = build_database(&storage_config).await?;
let rest_server_handle = build_and_start_rest_server(&config, database.clone());
let rest_server_handle =
build_and_start_rest_server(&config.rest_server, &config.auth, database.clone());

// Task to manage incoming events from all three filters
let listening_task_handle = start_sse_processors(
Expand Down Expand Up @@ -229,23 +232,6 @@ fn spawn_sse_processor(
}
}

fn build_and_start_rest_server(
config: &Config,
database: Database,
) -> JoinHandle<Result<(), Error>> {
let rest_server_config = config.rest_server.clone();
tokio::spawn(async move {
match database {
Database::SqliteDatabaseWrapper(db) => {
start_rest_server(rest_server_config, db.clone()).await
}
Database::PostgreSqlDatabaseWrapper(db) => {
start_rest_server(rest_server_config, db.clone()).await
}
}
})
}

fn build_and_start_admin_server(config: &Config) -> JoinHandle<Result<(), Error>> {
let admin_server_config = config.admin_server.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -333,6 +319,7 @@ fn validate_config(config: &Config) -> Result<(), Error> {
"Unable to run: max_attempts setting must be above 0 for the sidecar to attempt connection"
));
}
validate_auth_config(&config.auth)?;
Ok(())
}

Expand Down
Loading