
Commit

fixing
Committed-by: xiaolei.zl from Dev container
zhanglei1949 committed Oct 29, 2024
1 parent 3bbf5ac commit 288dffa
Showing 18 changed files with 87 additions and 105 deletions.
19 changes: 13 additions & 6 deletions .github/workflows/interactive.yml
@@ -623,7 +623,7 @@ jobs:
./tests/wal/wal_reader_test localhost:9092 kafka topic_1
# kill the kafka server
kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')
(kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')) || true
- name: Test Interactive Server persisting wal on kafka
env:
@@ -646,12 +646,19 @@ jobs:
echo "interactive_server is not running"
exit 1
fi
curl -X POST http://localhost:10000/v1/graph/current/vertex \
res=`curl -X POST http://localhost:10000/v1/graph/current/vertex \
-d '{"vertex_request": [{"label": "person", "primary_key_value": "1234", "properties": [{"name": "name", "value": "Alice"}, {"name": "age", "value": 20}]}], "edge_request": []}' \
-H "Content-Type: application/json"
-H "Content-Type: application/json"`
# expect success
if [[ $res != *"success"* ]]; then
echo "Failed to write vertex"
exit 1
else
echo "Succeed to write vertex"
fi
# kill the interactive server
kill $(ps aux | grep 'interactive_server' | awk '{print $2}')
(kill $(ps aux | grep 'interactive_server' | awk '{print $2}')) || true
./bin/interactive_server -g ${SCHEMA_FILE} --data-path /tmp/csr-data-dir/ \
-c ../tests/hqps/interactive_config_test.yaml -k localhost:9092 --kafka-topic test_graph --wal-type kafka &
sleep 3
@@ -667,5 +674,5 @@ jobs:
fi
# kill the servers
kill $(ps aux | grep 'interactive_server' | awk '{print $2}')
kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')
(kill $(ps aux | grep 'interactive_server' | awk '{print $2}')) || true
(kill $(ps aux | grep 'kafka_2.13-3.8.0' | awk '{print $2}')) || true
6 changes: 5 additions & 1 deletion docs/flex/interactive/development/dev_and_test.md
@@ -110,7 +110,11 @@ mvn clean package -DskipTests -Pexperimental

Interactive uses WAL (Write Ahead Logging) to ensure data integrity. Two WAL writers are provided, one per storage backend: `LocalWalWriter` for writing WALs to local disk and `KafkaWalWriter` for persisting WALs on Kafka.

You could switch the wal writer type in the configuration. See [Configuration](./../configuration.md#service-configuration).
Customization of the WAL storage type is currently not exposed at the user level. In development, you can enable it with the following options; if not specified, `LocalWalWriter` is the default.

```bash
./bin/interactive_server -k localhost:9092 -t kafka ...
```
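Both writers sit behind the same append-style logger interface, so the engine code is agnostic to where the WAL lands. As a rough sketch of that seam (only `append` is visible in this commit; the other members are illustrative assumptions, not the actual Flex API):

```cpp
#include <cstddef>
#include <string>

// Illustrative sketch of a pluggable WAL-writer interface along the lines
// implied by LocalWalWriter/KafkaWalWriter. open()/close() are assumptions.
class IWalWriter {
 public:
  virtual ~IWalWriter() = default;
  // destination: a file path for the local writer, brokers/topic for Kafka.
  virtual bool open(const std::string& destination) = 0;
  // Returns false if the record could not be persisted.
  virtual bool append(const char* data, size_t length) = 0;
  virtual void close() = 0;
};
```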

#### Local WAL Writer

1 change: 0 additions & 1 deletion flex/CMakeLists.txt
@@ -63,7 +63,6 @@ if (BUILD_KAFKA_WAL_WRITER)
if (NOT CppKafka_FOUND)
message(STATUS "cppkafka not found, try to build with third_party/cppkafka")
add_subdirectory(third_party/cppkafka)
# if cppkafka/CMakeLists.txt not exits, tell user to run git submodule update --init --recursive
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
endif ()
6 changes: 3 additions & 3 deletions flex/bin/CMakeLists.txt
@@ -47,7 +47,7 @@ target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils f
install_without_export_flex_target(stored_procedure_runner)

if (BUILD_KAFKA_WAL_WRITER)
add_executable(wal_consumer wal_consumer.cc)
target_link_libraries(wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
install_without_export_flex_target(wal_consumer)
add_executable(kafka_wal_consumer kafka_wal_consumer.cc)
target_link_libraries(kafka_wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
install_without_export_flex_target(kafka_wal_consumer)
endif()
6 changes: 4 additions & 2 deletions flex/bin/wal_consumer.cc → flex/bin/kafka_wal_consumer.cc
@@ -27,6 +27,9 @@
#include "flex/engines/graph_db/database/wal.h"
#include "flex/third_party/httplib.h"

/**
* Consume the WAL from Kafka and forward to the Interactive Engine.
*/
namespace gs {

// Give a WAL(in string format), forward to the Interactive Engine, which should
@@ -62,7 +65,7 @@ class WalSender {
int port_;
httplib::Client client_;
std::string req_url_;
}; // namespace gs
};

} // namespace gs

@@ -114,7 +117,6 @@ int main(int argc, char** argv) {
LOG(INFO) << "Kafka brokers: " << kafka_brokers;
LOG(INFO) << "engine endpoint: " << engine_endpoint;

// Construct the configuration
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
{"group.id", group_id},
// Disable auto commit
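The hunk above is truncated, but the consumer's job is straightforward: poll the topic, forward each WAL payload to the Interactive Engine, and commit the offset only after a successful forward; this is why auto commit is disabled. A minimal sketch of that loop with cppkafka (broker, group, and topic are placeholders here; the real binary takes them as command-line arguments):

```cpp
#include <string>

#include "cppkafka/cppkafka.h"

int main() {
  // Placeholder values; kafka_wal_consumer reads these from its CLI flags.
  cppkafka::Configuration config = {{"metadata.broker.list", "localhost:9092"},
                                    {"group.id", "wal_consumer"},
                                    // Disable auto commit
                                    {"enable.auto.commit", false}};
  cppkafka::Consumer consumer(config);
  consumer.subscribe({"test_graph"});

  while (true) {
    cppkafka::Message msg = consumer.poll();
    if (!msg || msg.get_error()) {
      continue;  // nothing fetched yet, or a transient broker error
    }
    std::string wal = msg.get_payload();  // one serialized WAL record
    // ... forward `wal` to the engine over HTTP (the WalSender above) ...
    consumer.commit(msg);  // commit only after a successful forward
  }
  return 0;
}
```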
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/compact_transaction.cc
@@ -38,7 +38,7 @@ void CompactTransaction::Commit() {
header->timestamp = timestamp_;
header->type = 1;

logger_.append(arc_.GetBuffer(), arc_.GetSize());
CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed";
arc_.Clear();

LOG(INFO) << "before compact - " << timestamp_;
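This is the first of several identical hardenings in this commit: every WAL append site below (the transaction `Commit()` paths and the session's `deserialize_and_apply_*` handlers) now wraps `logger_.append` in a `CHECK`, so a commit can never be acknowledged while its WAL write silently failed. Schematically, under the assumption that the header layout matches the fields these hunks touch:

```cpp
#include <cstdint>

#include <glog/logging.h>

// Assumed WalHeader layout, inferred from the fields set in these hunks;
// the real struct in flex may differ.
struct WalHeader {
  uint32_t timestamp;
  uint8_t type;     // 0 = insert, 1 = update/compact in these hunks
  uint32_t length;  // payload bytes following the header
};

// The recurring commit-time pattern: stamp the header on the serialized
// archive, then fail fast if the WAL cannot be persisted.
template <typename Logger, typename Archive>
void commit_to_wal(Logger& logger, Archive& arc, uint32_t timestamp,
                   uint8_t type) {
  auto* header = reinterpret_cast<WalHeader*>(arc.GetBuffer());
  header->length = arc.GetSize() - sizeof(WalHeader);
  header->type = type;
  header->timestamp = timestamp;
  CHECK(logger.append(arc.GetBuffer(), arc.GetSize())) << "append failed";
}
```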
54 changes: 20 additions & 34 deletions flex/engines/graph_db/database/graph_db.cc
@@ -377,17 +377,8 @@ static void IngestWalRange(SessionLocalContext* contexts,
}
}

void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
const std::string& work_dir,
int thread_num) {
if (!std::filesystem::exists(wal_dir_path)) {
std::filesystem::create_directory(wal_dir_path);
}
std::vector<std::string> wals;
for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) {
wals.push_back(entry.path().string());
}
LocalWalsParser parser(wals);
void GraphDB::ingestWalsFromIWalsParser(const IWalsParser& parser,
int thread_num) {
uint32_t from_ts = 1;
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
@@ -398,8 +389,9 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
UpdateTransaction::IngestWal(graph_, work_dir_, update_wal.timestamp,
update_wal.ptr, update_wal.size,
contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
@@ -410,6 +402,20 @@
version_manager_.init_ts(parser.last_ts(), thread_num);
}

void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
const std::string& work_dir,
int thread_num) {
if (!std::filesystem::exists(wal_dir_path)) {
std::filesystem::create_directory(wal_dir_path);
}
std::vector<std::string> wals;
for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) {
wals.push_back(entry.path().string());
}
LocalWalsParser parser(wals);
ingestWalsFromIWalsParser(parser, thread_num);
}

#ifdef BUILD_KAFKA_WAL_WRITER
void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
const std::string& kafka_topic,
@@ -419,27 +425,7 @@ void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
// Disable auto commit
{"enable.auto.commit", false}};
KafkaWalsParser parser(config, kafka_topic);
uint32_t from_ts = 1;
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
thread_num);
}

version_manager_.init_ts(parser.last_ts(), thread_num);
ingestWalsFromIWalsParser(parser, thread_num);
}
#endif

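The net effect of the graph_db.cc change: the replay loop that was duplicated between the local-file and Kafka paths now lives once in `ingestWalsFromIWalsParser`, and both paths simply construct their parser and delegate (the helper also switches to the member `work_dir_`, since it no longer receives a `work_dir` argument). The parser shape implied by the calls in the hunk, as a sketch (only `update_wals()` and `last_ts()` appear in the diff; everything else is an assumption):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed record layout, based on the fields used in the replay loop.
struct UpdateWalUnit {
  uint32_t timestamp;
  char* ptr;    // serialized update payload
  size_t size;  // 0 marks a compaction record rather than an update
};

// Sketch of the abstraction both LocalWalsParser and KafkaWalsParser
// would implement.
class IWalsParser {
 public:
  virtual ~IWalsParser() = default;
  // Update/compaction records in timestamp order.
  virtual const std::vector<UpdateWalUnit>& update_wals() const = 0;
  // Highest timestamp observed across all consumed WALs.
  virtual uint32_t last_ts() const = 0;
};
```

Centralizing the loop keeps the compaction special case (`size == 0`) and the final `version_manager_.init_ts` handoff in one place, so the two ingestion paths cannot drift apart again.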
2 changes: 2 additions & 0 deletions flex/engines/graph_db/database/graph_db.h
@@ -167,6 +167,8 @@ class GraphDB {
private:
bool registerApp(const std::string& path, uint8_t index = 0);

void ingestWalsFromIWalsParser(const IWalsParser& parser, int thread_num);

void ingestWalsFromLocalFiles(const std::string& wal_dir,
const std::string& work_dir, int thread_num);

4 changes: 2 additions & 2 deletions flex/engines/graph_db/database/graph_db_session.cc
@@ -278,7 +278,7 @@ Result<std::string> GraphDBSession::deserialize_and_apply_insert_wal(
LOG(INFO) << "Applying insert wal with timestamp: " << ts
<< ", length: " << length
<< ", logger type: " << static_cast<int>(logger_.type());
logger_.append(data, length);
CHECK(logger_.append(data, length)) << "Failed to append wal to logger";

InsertTransaction::IngestWal(db_.graph(), ts,
const_cast<char*>(data) + sizeof(WalHeader),
@@ -296,7 +296,7 @@ Result<std::string> GraphDBSession::deserialize_and_apply_update_wal(
db_.version_manager_.revert_update_timestamp(ts);
return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal timestamp");
}
logger_.append(data, length);
CHECK(logger_.append(data, length)) << "Failed to append wal to logger";
UpdateTransaction::IngestWal(db_.graph(), work_dir_, ts,
const_cast<char*>(data), length, alloc_);
db_.version_manager_.release_update_timestamp(ts);
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/insert_transaction.cc
@@ -149,7 +149,7 @@ void InsertTransaction::Commit() {
header->type = 0;
header->timestamp = timestamp_;

logger_.append(arc_.GetBuffer(), arc_.GetSize());
CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed";
IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
header->length, alloc_);

flex/engines/graph_db/database/single_edge_insert_transaction.cc
@@ -117,7 +117,7 @@ void SingleEdgeInsertTransaction::Commit() {
header->length = arc_.GetSize() - sizeof(WalHeader);
header->type = 0;
header->timestamp = timestamp_;
logger_.append(arc_.GetBuffer(), arc_.GetSize());
CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed";

grape::OutArchive arc;
{
flex/engines/graph_db/database/single_vertex_insert_transaction.cc
@@ -164,7 +164,7 @@ void SingleVertexInsertTransaction::Commit() {
header->type = 0;
header->timestamp = timestamp_;

logger_.append(arc_.GetBuffer(), arc_.GetSize());
CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed";
ingestWal();

vm_.release_insert_timestamp(timestamp_);
4 changes: 2 additions & 2 deletions flex/engines/graph_db/database/update_transaction.cc
@@ -105,7 +105,7 @@ void UpdateTransaction::Commit() {
header->length = arc_.GetSize() - sizeof(WalHeader);
header->type = 1;
header->timestamp = timestamp_;
logger_.append(arc_.GetBuffer(), arc_.GetSize());
CHECK(logger_.append(arc_.GetBuffer(), arc_.GetSize())) << "append failed";

applyVerticesUpdates();
applyEdgesUpdates();
@@ -748,7 +748,7 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) {
header->length = arc.GetSize() - sizeof(WalHeader);
header->type = 1;
header->timestamp = timestamp_;
logger_.append(arc.GetBuffer(), arc.GetSize());
CHECK(logger_.append(arc.GetBuffer(), arc.GetSize())) << "append failed";
}

release();
4 changes: 1 addition & 3 deletions flex/engines/http_server/handler/graph_db_http_handler.cc
@@ -338,8 +338,6 @@ class stored_proc_handler : public StoppableHandler {
auto scope = tracer->WithActiveSpan(outer_span);
auto start_ts = gs::GetCurrentTimeStamp();
#endif // HAVE_OPENTELEMETRY_CPP
LOG(INFO) << "Eval procedure on shard: " << StoppableHandler::shard_id()
<< " executor: " << dst_executor;
return get_executors()[StoppableHandler::shard_id()][dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([last_byte
@@ -906,7 +904,7 @@ seastar::future<> graph_db_http_handler::stop_query_actors(size_t index) {
return all_graph_query_handlers_[index]->stop();
})
.then([this, index] {
LOG(INFO) << "Stopped all query actors on shard id: " << index;
LOG(INFO) << "Stopped all wal handlers on shard id: " << index;
return all_wal_handlers_[index]->stop();
})
.then([this, index] {
7 changes: 3 additions & 4 deletions flex/interactive/sdk/examples/python/basic_example.py
@@ -144,7 +144,7 @@ def addVertex(sess: Session, graph_id: str):
vertex_request = [
VertexRequest(
label="person",
primary_key_value=12,
primary_key_value=8,
properties=[
ModelProperty(name="name", value="mike"),
ModelProperty(name="age", value=1),
@@ -156,7 +156,7 @@
src_label="person",
dst_label="person",
edge_label="knows",
src_primary_key_value=12,
src_primary_key_value=8,
dst_primary_key_value=1,
properties=[ModelProperty(name="weight", value=7)],
),
@@ -279,7 +279,6 @@ def addEdge(sess: Session, graph_id: str):
job_id = bulkLoading(sess, graph_id)
waitJobFinish(sess, job_id)
print("bulk loading finished")
graph_id = "3"

# Now start service on the created graph.
resp = sess.start_service(
Expand All @@ -289,7 +288,7 @@ def addEdge(sess: Session, graph_id: str):
time.sleep(5)
print("restart service on graph ", graph_id)

running a simple cypher query
# running a simple cypher query
query = "MATCH (n) RETURN COUNT(n);"
with driver.getNeo4jSession() as session:
resp = session.run(query)
42 changes: 0 additions & 42 deletions flex/tests/hqps/interactive_config_test_2.yaml

This file was deleted.

15 changes: 14 additions & 1 deletion flex/tests/wal/wal_reader_test.cc
@@ -1,4 +1,17 @@

/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cppkafka/cppkafka.h"
#include "librdkafka/rdkafka.h"

14 changes: 14 additions & 0 deletions flex/tests/wal/wal_writer_test.cc
@@ -1,3 +1,17 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cppkafka/cppkafka.h"
#include "librdkafka/rdkafka.h"

