Skip to content

Commit

Permalink
add a tail option to cat
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jun 4, 2024
1 parent c16b2f0 commit 7e1493f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Built with:
- [x] cat
- [x] last-id
- [x] follow
- [x] tail
- [ ] tac
- [ ] last-id
- [x] get
Expand Down
1 change: 1 addition & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async fn handle(
let mut recver = store
.read(ReadOptions {
follow: true,
tail: false,
last_id: Some(frame_id),
})
.await;
Expand Down
31 changes: 18 additions & 13 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ where
pub struct ReadOptions {
#[serde(default, deserialize_with = "deserialize_bool")]
pub follow: bool,
#[serde(default, deserialize_with = "deserialize_bool")]
pub tail: bool,
#[serde(rename = "last-id")]
pub last_id: Option<Scru128Id>,
}
Expand Down Expand Up @@ -87,19 +89,21 @@ impl Store {
'outer: while let Some(command) = rx.blocking_recv() {
match command {
Command::Read(tx, options) => {
let range = match &options.last_id {
Some(last_id) => (
Bound::Excluded(last_id.to_bytes()),
Bound::<[u8; 16]>::Unbounded,
),
None => (Bound::Unbounded, Bound::Unbounded),
};
for record in store.partition.range(range) {
let record = record.unwrap();
let frame: Frame = serde_json::from_slice(&record.1).unwrap();
if tx.blocking_send(frame).is_err() {
// looks like the tx closed, skip adding it to subscribers
continue 'outer;
if !options.tail {
let range = match &options.last_id {
Some(last_id) => (
Bound::Excluded(last_id.to_bytes()),
Bound::<[u8; 16]>::Unbounded,
),
None => (Bound::Unbounded, Bound::Unbounded),
};
for record in store.partition.range(range) {
let record = record.unwrap();
let frame: Frame = serde_json::from_slice(&record.1).unwrap();
if tx.blocking_send(frame).is_err() {
// looks like the tx closed, skip adding it to subscribers
continue 'outer;
}
}
}
if options.follow {
Expand Down Expand Up @@ -292,6 +296,7 @@ mod tests_store {
let recver = store
.read(ReadOptions {
follow: false,
tail: false,
last_id: Some(f1.id),
})
.await;
Expand Down
3 changes: 2 additions & 1 deletion xs.nu
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ export def cat [
store: string
--last-id: any
--follow
--tail
] {
let path = "/"
let query = ( build-query { "last-id": $last_id, follow: $follow } )
let query = ( build-query { "last-id": $last_id, follow: $follow, tail: $tail } )
let url = $"localhost($path)($query)"
curl -sN --unix-socket $"($store)/sock" $url | lines | each { from json }
}
Expand Down

0 comments on commit 7e1493f

Please sign in to comment.