Skip to content

Commit

Permalink
fix(interactive): Convert Schema from Groot Spec to Flex Spec (#4447)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
as titled.

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
  • Loading branch information
shirly121 authored Jan 24, 2025
1 parent a39855e commit 8eaa4b0
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.graphscope.groot.common.schema.api.EdgeRelation;
import com.alibaba.graphscope.groot.common.schema.api.GraphVertex;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

public class DefaultEdgeRelation implements EdgeRelation {
private final GraphVertex source;
Expand Down Expand Up @@ -57,4 +58,19 @@ public String toString() {
public long getTableId() {
return this.tableId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultEdgeRelation that = (DefaultEdgeRelation) o;
return tableId == that.tableId
&& Objects.equal(source, that.source)
&& Objects.equal(target, that.target);
}

@Override
public int hashCode() {
return Objects.hashCode(source, target, tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.graphscope.groot.common.schema.api.GraphProperty;
import com.alibaba.graphscope.groot.common.schema.wrapper.DataType;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

public class DefaultGraphProperty implements GraphProperty {
private final int id;
Expand Down Expand Up @@ -71,4 +72,20 @@ public String toString() {
.add("dataType", dataType)
.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultGraphProperty that = (DefaultGraphProperty) o;
return id == that.id
&& Objects.equal(name, that.name)
&& dataType == that.dataType
&& Objects.equal(defaultValue, that.defaultValue);
}

@Override
public int hashCode() {
return Objects.hashCode(id, name, dataType, defaultValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ private RelDataType deriveType(GraphProperty property) {
}
RelDataTypeFactory typeFactory = this.schema.getTypeFactory();
requireNonNull(typeFactory, "typeFactory");
IrDataTypeConvertor<DataType> typeConvertor = new IrDataTypeConvertor.Groot(typeFactory);
IrDataTypeConvertor<DataType> typeConvertor =
new IrDataTypeConvertor.Groot(typeFactory, true);
return typeConvertor.convert(property.getDataType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ public interface IrDataTypeConvertor<T> {

class Groot implements IrDataTypeConvertor<DataType> {
private final RelDataTypeFactory typeFactory;
private final boolean throwsOnFail;

public Groot(RelDataTypeFactory typeFactory) {
public Groot(RelDataTypeFactory typeFactory, boolean throwsOnFail) {
this.typeFactory = typeFactory;
this.throwsOnFail = throwsOnFail;
}

@Override
Expand Down Expand Up @@ -118,10 +120,13 @@ public RelDataType convert(DataType from) {
case BYTES:
case BYTES_LIST:
default:
throw new UnsupportedOperationException(
"convert GrootDataType ["
+ from.name()
+ "] to RelDataType is unsupported yet");
if (throwsOnFail) {
throw new UnsupportedOperationException(
"convert GrootDataType ["
+ from.name()
+ "] to RelDataType is unsupported yet");
}
return typeFactory.createSqlType(SqlTypeName.ANY);
}
}

Expand Down Expand Up @@ -181,8 +186,15 @@ public DataType convert(RelDataType dataFrom) {
}
break;
case UNKNOWN:
return DataType.UNKNOWN;
default:
}
if (throwsOnFail) {
throw new UnsupportedOperationException(
"convert RelDataType ["
+ dataFrom
+ "] to GrootDataType is unsupported yet");
}
return DataType.UNKNOWN;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,33 @@
package com.alibaba.graphscope.common.ir.meta.schema;

import com.alibaba.graphscope.groot.common.schema.impl.DefaultGraphProperty;
import com.google.common.base.Objects;

import org.apache.calcite.rel.type.RelDataType;

public class IrGraphProperty extends DefaultGraphProperty {
private final RelDataType relDataType;

public IrGraphProperty(int id, String name, RelDataType relDataType) {
super(id, name, new IrDataTypeConvertor.Groot(null).convert(relDataType));
super(id, name, new IrDataTypeConvertor.Groot(null, false).convert(relDataType));
this.relDataType = relDataType;
}

public RelDataType getRelDataType() {
return this.relDataType;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
IrGraphProperty that = (IrGraphProperty) o;
return Objects.equal(relDataType, that.relDataType);
}

@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), relDataType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.alibaba.graphscope.groot.common.exception.PropertyNotFoundException;
import com.alibaba.graphscope.groot.common.exception.TypeNotFoundException;
import com.alibaba.graphscope.groot.common.schema.api.*;
import com.google.common.collect.ImmutableMap;

import org.apache.calcite.rel.type.RelDataTypeFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -42,14 +45,19 @@ public IrGraphSchema(Configs configs, SchemaInputStream schemaInputStream) throw
schemaInputStream.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
schemaInputStream.getInputStream().close();
SchemaSpec spec = new SchemaSpec(schemaInputStream.getType(), content);
this.graphSchema = spec.convert(new GraphTypeFactoryImpl(configs));
this.specManager = new SchemaSpecManager(this, spec);
RelDataTypeFactory typeFactory = new GraphTypeFactoryImpl(configs);
this.graphSchema = spec.convert(typeFactory);
this.specManager = new SchemaSpecManager(this.graphSchema, false, typeFactory, spec);
}

public IrGraphSchema(GraphSchema graphSchema, boolean isColumnId) {
this.graphSchema = graphSchema;
this.isColumnId = isColumnId;
this.specManager = new SchemaSpecManager(this);
this.specManager =
new SchemaSpecManager(
this.graphSchema,
this.isColumnId,
new GraphTypeFactoryImpl(new Configs(ImmutableMap.of())));
}

public boolean isColumnId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,47 @@

package com.alibaba.graphscope.common.ir.meta.schema;

import com.alibaba.graphscope.groot.common.schema.api.*;
import com.alibaba.graphscope.groot.common.util.IrSchemaParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaSpecManager {
private static final Logger logger = LoggerFactory.getLogger(SchemaSpecManager.class);
private final IrGraphSchema parent;
private final GraphSchema rootSchema;
private final boolean isColumnId;
private final RelDataTypeFactory typeFactory;
private final List<SchemaSpec> specifications;

public SchemaSpecManager(IrGraphSchema parent) {
this.parent = parent;
this.specifications = Lists.newArrayList(convert(null, SchemaSpec.Type.IR_CORE_IN_JSON));
public SchemaSpecManager(
GraphSchema rootSchema, boolean isColumnId, RelDataTypeFactory typeFactory) {
this.rootSchema = rootSchema;
this.isColumnId = isColumnId;
this.typeFactory = typeFactory;
this.specifications = Lists.newArrayList();
}

public SchemaSpecManager(IrGraphSchema parent, SchemaSpec input) {
this.parent = parent;
public SchemaSpecManager(
GraphSchema rootSchema,
boolean isColumnId,
RelDataTypeFactory typeFactory,
SchemaSpec input) {
this.rootSchema = rootSchema;
this.isColumnId = isColumnId;
this.typeFactory = typeFactory;
this.specifications = Lists.newArrayList(input);
}

Expand All @@ -52,62 +68,103 @@ public SchemaSpec getSpec(SchemaSpec.Type type) {
return spec;
}
}
// if not exist, try to create a new JsonSpecification with content converted from others
SchemaSpec newSpec;
List<SchemaSpec.Type> existing = Lists.newArrayList();
for (SchemaSpec spec : specifications) {
if ((newSpec = convert(spec, type)) != null) {
specifications.add(newSpec);
return newSpec;
}
existing.add(spec.getType());
switch (type) {
case IR_CORE_IN_JSON:
newSpec =
new SchemaSpec(
type, IrSchemaParser.getInstance().parse(rootSchema, isColumnId));
break;
case FLEX_IN_JSON:
SchemaSpec yamlSpec = getSpec(SchemaSpec.Type.FLEX_IN_YAML);
Preconditions.checkArgument(
yamlSpec != null,
"cannot get schema specification of type " + SchemaSpec.Type.FLEX_IN_YAML);
Yaml yaml = new Yaml();
Map yamlMap = yaml.load(yamlSpec.getContent());
ObjectMapper mapper = new ObjectMapper();
try {
newSpec = new SchemaSpec(type, mapper.writeValueAsString(yamlMap));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
break;
case FLEX_IN_YAML:
default:
newSpec = convertToFlex(rootSchema);
break;
}
throw new IllegalArgumentException(
"spec type ["
+ type
+ "] cannot be converted from any existing spec types "
+ existing);
this.specifications.add(newSpec);
return newSpec;
}

private @Nullable SchemaSpec convert(@Nullable SchemaSpec source, SchemaSpec.Type target) {
try {
if (source != null && source.getType() == target) {
return source;
}
switch (target) {
case IR_CORE_IN_JSON:
return new SchemaSpec(
target,
IrSchemaParser.getInstance()
.parse(parent.getGraphSchema(), parent.isColumnId()));
case FLEX_IN_JSON:
if (source != null && source.getType() == SchemaSpec.Type.FLEX_IN_YAML) {
Yaml yaml = new Yaml();
Map rootMap = yaml.load(source.getContent());
ObjectMapper mapper = new ObjectMapper();
return new SchemaSpec(target, mapper.writeValueAsString(rootMap));
}
// todo: convert from JSON in IR_CORE to JSON in FLEX
return null;
case FLEX_IN_YAML:
default:
if (source != null && source.getType() == SchemaSpec.Type.FLEX_IN_JSON) {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(source.getContent());
Map rootMap = mapper.convertValue(rootNode, Map.class);
Yaml yaml = new Yaml();
return new SchemaSpec(target, yaml.dump(rootMap));
}
// todo: convert from JSON in IR_CORE to YAML in FlEX
return null;
}
} catch (Exception e) {
logger.warn(
"can not convert from {} to {} due to some unexpected exception:",
source == null ? null : source.getType(),
target,
e);
return null;
private SchemaSpec convertToFlex(GraphSchema schema) {
List<Map> vertices =
schema.getVertexList().stream()
.map(this::convertVertex)
.collect(Collectors.toList());
List<Map> edges =
schema.getEdgeList().stream().map(this::convertEdge).collect(Collectors.toList());
Map<String, Object> flexMap =
ImmutableMap.of(
"schema", ImmutableMap.of("vertex_types", vertices, "edge_types", edges));
Yaml yaml = new Yaml();
return new SchemaSpec(SchemaSpec.Type.FLEX_IN_YAML, yaml.dump(flexMap));
}

private Map<String, Object> convertVertex(GraphVertex vertex) {
return ImmutableMap.of(
"type_name", vertex.getLabel(),
"type_id", vertex.getLabelId(),
"properties",
vertex.getPropertyList().stream()
.map(this::convertProperty)
.collect(Collectors.toList()),
"primary_keys",
vertex.getPrimaryKeyList().stream()
.map(GraphProperty::getName)
.collect(Collectors.toList()));
}

private Map<String, Object> convertEdge(GraphEdge edge) {
return ImmutableMap.of(
"type_name", edge.getLabel(),
"type_id", edge.getLabelId(),
"vertex_type_pair_relations",
edge.getRelationList().stream()
.map(this::convertRelation)
.collect(Collectors.toList()),
"properties",
edge.getPropertyList().stream()
.map(this::convertProperty)
.collect(Collectors.toList()),
"primary_keys",
edge.getPrimaryKeyList().stream()
.map(GraphProperty::getName)
.collect(Collectors.toList()));
}

private Map<String, Object> convertRelation(EdgeRelation relation) {
return ImmutableMap.of(
"source_vertex", relation.getSource().getLabel(),
"destination_vertex", relation.getTarget().getLabel());
}

private Map<String, Object> convertProperty(GraphProperty property) {
RelDataType propertyType;
if (property instanceof IrGraphProperty) {
propertyType = ((IrGraphProperty) property).getRelDataType();
} else {
propertyType =
(new IrDataTypeConvertor.Groot(typeFactory, false))
.convert(property.getDataType());
}
// convert property type to flex format
IrDataTypeConvertor.Flex flexConvertor = new IrDataTypeConvertor.Flex(typeFactory, false);
GSDataTypeDesc typeDesc = flexConvertor.convert(propertyType);
return ImmutableMap.of(
"property_id", property.getId(),
"property_name", property.getName(),
"property_type", typeDesc.getYamlDesc());
}
}
Loading

0 comments on commit 8eaa4b0

Please sign in to comment.