diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index ac1cbe04e55d..63b124469804 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -568,8 +568,8 @@ jobs: # write to the kafka brokers : localhost:9092 cd ${GITHUB_WORKSPACE}/flex/build/ - ./tests/wal_writer/wal_writer_test localhost:9092 kafka topic_1 1 10 - ./bin/wal_consumer -b localhost:9092 -t topic_1 + ./tests/wal/wal_writer_test localhost:9092 kafka topic_1 1 10 + ./tests/wal/wal_reader_test localhost:9092 kafka topic_1 1 # kill the kafka server kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}') diff --git a/docs/flex/interactive/development/dev_and_test.md b/docs/flex/interactive/development/dev_and_test.md index 622c6c1fbffc..4feda4c916f1 100644 --- a/docs/flex/interactive/development/dev_and_test.md +++ b/docs/flex/interactive/development/dev_and_test.md @@ -124,8 +124,12 @@ You need to deploy a kafka cluster first. For details, please refer to [Kafka Do ##### Settings +We Compare the performance of `LocalWalWriter` and `KafkaWalWriter` on a host with + ##### Producing Wals + + ##### Consuming Wals ## Testing diff --git a/flex/tests/CMakeLists.txt b/flex/tests/CMakeLists.txt index d486539a1534..705f2d095b7f 100644 --- a/flex/tests/CMakeLists.txt +++ b/flex/tests/CMakeLists.txt @@ -1,5 +1,5 @@ add_subdirectory(hqps) add_subdirectory(rt_mutable_graph) if (BUILD_KAFKA_WAL_WRITER) - add_subdirectory(wal_writer) + add_subdirectory(wal) endif() \ No newline at end of file diff --git a/flex/tests/wal/CMakeLists.txt b/flex/tests/wal/CMakeLists.txt new file mode 100644 index 000000000000..de64fea926bf --- /dev/null +++ b/flex/tests/wal/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(wal_writer_test ${CMAKE_CURRENT_SOURCE_DIR}/wal_writer_test.cc) +target_link_libraries(wal_writer_test PUBLIC ${CppKafka_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} flex_graph_db) + +add_executable(wal_reader_test ${CMAKE_CURRENT_SOURCE_DIR}/wal_reader_test.cc) +target_link_libraries(wal_reader_test PUBLIC ${CppKafka_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} flex_graph_db) \ No newline at end of file diff --git a/flex/tests/wal/wal_reader_test.cc b/flex/tests/wal/wal_reader_test.cc new file mode 100644 index 000000000000..938e26ba9895 --- /dev/null +++ b/flex/tests/wal/wal_reader_test.cc @@ -0,0 +1,48 @@ + +#include "cppkafka/cppkafka.h" +#include "librdkafka/rdkafka.h" + +#include +#include +#include "flex/engines/graph_db/database/wal.h" +#include "grape/grape.h" + +int main(int argc, char** argv) { + if (argc != 5) { + std::cerr << "Usage: " << argv[0] + << " " + << std::endl; + return 1; + } + std::string brokers = argv[1]; + std::string type = argv[2]; + std::string topic_name = argv[3]; + int thread_num = std::stoi(argv[4]); + + double t = -grape::GetCurrentTime(); + + if (type == "local") { + std::cout << "Consuming message from directory " << topic_name + << " thread num " << thread_num; + std::vector wals; + for (const auto& entry : std::filesystem::directory_iterator(topic_name)) { + wals.push_back(entry.path().string()); + } + std::unique_ptr parser = + std::make_unique(wals); + } else { + std::cout << "Consuming message from topic " << topic_name << " thread num " + << thread_num; + cppkafka::Configuration config = {{"metadata.broker.list", brokers}, + {"group.id", "primary_group"}, + // Disable auto commit + {"enable.auto.commit", false}}; + std::unique_ptr parser = + std::make_unique(config, topic_name); + } + + t += grape::GetCurrentTime(); + std::cout << "Consuming message took " << t << " seconds" << std::endl; + + return 0; +} \ No newline at end of file diff --git a/flex/tests/wal_writer/wal_writer_test.cc b/flex/tests/wal/wal_writer_test.cc similarity index 97% rename from flex/tests/wal_writer/wal_writer_test.cc rename to flex/tests/wal/wal_writer_test.cc index 0ea87fae2cdd..d8108506e0a8 100644 --- a/flex/tests/wal_writer/wal_writer_test.cc +++ b/flex/tests/wal/wal_writer_test.cc @@ -37,9 +37,8 @@ void test_local_wal_writer(const std::string& topic_name, int thread_num, kafka_writers.emplace_back(new gs::LocalWalWriter()); } for (int i = 0; i < thread_num; ++i) { - std::string dst_path = "/tmp/"; // check whether files exist - kafka_writers[i]->open(dst_path, i); + kafka_writers[i]->open(topic_name, i); } run(kafka_writers, payload, message_cnt); } diff --git a/flex/tests/wal_writer/CMakeLists.txt b/flex/tests/wal_writer/CMakeLists.txt deleted file mode 100644 index 2a3a629d83e8..000000000000 --- a/flex/tests/wal_writer/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -add_executable(wal_writer_test ${CMAKE_CURRENT_SOURCE_DIR}/wal_writer_test.cc) -target_link_libraries(wal_writer_test PUBLIC ${CppKafka_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} flex_graph_db) \ No newline at end of file