diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index adca7e120104..ad1c01b1963f 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -623,7 +623,7 @@ jobs: ./tests/wal/wal_reader_test localhost:9092 kafka topic_1 # kill the kafka server - kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}') + (kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')) || true - name: Test Interactive Server persisting wal on kafka env: @@ -646,12 +646,19 @@ jobs: echo "interactive_server is not running" exit 1 fi - curl -X POST http://localhost:10000/v1/graph/current/vertex \ + res=`curl -X POST http://localhost:10000/v1/graph/current/vertex \ -d '{"vertex_request": [{"label": "person", "primary_key_value": "1234", "properties": [{"name": "name", "value": "Alice"}, {"name": "age", "value": 20}]}], "edge_request": []}' \ - -H "Content-Type: application/json" + -H "Content-Type: application/json"` + # expect success + if [[ $res != *"success"* ]]; then + echo "Failed to write vertex" + exit 1 + else + echo "Succeed to write vertex" + fi # kill the interactive server - kill $(ps aux | grep 'interactive_server' | awk '{print $2}') + (kill $(ps aux | grep 'interactive_server' | awk '{print $2}')) || true ./bin/interactive_server -g ${SCHEMA_FILE} --data-path /tmp/csr-data-dir/ \ -c ../tests/hqps/interactive_config_test.yaml -k localhost:9092 --kafka-topic test_graph --wal-type kafka & sleep 3 @@ -667,5 +674,5 @@ jobs: fi # kill the servers - kill $(ps aux | grep 'interactive_server' | awk '{print $2}') - kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}') \ No newline at end of file + (kill $(ps aux | grep 'interactive_server' | awk '{print $2}')) || true + (kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')) || true \ No newline at end of file diff --git a/docs/flex/interactive/development/dev_and_test.md b/docs/flex/interactive/development/dev_and_test.md index 8630ec9f14d1..78b819d2003f 
100644 --- a/docs/flex/interactive/development/dev_and_test.md +++ b/docs/flex/interactive/development/dev_and_test.md @@ -110,7 +110,11 @@ mvn clean package -DskipTests -Pexperimental Interactive use WAL(Write Ahead Logging) to ensure the data integrity. Two different wal writer is provided with respect to different storage interface: `LocalWalWriter` for writing wals to local disk and `KafkaWalWriter` for persisting wals on kafka. -You could switch the wal writer type in the configuration. See [Configuration](./../configuration.md#service-configuration). +The customization for Wal storage type is currently not exposed at user level. In development, you could enable it with the following options. If not specified, `LocalWalWriter` is the default option. + +```bash +./bin/interactive_server -k localhost:9092 -t kafka ... +``` #### Local Wal Writer diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 595bb26d1d49..35925d3690b5 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -63,7 +63,6 @@ if (BUILD_KAFKA_WAL_WRITER) if (NOT CppKafka_FOUND) message(STATUS "cppkafka not found, try to build with third_party/cppkafka") add_subdirectory(third_party/cppkafka) - # if cppkafka/CMakeLists.txt not exits, tell user to run git submodule update --init --recursive if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt) message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive") endif () diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt index 1a9c62d79b7b..6bd577cc0a1e 100644 --- a/flex/bin/CMakeLists.txt +++ b/flex/bin/CMakeLists.txt @@ -47,7 +47,7 @@ target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils f install_without_export_flex_target(stored_procedure_runner) if (BUILD_KAFKA_WAL_WRITER) - add_executable(wal_consumer wal_consumer.cc) - target_link_libraries(wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db) - 
install_without_export_flex_target(wal_consumer) + add_executable(kafka_wal_consumer kafka_wal_consumer.cc) + target_link_libraries(kafka_wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db) + install_without_export_flex_target(kafka_wal_consumer) endif() \ No newline at end of file diff --git a/flex/bin/wal_consumer.cc b/flex/bin/kafka_wal_consumer.cc similarity index 98% rename from flex/bin/wal_consumer.cc rename to flex/bin/kafka_wal_consumer.cc index 79f5d42e0952..6de8ae1ba2f6 100644 --- a/flex/bin/wal_consumer.cc +++ b/flex/bin/kafka_wal_consumer.cc @@ -27,6 +27,9 @@ #include "flex/engines/graph_db/database/wal.h" #include "flex/third_party/httplib.h" +/** + * Consume the WAL from Kafka and forward to the Interactive Engine. + */ namespace gs { // Give a WAL(in string format), forward to the Interactive Engine, which should @@ -62,7 +65,7 @@ class WalSender { int port_; httplib::Client client_; std::string req_url_; -}; // namespace gs +}; } // namespace gs @@ -114,7 +117,6 @@ int main(int argc, char** argv) { LOG(INFO) << "Kafka brokers: " << kafka_brokers; LOG(INFO) << "engine endpoint: " << engine_endpoint; - // Construct the configuration cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers}, {"group.id", group_id}, // Disable auto commit diff --git a/flex/engines/graph_db/database/compact_transaction.cc b/flex/engines/graph_db/database/compact_transaction.cc index 58183551aaf5..0e979a8cb058 100644 --- a/flex/engines/graph_db/database/compact_transaction.cc +++ b/flex/engines/graph_db/database/compact_transaction.cc @@ -38,7 +38,7 @@ void CompactTransaction::Commit() { header->timestamp = timestamp_; header->type = 1; - logger_.append(arc_.GetBuffer(), arc_.GetSize()); + CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed"; arc_.Clear(); LOG(INFO) << "before compact - " << timestamp_; diff --git a/flex/engines/graph_db/database/graph_db.cc 
b/flex/engines/graph_db/database/graph_db.cc index f9642b113f41..c0b37a1d9c71 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -377,17 +377,8 @@ static void IngestWalRange(SessionLocalContext* contexts, } } -void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, - const std::string& work_dir, - int thread_num) { - if (!std::filesystem::exists(wal_dir_path)) { - std::filesystem::create_directory(wal_dir_path); - } - std::vector<std::string> wals; - for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) { - wals.push_back(entry.path().string()); - } - LocalWalsParser parser(wals); +void GraphDB::ingestWalsFromIWalsParser(const IWalsParser& parser, +                                        int thread_num) { uint32_t from_ts = 1; for (auto& update_wal : parser.update_wals()) { uint32_t to_ts = update_wal.timestamp; @@ -398,8 +389,9 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, graph_.Compact(update_wal.timestamp); last_compaction_ts_ = update_wal.timestamp; } else { - UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr, - update_wal.size, contexts_[0].allocator); + UpdateTransaction::IngestWal(graph_, work_dir_, update_wal.timestamp, + update_wal.ptr, update_wal.size, + contexts_[0].allocator); } from_ts = to_ts + 1; } @@ -410,6 +402,20 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, version_manager_.init_ts(parser.last_ts(), thread_num); } +void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, + const std::string& work_dir, + int thread_num) { + if (!std::filesystem::exists(wal_dir_path)) { + std::filesystem::create_directory(wal_dir_path); + } + std::vector<std::string> wals; + for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) { + wals.push_back(entry.path().string()); + } + LocalWalsParser parser(wals); + ingestWalsFromIWalsParser(parser, thread_num); +} + #ifdef BUILD_KAFKA_WAL_WRITER void 
GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers, const std::string& kafka_topic, @@ -419,27 +425,7 @@ void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers, // Disable auto commit {"enable.auto.commit", false}}; KafkaWalsParser parser(config, kafka_topic); - uint32_t from_ts = 1; - for (auto& update_wal : parser.update_wals()) { - uint32_t to_ts = update_wal.timestamp; - if (from_ts < to_ts) { - IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num); - } - if (update_wal.size == 0) { - graph_.Compact(update_wal.timestamp); - last_compaction_ts_ = update_wal.timestamp; - } else { - UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr, - update_wal.size, contexts_[0].allocator); - } - from_ts = to_ts + 1; - } - if (from_ts <= parser.last_ts()) { - IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1, - thread_num); - } - - version_manager_.init_ts(parser.last_ts(), thread_num); + ingestWalsFromIWalsParser(parser, thread_num); } #endif diff --git a/flex/engines/graph_db/database/graph_db.h b/flex/engines/graph_db/database/graph_db.h index 2910e9acb70f..077be7bce693 100644 --- a/flex/engines/graph_db/database/graph_db.h +++ b/flex/engines/graph_db/database/graph_db.h @@ -167,6 +167,8 @@ class GraphDB { private: bool registerApp(const std::string& path, uint8_t index = 0); + void ingestWalsFromIWalsParser(const IWalsParser& parser, int thread_num); + void ingestWalsFromLocalFiles(const std::string& wal_dir, const std::string& work_dir, int thread_num); diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 8e437dabf8cd..89d3321c55f3 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -278,7 +278,7 @@ Result GraphDBSession::deserialize_and_apply_insert_wal( LOG(INFO) << "Applying insert wal with timestamp: " << ts << ", length: " << length << ", logger type: 
" << static_cast(logger_.type()); - logger_.append(data, length); + CHECK(logger_.append(data, length)) << "Failed to append wal to logger"; InsertTransaction::IngestWal(db_.graph(), ts, const_cast(data) + sizeof(WalHeader), @@ -296,7 +296,7 @@ Result GraphDBSession::deserialize_and_apply_update_wal( db_.version_manager_.revert_update_timestamp(ts); return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal timestamp"); } - logger_.append(data, length); + CHECK(logger_.append(data, length)) << "Failed to append wal to logger"; UpdateTransaction::IngestWal(db_.graph(), work_dir_, ts, const_cast(data), length, alloc_); db_.version_manager_.release_update_timestamp(ts); diff --git a/flex/engines/graph_db/database/insert_transaction.cc b/flex/engines/graph_db/database/insert_transaction.cc index 60b93aab7db0..e4500e0d97e0 100644 --- a/flex/engines/graph_db/database/insert_transaction.cc +++ b/flex/engines/graph_db/database/insert_transaction.cc @@ -149,7 +149,7 @@ void InsertTransaction::Commit() { header->type = 0; header->timestamp = timestamp_; - logger_.append(arc_.GetBuffer(), arc_.GetSize()); + CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed"; IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader), header->length, alloc_); diff --git a/flex/engines/graph_db/database/single_edge_insert_transaction.cc b/flex/engines/graph_db/database/single_edge_insert_transaction.cc index 56330846c40c..309086fb445a 100644 --- a/flex/engines/graph_db/database/single_edge_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_edge_insert_transaction.cc @@ -117,7 +117,7 @@ void SingleEdgeInsertTransaction::Commit() { header->length = arc_.GetSize() - sizeof(WalHeader); header->type = 0; header->timestamp = timestamp_; - logger_.append(arc_.GetBuffer(), arc_.GetSize()); + CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed"; grape::OutArchive arc; { diff --git 
a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc index 5bd313893fc4..66f79dc89e4b 100644 --- a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc @@ -164,7 +164,7 @@ void SingleVertexInsertTransaction::Commit() { header->type = 0; header->timestamp = timestamp_; - logger_.append(arc_.GetBuffer(), arc_.GetSize()); + CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed"; ingestWal(); vm_.release_insert_timestamp(timestamp_); diff --git a/flex/engines/graph_db/database/update_transaction.cc b/flex/engines/graph_db/database/update_transaction.cc index d568e1794407..a143663ee28c 100644 --- a/flex/engines/graph_db/database/update_transaction.cc +++ b/flex/engines/graph_db/database/update_transaction.cc @@ -105,7 +105,7 @@ void UpdateTransaction::Commit() { header->length = arc_.GetSize() - sizeof(WalHeader); header->type = 1; header->timestamp = timestamp_; - logger_.append(arc_.GetBuffer(), arc_.GetSize()); + CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed"; applyVerticesUpdates(); applyEdgesUpdates(); @@ -748,7 +748,7 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) { header->length = arc.GetSize() - sizeof(WalHeader); header->type = 1; header->timestamp = timestamp_; - logger_.append(arc.GetBuffer(), arc.GetSize()); + CHECK(logger_.append(arc.GetBuffer(), arc.GetSize())) << "append failed"; } release(); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 7da34edf608d..a245796dc7f2 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -338,8 +338,6 @@ class stored_proc_handler : public StoppableHandler { auto scope = tracer->WithActiveSpan(outer_span); auto start_ts = 
gs::GetCurrentTimeStamp(); #endif // HAVE_OPENTELEMETRY_CPP - LOG(INFO) << "Eval procedure on shard: " << StoppableHandler::shard_id() - << " executor: " << dst_executor; return get_executors()[StoppableHandler::shard_id()][dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) .then([last_byte @@ -906,7 +904,7 @@ seastar::future<> graph_db_http_handler::stop_query_actors(size_t index) { return all_graph_query_handlers_[index]->stop(); }) .then([this, index] { - LOG(INFO) << "Stopped all query actors on shard id: " << index; + LOG(INFO) << "Stopped all wal handlers on shard id: " << index; return all_wal_handlers_[index]->stop(); }) .then([this, index] { diff --git a/flex/interactive/sdk/examples/python/basic_example.py b/flex/interactive/sdk/examples/python/basic_example.py index 5e52792c12e0..f53209a77600 100644 --- a/flex/interactive/sdk/examples/python/basic_example.py +++ b/flex/interactive/sdk/examples/python/basic_example.py @@ -144,7 +144,7 @@ def addVertex(sess: Session, graph_id: str): vertex_request = [ VertexRequest( label="person", - primary_key_value=12, + primary_key_value=8, properties=[ ModelProperty(name="name", value="mike"), ModelProperty(name="age", value=1), @@ -156,7 +156,7 @@ def addVertex(sess: Session, graph_id: str): src_label="person", dst_label="person", edge_label="knows", - src_primary_key_value=12, + src_primary_key_value=8, dst_primary_key_value=1, properties=[ModelProperty(name="weight", value=7)], ), @@ -279,7 +279,6 @@ def addEdge(sess: Session, graph_id: str): job_id = bulkLoading(sess, graph_id) waitJobFinish(sess, job_id) print("bulk loading finished") - graph_id = "3" # Now start service on the created graph. 
resp = sess.start_service( @@ -289,7 +288,7 @@ def addEdge(sess: Session, graph_id: str): time.sleep(5) print("restart service on graph ", graph_id) - running a simple cypher query + # running a simple cypher query query = "MATCH (n) RETURN COUNT(n);" with driver.getNeo4jSession() as session: resp = session.run(query) diff --git a/flex/tests/hqps/interactive_config_test_2.yaml b/flex/tests/hqps/interactive_config_test_2.yaml deleted file mode 100644 index 34ff4cae8df2..000000000000 --- a/flex/tests/hqps/interactive_config_test_2.yaml +++ /dev/null @@ -1,42 +0,0 @@ -log_level: INFO -verbose_level: 10 -default_graph: modern_graph -compute_engine: - type: hiactor - workers: - - localhost:10001 - thread_num_per_worker: 1 - store: - type: cpp-mcsr - metadata_store: - type: file # file/sqlite/etcd -compiler: - planner: - is_on: true - opt: RBO - rules: - - FilterIntoJoinRule - - FilterMatchRule - - NotMatchToAntiJoinRule - meta: - reader: - schema: - uri: http://localhost:7777/v1/service/status - interval: 1000 # ms - statistics: - uri: http://localhost:7777/v1/graph/%s/statistics - interval: 86400000 # ms - endpoint: - default_listen_address: localhost - bolt_connector: - disabled: false - port: 7687 - gremlin_connector: - disabled: false - port: 8182 - query_timeout: 40000 - gremlin_script_language_name: antlr_gremlin_calcite -http_service: - default_listen_address: localhost - admin_port: 7778 - query_port: 10001 diff --git a/flex/tests/wal/wal_reader_test.cc b/flex/tests/wal/wal_reader_test.cc index 7a62ce25bb2b..98f5f1699129 100644 --- a/flex/tests/wal/wal_reader_test.cc +++ b/flex/tests/wal/wal_reader_test.cc @@ -1,4 +1,17 @@ - +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "cppkafka/cppkafka.h" #include "librdkafka/rdkafka.h" diff --git a/flex/tests/wal/wal_writer_test.cc b/flex/tests/wal/wal_writer_test.cc index 1c74382e3d8f..d849e04a4077 100644 --- a/flex/tests/wal/wal_writer_test.cc +++ b/flex/tests/wal/wal_writer_test.cc @@ -1,3 +1,17 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "cppkafka/cppkafka.h" #include "librdkafka/rdkafka.h"