Skip to content

Commit

Permalink
wip: sketch spawning processes
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jun 7, 2024
1 parent 6257da8 commit a5d26a3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod api;
pub mod spawn;
pub mod http;
pub mod listener;
pub mod store;
71 changes: 3 additions & 68 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::path::PathBuf;

use clap::Parser;

use xs::store::ReadOptions;
use xs::store::Store;

#[derive(Parser, Debug)]
Expand All @@ -28,75 +27,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
});
}

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

{
let mut store = store.clone();
let store = store.clone();
tokio::spawn(async move {
let origin = "wss://gateway.discord.gg";
let command = format!(
"websocat {} --ping-interval 5 --ping-timeout 10 -E -t",
origin
);
let mut child = tokio::process::Command::new("sh")
.arg("-c")
.arg(&command)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn command");

let mut stdin = child.stdin.take().expect("Failed to open stdin");
let stdout = child.stdout.take().expect("Failed to open stdout");

{
let store = store.clone();
tokio::spawn(async move {
let mut recver = store
.read(ReadOptions {
follow: true,
tail: true,
last_id: None,
})
.await;

while let Some(frame) = recver.recv().await {
eprintln!("FRAME: {:?}", &frame.topic);
if frame.topic == "ws.send" {
let content = store.cas_read(&frame.hash.unwrap()).await.unwrap();
let mut content = content;
content.push(b'\n');
eprintln!("CONTENT: {}", std::str::from_utf8(&content).unwrap());
if let Err(e) = stdin.write_all(&content).await {
eprintln!("Failed to write to stdin: {}", e);
break;
}
}
}
});
}

tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
let hash = store.cas_insert(&line).await.unwrap();
let frame = store.append("ws.recv", Some(hash.clone()), None).await;
eprintln!("inserted: {} {:?} :: {:?}", line, hash, frame);
}
Err(e) => {
eprintln!("Failed to read from stdout: {}", e);
break;
}
}
}
});

let _ = child.wait().await;
let res = xs::spawn::spawn(store).await;
eprintln!("peace from spawn: {:?}", res);
});
}

Expand Down
76 changes: 76 additions & 0 deletions src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::store::ReadOptions;
use crate::store::Store;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub async fn spawn(mut store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let origin = "wss://gateway.discord.gg";
let command = format!(
"websocat {} --ping-interval 5 --ping-timeout 10 -E -t",
origin
);
let mut child = tokio::process::Command::new("sh")
.arg("-c")
.arg(&command)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn command");

let mut stdin = child.stdin.take().expect("Failed to open stdin");
let stdout = child.stdout.take().expect("Failed to open stdout");

{
let store = store.clone();
tokio::spawn(async move {
let mut recver = store
.read(ReadOptions {
follow: true,
tail: true,
last_id: None,
})
.await;

while let Some(frame) = recver.recv().await {
eprintln!("FRAME: {:?}", &frame.topic);
if frame.topic == "ws.send" {
let content = store.cas_read(&frame.hash.unwrap()).await.unwrap();
let mut content = content;
content.push(b'\n');
eprintln!("CONTENT: {}", std::str::from_utf8(&content).unwrap());
if let Err(e) = stdin.write_all(&content).await {
eprintln!("Failed to write to stdin: {}", e);
break;
}
}
}
eprintln!("writer: outie");
});
}

tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
let hash = store.cas_insert(&line).await.unwrap();
let frame = store.append("ws.recv", Some(hash.clone()), None).await;
eprintln!("inserted: {} {:?} :: {:?}", line, hash, frame);
}
Err(e) => {
eprintln!("Failed to read from stdout: {}", e);
break;
}
}
}
eprintln!("reader: outie");
});

let _ = child.wait().await;
eprintln!("child: outie");

Ok(())
}

0 comments on commit a5d26a3

Please sign in to comment.