Skip to content

Commit

Permalink
wip: cont on http interface
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jun 1, 2024
1 parent e1ae8be commit c206f90
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 14 deletions.
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ssri = "9.2.0"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.11", features = ["compat"] }
url = "2.5.0"

[dev-dependencies]
assert_cmd = "2.0.14"
Expand Down
8 changes: 4 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResu
.map(|x| x.to_str())
.transpose()
.unwrap()
.map(|s| {
serde_json::from_str(s).map_err(|_| format!("xs-meta isn't valid JSON: {}", s))
})
.map(|s| serde_json::from_str(s).map_err(|_| format!("xs-meta isn't valid JSON: {}", s)))
.transpose()
{
Ok(meta) => meta,
Expand All @@ -140,7 +138,9 @@ async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResu
eprintln!("meta: {:?}", &meta);

let hash = writer.commit().await?;
let frame = store.append(parts.uri.path().trim_start_matches('/'), Some(hash), meta).await;
let frame = store
.append(parts.uri.path().trim_start_matches('/'), Some(hash), meta)
.await;

Ok(Response::builder()
.status(StatusCode::OK)
Expand Down
80 changes: 71 additions & 9 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use std::collections::HashMap;
use std::error::Error;
use std::net::SocketAddr;

use serde::{Deserialize, Serialize};

use tokio::io::AsyncWriteExt;

// needed to convert async-std Async to a tokio Async
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
Expand All @@ -15,8 +21,6 @@ use crate::store::Store;

#[derive(Serialize, Deserialize, Debug)]
pub struct Request {
pub stamp: scru128::Scru128Id,
pub message: String,
pub proto: String,
#[serde(with = "http_serde::method")]
pub method: http::method::Method,
Expand All @@ -32,8 +36,6 @@ pub struct Request {
pub uri: http::Uri,
pub path: String,
pub query: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub response: Option<Response>,
}

#[derive(Default, Debug, Serialize, Deserialize, Clone)]
Expand All @@ -47,12 +49,69 @@ pub struct Response {
type BoxError = Box<dyn std::error::Error + Send + Sync>;
type HTTPResult = Result<hyper::Response<BoxBody<Bytes, BoxError>>, BoxError>;

async fn handle(_store: Store, req: hyper::Request<hyper::body::Incoming>) -> HTTPResult {
eprintln!("\n\nreq: {:?}", &req);
async fn handle(
mut store: Store,
req: hyper::Request<hyper::body::Incoming>,
addr: Option<SocketAddr>,
) -> HTTPResult {
let (parts, mut body) = req.into_parts();

let uri = parts.uri.clone().into_parts();

let authority: Option<String> = uri.authority.as_ref().map(|a| a.to_string()).or_else(|| {
parts
.headers
.get("host")
.map(|a| a.to_str().unwrap().to_owned())
});

let path = parts.uri.path().to_string();

let query: HashMap<String, String> = parts
.uri
.query()
.map(|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.collect()
})
.unwrap_or_else(HashMap::new);

let req_meta = Request {
proto: format!("{:?}", parts.version),
method: parts.method,
authority,
remote_ip: addr.as_ref().map(|a| a.ip()),
remote_port: addr.as_ref().map(|a| a.port()),
headers: parts.headers,
uri: parts.uri,
path,
query,
};

let writer = store.cas_writer().await?;
// convert writer from async-std -> tokio
let mut writer = writer.compat_write();
while let Some(frame) = body.frame().await {
let data = frame?.into_data().unwrap();
writer.write_all(&data).await?;
}
// get the original writer back
let writer = writer.into_inner();
let hash = writer.commit().await?;

let frame = store
.append(
"http.request",
Some(hash),
Some(serde_json::to_value(&req_meta).unwrap()),
)
.await;

Ok(hyper::Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(full("Hello world.\n".to_string()))?)
.body(full(serde_json::to_string(&frame).unwrap()))?)
}

pub async fn serve(
Expand All @@ -62,12 +121,15 @@ pub async fn serve(
println!("starting http interface: {:?}", addr);
let mut listener = Listener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let (stream, remote_addr) = listener.accept().await?;
let io = TokioIo::new(stream);
let store = store.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(move |req| handle(store.clone(), req)))
.serve_connection(
io,
service_fn(move |req| handle(store.clone(), req, remote_addr)),
)
.await
{
// Match against the error kind to selectively ignore `NotConnected` errors
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod api;
pub mod http;
pub mod store;
pub mod listener;
pub mod store;

0 comments on commit c206f90

Please sign in to comment.