feat(stream): add synced logstore #20204
base: main
Conversation
Force-pushed from e7c0130 to 31d6e5d.
```rust
//! This is done by constructing a `flushed_chunk_future` which will read the log store
//! using the `seq_id`.
//! - Barrier,
//!   because they are directly propagated from the upstream when polling it.
```
See this docstring for the design.
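A minimal sketch of the mechanism the docstring describes, with placeholder types (`make_flushed_chunk_future` and its parameters are illustrative names, not the PR's actual signatures; assumes the `futures` and `anyhow` crates):

```rust
use futures::FutureExt;
use futures::future::BoxFuture;

// Placeholder types standing in for the real RisingWave ones.
type ChunkId = usize;
type SeqId = i32;
struct StreamChunk;
type LogStoreResult<T> = Result<T, anyhow::Error>;

// Mirrors the alias added in this PR: a boxed future resolving to the
// re-read chunk together with its id and epoch.
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>;

// Hypothetical constructor: when the buffer yields a flushed item, build a
// future that reads the chunk back from the log store by its seq_id range.
fn make_flushed_chunk_future(
    chunk_id: ChunkId,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    epoch: u64,
) -> ReadFlushedChunkFuture {
    async move {
        // The real executor performs a ranged state-store read keyed by
        // (epoch, start_seq_id..=end_seq_id) and deserializes the rows.
        let _ = (start_seq_id, end_seq_id);
        Ok((chunk_id, StreamChunk, epoch))
    }
    .boxed()
}
```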
So what does "synced" mean here? (and what is unsynced logstore?)
The unsynced logstore is our current design for the sink logstore, where we have 2 separate streams: one for writing to the logstore and one for reading from it.
Synced means that we have a single stream instead of 2 concurrently running ones.
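Roughly, as a toy sketch (not the actual executor code; `synced_loop` and the message type are illustrative):

```rust
use futures::{Stream, StreamExt};

enum Message {
    Chunk(Vec<u64>), // stand-in for a StreamChunk
    Barrier(u64),    // stand-in for a barrier carrying its epoch
}

// Synced: one stream owns both the write side and the read side, instead
// of two concurrently running streams bridged through the log store.
async fn synced_loop(mut upstream: impl Stream<Item = Message> + Unpin) {
    let mut buffer: Vec<Vec<u64>> = Vec::new(); // stand-in for SyncedLogStoreBuffer
    while let Some(msg) = upstream.next().await {
        match msg {
            // Write side: stage the chunk in the log store buffer.
            Message::Chunk(chunk) => buffer.push(chunk),
            // Read side, in the same loop: drain what was buffered, then
            // propagate the barrier downstream (the unsynced design instead
            // runs a separate reader stream that consumes the log).
            Message::Barrier(_epoch) => {
                for _chunk in buffer.drain(..) {
                    // yield Message::Chunk(_chunk) downstream here
                }
                // yield Message::Barrier(_epoch) downstream here
            }
        }
    }
}
```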
I'm not very familiar with our current design for the sink logstore. So why can't we use the "unsynced" version here? What problems does the "synced" version aim to solve?
btw, why does sink into table need this new logstore instead of the old one?
> I'm not very familiar with our current design for the sink logstore. So why can't we use the "unsynced" version here? What problems does the "synced" version aim to solve?
Updated in PR description. Briefly:
- Truncate the logstore before the barrier gets committed, rather than asynchronously afterwards. This ensures exactly-once semantics.
- Yield barriers downstream from the writer side. An external sink does not need barriers, so the unsynced logstore does not yield barriers downstream. (See the sketch below.)
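A hedged sketch of the barrier path these two points describe (toy types; `on_barrier` and `truncation_offset` are illustrative names):

```rust
// Stand-in for the real buffer type.
struct SyncedLogStoreBuffer {
    items: Vec<(u64, Vec<u64>)>, // (seq_id, chunk) pairs
}

fn on_barrier(buffer: &mut SyncedLogStoreBuffer, truncation_offset: Option<u64>, epoch: u64) {
    // 1. Truncate the log store *before* the barrier is committed, so a
    //    recovery after this epoch never replays records that downstream
    //    already consumed (exactly-once), instead of truncating
    //    asynchronously after the commit as the sink logstore does.
    if let Some(offset) = truncation_offset {
        buffer.items.retain(|(seq_id, _)| *seq_id > offset);
    }
    // 2. Then yield the barrier downstream from the writer side; the
    //    external-sink (unsynced) logstore skips this because an external
    //    sink has no downstream consumers of barriers.
    //    yield Message::Barrier(epoch);
    let _ = epoch;
}
```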
This pull request has been modified. If you want me to regenerate unit tests for any of the related files, please find the file in the "Files Changed" tab and add a comment.
Force-pushed from ea8d543 to 48bea0d.
Generally LGTM. The implementation is similar to the sink logstore, but we need to take care of watermark truncation, which is related to compaction.
```rust
table_id,
metrics,
state_store,
barrier.epoch.curr, // FIXME(kwannoel): Should this be curr or prev?
```
It seems to be the prev barrier. It works like a backfill snapshot read.
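In code, the suggestion amounts to something like this (sketch; `EpochPair` just mirrors the `curr`/`prev` fields used in the hunk above):

```rust
struct EpochPair {
    curr: u64,
    prev: u64,
}

// Read the persisted log store at the *previous* (last committed) epoch,
// analogous to a backfill snapshot read: `curr` has not been committed yet
// when the first barrier arrives.
fn snapshot_read_epoch(epoch: EpochPair) -> u64 {
    epoch.prev
}
```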
Leaving some early comments; will continue reviewing the executor implementation. At first glance, the implementation seems too complicated. Besides, it seems that updating the vnode bitmap is not handled in this PR.
src/stream/Cargo.toml (outdated)
```diff
@@ -72,6 +72,7 @@ tokio-retry = "0.3"
 tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tracing = "0.1"
+tracing-subscriber = "0.3.17"
```
Seems to be used only in tests; maybe add it to dev-dependencies.
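For example, a sketch of the manifest change in src/stream/Cargo.toml:

```toml
# Test-only dependency: declare it under [dev-dependencies] instead of
# [dependencies] so it is not compiled into release builds.
[dev-dependencies]
tracing-subscriber = "0.3.17"
```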
```rust
type StateStoreStream<S> = Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>;
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>;

struct SyncedKvLogStoreExecutor<S: StateStore, LS: LocalStateStore> {
```
Only need a single generic type `S: StateStore`; `LS` is `S::Local`.
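That is, something along these lines (a sketch with toy traits mirroring the shape of `StateStore`/`LocalStateStore`):

```rust
trait LocalStateStore {}
trait StateStore {
    type Local: LocalStateStore;
}

// Single generic parameter: the local store is named through the
// associated type rather than a second, independent type parameter.
struct SyncedKvLogStoreExecutor<S: StateStore> {
    state_store: S,
    local_state_store: S::Local,
}
```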
```diff
@@ -28,7 +28,7 @@ use crate::common::log_store_impl::kv_log_store::{
 };

 #[derive(Clone)]
-pub(crate) enum LogStoreBufferItem {
+pub enum LogStoreBufferItem {
```
The `synced_kv_log_store` is also in the same crate, so there is no need to extend the visibility.
```diff
@@ -32,11 +32,11 @@ use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
 use crate::common::log_store_impl::kv_log_store::writer::KvLogStoreWriter;
 use crate::executor::monitor::StreamingMetrics;

-mod buffer;
-mod reader;
+pub mod buffer;
```
`pub(crate)` should be enough for the changes in this file.
```diff
@@ -672,6 +598,107 @@ impl<S: StateStoreRead + Clone> LogReader for KvLogStoreReader<S> {
     }
 }

+#[allow(clippy::too_many_arguments)]
```
Are `read_flushed_chunk` and `read_persisted_log_store` just a simple copy-paste from the original code, without any change in the logic?
Yes. These are refactored and reused in the `synced_log_store` executor.
```rust
let Some(msg) = input.next().await else {
    bail!("Expected a barrier message, got end of stream")
};
let barrier = match msg? {
```
Use `expect_first_barrier`.
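i.e. replace the manual match with the existing helper (a fragment from inside the executor body; assumes `input` is the boxed message stream as elsewhere in the crate):

```rust
// Awaits the first message and errors out unless it is a barrier,
// subsuming the manual `let ... else` / `match` above.
let first_barrier = expect_first_barrier(&mut input).await?;
```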
```rust
    Message::Barrier(barrier) => barrier,
    other => bail!("Expected a barrier message, got {:?}", other),
};
let mut state_store_stream = Some(
```
We should yield the barrier first and then init the local state store. Otherwise, we will get stuck at `LocalStateStore::init`.
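The suggested ordering would look roughly like this inside the executor's stream body (a sketch, not the exact init API):

```rust
// Yield the first barrier downstream *before* touching the local state
// store: LocalStateStore::init waits for the barrier to be collected, so
// initializing first would leave the actor stuck.
yield Message::Barrier(first_barrier.clone());

// Only now initialize the local state store at the first barrier's epoch.
local_state_store.init(/* options built from first_barrier.epoch */).await?;
```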
```rust
metrics: &mut KvLogStoreMetrics,
truncation_offset: Option<ReaderTruncationOffsetType>,
seq_id: &mut SeqIdType,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
```
In the current code, the `Mutex<SyncedLogStoreBuffer>` is always accessed through a mutable reference, so it's likely we don't need to wrap the buffer in a `Mutex`.
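i.e. the helper could take the buffer directly (sketch; `write_to_buffer` is an illustrative name):

```rust
struct SyncedLogStoreBuffer; // placeholder for the real buffer type

// Every call site already has exclusive access, so a plain `&mut` suffices
// and the lock (plus its runtime overhead) can be dropped.
fn write_to_buffer(
    buffer: &mut SyncedLogStoreBuffer, // was: &mut Mutex<SyncedLogStoreBuffer>
    seq_id: &mut i32,
) {
    // ... push the item into `buffer` without locking ...
    let _ = (buffer, seq_id);
}
```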
Force-pushed from d3629fd to 5cccf42.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add synced logstore for unaligned join and decoupled sink into table.
We have 2 futures we repeatedly construct and poll: the persisted log store read stream (`StateStoreStream`) and the flushed chunk read future (`ReadFlushedChunkFuture`); see the type aliases in the diff above.
See #20204 (comment) for the implementation design.
We can't use the unsynced (existing) logstore because:
- We need to truncate the logstore before the barrier gets committed (rather than asynchronously afterwards) to ensure exactly-once semantics.
- We need to yield barriers downstream from the writer side; the external-sink (unsynced) logstore never yields barriers downstream.
TODOs:
Checklist
Documentation
Release note