From bb91e5cae087759b6b1a39f4e72967bc922b8422 Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Fri, 22 Sep 2023 14:16:41 +0800
Subject: [PATCH] fix(interactive): Fix several bugs in data-load-tools (#3249)

1. Fix a bug where the partition spec (partspec) was not included in the table identifier, so the label id could not be retrieved correctly.
2. Disable the backup instance of the reducer, which could truncate the file uploaded to the volume.
3. Limit the total size of log files.
---
 .../graphscope-store/templates/configmap.yaml | 2 +-
 .../assembly/src/bin/groot/store_ctl.sh | 4 +-
 .../groot/common/util/PkHashUtils.java | 8 +-
 .../dataload/databuild/DataBuildMapper.java | 136 ++---------
 .../databuild/DataBuildMapperOdps.java | 120 ++--------
 .../databuild/DataBuildMapperOdpsDebug.java | 151 ++++++++++++
 .../databuild/DataBuildReducerOdps.java | 1 +
 .../groot/dataload/databuild/DataEncoder.java | 62 ++++-
 .../dataload/databuild/OfflineBuild.java | 96 +++-----
 .../dataload/databuild/OfflineBuildOdps.java | 177 +++-----------
 .../databuild/OfflineBuildOdpsDebug.java | 98 ++++++++
 .../dataload/databuild/SstOutputFormat.java | 7 +-
 .../dataload/databuild/SstRecordWriter.java | 10 +-
 .../groot/dataload/databuild/Utils.java | 218 ++++++++++++++++++
 .../groot/store/external/ExternalStorage.java | 4 +
 15 files changed, 661 insertions(+), 433 deletions(-)
 create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
 create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java
 create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java

diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 6daf8bb522c1..081fa9465e7c 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -25,7 +25,7 @@ data: ingestor.node.count={{ .Values.ingestor.replicaCount }} coordinator.node.count={{ .Values.coordinator.replicaCount }} ingestor.queue.count={{ .Values.ingestor.replicaCount }} - partition.count={{ .Values.store.replicaCount | mul 128 }} + partition.count={{ .Values.store.replicaCount | mul 16 }} engine.type={{ .Values.engineType }} discovery.mode={{ .Values.discoveryMode }} diff --git a/interactive_engine/assembly/src/bin/groot/store_ctl.sh b/interactive_engine/assembly/src/bin/groot/store_ctl.sh index 4d14d6a4d62a..bcc3e3b03a08 100755 --- a/interactive_engine/assembly/src/bin/groot/store_ctl.sh +++ b/interactive_engine/assembly/src/bin/groot/store_ctl.sh @@ -98,7 +98,7 @@ start_server() { -XX:+PrintGCApplicationStoppedTime -Xloggc:${LOG_DIR}/${LOG_NAME}.gc.log -XX:+UseGCLogFileRotation - -XX:NumberOfGCLogFiles=32 + -XX:NumberOfGCLogFiles=4 -XX:GCLogFileSize=64m" java ${java_opt} \ @@ -107,7 +107,7 @@ start_server() { -Dlog.dir="${LOG_DIR}" \ -Dlog.name="${LOG_NAME}" \ -cp "${libpath}" com.alibaba.graphscope.groot.servers.GrootGraph \ - "$@" > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2) + "$@" # > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2) } # parse argv diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java index df2c3cbe4bba..3fe3d737c188 100644 ---
a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java @@ -43,12 +43,12 @@ public static long hash(int labelId, List pks) { } /** - * Generates 64 bit hash from byte array of the given length and seed. + * Generates 64-bit hash from byte array of the given length and seed. * * @param data byte array to hash * @param length length of the array to hash * @param seed initial seed value - * @return 64 bit hash of the given array + * @return 64-bit hash of the given array */ private static long hash64(final byte[] data, int length, int seed) { final long m = 0xc6a4a7935bd1e995L; @@ -103,11 +103,11 @@ private static long hash64(final byte[] data, int length, int seed) { } /** - * Generates 64 bit hash from byte array with default seed value. + * Generates 64-bit hash from byte array with default seed value. * * @param data byte array to hash * @param length length of the array to hash - * @return 64 bit hash of the given string + * @return 64-bit hash of the given string */ private static long hash64(final byte[] data, int length) { return hash64(data, length, 0xc70f6907); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java index 36a6ba8558be..0a7506bab3b0 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java @@ -14,10 +14,8 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.*; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; -import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,17 +30,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.*; public class DataBuildMapper extends Mapper { private static final Logger logger = LoggerFactory.getLogger(DataBuildMapper.class); - public static final SimpleDateFormat SRC_FMT = - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - public static final SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS"); - private GraphSchema graphSchema; private DataEncoder dataEncoder; private String separator; @@ -50,7 +42,6 @@ public class DataBuildMapper extends Mapper>() {}); - this.ldbcCustomize = conf.getBoolean(DataLoadConfig.LDBC_CUSTOMIZE, false); this.skipHeader = conf.getBoolean(DataLoadConfig.SKIP_HEADER, true); - DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00")); } @Override @@ -78,122 +67,41 @@ protected void map(LongWritable key, Text value, Context context) } String fullPath = context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE); String fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1); - ColumnMappingInfo columnMappingInfo = 
this.fileToColumnMappingInfo.get(fileName); - if (columnMappingInfo == null) { + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(fileName); + if (info == null) { logger.warn("Mapper: ignore [{}], fullPath is [{}]", fileName, fullPath); return; } - int labelId = columnMappingInfo.getLabelId(); - long tableId = columnMappingInfo.getTableId(); - Map propertiesColumnMapping = columnMappingInfo.getPropertiesColMap(); String[] items = value.toString().split(separator); + + int labelId = info.getLabelId(); + long tableId = info.getTableId(); GraphElement type = this.graphSchema.getElement(labelId); - Map propertiesMap = - buildPropertiesMap(type, items, propertiesColumnMapping); - BytesRef valRef = this.dataEncoder.encodeProperties(labelId, propertiesMap); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, colMap); + + BytesRef valRef = dataEncoder.encodeProperties(labelId, properties); this.outVal.set(valRef.getArray(), valRef.getOffset(), valRef.getLength()); + if (type instanceof GraphVertex) { - BytesRef keyBytesRef = - this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId); - this.outKey.set( - keyBytesRef.getArray(), keyBytesRef.getOffset(), keyBytesRef.getLength()); + BytesRef keyRef = + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); + this.outKey.set(keyRef.getArray(), keyRef.getOffset(), keyRef.getLength()); context.write(this.outKey, this.outVal); } else if (type instanceof GraphEdge) { - int srcLabelId = columnMappingInfo.getSrcLabelId(); - Map srcPkColMap = columnMappingInfo.getSrcPkColMap(); - GraphElement srcType = this.graphSchema.getElement(srcLabelId); - Map srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap); - - int dstLabelId = columnMappingInfo.getDstLabelId(); - Map dstPkColMap = columnMappingInfo.getDstPkColMap(); - GraphElement dstType = this.graphSchema.getElement(dstLabelId); - Map dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap); - - BytesRef outEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - true); - this.outKey.set( - outEdgeKeyRef.getArray(), outEdgeKeyRef.getOffset(), outEdgeKeyRef.getLength()); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + this.outKey.set(out.getArray(), out.getOffset(), out.getLength()); context.write(this.outKey, this.outVal); - BytesRef inEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - false); - this.outKey.set( - inEdgeKeyRef.getArray(), inEdgeKeyRef.getOffset(), inEdgeKeyRef.getLength()); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + this.outKey.set(in.getArray(), in.getOffset(), in.getLength()); context.write(this.outKey, this.outVal); } else { - throw new IllegalArgumentException( - "invalid label [" + labelId + "], only support VertexType and EdgeType"); - } - } - - private Map buildPropertiesMap( - GraphElement typeDef, String[] items, Map columnMapping) { - Map operationProperties = new HashMap<>(columnMapping.size()); - columnMapping.forEach( - (colIdx, propertyId) -> { - GraphProperty propertyDef = typeDef.getProperty(propertyId); - if (propertyDef == null) { - throw new PropertyDefNotFoundException( - "property [" - + propertyId - + "] not 
found in [" - + typeDef.getLabel() - + "]"); - } - if (colIdx >= items.length) { - throw new IllegalArgumentException( - "label [" - + typeDef.getLabel() - + "], invalid mapping [" - + colIdx - + "] -> [" - + propertyId - + "], data [" - + Arrays.toString(items) - + "]"); - } - DataType dataType = propertyDef.getDataType(); - - String val = items[colIdx]; - if (ldbcCustomize) { - String name = propertyDef.getName(); - switch (name) { - case "creationDate": - case "joinDate": - val = convertDate(val); - break; - case "birthday": - val = val.replace("-", ""); - break; - } - } - PropertyValue propertyValue = new PropertyValue(dataType, val); - operationProperties.put(propertyId, propertyValue); - }); - return operationProperties; - } - - public static String convertDate(String input) { - try { - return DST_FMT.format(SRC_FMT.parse(input)); - } catch (ParseException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid label " + labelId); } } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java index 6ad5929772f6..41352370704a 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java @@ -16,12 +16,11 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.*; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; -import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.MapperBase; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -66,114 +65,41 @@ public void setup(TaskContext context) throws IOException { @Override public void map(long recordNum, Record record, TaskContext context) throws IOException { - String tableName = context.getInputTableInfo().getTableName(); - ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(tableName); - if (columnMappingInfo == null) { - logger.warn( - "Mapper: ignore [{}], table info is [{}]", - tableName, - context.getInputTableInfo()); + TableInfo tableInfo = context.getInputTableInfo(); + String identifier = tableInfo.getTableName() + "|" + tableInfo.getPartPath(); + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(identifier); + if (info == null) { + logger.warn("Mapper: ignore [{}], table info: [{}]", identifier, tableInfo); return; } + String[] items = Utils.parseRecords(record); - int labelId = columnMappingInfo.getLabelId(); - long tableId = columnMappingInfo.getTableId(); - int columnCount = record.getColumnCount(); - String[] items = new String[columnCount]; - for (int i = 0; i < columnCount; i++) { - if (record.get(i) == null) { - // items[i] = ""; - items[i] = null; - } else { - items[i] = record.get(i).toString(); - } - } - Map propertiesColumnMapping = columnMappingInfo.getPropertiesColMap(); + int labelId = 
info.getLabelId(); + long tableId = info.getTableId(); GraphElement type = this.graphSchema.getElement(labelId); - Map propertiesMap = - buildPropertiesMap(type, items, propertiesColumnMapping); - BytesRef valRef = this.dataEncoder.encodeProperties(labelId, propertiesMap); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, colMap); + + BytesRef valRef = this.dataEncoder.encodeProperties(labelId, properties); outVal.set(new Object[] {new String(valRef.getBytes(), charSet)}); if (type instanceof GraphVertex) { BytesRef keyRef = - this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId); + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); outKey.set(new Object[] {new String(keyRef.getBytes(), charSet)}); context.write(outKey, outVal); } else if (type instanceof GraphEdge) { - int srcLabelId = columnMappingInfo.getSrcLabelId(); - Map srcPkColMap = columnMappingInfo.getSrcPkColMap(); - GraphElement srcType = this.graphSchema.getElement(srcLabelId); - Map srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap); - - int dstLabelId = columnMappingInfo.getDstLabelId(); - Map dstPkColMap = columnMappingInfo.getDstPkColMap(); - GraphElement dstType = this.graphSchema.getElement(dstLabelId); - Map dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap); - - BytesRef outEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - true); - outKey.set(new Object[] {new String(outEdgeKeyRef.getBytes(), charSet)}); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + outKey.set(new Object[] {new String(out.getBytes(), charSet)}); context.write(outKey, outVal); - BytesRef inEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - false); - outKey.set(new Object[] {new String(inEdgeKeyRef.getBytes(), charSet)}); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + outKey.set(new Object[] {new String(in.getBytes(), charSet)}); context.write(outKey, outVal); } else { - throw new IllegalArgumentException( - "invalid label [" + labelId + "], only support VertexType and EdgeType"); + throw new IllegalArgumentException("Invalid label " + labelId); } } - - private Map buildPropertiesMap( - GraphElement typeDef, String[] items, Map columnMapping) { - Map operationProperties = new HashMap<>(columnMapping.size()); - columnMapping.forEach( - (colIdx, propertyId) -> { - GraphProperty propertyDef = typeDef.getProperty(propertyId); - if (propertyDef == null) { - throw new PropertyDefNotFoundException( - "property [" - + propertyId - + "] not found in [" - + typeDef.getLabel() - + "]"); - } - if (colIdx >= items.length) { - throw new IllegalArgumentException( - "label [" - + typeDef.getLabel() - + "], invalid mapping [" - + colIdx - + "] -> [" - + propertyId - + "], data [" - + Arrays.toString(items) - + "]"); - } - String val = items[colIdx]; - PropertyValue propertyValue = null; - if (val != null) { - DataType dataType = propertyDef.getDataType(); - propertyValue = new PropertyValue(dataType, val); - } - operationProperties.put(propertyId, propertyValue); - }); - return operationProperties; - } } diff --git 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java new file mode 100644 index 000000000000..ba2b9a42577f --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java @@ -0,0 +1,151 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.common.schema.api.*; +import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; +import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; +import com.alibaba.graphscope.groot.common.util.SchemaUtils; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; +import com.aliyun.odps.mapred.MapperBase; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +public class DataBuildMapperOdpsDebug extends MapperBase { + private static final Logger logger = LoggerFactory.getLogger(DataBuildMapperOdpsDebug.class); + private GraphSchema graphSchema; + private DataEncoder dataEncoder; + private Map fileToColumnMappingInfo; + + private Record outKey; + + @Override + public void setup(TaskContext context) throws IOException { + outKey = context.createOutputRecord(); + + String metaData = context.getJobConf().get(DataLoadConfig.META_INFO); + ObjectMapper objectMapper = new ObjectMapper(); + Map metaMap = + objectMapper.readValue(metaData, new TypeReference>() {}); + String schemaJson = metaMap.get(DataLoadConfig.SCHEMA_JSON); + graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema(); + dataEncoder = new DataEncoder(graphSchema); + String columnMappingsJson = metaMap.get(DataLoadConfig.COLUMN_MAPPINGS); + fileToColumnMappingInfo = + objectMapper.readValue( + columnMappingsJson, new TypeReference>() {}); + } + + @Override + public void map(long recordNum, Record record, TaskContext context) throws IOException { + TableInfo tableInfo = context.getInputTableInfo(); + String identifier = tableInfo.getTableName() + "|" + tableInfo.getPartPath(); + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(identifier); + if (info == null) { + logger.warn("Mapper: ignore [{}], table info: [{}]", identifier, tableInfo); + return; + } + + String[] items = Utils.parseRecords(record); + + int labelId = info.getLabelId(); + long tableId = info.getTableId(); + GraphElement type = this.graphSchema.getElement(labelId); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, 
colMap); + if (type instanceof GraphVertex) { + outKey.set(1, getVertexRawKeys((GraphVertex) type, colMap, items)); + BytesRef keyRef = + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); + outKey.set(0, getVertexKeyEncoded(keyRef)); + context.write(outKey); + } else if (type instanceof GraphEdge) { + outKey.set(1, getEdgeRawKeys(info, items)); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + outKey.set(0, getEdgeKeyEncoded(out)); + context.write(outKey); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + outKey.set(0, getEdgeKeyEncoded(in)); + context.write(outKey); + } else { + throw new IllegalArgumentException("Invalid label " + labelId); + } + } + + private String getVertexRawKeys(GraphVertex type, Map colMap, String[] items) + throws IOException { + List pkIds = SchemaUtils.getVertexPrimaryKeyList(type); + List indices = new ArrayList<>(); + colMap.forEach( + (idx, propId) -> { + if (pkIds.contains(propId)) { + indices.add(idx); + } + }); + return concatenateItemsByIndices(items, indices); + } + + private String getEdgeRawKeys(ColumnMappingInfo info, String[] items) throws IOException { + Map srcPkColMap = info.getSrcPkColMap(); + Map dstPkColMap = info.getDstPkColMap(); + + List pkIds = new ArrayList<>(srcPkColMap.keySet()); + pkIds.addAll(dstPkColMap.keySet()); + return concatenateItemsByIndices(items, pkIds); + } + + private static String concatenateItemsByIndices(String[] array, List indices) + throws IOException { + StringBuilder builder = new StringBuilder(); + if (indices.isEmpty()) { + throw new IOException("indices are empty!"); + } + for (int index : indices) { + builder.append(array[index]).append(","); + } + builder.deleteCharAt(builder.length() - 1); + return builder.toString(); + } + + private String getVertexKeyEncoded(BytesRef keyRef) { + ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes()); + long tableId = buffer.getLong(0); + long hashId = buffer.getLong(8); + return tableId + "/" + hashId; + } + + private String getEdgeKeyEncoded(BytesRef keyRef) { + ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes()); + long tableId = buffer.getLong(0); + long srcHashId = buffer.getLong(8); + long dstHashId = buffer.getLong(16); + long eid = buffer.getLong(24); + return tableId + "/" + srcHashId + "/" + dstHashId + "/" + eid; + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java index 099cea3a3b79..c32a8c53b2e0 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java @@ -97,6 +97,7 @@ public void cleanup(TaskContext context) throws IOException { } String chkData = sstFileEmpty ? 
"0" : "1," + getFileMD5(sstFileName); + System.out.println("Checksum: " + chkData); writeFile(chkFileName, chkData); fs.copy(chkFileName, Paths.get(uniquePath, chkFileName).toString()); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java index 9f38ff1f9060..ec14593ca650 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java @@ -18,16 +18,14 @@ import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; import com.alibaba.graphscope.groot.common.schema.api.GraphVertex; +import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.alibaba.graphscope.groot.common.util.PkHashUtils; import com.alibaba.graphscope.groot.common.util.SchemaUtils; import java.nio.Buffer; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; public class DataEncoder { @@ -104,7 +102,7 @@ public BytesRef encodeProperties(int labelId, Map proper return new BytesRef(scratch.array(), 0, scratch.limit()); } - private long getHashId( + private static long getHashId( int labelId, Map operationProperties, List pkIds) { List pks = new ArrayList<>(pkIds.size()); for (int pkId : pkIds) { @@ -137,4 +135,58 @@ public static void clear(Buffer buffer) { public static void flip(Buffer buffer) { buffer.flip(); } + + // For testing + + private static void encode(int labelId, long tableId, String pk) { + Map propertiesMap = new HashMap<>(); + propertiesMap.put(0, new PropertyValue(DataType.STRING, pk)); + List pkIds = new ArrayList<>(); + pkIds.add(0); + ByteBuffer scratch = ByteBuffer.allocate(1 << 20); + long hashId = getHashId(labelId, propertiesMap, pkIds); + scratch.putLong(tableId << 1); + scratch.putLong(hashId); + scratch.flip(); + System.out.println("TableId: " + scratch.getLong(0) + " | " + scratch.getLong(8)); + } + + private static void encodeTest(int id1, String s1, int id2, String s2, long expected) { + DataEncoder.encodeTest(id1, 0, s1, id2, 0, s2, 0, expected); + } + + private static void encodeTest( + int id1, + long tableId1, + String s1, + int id2, + long tableId2, + String s2, + long expectedTableId, + long expectedHash) { + DataEncoder.encode(id1, tableId1, s1); + DataEncoder.encode(id2, tableId2, s2); + System.out.println("Expected TableId: " + expectedTableId + "; HashId: " + expectedHash); + } + + public static void main(String[] args) { + long expectedHash1 = -3968787722979159891L; + long expectedTable1 = -9223372036854775698L; + int labelId1a = 11; + String s1a = "20230911@META_COLUMN@1@22073998@879591@265939259@4592566627"; + long tableId1 = -4611686018427387857L; + int labelId1b = 11; + String s1b = "20230911@META_INDEX@1@16229181@824771@232467064@551697979"; + long tableId2 = -4611686018427387854L; + + long hashId2 = -6575735951890802946L; + int labelId2a = 5; + String s2a = "db24f92331e97257e4a0ad4ffc464de2M01"; + int labelId2b = 5; + String s2b = "889569174b748f2f4514bf6547f12760M01"; + + encodeTest( + labelId1a, tableId1, s1a, labelId1b, tableId2, s1b, expectedTable1, 
expectedHash1); + encodeTest(labelId2a, s2a, labelId2b, s2b, hashId2); + } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java index 2f219030c1c0..f0c883b1e03d 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java @@ -14,15 +14,12 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.schema.api.GraphEdge; -import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.conf.Configuration; @@ -55,8 +52,7 @@ public static void main(String[] args) } String inputPath = properties.getProperty(DataLoadConfig.INPUT_PATH); String outputPath = properties.getProperty(DataLoadConfig.OUTPUT_PATH); - String columnMappingConfigStr = - properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); String uniquePath = @@ -65,61 +61,38 @@ public static void main(String[] args) String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); - GrootClient client = - GrootClient.newBuilder() - .setHosts(graphEndpoint) - .setUsername(username) - .setPassword(password) - .build(); - ObjectMapper objectMapper = new ObjectMapper(); - Map columnMappingConfig = - objectMapper.readValue( - columnMappingConfigStr, - new TypeReference>() {}); - - List targets = new ArrayList<>(); - for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(fileColumnMapping.getLabel()); - if (fileColumnMapping.getSrcLabel() != null) { - builder.setSrcLabel(fileColumnMapping.getSrcLabel()); - } - if (fileColumnMapping.getDstLabel() != null) { - builder.setDstLabel(fileColumnMapping.getDstLabel()); - } - targets.add(builder.build()); - } + GrootClient client = Utils.getClient(graphEndpoint, username, password); + + Map columnMappingConfig = Utils.parseColumnMapping(configStr); + List targets = Utils.getDataLoadTargets(columnMappingConfig); GraphSchema schema = GraphDef.parseProto(client.prepareDataLoad(targets)); - String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); int partitionNum = client.getPartitionNum(); - Map columnMappingInfos = new HashMap<>(); + Map info = new HashMap<>(); columnMappingConfig.forEach( (fileName, fileColumnMapping) -> { - columnMappingInfos.put(fileName, fileColumnMapping.toColumnMappingInfo(schema)); + info.put(fileName, 
fileColumnMapping.toColumnMappingInfo(schema)); }); - String ldbcCustomize = properties.getProperty(DataLoadConfig.LDBC_CUSTOMIZE, "true"); - long splitSize = - Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")) - * 1024 - * 1024; - boolean loadAfterBuild = - properties - .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") - .equalsIgnoreCase("true"); - boolean skipHeader = - properties.getProperty(DataLoadConfig.SKIP_HEADER, "true").equalsIgnoreCase("true"); + + String _tmp = properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256"); + long splitSize = Long.parseLong(_tmp) * 1024 * 1024; + _tmp = properties.getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false"); + boolean loadAfterBuild = Utils.parseBoolean(_tmp); + _tmp = properties.getProperty(DataLoadConfig.SKIP_HEADER, "true"); + boolean skipHeader = Utils.parseBoolean(_tmp); String separator = properties.getProperty(DataLoadConfig.SEPARATOR, "\\|"); + ObjectMapper mapper = new ObjectMapper(); + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Configuration conf = new Configuration(); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERNODE, splitSize); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERRACK, splitSize); conf.setStrings(DataLoadConfig.SCHEMA_JSON, schemaJson); - String mappings = objectMapper.writeValueAsString(columnMappingInfos); + String mappings = mapper.writeValueAsString(info); conf.setStrings(DataLoadConfig.COLUMN_MAPPINGS, mappings); - conf.setBoolean(DataLoadConfig.LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true")); conf.set(DataLoadConfig.SEPARATOR, separator); conf.setBoolean(DataLoadConfig.SKIP_HEADER, skipHeader); Job job = Job.getInstance(conf, "build graph data"); @@ -150,34 +123,25 @@ public static void main(String[] args) FileSystem fs = outputDir.getFileSystem(job.getConfiguration()); FSDataOutputStream os = fs.create(new Path(outputDir, "META")); - os.writeUTF(objectMapper.writeValueAsString(outputMeta)); + os.writeUTF(mapper.writeValueAsString(outputMeta)); os.flush(); os.close(); if (loadAfterBuild) { String dataPath = fs.makeQualified(outputDir).toString(); - - logger.info("start ingesting data"); - client.ingestData(dataPath); - - logger.info("commit bulk load"); - Map tableToTarget = new HashMap<>(); - for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) { - long tableId = columnMappingInfo.getTableId(); - int labelId = columnMappingInfo.getLabelId(); - GraphElement graphElement = schema.getElement(labelId); - String label = graphElement.getLabel(); - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(label); - if (graphElement instanceof GraphEdge) { - builder.setSrcLabel( - schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); - builder.setDstLabel( - schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); + logger.info("start ingesting data from " + dataPath); + try { + client.ingestData(dataPath); + logger.info("start committing bulk load"); + Map tableToTarget = Utils.getTableToTargets(schema, info); + client.commitDataLoad(tableToTarget, uniquePath); + } finally { + try { + client.clearIngest(uniquePath); + } catch (Exception e) { + logger.warn("Clear ingest failed, ignored"); } - tableToTarget.put(tableId, builder.build()); } - client.commitDataLoad(tableToTarget, uniquePath); } } } diff --git 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index 79a98ba8722a..6bdd55e3d1a3 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -14,10 +14,7 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.schema.api.GraphEdge; -import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; -import com.alibaba.graphscope.groot.common.schema.api.GraphVertex; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; @@ -25,15 +22,14 @@ import com.alibaba.graphscope.groot.dataload.util.VolumeFS; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.alibaba.graphscope.proto.groot.GraphDefPb; import com.aliyun.odps.Odps; -import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import com.aliyun.odps.mapred.conf.JobConf; import com.aliyun.odps.mapred.conf.SessionState; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -46,7 +42,6 @@ public class OfflineBuildOdps { private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdps.class); - private static Odps odps; public static void main(String[] args) throws IOException { @@ -58,17 +53,8 @@ public static void main(String[] args) throws IOException { } odps = SessionState.get().getOdps(); - String columnMappingConfigStr = - properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); - // User could specify a list of `key=value` pairs in command line, - // to substitute variables like `${key}` in columnMappingConfigStr - for (int i = 1; i < args.length; ++i) { - String[] kv = args[i].split("="); - if (kv.length == 2) { - String key = "${" + kv[0] + "}"; - columnMappingConfigStr = columnMappingConfigStr.replace(key, kv[1]); - } - } + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args, 1, args.length)); String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); @@ -77,70 +63,29 @@ public static void main(String[] args) throws IOException { String uniquePath = properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString()); - ObjectMapper objectMapper = new ObjectMapper(); - Map columnMappingConfig = - objectMapper.readValue( - columnMappingConfigStr, - new TypeReference>() {}); + Map mappingConfig = Utils.parseColumnMapping(configStr); - List targets = new ArrayList<>(); - for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { - DataLoadTargetPb.Builder builder = 
DataLoadTargetPb.newBuilder(); - builder.setLabel(fileColumnMapping.getLabel()); - if (fileColumnMapping.getSrcLabel() != null) { - builder.setSrcLabel(fileColumnMapping.getSrcLabel()); - } - if (fileColumnMapping.getDstLabel() != null) { - builder.setDstLabel(fileColumnMapping.getDstLabel()); - } - targets.add(builder.build()); - } + List targets = Utils.getDataLoadTargets(mappingConfig); - GrootClient client = - GrootClient.newBuilder() - .setHosts(graphEndpoint) - .setUsername(username) - .setPassword(password) - .build(); + GrootClient client = Utils.getClient(graphEndpoint, username, password); + + GraphDefPb graphDefPb = client.prepareDataLoad(targets); + System.out.println("GraphDef: " + graphDefPb); + GraphSchema schema = GraphDef.parseProto(graphDefPb); - GraphSchema schema = GraphDef.parseProto(client.prepareDataLoad(targets)); - String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); // number of reduce task int partitionNum = client.getPartitionNum(); - - Map tableType = new HashMap<>(); - - Map columnMappingInfos = new HashMap<>(); - columnMappingConfig.forEach( - (fileName, fileColumnMapping) -> { - ColumnMappingInfo columnMappingInfo = - fileColumnMapping.toColumnMappingInfo(schema); - // Note the project and partition is stripped (if exists) - columnMappingInfos.put(getTableName(fileName), columnMappingInfo); - tableType.put(fileName, schema.getElement(columnMappingInfo.getLabelId())); - }); long splitSize = Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")); JobConf job = new JobConf(); - String mappings = objectMapper.writeValueAsString(columnMappingInfos); - // Avoid java sandbox protection - job.set("odps.isolation.session.enable", "true"); - // Don't introduce legacy jar files - job.set("odps.sql.udf.java.retain.legacy", "false"); - // Default priority is 9 - job.setInstancePriority(0); - job.set("odps.mr.run.mode", "sql"); - job.set("odps.mr.sql.group.enable", "true"); + job.set("odps.isolation.session.enable", "true"); // Avoid java sandbox protection + job.set("odps.sql.udf.java.retain.legacy", "false"); // exclude legacy jar files + job.setInstancePriority(0); // Default priority is 9 + // Disable backups from competing with original instances + job.set("odps.sql.backupinstance.enabled", "false"); job.setFunctionTimeout(2400); - job.setMemoryForReducerJVM(2048); - - for (Map.Entry entry : tableType.entrySet()) { - if (entry.getValue() instanceof GraphVertex || entry.getValue() instanceof GraphEdge) { - String name = entry.getKey(); - InputUtils.addTable(parseTableURL(name), job); - } - } + job.setMemoryForReducerJVM(4096); job.setMapperClass(DataBuildMapperOdps.class); job.setReducerClass(DataBuildReducerOdps.class); @@ -150,6 +95,9 @@ public static void main(String[] args) throws IOException { job.setMapOutputKeySchema(SchemaUtils.fromString("key:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("value:string")); + mappingConfig.forEach( + (name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job)); + String dataSinkType = properties.getProperty(DataLoadConfig.DATA_SINK_TYPE, "VOLUME"); Map config; String fullQualifiedDataPath; @@ -168,20 +116,25 @@ public static void main(String[] args) throws IOException { fullQualifiedDataPath = fs.getQualifiedPath(); } String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE); - OutputUtils.addTable(parseTableURL(outputTable), job); + OutputUtils.addTable(Utils.parseTableURL(odps, outputTable), job); } else if 
(dataSinkType.equalsIgnoreCase("HDFS")) { throw new IOException("HDFS as a data sink is not supported in ODPS"); } else { throw new IOException("Unsupported data sink: " + dataSinkType); } + + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Map info = Utils.getMappingInfo(odps, schema, mappingConfig); + ObjectMapper mapper = new ObjectMapper(); + Map outputMeta = new HashMap<>(); outputMeta.put(DataLoadConfig.GRAPH_ENDPOINT, graphEndpoint); outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); - outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mappings); + outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mapper.writeValueAsString(info)); outputMeta.put(DataLoadConfig.UNIQUE_PATH, uniquePath); outputMeta.put(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); - job.set(DataLoadConfig.META_INFO, objectMapper.writeValueAsString(outputMeta)); + job.set(DataLoadConfig.META_INFO, mapper.writeValueAsString(outputMeta)); job.set(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); try { JobClient.runJob(job); @@ -189,10 +142,8 @@ public static void main(String[] args) throws IOException { throw new IOException(e); } - boolean loadAfterBuild = - properties - .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") - .equalsIgnoreCase("true"); + String _tmp = properties.getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false"); + boolean loadAfterBuild = Utils.parseBoolean(_tmp); if (loadAfterBuild) { fullQualifiedDataPath = fullQualifiedDataPath + uniquePath; logger.info("start ingesting data from " + fullQualifiedDataPath); @@ -201,71 +152,15 @@ public static void main(String[] args) throws IOException { try { client.ingestData(fullQualifiedDataPath, config); logger.info("start committing bulk load"); - Map tableToTarget = new HashMap<>(); - for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) { - long tableId = columnMappingInfo.getTableId(); - int labelId = columnMappingInfo.getLabelId(); - GraphElement graphElement = schema.getElement(labelId); - String label = graphElement.getLabel(); - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(label); - if (graphElement instanceof GraphEdge) { - builder.setSrcLabel( - schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); - builder.setDstLabel( - schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); - } - tableToTarget.put(tableId, builder.build()); - } + Map tableToTarget = Utils.getTableToTargets(schema, info); client.commitDataLoad(tableToTarget, uniquePath); - - } catch (Exception ex) { - logger.error("Failed to ingest/commit data", ex); - client.clearIngest(uniquePath); - throw ex; - } - try { - client.clearIngest(uniquePath); - } catch (Exception ex) { - logger.error("Clear ingest failed, ignored."); + } finally { + try { + client.clearIngest(uniquePath); + } catch (Exception e) { + logger.warn("Clear ingest failed, ignored"); + } } } } - - private static String getTableName(String tableFullName) { - TableInfo info = parseTableURL(tableFullName); - return info.getTableName(); - } - - /** - * Parse table URL to @TableInfo - * @param url the pattern of [projectName.]tableName[|partitionSpec] - * @return TableInfo - */ - private static TableInfo parseTableURL(String url) { - String projectName = odps.getDefaultProject(); - String tableName; - String partitionSpec = null; - if (url.contains(".")) { - String[] items = url.split("\\."); - projectName = items[0]; - tableName = items[1]; - } else { - tableName = url; - } - if (tableName.contains("|")) { - String[] 
items = tableName.split("\\|"); - tableName = items[0]; - partitionSpec = items[1]; - } - - TableInfo.TableInfoBuilder builder = TableInfo.builder(); - builder.projectName(projectName); - - builder.tableName(tableName); - if (partitionSpec != null) { - builder.partSpec(partitionSpec); - } - return builder.build(); - } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java new file mode 100644 index 000000000000..53ef826d0643 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java @@ -0,0 +1,98 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; +import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; +import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; +import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.alibaba.graphscope.proto.groot.GraphDefPb; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.mapred.JobClient; +import com.aliyun.odps.mapred.conf.JobConf; +import com.aliyun.odps.mapred.conf.SessionState; +import com.aliyun.odps.mapred.utils.InputUtils; +import com.aliyun.odps.mapred.utils.OutputUtils; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class OfflineBuildOdpsDebug { + private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdpsDebug.class); + + private static Odps odps; + + public static void main(String[] args) throws IOException, OdpsException { + String propertiesFile = args[0]; + + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(propertiesFile)) { + properties.load(is); + } + odps = SessionState.get().getOdps(); + + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args, 1, args.length)); + + String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); + String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); + String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); + + Map mappingConfig = Utils.parseColumnMapping(configStr); + + List targets = Utils.getDataLoadTargets(mappingConfig); + + GrootClient client = Utils.getClient(graphEndpoint, username, password); + + GraphDefPb graphDefPb = client.prepareDataLoad(targets); + System.out.println("GraphDef: " + graphDefPb); + + JobConf job = new JobConf(); + + job.set("odps.isolation.session.enable", "true"); // Avoid java sandbox protection + job.set("odps.sql.udf.java.retain.legacy", "false"); // exclude legacy jar files + job.setInstancePriority(0); // Default priority is 9 + job.set("odps.mr.run.mode", "sql"); + job.set("odps.mr.sql.group.enable", "true"); + + job.setMapperClass(DataBuildMapperOdpsDebug.class); + job.setNumReduceTasks(0); + + mappingConfig.forEach( + (name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job)); + String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE); + OutputUtils.addTable(Utils.parseTableURL(odps, outputTable), job); + GraphSchema schema = GraphDef.parseProto(graphDefPb); + + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Map info = Utils.getMappingInfo(odps, schema, mappingConfig); + + ObjectMapper mapper = new ObjectMapper(); + Map outputMeta = new HashMap<>(); + outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); + 
outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mapper.writeValueAsString(info)); + job.set(DataLoadConfig.META_INFO, mapper.writeValueAsString(outputMeta)); + System.out.println(mapper.writeValueAsString(outputMeta)); + JobClient.runJob(job); + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java index 3821a75940d4..34b0baccd02c 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; public class SstOutputFormat extends FileOutputFormat { @@ -60,7 +61,11 @@ public void write(BytesWritable key, BytesWritable value) throws IOException { try { sstFileWriter.put(key.copyBytes(), value.copyBytes()); } catch (RocksDBException e) { - throw new IOException(e); + ByteBuffer buffer = ByteBuffer.wrap(key.copyBytes()); + long tableId = buffer.getLong(0) >> 1; + long hashId = buffer.getLong(8); + throw new IOException( + "Write SST Error! TableId: [" + tableId + "], hashId: [" + hashId + "]", e); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java index 226f83f5177a..211f20ab5720 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java @@ -18,6 +18,7 @@ import org.rocksdb.*; import java.io.IOException; +import java.nio.ByteBuffer; public class SstRecordWriter { private final SstFileWriter sstFileWriter; @@ -41,10 +42,15 @@ public SstRecordWriter(String fileName, String charSet) throws IOException { } public void write(String key, String value) throws IOException { + byte[] keyBytes = key.getBytes(charSet); try { - sstFileWriter.put(key.getBytes(charSet), value.getBytes(charSet)); + sstFileWriter.put(keyBytes, value.getBytes(charSet)); } catch (RocksDBException e) { - throw new IOException(e); + ByteBuffer buffer = ByteBuffer.wrap(keyBytes); + long tableId = buffer.getLong(0) >> 1; + long hashId = buffer.getLong(8); + throw new IOException( + "Write SST Error! 
TableId: [" + tableId + "], hashId: [" + hashId + "]", e); } this.isEmpty = false; } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java new file mode 100644 index 000000000000..dcee9e8e2113 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java @@ -0,0 +1,218 @@ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; +import com.alibaba.graphscope.groot.common.schema.api.*; +import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; +import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.aliyun.odps.Odps; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +public class Utils { + // For Main Job + static List getDataLoadTargets( + Map columnMappingConfig) { + List targets = new ArrayList<>(); + for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { + DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); + builder.setLabel(fileColumnMapping.getLabel()); + if (fileColumnMapping.getSrcLabel() != null) { + builder.setSrcLabel(fileColumnMapping.getSrcLabel()); + } + if (fileColumnMapping.getDstLabel() != null) { + builder.setDstLabel(fileColumnMapping.getDstLabel()); + } + targets.add(builder.build()); + } + return targets; + } + + public static Map parseColumnMapping(String str) + throws JsonProcessingException { + ObjectMapper m = new ObjectMapper(); + return m.readValue(str, new TypeReference>() {}); + } + + public static Map getMappingInfo( + Odps odps, GraphSchema schema, Map config) { + Map columnMappingInfo = new HashMap<>(); + config.forEach( + (fileName, fileColumnMapping) -> { + ColumnMappingInfo subInfo = fileColumnMapping.toColumnMappingInfo(schema); + // Note the project and partition is stripped (if exists) + columnMappingInfo.put(Utils.getTableIdentifier(odps, fileName), subInfo); + }); + return columnMappingInfo; + } + + public static String getTableIdentifier(Odps odps, String tableFullName) { + TableInfo info = parseTableURL(odps, tableFullName); + return info.getTableName() + "|" + info.getPartPath(); + } + + /** + * Parse table URL to @TableInfo + * @param url the pattern of [projectName.]tableName[|partitionSpec] + * @return TableInfo + */ + public static TableInfo parseTableURL(Odps odps, String url) { + String projectName = odps.getDefaultProject(); + String tableName; + String partitionSpec = null; + if (url.contains(".")) { + String[] items = url.split("\\."); + projectName = items[0]; + tableName = items[1]; + } else { + tableName = url; + } + if (tableName.contains("|")) { + String[] items = tableName.split("\\|"); + tableName = items[0]; + partitionSpec = items[1]; + } + + TableInfo.TableInfoBuilder builder = TableInfo.builder(); + builder.projectName(projectName); + + builder.tableName(tableName); + if (partitionSpec != null) { + builder.partSpec(partitionSpec); + } + return builder.build(); + } + + public static Map getTableToTargets( + 
GraphSchema schema, Map info) { + Map tableToTarget = new HashMap<>(); + for (ColumnMappingInfo columnMappingInfo : info.values()) { + long tableId = columnMappingInfo.getTableId(); + int labelId = columnMappingInfo.getLabelId(); + GraphElement graphElement = schema.getElement(labelId); + String label = graphElement.getLabel(); + DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); + builder.setLabel(label); + if (graphElement instanceof GraphEdge) { + builder.setSrcLabel( + schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); + builder.setDstLabel( + schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); + } + tableToTarget.put(tableId, builder.build()); + } + return tableToTarget; + } + + public static GrootClient getClient(String endpoint, String username, String password) { + return GrootClient.newBuilder() + .setHosts(endpoint) + .setUsername(username) + .setPassword(password) + .build(); + } + + public static boolean parseBoolean(String str) { + return str.equalsIgnoreCase("true"); + } + + public static String replaceVars(String str, String[] args) { + // User could specify a list of `key=value` pairs in command line, + // to substitute variables like `${key}` in configStr + for (String arg : args) { + String[] kv = arg.split("="); + if (kv.length == 2) { + String key = "${" + kv[0] + "}"; + str = str.replace(key, kv[1]); + } + } + return str; + } + + /// For Mapper + public static Map buildProperties( + GraphElement typeDef, String[] items, Map mapping) { + Map operationProperties = new HashMap<>(mapping.size()); + mapping.forEach( + (colIdx, propertyId) -> { + GraphProperty prop = typeDef.getProperty(propertyId); + String label = typeDef.getLabel(); + String msg = "Label [" + label + "]: "; + if (prop == null) { + msg += "propertyId [" + propertyId + "] not found"; + throw new PropertyDefNotFoundException(msg); + } + if (colIdx >= items.length) { + msg += "Invalid mapping [" + colIdx + "]->[" + propertyId + "]"; + msg += "Data: " + Arrays.toString(items); + throw new IllegalArgumentException(msg); + } + PropertyValue propertyValue = null; + if (items[colIdx] != null) { + propertyValue = new PropertyValue(prop.getDataType(), items[colIdx]); + } + operationProperties.put(propertyId, propertyValue); + }); + return operationProperties; + } + + public static BytesRef getVertexKeyRef( + DataEncoder encoder, + GraphVertex type, + Map properties, + long tableId) { + return encoder.encodeVertexKey(type, properties, tableId); + } + + public static BytesRef getEdgeKeyRef( + DataEncoder encoder, + GraphSchema schema, + ColumnMappingInfo info, + String[] items, + Map properties, + long tableId, + boolean forward) { + int labelId = info.getLabelId(); + GraphEdge type = (GraphEdge) schema.getElement(labelId); + + int srcLabelId = info.getSrcLabelId(); + Map srcPkColMap = info.getSrcPkColMap(); + GraphVertex srcType = (GraphVertex) schema.getElement(srcLabelId); + Map srcPkMap = Utils.buildProperties(srcType, items, srcPkColMap); + + int dstLabelId = info.getDstLabelId(); + Map dstPkColMap = info.getDstPkColMap(); + GraphVertex dstType = (GraphVertex) schema.getElement(dstLabelId); + Map dstPkMap = Utils.buildProperties(dstType, items, dstPkColMap); + + return encoder.encodeEdgeKey( + srcType, srcPkMap, dstType, dstPkMap, type, properties, tableId, forward); + } + + public static String[] parseRecords(Record record) { + String[] items = new String[record.getColumnCount()]; + for (int i = 0; i < items.length; i++) { + Object curRecord = record.get(i); + items[i] 
= curRecord == null ? null : curRecord.toString(); + } + return items; + } + + public static String convertDateForLDBC(String input) { + SimpleDateFormat SRC_FMT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS"); + DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00")); + try { + return DST_FMT.format(SRC_FMT.parse(input)); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java index c8d82a82be0d..1a435a0f3421 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java @@ -71,6 +71,10 @@ public void downloadData(String srcPath, String dstPath) throws IOException { chkFile.delete(); return; } + if (chkArray.length != 2) { + throw new IOException( + "Checksum format error: content: [" + chkArray + "]; path: " + chkPath); + } String chkMD5Value = chkArray[1]; downloadDataSimple(srcPath, dstPath); String sstMD5Value = getFileMD5(dstPath);
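
Notes with illustrative sketches (not part of the patch; class names and sample values below are made up):

1. The heart of fix 1 is the identifier scheme in Utils.getTableIdentifier: the driver keys the column-mapping map by "tableName|partPath", and DataBuildMapperOdps now builds the same key from context.getInputTableInfo(). A minimal standalone sketch of the string handling, without the ODPS SDK (the real code derives the partition path via com.aliyun.odps.data.TableInfo.getPartPath() on both sides, so the two keys always agree on format):

    // IdentifierSketch.java -- mirrors Utils.parseTableURL/getTableIdentifier.
    public class IdentifierSketch {
        // Input pattern (from the parseTableURL javadoc):
        //   [projectName.]tableName[|partitionSpec]
        static String identifier(String url) {
            String table = url;
            String partSpec = "";
            if (table.contains(".")) {
                table = table.split("\\.")[1]; // drop the project name
            }
            if (table.contains("|")) {
                String[] items = table.split("\\|");
                table = items[0];
                partSpec = items[1]; // keep the partition spec: the actual fix
            }
            return table + "|" + partSpec;
        }

        public static void main(String[] args) {
            // Two partitions of one table now yield distinct identifiers, so each
            // resolves to its own ColumnMappingInfo (and thus the correct label id).
            System.out.println(identifier("myproject.person|ds=20230911")); // person|ds=20230911
            System.out.println(identifier("myproject.person|ds=20230912")); // person|ds=20230912
            System.out.println(identifier("knows")); // knows|  (unpartitioned table)
        }
    }

2. Fix 2 (odps.sql.backupinstance.enabled=false) pairs with the new length check in ExternalStorage.downloadData: DataBuildReducerOdps writes a checksum file containing either "0" (empty SST) or "1,<md5>", so a file truncated by a racing backup instance surfaces as a malformed or mismatching checksum rather than silently corrupt data. A sketch of that contract (hypothetical helper, not the actual method):

    // ChecksumSketch.java -- illustrates the "0" / "1,<md5>" checksum contract.
    import java.io.IOException;

    public class ChecksumSketch {
        static String md5Of(String chkContent) throws IOException {
            String[] chkArray = chkContent.split(",");
            if (chkArray[0].equals("0")) {
                return null; // empty SST file, nothing to verify
            }
            if (chkArray.length != 2) { // the guard added in ExternalStorage
                throw new IOException("Checksum format error: content: [" + chkContent + "]");
            }
            return chkArray[1];
        }

        public static void main(String[] args) throws IOException {
            System.out.println(md5Of("1,d41d8cd98f00b204e9800998ecf8427e")); // prints the md5
            System.out.println(md5Of("0")); // prints null
        }
    }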