Skip to content

Commit

Permalink
wip: sketch spawning processes
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jun 6, 2024
1 parent 14ff264 commit fe4f9b8
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 40 deletions.
108 changes: 68 additions & 40 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::path::PathBuf;

use clap::Parser;

use xs::store::ReadOptions;
use xs::store::Store;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -29,53 +30,80 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

tokio::spawn(async move {
let origin = "wss://gateway.discord.gg";
let command = format!(
"websocat {} --ping-interval 5 --ping-timeout 10 -E -t",
origin
);
let mut child = tokio::process::Command::new("sh")
.arg("-c")
.arg(&command)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn command");
{
let mut store = store.clone();
tokio::spawn(async move {
let origin = "wss://gateway.discord.gg";
let command = format!(
"websocat {} --ping-interval 5 --ping-timeout 10 -E -t",
origin
);
let mut child = tokio::process::Command::new("sh")
.arg("-c")
.arg(&command)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn command");

let mut stdin = child.stdin.take().expect("Failed to open stdin");
let stdout = child.stdout.take().expect("Failed to open stdout");
let mut stdin = child.stdin.take().expect("Failed to open stdin");
let stdout = child.stdout.take().expect("Failed to open stdout");

tokio::spawn(async move {
loop {
if let Err(e) = stdin.write_all(b"{\"op\":1,\"d\":null}\n").await {
eprintln!("Failed to write to stdin: {}", e);
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
{
let store = store.clone();
tokio::spawn(async move {
let mut recver = store
.read(ReadOptions {
follow: true,
tail: true,
last_id: None,
})
.await;

tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
eprint!("{}", line);
while let Some(frame) = recver.recv().await {
eprintln!("FRAME: {:?}", &frame.topic);
if frame.topic == "ws.send" {
let content = store.cas_read(&frame.hash.unwrap()).await.unwrap();
let mut content = content;
content.push(b'\n');
eprintln!("CONTENT: {}", std::str::from_utf8(&content).unwrap());
if let Err(e) = stdin.write_all(&content).await {
eprintln!("Failed to write to stdin: {}", e);
break;
}
}
}
Err(e) => {
eprintln!("Failed to read from stdout: {}", e);
break;
/*
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;
}
}
*/
});
}
});

let _ = child.wait().await;
});
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
let hash = store.cas_insert(&line).await.unwrap();
let frame = store.append("ws.recv", Some(hash.clone()), None).await;
eprintln!("inserted: {} {:?} :: {:?}", line, hash, frame);
}
Err(e) => {
eprintln!("Failed to read from stdout: {}", e);
break;
}
}
}
});

let _ = child.wait().await;
});
}

xs::api::serve(store).await
// TODO: graceful shutdown
Expand Down
8 changes: 8 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ impl Store {
.await
}

pub async fn cas_insert(&self, content: &str) -> cacache::Result<ssri::Integrity> {
cacache::write_hash(&self.path.join("cacache"), content).await
}

pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
cacache::read_hash(&self.path.join("cacache"), hash).await
}

pub async fn append(
&mut self,
topic: &str,
Expand Down

0 comments on commit fe4f9b8

Please sign in to comment.