Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Fix memory leak when reopening rocksdb instance #4250

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analytical_engine/core/launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <memory>
#include <string>

#include "boost/process/detail/child_decl.hpp"
#include "boost/process.hpp"
#include "core/flags.h"
#include "grape/worker/comm_spec.h"

Expand Down
69 changes: 69 additions & 0 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "boost/leaf/error.hpp"
#include "boost/leaf/result.hpp"
#include "grape/fragment/immutable_edgecut_fragment.h"
// #include "grape/fragment/immutable_vertexcut_fragment.h"
#include "grape/serialization/in_archive.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/client/client.h"
Expand Down Expand Up @@ -1094,6 +1095,74 @@ class FragmentWrapper<
std::shared_ptr<fragment_t> fragment_;
};

/*
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
: public IFragmentWrapper {
using fragment_t =
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
CHECK_EQ(graph_def_.graph_type(), rpc::graph::IMMUTABLE_EDGECUT);
}

std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}

const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}

rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }

bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot copy the ArrowProjectedFragment");
}

bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented.");
}

bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the directed DynamicProjectedFragment");
}

bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the undirected DynamicProjectedFragment");
}

bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a view over the ArrowProjectedFragment");
}

private:
rpc::graph::GraphDefPb graph_def_;
std::shared_ptr<fragment_t> fragment_;
};
*/

