feat(meta): let stream actors in fragment share same stream node #20310

Open
wants to merge 2 commits into base: yiming/fragment-aware-actor-graph-builder
proto/meta.proto: 4 changes (3 additions & 1 deletion)

@@ -97,6 +97,8 @@ message TableFragments {
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;

+ stream_plan.StreamNode nodes = 9;
}
// The id of the streaming job.
uint32 table_id = 1;
@@ -218,7 +220,6 @@ message ListTableFragmentsRequest {
message ListTableFragmentsResponse {
message ActorInfo {
uint32 id = 1;
- stream_plan.StreamNode node = 2;
repeated stream_plan.Dispatcher dispatcher = 3;
}
message FragmentInfo {
@@ -258,6 +259,7 @@ message ListFragmentDistributionResponse {
uint32 fragment_type_mask = 6;
uint32 parallelism = 7;
uint32 vnode_count = 8;
+ stream_plan.StreamNode node = 9;
}
repeated FragmentDistribution distributions = 1;
}
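Taken together, the meta.proto hunks add the shared operator tree to the fragment message (`nodes = 9`), drop the per-actor `node` from `ListTableFragmentsResponse.ActorInfo`, and expose it instead on `ListFragmentDistributionResponse.FragmentDistribution`. A hedged consumer-side sketch of the new field, assuming prost-generated types and the `identity` field that `StreamNode` carries for explain output:

    use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;

    /// Hypothetical helper: the plan is now read once per fragment from the
    /// distribution entry, instead of once per actor.
    fn fragment_plan_identity(dist: &FragmentDistribution) -> Option<&str> {
        // Root operator's human-readable identity from the fragment's shared plan.
        dist.node.as_ref().map(|node| node.identity.as_str())
    }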
proto/stream_plan.proto: 2 changes (1 addition & 1 deletion)

@@ -965,7 +965,7 @@ message StreamActor {

uint32 actor_id = 1;
uint32 fragment_id = 2;
- StreamNode nodes = 3;
+ StreamNode nodes = 3 [deprecated = true];
repeated Dispatcher dispatcher = 4;
// The actors that send messages to this actor.
// Note that upstream actor ids are also stored in the proto of merge nodes.
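Note that `StreamActor.nodes` is deprecated rather than removed, so metadata written by older versions still carries a per-actor copy of the plan. A minimal compatibility sketch, assuming prost-generated types; the helper name is hypothetical and not part of this PR:

    use risingwave_pb::stream_plan::{StreamActor, StreamNode};

    /// Hypothetical accessor: prefer the fragment-level plan introduced by this
    /// PR, falling back to the deprecated per-actor copy for pre-migration data.
    #[allow(deprecated)]
    fn resolve_actor_plan<'a>(
        fragment_nodes: Option<&'a StreamNode>,
        actor: &'a StreamActor,
    ) -> Option<&'a StreamNode> {
        // All actors of a fragment share one plan, so the fragment-level copy
        // wins whenever it is present.
        fragment_nodes.or(actor.nodes.as_ref())
    }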
proto/stream_service.proto: 8 changes (7 additions & 1 deletion)

@@ -17,6 +17,12 @@ message InjectBarrierRequest {
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;

+ message FragmentBuildActorInfo {
+ uint32 fragment_id = 1;
+ stream_plan.StreamNode node = 2;
+ repeated BuildActorInfo actors = 3;
+ }

message BuildActorInfo {
message UpstreamActors {
repeated uint32 actors = 1;
@@ -27,7 +33,7 @@
}

repeated common.ActorInfo broadcast_info = 8;
- repeated BuildActorInfo actors_to_build = 9;
+ repeated FragmentBuildActorInfo actors_to_build = 9;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}
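With `actors_to_build` grouped per fragment, the shared `StreamNode` travels once per fragment rather than once per actor. A sketch of the sender side, assuming prost's nested-message module convention and a caller-supplied map of fragment plans (the helper name is hypothetical):

    use std::collections::HashMap;

    use risingwave_pb::stream_plan::StreamNode;
    use risingwave_pb::stream_service::inject_barrier_request::{
        BuildActorInfo, FragmentBuildActorInfo,
    };

    /// Hypothetical assembly step: bucket actors by fragment id and attach the
    /// fragment's plan exactly once.
    fn group_actors_to_build(
        actors: Vec<(u32, BuildActorInfo)>, // (fragment_id, actor)
        fragment_nodes: &HashMap<u32, StreamNode>,
    ) -> Vec<FragmentBuildActorInfo> {
        let mut grouped: HashMap<u32, Vec<BuildActorInfo>> = HashMap::new();
        for (fragment_id, actor) in actors {
            grouped.entry(fragment_id).or_default().push(actor);
        }
        grouped
            .into_iter()
            .map(|(fragment_id, actors)| FragmentBuildActorInfo {
                fragment_id,
                // One clone of the plan per fragment, not one per actor.
                node: Some(fragment_nodes[&fragment_id].clone()),
                actors,
            })
            .collect()
    }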
@@ -25,7 +25,6 @@ struct RwActorInfo {
#[primary_key]
actor_id: i32,
fragment_id: i32,
- node: JsonbVal,
dispatcher: JsonbVal,
}

@@ -47,7 +46,6 @@ async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo
fragment.actors.into_iter().map(move |actor| RwActorInfo {
actor_id: actor.id as _,
fragment_id: fragment_id as _,
- node: json!(actor.node).into(),
dispatcher: json!(actor.dispatcher).into(),
})
})
@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use risingwave_common::types::Fields;
+ use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::stream_plan::FragmentTypeFlag;
+ use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;
@@ -30,6 +31,7 @@ struct RwFragment {
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
+ node: JsonbVal,
}

pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
@@ -73,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism: distribution.vnode_count as i32,
+ node: json!(distribution.node).into(),
})
.collect())
}
@@ -112,11 +112,8 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
"rw_catalog.rw_sink_decouple",
"WITH decoupled_sink_internal_table_ids AS (
SELECT
- distinct (node->'sink'->'table'->'id')::int as internal_table_id
- FROM rw_catalog.rw_actor_infos actor
- JOIN
- rw_catalog.rw_fragments fragment
- ON actor.fragment_id = fragment.fragment_id
+ (node->'sink'->'table'->'id')::int as internal_table_id
+ FROM rw_catalog.rw_fragments
WHERE
'SINK' = any(flags)
AND
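Because the plan now lives on the fragment itself, the decoupled-sink lookup no longer joins `rw_actor_infos` against `rw_fragments`, and `distinct` becomes unnecessary since each fragment contributes exactly one `node`. For illustration only, the visible part of the rewritten subquery reads roughly as follows (the additional sink-decouple predicate is truncated in this view):

    SELECT (node->'sink'->'table'->'id')::int AS internal_table_id
    FROM rw_catalog.rw_fragments
    WHERE 'SINK' = any(flags);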
src/meta/service/src/stream_service.rs: 2 changes (1 addition & 1 deletion)

@@ -226,7 +226,6 @@ impl StreamManagerService for StreamServiceImpl {
.into_iter()
.map(|actor| ActorInfo {
id: actor.actor_id,
- node: actor.nodes,
dispatcher: actor.dispatcher,
})
.collect_vec(),
@@ -309,6 +308,7 @@ impl StreamManagerService for StreamServiceImpl {
fragment_type_mask: fragment_desc.fragment_type_mask as _,
parallelism: fragment_desc.parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
+ node: Some(fragment_desc.stream_node.to_protobuf()),
},
)
.collect_vec();
src/meta/src/barrier/checkpoint/control.rs: 9 changes (2 additions & 7 deletions)

@@ -1009,14 +1009,9 @@ impl DatabaseCheckpointControl {
"must not set previously"
);
}
- for stream_actor in info
- .stream_job_fragments
- .fragments
- .values_mut()
- .flat_map(|fragment| fragment.actors.iter_mut())
- {
+ for fragment in info.stream_job_fragments.fragments.values_mut() {
fill_snapshot_backfill_epoch(
- stream_actor.nodes.as_mut().expect("should exist"),
+ fragment.nodes.as_mut().expect("should exist"),
&snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch,
)?;
}
src/meta/src/barrier/checkpoint/creating_job/status.rs: 12 changes (4 additions & 8 deletions)

@@ -18,7 +18,6 @@ use std::mem::take;

use risingwave_common::hash::ActorId;
use risingwave_common::util::epoch::Epoch;
- use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::{
@@ -28,7 +27,7 @@ use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
- use crate::model::StreamActorWithUpstreams;
+ use crate::model::StreamJobActorsToCreate;

#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
@@ -110,7 +109,7 @@ pub(super) enum CreatingStreamingJobStatus {
pending_non_checkpoint_barriers: Vec<u64>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
- initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
+ initial_barrier_info: Option<(StreamJobActorsToCreate, Mutation)>,
},
/// The creating job is consuming log store.
///
@@ -126,7 +125,7 @@

pub(super) struct CreatingJobInjectBarrierInfo {
pub barrier_info: BarrierInfo,
- pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
+ pub new_actors: Option<StreamJobActorsToCreate>,
pub mutation: Option<Mutation>,
}

@@ -252,10 +251,7 @@
pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
- initial_barrier_info: &mut Option<(
- HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
- Mutation,
- )>,
+ initial_barrier_info: &mut Option<(StreamJobActorsToCreate, Mutation)>,
is_checkpoint: bool,
) -> CreatingJobInjectBarrierInfo {
{
src/meta/src/barrier/command.rs: 64 changes (50 additions & 14 deletions)

@@ -42,15 +42,16 @@ use risingwave_pb::stream_plan::{
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

- use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
+ use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
- ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
+ ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate,
+ StreamJobFragments,
};
use crate::stream::{
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
@@ -121,6 +122,8 @@ impl ReplaceStreamJobPlan {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.id().into(),
InflightFragmentInfo {
+ fragment_id: fragment.fragment_id,
+ nodes: fragment.nodes.clone().unwrap(),
actors: fragment
.actors
.iter()
@@ -142,14 +145,30 @@
.collect(),
},
);
- assert!(fragment_changes
- .insert(fragment.fragment_id, fragment_change)
- .is_none());
+ fragment_changes
+ .try_insert(fragment.fragment_id, fragment_change)
+ .expect("non-duplicate");
}
for fragment in self.old_fragments.fragments.values() {
- assert!(fragment_changes
- .insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
- .is_none());
+ fragment_changes
+ .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
+ .expect("non-duplicate");
}
+ for (fragment_id, merge_updates) in &self.merge_updates {
+ let replace_map = merge_updates
+ .iter()
+ .filter_map(|m| {
+ m.new_upstream_fragment_id.map(|new_upstream_fragment_id| {
+ (m.upstream_fragment_id, new_upstream_fragment_id)
+ })
+ })
+ .collect();
+ fragment_changes
+ .try_insert(
+ *fragment_id,
+ CommandFragmentChanges::ReplaceNodeUpstream(replace_map),
+ )
+ .expect("non-duplicate");
+ }
fragment_changes
}
@@ -208,6 +227,8 @@ impl CreateStreamingJobCommandInfo {
(
fragment.fragment_id,
InflightFragmentInfo {
+ fragment_id: fragment.fragment_id,
+ nodes: fragment.nodes.clone().unwrap(),
actors: fragment
.actors
.iter()
@@ -956,7 +977,10 @@ impl Command {
mutation
}

- pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
+ pub fn actors_to_create(
+ &self,
+ graph_info: &InflightDatabaseInfo,
+ ) -> Option<StreamJobActorsToCreate> {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
@@ -975,13 +999,25 @@
Some(map)
}
Command::RescheduleFragment { reschedules, .. } => {
- let mut map: HashMap<WorkerId, Vec<_>> = HashMap::new();
- for (actor, status) in reschedules
- .values()
- .flat_map(|reschedule| reschedule.newly_created_actors.iter())
+ let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
+ for (fragment_id, actor, status) in
+ reschedules.iter().flat_map(|(fragment_id, reschedule)| {
+ reschedule
+ .newly_created_actors
+ .iter()
+ .map(|(actors, status)| (*fragment_id, actors, status))
+ })
{
let worker_id = status.location.as_ref().unwrap().worker_node_id as _;
- map.entry(worker_id).or_default().push(actor.clone());
+ map.entry(worker_id)
+ .or_default()
+ .entry(fragment_id)
+ .or_insert_with(|| {
+ let node = graph_info.fragment(fragment_id).nodes.clone();
+ (node, vec![])
+ })
+ .1
+ .push(actor.clone());
}
Some(map)
}
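The `StreamJobActorsToCreate` alias that replaces `HashMap<WorkerId, Vec<StreamActorWithUpstreams>>` throughout is defined in `crate::model` and does not appear in this diff. From the way `actors_to_create` builds it above (worker id, then fragment id, then the shared plan paired with that fragment's actors), its assumed shape is roughly:

    use std::collections::HashMap;

    use risingwave_meta_model::WorkerId;
    use risingwave_pb::stream_plan::StreamNode;

    use crate::model::{FragmentId, StreamActorWithUpstreams};

    /// Assumed definition, inferred from the call sites in this diff; the real
    /// alias lives in src/meta/src/model and may differ in detail.
    pub type StreamJobActorsToCreate =
        HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpstreams>)>>;

Sharing one `StreamNode` per fragment is the point of the change: for a fragment with many actors, the plan tree is stored and shipped once instead of once per actor.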