diff --git a/charts/graphscope-store-one-pod/templates/configmap.yaml b/charts/graphscope-store-one-pod/templates/configmap.yaml index ea2a9168d074..5bc4310f5826 100644 --- a/charts/graphscope-store-one-pod/templates/configmap.yaml +++ b/charts/graphscope-store-one-pod/templates/configmap.yaml @@ -77,9 +77,14 @@ data: kafka.test.cluster.enable={{ not .Values.kafka.enabled }} ## Zk Config - zk.base.path=/graphscope/groot + zk.base.path={{ .Values.zkBasePath }} zk.connect.string=ZK_CONNECT + ## Secondary config + secondary.instance.enabled={{ .Values.secondary.enabled }} + store.data.secondary.path={{ .Values.secondary.storeDataPath }} + store.gc.interval.ms={{ .Values.storeGcIntervalMs }} + ## Extra Config {{- if .Values.extraConfig }} {{- $config_list := regexSplit ";" .Values.extraConfig -1 }} diff --git a/charts/graphscope-store-one-pod/values.yaml b/charts/graphscope-store-one-pod/values.yaml index ddbfa16a9029..119ffd133500 100644 --- a/charts/graphscope-store-one-pod/values.yaml +++ b/charts/graphscope-store-one-pod/values.yaml @@ -369,10 +369,15 @@ storeDataPath: "/var/lib/graphscope-store" storeWriteThreadCount: 1 storeQueueBufferSize: 102400 +storeGcIntervalMs: 5000 + ## Kafka Config kafkaTopic: "graphscope" kafkaProducerCustomConfigs: "" +## Zk Config +zkBasePath: "/graphscope/groot" + ## Key-value pair separated by ; ## For example extraConfig="k1=v1;k2=v2" extraConfig: "" @@ -385,3 +390,7 @@ pegasus: worker: num: 1 timeout: 240000 + +secondary: + enabled: false + storeDataPath: "./data_secondary" diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 291c618863c2..6d1351c462a7 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -65,10 +65,6 @@ data: dns.name.prefix.coordinator=COORDINATOR dns.name.prefix.store=STORE - executor.graph.port=55556 - executor.query.port=55557 - executor.engine.port=55558 - 
log4rs.config=LOG4RS_CONFIG ## GAIA Config @@ -86,6 +82,11 @@ data: pegasus.output.capacity=16 pegasus.hosts=PEGASUS_HOSTS + ## Secondary config + secondary.instance.enabled={{ .Values.secondary.enabled }} + store.data.secondary.path={{ .Values.secondary.storeDataPath }} + store.gc.interval.ms={{ .Values.storeGcIntervalMs }} + ## Extra Config {{- if .Values.extraConfig }} {{- $config_list := regexSplit ";" .Values.extraConfig -1 }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 4306818e722f..996a7673f045 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -515,6 +515,8 @@ storeDataPath: "/var/lib/graphscope-store" storeWriteThreadCount: 1 storeQueueBufferSize: 102400 +storeGcIntervalMs: 5000 + ## Kafka Config kafkaTopic: "graphscope" kafkaProducerCustomConfigs: "" @@ -535,3 +537,7 @@ pegasus: worker: num: 1 timeout: 240000 + +secondary: + enabled: false + storeDataPath: "./data_secondary" diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index 70e1a64c15f0..960c1b575c4e 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -678,3 +678,15 @@ The location of the logging configuration file in the container is: - configuration file of `logback` is in `/usr/local/groot/conf/logback.xml` - configuration file of `log4rs` is in `/usr/local/groot/conf/log4rs.yml` + +### Secondary Instance + +Groot supports opening secondary instances alongside the primary instance. It leverages the [Secondary Instance](https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances) feature of RocksDB +to serve query requests while catching up with schema and data updates. + +To use it, set `secondary.enabled=true` in the helm charts. +Also remember that the data path, ZK connect string, Kafka endpoint and topic must be the same as those of the primary instance. 
+Use a different `zk.base.path` for each secondary instance so that instances do not conflict with each other during node discovery. + +`storeGcIntervalMs` controls how often the secondary performs a `try_catch_up_with_primary` call; it defaults to `5000` (5 seconds). + diff --git a/interactive_engine/assembly/src/bin/groot/store_ctl.sh b/interactive_engine/assembly/src/bin/groot/store_ctl.sh index 2b82674e4d2b..3d2555c66fdf 100755 --- a/interactive_engine/assembly/src/bin/groot/store_ctl.sh +++ b/interactive_engine/assembly/src/bin/groot/store_ctl.sh @@ -100,7 +100,7 @@ start_server() { -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=4 -XX:GCLogFileSize=64m" - + export RUST_BACKTRACE=full java ${java_opt} \ -Dlogback.configurationFile="${GROOT_LOGBACK_FILE}" \ -Dconfig.file="${GROOT_CONF_FILE}" \ diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/RoleType.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/RoleType.java index e581196acb79..402b2930355e 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/RoleType.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/RoleType.java @@ -19,14 +19,10 @@ public enum RoleType { UNKNOWN("unknown"), FRONTEND("frontend"), - FRONTEND_SERVICE("frontend_service"), INGESTOR("ingestor"), STORE("store"), COORDINATOR("coordinator"), - EXECUTOR_GRAPH("executor_graph"), - EXECUTOR_QUERY("executor_query"), - EXECUTOR_MANAGE("executor_manage"), - EXECUTOR_ENGINE("executor_engine"), + FRONTEND_SERVICE("frontend_service"), GAIA_ENGINE("gaia_engine"), GAIA_RPC("gaia_rpc"); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index 561577b6a182..b71032800092 100644 --- 
a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -26,6 +26,17 @@ public class CommonConfig { public static final Config RPC_PORT = Config.intConfig("rpc.port", 0); + public static final Config GAIA_RPC_PORT = Config.stringConfig("gaia.rpc.port", ""); + public static final Config GAIA_ENGINE_PORT = + Config.stringConfig("gaia.engine.port", ""); + public static final Config FRONTEND_RPC_PORT = + Config.stringConfig("frontend.rpc.port", ""); + public static final Config COORDINATOR_RPC_PORT = + Config.stringConfig("coordinator.rpc.port", ""); + public static final Config INGESTOR_RPC_PORT = + Config.stringConfig("ingestor.rpc.port", ""); + public static final Config STORE_RPC_PORT = Config.stringConfig("store.rpc.port", ""); + public static final Config RPC_THREAD_COUNT = Config.intConfig( "rpc.thread.count", @@ -75,4 +86,7 @@ public class CommonConfig { // Whether to create test kafka cluster on MaxNode public static final Config KAFKA_TEST_CLUSTER_ENABLE = Config.boolConfig("kafka.test.cluster.enable", true); + + public static final Config SECONDARY_INSTANCE_ENABLED = + Config.boolConfig("secondary.instance.enabled", false); } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index 3ac622586ef0..b7e502ccb955 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -23,7 +23,7 @@ public class CoordinatorConfig { Config.longConfig("offsets.persist.interval.ms", 3000L); public static final Config LOG_RECYCLE_ENABLE = - Config.boolConfig("log.recycle.enable", 
true); + Config.boolConfig("log.recycle.enable", false); public static final Config LOG_RECYCLE_INTERVAL_SECOND = Config.longConfig("log.recycle.interval.second", 600L); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java index 2be5e01ea2ed..5df7d5d47421 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java @@ -18,8 +18,11 @@ public class FrontendConfig { public static final Config AUTH_PASSWORD = Config.stringConfig("auth.password", ""); + // public static final Config FRONTEND_SERVICE_PORT = - Config.intConfig("frontend.service.port", 8182); + Config.intConfig("frontend.service.port", 55556); + public static final Config GREMLIN_SERVER_PORT = + Config.intConfig("gremlin.server.port", 8182); public static final Config FRONTEND_SERVICE_THREAD_COUNT = Config.intConfig( diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java deleted file mode 100644 index 23876992a19c..000000000000 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.alibaba.graphscope.groot.common.config; - -public class GaiaConfig { - public static final Config GAIA_ENABLE = Config.boolConfig("gaia.enable", true); - - public static final Config GAIA_REPORT = Config.boolConfig("gaia.report", false); - - public static final Config GAIA_RPC_PORT = Config.intConfig("gaia.rpc.port", 0); - - public static final Config GAIA_ENGINE_PORT = Config.intConfig("gaia.engine.port", 0); - - public static final Config 
GAIA_NONBLOCKING = - Config.boolConfig("gaia.nonblocking", false); - - public static final Config GAIA_READ_TIMEOUT_MS = - Config.intConfig("gaia.read.timeout.ms", 0); - - public static final Config GAIA_WRITE_TIMEOUT_MS = - Config.intConfig("gaia.write.timeout.ms", 0); - - public static final Config GAIA_READ_SLAB_SIZE = - Config.intConfig("gaia.read.slab.size", 0); - - public static final Config GAIA_NO_DELAY = Config.boolConfig("gaia.no.delay", false); - - public static final Config GAIA_SEND_BUFFER = Config.intConfig("gaia.send.buffer", 0); - - public static final Config GAIA_HEARTBEAT_SEC = - Config.intConfig("gaia.heartbeat.sec", 0); - - public static final Config GAIA_MAX_POOL_SIZE = - Config.intConfig("gaia.max.pool.size", 0); -} diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GremlinConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GremlinConfig.java deleted file mode 100644 index 358c7c0f2eff..000000000000 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GremlinConfig.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.graphscope.groot.common.config; - -/** - * Gremlin related config - */ -public class GremlinConfig { - /** - * Get gremlin server port - * - * @return The gremlin server port - */ - public static final Config GREMLIN_PORT = Config.intConfig("gremlin.server.port", 0); - - /** - * Get gremlin server write buffer high water - * - * @return The gremlin server netty write buffer high water - */ - public static final Config SERVER_WRITE_BUFFER_HIGH_WATER = - Config.intConfig("gremlin.server.buffer.high.water", 16 * 1024 * 1024); - - /** - * Get gremlin server write buffer low water - * - * @return The gremlin server netty write buffer low water - */ - public static final Config SERVER_WRITE_BUFFER_LOW_WATER = - Config.intConfig("gremlin.server.buffer.low.water", 8 * 1024 * 1024); -} diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java index c8e05cfb73bb..4c70d347ad14 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java @@ -24,5 +24,5 @@ public class IngestorConfig { Config.intConfig("ingestor.sender.operation.max.count", 102400); public static final Config INGESTOR_CHECK_PROCESSOR_INTERVAL_MS = - Config.longConfig("ingestor.check.processor.interval.ms", 3000L); + Config.longConfig("ingestor.check.processor.interval.ms", 2000L); } diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index fe2d1be9ca33..d47a7d87e9f6 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ 
b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -32,12 +32,12 @@ public class StoreConfig { public static final Config STORE_GC_ENABLE = Config.boolConfig("store.gc.enable", true); - public static final Config EXECUTOR_GRAPH_PORT = - Config.intConfig("executor.graph.port", 0); - - public static final Config EXECUTOR_QUERY_PORT = - Config.intConfig("executor.query.port", 0); - - public static final Config EXECUTOR_ENGINE_PORT = - Config.intConfig("executor.engine.port", 0); + public static final Config STORE_GC_INTERVAL_MS = + Config.longConfig("store.gc.interval.ms", 5000L); + + // set by IS_SECONDARY_INSTANCE, used in graph.rs + public static final Config STORE_STORAGE_ENGINE = + Config.stringConfig("store.storage.engine", "rocksdb"); + public static final Config STORE_SECONDARY_DATA_PATH = + Config.stringConfig("store.data.secondary.path", "./data_secondary"); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java index e8d4165fbce2..3f0fe746cb8f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java @@ -31,6 +31,10 @@ public QueryLogger(String query, long queryId) { this.queryId = queryId; } + public void debug(String format, Object... args) { + defaultLogger.debug(this + " : " + format, args); + } + public void info(String format, Object... 
args) { defaultLogger.info(this + " : " + format, args); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 75b6305767d3..0161dc8ec494 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -323,7 +323,6 @@ protected void processTraversal( InterOpCollection.process(opCollection); long jobId = queryLogger.getQueryId(); - String jobName = "ir_plan_" + jobId; IrPlan irPlan = new IrPlan(irMeta, opCollection); // print script and jobName with ir plan queryLogger.info("ir plan {}", irPlan.getPlanAsJson()); @@ -334,6 +333,7 @@ protected void processTraversal( PegasusClient.JobRequest.newBuilder() .setPlan(ByteString.copyFrom(physicalPlanBytes)) .build(); + String jobName = "ir_plan_" + jobId; PegasusClient.JobConfig jobConfig = PegasusClient.JobConfig.newBuilder() .setJobId(jobId) @@ -346,6 +346,7 @@ protected void processTraversal( .setAll(PegasusClient.Empty.newBuilder().build()) .build(); request = request.toBuilder().setConf(jobConfig).build(); + this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS()); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index 0932f285d486..8785165f9015 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ 
-67,7 +67,7 @@ protected AbstractResultProcessor( RequestMessage msg = writeResult.getRequestMessage(); Settings settings = writeResult.getSettings(); // init batch size from resultIterationBatchSize in conf/gremlin-server.yaml, - // or args in RequestMessage which is originate from gremlin client + // or args in RequestMessage which is originated from gremlin client this.resultCollectorsBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE) diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java index 8bfbc47b5776..3ab2d42e50a6 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java @@ -162,7 +162,7 @@ public void createVolumeIfNotExists(Odps odps) throws IOException { volumeName, "created by groot data-load-tools", Volume.Type.OLD, - 7L); + 1L); } } catch (OdpsException e) { System.out.println( diff --git a/interactive_engine/executor/assembly/groot/src/store/graph.rs b/interactive_engine/executor/assembly/groot/src/store/graph.rs index 1fef26d175c8..0fe8b728cb13 100644 --- a/interactive_engine/executor/assembly/groot/src/store/graph.rs +++ b/interactive_engine/executor/assembly/groot/src/store/graph.rs @@ -55,7 +55,12 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) }; let proto = parse_pb::(buf).expect("parse config pb failed"); let mut config_builder = GraphConfigBuilder::new(); - config_builder.set_storage_engine("rocksdb"); + let engine = "rocksdb".to_string(); + let engine = proto + .get_configs() + .get("store.storage.engine") + .unwrap_or(&engine); + config_builder.set_storage_engine(engine); 
config_builder.set_storage_options(proto.get_configs().clone()); let config = config_builder.build(); let path = config @@ -438,14 +443,22 @@ fn delete_edge(graph: &G, snapshot_id: i64, op: &Operation #[no_mangle] pub extern "C" fn garbageCollectSnapshot(ptr: GraphHandle, snapshot_id: i64) -> Box { - unsafe { - let graph_store_ptr = &*(ptr as *const GraphStore); - match graph_store_ptr.gc(snapshot_id) { - Ok(_) => JnaResponse::new_success(), - Err(e) => { - let msg = format!("{:?}", e); - JnaResponse::new_error(&msg) - } + let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) }; + + match graph_store_ptr.try_catch_up_with_primary() { + Ok(_) => (), + Err(e) => { + error!("Error during catch up primary {:?}", e); + } + }; + if snapshot_id % 3600 != 0 { + return JnaResponse::new_success(); + } + match graph_store_ptr.gc(snapshot_id) { + Ok(_) => JnaResponse::new_success(), + Err(e) => { + let msg = format!("{:?}", e); + JnaResponse::new_error(&msg) } } } diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index 978ad2a1ca4d..3e0cf5c4fe2b 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -99,7 +99,6 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - logger.info("finish get job response from one server"); if (counter.decrementAndGet() == 0) { logger.info("finish get job response from all servers"); processor.finish(); diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/result.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/result.rs index dd58f0b3255f..d3b6083e869d 100644 --- 
a/interactive_engine/executor/engine/pegasus/pegasus/src/result.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/result.rs @@ -55,6 +55,7 @@ impl ResultSink { info_worker!("Job is canceled"); let msg = "Job is canceled".to_string(); let err = JobExecError::from(msg); + warn_worker!("Job is canceled"); tx.on_error(Box::new(err)); } _ => (), diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs index b38b0e8538be..ecd0837e5c1e 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs @@ -65,7 +65,7 @@ impl From for Direction { } } -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct QueryParams { pub labels: Vec, pub limit: Option, diff --git a/interactive_engine/executor/store/groot/src/db/graph/store.rs b/interactive_engine/executor/store/groot/src/db/graph/store.rs index 28dceadb7015..49dc78c85c56 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/store.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/store.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use std::collections::HashMap; use std::collections::HashSet; +use std::fs; use std::path::Path; use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::Arc; @@ -609,6 +610,10 @@ impl MultiVersionGraph for GraphStore { let data_file_path = format!("{}/../{}/{}/part-r-{:0>5}.sst", self.data_root, "download", unique_path, partition_id); if Path::new(data_file_path.as_str()).exists() { + if let Ok(metadata) = fs::metadata(data_file_path.clone()) { + let size = metadata.len(); + println!("Ingesting file: {} with size: {} bytes", data_file_path, size); + } self.ingest(data_file_path.as_str())? 
} if target.src_label_id > 0 { @@ -638,7 +643,7 @@ impl MultiVersionGraph for GraphStore { impl GraphStore { pub fn open(config: &GraphConfig, path: &str) -> GraphResult { - info!("open graph store at {}, with config: {:?}", path, config); + info!("open graph store at {} with config {:?}", path, config); match config.get_storage_engine() { "rocksdb" => { let res = RocksDB::open(config.get_storage_options(), path).and_then(|db| { @@ -647,6 +652,17 @@ impl GraphStore { }); res_unwrap!(res, open, config, path) } + "rocksdb_as_secondary" => { + let secondary_path = config + .get_storage_option("store.data.secondary.path") + .expect("invalid config, missing store.data.secondary.path"); + let res = RocksDB::open_as_secondary(config.get_storage_options(), path, secondary_path) + .and_then(|db| { + let storage = Arc::new(db); + Self::init(config, storage, path) + }); + res_unwrap!(res, open, config, path) + } unknown => { let msg = format!("unknown storage {}", unknown); let err = gen_graph_err!(GraphErrorCode::NotSupported, msg, open, config, path); @@ -655,6 +671,10 @@ impl GraphStore { } } + pub fn try_catch_up_with_primary(&self) -> GraphResult<()> { + self.storage.try_catch_up_with_primary() + } + fn init(config: &GraphConfig, storage: Arc, path: &str) -> GraphResult { let meta = Meta::new(storage.clone()); let (vertex_manager, edge_manager) = res_unwrap!(meta.recover(), init)?; diff --git a/interactive_engine/executor/store/groot/src/db/storage/mod.rs b/interactive_engine/executor/store/groot/src/db/storage/mod.rs index a4fef60cf6af..b9060ac2c091 100644 --- a/interactive_engine/executor/store/groot/src/db/storage/mod.rs +++ b/interactive_engine/executor/store/groot/src/db/storage/mod.rs @@ -16,6 +16,7 @@ pub trait ExternalStorage: Send + Sync { fn load(&self, files: &[&str]) -> GraphResult<()>; fn open_backup_engine(&self, backup_path: &str) -> GraphResult>; fn new_scan(&self, prefix: &[u8]) -> GraphResult + Send>>; + fn try_catch_up_with_primary(&self) -> 
GraphResult<()>; } pub trait ExternalStorageBackup { diff --git a/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs b/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs index 65d1a335077f..5a202396e43b 100644 --- a/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs +++ b/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs @@ -11,6 +11,7 @@ use crate::db::storage::{KvPair, RawBytes}; pub struct RocksDB { db: Arc, + is_secondary: bool, } pub struct RocksDBBackupEngine { @@ -25,7 +26,32 @@ impl RocksDB { let msg = format!("open rocksdb at {} failed, because {}", path, e.into_string()); gen_graph_err!(GraphErrorCode::ExternalStorageError, msg, open, options, path) })?; - let ret = RocksDB { db: Arc::new(db) }; + let ret = RocksDB { db: Arc::new(db), is_secondary: false }; + Ok(ret) + } + + pub fn open_as_secondary( + options: &HashMap, primary_path: &str, secondary_path: &str, + ) -> GraphResult { + let mut opts = Options::default(); + opts.set_max_open_files(-1); + let db = DB::open_as_secondary(&opts, primary_path, secondary_path).map_err(|e| { + let msg = format!( + "open rocksdb at {}, {} failed, because {}", + primary_path, + secondary_path, + e.into_string() + ); + gen_graph_err!( + GraphErrorCode::ExternalStorageError, + msg, + open_as_secondary, + options, + primary_path + ) + })?; + + let ret = RocksDB { db: Arc::new(db), is_secondary: true }; Ok(ret) } } @@ -44,6 +70,10 @@ impl ExternalStorage for RocksDB { } fn put(&self, key: &[u8], val: &[u8]) -> GraphResult<()> { + if self.is_secondary { + info!("Cannot put in secondary instance"); + return Ok(()); + } self.db.put(key, val).map_err(|e| { let msg = format!("rocksdb.put failed because {}", e.into_string()); gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) @@ -51,6 +81,10 @@ impl ExternalStorage for RocksDB { } fn delete(&self, key: &[u8]) -> GraphResult<()> { + if self.is_secondary { + info!("Cannot delete in secondary instance"); + return 
Ok(()); + } self.db.delete(key).map_err(|e| { let msg = format!("rocksdb.delete failed because {}", e.into_string()); gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) @@ -85,6 +119,10 @@ impl ExternalStorage for RocksDB { } fn delete_range(&self, start: &[u8], end: &[u8]) -> GraphResult<()> { + if self.is_secondary { + info!("Cannot delete_range in secondary instance"); + return Ok(()); + } let mut batch = WriteBatch::default(); self.db .delete_file_in_range(start, end) @@ -103,6 +141,10 @@ impl ExternalStorage for RocksDB { } fn load(&self, files: &[&str]) -> GraphResult<()> { + if self.is_secondary { + info!("Cannot ingest in secondary instance"); + return Ok(()); + } let mut options = IngestExternalFileOptions::default(); options.set_move_files(true); self.db @@ -150,6 +192,19 @@ impl ExternalStorage for RocksDB { iter.seek(prefix); Ok(Box::new(Scan::new(iter))) } + + fn try_catch_up_with_primary(&self) -> GraphResult<()> { + if self.is_secondary { + self.db + .try_catch_up_with_primary() + .map_err(|e| { + let msg = format!("try to catch up with primary failed because {:?}", e); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + }) + } else { + return Ok(()); + } + } } pub struct Scan { @@ -238,7 +293,14 @@ impl ExternalStorageBackup for RocksDBBackupEngine { fn init_options(options: &HashMap) -> Options { let mut ret = Options::default(); ret.create_if_missing(true); - // TODO: Add other customized db options. 
+ ret.set_max_background_jobs(6); + ret.set_write_buffer_size(256 << 20); + ret.set_max_open_files(-1); + ret.set_max_log_file_size(1024 << 10); + ret.set_keep_log_file_num(10); + // https://github.com/facebook/rocksdb/wiki/Basic-Operations#non-sync-writes + ret.set_use_fsync(true); + if let Some(conf_str) = options.get("store.rocksdb.compression.type") { match conf_str.as_str() { "none" => ret.set_compression_type(DBCompressionType::None), diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java new file mode 100644 index 000000000000..f77705d6d235 --- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java @@ -0,0 +1,89 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot; + +import com.alibaba.graphscope.groot.common.RoleType; +import com.alibaba.graphscope.groot.common.config.CommonConfig; +import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.DiscoveryConfig; + +public class Utils { + + public static String getHostTemplate(Configs configs, RoleType role) { + switch (role) { + case FRONTEND: + return DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(configs); + case INGESTOR: + return DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(configs); + case COORDINATOR: + return DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(configs); + case STORE: + case GAIA_RPC: + case GAIA_ENGINE: + return DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); + default: + throw new IllegalArgumentException("invalid role [" + role + "]"); + } + } + + public static int getPort(Configs configs) { + String discoveryMode = CommonConfig.DISCOVERY_MODE.get(configs).toLowerCase(); + if (discoveryMode.equals("file")) { + RoleType role = RoleType.fromName(CommonConfig.ROLE_NAME.get(configs)); + int idx = CommonConfig.NODE_IDX.get(configs); + return getPort(configs, role, idx); + } else { + return CommonConfig.RPC_PORT.get(configs); + } + } + + public static int getPort(Configs configs, RoleType role, int idx) { + String s; + switch (role) { + case FRONTEND: + s = CommonConfig.FRONTEND_RPC_PORT.get(configs); + break; + case INGESTOR: + s = CommonConfig.INGESTOR_RPC_PORT.get(configs); + break; + case COORDINATOR: + s = CommonConfig.COORDINATOR_RPC_PORT.get(configs); + break; + case STORE: + s = CommonConfig.STORE_RPC_PORT.get(configs); + break; + case GAIA_RPC: + 
s = CommonConfig.GAIA_RPC_PORT.get(configs); + break; + case GAIA_ENGINE: + s = CommonConfig.GAIA_ENGINE_PORT.get(configs); + break; + default: + throw new IllegalArgumentException("invalid role [" + role + "]"); + } + if (s.isEmpty()) { // For backward compatibility + return CommonConfig.RPC_PORT.get(configs); + } else { + String[] array = s.split(","); + if (idx >= array.length) { + // throw new IllegalArgumentException("Invalid index " + idx + " of " + s); + idx = 0; // Just use the first one. In this case, assume they are in different pods. + } + if (array[idx].isEmpty()) { + throw new IllegalArgumentException("Invalid port " + array[idx] + " of " + role); + } + return Integer.parseInt(array[idx]); + } + } +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java index 813935b71cd9..172ba8f03467 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java @@ -2,6 +2,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.common.util.ThreadFactoryUtils; import com.alibaba.graphscope.groot.rpc.RoleClients; @@ -20,11 +21,13 @@ public class GarbageCollectManager { private final ConcurrentHashMap hashMap; private final RoleClients clients; private ScheduledExecutorService updateScheduler; + private long interval; public GarbageCollectManager(Configs configs, RoleClients clients) { this.configs = configs; this.hashMap = new ConcurrentHashMap<>(); this.clients = clients; + this.interval = 
StoreConfig.STORE_GC_INTERVAL_MS.get(configs); } public void put(int frontendId, long snapshotId) { @@ -56,7 +59,7 @@ public void start() { } }, 5000L, - 2000L, + interval, TimeUnit.MILLISECONDS); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java index eebfb20dca72..eabf94c3b1fb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java @@ -13,9 +13,7 @@ */ package com.alibaba.graphscope.groot.coordinator; -import com.alibaba.graphscope.groot.common.config.CommonConfig; -import com.alibaba.graphscope.groot.common.config.Configs; -import com.alibaba.graphscope.groot.common.config.ZkConfig; +import com.alibaba.graphscope.groot.common.config.*; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.util.BackupInfo; import com.alibaba.graphscope.groot.meta.MetaStore; @@ -96,14 +94,17 @@ private void initializeMetaIfNeeded() throws IOException { byte[] b = this.objectMapper.writeValueAsBytes(0L); this.metaStore.write(IdAllocator.ID_ALLOCATE_INFO_PATH, b); } - if (!this.metaStore.exists(BackupManager.GLOBAL_BACKUP_ID_PATH)) { - byte[] b = this.objectMapper.writeValueAsBytes(0); - this.metaStore.write(BackupManager.GLOBAL_BACKUP_ID_PATH, b); - } - if (!this.metaStore.exists(BackupManager.BACKUP_INFO_PATH)) { - List backupInfoList = new ArrayList<>(); - byte[] b = this.objectMapper.writeValueAsBytes(backupInfoList); - this.metaStore.write(BackupManager.BACKUP_INFO_PATH, b); + + if (BackupConfig.BACKUP_ENABLE.get(this.configs)) { + if (!this.metaStore.exists(BackupManager.GLOBAL_BACKUP_ID_PATH)) { + byte[] b = this.objectMapper.writeValueAsBytes(0); + 
this.metaStore.write(BackupManager.GLOBAL_BACKUP_ID_PATH, b); + } + if (!this.metaStore.exists(BackupManager.BACKUP_INFO_PATH)) { + List backupInfoList = new ArrayList<>(); + byte[] b = this.objectMapper.writeValueAsBytes(backupInfoList); + this.metaStore.write(BackupManager.BACKUP_INFO_PATH, b); + } } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java index 4812acb32481..c554d0d5282f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java @@ -37,7 +37,7 @@ public IngestProgressService(SnapshotManager snapshotManager) { public void getTailOffsets( GetTailOffsetsRequest request, StreamObserver responseObserver) { - logger.info("Get offset of [" + request.getQueueIdList() + "]"); + logger.info("Get offset of {}", request.getQueueIdList()); List queueIdList = request.getQueueIdList(); List tailOffsets = this.snapshotManager.getTailOffsets(queueIdList); GetTailOffsetsResponse response = diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java index a76c26978ca9..b8080ced89d6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java @@ -35,37 +35,26 @@ public IngestorWriteSnapshotIdNotifier( } @Override - public void notifyWriteSnapshotIdChanged(long snapshotId) { + public void 
notifyWriteSnapshotIdChanged(long si) { + CompletionCallback callback = + new CompletionCallback() { + @Override + public void onCompleted(Long prev) { + if (prev > si) { + logger.error( + "unexpected previousSnapshotId {}, should <= {}", prev, si); + } + } + + @Override + public void onError(Throwable t) { + logger.error("error in advanceIngestSnapshotId {}: {}", si, t.toString()); + } + }; for (int i = 0; i < this.ingestorCount; i++) { try { - int realtimeWriterId = i; - this.ingestorSnapshotClients - .getClient(realtimeWriterId) - .advanceIngestSnapshotId( - snapshotId, - new CompletionCallback() { - @Override - public void onCompleted(Long previousSnapshotId) { - if (previousSnapshotId > snapshotId) { - logger.error( - "unexpected previousSnapshotId [{}], should <=" - + " [{}]. target realtime writer [{}]", - previousSnapshotId, - snapshotId, - realtimeWriterId); - } - } - - @Override - public void onError(Throwable t) { - logger.error( - "error in advanceIngestSnapshotId [{}]. realtime" - + " writer [{}], reason [{}]", - snapshotId, - realtimeWriterId, - t.getMessage()); - } - }); + IngestorSnapshotClient client = ingestorSnapshotClients.getClient(i); + client.advanceIngestSnapshotId(si, callback); } catch (Exception e) { logger.warn("update writeSnapshotId failed. 
realtimeWriter [{}]", i, e); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java index b0ca09c80abc..6b9ce0a5baa5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java @@ -43,12 +43,8 @@ public NotifyFrontendListener( @Override public void snapshotAdvanced(long snapshotId, long ddlSnapshotId) { - logger.debug( - "snapshot advanced to [{}]-[{}], will notify frontend", snapshotId, ddlSnapshotId); - GraphDef graphDef = null; - if (ddlSnapshotId > this.lastDdlSnapshotId.get()) { - graphDef = this.schemaManager.getGraphDef(); - } + GraphDef graphDef = this.schemaManager.getGraphDef(); + logger.debug("snapshot advanced to {}-{}, will notify frontend", snapshotId, ddlSnapshotId); this.frontendSnapshotClient.advanceQuerySnapshot( snapshotId, graphDef, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index e82e5974c7a2..aae7c6344c76 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -30,12 +30,7 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; public class SchemaManager { @@ -52,6 +47,7 @@ public class SchemaManager { private volatile boolean ready = false; private ExecutorService singleThreadExecutor; + private ScheduledExecutorService scheduler; public SchemaManager( SnapshotManager snapshotManager, @@ -80,6 +76,13 @@ public void start() { ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "ddl-executor", logger)); recover(); + logger.info(graphDefRef.get().toProto().toString()); + + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( + "recover", logger)); + this.scheduler.scheduleWithFixedDelay(this::recover, 5, 2, TimeUnit.SECONDS); } private void recover() { @@ -97,7 +100,6 @@ private void recover() { } private void recoverInternal() throws IOException, ExecutionException, InterruptedException { - logger.info("start recover"); long snapshotId = this.snapshotManager.increaseWriteSnapshotId(); CompletableFuture future = new CompletableFuture<>(); this.snapshotManager.addSnapshotListener(snapshotId, () -> future.complete(null)); @@ -105,8 +107,7 @@ private void recoverInternal() throws IOException, ExecutionException, Interrupt GraphDef graphDef = this.graphDefFetcher.fetchGraphDef(); this.graphDefRef.set(graphDef); this.ready = true; - logger.info("SchemaManager recovered. version [" + graphDef.getVersion() + "]"); - logger.info(graphDef.toProto().toString()); + // logger.info("SchemaManager recovered. 
version [" + graphDef.getVersion() + "]"); } public void stop() { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java index b7cb786fd42f..777e2280a52c 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java @@ -137,6 +137,8 @@ public class SnapshotManager { private final ObjectMapper objectMapper; + private boolean isSecondary; + public SnapshotManager( Configs configs, MetaStore metaStore, @@ -154,6 +156,8 @@ public SnapshotManager( CoordinatorConfig.SNAPSHOT_INCREASE_INTERVAL_MS.get(configs); this.offsetsPersistIntervalMs = CoordinatorConfig.OFFSETS_PERSIST_INTERVAL_MS.get(configs); + this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs); + this.storeToSnapshotInfo = new ConcurrentHashMap<>(); this.storeToOffsets = new ConcurrentHashMap<>(); } @@ -230,38 +234,39 @@ private void recover() throws IOException { checkMetaPath(WRITE_SNAPSHOT_ID_PATH); checkMetaPath(QUEUE_OFFSETS_PATH); - byte[] querySnapshotInfoBytes = this.metaStore.read(QUERY_SNAPSHOT_INFO_PATH); - SnapshotInfo recoveredQuerySnapshotInfo = - this.objectMapper.readValue(querySnapshotInfoBytes, SnapshotInfo.class); - - byte[] writeSnapshotIdBytes = this.metaStore.read(WRITE_SNAPSHOT_ID_PATH); - long recoveredWriteSnapshotId = - this.objectMapper.readValue(writeSnapshotIdBytes, Long.class); + byte[] queryBytes = this.metaStore.read(QUERY_SNAPSHOT_INFO_PATH); + SnapshotInfo querySI = objectMapper.readValue(queryBytes, SnapshotInfo.class); + logger.info("recovered query snapshot info {}", querySI); - if (recoveredQuerySnapshotInfo.getSnapshotId() > recoveredWriteSnapshotId) { + byte[] writeBytes = 
this.metaStore.read(WRITE_SNAPSHOT_ID_PATH); + long writeSI = objectMapper.readValue(writeBytes, Long.class); + logger.info("recovered write snapshot id {}", writeSI); + if (querySI.getSnapshotId() > writeSI) { throw new IllegalStateException( "recovered querySnapshotInfo [" - + recoveredQuerySnapshotInfo + + querySI + "] > writeSnapshotId [" - + recoveredWriteSnapshotId + + writeSI + "]"); } - byte[] queueOffsetsBytes = this.metaStore.read(QUEUE_OFFSETS_PATH); - List recoveredQueueOffsets = - this.objectMapper.readValue(queueOffsetsBytes, new TypeReference>() {}); - logger.info("recovered queue offsets " + recoveredQueueOffsets + ""); - if (recoveredQueueOffsets.size() != this.queueCount) { + byte[] offsetBytes = this.metaStore.read(QUEUE_OFFSETS_PATH); + List offsets = objectMapper.readValue(offsetBytes, new TypeReference<>() {}); + logger.info("recovered queue offsets {}", offsets); + if (isSecondary) { + offsets = offsets.subList(0, 1); + } + if (offsets.size() != this.queueCount) { throw new IllegalStateException( "recovered queueCount [" - + recoveredQueueOffsets.size() + + offsets.size() + "], but expect queueCount [" + this.queueCount + "]"); } for (int i = 0; i < this.queueCount; i++) { - long recoveredOffset = recoveredQueueOffsets.get(i); + long recoveredOffset = offsets.get(i); try (LogReader reader = logService.createReader(i, recoveredOffset + 1)) { } catch (Exception e) { throw new IOException( @@ -274,9 +279,9 @@ private void recover() throws IOException { } } - this.querySnapshotInfo = recoveredQuerySnapshotInfo; - this.writeSnapshotId = recoveredWriteSnapshotId; - this.queueOffsetsRef = new AtomicReference<>(recoveredQueueOffsets); + this.querySnapshotInfo = querySI; + this.writeSnapshotId = writeSI; + this.queueOffsetsRef = new AtomicReference<>(offsets); } /** @@ -369,7 +374,7 @@ private void maybeUpdateQuerySnapshotId() { // "], currentSnapshotInfo [" + // this.querySnapshotInfo + "]"); } - persistQuerySnapshotId(minSnapshotInfo); + 
persistObject(minSnapshotInfo, QUERY_SNAPSHOT_INFO_PATH); this.querySnapshotInfo = minSnapshotInfo; logger.debug("querySnapshotInfo updated to [{}]", querySnapshotInfo); } catch (IOException e) { @@ -407,16 +412,11 @@ private void maybeUpdateQuerySnapshotId() { } } - private void persistQuerySnapshotId(SnapshotInfo snapshotInfo) throws IOException { - byte[] b = this.objectMapper.writeValueAsBytes(snapshotInfo); - this.metaStore.write(QUERY_SNAPSHOT_INFO_PATH, b); - } - public long increaseWriteSnapshotId() throws IOException { this.writeSnapshotLock.lock(); try { long snapshotId = this.writeSnapshotId + 1; - persistWriteSnapshotId(snapshotId); + persistObject(snapshotId, WRITE_SNAPSHOT_ID_PATH); this.writeSnapshotId = snapshotId; this.writeSnapshotIdNotifier.notifyWriteSnapshotIdChanged(this.writeSnapshotId); return this.writeSnapshotId; @@ -437,11 +437,6 @@ public long getCurrentWriteSnapshotId() { return this.writeSnapshotId; } - private void persistWriteSnapshotId(long snapshotId) throws IOException { - byte[] b = this.objectMapper.writeValueAsBytes(snapshotId); - this.metaStore.write(WRITE_SNAPSHOT_ID_PATH, b); - } - private void updateQueueOffsets() throws IOException { if (this.storeToOffsets.size() < this.storeCount) { logger.warn( @@ -462,20 +457,17 @@ private void updateQueueOffsets() throws IOException { } } if (changed) { - persistQueueOffsets(newQueueOffsets); + persistObject(newQueueOffsets, QUEUE_OFFSETS_PATH); this.queueOffsetsRef.set(newQueueOffsets); } } - private void persistQueueOffsets(List queueOffsets) throws IOException { - byte[] bytes = this.objectMapper.writeValueAsBytes(queueOffsets); - this.metaStore.write(QUEUE_OFFSETS_PATH, bytes); - } - - public SnapshotInfo getQuerySnapshotInfo() { - synchronized (this.querySnapshotLock) { - return querySnapshotInfo; + private void persistObject(Object value, String path) throws IOException { + if (isSecondary) { + return; } + byte[] b = objectMapper.writeValueAsBytes(value); + metaStore.write(path, 
b); } /** diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java index 5fa2e20584a5..7dce53432b3b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java @@ -1,3 +1,16 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.graphscope.groot.discovery; import com.alibaba.graphscope.groot.CuratorUtils; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java index 5945d953f7be..54ca52f0c951 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java @@ -13,6 +13,9 @@ */ package com.alibaba.graphscope.groot.discovery; +import static com.alibaba.graphscope.groot.common.RoleType.*; + +import com.alibaba.graphscope.groot.Utils; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.*; @@ -28,75 +31,44 @@ public class FileDiscovery implements NodeDiscovery { private final Map> allNodes = new HashMap<>(); private boolean started = false; + Configs configs; + public FileDiscovery(Configs configs) { + this.configs = configs; // Store related nodes - int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); - String storeNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - int port = CommonConfig.RPC_PORT.get(configs); - Map storeNodes = - makeRoleNodes(storeCount, storeNamePrefix, RoleType.STORE.getName(), port); - this.allNodes.put(RoleType.STORE, storeNodes); - - int graphPort = StoreConfig.EXECUTOR_GRAPH_PORT.get(configs); - Map graphNodes = - makeRoleNodes( - storeCount, storeNamePrefix, RoleType.EXECUTOR_GRAPH.getName(), graphPort); - 
this.allNodes.put(RoleType.EXECUTOR_GRAPH, graphNodes); - - int queryPort = StoreConfig.EXECUTOR_QUERY_PORT.get(configs); - Map queryNodes = - makeRoleNodes( - storeCount, storeNamePrefix, RoleType.EXECUTOR_QUERY.getName(), queryPort); - this.allNodes.put(RoleType.EXECUTOR_QUERY, queryNodes); - - int enginePort = StoreConfig.EXECUTOR_ENGINE_PORT.get(configs); - Map engineNodes = - makeRoleNodes( - storeCount, - storeNamePrefix, - RoleType.EXECUTOR_ENGINE.getName(), - enginePort); - this.allNodes.put(RoleType.EXECUTOR_ENGINE, engineNodes); - - int gaiaRpcPort = GaiaConfig.GAIA_RPC_PORT.get(configs); - Map gaiaRpcNodes = - makeRoleNodes( - storeCount, storeNamePrefix, RoleType.GAIA_RPC.getName(), gaiaRpcPort); - this.allNodes.put(RoleType.GAIA_RPC, gaiaRpcNodes); - - int gaiaEnginePort = GaiaConfig.GAIA_ENGINE_PORT.get(configs); - Map gaiaEngineNodes = - makeRoleNodes( - storeCount, - storeNamePrefix, - RoleType.GAIA_ENGINE.getName(), - gaiaEnginePort); - this.allNodes.put(RoleType.GAIA_ENGINE, gaiaEngineNodes); + String storePrefix = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); + String frontendPrefix = DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(configs); + String ingestorPrefix = DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(configs); + String coordinatorPrefix = DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(configs); // Frontend nodes int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); - String frontendNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(configs); Map frontendNodes = - makeRoleNodes(frontendCount, frontendNamePrefix, RoleType.FRONTEND.getName(), port); - this.allNodes.put(RoleType.FRONTEND, frontendNodes); + makeRoleNodes(frontendCount, frontendPrefix, FRONTEND); + this.allNodes.put(FRONTEND, frontendNodes); // Ingestor nodes int ingestorCount = CommonConfig.INGESTOR_NODE_COUNT.get(configs); - String ingestorNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(configs); Map ingestorNodes = - 
makeRoleNodes(ingestorCount, ingestorNamePrefix, RoleType.INGESTOR.getName(), port); - this.allNodes.put(RoleType.INGESTOR, ingestorNodes); + makeRoleNodes(ingestorCount, ingestorPrefix, INGESTOR); + this.allNodes.put(INGESTOR, ingestorNodes); // Coordinator nodes int coordinatorCount = CommonConfig.COORDINATOR_NODE_COUNT.get(configs); - String coordinatorNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(configs); Map coordinatorNodes = - makeRoleNodes( - coordinatorCount, - coordinatorNamePrefix, - RoleType.COORDINATOR.getName(), - port); - this.allNodes.put(RoleType.COORDINATOR, coordinatorNodes); + makeRoleNodes(coordinatorCount, coordinatorPrefix, COORDINATOR); + this.allNodes.put(COORDINATOR, coordinatorNodes); + + int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); + Map storeNodes = makeRoleNodes(storeCount, storePrefix, STORE); + this.allNodes.put(STORE, storeNodes); + + Map gaiaRpcNodes = makeRoleNodes(storeCount, storePrefix, GAIA_RPC); + this.allNodes.put(GAIA_RPC, gaiaRpcNodes); + + Map gaiaEngineNodes = + makeRoleNodes(storeCount, storePrefix, GAIA_ENGINE); + this.allNodes.put(GAIA_ENGINE, gaiaEngineNodes); } @Override @@ -106,12 +78,12 @@ public void start() { } } - private Map makeRoleNodes( - int nodeCount, String namePrefix, String role, int port) { + private Map makeRoleNodes(int nodeCount, String namePrefix, RoleType role) { Map nodes = new HashMap<>(); for (int i = 0; i < nodeCount; i++) { + int port = Utils.getPort(configs, role, i); String host = namePrefix.replace("{}", String.valueOf(i)); - nodes.put(i, new GrootNode(role, i, host, port)); + nodes.put(i, new GrootNode(role.getName(), i, host, port)); } return nodes; } @@ -126,13 +98,13 @@ public void addListener(Listener listener) { for (Map.Entry> e : allNodes.entrySet()) { RoleType role = e.getKey(); Map nodes = e.getValue(); - if (!nodes.isEmpty()) { - try { - listener.nodesJoin(role, nodes); - } catch (Exception ex) { - logger.error( - "listener [" + listener + "] 
failed on nodesJoin [" + nodes + "]", ex); - } + if (nodes.isEmpty()) { + continue; + } + try { + listener.nodesJoin(role, nodes); + } catch (Exception ex) { + logger.error("listener {} failed on nodesJoin {}", listener, nodes, ex); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java index 8fa9d4df235e..e9f41b9e1b10 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java @@ -100,11 +100,7 @@ public void start() { .serializer(serializer) .thisInstance(instance) .build(); - logger.debug( - "Start to add node " - + localNode - + " to discovery with base path " - + discoveryBasePath); + logger.info("Add node {} to with path {}", localNode, discoveryBasePath); this.serviceDiscovery.start(); RoleType[] roleTypes = RoleType.values(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java index ea495777e505..bc5c06573ba4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java @@ -162,12 +162,22 @@ public void asyncSendWithRetry( class SendTask { int storeId; + int retryCount = 0; List dataToRetry; public SendTask(int storeId, List dataToRetry) { + this(storeId, 0, dataToRetry); + } + + public SendTask(int storeId, int retryCount, List dataToRetry) { this.storeId = storeId; + this.retryCount = retryCount; this.dataToRetry = dataToRetry; } + + public void retry() { + retryCount += 1; + } } private void sendBatch() { @@ 
-183,7 +193,22 @@ private void sendBatch() { } int storeId = sendTask.storeId; + int retryCount = sendTask.retryCount; List dataToSend = sendTask.dataToRetry; + + if (retryCount > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + + } + } + + if (retryCount > 1000) { + logger.error("Failed to send batch of {}", dataToSend); + return; + } + if (dataToSend == null) { dataToSend = new ArrayList<>(); BlockingQueue buffer = this.storeSendBuffer.get(storeId); @@ -236,23 +261,20 @@ private void finish(boolean suc) { long finishTime = System.nanoTime(); callbackLatencyMetrics.get(storeId).add(finishTime - beforeWriteTime); if (suc) { - addTask(storeId, null); + addTask(storeId, 0, null); } else { - addTask(storeId, finalDataToSend); + addTask(storeId, retryCount + 1, finalDataToSend); } } }); } else { - addTask(storeId, null); + addTask(storeId, 0, null); } } - private void addTask(int storeId, List dataToRetry) { - if (!sendTasks.offer(new SendTask(storeId, dataToRetry))) { - logger.error( - "Unexpected error, failed to add send task to queue. storeId [" - + storeId - + "]"); + private void addTask(int storeId, int retryCount, List dataToRetry) { + if (!sendTasks.offer(new SendTask(storeId, retryCount, dataToRetry))) { + logger.error("failed to add task. 
storeId [{}]", storeId); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java index e32684d7eb90..6714547958d1 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.ingestor; +import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.IngestorConfig; import com.alibaba.graphscope.groot.common.exception.IngestRejectException; @@ -26,7 +27,10 @@ import com.alibaba.graphscope.groot.wal.LogService; import com.alibaba.graphscope.groot.wal.LogWriter; import com.alibaba.graphscope.groot.wal.ReadLogEntry; +import com.alibaba.graphscope.groot.wal.readonly.ReadOnlyLogReader; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +62,7 @@ public class IngestProcessor implements MetricsAgent { private final int bufferSize; private BlockingQueue ingestBuffer; private Thread ingestThread; + private Thread tailWALThread; private final AtomicLong ingestSnapshotId; private final LogService logService; @@ -76,6 +81,7 @@ public class IngestProcessor implements MetricsAgent { private volatile long walBlockPerSecondMs; private volatile long lastUpdateStoreBlockTimeNano; private volatile long storeBlockPerSecondMs; + private boolean isSecondary; public IngestProcessor( Configs configs, @@ -90,6 +96,8 @@ public IngestProcessor( this.ingestSnapshotId = ingestSnapshotId; this.bufferSize = IngestorConfig.INGESTOR_QUEUE_BUFFER_MAX_COUNT.get(configs); + 
this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs); + initMetrics(); metricsCollector.register(this, this::updateMetrics); } @@ -132,6 +140,20 @@ public void start() { }); this.ingestThread.setDaemon(true); this.ingestThread.start(); + if (isSecondary) { + this.tailWALThread = + new Thread( + () -> { + try { + tailWAL(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + this.tailWALThread.setDaemon(true); + this.tailWALThread.start(); + } + started = true; logger.info("ingestProcessor queue#[" + queueId + "] started"); } @@ -149,8 +171,17 @@ public void stop() { } this.ingestThread = null; } + if (tailWALThread != null && tailWALThread.isAlive()) { + try { + this.tailWALThread.interrupt(); + this.tailWALThread.join(); + } catch (InterruptedException e) { + logger.warn("stop ingestProcessor queue#[" + queueId + "] interrupted"); + } + this.tailWALThread = null; + } this.batchSender.stop(); - logger.info("ingestProcessor queue#[" + queueId + "] stopped"); + logger.debug("ingestProcessor queue#[" + queueId + "] stopped"); } private void checkStarted() { @@ -159,6 +190,10 @@ private void checkStarted() { } } + public boolean isStarted() { + return started; + } + public void ingestBatch( String requestId, OperationBatch operationBatch, IngestCallback callback) { checkStarted(); @@ -257,6 +292,39 @@ public void setTailOffset(long offset) { this.tailOffset = offset; } + public void tailWAL() throws IOException { + List types = new ArrayList<>(); + types.add(OperationType.CREATE_VERTEX_TYPE); + types.add(OperationType.CREATE_EDGE_TYPE); + types.add(OperationType.ADD_EDGE_KIND); + types.add(OperationType.DROP_VERTEX_TYPE); + types.add(OperationType.DROP_EDGE_TYPE); + types.add(OperationType.REMOVE_EDGE_KIND); + types.add(OperationType.PREPARE_DATA_LOAD); + types.add(OperationType.COMMIT_DATA_LOAD); + try (ReadOnlyLogReader reader = (ReadOnlyLogReader) logService.createReader(queueId, 0)) { + while (!shouldStop) { + ConsumerRecords 
records = reader.getLatestUpdates(); + for (ConsumerRecord record : records) { + long offset = record.offset(); + LogEntry logEntry = record.value(); + OperationBatch batch = extractOperations(logEntry.getOperationBatch(), types); + long snapshotId = logEntry.getSnapshotId(); + if (batch.getOperationCount() > 0) { + long batchSnapshotId = this.ingestSnapshotId.get(); + this.batchSender.asyncSendWithRetry( + "", queueId, batchSnapshotId, offset, batch); + logger.info( + "Sent logEntry snapshot Id {}, SnapshotId {}, batch {}", + snapshotId, + batchSnapshotId, + batch.toProto()); + } + } + } + } + } + public void replayWAL(long tailOffset) throws IOException { long replayFrom = tailOffset + 1; logger.info("replay WAL of queue#[{}] from offset [{}]", queueId, replayFrom); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java index e2950d00d9c0..5ca4982efb56 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java @@ -62,7 +62,6 @@ public class IngestService implements NodeDiscovery.Listener { private Map queueToProcessor; private AtomicLong ingestSnapshotId; - private volatile boolean processorStarted; private volatile boolean storeNodeReady; private ScheduledExecutorService scheduler; private ExecutorService singleThreadExecutor; @@ -113,7 +112,6 @@ public void start() { this.ingestSnapshotId, this.metricsCollector)); } - this.processorStarted = false; this.storeNodeReady = false; this.discovery.addListener(this); this.singleThreadExecutor = @@ -132,7 +130,7 @@ public void start() { long delay = IngestorConfig.INGESTOR_CHECK_PROCESSOR_INTERVAL_MS.get(configs); this.scheduler.scheduleWithFixedDelay( - this::tryStartProcessors, 0, delay, 
TimeUnit.MILLISECONDS); + this::tryStartProcessors, 2000, delay, TimeUnit.MILLISECONDS); this.started = true; logger.info("IngestService started"); } @@ -247,6 +245,16 @@ public void onFailure(Exception e) { callback.onError(e); } }); + } catch (IllegalStateException e) { + if (finished.getAndSet(true)) { + return; + } + logger.warn( + "ingest marker failed. queue#{}, snapshotId {}, {}", + queue, + snapshotId, + e.getMessage()); + callback.onError(e); } catch (Exception e) { if (finished.getAndSet(true)) { return; @@ -260,7 +268,7 @@ public void onFailure(Exception e) { private void startProcessors() { this.singleThreadExecutor.execute( () -> { - if (processorStarted) { + if (isProcessorStarted()) { return; } for (IngestProcessor processor : this.queueToProcessor.values()) { @@ -276,11 +284,19 @@ private void startProcessors() { for (IngestProcessor processor : this.queueToProcessor.values()) { processor.start(); } - processorStarted = true; logger.info("processors started"); }); } + private boolean isProcessorStarted() { + for (IngestProcessor processor : this.queueToProcessor.values()) { + if (!processor.isStarted()) { + return false; + } + } + return true; + } + private void stopProcessors() { if (this.singleThreadExecutor == null) { logger.warn("no executor for stop processors, ignore"); @@ -288,11 +304,10 @@ private void stopProcessors() { } this.singleThreadExecutor.execute( () -> { - if (processorStarted) { + if (isProcessorStarted()) { for (IngestProcessor processor : this.queueToProcessor.values()) { processor.stop(); } - processorStarted = false; logger.info("processors stopped"); } }); @@ -313,6 +328,8 @@ private void tryStartProcessors() { try { if (this.storeNodeReady) { startProcessors(); + } else { + logger.warn("Store node is not ready when trying to start processors"); } } catch (Exception e) { logger.error("tryStartProcessors failed, ignore", e); diff --git 
a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java index e1663ac0c654..ea210c9dd5c6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.rpc; +import com.alibaba.graphscope.groot.Utils; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.*; import com.alibaba.graphscope.groot.common.exception.NodeConnectException; @@ -33,18 +34,17 @@ public class ChannelManager { private static final Logger logger = LoggerFactory.getLogger(ChannelManager.class); public static final String SCHEME = "node"; - private Configs configs; - private NameResolver.Factory nameResolverFactory; + private final Configs configs; + private final NameResolver.Factory nameResolverFactory; - private Set targetRoles = new HashSet<>(); + private final Set targetRoles = new HashSet<>(); private Map> roleToChannels; - private int rpcMaxBytes; + private final int rpcMaxBytes; public ChannelManager(Configs configs, NameResolver.Factory nameResolverFactory) { this.configs = configs; this.nameResolverFactory = nameResolverFactory; - this.rpcMaxBytes = CommonConfig.RPC_MAX_BYTES_MB.get(configs) * 1024 * 1024; } @@ -52,72 +52,12 @@ public void start() { this.roleToChannels = new HashMap<>(); for (RoleType role : this.targetRoles) { Map idxToChannel = - this.roleToChannels.computeIfAbsent(role, k -> new HashMap<>()); - int count = - Integer.parseInt( - this.configs.get( - String.format(CommonConfig.NODE_COUNT_FORMAT, role.getName()), - "0")); - if (CommonConfig.DISCOVERY_MODE.get(configs).equalsIgnoreCase("file")) { - String hostTemplate; - int port; - switch (role) { - case FRONTEND: - 
hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(configs); - port = CommonConfig.RPC_PORT.get(configs); - break; - case INGESTOR: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(configs); - port = CommonConfig.RPC_PORT.get(configs); - break; - case COORDINATOR: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(configs); - port = CommonConfig.RPC_PORT.get(configs); - break; - case STORE: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = CommonConfig.RPC_PORT.get(configs); - break; - case EXECUTOR_GRAPH: - case EXECUTOR_MANAGE: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = StoreConfig.EXECUTOR_GRAPH_PORT.get(configs); - break; - case EXECUTOR_QUERY: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = StoreConfig.EXECUTOR_QUERY_PORT.get(configs); - break; - case EXECUTOR_ENGINE: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = StoreConfig.EXECUTOR_ENGINE_PORT.get(configs); - break; - case GAIA_ENGINE: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = GaiaConfig.GAIA_ENGINE_PORT.get(configs); - break; - case GAIA_RPC: - hostTemplate = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); - port = GaiaConfig.GAIA_RPC_PORT.get(configs); - break; - default: - throw new IllegalArgumentException("invalid role [" + role + "]"); - } - for (int i = 0; i < count; i++) { - String host = hostTemplate.replace("{}", String.valueOf(i)); - logger.info( - "Create channel to role {} #{}, host {}, port {}", - role.getName(), - i, - host, - port); - ManagedChannel channel = - ManagedChannelBuilder.forAddress(host, port) - .maxInboundMessageSize(this.rpcMaxBytes) - .usePlaintext() - .build(); - idxToChannel.put(i, channel); - } - } else { + roleToChannels.computeIfAbsent(role, k -> new HashMap<>()); + String nodeCount = + configs.get(String.format(CommonConfig.NODE_COUNT_FORMAT, role.getName())); + int count = 
Integer.parseInt(nodeCount); + String discoveryMode = CommonConfig.DISCOVERY_MODE.get(configs).toLowerCase(); + if (discoveryMode.equals("zookeeper")) { for (int i = 0; i < count; i++) { logger.info("Create channel to role {} #{}", role.getName(), i); String uri = SCHEME + "://" + role.getName() + "/" + i; @@ -130,6 +70,7 @@ public void start() { idxToChannel.put(i, channel); } } + // channel in file discovery mode will be lazy created } logger.info("ChannelManager started"); } @@ -160,10 +101,17 @@ public ManagedChannel getChannel(RoleType role, int idx) { if (idToChannel == null) { throw new NodeConnectException("invalid role [" + role + "]"); } - ManagedChannel channel = idToChannel.get(idx); - if (channel == null) { - throw new NodeConnectException("not connected to role [" + role + "] #[" + idx + "]"); + if (idToChannel.get(idx) == null) { + String host = Utils.getHostTemplate(configs, role).replace("{}", String.valueOf(idx)); + int port = Utils.getPort(configs, role, idx); + logger.info("Create channel to {}#{}, {}:{}", role.getName(), idx, host, port); + ManagedChannel channel = + ManagedChannelBuilder.forAddress(host, port) + .maxInboundMessageSize(this.rpcMaxBytes) + .usePlaintext() + .build(); + idToChannel.put(idx, channel); } - return channel; + return idToChannel.get(idx); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/NodeNameResolver.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/NodeNameResolver.java index f458c0b47560..05aa12cea3ad 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/NodeNameResolver.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/NodeNameResolver.java @@ -43,7 +43,7 @@ public NodeNameResolver(NodeDiscovery discovery, URI uri) { this.discovery = discovery; this.uri = uri; this.roleType = RoleType.fromName(uri.getAuthority()); - this.idx = 
Integer.valueOf(uri.getPath().substring(1)); + this.idx = Integer.parseInt(uri.getPath().substring(1)); } @Override @@ -53,7 +53,7 @@ public String getServiceAuthority() { @Override public void start(Listener2 listener) { - logger.info("starting resolver for role [" + roleType + "] #[" + idx + "]"); + logger.debug("starting resolver for role [{}] #[{}]", roleType, idx); this.listener = listener; this.discovery.addListener(this); } @@ -65,13 +65,7 @@ public void shutdown() { @Override public void nodesJoin(RoleType role, Map nodes) { - logger.debug( - "Add nodes " - + nodes - + " for role " - + role - + " with current role type " - + this.roleType); + logger.debug("Add nodes {} for role {}, roleType {}", nodes, role, roleType); if (role != this.roleType) { return; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java index 0615752efb78..f06530b0da8e 100755 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java @@ -15,6 +15,7 @@ import static com.alibaba.graphscope.groot.common.util.RpcUtils.createGrpcExecutor; +import com.alibaba.graphscope.groot.Utils; import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.discovery.GrootNode; @@ -40,7 +41,7 @@ public class RpcServer { public RpcServer( Configs conf, LocalNodeProvider localNodeProvider, BindableService... 
services) { this( - CommonConfig.RPC_PORT.get(conf), + Utils.getPort(conf), CommonConfig.RPC_THREAD_COUNT.get(conf), CommonConfig.RPC_MAX_BYTES_MB.get(conf) * 1024 * 1024, localNodeProvider, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 8263f198b921..8d5894554bcd 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -272,13 +272,16 @@ private Map writeStore( } } catch (Exception ex) { logger.error( - "write to partition [" - + partitionId - + "] failed, snapshotId [" - + snapshotId - + "]. will retry", + "write to partition [{}] failed, snapshotId [{}].", + partitionId, + snapshotId, ex); - batchNeedRetry.put(partitionId, batch); + String msg = "Not supported operation in secondary mode"; + if (ex.getMessage().contains(msg)) { + logger.warn("Ignored write in secondary instance, {}", msg); + } else { + batchNeedRetry.put(partitionId, batch); + } } if (counter.decrementAndGet() == 0) { future.complete(null); @@ -316,8 +319,9 @@ public void ingestData( if (!Files.isDirectory(uniquePath)) { try { Files.createDirectories(uniquePath); + logger.info("Created uniquePath {}", uniquePath); } catch (IOException e) { - logger.error("create uniquePath failed. uniquePath [" + uniquePath + "]", e); + logger.error("create uniquePath failed. 
uniquePath {}", uniquePath, e); callback.onError(e); return; } @@ -405,9 +409,6 @@ public void garbageCollect(long snapshotId, CompletionCallback callback) { } private void garbageCollectInternal(long snapshotId) throws IOException { - if (snapshotId % 3600 != 0) { // schedule every 1 hour - return; - } for (Map.Entry entry : this.idToPartition.entrySet()) { GraphPartition partition = entry.getValue(); partition.garbageCollect(snapshotId); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java index 6d6273f9dea2..f8d14386f9d9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java @@ -44,6 +44,11 @@ public JnaGraphStore(Configs configs, int partitionId) throws IOException { Path partitionPath = Paths.get(dataRoot, "" + partitionId); this.downloadPath = Paths.get(dataRoot, "download"); this.backupPath = Paths.get(dataRoot, "backups", "" + partitionId); + String secondaryDataRoot = StoreConfig.STORE_SECONDARY_DATA_PATH.get(configs); + Path secondPath = Paths.get(secondaryDataRoot, "" + partitionId); + if (!Files.isDirectory(secondPath)) { + Files.createDirectories(secondPath); + } if (!Files.isDirectory(partitionPath)) { Files.createDirectories(partitionPath); } @@ -55,11 +60,10 @@ public JnaGraphStore(Configs configs, int partitionId) throws IOException { if (!Files.isDirectory(backupPath)) { Files.createDirectories(backupPath); } - Configs storeConfigs = - Configs.newBuilder(configs) - .put("store.data.path", partitionPath.toString()) - .build(); - byte[] configBytes = storeConfigs.toProto().toByteArray(); + Configs.Builder builder = Configs.newBuilder(configs); + builder.put(StoreConfig.STORE_DATA_PATH.getKey(), 
partitionPath.toString()); + builder.put(StoreConfig.STORE_SECONDARY_DATA_PATH.getKey(), secondPath.toString()); + byte[] configBytes = builder.build().toProto().toByteArray(); this.pointer = GraphLibrary.INSTANCE.openGraphStore(configBytes, configBytes.length); this.partitionId = partitionId; logger.info("JNA store opened. partition [" + partitionId + "]"); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogServiceFactory.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogServiceFactory.java new file mode 100644 index 000000000000..bf04809a1252 --- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogServiceFactory.java @@ -0,0 +1,29 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.wal; + +import com.alibaba.graphscope.groot.common.config.CommonConfig; +import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.wal.kafka.KafkaLogService; +import com.alibaba.graphscope.groot.wal.readonly.ReadOnlyLogService; + +public class LogServiceFactory { + public static LogService makeLogService(Configs configs) { + if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs)) { + return new ReadOnlyLogService(configs); + } else { + return new KafkaLogService(configs); + } + } +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java index b4622c3740fe..5e453db4dbd2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java @@ -52,6 +52,7 @@ public KafkaLogService(Configs configs) { this.queueCount = CommonConfig.INGESTOR_QUEUE_COUNT.get(configs); this.replicationFactor = KafkaConfig.KAFKA_REPLICATION_FACTOR.get(configs); this.maxMessageMb = KafkaConfig.KAFKA_MAX_MESSAGE_MB.get(configs); + logger.info("Initialized KafkaLogService"); } @Override diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogReader.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogReader.java new file mode 100644 index 000000000000..584fb6c48623 
--- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogReader.java @@ -0,0 +1,64 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.wal.readonly; + +import com.alibaba.graphscope.groot.wal.LogEntry; +import com.alibaba.graphscope.groot.wal.LogReader; +import com.alibaba.graphscope.groot.wal.ReadLogEntry; +import com.alibaba.graphscope.groot.wal.kafka.LogEntryDeserializer; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.*; + +public class ReadOnlyLogReader implements LogReader { + + private static final Logger logger = LoggerFactory.getLogger(ReadOnlyLogReader.class); + + private static final LogEntryDeserializer deSer = new LogEntryDeserializer(); + private final Consumer consumer; + + public ReadOnlyLogReader(String servers, String topicName, int partitionId) throws IOException { + Map kafkaConfigs = new HashMap<>(); + kafkaConfigs.put("bootstrap.servers", servers); + + TopicPartition partition = new TopicPartition(topicName, partitionId); + + consumer = new KafkaConsumer<>(kafkaConfigs, deSer, deSer); + consumer.assign(List.of(partition)); + consumer.seekToEnd(consumer.assignment()); + logger.info("Created MockLogReader"); + } + + public ConsumerRecords getLatestUpdates() { + ConsumerRecords consumerRecords = + consumer.poll(Duration.ofMillis(1000L)); + return consumerRecords; + } + + @Override + public ReadLogEntry readNext() { + return null; + } + + @Override + public void close() throws IOException {} +} diff --git 
a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogService.java new file mode 100644 index 000000000000..2508f15dbcee --- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogService.java @@ -0,0 +1,66 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.wal.readonly; + +import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.KafkaConfig; +import com.alibaba.graphscope.groot.wal.LogReader; +import com.alibaba.graphscope.groot.wal.LogService; +import com.alibaba.graphscope.groot.wal.LogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class ReadOnlyLogService implements LogService { + private static final Logger logger = LoggerFactory.getLogger(ReadOnlyLogService.class); + private final String servers; + private final String topic; + + public ReadOnlyLogService(Configs configs) { + this.servers = KafkaConfig.KAFKA_SERVERS.get(configs); + this.topic = KafkaConfig.KAKFA_TOPIC.get(configs); + logger.info("Initialized MockLogService"); + } + + @Override + public void init() {} + + @Override + public void destroy() {} + + @Override + public boolean initialized() { + return true; + } + + @Override + public LogWriter createWriter(int queueId) { + return new ReadOnlyLogWriter(); + } + + @Override + public LogReader createReader(int queueId, long offset) throws IOException { + return createReader(queueId, offset, -1); + } + + @Override + public LogReader createReader(int queueId, long offset, long timestamp) throws IOException { + return new ReadOnlyLogReader(servers, topic, queueId); + } + + @Override + public void deleteBeforeOffset(int queueId, long offset) throws IOException {} +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogWriter.java 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogWriter.java new file mode 100644 index 000000000000..0c9d860513a5 --- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogWriter.java @@ -0,0 +1,34 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.wal.readonly; + +import com.alibaba.graphscope.groot.wal.LogEntry; +import com.alibaba.graphscope.groot.wal.LogWriter; + +import java.io.IOException; + +public class ReadOnlyLogWriter implements LogWriter { + private long offset = 0; + + public ReadOnlyLogWriter() {} + + @Override + public long append(LogEntry logEntry) throws IOException { + offset += 1; + return offset; + } + + @Override + public void close() throws IOException {} +} diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java index b1f922183c74..9af8b6ccbab7 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Coordinator.java @@ -31,7 +31,7 @@ import com.alibaba.graphscope.groot.rpc.RpcServer; import com.alibaba.graphscope.groot.schema.ddl.DdlExecutors; import com.alibaba.graphscope.groot.wal.LogService; -import com.alibaba.graphscope.groot.wal.kafka.KafkaLogService; +import com.alibaba.graphscope.groot.wal.LogServiceFactory; import io.grpc.NameResolver; @@ -60,14 +60,13 @@ public Coordinator(Configs configs) { super(configs); configs = reConfig(configs); LocalNodeProvider localNodeProvider = new LocalNodeProvider(configs); - MetaStore metaStore; + MetaStore metaStore = new FileMetaStore(configs); if (CommonConfig.DISCOVERY_MODE.get(configs).equalsIgnoreCase("file")) { this.discovery = new FileDiscovery(configs); - metaStore = new 
FileMetaStore(configs); } else { this.curator = CuratorUtils.makeCurator(configs); this.discovery = new ZkDiscovery(configs, localNodeProvider, this.curator); - metaStore = new ZkMetaStore(configs, this.curator); + // metaStore = new ZkMetaStore(configs, this.curator); } NameResolver.Factory nameResolverFactory = new GrootNameResolverFactory(this.discovery); this.channelManager = new ChannelManager(configs, nameResolverFactory); @@ -80,7 +79,8 @@ public Coordinator(Configs configs) { this.channelManager, RoleType.INGESTOR, IngestorSnapshotClient::new); WriteSnapshotIdNotifier writeSnapshotIdNotifier = new IngestorWriteSnapshotIdNotifier(configs, ingestorSnapshotClients); - LogService logService = new KafkaLogService(configs); + + LogService logService = LogServiceFactory.makeLogService(configs); this.snapshotManager = new SnapshotManager(configs, metaStore, logService, writeSnapshotIdNotifier); DdlExecutors ddlExecutors = new DdlExecutors(); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 4f32812a5f25..5c5d994d8f0d 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -158,8 +158,9 @@ public Frontend(Configs configs) { clientWriteService, clientBackupService); + boolean isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs); WrappedSchemaFetcher wrappedSchemaFetcher = - new WrappedSchemaFetcher(snapshotCache, metaService); + new WrappedSchemaFetcher(snapshotCache, metaService, isSecondary); ComputeServiceProducer serviceProducer = ServiceProducerFactory.getProducer(configs); this.graphService = serviceProducer.makeGraphService(wrappedSchemaFetcher, channelManager); } diff --git 
a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java index 80150c3b4bca..ca426f6a3636 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java @@ -14,7 +14,7 @@ package com.alibaba.graphscope.groot.servers; import com.alibaba.graphscope.groot.common.RoleType; -import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.*; import com.alibaba.graphscope.groot.common.exception.GrootException; import org.slf4j.Logger; @@ -28,6 +28,13 @@ public class GrootGraph { public static void main(String[] args) throws IOException { String configFile = System.getProperty("config.file"); Configs conf = new Configs(configFile); + if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(conf)) { + conf = + Configs.newBuilder(conf) + .put(StoreConfig.STORE_STORAGE_ENGINE.getKey(), "rocksdb_as_secondary") + .build(); + } + logger.info("Configs {}", conf); NodeBase node; if (args.length == 0) { diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Ingestor.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Ingestor.java index 6cd59533a457..8f8b5a999d96 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Ingestor.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Ingestor.java @@ -35,7 +35,7 @@ import com.alibaba.graphscope.groot.rpc.GrootNameResolverFactory; import com.alibaba.graphscope.groot.rpc.RpcServer; import com.alibaba.graphscope.groot.wal.LogService; -import com.alibaba.graphscope.groot.wal.kafka.KafkaLogService; +import 
com.alibaba.graphscope.groot.wal.LogServiceFactory; import io.grpc.NameResolver; @@ -66,7 +66,7 @@ public Ingestor(Configs configs) { NameResolver.Factory nameResolverFactory = new GrootNameResolverFactory(this.discovery); this.channelManager = new ChannelManager(configs, nameResolverFactory); this.metaService = new DefaultMetaService(configs); - LogService logService = new KafkaLogService(configs); + LogService logService = LogServiceFactory.makeLogService(configs); IngestProgressFetcher ingestProgressClients = new RemoteIngestProgressFetcher(this.channelManager); StoreWriter storeWriteClients = diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/MaxNode.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/MaxNode.java index 34a85ffb7e62..ea6416c34cfd 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/MaxNode.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/MaxNode.java @@ -49,52 +49,13 @@ public MaxNode(Configs configs) throws Exception { } int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); - int ingestorCount = 2; + int ingestorCount = CommonConfig.INGESTOR_NODE_COUNT.get(configs); int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); Configs baseConfigs = Configs.newBuilder(configs) .put(ZkConfig.ZK_CONNECT_STRING.getKey(), zkConnectString) .put(KafkaConfig.KAFKA_SERVERS.getKey(), kafkaServers) - .put( - CommonConfig.INGESTOR_NODE_COUNT.getKey(), - String.valueOf(ingestorCount)) - .put( - CommonConfig.INGESTOR_QUEUE_COUNT.getKey(), - String.valueOf(ingestorCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, - RoleType.EXECUTOR_ENGINE.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, - RoleType.EXECUTOR_GRAPH.getName()), - String.valueOf(storeCount)) - .put( - String.format( - 
CommonConfig.NODE_COUNT_FORMAT, - RoleType.EXECUTOR_MANAGE.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, - RoleType.EXECUTOR_QUERY.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, - RoleType.GAIA_RPC.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, - RoleType.GAIA_ENGINE.getName()), - String.valueOf(storeCount)) - .put( - CommonConfig.FRONTEND_NODE_COUNT.getKey(), - String.valueOf(frontendCount)) .build(); Configs coordinatorConfigs = @@ -108,8 +69,6 @@ public MaxNode(Configs configs) throws Exception { Configs.newBuilder(baseConfigs) .put(CommonConfig.ROLE_NAME.getKey(), RoleType.FRONTEND.getName()) .put(CommonConfig.NODE_IDX.getKey(), String.valueOf(i)) - .put(CommonConfig.RPC_PORT.getKey(), "55555") - .put(FrontendConfig.FRONTEND_SERVICE_PORT.getKey(), "55556") .build(); this.frontends.add(new Frontend(frontendConfigs)); } @@ -133,6 +92,12 @@ public MaxNode(Configs configs) throws Exception { public void start() { List startThreads = new ArrayList<>(); + startThreads.add( + new Thread( + () -> { + this.coordinator.start(); + logger.info("[" + this.coordinator.getName() + "] started"); + })); for (NodeBase store : this.stores) { startThreads.add( new Thread( @@ -141,6 +106,7 @@ public void start() { logger.info("[" + store.getName() + "] started"); })); } + for (NodeBase frontend : this.frontends) { startThreads.add( new Thread( @@ -158,12 +124,6 @@ public void start() { })); } - startThreads.add( - new Thread( - () -> { - this.coordinator.start(); - logger.info("[" + this.coordinator.getName() + "] started"); - })); for (Thread startThread : startThreads) { startThread.start(); } @@ -202,7 +162,6 @@ public void close() throws IOException { public static void main(String[] args) throws Exception { String configFile = System.getProperty("config.file"); Configs conf = new Configs(configFile); - conf = 
Configs.newBuilder(conf).put(CommonConfig.ENGINE_TYPE.getKey(), "gaia").build(); MaxNode maxNode = new MaxNode(conf); NodeLauncher nodeLauncher = new NodeLauncher(maxNode); nodeLauncher.start(); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java index 5aa2dd3783d2..0527e9a02dd7 100755 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java @@ -36,9 +36,6 @@ public NodeBase() { public NodeBase(Configs configs) { this.roleType = RoleType.fromName(CommonConfig.ROLE_NAME.get(configs)); this.idx = CommonConfig.NODE_IDX.get(configs); - if (idx == 0) { - logger.info("Configs {}", configs); - } } public NodeBase(Configs configs, RoleType roleType) { @@ -53,22 +50,6 @@ protected Configs reConfig(Configs configs) { int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); int ingestorCount = CommonConfig.INGESTOR_NODE_COUNT.get(configs); return Configs.newBuilder(configs) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, RoleType.EXECUTOR_ENGINE.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, RoleType.EXECUTOR_GRAPH.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, RoleType.EXECUTOR_MANAGE.getName()), - String.valueOf(storeCount)) - .put( - String.format( - CommonConfig.NODE_COUNT_FORMAT, RoleType.EXECUTOR_QUERY.getName()), - String.valueOf(storeCount)) .put( String.format(CommonConfig.NODE_COUNT_FORMAT, RoleType.GAIA_RPC.getName()), String.valueOf(storeCount)) diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/WrappedSchemaFetcher.java 
b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/WrappedSchemaFetcher.java index 7e1a4192e76d..fcec66625d2d 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/WrappedSchemaFetcher.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/WrappedSchemaFetcher.java @@ -29,18 +29,23 @@ public class WrappedSchemaFetcher implements SchemaFetcher { private SnapshotCache snapshotCache; private MetaService metaService; + // If this is a secondary instance, then always use the latest snapshot ID. + private boolean isSecondary; - public WrappedSchemaFetcher(SnapshotCache snapshotCache, MetaService metaService) { + private long MAX_SNAPSHOT_ID = Long.MAX_VALUE - 1; + + public WrappedSchemaFetcher( + SnapshotCache snapshotCache, MetaService metaService, boolean isSecondary) { this.snapshotCache = snapshotCache; this.metaService = metaService; + this.isSecondary = isSecondary; } @Override public Map getSchemaSnapshotPair() { SnapshotWithSchema snapshotSchema = this.snapshotCache.getSnapshotWithSchema(); - long snapshotId = snapshotSchema.getSnapshotId(); + long snapshotId = isSecondary ? 
MAX_SNAPSHOT_ID : snapshotSchema.getSnapshotId(); GraphSchema schema = snapshotSchema.getGraphDef(); - logger.debug("fetch schema of snapshot id [" + snapshotId + "]"); return Map.of(snapshotId, schema); } diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java index b03317ce38d8..0823e8f9626d 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java @@ -17,7 +17,7 @@ import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversalSource; import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; -import com.alibaba.graphscope.groot.common.config.GremlinConfig; +import com.alibaba.graphscope.groot.common.config.FrontendConfig; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.groot.sdk.schema.Edge; @@ -72,7 +72,7 @@ public GrootGraph(Configs configs) { // This is to ensure the frontend can communicate some RPC after start, to make the // graphdef not null anymore, in snapshotCache. 
Thread.sleep(3000); - int port = GremlinConfig.GREMLIN_PORT.get(configs); + int port = FrontendConfig.GREMLIN_SERVER_PORT.get(configs); this.cluster = createCluster("localhost", port); this.ddlClient = GrootClient.newBuilder().addHost("localhost", 55556).build(); this.remoteConnection = DriverRemoteConnection.using(cluster); diff --git a/python/graphscope/deploy/kubernetes/resource_builder.py b/python/graphscope/deploy/kubernetes/resource_builder.py index 7976adb0e2a5..429aecf3e2c1 100644 --- a/python/graphscope/deploy/kubernetes/resource_builder.py +++ b/python/graphscope/deploy/kubernetes/resource_builder.py @@ -65,7 +65,7 @@ def get_role_binding(name, namespace, role_name, service_account_name, labels): role_ref = kube_client.V1RoleRef( kind="Role", name=role_name, api_group="rbac.authorization.k8s.io" ) - subject = kube_client.V1Subject( + subject = kube_client.RbacV1Subject( kind="ServiceAccount", name=service_account_name, namespace=namespace ) role_binding = kube_client.V1RoleBinding( @@ -81,7 +81,7 @@ def get_cluster_role_binding( role_ref = kube_client.V1RoleRef( kind="ClusterRole", name=role_name, api_group="rbac.authorization.k8s.io" ) - subject = kube_client.V1Subject( + subject = kube_client.RbacV1Subject( kind="ServiceAccount", name=service_account_name, namespace=namespace ) role_binding = kube_client.V1ClusterRoleBinding(