Skip to content

Commit

Permalink
Merge branch 'main' into fix_pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
longbinlai authored May 21, 2024
2 parents 1bf231e + 0893b5f commit 4fff52d
Show file tree
Hide file tree
Showing 28 changed files with 596 additions and 151 deletions.
9 changes: 5 additions & 4 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ data:
discovery.mode={{ .Values.discoveryMode }}
role.name=ROLE
node.idx=INDEX
release.full.name={{ include "graphscope-store.fullname" . }}
{{- else }}
rpc.port=0
discovery.mode=zookeeper
role.name=""
node.idx=0
release.full.name=localhost
{{- end }}

store.node.count={{ .Values.store.replicaCount }}
Expand Down Expand Up @@ -58,7 +60,6 @@ data:
neo4j.bolt.server.disabled=true

log4rs.config=LOG4RS_CONFIG
release.full.name={{ include "graphscope-store.fullname" . }}
## Auth config
auth.username={{ .Values.auth.username }}
auth.password={{ .Values.auth.password }}
Expand All @@ -79,7 +80,6 @@ data:
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}

write.ha.enabled={{ .Values.backup.enabled }}
tracing.enabled={{ .Values.otel.enabled }}

## Coordinator Config
rpc.max.bytes.mb={{ .Values.rpcMaxBytesMb }}
Expand Down Expand Up @@ -137,12 +137,13 @@ data:
export OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=DELTA
export OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION=BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
export OTEL_EXPORTER_OTLP_COMPRESSION=gzip
# export OTEL_EXPORTER_OTLP_COMPRESSION=gzip
{{- end }}
{{- if .Values.uptrace.enabled }}
export UPTRACE_DSN=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317
export OTEL_EXPORTER_OTLP_ENDPOINT=http://{{ .Values.uptrace.service }}:14317
export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317
export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=${UPTRACE_DSN}
{{- end }}
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ otel:
registry: docker.io
repository: jaegertracing/all-in-one
tag: "latest"
# https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_traces_sampler
traces:
sampler:
name: "traceidratio"
Expand Down
24 changes: 19 additions & 5 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(

return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([this, outer_span = outer_span](auto&& output) {
.then([this
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
LOG(ERROR) << "Invalid output size: " << output.content.size();
#ifdef HAVE_OPENTELEMETRY_CPP
Expand All @@ -132,7 +137,7 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(
outer_span->End();
std::map<std::string, std::string> labels = {{"status", "fail"}};
total_counter_->Add(1, labels);
#endif
#endif // HAVE_OPENTELEMETRY_CPP
return seastar::make_ready_future<query_param>(std::move(output));
}
return seastar::make_ready_future<query_param>(
Expand Down Expand Up @@ -340,16 +345,25 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path,
param.content.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1);
return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(param.content)})
.then([query_span = query_span,
query_scope = std::move(query_scope)](auto&& output) {
.then([
#ifdef HAVE_OPENTELEMETRY_CPP
query_span = query_span,
query_scope = std::move(query_scope)
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
#ifdef HAVE_OPENTELEMETRY_CPP
query_span->End();
#endif // HAVE_OPENTELEMETRY_CPP
return seastar::make_ready_future<query_param>(
std::move(output.content));
});
})
.then([this, outer_span = outer_span](auto&& output) {
.then([this
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
LOG(ERROR) << "Invalid output size: " << output.content.size();
#ifdef HAVE_OPENTELEMETRY_CPP
Expand Down
53 changes: 45 additions & 8 deletions flex/interactive/sdk/generate_sdk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,54 @@ if [ $# -eq 0 ]; then
fi

function install_generator() {
# first check openapi-generator-cli is executable
CLI_INSTALLED=false
JQ_INSTALLED=false
MVN_INSTALLED=false
# first check openapi-generator-cli exists is executable
if [ -f ~/bin/openapitools/openapi-generator-cli ]; then
echo "openapi-generator-cli is already installed"
export PATH=$PATH:~/bin/openapitools/
fi
if command -v openapi-generator-cli &>/dev/null; then
echo "openapi-generator-cli is already installed"
return
CLI_INSTALLED=true
fi
if ! $CLI_INSTALLED; then
echo "Installing openapi-generator-cli"
mkdir -p ~/bin/openapitools
curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli
chmod u+x ~/bin/openapitools/openapi-generator-cli
export PATH=$PATH:~/bin/openapitools/
export OPENAPI_GENERATOR_VERSION=7.2.0
fi
# on ubuntu apt-get jq on mac brew install jq

if command -v jq &>/dev/null; then
echo "jq is already installed"
JQ_INSTALLED=true
fi
if command -v mvn &>/dev/null; then
echo "maven is already installed"
MVN_INSTALLED=true
fi
if [[ "$(uname -s)" == "Linux" ]]; then
if ! $JQ_INSTALLED; then
sudo apt-get update && sudo apt-get -y install jq
fi
if ! $MVN_INSTALLED; then
sudo apt-get update && sudo apt-get -y install maven
fi
elif [[ "$(uname -s)" == "Darwin" ]]; then
if ! $JQ_INSTALLED; then
brew install jq
fi
if ! $MVN_INSTALLED; then
brew install maven
fi
else
echo "Unsupported OS"
exit 1
fi
mkdir -p ~/bin/openapitools
curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli
chmod u+x ~/bin/openapitools/openapi-generator-cli
export PATH=$PATH:~/bin/openapitools/
export OPENAPI_GENERATOR_VERSION=7.2.0
sudo apt-get update && sudo apt-get -y install jq
}

install_generator
Expand Down
3 changes: 2 additions & 1 deletion interactive_engine/assembly/src/conf/groot/config.template
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ neo4j.bolt.server.disabled=true
pegasus.worker.num=2
pegasus.hosts=localhost:8080

kafka.test.cluster.enable=true
kafka.test.cluster.enable=true
OTEL_SDK_DISABLED=true
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ public class CommonConfig {

public static final Config<Integer> PARTITION_COUNT = Config.intConfig("partition.count", 1);

public static final Config<Long> METRIC_UPDATE_INTERVAL_MS =
Config.longConfig("metric.update.interval.ms", 5000L);

public static final Config<String> LOG4RS_CONFIG = Config.stringConfig("log4rs.config", "");

public static final Config<String> DISCOVERY_MODE =
Expand All @@ -74,8 +71,6 @@ public class CommonConfig {

public static final Config<Boolean> SECONDARY_INSTANCE_ENABLED =
Config.boolConfig("secondary.instance.enabled", false);
public static final Config<Boolean> TRACING_ENABLED =
Config.boolConfig("tracing.enabled", false);

// Create an extra store pod for each original store pod for backup.
// Only available in multi pod mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CoordinatorConfig {
Config.boolConfig("log.recycle.enable", false);

public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 600L);
Config.longConfig("log.recycle.interval.second", 3600L);

public static final Config<String> FILE_META_STORE_PATH =
Config.stringConfig("file.meta.store.path", "./meta");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@ public class StoreConfig {
public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);

public static final Config<Long> STORE_COMMIT_INTERVAL_MS =
Config.longConfig("store.commit.interval.ms", 1000L);

public static final Config<Boolean> STORE_GC_ENABLE =
Config.boolConfig("store.gc.enable", true);

public static final Config<Long> STORE_GC_INTERVAL_MS =
Config.longConfig("store.gc.interval.ms", 5000L);
Config.longConfig("store.gc.interval.ms", 3600000L);

public static final Config<Long> STORE_CATCHUP_INTERVAL_MS =
Config.longConfig("store.catchup.interval.ms", 30000L);
Config.longConfig("store.catchup.interval.ms", 5000L);

// set by IS_SECONDARY_INSTANCE, used in graph.rs
public static final Config<String> STORE_STORAGE_ENGINE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,31 @@

package com.alibaba.graphscope.common.ir.planner.rules;

import com.alibaba.graphscope.common.ir.meta.schema.GraphOptTable;
import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalExpand;
import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalGetV;
import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalPathExpand;
import com.alibaba.graphscope.common.ir.rel.graph.GraphPhysicalExpand;
import com.alibaba.graphscope.common.ir.rel.graph.GraphPhysicalGetV;
import com.alibaba.graphscope.common.ir.tools.AliasInference;
import com.alibaba.graphscope.common.ir.tools.config.GraphOpt;
import com.alibaba.graphscope.common.ir.type.GraphLabelType;
import com.google.common.collect.ImmutableList;

import org.apache.calcite.plan.GraphOptCluster;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.TransformationRule;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.commons.lang3.ObjectUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// This rule try to fuse GraphLogicalExpand and GraphLogicalGetV if GraphLogicalExpand has no alias
// (that it won't be visited individually later):
// 1. if GraphLogicalGetV has no filters, then:
Expand All @@ -57,7 +64,7 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re
&& getV.getOpt().equals(GraphOpt.GetV.START)
|| expand.getOpt().equals(GraphOpt.Expand.BOTH)
&& getV.getOpt().equals(GraphOpt.GetV.OTHER)) {
if (ObjectUtils.isEmpty(getV.getFilters())) {
if (canFuse(getV, expand)) {
GraphPhysicalExpand physicalExpand =
GraphPhysicalExpand.create(
expand.getCluster(),
Expand Down Expand Up @@ -95,6 +102,107 @@ protected RelNode transform(GraphLogicalGetV getV, GraphLogicalExpand expand, Re
}
}

private boolean canFuse(GraphLogicalGetV getV, GraphLogicalExpand expand) {
// If GraphLogicalGetV has filters, then we cannot fuse them directly. Instead, we create a
// EdgeExpand(V) with an Auxilia for the filters.
// If GraphLogicalGetV has no filters, then:
// 1. if we want to expand "person-knows->person", and the type in getV is "person",
// we can fuse them into one EdgeExpand with type "knows" (where "knows" will be given in
// EdgeExpand's QueryParam in PhysicalPlan)
// 2. if we want to expand "person-create->post", while in schema, a "create" actually
// consists of "person-create->post" and "person-create->comment",
// we do not fuse them directly. Instead, we create a EdgeExpand(V) with type "create" and
// an Auxilia with type "post" as the filter.
// 3. if we want to expand "person-islocatedin->city", while in schema, a "islocatedin"
// actually
// consists of "person-islocatedin->city" and "post-islocatedin->country".
// Thought the edge type of "islocatedin" may generate "city" and "country", we can still
// fuse them into a single EdgeExpand(V) with type "islocatedin" directly if we can confirm
// that the expand starts from "person".
// 4. a special case is that, currently, for gremlin query like g.V().out("create"), we have
// not infer precise types for getV yet (getV may contain all vertex types).
// In this case, if getV's types contains all the types that expand will generate, we can
// fuse them.

Set<Integer> edgeExpandedVLabels = new HashSet<>();
// the optTables in expand preserves the full schema information for the edges,
// that is, for edge type "create", it contains both "person-create->post" and
// "person-create->comment", "user-create->post" etc.
List<RelOptTable> optTables = expand.getTableConfig().getTables();
// the edgeParamLabels in expand preserves the inferred schema information for the edges,
// that is, for edge type "create", it contains only "person-create->post" if user queries
// like g.V().hasLabel("person").out("create").hasLabel("post")
GraphLabelType edgeParamType =
com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(expand.getRowType());
List<GraphLabelType.Entry> edgeParamLabels = edgeParamType.getLabelsEntry();
GraphOpt.Expand direction = expand.getOpt();
// First, we get all the source vertex types where the edge will be expanded from.
// e.g., expand from "person"
Set<Integer> edgeExpandedSrcVLabels = new HashSet<>();
for (GraphLabelType.Entry edgeLabel : edgeParamLabels) {
switch (direction) {
case OUT:
edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId());
break;
case IN:
edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId());
break;
case BOTH:
edgeExpandedSrcVLabels.add(edgeLabel.getDstLabelId());
edgeExpandedSrcVLabels.add(edgeLabel.getSrcLabelId());
break;
}
}
// Then, we get all the destination vertex types where the edge will be expanded to.
// e.g., expand "likes"
for (RelOptTable optTable : optTables) {
if (optTable instanceof GraphOptTable) {
GraphOptTable graphOptTable = (GraphOptTable) optTable;
List<GraphLabelType.Entry> edgeUserGivenParamLabels =
com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(
graphOptTable.getRowType())
.getLabelsEntry();
for (GraphLabelType.Entry edgeLabel : edgeUserGivenParamLabels) {
switch (direction) {
case OUT:
if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) {
edgeExpandedVLabels.add(edgeLabel.getDstLabelId());
}
break;
case IN:
if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) {
edgeExpandedVLabels.add(edgeLabel.getSrcLabelId());
}
break;
case BOTH:
if (edgeExpandedSrcVLabels.contains(edgeLabel.getSrcLabelId())) {
edgeExpandedVLabels.add(edgeLabel.getDstLabelId());
}
if (edgeExpandedSrcVLabels.contains(edgeLabel.getDstLabelId())) {
edgeExpandedVLabels.add(edgeLabel.getSrcLabelId());
}
break;
}
}
}
}

// Finally, we check if the vertex types in getV to see if the type filter for the expanded
// vertex is necessary.
// e.g., if getV is "post" and expand type is "likes", then we cannot fuse them directly.
// Instead, we should create an EdgeExpand(V) with type "likes" and an Auxilia with type
// "post" as the filter.
List<GraphLabelType.Entry> vertexParamLabels =
com.alibaba.graphscope.common.ir.tools.Utils.getGraphLabels(getV.getRowType())
.getLabelsEntry();
Set<Integer> vertexExpandedVLabels = new HashSet<>();
for (GraphLabelType.Entry vertexLabel : vertexParamLabels) {
vertexExpandedVLabels.add(vertexLabel.getLabelId());
}
return ObjectUtils.isEmpty(getV.getFilters())
&& vertexExpandedVLabels.containsAll(edgeExpandedVLabels);
}

// transform expande + getv to GraphPhysicalExpandGetV
public static class BasicExpandGetVFusionRule
extends ExpandGetVFusionRule<BasicExpandGetVFusionRule.Config> {
Expand Down
Loading

0 comments on commit 4fff52d

Please sign in to comment.