Skip to content

Commit

Permalink
feat(interactive): Support secondary instance of groot (#3479)
Browse files Browse the repository at this point in the history
Support open as secondary instance to support more QPS requests and high
availability for read.
  • Loading branch information
siyuan0322 authored Jan 12, 2024
1 parent 982de58 commit f0441bb
Show file tree
Hide file tree
Showing 58 changed files with 809 additions and 485 deletions.
7 changes: 6 additions & 1 deletion charts/graphscope-store-one-pod/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,14 @@ data:
kafka.test.cluster.enable={{ not .Values.kafka.enabled }}
## Zk Config
zk.base.path=/graphscope/groot
zk.base.path={{ .Values.zkBasePath }}
zk.connect.string=ZK_CONNECT
## Secondary config
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}
## Extra Config
{{- if .Values.extraConfig }}
{{- $config_list := regexSplit ";" .Values.extraConfig -1 }}
Expand Down
9 changes: 9 additions & 0 deletions charts/graphscope-store-one-pod/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,15 @@ storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

storeGcIntervalMs: 5000

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""

## Zk Config
zkBasePath: "/graphscope/groot"

## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""
Expand All @@ -385,3 +390,7 @@ pegasus:
worker:
num: 1
timeout: 240000

secondary:
enabled: false
storeDataPath: "./data_secondary"
9 changes: 5 additions & 4 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ data:
dns.name.prefix.coordinator=COORDINATOR
dns.name.prefix.store=STORE
executor.graph.port=55556
executor.query.port=55557
executor.engine.port=55558
log4rs.config=LOG4RS_CONFIG
## GAIA Config
Expand All @@ -86,6 +82,11 @@ data:
pegasus.output.capacity=16
pegasus.hosts=PEGASUS_HOSTS
## Secondary config
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}
## Extra Config
{{- if .Values.extraConfig }}
{{- $config_list := regexSplit ";" .Values.extraConfig -1 }}
Expand Down
6 changes: 6 additions & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

storeGcIntervalMs: 5000

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""
Expand All @@ -535,3 +537,7 @@ pegasus:
worker:
num: 1
timeout: 240000

secondary:
enabled: false
storeDataPath: "./data_secondary"
12 changes: 12 additions & 0 deletions docs/storage_engine/groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,15 @@ The location of the logging configuration file in the container is:

- configuration file of `logback` is in `/usr/local/groot/conf/logback.xml`
- configuration file of `log4rs` is in `/usr/local/groot/conf/log4rs.yml`

### Secondary Instance

Groot support open secondary instance along with primary instances. It leverages the [Secondary Instance](https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances) of RocksDB
to provide the ability to serve the querying requests as well as catching up the schema and data updates.

To use it, just set the `secondary.enabled=true` in the helm charts.
Also remember the data path, ZK connect string as well as Kafka endpoint and topic should be as same as the primary instance.
And use a different `zk.base.path` for each secondary instance to avoid conflict with each other when doing node discovery.

`storeGcIntervalMs` controls how often should the secondary perform a `try_catch_up_with_primary` call, default to `5000` which is 5 seconds.

