Skip to content

Commit

Permalink
feat(meta): let stream actors in fragment share same stream node
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 26, 2025
1 parent 16121c8 commit e70df67
Show file tree
Hide file tree
Showing 27 changed files with 555 additions and 398 deletions.
4 changes: 3 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ message TableFragments {
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;

stream_plan.StreamNode nodes = 9;
}
// The id of the streaming job.
uint32 table_id = 1;
Expand Down Expand Up @@ -218,7 +220,6 @@ message ListTableFragmentsRequest {
message ListTableFragmentsResponse {
message ActorInfo {
uint32 id = 1;
stream_plan.StreamNode node = 2;
repeated stream_plan.Dispatcher dispatcher = 3;
}
message FragmentInfo {
Expand Down Expand Up @@ -258,6 +259,7 @@ message ListFragmentDistributionResponse {
uint32 fragment_type_mask = 6;
uint32 parallelism = 7;
uint32 vnode_count = 8;
stream_plan.StreamNode node = 9;
}
repeated FragmentDistribution distributions = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ message StreamActor {

uint32 actor_id = 1;
uint32 fragment_id = 2;
StreamNode nodes = 3;
StreamNode nodes = 3 [deprecated = true];
repeated Dispatcher dispatcher = 4;
// The actors that send messages to this actor.
// Note that upstream actor ids are also stored in the proto of merge nodes.
Expand Down
8 changes: 7 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ message InjectBarrierRequest {
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;

message FragmentBuildActorInfo {
uint32 fragment_id = 1;
stream_plan.StreamNode node = 2;
repeated BuildActorInfo actors = 3;
}

message BuildActorInfo {
message UpstreamActors {
repeated uint32 actors = 1;
Expand All @@ -27,7 +33,7 @@ message InjectBarrierRequest {
}

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
repeated FragmentBuildActorInfo actors_to_build = 9;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}
Expand Down
39 changes: 19 additions & 20 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{agg_call_state, StreamNode};

/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s recursively.
pub fn visit_stream_node<F>(stream_node: &mut StreamNode, mut f: F)
where
F: FnMut(&mut NodeBody),
{
fn visit_inner<F>(stream_node: &mut StreamNode, f: &mut F)
where
F: FnMut(&mut NodeBody),
{
pub fn visit_stream_node_mut(stream_node: &mut StreamNode, mut f: impl FnMut(&mut NodeBody)) {
visit_stream_node_cont_mut(stream_node, |stream_node| {
f(stream_node.node_body.as_mut().unwrap());
for input in &mut stream_node.input {
visit_inner(input, f);
}
}

visit_inner(stream_node, &mut f)
true
})
}

/// A utility for to accessing the [`StreamNode`] mutably. The returned bool is used to determine whether the access needs to continue.
Expand All @@ -56,6 +46,14 @@ where
visit_inner(stream_node, &mut f)
}

/// A utility for visiting the [`NodeBody`] of the [`StreamNode`]s recursively.
pub fn visit_stream_node(stream_node: &StreamNode, mut f: impl FnMut(&NodeBody)) {
visit_stream_node_cont(stream_node, |stream_node| {
f(stream_node.node_body.as_ref().unwrap());
true
})
}

/// A utility for to accessing the [`StreamNode`] immutably. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont<F>(stream_node: &StreamNode, mut f: F)
where
Expand All @@ -78,11 +76,12 @@ where

/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a
/// [`StreamFragment`] recursively.
pub fn visit_fragment<F>(fragment: &mut StreamFragment, f: F)
where
F: FnMut(&mut NodeBody),
{
visit_stream_node(fragment.node.as_mut().unwrap(), f)
pub fn visit_fragment_mut(fragment: &mut StreamFragment, f: impl FnMut(&mut NodeBody)) {
visit_stream_node_mut(fragment.node.as_mut().unwrap(), f)
}

pub fn visit_fragment(fragment: &StreamFragment, f: impl FnMut(&NodeBody)) {
visit_stream_node(fragment.node.as_ref().unwrap(), f)
}

/// Visit the tables of a [`StreamNode`].
Expand Down Expand Up @@ -279,7 +278,7 @@ pub fn visit_stream_node_tables_inner<F>(
}
};
if visit_child_recursively {
visit_stream_node(stream_node, visit_body)
visit_stream_node_mut(stream_node, visit_body)
} else {
visit_body(stream_node.node_body.as_mut().unwrap())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ struct RwActorInfo {
#[primary_key]
actor_id: i32,
fragment_id: i32,
node: JsonbVal,
dispatcher: JsonbVal,
}

Expand All @@ -47,7 +46,6 @@ async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo
fragment.actors.into_iter().map(move |actor| RwActorInfo {
actor_id: actor.id as _,
fragment_id: fragment_id as _,
node: json!(actor.node).into(),
dispatcher: json!(actor.dispatcher).into(),
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;
Expand All @@ -30,6 +31,7 @@ struct RwFragment {
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
node: JsonbVal,
}

pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
Expand Down Expand Up @@ -73,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism: distribution.vnode_count as i32,
node: json!(distribution.node).into(),
})
.collect())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,8 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
"rw_catalog.rw_sink_decouple",
"WITH decoupled_sink_internal_table_ids AS (
SELECT
distinct (node->'sink'->'table'->'id')::int as internal_table_id
FROM rw_catalog.rw_actor_infos actor
JOIN
rw_catalog.rw_fragments fragment
ON actor.fragment_id = fragment.fragment_id
(node->'sink'->'table'->'id')::int as internal_table_id
FROM rw_catalog.rw_fragments
WHERE
'SINK' = any(flags)
AND
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ impl StreamManagerService for StreamServiceImpl {
.into_iter()
.map(|actor| ActorInfo {
id: actor.actor_id,
node: actor.nodes,
dispatcher: actor.dispatcher,
})
.collect_vec(),
Expand Down Expand Up @@ -309,6 +308,7 @@ impl StreamManagerService for StreamServiceImpl {
fragment_type_mask: fragment_desc.fragment_type_mask as _,
parallelism: fragment_desc.parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
node: Some(fragment_desc.stream_node.to_protobuf()),
},
)
.collect_vec();
Expand Down
9 changes: 2 additions & 7 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,14 +1009,9 @@ impl DatabaseCheckpointControl {
"must not set previously"
);
}
for stream_actor in info
.stream_job_fragments
.fragments
.values_mut()
.flat_map(|fragment| fragment.actors.iter_mut())
{
for fragment in info.stream_job_fragments.fragments.values_mut() {
fill_snapshot_backfill_epoch(
stream_actor.nodes.as_mut().expect("should exist"),
fragment.nodes.as_mut().expect("should exist"),
&snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch,
)?;
}
Expand Down
12 changes: 4 additions & 8 deletions src/meta/src/barrier/checkpoint/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::mem::take;

use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
Expand All @@ -28,7 +27,7 @@ use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
use crate::model::StreamActorWithUpstreams;
use crate::model::StreamJobActorsToCreate;

#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
Expand Down Expand Up @@ -110,7 +109,7 @@ pub(super) enum CreatingStreamingJobStatus {
pending_non_checkpoint_barriers: Vec<u64>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
initial_barrier_info: Option<(StreamJobActorsToCreate, Mutation)>,
},
/// The creating job is consuming log store.
///
Expand All @@ -126,7 +125,7 @@ pub(super) enum CreatingStreamingJobStatus {

pub(super) struct CreatingJobInjectBarrierInfo {
pub barrier_info: BarrierInfo,
pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
pub new_actors: Option<StreamJobActorsToCreate>,
pub mutation: Option<Mutation>,
}

Expand Down Expand Up @@ -252,10 +251,7 @@ impl CreatingStreamingJobStatus {
pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
initial_barrier_info: &mut Option<(
HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
Mutation,
)>,
initial_barrier_info: &mut Option<(StreamJobActorsToCreate, Mutation)>,
is_checkpoint: bool,
) -> CreatingJobInjectBarrierInfo {
{
Expand Down
36 changes: 28 additions & 8 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ use risingwave_pb::stream_plan::{
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate,
StreamJobFragments,
};
use crate::stream::{
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
Expand Down Expand Up @@ -121,6 +122,8 @@ impl ReplaceStreamJobPlan {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.id().into(),
InflightFragmentInfo {
fragment_id: fragment.fragment_id,
nodes: fragment.nodes.clone().unwrap(),
actors: fragment
.actors
.iter()
Expand Down Expand Up @@ -207,6 +210,8 @@ impl CreateStreamingJobCommandInfo {
(
fragment.fragment_id,
InflightFragmentInfo {
fragment_id: fragment.fragment_id,
nodes: fragment.nodes.clone().unwrap(),
actors: fragment
.actors
.iter()
Expand Down Expand Up @@ -954,7 +959,10 @@ impl Command {
mutation
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
pub fn actors_to_create(
&self,
graph_info: &InflightDatabaseInfo,
) -> Option<StreamJobActorsToCreate> {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
Expand All @@ -973,13 +981,25 @@ impl Command {
Some(map)
}
Command::RescheduleFragment { reschedules, .. } => {
let mut map: HashMap<WorkerId, Vec<_>> = HashMap::new();
for (actor, status) in reschedules
.values()
.flat_map(|reschedule| reschedule.newly_created_actors.iter())
let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
for (fragment_id, actor, status) in
reschedules.iter().flat_map(|(fragment_id, reschedule)| {
reschedule
.newly_created_actors
.iter()
.map(|(actors, status)| (*fragment_id, actors, status))
})
{
let worker_id = status.location.as_ref().unwrap().worker_node_id as _;
map.entry(worker_id).or_default().push(actor.clone());
map.entry(worker_id)
.or_default()
.entry(fragment_id)
.or_insert_with(|| {
let node = graph_info.fragment(fragment_id).nodes.clone();
(node, vec![])
})
.1
.push(actor.clone());
}
Some(map)
}
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ impl InflightDatabaseInfo {
pub fn contains_job(&self, job_id: TableId) -> bool {
self.jobs.contains_key(&job_id)
}

pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
let job_id = self.fragment_location[&fragment_id];
self.jobs
.get(&job_id)
.expect("should exist")
.fragment_infos
.get(&fragment_id)
.expect("should exist")
}
}

impl InflightDatabaseInfo {
Expand Down
Loading

0 comments on commit e70df67

Please sign in to comment.