diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index d08e431050f2..40eaa708e844 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -20,11 +20,13 @@ data: discovery.mode={{ .Values.discoveryMode }} role.name=ROLE node.idx=INDEX + release.full.name={{ include "graphscope-store.fullname" . }} {{- else }} rpc.port=0 discovery.mode=zookeeper role.name="" node.idx=0 + release.full.name=localhost {{- end }} store.node.count={{ .Values.store.replicaCount }} @@ -58,7 +60,6 @@ data: neo4j.bolt.server.disabled=true log4rs.config=LOG4RS_CONFIG - release.full.name={{ include "graphscope-store.fullname" . }} ## Auth config auth.username={{ .Values.auth.username }} auth.password={{ .Values.auth.password }} @@ -79,7 +80,6 @@ data: store.gc.interval.ms={{ .Values.storeGcIntervalMs }} write.ha.enabled={{ .Values.backup.enabled }} - tracing.enabled={{ .Values.otel.enabled }} ## Coordinator Config rpc.max.bytes.mb={{ .Values.rpcMaxBytesMb }} @@ -137,12 +137,13 @@ data: export OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=DELTA export OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION=BASE2_EXPONENTIAL_BUCKET_HISTOGRAM - export OTEL_EXPORTER_OTLP_COMPRESSION=gzip + # export OTEL_EXPORTER_OTLP_COMPRESSION=gzip {{- end }} {{- if .Values.uptrace.enabled }} + export UPTRACE_DSN=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317 export OTEL_EXPORTER_OTLP_ENDPOINT=http://{{ .Values.uptrace.service }}:14317 - export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317 + export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=${UPTRACE_DSN} {{- end }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index c7a5fc3e9c14..4d92f1565dca 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -515,6 +515,7 @@ otel: registry: docker.io repository: jaegertracing/all-in-one tag: "latest" + # https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_traces_sampler traces: sampler: name: "traceidratio" diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index f8a46603873c..679dfec0e895 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -123,7 +123,12 @@ seastar::future> hqps_ic_handler::handle( return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) - .then([this, outer_span = outer_span](auto&& output) { + .then([this +#ifdef HAVE_OPENTELEMETRY_CPP + , + outer_span = outer_span +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { if (output.content.size() < 4) { LOG(ERROR) << "Invalid output size: " << output.content.size(); #ifdef HAVE_OPENTELEMETRY_CPP @@ -132,7 +137,7 @@ seastar::future> hqps_ic_handler::handle( outer_span->End(); std::map labels = {{"status", "fail"}}; total_counter_->Add(1, labels); -#endif +#endif // HAVE_OPENTELEMETRY_CPP return seastar::make_ready_future(std::move(output)); } return seastar::make_ready_future( @@ -340,8 +345,12 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, param.content.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(param.content)}) - .then([query_span = query_span, - query_scope = std::move(query_scope)](auto&& output) { + .then([ +#ifdef HAVE_OPENTELEMETRY_CPP + query_span = query_span, + query_scope = std::move(query_scope) +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { #ifdef HAVE_OPENTELEMETRY_CPP query_span->End(); #endif // HAVE_OPENTELEMETRY_CPP @@ -349,7 +358,12 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, std::move(output.content)); }); }) - .then([this, outer_span = outer_span](auto&& output) { + .then([this +#ifdef HAVE_OPENTELEMETRY_CPP + , + outer_span = outer_span +#endif // HAVE_OPENTELEMETRY_CPP + ](auto&& output) { if (output.content.size() < 4) { LOG(ERROR) << "Invalid output size: " << output.content.size(); #ifdef HAVE_OPENTELEMETRY_CPP diff --git a/flex/interactive/sdk/generate_sdk.sh b/flex/interactive/sdk/generate_sdk.sh index 48b96885f2fe..751f4c0bc1ed 100755 --- a/flex/interactive/sdk/generate_sdk.sh +++ b/flex/interactive/sdk/generate_sdk.sh @@ -109,17 +109,54 @@ if [ $# -eq 0 ]; then fi function install_generator() { - # first check openapi-generator-cli is executable + CLI_INSTALLED=false + JQ_INSTALLED=false + MVN_INSTALLED=false + # first check openapi-generator-cli exists is executable + if [ -f ~/bin/openapitools/openapi-generator-cli ]; then + echo "openapi-generator-cli is already installed" + export PATH=$PATH:~/bin/openapitools/ + fi if command -v openapi-generator-cli &>/dev/null; then echo "openapi-generator-cli is already installed" - return + CLI_INSTALLED=true + fi + if ! $CLI_INSTALLED; then + echo "Installing openapi-generator-cli" + mkdir -p ~/bin/openapitools + curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli + chmod u+x ~/bin/openapitools/openapi-generator-cli + export PATH=$PATH:~/bin/openapitools/ + export OPENAPI_GENERATOR_VERSION=7.2.0 + fi + # on ubuntu apt-get jq on mac brew install jq + + if command -v jq &>/dev/null; then + echo "jq is already installed" + JQ_INSTALLED=true + fi + if command -v mvn &>/dev/null; then + echo "maven is already installed" + MVN_INSTALLED=true + fi + if [[ "$(uname -s)" == "Linux" ]]; then + if ! $JQ_INSTALLED; then + sudo apt-get update && sudo apt-get -y install jq + fi + if ! $MVN_INSTALLED; then + sudo apt-get update && sudo apt-get -y install maven + fi + elif [[ "$(uname -s)" == "Darwin" ]]; then + if ! $JQ_INSTALLED; then + brew install jq + fi + if ! $MVN_INSTALLED; then + brew install maven + fi + else + echo "Unsupported OS" + exit 1 fi - mkdir -p ~/bin/openapitools - curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli - chmod u+x ~/bin/openapitools/openapi-generator-cli - export PATH=$PATH:~/bin/openapitools/ - export OPENAPI_GENERATOR_VERSION=7.2.0 - sudo apt-get update && sudo apt-get -y install jq } install_generator diff --git a/interactive_engine/assembly/src/conf/groot/config.template b/interactive_engine/assembly/src/conf/groot/config.template index 84a1a4cd3f8f..6fb14b964f85 100644 --- a/interactive_engine/assembly/src/conf/groot/config.template +++ b/interactive_engine/assembly/src/conf/groot/config.template @@ -33,4 +33,5 @@ neo4j.bolt.server.disabled=true pegasus.worker.num=2 pegasus.hosts=localhost:8080 -kafka.test.cluster.enable=true \ No newline at end of file +kafka.test.cluster.enable=true +OTEL_SDK_DISABLED=true diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index a5ab7010dc19..ce5ff432f524 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -58,9 +58,6 @@ public class CommonConfig { public static final Config PARTITION_COUNT = Config.intConfig("partition.count", 1); - public static final Config METRIC_UPDATE_INTERVAL_MS = - Config.longConfig("metric.update.interval.ms", 5000L); - public static final Config LOG4RS_CONFIG = Config.stringConfig("log4rs.config", ""); public static final Config DISCOVERY_MODE = @@ -74,8 +71,6 @@ public class CommonConfig { public static final Config SECONDARY_INSTANCE_ENABLED = Config.boolConfig("secondary.instance.enabled", false); - public static final Config TRACING_ENABLED = - Config.boolConfig("tracing.enabled", false); // Create an extra store pod for each original store pod for backup. // Only available in multi pod mode. diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index b7e502ccb955..3ed9e97f58dc 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -26,7 +26,7 @@ public class CoordinatorConfig { Config.boolConfig("log.recycle.enable", false); public static final Config LOG_RECYCLE_INTERVAL_SECOND = - Config.longConfig("log.recycle.interval.second", 600L); + Config.longConfig("log.recycle.interval.second", 3600L); public static final Config FILE_META_STORE_PATH = Config.stringConfig("file.meta.store.path", "./meta"); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index c3107944ad27..35ba45a24570 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -29,17 +29,14 @@ public class StoreConfig { public static final Config STORE_QUEUE_WAIT_MS = Config.longConfig("store.queue.wait.ms", 3000L); - public static final Config STORE_COMMIT_INTERVAL_MS = - Config.longConfig("store.commit.interval.ms", 1000L); - public static final Config STORE_GC_ENABLE = Config.boolConfig("store.gc.enable", true); public static final Config STORE_GC_INTERVAL_MS = - Config.longConfig("store.gc.interval.ms", 5000L); + Config.longConfig("store.gc.interval.ms", 3600000L); public static final Config STORE_CATCHUP_INTERVAL_MS = - Config.longConfig("store.catchup.interval.ms", 30000L); + Config.longConfig("store.catchup.interval.ms", 5000L); // set by IS_SECONDARY_INSTANCE, used in graph.rs public static final Config STORE_STORAGE_ENGINE = diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java index 8b885b086d0c..5ba60dee80ca 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/ExpandGetVFusionRule.java @@ -16,6 +16,7 @@ package com.alibaba.graphscope.common.ir.planner.rules; +import com.alibaba.graphscope.common.ir.meta.schema.GraphOptTable; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalExpand; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalGetV; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalPathExpand; @@ -23,10 +24,12 @@ import com.alibaba.graphscope.common.ir.rel.graph.GraphPhysicalGetV; import com.alibaba.graphscope.common.ir.tools.AliasInference; import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; +import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.GraphOptCluster; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rules.TransformationRule; @@ -34,6 +37,10 @@ import org.apache.commons.lang3.ObjectUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + // This rule try to fuse GraphLogicalExpand and GraphLogicalGetV if GraphLogicalExpand has no alias // (that it won't be visited individually later): // 1. if GraphLogicalGetV has no filters, then: @@ -57,7 +64,7 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re && getV.getOpt().equals(GraphOpt.GetV.START) || expand.getOpt().equals(GraphOpt.Expand.BOTH) && getV.getOpt().equals(GraphOpt.GetV.OTHER)) { - if (ObjectUtils.isEmpty(getV.getFilters())) { + if (canFuse(getV, expand)) { GraphPhysicalExpand physicalExpand = GraphPhysicalExpand.create( expand.getCluster(), @@ -95,6 +102,107 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re } } + private boolean canFuse(GraphLogicalGetV getV, GraphLogicalExpand expand) { + // If GraphLogicalGetV has filters, then we cannot fuse them directly. Instead, we create a + // EdgeExpand(V) with an Auxilia for the filters. + // If GraphLogicalGetV has no filters, then: + // 1. if we want to expand "person-knows->person", and the type in getV is "person", + // we can fuse them into one EdgeExpand with type "knows" (where "knows" will be given in + // EdgeExpand's QueryParam in PhysicalPlan) + // 2. if we want to expand "person-create->post", while in schema, a "create" actually + // consists of "person-create->post" and "person-create->comment", + // we do not fuse them directly. Instead, we create a EdgeExpand(V) with type "create" and + // an Auxilia with type "post" as the filter. + // 3. if we want to expand "person-islocatedin->city", while in schema, a "islocatedin" + // actually + // consists of "person-islocatedin->city" and "post-islocatedin->country". + // Thought the edge type of "islocatedin" may generate "city" and "country", we can still + // fuse them into a single EdgeExpand(V) with type "islocatedin" directly if we can confirm + // that the expand starts from "person". + // 4. a special case is that, currently, for gremlin query like g.V().out("create"), we have + // not infer precise types for getV yet (getV may contain all vertex types). + // In this case, if getV's types contains all the types that expand will generate, we can + // fuse them. + + Set edgeExpandedVLabels = new HashSet<>(); + // the optTables in expand preserves the full schema information for the edges, + // that is, for edge type "create", it contains both "person-create->post" and + // "person-create->comment", "user-create->post" etc. + List optTables = expand.getTableConfig().getTables(); + // the edgeParamLabels in expand preserves the inferred schema information for the edges, + // that is, for edge type "create", it contains only "person-create->post" if user queries + // like g.V().hasLabel("person").out("create").hasLabel("post") + GraphLabelType edgeParamType = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(expand.getRowType()); + List edgeParamLabels = edgeParamType.getLabelsEntry(); + GraphOpt.Expand direction = expand.getOpt(); + // First, we get all the source vertex types where the edge will be expanded from. + // e.g., expand from "person" + Set edgeExpandedSrcVLabels = new HashSet<>(); + for (GraphLabelType.Entry edgeLabel : edgeParamLabels) { + switch (direction) { + case OUT: + edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId()); + break; + case IN: + edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId()); + break; + case BOTH: + edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId()); + edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId()); + break; + } + } + // Then, we get all the destination vertex types where the edge will be expanded to. + // e.g., expand "likes" + for (RelOptTable optTable : optTables) { + if (optTable instanceof GraphOptTable) { + GraphOptTable graphOptTable = (GraphOptTable) optTable; + List edgeUserGivenParamLabels = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels( + graphOptTable.getRowType()) + .getLabelsEntry(); + for (GraphLabelType.Entry edgeLabel : edgeUserGivenParamLabels) { + switch (direction) { + case OUT: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getDstLabelId()); + } + break; + case IN: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getSrcLabelId()); + } + break; + case BOTH: + if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getDstLabelId()); + } + if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) { + edgeExpandedVLabels.add(edgeLabel.getSrcLabelId()); + } + break; + } + } + } + } + + // Finally, we check if the vertex types in getV to see if the type filter for the expanded + // vertex is necessary. + // e.g., if getV is "post" and expand type is "likes", then we cannot fuse them directly. + // Instead, we should create an EdgeExpand(V) with type "likes" and an Auxilia with type + // "post" as the filter. + List vertexParamLabels = + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(getV.getRowType()) + .getLabelsEntry(); + Set vertexExpandedVLabels = new HashSet<>(); + for (GraphLabelType.Entry vertexLabel : vertexParamLabels) { + vertexExpandedVLabels.add(vertexLabel.getLabelId()); + } + return ObjectUtils.isEmpty(getV.getFilters()) + && vertexExpandedVLabels.containsAll(edgeExpandedVLabels); + } + // transform expande + getv to GraphPhysicalExpandGetV public static class BasicExpandGetVFusionRule extends ExpandGetVFusionRule { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java index b98667944f1f..53e920b3a355 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/GraphRelToProtoConverter.java @@ -33,7 +33,6 @@ import com.alibaba.graphscope.common.ir.tools.config.GraphOpt.PhysicalGetVOpt; import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.alibaba.graphscope.common.ir.type.GraphNameOrId; -import com.alibaba.graphscope.common.ir.type.GraphSchemaType; import com.alibaba.graphscope.gaia.proto.GraphAlgebra; import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical; import com.alibaba.graphscope.gaia.proto.OuterExpression; @@ -170,7 +169,10 @@ public RelNode visit(GraphLogicalGetV getV) { Utils.protoGetVOpt(PhysicalGetVOpt.valueOf(getV.getOpt().name()))); // 1. build adjV without filter GraphAlgebra.QueryParams.Builder adjParamsBuilder = defaultQueryParams(); - addQueryTables(adjParamsBuilder, getGraphLabels(getV).getLabelsEntry()); + addQueryTables( + adjParamsBuilder, + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(getV.getRowType()) + .getLabelsEntry()); adjVertexBuilder.setParams(adjParamsBuilder); if (getV.getStartAlias().getAliasId() != AliasInference.DEFAULT_ID) { adjVertexBuilder.setTag(Utils.asAliasId(getV.getStartAlias().getAliasId())); @@ -975,16 +977,6 @@ private GraphAlgebra.IndexPredicate buildIndexPredicates(RexNode uniqueKeyFilter return indexPredicate; } - private GraphLabelType getGraphLabels(AbstractBindableTableScan tableScan) { - List fields = tableScan.getRowType().getFieldList(); - Preconditions.checkArgument( - !fields.isEmpty() && fields.get(0).getType() instanceof GraphSchemaType, - "data type of graph operators should be %s ", - GraphSchemaType.class); - GraphSchemaType schemaType = (GraphSchemaType) fields.get(0).getType(); - return schemaType.getLabelType(); - } - private GraphAlgebra.QueryParams.Builder defaultQueryParams() { GraphAlgebra.QueryParams.Builder paramsBuilder = GraphAlgebra.QueryParams.newBuilder(); // TODO: currently no sample rate fused into tableScan, so directly set 1.0 as default. @@ -1022,7 +1014,10 @@ private void addQueryColumns( private GraphAlgebra.QueryParams.Builder buildQueryParams(AbstractBindableTableScan tableScan) { GraphAlgebra.QueryParams.Builder paramsBuilder = defaultQueryParams(); - addQueryTables(paramsBuilder, getGraphLabels(tableScan).getLabelsEntry()); + addQueryTables( + paramsBuilder, + com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(tableScan.getRowType()) + .getLabelsEntry()); addQueryFilters(paramsBuilder, tableScan.getFilters()); return paramsBuilder; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java index 0d38252fa7e9..3e5c72e610d0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/Utils.java @@ -18,6 +18,9 @@ import com.alibaba.graphscope.common.ir.meta.schema.CommonOptTable; import com.alibaba.graphscope.common.ir.rel.CommonTableScan; +import com.alibaba.graphscope.common.ir.type.GraphLabelType; +import com.alibaba.graphscope.common.ir.type.GraphSchemaType; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -81,6 +84,20 @@ public static List getValuesAsList(Comparable value) { return values; } + public static GraphLabelType getGraphLabels(RelDataType rowType) { + if (rowType instanceof GraphSchemaType) { + return ((GraphSchemaType) rowType).getLabelType(); + } else { + List fields = rowType.getFieldList(); + Preconditions.checkArgument( + !fields.isEmpty() && fields.get(0).getType() instanceof GraphSchemaType, + "data type of graph operators should be %s ", + GraphSchemaType.class); + GraphSchemaType schemaType = (GraphSchemaType) fields.get(0).getType(); + return schemaType.getLabelType(); + } + } + /** * print root {@code RelNode} and nested {@code RelNode}s in each {@code CommonTableScan} * @param node diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java index a70d3eafcc78..479d4514704f 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java @@ -196,13 +196,15 @@ public void bi3_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASMODERATOR]}], alias=[forum], startAlias=[person], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN," - + " PERSON, CITY)]], alias=[person], startAlias=[PATTERN_VERTEX$1], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[PERSON]}], alias=[person], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN," + + " PERSON, CITY)]], alias=[_], startAlias=[PATTERN_VERTEX$1], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF," + " CITY, COUNTRY)]], alias=[PATTERN_VERTEX$1], startAlias=[country], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[COUNTRY]}], alias=[country], fusedFilter=[[=(_.name," + " _UTF-8'China')]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); @@ -258,10 +260,12 @@ public void bi5_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASCREATOR]}], alias=[person], startAlias=[message], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," - + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[message], startAlias=[tag]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[TAG]}], alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -306,10 +310,12 @@ public void bi6_test() { + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[HASCREATOR]}], alias=[person1], startAlias=[message1], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," - + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[message1], startAlias=[tag]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[TAG]}], alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -348,14 +354,18 @@ public void bi7_test() { + " physicalOpt=[VERTEX])\n" + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[REPLYOF]}]," + " alias=[comment], startAlias=[message], opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)," - + " EdgeLabel(HASTAG, POST, TAG)]], alias=[message], startAlias=[tag]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST," + + " COMMENT]}], alias=[message], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT," + + " TAG), EdgeLabel(HASTAG, POST, TAG)]], alias=[_], startAlias=[tag]," + " opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + " alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)]]," - + " alias=[comment], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}]," + + " alias=[comment], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, COMMENT, TAG)]]," + + " alias=[_], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[TAG]}]," + " alias=[tag], fusedFilter=[[=(_.name, ?0)]], opt=[VERTEX])", after.explain().trim()); } @@ -529,12 +539,12 @@ public void bi11_test() { + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + " CITY)]], alias=[PATTERN_VERTEX$1], startAlias=[a], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#-1096947686]])\n" + + " CommonTableScan(table=[[common#1257237722]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + " COUNTRY)]], alias=[PATTERN_VERTEX$1], startAlias=[s], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#-1096947686]])\n" - + "common#-1096947686:\n" + + " CommonTableScan(table=[[common#1257237722]])\n" + + "common#1257237722:\n" + "MultiJoin(joinFilter=[=(a, a)], isFullOuterJoin=[false], joinTypes=[[INNER," + " INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[a]," @@ -542,37 +552,39 @@ public void bi11_test() { + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k1], startAlias=[b], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " CommonTableScan(table=[[common#1348927974]])\n" + + " CommonTableScan(table=[[common#-393103770]])\n" + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[a]," + " opt=[OTHER])\n" + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k2], startAlias=[c], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " CommonTableScan(table=[[common#1348927974]])\n" - + "common#1348927974:\n" + + " CommonTableScan(table=[[common#-393103770]])\n" + + "common#-393103770:\n" + "MultiJoin(joinFilter=[=(PATTERN_VERTEX$5, PATTERN_VERTEX$5)]," + " isFullOuterJoin=[false], joinTypes=[[INNER, INNER]]," + " outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON, CITY)]]," + " alias=[PATTERN_VERTEX$5], startAlias=[b], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#613791020]])\n" + + " CommonTableScan(table=[[common#1460176135]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY, COUNTRY)]]," + " alias=[PATTERN_VERTEX$5], startAlias=[s], opt=[IN], physicalOpt=[VERTEX])\n" - + " CommonTableScan(table=[[common#613791020]])\n" - + "common#613791020:\n" + + " CommonTableScan(table=[[common#1460176135]])\n" + + "common#1460176135:\n" + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}], alias=[b]," + " opt=[OTHER])\n" + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[k3], startAlias=[c], fusedFilter=[[AND(>=(_.creationDate, ?0)," + " <=(_.creationDate, ?1))]], opt=[BOTH])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," - + " CITY)]], alias=[c], startAlias=[PATTERN_VERTEX$9], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[c], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + + " CITY)]], alias=[_], startAlias=[PATTERN_VERTEX$9], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISPARTOF, CITY," + " COUNTRY)]], alias=[PATTERN_VERTEX$9], startAlias=[s], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[COUNTRY]}]," + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[COUNTRY]}]," + " alias=[s], fusedFilter=[[=(_.name, _UTF-8'India')]], opt=[VERTEX])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java index d3963ee6314e..9a0e9db8c622 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java @@ -334,13 +334,15 @@ public void ldbc4_test() { + " post]}], values=[[]])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," + " TAG)]], alias=[tag], startAlias=[post], opt=[OUT], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " POST, PERSON)]], alias=[_], startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false," + + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[friend], startAlias=[person], opt=[BOTH]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " ?0)])", after.explain().trim()); @@ -388,10 +390,12 @@ public void ldbc5_test() { + " tables=[CONTAINEROF]}], alias=[post], startAlias=[forum], opt=[OUT]," + " physicalOpt=[VERTEX], optional=[true])\n" + " CommonTableScan(table=[[common#391831169]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[friend], opt=[IN]," - + " physicalOpt=[VERTEX], optional=[true])\n" - + " CommonTableScan(table=[[common#391831169]])\n" + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," + + " PERSON)]], alias=[_], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + + " optional=[true])\n" + + " CommonTableScan(table=[[common#391831169]])\n" + "common#391831169:\n" + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[FORUM]}], alias=[forum]," + " opt=[START])\n" @@ -440,17 +444,18 @@ public void ldbc6_test() { + " opt=[END], physicalOpt=[ITSELF])\n" + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST, TAG)]]," + " alias=[_], startAlias=[post], opt=[OUT], physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," - + " PERSON)]], alias=[post], startAlias=[other], opt=[IN]," - + " physicalOpt=[VERTEX])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," - + " alias=[other], opt=[END])\n" - + " " + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[post], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST," + + " PERSON)]], alias=[_], startAlias=[other], opt=[IN], physicalOpt=[VERTEX])\n" + + " GraphLogicalGetV(tableConfig=[{isAll=false," + + " tables=[PERSON]}], alias=[other], opt=[END])\n" + + " " + " GraphLogicalPathExpand(fused=[GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[_], opt=[BOTH], physicalOpt=[VERTEX])\n" + "], offset=[1], fetch=[2], path_opt=[ARBITRARY], result_opt=[END_V]," + " alias=[_], start_alias=[person])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " 2199023382370)])", after.explain().trim()); @@ -687,37 +692,43 @@ public void ldbc10_test() { + " MultiJoin(joinFilter=[=(post1, post1)]," + " isFullOuterJoin=[false], joinTypes=[[INNER, INNER]]," + " outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," - + " POST, PERSON)]], alias=[post1], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[POST]}], alias=[post1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " POST, PERSON)]], alias=[_], startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX], optional=[true])\n" - + " CommonTableScan(table=[[common#-1774131414]])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," - + " TAG)]], alias=[post1], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX]," + + " CommonTableScan(table=[[common#-1626533514]])\n" + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[POST]}], alias=[post1], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST," + + " TAG)]], alias=[_], startAlias=[tag], opt=[IN], physicalOpt=[VERTEX]," + " optional=[true])\n" - + " CommonTableScan(table=[[common#-1774131414]])\n" + + " CommonTableScan(table=[[common#-1626533514]])\n" + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[KNOWS]}]," + " alias=[friend], startAlias=[person], opt=[BOTH], physicalOpt=[VERTEX])\n" + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id," + " ?0)])\n" - + "common#-1774131414:\n" - + "GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST, PERSON)]]," - + " alias=[post], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + + "common#-1626533514:\n" + + "GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}], alias=[post]," + + " opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR, POST, PERSON)]]," + + " alias=[_], startAlias=[friend], opt=[IN], physicalOpt=[VERTEX]," + " optional=[true])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[HASINTEREST]}]," + + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[HASINTEREST]}]," + " alias=[tag], startAlias=[person], opt=[OUT], physicalOpt=[VERTEX]," + " optional=[true])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(ISLOCATEDIN, PERSON," + " PLACE)]], alias=[city], startAlias=[friend], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[friend], opt=[END])\n" - + " " + + " " + " GraphLogicalPathExpand(fused=[GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[_], opt=[BOTH], physicalOpt=[VERTEX])\n" + "], offset=[2], fetch=[1], path_opt=[ARBITRARY], result_opt=[END_V]," + " alias=[_], start_alias=[person])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[person], opt=[VERTEX], uniqueKeyFilters=[=(_.id, ?0)])", com.alibaba.graphscope.common.ir.tools.Utils.toString(after).trim()); } @@ -826,16 +837,20 @@ public void ldbc12_test() { + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASTAG, POST, TAG)]]," + " alias=[tag], startAlias=[PATTERN_VERTEX$5], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(REPLYOF, COMMENT," - + " POST)]], alias=[PATTERN_VERTEX$5], startAlias=[comment], opt=[OUT]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[POST]}]," + + " alias=[PATTERN_VERTEX$5], opt=[END], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(REPLYOF, COMMENT," + + " POST)]], alias=[_], startAlias=[comment], opt=[OUT]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," - + " COMMENT, PERSON)]], alias=[comment], startAlias=[friend], opt=[IN]," + + " GraphPhysicalGetV(tableConfig=[{isAll=false," + + " tables=[COMMENT]}], alias=[comment], opt=[START], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[[EdgeLabel(HASCREATOR," + + " COMMENT, PERSON)]], alias=[_], startAlias=[friend], opt=[IN]," + " physicalOpt=[VERTEX])\n" - + " GraphPhysicalExpand(tableConfig=[{isAll=false," + + " GraphPhysicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[friend], startAlias=[PATTERN_VERTEX$0]," + " opt=[BOTH], physicalOpt=[VERTEX])\n" - + " GraphLogicalSource(tableConfig=[{isAll=false," + + " GraphLogicalSource(tableConfig=[{isAll=false," + " tables=[PERSON]}], alias=[PATTERN_VERTEX$0], opt=[VERTEX]," + " uniqueKeyFilters=[=(_.id, 2199023382370)])", after.explain().trim()); diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java index dbfce24c370d..cd71d1a92696 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/rbo/ExpandGetVFusionTest.java @@ -332,6 +332,83 @@ public void expand_getv_fusion_6_test() { after.explain().trim()); } + // g.V().hasLabel("person").outE("likes").inV().hasLabel("comment"), can not be fused into a + // single GraphPhysicalExpand since we actually have person-likes-comment and person-likes-post. + @Test + public void expand_getv_fusion_7_test() { + GraphBuilder builder = Utils.mockGraphBuilder("schema/ldbc.json"); + RelNode before = + builder.source( + new SourceConfig( + GraphOpt.Source.VERTEX, + new LabelConfig(false).addLabel("PERSON"))) + .expand( + new ExpandConfig( + GraphOpt.Expand.OUT, + new LabelConfig(false).addLabel("LIKES"))) + .getV( + new GetVConfig( + GraphOpt.GetV.END, + new LabelConfig(false).addLabel("COMMENT"))) + .build(); + Assert.assertEquals( + "GraphLogicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}]," + + " alias=[_], opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}]," + + " alias=[_], opt=[OUT])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + before.explain().trim()); + RelOptPlanner planner = + Utils.mockPlanner(ExpandGetVFusionRule.BasicExpandGetVFusionRule.Config.DEFAULT); + planner.setRoot(before); + RelNode after = planner.findBestExp(); + Assert.assertEquals( + "GraphPhysicalGetV(tableConfig=[{isAll=false, tables=[COMMENT]}], alias=[_]," + + " opt=[END], physicalOpt=[ITSELF])\n" + + " GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}]," + + " alias=[_], opt=[OUT], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + after.explain().trim()); + } + + // g.V().hasLabel("person").outE("likes").inV(), can be fused into a single GraphPhysicalExpand + // even without type infer for GetV + @Test + public void expand_getv_fusion_8_test() { + GraphBuilder builder = Utils.mockGraphBuilder("schema/ldbc.json"); + RelNode before = + builder.source( + new SourceConfig( + GraphOpt.Source.VERTEX, + new LabelConfig(false).addLabel("PERSON"))) + .expand( + new ExpandConfig( + GraphOpt.Expand.OUT, + new LabelConfig(false).addLabel("LIKES"))) + .getV(new GetVConfig(GraphOpt.GetV.END, new LabelConfig(true))) + .build(); + Assert.assertEquals( + "GraphLogicalGetV(tableConfig=[{isAll=true, tables=[PERSON, POST, TAG," + + " ORGANISATION, PLACE, TAGCLASS, COMMENT, FORUM]}], alias=[_], opt=[END])\n" + + " GraphLogicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}], alias=[_]," + + " opt=[OUT])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + before.explain().trim()); + RelOptPlanner planner = + Utils.mockPlanner(ExpandGetVFusionRule.BasicExpandGetVFusionRule.Config.DEFAULT); + planner.setRoot(before); + RelNode after = planner.findBestExp(); + Assert.assertEquals( + "GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[LIKES]}], alias=[_]," + + " opt=[OUT], physicalOpt=[VERTEX])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " alias=[_], opt=[VERTEX])", + after.explain().trim()); + } + // path expand: g.V().hasLabel("person").out('1..3', "knows").with('PATH_OPT', // SIMPLE).with('RESULT_OPT', ALL_V) @Test diff --git a/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs b/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs index e120b6b41603..b400d51152b5 100644 --- a/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs +++ b/interactive_engine/executor/assembly/groot/src/executor/gaia/gaia_server.rs @@ -209,14 +209,7 @@ fn make_gaia_config(graph_config: Arc) -> GaiaConfig { .no_delay(no_delay) .send_buffer(send_buffer) .heartbeat_sec(heartbeat_sec); - let enable_tracing = graph_config - .get_storage_option("tracing.enabled") - .map(|config_str| { - config_str - .parse() - .expect("parse tracing.enabled failed") - }); - GaiaConfig { network: Some(network_config), max_pool_size, enable_tracing } + GaiaConfig { network: Some(network_config), max_pool_size } } fn make_gaia_rpc_config(graph_config: Arc) -> RPCServerConfig { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs index cd182f9aaa96..5dd3a655869b 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs @@ -37,7 +37,6 @@ macro_rules! configure_with_default { pub struct Configuration { pub network: Option, pub max_pool_size: Option, - pub enable_tracing: Option, } impl Configuration { @@ -46,11 +45,11 @@ impl Configuration { } pub fn singleton() -> Self { - Configuration { network: None, max_pool_size: None, enable_tracing: None } + Configuration { network: None, max_pool_size: None } } pub fn with(network: NetworkConfig) -> Self { - Configuration { network: Some(network), max_pool_size: None, enable_tracing: None } + Configuration { network: Some(network), max_pool_size: None } } pub fn server_id(&self) -> u64 { diff --git a/interactive_engine/executor/engine/pegasus/server/build.rs b/interactive_engine/executor/engine/pegasus/server/build.rs index 57591107fd35..841af8d28430 100644 --- a/interactive_engine/executor/engine/pegasus/server/build.rs +++ b/interactive_engine/executor/engine/pegasus/server/build.rs @@ -38,6 +38,7 @@ fn codegen_inplace() -> Result<(), Box> { fn codegen_inplace() -> Result<(), Box> { tonic_build::configure() .build_server(true) + .build_client(true) .compile(&["proto/job_service.proto", "proto/job_plan.proto"], &["proto"])?; Ok(()) } diff --git a/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml b/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml index df57aba4b2a9..d1e0019186cb 100644 --- a/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml +++ b/interactive_engine/executor/engine/pegasus/server/config/tests/standalone2/server_config.toml @@ -1,7 +1,6 @@ # Set max threads will be created in the executor pool; # It will be set to CPU cores by default; max_pool_size = 8 -enable_tracing = false [network] # Set server id of current config belongs to; diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index 17ae301c1b82..f5011fafd8cf 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -27,15 +27,21 @@ use std::time::Duration; use futures::Stream; use hyper::server::accept::Accept; use hyper::server::conn::{AddrIncoming, AddrStream}; -use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::{TraceContextExt, TraceError}; use opentelemetry::{ global, propagation::Extractor, trace::{Span, SpanKind, Tracer}, KeyValue, }; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{ExportConfig, Protocol, TonicExporterBuilder, WithExportConfig}; +use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::resource::{ + EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector, +}; +use opentelemetry_sdk::trace::BatchConfigBuilder; +use opentelemetry_sdk::Resource; use pegasus::api::function::FnResult; use pegasus::api::FromStream; use pegasus::errors::{ErrorKind, JobExecError}; @@ -188,7 +194,8 @@ where type SubmitStream = UnboundedReceiverStream>; async fn cancel(&self, req: Request) -> Result, Status> { - let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + let parent_ctx = + global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata()))); let tracer = global::tracer("executor"); let _span = tracer .span_builder("JobService/cancel") @@ -201,7 +208,8 @@ where async fn submit(&self, req: Request) -> Result, Status> { debug!("accept new request from {:?};", req.remote_addr()); - let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + let parent_ctx = + global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata()))); let tracer = global::tracer("executor"); let pb::JobRequest { conf, source, plan, resource } = req.into_inner(); @@ -292,9 +300,7 @@ where D: ServerDetect + 'static, E: ServiceStartListener, { - if server_config.enable_tracing.unwrap_or(false) { - let _tracer = init_tracer().expect("Failed to initialize tracer."); - } + init_otel().expect("Failed to initialize open telemetry"); let server_id = server_config.server_id(); if let Some(server_addr) = pegasus::startup_with(server_config, server_detector)? { listener.on_server_start(server_id, server_addr)?; @@ -378,24 +384,86 @@ impl RPCJobServer { } } -fn init_tracer() -> Result { +fn init_otel() -> Result> { + let otel_disable = std::env::var("OTEL_SDK_DISABLED").unwrap_or("true".to_string()); + info!("otel_disable: {}", otel_disable); + if otel_disable.trim().parse().unwrap() { + info!("OTEL is disabled"); + return Ok(true); + } + + // let mut metadata = tonic::metadata::MetadataMap::with_capacity(1); + // let dsn = std::env::var("UPTRACE_DSN").unwrap_or_default(); + // if !dsn.is_empty() { + // metadata.insert("uptrace-dsn", dsn.parse().unwrap()); + // info!("using DSN: {}", dsn); + // } else { + // warn!("Error: UPTRACE_DSN not found."); + // } + + let default_endpoint = "http://localhost:4317".to_string(); + let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or(default_endpoint); + + let resource = Resource::from_detectors( + Duration::from_secs(0), + vec![ + Box::new(SdkProvidedResourceDetector), + Box::new(EnvResourceDetector::new()), + Box::new(TelemetryResourceDetector), + ], + ); + + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(5)) + .with_endpoint(endpoint.clone()); + // .with_metadata(metadata.clone()); + let _tracer = init_tracer(resource.clone(), exporter)?; + + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(5)) + .with_endpoint(endpoint); + // .with_metadata(metadata); + + let _meter = init_meter_provider(resource, exporter)?; + global::set_meter_provider(_meter); + return Ok(true); +} + +fn init_tracer( + resource: Resource, exporter: TonicExporterBuilder, +) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); + let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default() + .with_max_queue_size(2048) + .with_max_export_batch_size(512) + .with_scheduled_delay(Duration::from_millis(5000)) + .build(); + let trace_config = opentelemetry_sdk::trace::config().with_resource(resource); opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint("http://localhost:4317"), - ) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource( - opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pegasus")]), - )) + .with_exporter(exporter) + .with_batch_config(batch_config) + .with_trace_config(trace_config) .install_batch(opentelemetry_sdk::runtime::Tokio) } -struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); +fn init_meter_provider( + resource: Resource, exporter: TonicExporterBuilder, +) -> opentelemetry::metrics::Result { + opentelemetry_otlp::new_pipeline() + .metrics(opentelemetry_sdk::runtime::Tokio) + .with_exporter(exporter) + .with_period(Duration::from_secs(15)) + .with_timeout(Duration::from_secs(5)) + .with_resource(resource) + .build() +} + +struct MyMetadataMap<'a>(&'a tonic::metadata::MetadataMap); -impl<'a> Extractor for MetadataMap<'a> { +impl<'a> Extractor for MyMetadataMap<'a> { fn get(&self, key: &str) -> Option<&str> { self.0 .get(key) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index c8eee6a06f12..359f818b0bfd 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -156,7 +156,15 @@ impl FilterMapFunction for AuxiliaOperator { // pruning by labels return Ok(None); } else if !self.query_params.has_predicates() && !self.query_params.has_columns() { - // if only filter by labels, directly return the results. + // if only filter by labels, return the results. + // if it has alias, append without moving head + if let Some(alias) = self.alias { + // append without moving head + let entry_clone = entry.clone(); + input + .get_columns_mut() + .insert(alias as usize, entry_clone); + } return Ok(Some(input)); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java index 381fceff51d0..e34b281104f2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java @@ -41,6 +41,9 @@ public class Utils { public static String getHostTemplate(Configs configs, RoleType role) { String releaseName = DiscoveryConfig.RELEASE_FULL_NAME.get(configs); + if (releaseName.equals("localhost") || releaseName.equals("127.0.0.1")) { + return releaseName; + } // template = "{releaseName}-{role}-{}.{releaseName}-{role}-headless"; // i.e. demo-graphscope-store-frontend-0.demo-graphscope-store-frontend-headless String svcTemplate = "%s-%s"; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java index 01eccae521a3..1c03f6e8ce24 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java @@ -84,6 +84,7 @@ private void doRecycle() { List queueOffsets = this.snapshotManager.getQueueOffsets(); for (int i = 0; i < queueOffsets.size(); i++) { long offset = queueOffsets.get(i); + offset = Math.max(offset - 3600, 0); // Leave some spaces try { logService.deleteBeforeOffset(i, offset); logger.info("recycled queue [{}] offset [{}]", i, offset); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 596f2f07a590..204ed7dca4f3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -75,6 +75,7 @@ public void getPartitionNum( public void prepareDataLoad( PrepareDataLoadRequest request, StreamObserver responseObserver) { + logger.info("Preparing data load"); DdlRequestBatch.Builder builder = DdlRequestBatch.newBuilder(); for (DataLoadTargetPb dataLoadTargetPb : request.getDataLoadTargetsList()) { DataLoadTarget dataLoadTarget = DataLoadTarget.parseProto(dataLoadTargetPb); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index 68cd0f536ddc..1c6cb6af8131 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -107,6 +107,11 @@ public void start() { public void stop() { this.shouldStop = true; + try { + updateQueueOffsets(); + } catch (IOException ex) { + logger.error("update queue offset failed", ex); + } if (this.persistOffsetsScheduler != null) { this.persistOffsetsScheduler.shutdown(); try { @@ -163,8 +168,7 @@ private void updateQueueOffsets() throws IOException { boolean changed = false; List consumedOffsets = writerAgent.getConsumedQueueOffsets(); for (int qId = 0; qId < queueOffsets.size(); qId++) { - long minOffset = Long.MAX_VALUE; - minOffset = Math.min(consumedOffsets.get(qId), minOffset); + long minOffset = Math.min(consumedOffsets.get(qId), Long.MAX_VALUE); if (minOffset != Long.MAX_VALUE && minOffset > newQueueOffsets.get(qId)) { newQueueOffsets.set(qId, minOffset); changed = true; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java index a897f32052c0..f96c054c2475 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/PartitionService.java @@ -24,8 +24,7 @@ public class PartitionService { public PartitionService(Configs configs, StoreService storeService) { this.storeService = storeService; this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs); - this.storeCatchupIntervalMS = StoreConfig.STORE_GC_INTERVAL_MS.get(configs); - // this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs); + this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs); this.scheduler = Executors.newScheduledThreadPool( diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index b68f8e7d7cf1..d537032f1082 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -167,8 +167,8 @@ private void processBatches() { this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); - } else { - logger.warn("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + } else { // a flurry of batches with same snapshot ID + logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } if (hasDdl) { this.consumeDdlSnapshotId = batchSI; diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java index fd89d7273194..c71b46d47014 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java @@ -17,7 +17,6 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; -import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.operation.StoreDataBatch; import com.alibaba.graphscope.groot.rpc.RoleClients; @@ -34,11 +33,7 @@ public class WriterAgentTest { @Test void testWriterAgent() throws InterruptedException, ExecutionException { - Configs configs = - Configs.newBuilder() - .put(CommonConfig.NODE_IDX.getKey(), "0") - .put(StoreConfig.STORE_COMMIT_INTERVAL_MS.getKey(), "10") - .build(); + Configs configs = Configs.newBuilder().put(CommonConfig.NODE_IDX.getKey(), "0").build(); StoreService mockStoreService = mock(StoreService.class); MetaService mockMetaService = mock(MetaService.class); diff --git a/python/graphscope/gsctl/commands/common.py b/python/graphscope/gsctl/commands/common.py index 587871274ea4..7d94a2aad577 100644 --- a/python/graphscope/gsctl/commands/common.py +++ b/python/graphscope/gsctl/commands/common.py @@ -52,7 +52,7 @@ def connect(coordinator_endpoint): context = get_current_context() if context is None: err( - "No available context found, try to connect by `gsctl conenct --coordinator-endpoint `." + "No available context found, try to connect by `gsctl connect --coordinator-endpoint `." ) return coordinator_endpoint = context.coordinator_endpoint @@ -82,5 +82,109 @@ def close(): info("Try 'gsctl --help' for help.") +disclaimer = """ +Disclaimer: The `estimate` command serves as an estimator for various kinds of GraphScope resources. + - GAE memory usage estimator: + It would do a rough estimation of the total memory usage of the GAE. + The actual memory usage may vary vastly due to the complexity of the graph algorithms and the data distribution. + + Here's some assumption when estimating the memory usage: + 1. Assuming the graph is a simple graph, e.g. only has 1 label and at most 1 property for each label. + 2. Assuming the graph algorithm has a fixed amount of memory consumption, e.g. PageRank, SSSP, CC; + It should not generate a huge amount of intermediate result, like K-Hop, Louvain. + 3. Assuming the vertex map type is global vertex map + + Users should take this estimation as a reference and a starting point for tuning, + and adjust the memory allocation according to the actual situation. +""" + + +def estimate_gae_memory_usage_for_graph( + v_num, e_num, v_file_size, e_file_size, partition +): + """ + The estimation is based on the following formulas: + let #V = number of vertices, #E = number of edges, + assuming load_factor of hashmap is 0.41, sizeof(inner_id) = sizeof(edge_id) = sizeof(vertex_id) = 8 bytes, + where vertex_id is the primary key of the vertex, default to int64, + it could be string though, in that case, the memory usage will grow. + Graph: + 1. vertex + - vertex map: #V * (sizeof(vertex_id) + sizeof(inner_id)) * (1 / load_factor) + - vertex table: size of vertex files in uncompressed CSV format, unit is GB + 2. edge + - CSR + CSC: 2 * #E * (sizeof(inner_id) + sizeof(edge_id)) + - offset array: #V * sizeof(size_t) + - edge table: size of vertex files in uncompressed CSV format, unit is GB + 3. additional data structure when graph is partitioned, assuming 10% of vertices will be outer vertices + - outer vertex ID to local ID map: 0.1 * #V * (sizeof(vertex_id) + sizeof(inner_id)) * (1 / load_factor) + - local ID to outer vertex ID array: 0.1 * #V * sizeof(size_t) + + This is the minimum usage of the GAE with 1 partition, the actual memory usage would be larger than this estimation. + """ + gb = 1024 * 1024 * 1024 + # vertex map + vertex_map = v_num * (8 + 8) * (1 / 0.41) / gb + # vertex table + vertex_table = v_file_size + # edge + edge = 2 * e_num * (8 + 8) / gb + # offset array + offset_array = v_num * 8 / gb + # edge table + edge_table = e_file_size + additional = 0.1 * v_num * (8 + 8) * (1 / 0.41) / gb + 0.1 * v_num * 8 / gb + if partition == 1: + additional = 0 + per_partition = ( + vertex_map + + (vertex_table + edge_table + edge + offset_array) / partition + + additional + ) + return per_partition + + +def estimate_gae_memory_usage_for_algorithm(v_num): + """ + Simple algorithms like PageRank or CC will have 1 slot for result for each vertex. + Formula: #V * sizeof(double) + """ + gb = 1024 * 1024 * 1024 + usage = v_num * 8 / gb + return usage + + +@cli.command() +@click.option( + "-g", + "--engine", + type=click.Choice(["gae"], case_sensitive=False), + help="Engine type", + required=True, +) +@click.option("-v", "--v-num", type=int, help="Number of vertices") +@click.option("-e", "--e-num", type=int, help="Number of edges") +@click.option("-vf", "--v-file-size", type=float, help="Size of vertex files in GB") +@click.option("-ef", "--e-file-size", type=float, help="Size of edge files in GB") +@click.option("-p", "--partition", type=int, help="Number of partitions", default=1) +def estimate(engine, v_num, e_num, v_file_size, e_file_size, partition): + """Estimate the resources requirement for various kinds of GraphScope components""" + if engine == "gae": + if v_num is None or e_num is None or v_file_size is None or e_file_size is None: + err("Please provide the required parameters.") + return + partition_usage = estimate_gae_memory_usage_for_graph( + v_num, e_num, v_file_size, e_file_size, partition + ) + algorithm_usage = estimate_gae_memory_usage_for_algorithm(v_num) + memory_usage = partition_usage + algorithm_usage + info(disclaimer) + succ( + f"The estimated memory usage is {memory_usage:.2f} GB per pod for {partition} pods.\n" + ) + else: + err(f"Estimating usage of engine {engine} is not supported yet.") + + if __name__ == "__main__": cli()