2 changes: 1 addition & 1 deletion interactive_engine/assembly/src/bin/groot/store_ctl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ start_server() {
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=4
-XX:GCLogFileSize=64m"

export RUST_BACKTRACE=full
java ${java_opt} \
-Dlogback.configurationFile="${GROOT_LOGBACK_FILE}" \
-Dconfig.file="${GROOT_CONF_FILE}" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@
public enum RoleType {
UNKNOWN("unknown"),
FRONTEND("frontend"),
FRONTEND_SERVICE("frontend_service"),
INGESTOR("ingestor"),
STORE("store"),
COORDINATOR("coordinator"),
EXECUTOR_GRAPH("executor_graph"),
EXECUTOR_QUERY("executor_query"),
EXECUTOR_MANAGE("executor_manage"),
EXECUTOR_ENGINE("executor_engine"),
FRONTEND_SERVICE("frontend_service"),
GAIA_ENGINE("gaia_engine"),
GAIA_RPC("gaia_rpc");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ public class CommonConfig {

public static final Config<Integer> RPC_PORT = Config.intConfig("rpc.port", 0);

public static final Config<String> GAIA_RPC_PORT = Config.stringConfig("gaia.rpc.port", "");
public static final Config<String> GAIA_ENGINE_PORT =
Config.stringConfig("gaia.engine.port", "");
public static final Config<String> FRONTEND_RPC_PORT =
Config.stringConfig("frontend.rpc.port", "");
public static final Config<String> COORDINATOR_RPC_PORT =
Config.stringConfig("coordinator.rpc.port", "");
public static final Config<String> INGESTOR_RPC_PORT =
Config.stringConfig("ingestor.rpc.port", "");
public static final Config<String> STORE_RPC_PORT = Config.stringConfig("store.rpc.port", "");

public static final Config<Integer> RPC_THREAD_COUNT =
Config.intConfig(
"rpc.thread.count",
Expand Down Expand Up @@ -75,4 +86,7 @@ public class CommonConfig {
// Whether to create test kafka cluster on MaxNode
public static final Config<Boolean> KAFKA_TEST_CLUSTER_ENABLE =
Config.boolConfig("kafka.test.cluster.enable", true);

public static final Config<Boolean> SECONDARY_INSTANCE_ENABLED =
Config.boolConfig("secondary.instance.enabled", false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class CoordinatorConfig {
Config.longConfig("offsets.persist.interval.ms", 3000L);

public static final Config<Boolean> LOG_RECYCLE_ENABLE =
Config.boolConfig("log.recycle.enable", true);
Config.boolConfig("log.recycle.enable", false);

public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 600L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ public class FrontendConfig {

public static final Config<String> AUTH_PASSWORD = Config.stringConfig("auth.password", "");

//
public static final Config<Integer> FRONTEND_SERVICE_PORT =
Config.intConfig("frontend.service.port", 8182);
Config.intConfig("frontend.service.port", 55556);
public static final Config<Integer> GREMLIN_SERVER_PORT =
Config.intConfig("gremlin.server.port", 8182);

public static final Config<Integer> FRONTEND_SERVICE_THREAD_COUNT =
Config.intConfig(
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public class IngestorConfig {
Config.intConfig("ingestor.sender.operation.max.count", 102400);

public static final Config<Long> INGESTOR_CHECK_PROCESSOR_INTERVAL_MS =
Config.longConfig("ingestor.check.processor.interval.ms", 3000L);
Config.longConfig("ingestor.check.processor.interval.ms", 2000L);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class StoreConfig {
public static final Config<Boolean> STORE_GC_ENABLE =
Config.boolConfig("store.gc.enable", true);

public static final Config<Integer> EXECUTOR_GRAPH_PORT =
Config.intConfig("executor.graph.port", 0);

public static final Config<Integer> EXECUTOR_QUERY_PORT =
Config.intConfig("executor.query.port", 0);

public static final Config<Integer> EXECUTOR_ENGINE_PORT =
Config.intConfig("executor.engine.port", 0);
public static final Config<Long> STORE_GC_INTERVAL_MS =
Config.longConfig("store.gc.interval.ms", 5000L);

// set by IS_SECONDARY_INSTANCE, used in graph.rs
public static final Config<String> STORE_STORAGE_ENGINE =
Config.stringConfig("store.storage.engine", "rocksdb");
public static final Config<String> STORE_SECONDARY_DATA_PATH =
Config.stringConfig("store.data.secondary.path", "./data_secondary");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public QueryLogger(String query, long queryId) {
this.queryId = queryId;
}

public void debug(String format, Object... args) {
defaultLogger.debug(this + " : " + format, args);
}

public void info(String format, Object... args) {
defaultLogger.info(this + " : " + format, args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ protected void processTraversal(
InterOpCollection.process(opCollection);

long jobId = queryLogger.getQueryId();
String jobName = "ir_plan_" + jobId;
IrPlan irPlan = new IrPlan(irMeta, opCollection);
// print script and jobName with ir plan
queryLogger.info("ir plan {}", irPlan.getPlanAsJson());
Expand All @@ -334,6 +333,7 @@ protected void processTraversal(
PegasusClient.JobRequest.newBuilder()
.setPlan(ByteString.copyFrom(physicalPlanBytes))
.build();
String jobName = "ir_plan_" + jobId;
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(jobId)
Expand All @@ -346,6 +346,7 @@ protected void processTraversal(
.setAll(PegasusClient.Empty.newBuilder().build())
.build();
request = request.toBuilder().setConf(jobConfig).build();

this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected AbstractResultProcessor(
RequestMessage msg = writeResult.getRequestMessage();
Settings settings = writeResult.getSettings();
// init batch size from resultIterationBatchSize in conf/gremlin-server.yaml,
// or args in RequestMessage which is originate from gremlin client
// or args in RequestMessage which is originated from gremlin client
this.resultCollectorsBatchSize =
(Integer)
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void createVolumeIfNotExists(Odps odps) throws IOException {
volumeName,
"created by groot data-load-tools",
Volume.Type.OLD,
7L);
1L);
}
} catch (OdpsException e) {
System.out.println(
Expand Down
31 changes: 22 additions & 9 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa
let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) };
let proto = parse_pb::<ConfigPb>(buf).expect("parse config pb failed");
let mut config_builder = GraphConfigBuilder::new();
config_builder.set_storage_engine("rocksdb");
let engine = "rocksdb".to_string();
let engine = proto
.get_configs()
.get("store.storage.engine")
.unwrap_or(&engine);
config_builder.set_storage_engine(engine);
config_builder.set_storage_options(proto.get_configs().clone());
let config = config_builder.build();
let path = config
Expand Down Expand Up @@ -438,14 +443,22 @@ fn delete_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operation

#[no_mangle]
pub extern "C" fn garbageCollectSnapshot(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.gc(snapshot_id) {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
JnaResponse::new_error(&msg)
}
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };

match graph_store_ptr.try_catch_up_with_primary() {
Ok(_) => (),
Err(e) => {
error!("Error during catch up primary {:?}", e);
}
};
if snapshot_id % 3600 != 0 {
return JnaResponse::new_success();
}
match graph_store_ptr.gc(snapshot_id) {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
JnaResponse::new_error(&msg)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
logger.info("finish get job response from one server");
if (counter.decrementAndGet() == 0) {
logger.info("finish get job response from all servers");
processor.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl<T: 'static> ResultSink<T> {
info_worker!("Job is canceled");
let msg = "Job is canceled".to_string();
let err = JobExecError::from(msg);
warn_worker!("Job is canceled");
tx.on_error(Box::new(err));
}
_ => (),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl From<pb::edge_expand::Direction> for Direction {
}
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct QueryParams {
pub labels: Vec<LabelId>,
pub limit: Option<usize>,
Expand Down
Loading

0 comments on commit f0441bb

Please sign in to comment.