#ifdef NETWORKX
/**
* @brief A specialized FragmentWrapper for DynamicFragment.
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ data:
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}
store.catchup.interval.ms={{ .Values.storeCatchupIntervalMs }}

write.ha.enabled={{ .Values.backup.enabled }}

Expand Down
5 changes: 3 additions & 2 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ discoveryMode: "file"

## Coordinator Config
snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
offsetsPersistIntervalMs: 1000
fileMetaStorePath: "/etc/groot/my.meta"
logRecycleEnable: true
logRecycleOffsetReserve: 86400
Expand All @@ -538,9 +538,10 @@ storeDataPath: "/var/lib/graphscope-store"
storeDataDownloadPath: "/var/lib/graphscope-store/download"
storeDataSecondaryPath: "/home/graphscope/secondary"
storeWriteThreadCount: 1
storeQueueBufferSize: "1024000"
storeQueueBufferSize: "102400"

storeGcIntervalMs: 5000
storeCatchupIntervalMs: 5000

## Kafka Config
##
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/assembly/src/conf/groot/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<logger name="org.apache.zookeeper" level="ERROR" />
<logger name="org.apache.kafka" level="ERROR" />
<logger name="kafka" level="ERROR" />
<logger name="io.grpc.netty" level="ERROR" />
<Logger name="MetricLog" level="INFO" additivity="false">
<appender-ref ref="Metric"/>
</Logger>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class CoordinatorConfig {
Config.longConfig("snapshot.increase.interval.ms", 1000L);

public static final Config<Long> OFFSETS_PERSIST_INTERVAL_MS =
Config.longConfig("offsets.persist.interval.ms", 3000L);
Config.longConfig("offsets.persist.interval.ms", 1000L);

public static final Config<Boolean> LOG_RECYCLE_ENABLE =
Config.boolConfig("log.recycle.enable", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class StoreConfig {
Config.intConfig("store.write.thread.count", 1);

public static final Config<Integer> STORE_QUEUE_BUFFER_SIZE =
Config.intConfig("store.queue.buffer.size", 1024000);
Config.intConfig("store.queue.buffer.size", 102400);

public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);
Expand Down
40 changes: 20 additions & 20 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static INIT: Once = Once::new();

#[no_mangle]
pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHandle {
trace!("openGraphStore");
debug!("openGraphStore");
let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) };
let proto = parse_pb::<ConfigPb>(buf).expect("parse config pb failed");
let mut config_builder = GraphConfigBuilder::new();
Expand Down Expand Up @@ -77,7 +77,7 @@ pub extern "C" fn closeGraphStore(handle: GraphHandle) -> bool {

#[no_mangle]
pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box<JnaResponse> {
trace!("getGraphDefBlob");
debug!("getGraphDefBlob");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.get_graph_def_blob() {
Expand All @@ -100,7 +100,7 @@ pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box<JnaResponse> {

#[no_mangle]
pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box<JnaResponse> {
trace!("ingestData");
debug!("ingestData");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
let slice = CStr::from_ptr(path).to_bytes();
Expand All @@ -119,7 +119,7 @@ pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box<JnaRe
pub extern "C" fn writeBatch(
ptr: GraphHandle, snapshot_id: i64, data: *const u8, len: usize,
) -> Box<JnaResponse> {
trace!("writeBatch");
debug!("writeBatch");

let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
let buf = unsafe { ::std::slice::from_raw_parts(data, len) };
Expand All @@ -139,7 +139,7 @@ pub extern "C" fn writeBatch(

#[no_mangle]
pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
trace!("getGraphStatistics");
debug!("getGraphStatistics");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.get_graph_statistics_blob(snapshot_id) {
Expand All @@ -163,7 +163,7 @@ pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box<
fn do_write_batch<G: MultiVersionGraph>(
graph: &G, snapshot_id: SnapshotId, buf: &[u8],
) -> GraphResult<bool> {
trace!("do_write_batch");
debug!("do_write_batch");
let proto = parse_pb::<OperationBatchPb>(buf)?;
let mut has_ddl = false;
let operations = proto.get_operations();
Expand Down Expand Up @@ -241,7 +241,7 @@ fn do_write_batch<G: MultiVersionGraph>(
fn commit_data_load<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("commit_data_load");
info!("commit_data_load");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let commit_data_load_pb = parse_pb::<CommitDataLoadPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -256,7 +256,7 @@ fn commit_data_load<G: MultiVersionGraph>(
fn prepare_data_load<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("prepare_data_load");
info!("prepare_data_load");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let prepare_data_load_pb = parse_pb::<PrepareDataLoadPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -269,7 +269,7 @@ fn prepare_data_load<G: MultiVersionGraph>(
fn create_vertex_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("create_vertex_type");
info!("create_vertex_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let create_vertex_type_pb = parse_pb::<CreateVertexTypePb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -283,7 +283,7 @@ fn create_vertex_type<G: MultiVersionGraph>(
fn add_vertex_type_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("add_vertex_type_properties");
info!("add_vertex_type_properties");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let add_vertex_type_properties_pb =
Expand All @@ -298,7 +298,7 @@ fn add_vertex_type_properties<G: MultiVersionGraph>(
fn drop_vertex_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("drop_vertex_type");
info!("drop_vertex_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let label_id_pb = parse_pb::<LabelIdPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -309,7 +309,7 @@ fn drop_vertex_type<G: MultiVersionGraph>(
fn create_edge_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("create_edge_type");
info!("create_edge_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let typedef_pb = parse_pb::<TypeDefPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -321,7 +321,7 @@ fn create_edge_type<G: MultiVersionGraph>(
fn add_edge_type_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("add_edge_type_properties");
info!("add_edge_type_properties");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let typedef_pb = parse_pb::<TypeDefPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -333,7 +333,7 @@ fn add_edge_type_properties<G: MultiVersionGraph>(
fn drop_edge_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("drop_edge_type");
info!("drop_edge_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let label_id_pb = parse_pb::<LabelIdPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -342,7 +342,7 @@ fn drop_edge_type<G: MultiVersionGraph>(
}

fn add_edge_kind<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<bool> {
trace!("add_edge_kind");
info!("add_edge_kind");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let add_edge_kind_pb = parse_pb::<AddEdgeKindPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -355,7 +355,7 @@ fn add_edge_kind<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operati
fn remove_edge_kind<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("remove_edge_kind");
info!("remove_edge_kind");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let edge_kind_pb = parse_pb::<EdgeKindPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -366,7 +366,7 @@ fn remove_edge_kind<G: MultiVersionGraph>(
fn overwrite_vertex<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<()> {
trace!("overwrite_vertex");
debug!("overwrite_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -380,7 +380,7 @@ fn overwrite_vertex<G: MultiVersionGraph>(
}

fn update_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("update_vertex");
debug!("update_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -396,7 +396,7 @@ fn update_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operati
fn clear_vertex_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<()> {
trace!("clear vertex properties");
debug!("clear vertex properties");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -409,7 +409,7 @@ fn clear_vertex_properties<G: MultiVersionGraph>(
}

fn delete_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("delete_vertex");
debug!("delete_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/store/groot/src/db/graph/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl Meta {
}

pub fn recover(&self) -> GraphResult<(VertexTypeManager, EdgeTypeManager)> {
debug!("meta graph recover start");
{
let mut graph_def_val = self.graph_def_lock.lock()?;
*graph_def_val = GraphDef::new(
Expand Down Expand Up @@ -237,6 +238,7 @@ impl Meta {
}
}
}
debug!("meta graph recovered");
Ok((vertex_manager_builder.build(), edge_manager_builder.build()))
}

Expand Down
Loading
Loading