Skip to content

Commit

Permalink
feat(interactive): support string as edge property type (#3403)
Browse files Browse the repository at this point in the history
Support string type property on edge. 

Fix #3405
  • Loading branch information
liulx20 authored Dec 7, 2023
1 parent c19808d commit 01e9c8f
Show file tree
Hide file tree
Showing 17 changed files with 1,517 additions and 249 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ jobs:
run: |
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
- name: Test String edge property on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
run: |
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=../tests/rt_mutable_graph/modern_graph_string_edge.yaml
BULK_LOAD_FILE=../interactive/examples/modern_graph/bulk_load.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/
GLOG_v=10 ./tests/rt_mutable_graph/string_edge_property_test ${SCHEMA_FILE} /tmp/csr-data-dir/
- name: Test Graph Loading on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
Expand Down
4 changes: 3 additions & 1 deletion flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {

LOG(INFO) << "[Query-" << (int) type << "][Thread-" << thread_id_
<< "] retry - " << i << " / " << MAX_RETRY;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (i + 1 < MAX_RETRY) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

decoder.reset(str_data, str_len);
result_buffer.clear();
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void put_argment(gs::Encoder& encoder, const query::Argument& argment);

class GraphDBSession {
public:
static constexpr int32_t MAX_RETRY = 4;
static constexpr int32_t MAX_RETRY = 3;
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
const std::string& work_dir, int thread_id)
: db_(db),
Expand Down
19 changes: 10 additions & 9 deletions flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,28 @@ namespace gs {

class MutablePropertyFragment;
class VersionManager;

template <typename EDATA_T>
class AdjListView {
class nbr_iterator {
using nbr_t = MutableNbr<EDATA_T>;
using const_nbr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_t;
using const_nbr_ptr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_ptr_t;

public:
nbr_iterator(const nbr_t* ptr, const nbr_t* end, timestamp_t timestamp)
nbr_iterator(const_nbr_ptr_t ptr, const_nbr_ptr_t end,
timestamp_t timestamp)
: ptr_(ptr), end_(end), timestamp_(timestamp) {
while (ptr_ != end_ && ptr_->timestamp > timestamp_) {
while (ptr_ != end_ && ptr_->get_timestamp() > timestamp_) {
++ptr_;
}
}

const nbr_t& operator*() const { return *ptr_; }
const_nbr_t& operator*() const { return *ptr_; }

const nbr_t* operator->() const { return ptr_; }
const_nbr_ptr_t operator->() const { return ptr_; }

nbr_iterator& operator++() {
++ptr_;
while (ptr_ != end_ && ptr_->timestamp > timestamp_) {
while (ptr_ != end_ && ptr_->get_timestamp() > timestamp_) {
++ptr_;
}
return *this;
Expand All @@ -62,8 +63,8 @@ class AdjListView {
}

private:
const nbr_t* ptr_;
const nbr_t* end_;
const_nbr_ptr_t ptr_;
const_nbr_ptr_t end_;
timestamp_t timestamp_;
};

Expand Down
56 changes: 7 additions & 49 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,35 +708,8 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) {
bool dst_flag = graph_.get_lid(dst_label, dst, dst_lid);

if (src_flag && dst_flag) {
auto oe = graph_.get_outgoing_edges_mut(src_label, src_lid, dst_label,
edge_label);
while (oe != nullptr && oe->is_valid()) {
if (oe->get_neighbor() == dst_lid) {
oe->set_data(prop, timestamp_);
src_flag = false;
break;
}
oe->next();
}
auto ie = graph_.get_incoming_edges_mut(dst_label, dst_lid, src_label,
edge_label);
while (ie != nullptr && ie->is_valid()) {
if (ie->get_neighbor() == src_lid) {
dst_flag = false;
ie->set_data(prop, timestamp_);
break;
}
ie->next();
}
if ((!src_flag) || (!dst_flag)) {
} else {
grape::InArchive arc;
arc << prop;

grape::OutArchive out_arc(std::move(arc));
graph_.IngestEdge(src_label, src_lid, dst_label, dst_lid, edge_label,
timestamp_, out_arc, alloc_);
}
graph_.UpdateEdge(src_label, src_lid, dst_label, dst_lid, edge_label,
timestamp_, prop, alloc_);
}
}
auto& arc = batch.GetArc();
Expand Down Expand Up @@ -822,9 +795,6 @@ void UpdateTransaction::applyEdgesUpdates() {
}
}

MutableCsrBase* csr =
graph_.get_oe_csr(src_label, dst_label, edge_label);

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
Expand All @@ -839,7 +809,11 @@ void UpdateTransaction::applyEdgesUpdates() {
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
csr->put_generic_edge(v, u, value, timestamp_, alloc_);
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
}
Expand Down Expand Up @@ -875,22 +849,6 @@ void UpdateTransaction::applyEdgesUpdates() {
}
}
}

MutableCsrBase* csr =
graph_.get_ie_csr(dst_label, src_label, edge_label);

for (auto& pair : added_edges_[ie_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
if (add_list.empty()) {
continue;
}
auto& edge_data = updated_edge_data_[ie_csr_index].at(v);
for (auto u : add_list) {
auto value = edge_data.at(u).first;
csr->put_generic_edge(v, u, value, timestamp_, alloc_);
}
}
}
}
}
Expand Down
Loading

0 comments on commit 01e9c8f

Please sign in to comment.