From a9e7666f4d8378eb2f270b6954ed2f43092d8e89 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Fri, 15 Sep 2023 14:10:16 +0800 Subject: [PATCH 1/5] data-load-tool enhancement --- .../graphscope/groot/common/util/PkHashUtils.java | 8 ++++---- .../groot/dataload/databuild/DataBuildMapperOdps.java | 2 +- .../groot/dataload/databuild/DataBuildReducerOdps.java | 1 + .../groot/dataload/databuild/DataEncoder.java | 3 +++ .../groot/dataload/databuild/OfflineBuildOdps.java | 8 ++++++-- .../groot/dataload/databuild/SstOutputFormat.java | 7 ++++++- .../groot/dataload/databuild/SstRecordWriter.java | 10 ++++++++-- .../groot/store/external/ExternalStorage.java | 4 ++++ 8 files changed, 33 insertions(+), 10 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java index df2c3cbe4bba..3fe3d737c188 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/PkHashUtils.java @@ -43,12 +43,12 @@ public static long hash(int labelId, List pks) { } /** - * Generates 64 bit hash from byte array of the given length and seed. + * Generates 64-bit hash from byte array of the given length and seed. * * @param data byte array to hash * @param length length of the array to hash * @param seed initial seed value - * @return 64 bit hash of the given array + * @return 64-bit hash of the given array */ private static long hash64(final byte[] data, int length, int seed) { final long m = 0xc6a4a7935bd1e995L; @@ -103,11 +103,11 @@ private static long hash64(final byte[] data, int length, int seed) { } /** - * Generates 64 bit hash from byte array with default seed value. + * Generates 64-bit hash from byte array with default seed value. 
* * @param data byte array to hash * @param length length of the array to hash - * @return 64 bit hash of the given string + * @return 64-bit hash of the given string */ private static long hash64(final byte[] data, int length) { return hash64(data, length, 0xc70f6907); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java index 6ad5929772f6..20a98887aea6 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java @@ -82,7 +82,7 @@ public void map(long recordNum, Record record, TaskContext context) throws IOExc String[] items = new String[columnCount]; for (int i = 0; i < columnCount; i++) { if (record.get(i) == null) { - // items[i] = ""; + // items[i] = ""; items[i] = null; } else { items[i] = record.get(i).toString(); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java index 099cea3a3b79..c32a8c53b2e0 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java @@ -97,6 +97,7 @@ public void cleanup(TaskContext context) throws IOException { } String chkData = sstFileEmpty ? 
"0" : "1," + getFileMD5(sstFileName); + System.out.println("Checksum: " + chkData); writeFile(chkFileName, chkData); fs.copy(chkFileName, Paths.get(uniquePath, chkFileName).toString()); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java index 9f38ff1f9060..4204bdcc1acb 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java @@ -50,6 +50,9 @@ public BytesRef encodeVertexKey( scratch.putLong(tableId << 1); scratch.putLong(hashId); scratch.putLong(SNAPSHOT_ID); + // System.out.println("EncodeVertexKey: labelId:" + type.getLabelId() + "; tableId:" + + // tableId); + // System.out.println("scratch: [" + (tableId << 1) + "],[" + hashId + "]"); flip(scratch); return new BytesRef(scratch.array(), 0, scratch.limit()); } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index 79a98ba8722a..27953b3afeee 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -25,6 +25,7 @@ import com.alibaba.graphscope.groot.dataload.util.VolumeFS; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.alibaba.graphscope.proto.groot.GraphDefPb; import com.aliyun.odps.Odps; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; @@ -103,7 +104,10 @@ public static void main(String[] args) throws IOException { .setPassword(password) .build(); - GraphSchema schema = GraphDef.parseProto(client.prepareDataLoad(targets)); + GraphDefPb graphDefPb = client.prepareDataLoad(targets); + System.out.println("GraphDef: " + graphDefPb); + GraphSchema schema = GraphDef.parseProto(graphDefPb); + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); // number of reduce task int partitionNum = client.getPartitionNum(); @@ -133,7 +137,7 @@ public static void main(String[] args) throws IOException { job.set("odps.mr.run.mode", "sql"); job.set("odps.mr.sql.group.enable", "true"); job.setFunctionTimeout(2400); - job.setMemoryForReducerJVM(2048); + job.setMemoryForReducerJVM(4096); for (Map.Entry entry : tableType.entrySet()) { if (entry.getValue() instanceof GraphVertex || entry.getValue() instanceof GraphEdge) { diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java index 3821a75940d4..34b0baccd02c 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; public class 
SstOutputFormat extends FileOutputFormat { @@ -60,7 +61,11 @@ public void write(BytesWritable key, BytesWritable value) throws IOException { try { sstFileWriter.put(key.copyBytes(), value.copyBytes()); } catch (RocksDBException e) { - throw new IOException(e); + ByteBuffer buffer = ByteBuffer.wrap(key.copyBytes()); + long tableId = buffer.getLong(0) >> 1; + long hashId = buffer.getLong(8); + throw new IOException( + "Write SST Error! TableId: [" + tableId + "], hashId: [" + hashId + "]", e); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java index 226f83f5177a..211f20ab5720 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java @@ -18,6 +18,7 @@ import org.rocksdb.*; import java.io.IOException; +import java.nio.ByteBuffer; public class SstRecordWriter { private final SstFileWriter sstFileWriter; @@ -41,10 +42,15 @@ public SstRecordWriter(String fileName, String charSet) throws IOException { } public void write(String key, String value) throws IOException { + byte[] keyBytes = key.getBytes(charSet); try { - sstFileWriter.put(key.getBytes(charSet), value.getBytes(charSet)); + sstFileWriter.put(keyBytes, value.getBytes(charSet)); } catch (RocksDBException e) { - throw new IOException(e); + ByteBuffer buffer = ByteBuffer.wrap(keyBytes); + long tableId = buffer.getLong(0) >> 1; + long hashId = buffer.getLong(8); + throw new IOException( + "Write SST Error! TableId: [" + tableId + "], hashId: [" + hashId + "]", e); } this.isEmpty = false; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java index c8d82a82be0d..1a435a0f3421 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java @@ -71,6 +71,10 @@ public void downloadData(String srcPath, String dstPath) throws IOException { chkFile.delete(); return; } + if (chkArray.length != 2) { + throw new IOException( + "Checksum format error: content: [" + chkArray + "]; path: " + chkPath); + } String chkMD5Value = chkArray[1]; downloadDataSimple(srcPath, dstPath); String sstMD5Value = getFileMD5(dstPath); From 547a10c0786d4be64aa3aaf91b2deedc2e5efa18 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Thu, 21 Sep 2023 16:58:28 +0800 Subject: [PATCH 2/5] Revised data load tools, add debug entrance --- .../graphscope-store/templates/configmap.yaml | 2 +- .../dataload/databuild/DataBuildMapper.java | 136 ++--------- .../databuild/DataBuildMapperOdps.java | 120 ++-------- .../databuild/DataBuildMapperOdpsDebug.java | 144 ++++++++++++ .../groot/dataload/databuild/DataEncoder.java | 65 +++++- .../dataload/databuild/OfflineBuild.java | 96 +++----- .../dataload/databuild/OfflineBuildOdps.java | 168 +++----------- .../databuild/OfflineBuildOdpsDebug.java | 97 ++++++++ .../groot/dataload/databuild/Utils.java | 218 ++++++++++++++++++ 9 files changed, 621 insertions(+), 425 deletions(-) create mode 100644 
interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 6daf8bb522c1..081fa9465e7c 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -25,7 +25,7 @@ data: ingestor.node.count={{ .Values.ingestor.replicaCount }} coordinator.node.count={{ .Values.coordinator.replicaCount }} ingestor.queue.count={{ .Values.ingestor.replicaCount }} - partition.count={{ .Values.store.replicaCount | mul 128 }} + partition.count={{ .Values.store.replicaCount | mul 16 }} engine.type={{ .Values.engineType }} discovery.mode={{ .Values.discoveryMode }} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java index 36a6ba8558be..0a7506bab3b0 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java @@ -14,10 +14,8 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.*; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; -import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,17 +30,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.*; public class DataBuildMapper extends Mapper { private static final Logger logger = LoggerFactory.getLogger(DataBuildMapper.class); - public static final SimpleDateFormat SRC_FMT = - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - public static final SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS"); - private GraphSchema graphSchema; private DataEncoder dataEncoder; private String separator; @@ -50,7 +42,6 @@ public class DataBuildMapper extends Mapper>() {}); - this.ldbcCustomize = conf.getBoolean(DataLoadConfig.LDBC_CUSTOMIZE, false); this.skipHeader = conf.getBoolean(DataLoadConfig.SKIP_HEADER, true); - DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00")); } @Override @@ -78,122 +67,41 @@ protected void map(LongWritable key, Text value, Context context) } String fullPath = context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE); String fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1); - ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(fileName); - if (columnMappingInfo == null) { + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(fileName); + if (info == null) { logger.warn("Mapper: ignore 
[{}], fullPath is [{}]", fileName, fullPath); return; } - int labelId = columnMappingInfo.getLabelId(); - long tableId = columnMappingInfo.getTableId(); - Map propertiesColumnMapping = columnMappingInfo.getPropertiesColMap(); String[] items = value.toString().split(separator); + + int labelId = info.getLabelId(); + long tableId = info.getTableId(); GraphElement type = this.graphSchema.getElement(labelId); - Map propertiesMap = - buildPropertiesMap(type, items, propertiesColumnMapping); - BytesRef valRef = this.dataEncoder.encodeProperties(labelId, propertiesMap); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, colMap); + + BytesRef valRef = dataEncoder.encodeProperties(labelId, properties); this.outVal.set(valRef.getArray(), valRef.getOffset(), valRef.getLength()); + if (type instanceof GraphVertex) { - BytesRef keyBytesRef = - this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId); - this.outKey.set( - keyBytesRef.getArray(), keyBytesRef.getOffset(), keyBytesRef.getLength()); + BytesRef keyRef = + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); + this.outKey.set(keyRef.getArray(), keyRef.getOffset(), keyRef.getLength()); context.write(this.outKey, this.outVal); } else if (type instanceof GraphEdge) { - int srcLabelId = columnMappingInfo.getSrcLabelId(); - Map srcPkColMap = columnMappingInfo.getSrcPkColMap(); - GraphElement srcType = this.graphSchema.getElement(srcLabelId); - Map srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap); - - int dstLabelId = columnMappingInfo.getDstLabelId(); - Map dstPkColMap = columnMappingInfo.getDstPkColMap(); - GraphElement dstType = this.graphSchema.getElement(dstLabelId); - Map dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap); - - BytesRef outEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - true); - this.outKey.set( - outEdgeKeyRef.getArray(), outEdgeKeyRef.getOffset(), outEdgeKeyRef.getLength()); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + this.outKey.set(out.getArray(), out.getOffset(), out.getLength()); context.write(this.outKey, this.outVal); - BytesRef inEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - false); - this.outKey.set( - inEdgeKeyRef.getArray(), inEdgeKeyRef.getOffset(), inEdgeKeyRef.getLength()); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + this.outKey.set(in.getArray(), in.getOffset(), in.getLength()); context.write(this.outKey, this.outVal); } else { - throw new IllegalArgumentException( - "invalid label [" + labelId + "], only support VertexType and EdgeType"); - } - } - - private Map buildPropertiesMap( - GraphElement typeDef, String[] items, Map columnMapping) { - Map operationProperties = new HashMap<>(columnMapping.size()); - columnMapping.forEach( - (colIdx, propertyId) -> { - GraphProperty propertyDef = typeDef.getProperty(propertyId); - if (propertyDef == null) { - throw new PropertyDefNotFoundException( - "property [" - + propertyId - + "] not found in [" - + typeDef.getLabel() - + "]"); - } - if (colIdx >= items.length) { - throw new IllegalArgumentException( - "label [" - + typeDef.getLabel() - + "], invalid mapping [" - + colIdx - + "] -> [" 
- + propertyId - + "], data [" - + Arrays.toString(items) - + "]"); - } - DataType dataType = propertyDef.getDataType(); - - String val = items[colIdx]; - if (ldbcCustomize) { - String name = propertyDef.getName(); - switch (name) { - case "creationDate": - case "joinDate": - val = convertDate(val); - break; - case "birthday": - val = val.replace("-", ""); - break; - } - } - PropertyValue propertyValue = new PropertyValue(dataType, val); - operationProperties.put(propertyId, propertyValue); - }); - return operationProperties; - } - - public static String convertDate(String input) { - try { - return DST_FMT.format(SRC_FMT.parse(input)); - } catch (ParseException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException("Invalid label " + labelId); } } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java index 20a98887aea6..d1c26dd84477 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java @@ -16,12 +16,11 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.*; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; -import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.MapperBase; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -66,114 +65,41 @@ public void setup(TaskContext context) throws IOException { @Override public void map(long recordNum, Record record, TaskContext context) throws IOException { - String tableName = context.getInputTableInfo().getTableName(); - ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(tableName); - if (columnMappingInfo == null) { - logger.warn( - "Mapper: ignore [{}], table info is [{}]", - tableName, - context.getInputTableInfo()); + TableInfo tableInfo = context.getInputTableInfo(); + String tableName = tableInfo.getTableName(); + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(tableName); + if (info == null) { + logger.warn("Mapper: ignore [{}], table info: [{}]", tableName, tableInfo); return; } + String[] items = Utils.parseRecords(record); - int labelId = columnMappingInfo.getLabelId(); - long tableId = columnMappingInfo.getTableId(); - int columnCount = record.getColumnCount(); - String[] items = new String[columnCount]; - for (int i = 0; i < columnCount; i++) { - if (record.get(i) == null) { - // items[i] = ""; - items[i] = null; - } else { - items[i] = record.get(i).toString(); - } - } - Map propertiesColumnMapping = columnMappingInfo.getPropertiesColMap(); + int labelId = info.getLabelId(); + long tableId = info.getTableId(); GraphElement type = this.graphSchema.getElement(labelId); - Map propertiesMap = - buildPropertiesMap(type, items, propertiesColumnMapping); - BytesRef valRef = 
this.dataEncoder.encodeProperties(labelId, propertiesMap); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, colMap); + + BytesRef valRef = this.dataEncoder.encodeProperties(labelId, properties); outVal.set(new Object[] {new String(valRef.getBytes(), charSet)}); if (type instanceof GraphVertex) { BytesRef keyRef = - this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId); + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); outKey.set(new Object[] {new String(keyRef.getBytes(), charSet)}); context.write(outKey, outVal); } else if (type instanceof GraphEdge) { - int srcLabelId = columnMappingInfo.getSrcLabelId(); - Map srcPkColMap = columnMappingInfo.getSrcPkColMap(); - GraphElement srcType = this.graphSchema.getElement(srcLabelId); - Map srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap); - - int dstLabelId = columnMappingInfo.getDstLabelId(); - Map dstPkColMap = columnMappingInfo.getDstPkColMap(); - GraphElement dstType = this.graphSchema.getElement(dstLabelId); - Map dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap); - - BytesRef outEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - true); - outKey.set(new Object[] {new String(outEdgeKeyRef.getBytes(), charSet)}); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + outKey.set(new Object[] {new String(out.getBytes(), charSet)}); context.write(outKey, outVal); - BytesRef inEdgeKeyRef = - this.dataEncoder.encodeEdgeKey( - (GraphVertex) srcType, - srcPkMap, - (GraphVertex) dstType, - dstPkMap, - (GraphEdge) type, - propertiesMap, - tableId, - false); - outKey.set(new Object[] {new String(inEdgeKeyRef.getBytes(), charSet)}); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + outKey.set(new Object[] {new String(in.getBytes(), charSet)}); context.write(outKey, outVal); } else { - throw new IllegalArgumentException( - "invalid label [" + labelId + "], only support VertexType and EdgeType"); + throw new IllegalArgumentException("Invalid label " + labelId); } } - - private Map buildPropertiesMap( - GraphElement typeDef, String[] items, Map columnMapping) { - Map operationProperties = new HashMap<>(columnMapping.size()); - columnMapping.forEach( - (colIdx, propertyId) -> { - GraphProperty propertyDef = typeDef.getProperty(propertyId); - if (propertyDef == null) { - throw new PropertyDefNotFoundException( - "property [" - + propertyId - + "] not found in [" - + typeDef.getLabel() - + "]"); - } - if (colIdx >= items.length) { - throw new IllegalArgumentException( - "label [" - + typeDef.getLabel() - + "], invalid mapping [" - + colIdx - + "] -> [" - + propertyId - + "], data [" - + Arrays.toString(items) - + "]"); - } - String val = items[colIdx]; - PropertyValue propertyValue = null; - if (val != null) { - DataType dataType = propertyDef.getDataType(); - propertyValue = new PropertyValue(dataType, val); - } - operationProperties.put(propertyId, propertyValue); - }); - return operationProperties; - } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java new file mode 100644 index 
000000000000..ef27bce76f25 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java @@ -0,0 +1,144 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.common.schema.api.*; +import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; +import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; +import com.alibaba.graphscope.groot.common.util.SchemaUtils; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; +import com.aliyun.odps.mapred.MapperBase; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +public class DataBuildMapperOdpsDebug extends MapperBase { + private static final Logger logger = LoggerFactory.getLogger(DataBuildMapperOdpsDebug.class); + private GraphSchema graphSchema; + private DataEncoder dataEncoder; + private Map fileToColumnMappingInfo; + + private Record outKey; + + @Override + public void setup(TaskContext context) throws IOException { + outKey = context.createOutputRecord(); + + String metaData = context.getJobConf().get(DataLoadConfig.META_INFO); + ObjectMapper objectMapper = new ObjectMapper(); + Map metaMap = + objectMapper.readValue(metaData, new TypeReference>() {}); + String schemaJson = metaMap.get(DataLoadConfig.SCHEMA_JSON); + graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema(); + dataEncoder = new DataEncoder(graphSchema); + String columnMappingsJson = metaMap.get(DataLoadConfig.COLUMN_MAPPINGS); + fileToColumnMappingInfo = + objectMapper.readValue( + columnMappingsJson, new TypeReference>() {}); + } + + @Override + public void map(long recordNum, Record record, TaskContext context) throws IOException { + TableInfo tableInfo = context.getInputTableInfo(); + String tableName = tableInfo.getTableName(); + ColumnMappingInfo info = this.fileToColumnMappingInfo.get(tableName); + if (info == null) { + logger.warn("Mapper: ignore [{}], table info: [{}]", tableName, tableInfo); + return; + } + + String[] items = Utils.parseRecords(record); + + int labelId = info.getLabelId(); + long tableId = info.getTableId(); + GraphElement type = this.graphSchema.getElement(labelId); + Map colMap = info.getPropertiesColMap(); + Map properties = Utils.buildProperties(type, items, colMap); + + if (type instanceof GraphVertex) { + outKey.set(1, getVertexRawKeys((GraphVertex) type, items)); + BytesRef keyRef = + Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId); + outKey.set(0, getVertexKeyEncoded(keyRef)); + context.write(outKey); + } else if (type instanceof GraphEdge) 
{ + outKey.set(1, getEdgeRawKeys(info, items)); + BytesRef out = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, true); + outKey.set(0, getEdgeKeyEncoded(out)); + context.write(outKey); + BytesRef in = + Utils.getEdgeKeyRef( + dataEncoder, graphSchema, info, items, properties, tableId, false); + outKey.set(0, getEdgeKeyEncoded(in)); + context.write(outKey); + } else { + throw new IllegalArgumentException("Invalid label " + labelId); + } + } + + private String getVertexRawKeys(GraphVertex type, String[] items) throws IOException { + List pkIds = SchemaUtils.getVertexPrimaryKeyList(type); + return concatenateItemsByIndices(items, pkIds); + } + + private String getEdgeRawKeys(ColumnMappingInfo info, String[] items) throws IOException { + Map srcPkColMap = info.getSrcPkColMap(); + Map dstPkColMap = info.getDstPkColMap(); + + List pkIds = new ArrayList<>(srcPkColMap.keySet()); + pkIds.addAll(dstPkColMap.keySet()); + return concatenateItemsByIndices(items, pkIds); + } + + private static String concatenateItemsByIndices(String[] array, List indices) + throws IOException { + StringBuilder builder = new StringBuilder(); + if (indices.isEmpty()) { + throw new IOException("indices are empty!"); + } + for (int index : indices) { + builder.append(array[index]).append(","); + } + builder.deleteCharAt(builder.length() - 1); + return builder.toString(); + } + + private String getVertexKeyEncoded(BytesRef keyRef) { + ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes()); + long tableId = buffer.getLong(0); + long hashId = buffer.getLong(8); + return tableId + "|" + hashId; + } + + private String getEdgeKeyEncoded(BytesRef keyRef) { + ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes()); + long tableId = buffer.getLong(0); + long srcHashId = buffer.getLong(8); + long dstHashId = buffer.getLong(16); + long eid = buffer.getLong(24); + return tableId + "|" + srcHashId + "|" + dstHashId + "|" + eid; + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java index 4204bdcc1acb..ec14593ca650 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java @@ -18,16 +18,14 @@ import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; import com.alibaba.graphscope.groot.common.schema.api.GraphVertex; +import com.alibaba.graphscope.groot.common.schema.wrapper.DataType; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.alibaba.graphscope.groot.common.util.PkHashUtils; import com.alibaba.graphscope.groot.common.util.SchemaUtils; import java.nio.Buffer; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; public class DataEncoder { @@ -50,9 +48,6 @@ public BytesRef encodeVertexKey( scratch.putLong(tableId << 1); scratch.putLong(hashId); scratch.putLong(SNAPSHOT_ID); - // System.out.println("EncodeVertexKey: labelId:" + type.getLabelId() + "; tableId:" + - // tableId); - // System.out.println("scratch: [" + (tableId << 1) + "],[" + hashId + "]"); flip(scratch); return new BytesRef(scratch.array(), 0, 
scratch.limit()); } @@ -107,7 +102,7 @@ public BytesRef encodeProperties(int labelId, Map proper return new BytesRef(scratch.array(), 0, scratch.limit()); } - private long getHashId( + private static long getHashId( int labelId, Map operationProperties, List pkIds) { List pks = new ArrayList<>(pkIds.size()); for (int pkId : pkIds) { @@ -140,4 +135,58 @@ public static void clear(Buffer buffer) { public static void flip(Buffer buffer) { buffer.flip(); } + + // For testing + + private static void encode(int labelId, long tableId, String pk) { + Map propertiesMap = new HashMap<>(); + propertiesMap.put(0, new PropertyValue(DataType.STRING, pk)); + List pkIds = new ArrayList<>(); + pkIds.add(0); + ByteBuffer scratch = ByteBuffer.allocate(1 << 20); + long hashId = getHashId(labelId, propertiesMap, pkIds); + scratch.putLong(tableId << 1); + scratch.putLong(hashId); + scratch.flip(); + System.out.println("TableId: " + scratch.getLong(0) + " | " + scratch.getLong(8)); + } + + private static void encodeTest(int id1, String s1, int id2, String s2, long expected) { + DataEncoder.encodeTest(id1, 0, s1, id2, 0, s2, 0, expected); + } + + private static void encodeTest( + int id1, + long tableId1, + String s1, + int id2, + long tableId2, + String s2, + long expectedTableId, + long expectedHash) { + DataEncoder.encode(id1, tableId1, s1); + DataEncoder.encode(id2, tableId2, s2); + System.out.println("Expected TableId: " + expectedTableId + "; HashId: " + expectedHash); + } + + public static void main(String[] args) { + long expectedHash1 = -3968787722979159891L; + long expectedTable1 = -9223372036854775698L; + int labelId1a = 11; + String s1a = "20230911@META_COLUMN@1@22073998@879591@265939259@4592566627"; + long tableId1 = -4611686018427387857L; + int labelId1b = 11; + String s1b = "20230911@META_INDEX@1@16229181@824771@232467064@551697979"; + long tableId2 = -4611686018427387854L; + + long hashId2 = -6575735951890802946L; + int labelId2a = 5; + String s2a = "db24f92331e97257e4a0ad4ffc464de2M01"; + int labelId2b = 5; + String s2b = "889569174b748f2f4514bf6547f12760M01"; + + encodeTest( + labelId1a, tableId1, s1a, labelId1b, tableId2, s1b, expectedTable1, expectedHash1); + encodeTest(labelId2a, s2a, labelId2b, s2b, hashId2); + } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java index 2f219030c1c0..f0c883b1e03d 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java @@ -14,15 +14,12 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.schema.api.GraphEdge; -import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import 
org.apache.hadoop.conf.Configuration; @@ -55,8 +52,7 @@ public static void main(String[] args) } String inputPath = properties.getProperty(DataLoadConfig.INPUT_PATH); String outputPath = properties.getProperty(DataLoadConfig.OUTPUT_PATH); - String columnMappingConfigStr = - properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); String uniquePath = @@ -65,61 +61,38 @@ public static void main(String[] args) String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); - GrootClient client = - GrootClient.newBuilder() - .setHosts(graphEndpoint) - .setUsername(username) - .setPassword(password) - .build(); - ObjectMapper objectMapper = new ObjectMapper(); - Map columnMappingConfig = - objectMapper.readValue( - columnMappingConfigStr, - new TypeReference>() {}); - - List targets = new ArrayList<>(); - for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(fileColumnMapping.getLabel()); - if (fileColumnMapping.getSrcLabel() != null) { - builder.setSrcLabel(fileColumnMapping.getSrcLabel()); - } - if (fileColumnMapping.getDstLabel() != null) { - builder.setDstLabel(fileColumnMapping.getDstLabel()); - } - targets.add(builder.build()); - } + GrootClient client = Utils.getClient(graphEndpoint, username, password); + + Map columnMappingConfig = Utils.parseColumnMapping(configStr); + List targets = Utils.getDataLoadTargets(columnMappingConfig); GraphSchema schema = GraphDef.parseProto(client.prepareDataLoad(targets)); - String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); int partitionNum = client.getPartitionNum(); - Map columnMappingInfos = new HashMap<>(); + Map info = new HashMap<>(); columnMappingConfig.forEach( (fileName, fileColumnMapping) -> { - columnMappingInfos.put(fileName, fileColumnMapping.toColumnMappingInfo(schema)); + info.put(fileName, fileColumnMapping.toColumnMappingInfo(schema)); }); - String ldbcCustomize = properties.getProperty(DataLoadConfig.LDBC_CUSTOMIZE, "true"); - long splitSize = - Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")) - * 1024 - * 1024; - boolean loadAfterBuild = - properties - .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") - .equalsIgnoreCase("true"); - boolean skipHeader = - properties.getProperty(DataLoadConfig.SKIP_HEADER, "true").equalsIgnoreCase("true"); + + String _tmp = properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256"); + long splitSize = Long.parseLong(_tmp) * 1024 * 1024; + _tmp = properties.getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false"); + boolean loadAfterBuild = Utils.parseBoolean(_tmp); + _tmp = properties.getProperty(DataLoadConfig.SKIP_HEADER, "true"); + boolean skipHeader = Utils.parseBoolean(_tmp); String separator = properties.getProperty(DataLoadConfig.SEPARATOR, "\\|"); + ObjectMapper mapper = new ObjectMapper(); + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Configuration conf = new Configuration(); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERNODE, splitSize); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERRACK, splitSize); 
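// The schema JSON and column-mapping info are serialized into the Hadoop
// Configuration here; DataBuildMapper reads them back in setup() on each task.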
conf.setStrings(DataLoadConfig.SCHEMA_JSON, schemaJson); - String mappings = objectMapper.writeValueAsString(columnMappingInfos); + String mappings = mapper.writeValueAsString(info); conf.setStrings(DataLoadConfig.COLUMN_MAPPINGS, mappings); - conf.setBoolean(DataLoadConfig.LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true")); conf.set(DataLoadConfig.SEPARATOR, separator); conf.setBoolean(DataLoadConfig.SKIP_HEADER, skipHeader); Job job = Job.getInstance(conf, "build graph data"); @@ -150,34 +123,25 @@ public static void main(String[] args) FileSystem fs = outputDir.getFileSystem(job.getConfiguration()); FSDataOutputStream os = fs.create(new Path(outputDir, "META")); - os.writeUTF(objectMapper.writeValueAsString(outputMeta)); + os.writeUTF(mapper.writeValueAsString(outputMeta)); os.flush(); os.close(); if (loadAfterBuild) { String dataPath = fs.makeQualified(outputDir).toString(); - - logger.info("start ingesting data"); - client.ingestData(dataPath); - - logger.info("commit bulk load"); - Map tableToTarget = new HashMap<>(); - for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) { - long tableId = columnMappingInfo.getTableId(); - int labelId = columnMappingInfo.getLabelId(); - GraphElement graphElement = schema.getElement(labelId); - String label = graphElement.getLabel(); - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(label); - if (graphElement instanceof GraphEdge) { - builder.setSrcLabel( - schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); - builder.setDstLabel( - schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); + logger.info("start ingesting data from " + dataPath); + try { + client.ingestData(dataPath); + logger.info("start committing bulk load"); + Map tableToTarget = Utils.getTableToTargets(schema, info); + client.commitDataLoad(tableToTarget, uniquePath); + } finally { + try { + client.clearIngest(uniquePath); + } catch (Exception e) { + logger.warn("Clear ingest failed, ignored"); } - tableToTarget.put(tableId, builder.build()); } - client.commitDataLoad(tableToTarget, uniquePath); } } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index 27953b3afeee..80098affda40 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -14,10 +14,7 @@ package com.alibaba.graphscope.groot.dataload.databuild; import com.alibaba.graphscope.groot.common.config.DataLoadConfig; -import com.alibaba.graphscope.groot.common.schema.api.GraphEdge; -import com.alibaba.graphscope.groot.common.schema.api.GraphElement; import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; -import com.alibaba.graphscope.groot.common.schema.api.GraphVertex; import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; @@ -27,14 +24,12 @@ import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; import com.alibaba.graphscope.proto.groot.GraphDefPb; import com.aliyun.odps.Odps; -import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import 
com.aliyun.odps.mapred.conf.JobConf; import com.aliyun.odps.mapred.conf.SessionState; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -47,7 +42,6 @@ public class OfflineBuildOdps { private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdps.class); - private static Odps odps; public static void main(String[] args) throws IOException { @@ -59,17 +53,8 @@ public static void main(String[] args) throws IOException { } odps = SessionState.get().getOdps(); - String columnMappingConfigStr = - properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); - // User could specify a list of `key=value` pairs in command line, - // to substitute variables like `${key}` in columnMappingConfigStr - for (int i = 1; i < args.length; ++i) { - String[] kv = args[i].split("="); - if (kv.length == 2) { - String key = "${" + kv[0] + "}"; - columnMappingConfigStr = columnMappingConfigStr.replace(key, kv[1]); - } - } + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args,1, args.length)); String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); @@ -78,74 +63,30 @@ public static void main(String[] args) throws IOException { String uniquePath = properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString()); - ObjectMapper objectMapper = new ObjectMapper(); - Map columnMappingConfig = - objectMapper.readValue( - columnMappingConfigStr, - new TypeReference>() {}); + Map mappingConfig = Utils.parseColumnMapping(configStr); - List targets = new ArrayList<>(); - for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(fileColumnMapping.getLabel()); - if (fileColumnMapping.getSrcLabel() != null) { - builder.setSrcLabel(fileColumnMapping.getSrcLabel()); - } - if (fileColumnMapping.getDstLabel() != null) { - builder.setDstLabel(fileColumnMapping.getDstLabel()); - } - targets.add(builder.build()); - } + List targets = Utils.getDataLoadTargets(mappingConfig); - GrootClient client = - GrootClient.newBuilder() - .setHosts(graphEndpoint) - .setUsername(username) - .setPassword(password) - .build(); + GrootClient client = Utils.getClient(graphEndpoint, username, password); GraphDefPb graphDefPb = client.prepareDataLoad(targets); System.out.println("GraphDef: " + graphDefPb); GraphSchema schema = GraphDef.parseProto(graphDefPb); - String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); // number of reduce task int partitionNum = client.getPartitionNum(); - - Map tableType = new HashMap<>(); - - Map columnMappingInfos = new HashMap<>(); - columnMappingConfig.forEach( - (fileName, fileColumnMapping) -> { - ColumnMappingInfo columnMappingInfo = - fileColumnMapping.toColumnMappingInfo(schema); - // Note the project and partition is stripped (if exists) - columnMappingInfos.put(getTableName(fileName), columnMappingInfo); - tableType.put(fileName, schema.getElement(columnMappingInfo.getLabelId())); - }); long splitSize = Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")); JobConf job = new JobConf(); - String mappings = 
objectMapper.writeValueAsString(columnMappingInfos); - // Avoid java sandbox protection - job.set("odps.isolation.session.enable", "true"); - // Don't introduce legacy jar files - job.set("odps.sql.udf.java.retain.legacy", "false"); - // Default priority is 9 - job.setInstancePriority(0); - job.set("odps.mr.run.mode", "sql"); - job.set("odps.mr.sql.group.enable", "true"); + job.set("odps.isolation.session.enable", "true"); // Avoid java sandbox protection + job.set("odps.sql.udf.java.retain.legacy", "false"); // exclude legacy jar files + job.setInstancePriority(0); // Default priority is 9 + // Disable backups from competing with original instances + job.set("odps.sql.backupinstance.enabled", "false"); job.setFunctionTimeout(2400); job.setMemoryForReducerJVM(4096); - for (Map.Entry entry : tableType.entrySet()) { - if (entry.getValue() instanceof GraphVertex || entry.getValue() instanceof GraphEdge) { - String name = entry.getKey(); - InputUtils.addTable(parseTableURL(name), job); - } - } - job.setMapperClass(DataBuildMapperOdps.class); job.setReducerClass(DataBuildReducerOdps.class); job.setPartitionerClass(DataBuildPartitionerOdps.class); @@ -154,6 +95,8 @@ public static void main(String[] args) throws IOException { job.setMapOutputKeySchema(SchemaUtils.fromString("key:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("value:string")); + mappingConfig.forEach((name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job)); + String dataSinkType = properties.getProperty(DataLoadConfig.DATA_SINK_TYPE, "VOLUME"); Map config; String fullQualifiedDataPath; @@ -172,20 +115,25 @@ public static void main(String[] args) throws IOException { fullQualifiedDataPath = fs.getQualifiedPath(); } String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE); - OutputUtils.addTable(parseTableURL(outputTable), job); + OutputUtils.addTable(Utils.parseTableURL(odps, outputTable), job); } else if (dataSinkType.equalsIgnoreCase("HDFS")) { throw new IOException("HDFS as a data sink is not supported in ODPS"); } else { throw new IOException("Unsupported data sink: " + dataSinkType); } + + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Map info = Utils.getMappingInfo(odps, schema, mappingConfig); + ObjectMapper mapper = new ObjectMapper(); + Map outputMeta = new HashMap<>(); outputMeta.put(DataLoadConfig.GRAPH_ENDPOINT, graphEndpoint); outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); - outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mappings); + outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mapper.writeValueAsString(info)); outputMeta.put(DataLoadConfig.UNIQUE_PATH, uniquePath); outputMeta.put(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); - job.set(DataLoadConfig.META_INFO, objectMapper.writeValueAsString(outputMeta)); + job.set(DataLoadConfig.META_INFO, mapper.writeValueAsString(outputMeta)); job.set(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); try { JobClient.runJob(job); @@ -193,10 +141,8 @@ public static void main(String[] args) throws IOException { throw new IOException(e); } - boolean loadAfterBuild = - properties - .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") - .equalsIgnoreCase("true"); + String _tmp = properties.getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false"); + boolean loadAfterBuild = Utils.parseBoolean(_tmp); if (loadAfterBuild) { fullQualifiedDataPath = fullQualifiedDataPath + uniquePath; logger.info("start ingesting data from " + fullQualifiedDataPath); @@ -205,71 +151,15 @@ public static void main(String[] 
args) throws IOException { try { client.ingestData(fullQualifiedDataPath, config); logger.info("start committing bulk load"); - Map tableToTarget = new HashMap<>(); - for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) { - long tableId = columnMappingInfo.getTableId(); - int labelId = columnMappingInfo.getLabelId(); - GraphElement graphElement = schema.getElement(labelId); - String label = graphElement.getLabel(); - DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); - builder.setLabel(label); - if (graphElement instanceof GraphEdge) { - builder.setSrcLabel( - schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); - builder.setDstLabel( - schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); - } - tableToTarget.put(tableId, builder.build()); - } + Map tableToTarget = Utils.getTableToTargets(schema, info); client.commitDataLoad(tableToTarget, uniquePath); - - } catch (Exception ex) { - logger.error("Failed to ingest/commit data", ex); - client.clearIngest(uniquePath); - throw ex; - } - try { - client.clearIngest(uniquePath); - } catch (Exception ex) { - logger.error("Clear ingest failed, ignored."); + } finally { + try { + client.clearIngest(uniquePath); + } catch (Exception e) { + logger.warn("Clear ingest failed, ignored"); + } } } } - - private static String getTableName(String tableFullName) { - TableInfo info = parseTableURL(tableFullName); - return info.getTableName(); - } - - /** - * Parse table URL to @TableInfo - * @param url the pattern of [projectName.]tableName[|partitionSpec] - * @return TableInfo - */ - private static TableInfo parseTableURL(String url) { - String projectName = odps.getDefaultProject(); - String tableName; - String partitionSpec = null; - if (url.contains(".")) { - String[] items = url.split("\\."); - projectName = items[0]; - tableName = items[1]; - } else { - tableName = url; - } - if (tableName.contains("|")) { - String[] items = tableName.split("\\|"); - tableName = items[0]; - partitionSpec = items[1]; - } - - TableInfo.TableInfoBuilder builder = TableInfo.builder(); - builder.projectName(projectName); - - builder.tableName(tableName); - if (partitionSpec != null) { - builder.partSpec(partitionSpec); - } - return builder.build(); - } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java new file mode 100644 index 000000000000..f07d69296f67 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java @@ -0,0 +1,97 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *
<p>
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
<p>
http://www.apache.org/licenses/LICENSE-2.0 + * + *
<p>
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; +import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper; +import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; +import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.alibaba.graphscope.proto.groot.GraphDefPb; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.mapred.JobClient; +import com.aliyun.odps.mapred.conf.JobConf; +import com.aliyun.odps.mapred.conf.SessionState; +import com.aliyun.odps.mapred.utils.InputUtils; +import com.aliyun.odps.mapred.utils.OutputUtils; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class OfflineBuildOdpsDebug { + private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdpsDebug.class); + + private static Odps odps; + + public static void main(String[] args) throws IOException, OdpsException { + String propertiesFile = args[0]; + + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(propertiesFile)) { + properties.load(is); + } + odps = SessionState.get().getOdps(); + + String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args,1, args.length)); + + String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); + String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); + String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); + + Map mappingConfig = Utils.parseColumnMapping(configStr); + + List targets = Utils.getDataLoadTargets(mappingConfig); + + GrootClient client = Utils.getClient(graphEndpoint, username, password); + + GraphDefPb graphDefPb = client.prepareDataLoad(targets); + System.out.println("GraphDef: " + graphDefPb); + + JobConf job = new JobConf(); + + job.set("odps.isolation.session.enable", "true"); // Avoid java sandbox protection + job.set("odps.sql.udf.java.retain.legacy", "false"); // exclude legacy jar files + job.setInstancePriority(0); // Default priority is 9 + job.set("odps.mr.run.mode", "sql"); + job.set("odps.mr.sql.group.enable", "true"); + + job.setMapperClass(DataBuildMapperOdpsDebug.class); + job.setNumReduceTasks(0); + + mappingConfig.forEach((name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job)); + String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE); + OutputUtils.addTable(Utils.parseTableURL(odps, outputTable), job); + GraphSchema schema = GraphDef.parseProto(graphDefPb); + + String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + Map info = Utils.getMappingInfo(odps, schema, mappingConfig); + + ObjectMapper mapper = new ObjectMapper(); + Map outputMeta = new HashMap<>(); + outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); + 
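+ // Unlike the full build job, this debug job is map-only (numReduceTasks is 0) and
+ // configures no data sink, so META_INFO carries only the schema and column mappings.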
outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mapper.writeValueAsString(info)); + job.set(DataLoadConfig.META_INFO, mapper.writeValueAsString(outputMeta)); + + JobClient.runJob(job); + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java new file mode 100644 index 000000000000..4d1a587852d0 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java @@ -0,0 +1,218 @@ +package com.alibaba.graphscope.groot.dataload.databuild; + +import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; +import com.alibaba.graphscope.groot.common.schema.api.*; +import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; +import com.alibaba.graphscope.groot.sdk.GrootClient; +import com.alibaba.graphscope.proto.groot.DataLoadTargetPb; +import com.aliyun.odps.Odps; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.TableInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +public class Utils { + // For Main Job + static List getDataLoadTargets( + Map columnMappingConfig) { + List targets = new ArrayList<>(); + for (FileColumnMapping fileColumnMapping : columnMappingConfig.values()) { + DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder(); + builder.setLabel(fileColumnMapping.getLabel()); + if (fileColumnMapping.getSrcLabel() != null) { + builder.setSrcLabel(fileColumnMapping.getSrcLabel()); + } + if (fileColumnMapping.getDstLabel() != null) { + builder.setDstLabel(fileColumnMapping.getDstLabel()); + } + targets.add(builder.build()); + } + return targets; + } + + public static Map parseColumnMapping(String str) + throws JsonProcessingException { + ObjectMapper m = new ObjectMapper(); + return m.readValue(str, new TypeReference>() {}); + } + + public static Map getMappingInfo( + Odps odps, GraphSchema schema, Map config) { + Map columnMappingInfo = new HashMap<>(); + config.forEach( + (fileName, fileColumnMapping) -> { + ColumnMappingInfo subInfo = fileColumnMapping.toColumnMappingInfo(schema); + // Note the project and partition is stripped (if exists) + columnMappingInfo.put(Utils.getTableName(odps, fileName), subInfo); + }); + return columnMappingInfo; + } + + public static String getTableName(Odps odps, String tableFullName) { + TableInfo info = parseTableURL(odps, tableFullName); + return info.getTableName(); + } + + /** + * Parse table URL to @TableInfo + * @param url the pattern of [projectName.]tableName[|partitionSpec] + * @return TableInfo + */ + public static TableInfo parseTableURL(Odps odps, String url) { + String projectName = odps.getDefaultProject(); + String tableName; + String partitionSpec = null; + if (url.contains(".")) { + String[] items = url.split("\\."); + projectName = items[0]; + tableName = items[1]; + } else { + tableName = url; + } + if (tableName.contains("|")) { + String[] items = tableName.split("\\|"); + tableName = items[0]; + partitionSpec = items[1]; + } + + TableInfo.TableInfoBuilder builder = TableInfo.builder(); + builder.projectName(projectName); + + builder.tableName(tableName); + if (partitionSpec != null) { + builder.partSpec(partitionSpec); + } + 
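+ // Example with hypothetical names: "myproject.person|ds=20230915" parses to
+ // project "myproject", table "person", partition spec "ds=20230915".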
+
+    public static Map<Long, DataLoadTargetPb> getTableToTargets(
+            GraphSchema schema, Map<String, ColumnMappingInfo> info) {
+        Map<Long, DataLoadTargetPb> tableToTarget = new HashMap<>();
+        for (ColumnMappingInfo columnMappingInfo : info.values()) {
+            long tableId = columnMappingInfo.getTableId();
+            int labelId = columnMappingInfo.getLabelId();
+            GraphElement graphElement = schema.getElement(labelId);
+            String label = graphElement.getLabel();
+            DataLoadTargetPb.Builder builder = DataLoadTargetPb.newBuilder();
+            builder.setLabel(label);
+            if (graphElement instanceof GraphEdge) {
+                builder.setSrcLabel(
+                        schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel());
+                builder.setDstLabel(
+                        schema.getElement(columnMappingInfo.getDstLabelId()).getLabel());
+            }
+            tableToTarget.put(tableId, builder.build());
+        }
+        return tableToTarget;
+    }
+
+    public static GrootClient getClient(String endpoint, String username, String password) {
+        return GrootClient.newBuilder()
+                .setHosts(endpoint)
+                .setUsername(username)
+                .setPassword(password)
+                .build();
+    }
+
+    public static boolean parseBoolean(String str) {
+        return str.equalsIgnoreCase("true");
+    }
+
+    public static String replaceVars(String str, String[] args) {
+        // Users can specify a list of `key=value` pairs on the command line
+        // to substitute variables like `${key}` in configStr
+        for (String arg : args) {
+            String[] kv = arg.split("=");
+            if (kv.length == 2) {
+                String key = "${" + kv[0] + "}";
+                str = str.replace(key, kv[1]);
+            }
+        }
+        return str;
+    }
+
+    // For Mapper
+    public static Map<Integer, PropertyValue> buildProperties(
+            GraphElement typeDef, String[] items, Map<Integer, Integer> mapping) {
+        Map<Integer, PropertyValue> operationProperties = new HashMap<>(mapping.size());
+        mapping.forEach(
+                (colIdx, propertyId) -> {
+                    GraphProperty prop = typeDef.getProperty(propertyId);
+                    String label = typeDef.getLabel();
+                    String msg = "Label [" + label + "]: ";
+                    if (prop == null) {
+                        msg += "propertyId [" + propertyId + "] not found";
+                        throw new PropertyDefNotFoundException(msg);
+                    }
+                    if (colIdx >= items.length) {
+                        msg += "Invalid mapping [" + colIdx + "]->[" + propertyId + "]. ";
+                        msg += "Data: " + Arrays.toString(items);
+                        throw new IllegalArgumentException(msg);
+                    }
+                    PropertyValue propertyValue = null;
+                    if (items[colIdx] != null) {
+                        propertyValue = new PropertyValue(prop.getDataType(), items[colIdx]);
+                    }
+                    operationProperties.put(propertyId, propertyValue);
+                });
+        return operationProperties;
+    }
+
+    public static BytesRef getVertexKeyRef(
+            DataEncoder encoder,
+            GraphVertex type,
+            Map<Integer, PropertyValue> properties,
+            long tableId) {
+        return encoder.encodeVertexKey(type, properties, tableId);
+    }
+
+    public static BytesRef getEdgeKeyRef(
+            DataEncoder encoder,
+            GraphSchema schema,
+            ColumnMappingInfo info,
+            String[] items,
+            Map<Integer, PropertyValue> properties,
+            long tableId,
+            boolean forward) {
+        int labelId = info.getLabelId();
+        GraphEdge type = (GraphEdge) schema.getElement(labelId);
+
+        int srcLabelId = info.getSrcLabelId();
+        Map<Integer, Integer> srcPkColMap = info.getSrcPkColMap();
+        GraphVertex srcType = (GraphVertex) schema.getElement(srcLabelId);
+        Map<Integer, PropertyValue> srcPkMap = Utils.buildProperties(srcType, items, srcPkColMap);
+
+        int dstLabelId = info.getDstLabelId();
+        Map<Integer, Integer> dstPkColMap = info.getDstPkColMap();
+        GraphVertex dstType = (GraphVertex) schema.getElement(dstLabelId);
+        Map<Integer, PropertyValue> dstPkMap = Utils.buildProperties(dstType, items, dstPkColMap);
+
+        return encoder.encodeEdgeKey(
+                srcType, srcPkMap, dstType, dstPkMap, type, properties, tableId, forward);
+    }
+
+    public static String[] parseRecords(Record record) {
+        String[] items = new String[record.getColumnCount()];
+        for (int i = 0; i < items.length; i++) {
+            Object curRecord = record.get(i);
+            items[i] = curRecord == null ? null : curRecord.toString();
+        }
+        return items;
+    }
+
+    public static String convertDateForLDBC(String input) {
+        SimpleDateFormat SRC_FMT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+        SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS");
+        DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
+        try {
+            return DST_FMT.format(SRC_FMT.parse(input));
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
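
For reference, `convertDateForLDBC` above rewrites LDBC-style timestamps into the compact GMT form used downstream; a worked example, assuming a typical LDBC value:

    // "2012-07-21T07:59:14.322+0000"  ->  "20120721075914322"
    String compact = Utils.convertDateForLDBC("2012-07-21T07:59:14.322+0000");

Note that `SimpleDateFormat` is not thread-safe; the method creates fresh formatter instances on every call, which keeps it safe at the cost of some allocation in the mapper's inner loop.
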
From 1a529d94e60ffaa80e09a75b3aab89a0104af11f Mon Sep 17 00:00:00 2001
From: siyuan0322
Date: Thu, 21 Sep 2023 17:26:29 +0800
Subject: [PATCH 3/5] limit the size of log file

---
 interactive_engine/assembly/src/bin/groot/store_ctl.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/interactive_engine/assembly/src/bin/groot/store_ctl.sh b/interactive_engine/assembly/src/bin/groot/store_ctl.sh
index 4d14d6a4d62a..bcc3e3b03a08 100755
--- a/interactive_engine/assembly/src/bin/groot/store_ctl.sh
+++ b/interactive_engine/assembly/src/bin/groot/store_ctl.sh
@@ -98,7 +98,7 @@ start_server() {
     -XX:+PrintGCApplicationStoppedTime
     -Xloggc:${LOG_DIR}/${LOG_NAME}.gc.log
     -XX:+UseGCLogFileRotation
-    -XX:NumberOfGCLogFiles=32
+    -XX:NumberOfGCLogFiles=4
     -XX:GCLogFileSize=64m"
 
   java ${java_opt} \
@@ -107,7 +107,7 @@ start_server() {
     -Dlog.dir="${LOG_DIR}" \
     -Dlog.name="${LOG_NAME}" \
    -cp "${libpath}" com.alibaba.graphscope.groot.servers.GrootGraph \
-    "$@" > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2)
+    "$@" # > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2)
 }
 
 # parse argv
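
The next patch makes two fixes in the data-build path: column mappings become keyed by `tableName|partitionPath` instead of the bare table name, and the debug mapper's `getVertexRawKeys` starts locating primary-key columns through the column mapping instead of treating property ids as column indices. A minimal sketch of both, with hypothetical names and ids:

    // 1) Bare names collide when one table feeds several partitions:
    Map<String, ColumnMappingInfo> mappings = new HashMap<>();
    mappings.put("person", mappingForPt1);
    mappings.put("person", mappingForPt2);             // overwrites mappingForPt1
    mappings.put("person|pt=20230920", mappingForPt1); // identifier form:
    mappings.put("person|pt=20230921", mappingForPt2); // both survive

    // 2) Find the column indices holding primary-key properties,
    //    given colMap (column index -> property id) and pkIds:
    Map<Integer, Integer> colMap = Map.of(0, 101, 1, 102, 2, 103);
    List<Integer> pkIds = List.of(102);
    List<Integer> indices = new ArrayList<>();
    colMap.forEach((idx, propId) -> { if (pkIds.contains(propId)) indices.add(idx); });
    // indices == [1], so the raw key comes from items[1]

One caveat, if `colMap` is a `HashMap`: iteration order is unspecified, so vertices with composite keys may concatenate their key columns in a non-deterministic order.
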
From 17eddd911d1556659614af03562c1e2b2a4fd263 Mon Sep 17 00:00:00 2001
From: siyuan0322
Date: Thu, 21 Sep 2023 20:45:23 +0800
Subject: [PATCH 4/5] fix table identifier

---
 .../databuild/DataBuildMapperOdps.java        |  6 +++---
 .../databuild/DataBuildMapperOdpsDebug.java   | 21 ++++++++++++-------
 .../dataload/databuild/OfflineBuildOdps.java  |  5 +++--
 .../databuild/OfflineBuildOdpsDebug.java      |  7 ++++---
 .../groot/dataload/databuild/Utils.java       |  6 +++---
 5 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java
index d1c26dd84477..41352370704a 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java
@@ -66,10 +66,10 @@ public void setup(TaskContext context) throws IOException {
     @Override
     public void map(long recordNum, Record record, TaskContext context) throws IOException {
         TableInfo tableInfo = context.getInputTableInfo();
-        String tableName = tableInfo.getTableName();
-        ColumnMappingInfo info = this.fileToColumnMappingInfo.get(tableName);
+        String identifier = tableInfo.getTableName() + "|" + tableInfo.getPartPath();
+        ColumnMappingInfo info = this.fileToColumnMappingInfo.get(identifier);
         if (info == null) {
-            logger.warn("Mapper: ignore [{}], table info: [{}]", tableName, tableInfo);
+            logger.warn("Mapper: ignore [{}], table info: [{}]", identifier, tableInfo);
             return;
         }
         String[] items = Utils.parseRecords(record);
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
index ef27bce76f25..007187aeb1a5 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
@@ -61,10 +61,10 @@ public void setup(TaskContext context) throws IOException {
     @Override
     public void map(long recordNum, Record record, TaskContext context) throws IOException {
         TableInfo tableInfo = context.getInputTableInfo();
-        String tableName = tableInfo.getTableName();
-        ColumnMappingInfo info = this.fileToColumnMappingInfo.get(tableName);
+        String identifier = tableInfo.getTableName() + "|" + tableInfo.getPartPath();
+        ColumnMappingInfo info = this.fileToColumnMappingInfo.get(identifier);
         if (info == null) {
-            logger.warn("Mapper: ignore [{}], table info: [{}]", tableName, tableInfo);
+            logger.warn("Mapper: ignore [{}], table info: [{}]", identifier, tableInfo);
             return;
         }
@@ -75,9 +75,8 @@ public void map(long recordNum, Record record, TaskContext context) throws IOExc
         GraphElement type = this.graphSchema.getElement(labelId);
         Map<Integer, Integer> colMap = info.getPropertiesColMap();
         Map<Integer, PropertyValue> properties = Utils.buildProperties(type, items, colMap);
-
         if (type instanceof GraphVertex) {
-            outKey.set(1, getVertexRawKeys((GraphVertex) type, items));
+            outKey.set(1, getVertexRawKeys((GraphVertex) type, colMap, items));
             BytesRef keyRef =
                     Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId);
             outKey.set(0, getVertexKeyEncoded(keyRef));
@@ -99,9 +98,17 @@ public void map(long recordNum, Record record, TaskContext context) throws IOExc
         }
     }
 
-    private String getVertexRawKeys(GraphVertex type, String[] items) throws IOException {
+    private String getVertexRawKeys(GraphVertex type, Map<Integer, Integer> colMap, String[] items)
+            throws IOException {
         List<Integer> pkIds = SchemaUtils.getVertexPrimaryKeyList(type);
-        return concatenateItemsByIndices(items, pkIds);
+        List<Integer> indices = new ArrayList<>();
+        colMap.forEach(
+                (idx, propId) -> {
+                    if (pkIds.contains(propId)) {
+                        indices.add(idx);
+                    }
+                });
+        return concatenateItemsByIndices(items, indices);
     }
 
     private String getEdgeRawKeys(ColumnMappingInfo info, String[] items) throws IOException {
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
index 80098affda40..6bdd55e3d1a3 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java
@@ -54,7 +54,7 @@ public static void main(String[] args) throws IOException {
         odps = SessionState.get().getOdps();
 
         String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG);
-        configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args,1, args.length));
+        configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args, 1, args.length));
 
         String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT);
         String username = properties.getProperty(DataLoadConfig.USER_NAME, "");
@@ -95,7 +95,8 @@ public static void main(String[] args) throws IOException {
         job.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
         job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
 
-        mappingConfig.forEach((name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job));
+        mappingConfig.forEach(
+                (name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job));
 
         String dataSinkType = properties.getProperty(DataLoadConfig.DATA_SINK_TYPE, "VOLUME");
         Map<String, String> config;
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java
index f07d69296f67..53ef826d0643 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdpsDebug.java
@@ -52,7 +52,7 @@ public static void main(String[] args) throws IOException, OdpsException {
         odps = SessionState.get().getOdps();
 
         String configStr = properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG);
-        configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args,1, args.length));
+        configStr = Utils.replaceVars(configStr, Arrays.copyOfRange(args, 1, args.length));
 
         String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT);
         String username = properties.getProperty(DataLoadConfig.USER_NAME, "");
@@ -78,7 +78,8 @@ public static void main(String[] args) throws IOException, OdpsException {
         job.setMapperClass(DataBuildMapperOdpsDebug.class);
         job.setNumReduceTasks(0);
 
-        mappingConfig.forEach((name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job));
+        mappingConfig.forEach(
+                (name, x) -> InputUtils.addTable(Utils.parseTableURL(odps, name), job));
         String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE);
         OutputUtils.addTable(Utils.parseTableURL(odps, outputTable), job);
         GraphSchema schema = GraphDef.parseProto(graphDefPb);
@@ -91,7 +92,7 @@ public static void main(String[] args) throws IOException, OdpsException {
         outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson);
         outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mapper.writeValueAsString(info));
         job.set(DataLoadConfig.META_INFO, mapper.writeValueAsString(outputMeta));
-
+        System.out.println(mapper.writeValueAsString(outputMeta));
         JobClient.runJob(job);
     }
 }
diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
index 4d1a587852d0..dcee9e8e2113 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Utils.java
@@ -48,14 +48,14 @@ public static Map<String, ColumnMappingInfo> getMappingInfo(
                 (fileName, fileColumnMapping) -> {
                     ColumnMappingInfo subInfo = fileColumnMapping.toColumnMappingInfo(schema);
-                    // Note: the project and partition are stripped (if present)
-                    columnMappingInfo.put(Utils.getTableName(odps, fileName), subInfo);
+                    // Note: the project is stripped (if present); the partition is kept
+                    columnMappingInfo.put(Utils.getTableIdentifier(odps, fileName), subInfo);
                 });
         return columnMappingInfo;
     }
 
-    public static String getTableName(Odps odps, String tableFullName) {
+    public static String getTableIdentifier(Odps odps, String tableFullName) {
         TableInfo info = parseTableURL(odps, tableFullName);
-        return info.getTableName();
+        return info.getTableName() + "|" + info.getPartPath();
     }
 
     /**
From 1e652037541d67d96e6d3295481f88b8782e58a9 Mon Sep 17 00:00:00 2001
From: siyuan0322
Date: Fri, 22 Sep 2023 09:54:54 +0800
Subject: [PATCH 5/5] replace an identifier

---
 .../groot/dataload/databuild/DataBuildMapperOdpsDebug.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
index 007187aeb1a5..ba2b9a42577f 100644
--- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
+++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdpsDebug.java
@@ -137,7 +137,7 @@ private String getVertexKeyEncoded(BytesRef keyRef) {
         ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes());
         long tableId = buffer.getLong(0);
         long hashId = buffer.getLong(8);
-        return tableId + "|" + hashId;
+        return tableId + "/" + hashId;
     }
 
     private String getEdgeKeyEncoded(BytesRef keyRef) {
@@ -146,6 +146,6 @@ private String getEdgeKeyEncoded(BytesRef keyRef) {
         long srcHashId = buffer.getLong(8);
         long dstHashId = buffer.getLong(16);
         long eid = buffer.getLong(24);
-        return tableId + "|" + srcHashId + "|" + dstHashId + "|" + eid;
+        return tableId + "/" + srcHashId + "/" + dstHashId + "/" + eid;
    }
 }
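
For readers decoding the debug output: after this last patch a vertex key prints as `tableId/hashId` and an edge key as `tableId/srcHashId/dstHashId/eid`, read from the fixed-offset longs of the encoded key. Presumably `/` replaces `|` because `|` now separates table names from partition paths in mapping identifiers; the commit message does not say. A sketch of decoding an edge key, assuming a 32-byte buffer:

    // Reads the four longs at fixed offsets and joins them with "/"
    ByteBuffer buffer = ByteBuffer.wrap(keyRef.getBytes());
    String encoded = buffer.getLong(0) + "/" + buffer.getLong(8) + "/"
            + buffer.getLong(16) + "/" + buffer.getLong(24); // tableId/src/dst/eid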