From 46e3559fb5477a374bf91f67555ca9d8a78b8597 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 20 May 2024 09:52:13 +0800 Subject: [PATCH 1/4] fix(interactive): refine rust otel export (#3810) --- .../graphscope-store/templates/configmap.yaml | 9 +- charts/graphscope-store/values.yaml | 1 + .../assembly/src/conf/groot/config.template | 3 +- .../groot/common/config/CommonConfig.java | 5 - .../common/config/CoordinatorConfig.java | 2 +- .../groot/common/config/StoreConfig.java | 7 +- .../groot/src/executor/gaia/gaia_server.rs | 9 +- .../engine/pegasus/pegasus/src/config.rs | 5 +- .../executor/engine/pegasus/server/build.rs | 1 + .../tests/standalone2/server_config.toml | 1 - .../executor/engine/pegasus/server/src/rpc.rs | 104 +++++++++++++++--- .../com/alibaba/graphscope/groot/Utils.java | 3 + .../groot/coordinator/LogRecycler.java | 1 + .../groot/frontend/ClientService.java | 1 + .../groot/store/KafkaProcessor.java | 8 +- .../groot/store/PartitionService.java | 3 +- .../graphscope/groot/store/WriterAgent.java | 4 +- .../groot/tests/store/WriterAgentTest.java | 7 +- 18 files changed, 116 insertions(+), 58 deletions(-) diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index d08e431050f2..40eaa708e844 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -20,11 +20,13 @@ data: discovery.mode={{ .Values.discoveryMode }} role.name=ROLE node.idx=INDEX + release.full.name={{ include "graphscope-store.fullname" . }} {{- else }} rpc.port=0 discovery.mode=zookeeper role.name="" node.idx=0 + release.full.name=localhost {{- end }} store.node.count={{ .Values.store.replicaCount }} @@ -58,7 +60,6 @@ data: neo4j.bolt.server.disabled=true log4rs.config=LOG4RS_CONFIG - release.full.name={{ include "graphscope-store.fullname" . 
}} ## Auth config auth.username={{ .Values.auth.username }} auth.password={{ .Values.auth.password }} @@ -79,7 +80,6 @@ data: store.gc.interval.ms={{ .Values.storeGcIntervalMs }} write.ha.enabled={{ .Values.backup.enabled }} - tracing.enabled={{ .Values.otel.enabled }} ## Coordinator Config rpc.max.bytes.mb={{ .Values.rpcMaxBytesMb }} @@ -137,12 +137,13 @@ data: export OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=DELTA export OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION=BASE2_EXPONENTIAL_BUCKET_HISTOGRAM - export OTEL_EXPORTER_OTLP_COMPRESSION=gzip + # export OTEL_EXPORTER_OTLP_COMPRESSION=gzip {{- end }} {{- if .Values.uptrace.enabled }} + export UPTRACE_DSN=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317 export OTEL_EXPORTER_OTLP_ENDPOINT=http://{{ .Values.uptrace.service }}:14317 - export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317 + export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=${UPTRACE_DSN} {{- end }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index c7a5fc3e9c14..4d92f1565dca 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -515,6 +515,7 @@ otel: registry: docker.io repository: jaegertracing/all-in-one tag: "latest" + # https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_traces_sampler traces: sampler: name: "traceidratio" diff --git a/interactive_engine/assembly/src/conf/groot/config.template b/interactive_engine/assembly/src/conf/groot/config.template index 84a1a4cd3f8f..6fb14b964f85 100644 --- a/interactive_engine/assembly/src/conf/groot/config.template +++ b/interactive_engine/assembly/src/conf/groot/config.template @@ -33,4 +33,5 @@ neo4j.bolt.server.disabled=true pegasus.worker.num=2 pegasus.hosts=localhost:8080 -kafka.test.cluster.enable=true \ No newline at end of file +kafka.test.cluster.enable=true 
+OTEL_SDK_DISABLED=true diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index a5ab7010dc19..ce5ff432f524 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -58,9 +58,6 @@ public class CommonConfig { public static final Config PARTITION_COUNT = Config.intConfig("partition.count", 1); - public static final Config METRIC_UPDATE_INTERVAL_MS = - Config.longConfig("metric.update.interval.ms", 5000L); - public static final Config LOG4RS_CONFIG = Config.stringConfig("log4rs.config", ""); public static final Config DISCOVERY_MODE = @@ -74,8 +71,6 @@ public class CommonConfig { public static final Config SECONDARY_INSTANCE_ENABLED = Config.boolConfig("secondary.instance.enabled", false); - public static final Config TRACING_ENABLED = - Config.boolConfig("tracing.enabled", false); // Create an extra store pod for each original store pod for backup. // Only available in multi pod mode. 
diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index b7e502ccb955..3ed9e97f58dc 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -26,7 +26,7 @@ public class CoordinatorConfig { Config.boolConfig("log.recycle.enable", false); public static final Config LOG_RECYCLE_INTERVAL_SECOND = - Config.longConfig("log.recycle.interval.second", 600L); + Config.longConfig("log.recycle.interval.second", 3600L); public static final Config FILE_META_STORE_PATH = Config.stringConfig("file.meta.store.path", "./meta"); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index c3107944ad27..35ba45a24570 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -29,17 +29,14 @@ public class StoreConfig { public static final Config STORE_QUEUE_WAIT_MS = Config.longConfig("store.queue.wait.ms", 3000L); - public static final Config STORE_COMMIT_INTERVAL_MS = - Config.longConfig("store.commit.interval.ms", 1000L); - public static final Config STORE_GC_ENABLE = Config.boolConfig("store.gc.enable", true); public static final Config STORE_GC_INTERVAL_MS = - Config.longConfig("store.gc.interval.ms", 5000L); + Config.longConfig("store.gc.interval.ms", 3600000L); public static final Config STORE_CATCHUP_INTERVAL_MS = - Config.longConfig("store.catchup.interval.ms", 30000L); + 
Config.longConfig("store.catchup.interval.ms", 5000L); // set by IS_SECONDARY_INSTANCE, used in graph.rs public static final Config STORE_STORAGE_ENGINE = diff --git a/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs b/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs index e120b6b41603..b400d51152b5 100644 --- a/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs +++ b/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs @@ -209,14 +209,7 @@ fn make_gaia_config(graph_config: Arc) -> GaiaConfig { .no_delay(no_delay) .send_buffer(send_buffer) .heartbeat_sec(heartbeat_sec); - let enable_tracing = graph_config - .get_storage_option("tracing.enabled") - .map(|config_str| { - config_str - .parse() - .expect("parse tracing.enabled failed") - }); - GaiaConfig { network: Some(network_config), max_pool_size, enable_tracing } + GaiaConfig { network: Some(network_config), max_pool_size } } fn make_gaia_rpc_config(graph_config: Arc) -> RPCServerConfig { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs index cd182f9aaa96..5dd3a655869b 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs @@ -37,7 +37,6 @@ macro_rules! 
configure_with_default { pub struct Configuration { pub network: Option, pub max_pool_size: Option, - pub enable_tracing: Option, } impl Configuration { @@ -46,11 +45,11 @@ impl Configuration { } pub fn singleton() -> Self { - Configuration { network: None, max_pool_size: None, enable_tracing: None } + Configuration { network: None, max_pool_size: None } } pub fn with(network: NetworkConfig) -> Self { - Configuration { network: Some(network), max_pool_size: None, enable_tracing: None } + Configuration { network: Some(network), max_pool_size: None } } pub fn server_id(&self) -> u64 { diff --git a/interactive_engine/executor/engine/pegasus/server/build.rs b/interactive_engine/executor/engine/pegasus/server/build.rs index 57591107fd35..841af8d28430 100644 --- a/interactive_engine/executor/engine/pegasus/server/build.rs +++ b/interactive_engine/executor/engine/pegasus/server/build.rs @@ -38,6 +38,7 @@ fn codegen_inplace() -> Result<(), Box> { fn codegen_inplace() -> Result<(), Box> { tonic_build::configure() .build_server(true) + .build_client(true) .compile(&["proto/job_service.proto", "proto/job_plan.proto"], &["proto"])?; Ok(()) } diff --git a/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml b/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml index df57aba4b2a9..d1e0019186cb 100644 --- a/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml +++ b/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml @@ -1,7 +1,6 @@ # Set max threads will be created in the executor pool; # It will be set to CPU cores by default; max_pool_size = 8 -enable_tracing = false [network] # Set server id of current config belongs to; diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index 17ae301c1b82..f5011fafd8cf 100644 --- 
a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -27,15 +27,21 @@ use std::time::Duration; use futures::Stream; use hyper::server::accept::Accept; use hyper::server::conn::{AddrIncoming, AddrStream}; -use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::{TraceContextExt, TraceError}; use opentelemetry::{ global, propagation::Extractor, trace::{Span, SpanKind, Tracer}, KeyValue, }; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{ExportConfig, Protocol, TonicExporterBuilder, WithExportConfig}; +use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::resource::{ + EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector, +}; +use opentelemetry_sdk::trace::BatchConfigBuilder; +use opentelemetry_sdk::Resource; use pegasus::api::function::FnResult; use pegasus::api::FromStream; use pegasus::errors::{ErrorKind, JobExecError}; @@ -188,7 +194,8 @@ where type SubmitStream = UnboundedReceiverStream>; async fn cancel(&self, req: Request) -> Result, Status> { - let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + let parent_ctx = + global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata()))); let tracer = global::tracer("executor"); let _span = tracer .span_builder("JobService/cancel") @@ -201,7 +208,8 @@ where async fn submit(&self, req: Request) -> Result, Status> { debug!("accept new request from {:?};", req.remote_addr()); - let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + let parent_ctx = + global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata()))); let tracer = global::tracer("executor"); let pb::JobRequest { conf, source, plan, resource } = req.into_inner(); @@ -292,9 +300,7 @@ where D: ServerDetect + 
'static, E: ServiceStartListener, { - if server_config.enable_tracing.unwrap_or(false) { - let _tracer = init_tracer().expect("Failed to initialize tracer."); - } + init_otel().expect("Failed to initialize open telemetry"); let server_id = server_config.server_id(); if let Some(server_addr) = pegasus::startup_with(server_config, server_detector)? { listener.on_server_start(server_id, server_addr)?; @@ -378,24 +384,86 @@ impl RPCJobServer { } } -fn init_tracer() -> Result { +fn init_otel() -> Result> { + let otel_disable = std::env::var("OTEL_SDK_DISABLED").unwrap_or("true".to_string()); + info!("otel_disable: {}", otel_disable); + if otel_disable.trim().parse().unwrap() { + info!("OTEL is disabled"); + return Ok(true); + } + + // let mut metadata = tonic::metadata::MetadataMap::with_capacity(1); + // let dsn = std::env::var("UPTRACE_DSN").unwrap_or_default(); + // if !dsn.is_empty() { + // metadata.insert("uptrace-dsn", dsn.parse().unwrap()); + // info!("using DSN: {}", dsn); + // } else { + // warn!("Error: UPTRACE_DSN not found."); + // } + + let default_endpoint = "http://localhost:4317".to_string(); + let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or(default_endpoint); + + let resource = Resource::from_detectors( + Duration::from_secs(0), + vec![ + Box::new(SdkProvidedResourceDetector), + Box::new(EnvResourceDetector::new()), + Box::new(TelemetryResourceDetector), + ], + ); + + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(5)) + .with_endpoint(endpoint.clone()); + // .with_metadata(metadata.clone()); + let _tracer = init_tracer(resource.clone(), exporter)?; + + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(5)) + .with_endpoint(endpoint); + // .with_metadata(metadata); + + let _meter = init_meter_provider(resource, exporter)?; + global::set_meter_provider(_meter); + return Ok(true); +} + +fn init_tracer( + resource: Resource, exporter: 
TonicExporterBuilder, +) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); + let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default() + .with_max_queue_size(2048) + .with_max_export_batch_size(512) + .with_scheduled_delay(Duration::from_millis(5000)) + .build(); + let trace_config = opentelemetry_sdk::trace::config().with_resource(resource); opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint("http://localhost:4317"), - ) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource( - opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pegasus")]), - )) + .with_exporter(exporter) + .with_batch_config(batch_config) + .with_trace_config(trace_config) .install_batch(opentelemetry_sdk::runtime::Tokio) } -struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); +fn init_meter_provider( + resource: Resource, exporter: TonicExporterBuilder, +) -> opentelemetry::metrics::Result { + opentelemetry_otlp::new_pipeline() + .metrics(opentelemetry_sdk::runtime::Tokio) + .with_exporter(exporter) + .with_period(Duration::from_secs(15)) + .with_timeout(Duration::from_secs(5)) + .with_resource(resource) + .build() +} + +struct MyMetadataMap<'a>(&'a tonic::metadata::MetadataMap); -impl<'a> Extractor for MetadataMap<'a> { +impl<'a> Extractor for MyMetadataMap<'a> { fn get(&self, key: &str) -> Option<&str> { self.0 .get(key) diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java index 381fceff51d0..e34b281104f2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java @@ -41,6 +41,9 @@ public class Utils { public static String getHostTemplate(Configs configs, RoleType role) { 
String releaseName = DiscoveryConfig.RELEASE_FULL_NAME.get(configs); + if (releaseName.equals("localhost") || releaseName.equals("127.0.0.1")) { + return releaseName; + } // template = "{releaseName}-{role}-{}.{releaseName}-{role}-headless"; // i.e. demo-graphscope-store-frontend-0.demo-graphscope-store-frontend-headless String svcTemplate = "%s-%s"; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java index 01eccae521a3..1c03f6e8ce24 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java @@ -84,6 +84,7 @@ private void doRecycle() { List queueOffsets = this.snapshotManager.getQueueOffsets(); for (int i = 0; i < queueOffsets.size(); i++) { long offset = queueOffsets.get(i); + offset = Math.max(offset - 3600, 0); // Leave some spaces try { logService.deleteBeforeOffset(i, offset); logger.info("recycled queue [{}] offset [{}]", i, offset); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 596f2f07a590..204ed7dca4f3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -75,6 +75,7 @@ public void getPartitionNum( public void prepareDataLoad( PrepareDataLoadRequest request, StreamObserver responseObserver) { + logger.info("Preparing data load"); DdlRequestBatch.Builder builder = DdlRequestBatch.newBuilder(); for (DataLoadTargetPb dataLoadTargetPb : request.getDataLoadTargetsList()) { 
DataLoadTarget dataLoadTarget = DataLoadTarget.parseProto(dataLoadTargetPb); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index 68cd0f536ddc..1c6cb6af8131 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -107,6 +107,11 @@ public void start() { public void stop() { this.shouldStop = true; + try { + updateQueueOffsets(); + } catch (IOException ex) { + logger.error("update queue offset failed", ex); + } if (this.persistOffsetsScheduler != null) { this.persistOffsetsScheduler.shutdown(); try { @@ -163,8 +168,7 @@ private void updateQueueOffsets() throws IOException { boolean changed = false; List consumedOffsets = writerAgent.getConsumedQueueOffsets(); for (int qId = 0; qId < queueOffsets.size(); qId++) { - long minOffset = Long.MAX_VALUE; - minOffset = Math.min(consumedOffsets.get(qId), minOffset); + long minOffset = Math.min(consumedOffsets.get(qId), Long.MAX_VALUE); if (minOffset != Long.MAX_VALUE && minOffset > newQueueOffsets.get(qId)) { newQueueOffsets.set(qId, minOffset); changed = true; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java index a897f32052c0..f96c054c2475 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java @@ -24,8 +24,7 @@ public class PartitionService { public PartitionService(Configs configs, StoreService storeService) { this.storeService = storeService; 
this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs); - this.storeCatchupIntervalMS = StoreConfig.STORE_GC_INTERVAL_MS.get(configs); - // this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs); + this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs); this.scheduler = Executors.newScheduledThreadPool( diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index b68f8e7d7cf1..d537032f1082 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -167,8 +167,8 @@ private void processBatches() { this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); - } else { - logger.warn("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + } else { // a flurry of batches with same snapshot ID + logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } if (hasDdl) { this.consumeDdlSnapshotId = batchSI; diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java index fd89d7273194..c71b46d47014 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java @@ -17,7 +17,6 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; -import 
com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.operation.StoreDataBatch; import com.alibaba.graphscope.groot.rpc.RoleClients; @@ -34,11 +33,7 @@ public class WriterAgentTest { @Test void testWriterAgent() throws InterruptedException, ExecutionException { - Configs configs = - Configs.newBuilder() - .put(CommonConfig.NODE_IDX.getKey(), "0") - .put(StoreConfig.STORE_COMMIT_INTERVAL_MS.getKey(), "10") - .build(); + Configs configs = Configs.newBuilder().put(CommonConfig.NODE_IDX.getKey(), "0").build(); StoreService mockStoreService = mock(StoreService.class); MetaService mockMetaService = mock(MetaService.class); From f1a49013fe4cf11dbc29c680ede151d3c8bb8604 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Mon, 20 May 2024 10:30:55 +0800 Subject: [PATCH 2/4] fix(interactive): Fix bug in `ExpandGetVFusionRule`, and optimize when `GetV` has imprecise types (#3804) ## What do these changes do? As titled. This PR: 1. Fix a bug in `ExpandGetVFusionRule`. For example, assume we have edge types of `person-likes-comment`, `person-likes-post`, `person-knows-person` in schema. Then in queries, if we want to expand `person-likes-comment`, we would generate an `Expand(likes)+GetV(comment)` (before this fix, we generate an `Expand(likes)` only, which is a bug); And if we want to expand `person-knows-person`, we simply generate an `Expand(knows)`. 2. Optimize cases when the types in GetV are imprecise, to avoid unnecessary filtering in Runtime. 
## Related issue number Fixes #3732 #3802 --------- Co-authored-by: Xiaoli Zhou --- .../planner/rules/ExpandGetVFusionRule.java | 110 +++++++++++++++++- .../proto/GraphRelToProtoConverter.java | 21 ++-- .../graphscope/common/ir/tools/Utils.java | 17 +++ .../common/ir/planner/cbo/BITest.java | 70 ++++++----- .../common/ir/planner/cbo/LdbcTest.java | 85 ++++++++------ .../ir/planner/rbo/ExpandGetVFusionTest.java | 77 ++++++++++++ .../runtime/src/process/operator/map/get_v.rs | 10 +- 7 files changed, 311 insertions(+), 79 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java index 8b885b086d0c..5ba60dee80ca 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java @@ -16,6 +16,7 @@ package com.alibaba.graphscope.common.ir.planner.rules; +import com.alibaba.graphscope.common.ir.meta.schema.GraphOptTable; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalExpand; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalGetV; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalPathExpand; @@ -23,10 +24,12 @@ import com.alibaba.graphscope.common.ir.rel.graph.GraphPhysicalGetV; import com.alibaba.graphscope.common.ir.tools.AliasInference; import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; +import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.GraphOptCluster; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelNode; import 
org.apache.calcite.rel.rules.TransformationRule; @@ -34,6 +37,10 @@ import org.apache.commons.lang3.ObjectUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + // This rule try to fuse GraphLogicalExpand and GraphLogicalGetV if GraphLogicalExpand has no alias // (that it won't be visited individually later): // 1. if GraphLogicalGetV has no filters, then: @@ -57,7 +64,7 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re && getV.getOpt().equals(GraphOpt.GetV.START) || expand.getOpt().equals(GraphOpt.Expand.BOTH) && getV.getOpt().equals(GraphOpt.GetV.OTHER)) { - if (ObjectUtils.isEmpty(getV.getFilters())) { + if (canFuse(getV, expand)) { GraphPhysicalExpand physicalExpand = GraphPhysicalExpand.create( expand.getCluster(), @@ -95,6 +102,107 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re } } + private boolean canFuse(GraphLogicalGetV getV, GraphLogicalExpand expand) { + // If GraphLogicalGetV has filters, then we cannot fuse them directly. Instead, we create a + // EdgeExpand(V) with an Auxilia for the filters. + // If GraphLogicalGetV has no filters, then: + // 1. if we want to expand "person-knows->person", and the type in getV is "person", + // we can fuse them into one EdgeExpand with type "knows" (where "knows" will be given in + // EdgeExpand's QueryParam in PhysicalPlan) + // 2. if we want to expand "person-create->post", while in schema, a "create" actually + // consists of "person-create->post" and "person-create->comment", + // we do not fuse them directly. Instead, we create a EdgeExpand(V) with type "create" and + // an Auxilia with type "post" as the filter. + // 3. if we want to expand "person-islocatedin->city", while in schema, a "islocatedin" + // actually + // consists of "person-islocatedin->city" and "post-islocatedin->country". 
+ // Though the edge type of "islocatedin" may generate "city" and "country", we can still + // fuse them into a single EdgeExpand(V) with type "islocatedin" directly if we can confirm + // that the expand starts from "person". + // 4. a special case is that, currently, for gremlin query like g.V().out("create"), we have + // not inferred precise types for getV yet (getV may contain all vertex types). + // In this case, if getV's types contain all the types that expand will generate, we can + // fuse them. + + Set edgeExpandedVLabels = new HashSet<>(); + // the optTables in expand preserves the full schema information for the edges, + // that is, for edge type "create", it contains both "person-create->post" and + // "person-create->comment", "user-create->post" etc. + List optTables = expand.getTableConfig().getTables(); + // the edgeParamLabels in expand preserves the inferred schema information for the edges, + // that is, for edge type "create", it contains only "person-create->post" if user queries + // like g.V().hasLabel("person").out("create").hasLabel("post") + GraphLabelType edgeParamType = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(expand.getRowType()); + List edgeParamLabels = edgeParamType.getLabelsEntry(); + GraphOpt.Expand direction = expand.getOpt(); + // First, we get all the source vertex types where the edge will be expanded from. + // e.g., expand from "person" + Set edgeExpandedSrcVLabels = new HashSet<>(); + for (GraphLabelType.Entry edgeLabel : edgeParamLabels) { + switch (direction) { + case OUT: + edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId()); + break; + case IN: + edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId()); + break; + case BOTH: + edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId()); + edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId()); + break; + } + } + // Then, we get all the destination vertex types where the edge will be expanded to. 
+ // e.g., expand "likes" + for (RelOptTable optTable : optTables) { + if (optTable instanceof GraphOptTable) { + GraphOptTable graphOptTable = (GraphOptTable) optTable; + List edgeUserGivenParamLabels = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels( + graphOptTable.getRowType()) + .getLabelsEntry(); + for (GraphLabelType.Entry edgeLabel : edgeUserGivenParamLabels) { + switch (direction) { + case OUT: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getDstLabelId()); + } + break; + case IN: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getSrcLabelId()); + } + break; + case BOTH: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getDstLabelId()); + } + if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getSrcLabelId()); + } + break; + } + } + } + } + + // Finally, we check the vertex types in getV to see if the type filter for the expanded + // vertex is necessary. + // e.g., if getV is "post" and expand type is "likes", then we cannot fuse them directly. + // Instead, we should create an EdgeExpand(V) with type "likes" and an Auxilia with type + // "post" as the filter. 
+ List vertexParamLabels = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(getV.getRowType()) + .getLabelsEntry(); + Set vertexExpandedVLabels = new HashSet<>(); + for (GraphLabelType.Entry vertexLabel : vertexParamLabels) { + vertexExpandedVLabels.add(vertexLabel.getLabelId()); + } + return ObjectUtils.isEmpty(getV.getFilters()) + && vertexExpandedVLabels.containsAll(edgeExpandedVLabels); + } + // transform expande + getv to GraphPhysicalExpandGetV public static class BasicExpandGetVFusionRule extends ExpandGetVFusionRule { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java index b98667944f1f..53e920b3a355 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java @@ -33,7 +33,6 @@ import com.alibaba.graphscope.common.ir.tools.config.GraphOpt.PhysicalGetVOpt; import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.alibaba.graphscope.common.ir.type.GraphNameOrId; -import com.alibaba.graphscope.common.ir.type.GraphSchemaType; import com.alibaba.graphscope.gaia.proto.GraphAlgebra; import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical; import com.alibaba.graphscope.gaia.proto.OuterExpression; @@ -170,7 +169,10 @@ public RelNode visit(GraphLogicalGetV getV) { Utils.protoGetVOpt(PhysicalGetVOpt.valueOf(getV.getOpt().name()))); // 1. 
build adjV without filter GraphAlgebra.QueryParams.Builder adjParamsBuilder = defaultQueryParams(); - addQueryTables(adjParamsBuilder, getGraphLabels(getV).getLabelsEntry()); + addQueryTables( + adjParamsBuilder, + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(getV.getRowType()) + .getLabelsEntry()); adjVertexBuilder.setParams(adjParamsBuilder); if (getV.getStartAlias().getAliasId() != AliasInference.DEFAULT_ID) { adjVertexBuilder.setTag(Utils.asAliasId(getV.getStartAlias().getAliasId())); @@ -975,16 +977,6 @@ private GraphAlgebra.IndexPredicate buildIndexPredicates(RexNode uniqueKeyFilter return indexPredicate; } - private GraphLabelType getGraphLabels(AbstractBindableTableScan tableScan) { - List fields = tableScan.getRowType().getFieldList(); - Preconditions.checkArgument( - !fields.isEmpty() && fields.get(0).getType() instanceof GraphSchemaType, - "data type of graph operators should be %s ", - GraphSchemaType.class); - GraphSchemaType schemaType = (GraphSchemaType) fields.get(0).getType(); - return schemaType.getLabelType(); - } - private GraphAlgebra.QueryParams.Builder defaultQueryParams() { GraphAlgebra.QueryParams.Builder paramsBuilder = GraphAlgebra.QueryParams.newBuilder(); // TODO: currently no sample rate fused into tableScan, so directly set 1.0 as default. 
@@ -1022,7 +1014,10 @@ private void addQueryColumns( private GraphAlgebra.QueryParams.Builder buildQueryParams(AbstractBindableTableScan tableScan) { GraphAlgebra.QueryParams.Builder paramsBuilder = defaultQueryParams(); - addQueryTables(paramsBuilder, getGraphLabels(tableScan).getLabelsEntry()); + addQueryTables( + paramsBuilder, + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(tableScan.getRowType()) + .getLabelsEntry()); addQueryFilters(paramsBuilder, tableScan.getFilters()); return paramsBuilder; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java index 0d38252fa7e9..3e5c72e610d0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java @@ -18,6 +18,9 @@ import com.alibaba.graphscope.common.ir.meta.schema.CommonOptTable; import com.alibaba.graphscope.common.ir.rel.CommonTableScan; +import com.alibaba.graphscope.common.ir.type.GraphLabelType; +import com.alibaba.graphscope.common.ir.type.GraphSchemaType; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -81,6 +84,20 @@ public static List getValuesAsList(Comparable value) { return values; } + public static GraphLabelType getGraphLabels(RelDataType rowType) { + if (rowType instanceof GraphSchemaType) { + return ((GraphSchemaType) rowType).getLabelType(); + } else { + List fields = rowType.getFieldList(); + Preconditions.checkArgument( + !fields.isEmpty() && fields.get(0).getType() instanceof GraphSchemaType, + "data type of graph operators should be %s ", + GraphSchemaType.class); + GraphSchemaType schemaType = (GraphSchemaType) fields.get(0).getType(); + return schemaType.getLabelType(); + } + } 
+ /** * print root {@code RelNode} and nested {@code RelNode}s in each {@code CommonTableScan} * @param node diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java index a70d3eafcc78..479d4514704f 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java @@ -196,13 +196,15 @@ public void bi3_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASMODERATOR]}], alias=[forum], startAlias=[person], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN," - + " PERSON, CITY)]], alias=[person], startAlias=[PATTERN_VERTEX$1], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[PERSON]}], alias=[person], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN," + + " PERSON, CITY)]], alias=[_], startAlias=[PATTERN_VERTEX$1], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF," + " CITY, COUNTRY)]], alias=[PATTERN_VERTEX$1], startAlias=[country], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[COUNTRY]}], alias=[country], fusedFilter=[[=(_.name," + " _UTF-8'China')]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); @@ -258,10 +260,12 @@ public void bi5_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASCREATOR]}], alias=[person], startAlias=[message], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, 
COMMENT," - + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[message], startAlias=[tag]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[TAG]}], alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -306,10 +310,12 @@ public void bi6_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASCREATOR]}], alias=[person1], startAlias=[message1], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," - + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[message1], startAlias=[tag]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[TAG]}], alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -348,14 +354,18 @@ public void bi7_test() { + " physicalOpt=[VERTEX])\n" + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[REPLYOF]}]," + " alias=[comment], startAlias=[message], opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)," - + " EdgeLabel(HASTAG, POST, TAG)]], alias=[message], startAlias=[tag]," + + " 
GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + " alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)]]," - + " alias=[comment], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}]," + + " alias=[comment], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)]]," + + " alias=[_], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + " alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", after.explain().trim()); } @@ -529,12 +539,12 @@ public void bi11_test() { + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + " CITY)]], alias=[PATTERN_VERTEX$1], startAlias=[a], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#-1096947686]])\n" + + " CommonTableScan(table=[[common#1257237722]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + " COUNTRY)]], alias=[PATTERN_VERTEX$1], startAlias=[s], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#-1096947686]])\n" - + "common#-1096947686:\n" + + " CommonTableScan(table=[[common#1257237722]])\n" + + "common#1257237722:\n" + "MultiJoin(joinFilter=[=(a, a)], isFullOuterJoin=[false], joinTypes=[[INNER," + " INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" + " 
GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[a]," @@ -542,37 +552,39 @@ public void bi11_test() { + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k1], startAlias=[b], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " CommonTableScan(table=[[common#1348927974]])\n" + + " CommonTableScan(table=[[common#-393103770]])\n" + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[a]," + " opt=[OTHER])\n" + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k2], startAlias=[c], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " CommonTableScan(table=[[common#1348927974]])\n" - + "common#1348927974:\n" + + " CommonTableScan(table=[[common#-393103770]])\n" + + "common#-393103770:\n" + "MultiJoin(joinFilter=[=(PATTERN_VERTEX$5, PATTERN_VERTEX$5)]," + " isFullOuterJoin=[false], joinTypes=[[INNER, INNER]]," + " outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON, CITY)]]," + " alias=[PATTERN_VERTEX$5], startAlias=[b], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#613791020]])\n" + + " CommonTableScan(table=[[common#1460176135]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY, COUNTRY)]]," + " alias=[PATTERN_VERTEX$5], startAlias=[s], opt=[IN], physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#613791020]])\n" - + "common#613791020:\n" + + " CommonTableScan(table=[[common#1460176135]])\n" + + "common#1460176135:\n" + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[b]," + " opt=[OTHER])\n" + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k3], startAlias=[c], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " 
GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," - + " CITY)]], alias=[c], startAlias=[PATTERN_VERTEX$9], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[c], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + + " CITY)]], alias=[_], startAlias=[PATTERN_VERTEX$9], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + " COUNTRY)]], alias=[PATTERN_VERTEX$9], startAlias=[s], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[COUNTRY]}]," + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[COUNTRY]}]," + " alias=[s], fusedFilter=[[=(_.name, _UTF-8'India')]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java index d3963ee6314e..9a0e9db8c622 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java @@ -334,13 +334,15 @@ public void ldbc4_test() { + " post]}], values=[[]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," + " TAG)]], alias=[tag], startAlias=[post], opt=[OUT], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " POST, PERSON)]], alias=[_], 
startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false," + + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[friend], startAlias=[person], opt=[BOTH]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " ?0)])", after.explain().trim()); @@ -388,10 +390,12 @@ public void ldbc5_test() { + " tables=[CONTAINEROF]}], alias=[post], startAlias=[forum], opt=[OUT]," + " physicalOpt=[VERTEX], optional=[true])\n" + " CommonTableScan(table=[[common#391831169]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[friend], opt=[IN]," - + " physicalOpt=[VERTEX], optional=[true])\n" - + " CommonTableScan(table=[[common#391831169]])\n" + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," + + " PERSON)]], alias=[_], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + + " optional=[true])\n" + + " CommonTableScan(table=[[common#391831169]])\n" + "common#391831169:\n" + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[FORUM]}], alias=[forum]," + " opt=[START])\n" @@ -440,17 +444,18 @@ public void ldbc6_test() { + " opt=[END], physicalOpt=[ITSELF])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST, TAG)]]," + " alias=[_], startAlias=[post], opt=[OUT], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[other], opt=[IN]," - + " physicalOpt=[VERTEX])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," - + " alias=[other], opt=[END])\n" - + " " + + " GraphPhysicalGetV(tableConfig=[{isAll=false, 
tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," + + " PERSON)]], alias=[_], startAlias=[other], opt=[IN], physicalOpt=[VERTEX])\n" + + " GraphLogicalGetV(tableConfig=[{isAll=false," + + " tables=[PERSON]}], alias=[other], opt=[END])\n" + + " " + " GraphLogicalPathExpand(fused=[GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[_], opt=[BOTH], physicalOpt=[VERTEX])\n" + "], offset=[1], fetch=[2], path_opt=[ARBITRARY], result_opt=[END_V]," + " alias=[_], start_alias=[person])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " 2199023382370)])", after.explain().trim()); @@ -687,37 +692,43 @@ public void ldbc10_test() { + " MultiJoin(joinFilter=[=(post1, post1)]," + " isFullOuterJoin=[false], joinTypes=[[INNER, INNER]]," + " outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," - + " POST, PERSON)]], alias=[post1], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[POST]}], alias=[post1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " POST, PERSON)]], alias=[_], startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX], optional=[true])\n" - + " CommonTableScan(table=[[common#-1774131414]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," - + " TAG)]], alias=[post1], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX]," + + " CommonTableScan(table=[[common#-1626533514]])\n" + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[POST]}], alias=[post1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," + + " TAG)]], alias=[_], startAlias=[tag], opt=[IN], 
physicalOpt=[VERTEX]," + " optional=[true])\n" - + " CommonTableScan(table=[[common#-1774131414]])\n" + + " CommonTableScan(table=[[common#-1626533514]])\n" + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[friend], startAlias=[person], opt=[BOTH], physicalOpt=[VERTEX])\n" + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " ?0)])\n" - + "common#-1774131414:\n" - + "GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST, PERSON)]]," - + " alias=[post], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + + "common#-1626533514:\n" + + "GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}], alias=[post]," + + " opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST, PERSON)]]," + + " alias=[_], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + " optional=[true])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[HASINTEREST]}]," + + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[HASINTEREST]}]," + " alias=[tag], startAlias=[person], opt=[OUT], physicalOpt=[VERTEX]," + " optional=[true])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + " PLACE)]], alias=[city], startAlias=[friend], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[friend], opt=[END])\n" - + " " + + " " + " GraphLogicalPathExpand(fused=[GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[_], opt=[BOTH], physicalOpt=[VERTEX])\n" + "], offset=[2], fetch=[1], path_opt=[ARBITRARY], result_opt=[END_V]," + " alias=[_], start_alias=[person])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " 
GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id, ?0)])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -826,16 +837,20 @@ public void ldbc12_test() { + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST, TAG)]]," + " alias=[tag], startAlias=[PATTERN_VERTEX$5], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(REPLYOF, COMMENT," - + " POST)]], alias=[PATTERN_VERTEX$5], startAlias=[comment], opt=[OUT]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[PATTERN_VERTEX$5], opt=[END], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(REPLYOF, COMMENT," + + " POST)]], alias=[_], startAlias=[comment], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," - + " COMMENT, PERSON)]], alias=[comment], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[COMMENT]}], alias=[comment], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " COMMENT, PERSON)]], alias=[_], startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false," + + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[friend], startAlias=[PATTERN_VERTEX$0]," + " opt=[BOTH], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[PATTERN_VERTEX$0], opt=[VERTEX]," + " uniqueKeyFilters=[=(_.id, 2199023382370)])", after.explain().trim()); diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java index 
dbfce24c370d..cd71d1a92696 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java @@ -332,6 +332,83 @@ public void expand_getv_fusion_6_test() { after.explain().trim()); } + // g.V().hasLabel("person").outE("likes").inV().hasLabel("comment"), can not be fused into a + // single GraphPhysicalExpand since we actually have person-likes-comment and person-likes-post. + @Test + public void expand_getv_fusion_7_test() { + GraphBuilder builder = Utils.mockGraphBuilder("schema/ldbc.json"); + RelNode before = + builder.source( + new SourceConfig( + GraphOpt.Source.VERTEX, + new LabelConfig(false).addLabel("PERSON"))) + .expand( + new ExpandConfig( + GraphOpt.Expand.OUT, + new LabelConfig(false).addLabel("LIKES"))) + .getV( + new GetVConfig( + GraphOpt.GetV.END, + new LabelConfig(false).addLabel("COMMENT"))) + .build(); + Assert.assertEquals( + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}]," + + " alias=[_], opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}]," + + " alias=[_], opt=[OUT])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + before.explain().trim()); + RelOptPlanner planner = + Utils.mockPlanner(ExpandGetVFusionRule.BasicExpandGetVFusionRule.Config.DEFAULT); + planner.setRoot(before); + RelNode after = planner.findBestExp(); + Assert.assertEquals( + "GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}], alias=[_]," + + " opt=[END], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}]," + + " alias=[_], opt=[OUT], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + after.explain().trim()); + } + + // 
g.V().hasLabel("person").outE("likes").inV(), can be fused into a single GraphPhysicalExpand + // even without type infer for GetV + @Test + public void expand_getv_fusion_8_test() { + GraphBuilder builder = Utils.mockGraphBuilder("schema/ldbc.json"); + RelNode before = + builder.source( + new SourceConfig( + GraphOpt.Source.VERTEX, + new LabelConfig(false).addLabel("PERSON"))) + .expand( + new ExpandConfig( + GraphOpt.Expand.OUT, + new LabelConfig(false).addLabel("LIKES"))) + .getV(new GetVConfig(GraphOpt.GetV.END, new LabelConfig(true))) + .build(); + Assert.assertEquals( + "GraphLogicalGetV(tableConfig=[{isAll=true, tables=[PERSON, POST, TAG," + + " ORGANISATION, PLACE, TAGCLASS, COMMENT, FORUM]}], alias=[_], opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}], alias=[_]," + + " opt=[OUT])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + before.explain().trim()); + RelOptPlanner planner = + Utils.mockPlanner(ExpandGetVFusionRule.BasicExpandGetVFusionRule.Config.DEFAULT); + planner.setRoot(before); + RelNode after = planner.findBestExp(); + Assert.assertEquals( + "GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}], alias=[_]," + + " opt=[OUT], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + after.explain().trim()); + } + // path expand: g.V().hasLabel("person").out('1..3', "knows").with('PATH_OPT', // SIMPLE).with('RESULT_OPT', ALL_V) @Test diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index c8eee6a06f12..359f818b0bfd 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -156,7 +156,15 @@ impl FilterMapFunction for AuxiliaOperator { // pruning by labels 
return Ok(None); } else if !self.query_params.has_predicates() && !self.query_params.has_columns() { - // if only filter by labels, directly return the results. + // if only filter by labels, return the results. + // if it has alias, append without moving head + if let Some(alias) = self.alias { + // append without moving head + let entry_clone = entry.clone(); + input + .get_columns_mut() + .insert(alias as usize, entry_clone); + } return Ok(Some(input)); } } From bf4bd712041a04ce3adba56939a5bccaad13e137 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 20 May 2024 20:52:02 +0800 Subject: [PATCH 3/4] chore: Add estimate memory usage command for gsctl (#3816) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Example usage: ```bash ❯ PYTHONPATH=`pwd` python3 graphscope/gsctl/gsctl.py estimate -v 222111111 -e 1610000000 -vf 2 -ef 28 -g gae -p 4 Disclaimer: The `estimate` command serves as a estimator for various kind of GraphScope resources. - GAE memory usage estimator: It would do a rough estimation of the total memory usage of the GAE. The actual memory usage may vary vastly due to the complexity of the graph algorithm and the data distribution. Here's some assumption when estimating the memory usage: 1. Assuming the graph is a simple graph, e.g. only has 1 label and at most 1 property for each label. 2. Assuming the graph algorithms has a fixed amount of memory consumption, e.g. PageRank, SSSP, CC; It should not generate a huge amount of intermediate result, like K-Hop, Louvain. 3. Assuming the vertex map is global vertex map User should take this estimation as a reference and a start point for tuning, and adjust the memory allocation according to the actual situation. [SUCCESS] The estimated memory usage is 30.61 GB per pod for 4 pods. 
``` --- python/graphscope/gsctl/commands/common.py | 106 ++++++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/python/graphscope/gsctl/commands/common.py b/python/graphscope/gsctl/commands/common.py index 587871274ea4..7d94a2aad577 100644 --- a/python/graphscope/gsctl/commands/common.py +++ b/python/graphscope/gsctl/commands/common.py @@ -52,7 +52,7 @@ def connect(coordinator_endpoint): context = get_current_context() if context is None: err( - "No available context found, try to connect by `gsctl conenct --coordinator-endpoint `." + "No available context found, try to connect by `gsctl connect --coordinator-endpoint `." ) return coordinator_endpoint = context.coordinator_endpoint @@ -82,5 +82,109 @@ def close(): info("Try 'gsctl --help' for help.") +disclaimer = """ +Disclaimer: The `estimate` command serves as an estimator for various kinds of GraphScope resources. + - GAE memory usage estimator: + It would do a rough estimation of the total memory usage of the GAE. + The actual memory usage may vary vastly due to the complexity of the graph algorithms and the data distribution. + + Here's some assumption when estimating the memory usage: + 1. Assuming the graph is a simple graph, e.g. only has 1 label and at most 1 property for each label. + 2. Assuming the graph algorithm has a fixed amount of memory consumption, e.g. PageRank, SSSP, CC; + It should not generate a huge amount of intermediate result, like K-Hop, Louvain. + 3. Assuming the vertex map type is global vertex map + + Users should take this estimation as a reference and a starting point for tuning, + and adjust the memory allocation according to the actual situation. 
+""" + + +def estimate_gae_memory_usage_for_graph( + v_num, e_num, v_file_size, e_file_size, partition +): + """ + The estimation is based on the following formulas: + let #V = number of vertices, #E = number of edges, + assuming load_factor of hashmap is 0.41, sizeof(inner_id) = sizeof(edge_id) = sizeof(vertex_id) = 8 bytes, + where vertex_id is the primary key of the vertex, default to int64, + it could be string though, in that case, the memory usage will grow. + Graph: + 1. vertex + - vertex map: #V * (sizeof(vertex_id) + sizeof(inner_id)) * (1 / load_factor) + - vertex table: size of vertex files in uncompressed CSV format, unit is GB + 2. edge + - CSR + CSC: 2 * #E * (sizeof(inner_id) + sizeof(edge_id)) + - offset array: #V * sizeof(size_t) + - edge table: size of edge files in uncompressed CSV format, unit is GB + 3. additional data structure when graph is partitioned, assuming 10% of vertices will be outer vertices + - outer vertex ID to local ID map: 0.1 * #V * (sizeof(vertex_id) + sizeof(inner_id)) * (1 / load_factor) + - local ID to outer vertex ID array: 0.1 * #V * sizeof(size_t) + + This is the minimum usage of the GAE with 1 partition, the actual memory usage would be larger than this estimation. + """ + gb = 1024 * 1024 * 1024 + # vertex map + vertex_map = v_num * (8 + 8) * (1 / 0.41) / gb + # vertex table + vertex_table = v_file_size + # edge + edge = 2 * e_num * (8 + 8) / gb + # offset array + offset_array = v_num * 8 / gb + # edge table + edge_table = e_file_size + additional = 0.1 * v_num * (8 + 8) * (1 / 0.41) / gb + 0.1 * v_num * 8 / gb + if partition == 1: + additional = 0 + per_partition = ( + vertex_map + + (vertex_table + edge_table + edge + offset_array) / partition + + additional + ) + return per_partition + + +def estimate_gae_memory_usage_for_algorithm(v_num): + """ + Simple algorithms like PageRank or CC will have 1 slot for result for each vertex. 
+ Formula: #V * sizeof(double) + """ + gb = 1024 * 1024 * 1024 + usage = v_num * 8 / gb + return usage + + +@cli.command() +@click.option( + "-g", + "--engine", + type=click.Choice(["gae"], case_sensitive=False), + help="Engine type", + required=True, +) +@click.option("-v", "--v-num", type=int, help="Number of vertices") +@click.option("-e", "--e-num", type=int, help="Number of edges") +@click.option("-vf", "--v-file-size", type=float, help="Size of vertex files in GB") +@click.option("-ef", "--e-file-size", type=float, help="Size of edge files in GB") +@click.option("-p", "--partition", type=int, help="Number of partitions", default=1) +def estimate(engine, v_num, e_num, v_file_size, e_file_size, partition): + """Estimate the resources requirement for various kinds of GraphScope components""" + if engine == "gae": + if v_num is None or e_num is None or v_file_size is None or e_file_size is None: + err("Please provide the required parameters.") + return + partition_usage = estimate_gae_memory_usage_for_graph( + v_num, e_num, v_file_size, e_file_size, partition + ) + algorithm_usage = estimate_gae_memory_usage_for_algorithm(v_num) + memory_usage = partition_usage + algorithm_usage + info(disclaimer) + succ( + f"The estimated memory usage is {memory_usage:.2f} GB per pod for {partition} pods.\n" + ) + else: + err(f"Estimating usage of engine {engine} is not supported yet.") + + if __name__ == "__main__": cli() From 0893b5f3618d7cfa1f59302052d390a6b7767389 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Tue, 21 May 2024 11:13:38 +0800 Subject: [PATCH 4/4] fix(interactive): Refine `generate_sdk.sh` for interactive (#3807) Avoid downloading `openapi-generator-cli` repeatedly. Fix `generate_sdk.sh` on macOS. 
--- .../http_server/handler/hqps_http_handler.cc | 24 +++++++-- flex/interactive/sdk/generate_sdk.sh | 53 ++++++++++++++++--- 2 files changed, 64 insertions(+), 13 deletions(-) diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index f8a46603873c..679dfec0e895 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -123,7 +123,12 @@ seastar::future> hqps_ic_handler::handle( return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) - .then([this, outer_span = outer_span](auto&& output) { + .then([this +#ifdef HAVE_OPENTELEMETRY_CPP + , + outer_span = outer_span +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { if (output.content.size() < 4) { LOG(ERROR) << "Invalid output size: " << output.content.size(); #ifdef HAVE_OPENTELEMETRY_CPP @@ -132,7 +137,7 @@ seastar::future> hqps_ic_handler::handle( outer_span->End(); std::map labels = {{"status", "fail"}}; total_counter_->Add(1, labels); -#endif +#endif // HAVE_OPENTELEMETRY_CPP return seastar::make_ready_future(std::move(output)); } return seastar::make_ready_future( @@ -340,8 +345,12 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, param.content.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(param.content)}) - .then([query_span = query_span, - query_scope = std::move(query_scope)](auto&& output) { + .then([ +#ifdef HAVE_OPENTELEMETRY_CPP + query_span = query_span, + query_scope = std::move(query_scope) +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { #ifdef HAVE_OPENTELEMETRY_CPP query_span->End(); #endif // HAVE_OPENTELEMETRY_CPP @@ -349,7 +358,12 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, std::move(output.content)); }); }) - .then([this, outer_span = outer_span](auto&& output) { + .then([this +#ifdef 
HAVE_OPENTELEMETRY_CPP + , + outer_span = outer_span +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { if (output.content.size() < 4) { LOG(ERROR) << "Invalid output size: " << output.content.size(); #ifdef HAVE_OPENTELEMETRY_CPP diff --git a/flex/interactive/sdk/generate_sdk.sh b/flex/interactive/sdk/generate_sdk.sh index 48b96885f2fe..751f4c0bc1ed 100755 --- a/flex/interactive/sdk/generate_sdk.sh +++ b/flex/interactive/sdk/generate_sdk.sh @@ -109,17 +109,54 @@ if [ $# -eq 0 ]; then fi function install_generator() { - # first check openapi-generator-cli is executable + CLI_INSTALLED=false + JQ_INSTALLED=false + MVN_INSTALLED=false + # first check whether openapi-generator-cli exists and is executable + if [ -f ~/bin/openapitools/openapi-generator-cli ]; then + echo "openapi-generator-cli is already installed" + export PATH=$PATH:~/bin/openapitools/ + fi if command -v openapi-generator-cli &>/dev/null; then echo "openapi-generator-cli is already installed" - return + CLI_INSTALLED=true + fi + if ! $CLI_INSTALLED; then + echo "Installing openapi-generator-cli" + mkdir -p ~/bin/openapitools + curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli + chmod u+x ~/bin/openapitools/openapi-generator-cli + export PATH=$PATH:~/bin/openapitools/ + export OPENAPI_GENERATOR_VERSION=7.2.0 + fi + # install jq and maven if missing: apt-get on Linux, brew on macOS + + if command -v jq &>/dev/null; then + echo "jq is already installed" + JQ_INSTALLED=true + fi + if command -v mvn &>/dev/null; then + echo "maven is already installed" + MVN_INSTALLED=true + fi + if [[ "$(uname -s)" == "Linux" ]]; then + if ! $JQ_INSTALLED; then + sudo apt-get update && sudo apt-get -y install jq + fi + if ! $MVN_INSTALLED; then + sudo apt-get update && sudo apt-get -y install maven + fi + elif [[ "$(uname -s)" == "Darwin" ]]; then + if ! $JQ_INSTALLED; then + brew install jq + fi + if !
$MVN_INSTALLED; then + brew install maven + fi + else + echo "Unsupported OS" + exit 1 fi - mkdir -p ~/bin/openapitools - curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli - chmod u+x ~/bin/openapitools/openapi-generator-cli - export PATH=$PATH:~/bin/openapitools/ - export OPENAPI_GENERATOR_VERSION=7.2.0 - sudo apt-get update && sudo apt-get -y install jq } install_generator