diff --git a/flex/bin/bulk_loader.cc b/flex/bin/bulk_loader.cc index 0d74d4b6cb97..6157640df3c9 100644 --- a/flex/bin/bulk_loader.cc +++ b/flex/bin/bulk_loader.cc @@ -37,7 +37,7 @@ void signal_handler(int signal) { << ",Clearing directory: " << work_dir << ", exiting..."; // remove all files in work_dir std::filesystem::remove_all(work_dir); - exit(0); + exit(signal); } else { LOG(ERROR) << "Received unexpected signal " << signal << ", exiting..."; exit(1); diff --git a/flex/engines/http_server/service/hqps_service.cc b/flex/engines/http_server/service/hqps_service.cc index 342db5fb34e7..397caceebbc8 100644 --- a/flex/engines/http_server/service/hqps_service.cc +++ b/flex/engines/http_server/service/hqps_service.cc @@ -99,7 +99,9 @@ void HQPSService::init(const ServiceConfig& config) { } } if (config.start_compiler) { - start_compiler_subprocess(); + if (!start_compiler_subprocess()) { + LOG(FATAL) << "Failed to start compiler subprocess! exiting..."; + } } start_time_.store(gs::GetCurrentTimeStamp()); } diff --git a/flex/interactive/sdk/python/test/test_driver.py b/flex/interactive/sdk/python/test/test_driver.py index 456f08541e26..eec2d8fb69df 100644 --- a/flex/interactive/sdk/python/test/test_driver.py +++ b/flex/interactive/sdk/python/test/test_driver.py @@ -62,7 +62,7 @@ def test_example(self): self._graph_id = self.createGraph() self.bulkLoading() self.bulkLoadingUploading() - self.waitJobFinish() + self.bulkLoadingFailure() self.list_graph() self.runCypherQuery() self.runGremlinQuery() @@ -157,7 +157,8 @@ def bulkLoading(self): ) resp = self._sess.bulk_loading(self._graph_id, schema_mapping) assert resp.is_ok() - self._job_id = resp.get_value().job_id + job_id = resp.get_value().job_id + assert self.waitJobFinish(job_id) def bulkLoadingUploading(self): @@ -191,22 +192,59 @@ def bulkLoadingUploading(self): ) resp = self._sess.bulk_loading(self._graph_id, schema_mapping) assert resp.is_ok() - self._job_id = resp.get_value().job_id + job_id = resp.get_value().job_id + assert self.waitJobFinish(job_id) - def waitJobFinish(self): - assert self._job_id is not None + def waitJobFinish(self, job_id: str): + assert job_id is not None while True: - resp = self._sess.get_job(self._job_id) + resp = self._sess.get_job(job_id) assert resp.is_ok() status = resp.get_value().status print("job status: ", status) if status == "SUCCESS": - break + return True elif status == "FAILED": - raise Exception("job failed") + return False else: time.sleep(1) - print("job finished") + + def bulkLoadingFailure(self): + """ + Submit a bulk loading job with invalid data, and expect the job to fail. + """ + assert os.environ.get("FLEX_DATA_DIR") is not None + person_csv_path = os.path.join(os.environ.get("FLEX_DATA_DIR"), "person.csv") + knows_csv_path = os.path.join( + os.environ.get("FLEX_DATA_DIR"), "person_knows_person.csv" + ) + print("test bulk loading: ", self._graph_id) + schema_mapping = SchemaMapping( + loading_config=SchemaMappingLoadingConfig( + import_option="init", + format=SchemaMappingLoadingConfigFormat(type="csv"), + ), + vertex_mappings=[ + # Intentionally use the wrong file for the vertex mapping + VertexMapping(type_name="person", inputs=[knows_csv_path]) + ], + edge_mappings=[ + EdgeMapping( + type_triplet=EdgeMappingTypeTriplet( + edge="knows", + source_vertex="person", + destination_vertex="person", + ), + # Intentionally use the wrong file for the edge mapping + inputs=[person_csv_path], + ) + ], + ) + resp = self._sess.bulk_loading(self._graph_id, schema_mapping) + assert resp.is_ok() + job_id = resp.get_value().job_id + # Expect to fail + assert self.waitJobFinish(job_id) == False def list_graph(self): resp = self._sess.list_graphs() diff --git a/flex/tests/interactive/test_call_proc.py b/flex/tests/interactive/test_call_proc.py index d6e82c016a79..d173d222a70e 100644 --- a/flex/tests/interactive/test_call_proc.py +++ b/flex/tests/interactive/test_call_proc.py @@ -104,14 +104,21 @@ def callProcedureWithEncoder(self, graph_id : str): print("call count_vertex_num failed: ", resp.get_status_message()) exit(1) - # plus_one, should be with id 3 - # construct a byte array with bytes: the 4 bytes of integer 1, and a byte 3 - byte_string = bytes([0,0,0,0,2]) # 4 bytes of integer 1, and a byte 3 + # plus_one, should be with id 2 + # construct a byte array with bytes: the 4 bytes of integer 1, and a byte 2 + value = 1 + byte_string = value.to_bytes(4, byteorder=sys.byteorder) + bytes([2]) + # byte_string = bytes([1,0,0,0,2]) # 4 bytes of integer 1, and a byte 3 params = byte_string.decode('utf-8') resp = self._sess.call_procedure_raw(graph_id, params) if not resp.is_ok(): print("call plus_one failed: ", resp.get_status_message()) exit(1) + res = resp.get_value() + assert len(res) == 4 + # the four byte represent a integer + res = int.from_bytes(res, byteorder=sys.byteorder) + assert(res == 2) if __name__ == "__main__": #parse command line args