Skip to content

Commit

Permalink
fix(interactive): Fix several bugs in data-load-tools (#3249)
Browse files Browse the repository at this point in the history
1. Fix that the partspec was not included in the table identifier, which
caused the label id to not be retrieved correctly.
2. Disable the backup instance of the reducer, which could truncate the
file that is uploaded to the volume.
3. Limit the total size of log files.
  • Loading branch information
siyuan0322 authored Sep 22, 2023
1 parent 1e99a1e commit bb91e5c
Show file tree
Hide file tree
Showing 15 changed files with 661 additions and 433 deletions.
2 changes: 1 addition & 1 deletion charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ data:
ingestor.node.count={{ .Values.ingestor.replicaCount }}
coordinator.node.count={{ .Values.coordinator.replicaCount }}
ingestor.queue.count={{ .Values.ingestor.replicaCount }}
partition.count={{ .Values.store.replicaCount | mul 128 }}
partition.count={{ .Values.store.replicaCount | mul 16 }}
engine.type={{ .Values.engineType }}
discovery.mode={{ .Values.discoveryMode }}
Expand Down
4 changes: 2 additions & 2 deletions interactive_engine/assembly/src/bin/groot/store_ctl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ start_server() {
-XX:+PrintGCApplicationStoppedTime
-Xloggc:${LOG_DIR}/${LOG_NAME}.gc.log
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=32
-XX:NumberOfGCLogFiles=4
-XX:GCLogFileSize=64m"

java ${java_opt} \
Expand All @@ -107,7 +107,7 @@ start_server() {
-Dlog.dir="${LOG_DIR}" \
-Dlog.name="${LOG_NAME}" \
-cp "${libpath}" com.alibaba.graphscope.groot.servers.GrootGraph \
"$@" > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2)
"$@" # > >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2)
}

# parse argv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public static long hash(int labelId, List<byte[]> pks) {
}

/**
* Generates 64 bit hash from byte array of the given length and seed.
* Generates 64-bit hash from byte array of the given length and seed.
*
* @param data byte array to hash
* @param length length of the array to hash
* @param seed initial seed value
* @return 64 bit hash of the given array
* @return 64-bit hash of the given array
*/
private static long hash64(final byte[] data, int length, int seed) {
final long m = 0xc6a4a7935bd1e995L;
Expand Down Expand Up @@ -103,11 +103,11 @@ private static long hash64(final byte[] data, int length, int seed) {
}

/**
* Generates 64 bit hash from byte array with default seed value.
* Generates 64-bit hash from byte array with default seed value.
*
* @param data byte array to hash
* @param length length of the array to hash
* @return 64 bit hash of the given string
* @return 64-bit hash of the given string
*/
private static long hash64(final byte[] data, int length) {
return hash64(data, length, 0xc70f6907);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
package com.alibaba.graphscope.groot.dataload.databuild;

import com.alibaba.graphscope.groot.common.config.DataLoadConfig;
import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException;
import com.alibaba.graphscope.groot.common.schema.api.*;
import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper;
import com.alibaba.graphscope.groot.common.schema.wrapper.DataType;
import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -32,25 +30,18 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class DataBuildMapper extends Mapper<LongWritable, Text, BytesWritable, BytesWritable> {
private static final Logger logger = LoggerFactory.getLogger(DataBuildMapper.class);

public static final SimpleDateFormat SRC_FMT =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
public static final SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS");

private GraphSchema graphSchema;
private DataEncoder dataEncoder;
private String separator;
private Map<String, ColumnMappingInfo> fileToColumnMappingInfo;

private final BytesWritable outKey = new BytesWritable();
private final BytesWritable outVal = new BytesWritable();
private boolean ldbcCustomize; // Do some customize data type transformations for LDBC data
private boolean skipHeader;

@Override
Expand All @@ -65,9 +56,7 @@ protected void setup(Context context) throws IOException {
this.fileToColumnMappingInfo =
objectMapper.readValue(
columnMappingsJson, new TypeReference<Map<String, ColumnMappingInfo>>() {});
this.ldbcCustomize = conf.getBoolean(DataLoadConfig.LDBC_CUSTOMIZE, false);
this.skipHeader = conf.getBoolean(DataLoadConfig.SKIP_HEADER, true);
DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
}

@Override
Expand All @@ -78,122 +67,41 @@ protected void map(LongWritable key, Text value, Context context)
}
String fullPath = context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE);
String fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1);
ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(fileName);
if (columnMappingInfo == null) {
ColumnMappingInfo info = this.fileToColumnMappingInfo.get(fileName);
if (info == null) {
logger.warn("Mapper: ignore [{}], fullPath is [{}]", fileName, fullPath);
return;
}

int labelId = columnMappingInfo.getLabelId();
long tableId = columnMappingInfo.getTableId();
Map<Integer, Integer> propertiesColumnMapping = columnMappingInfo.getPropertiesColMap();
String[] items = value.toString().split(separator);

int labelId = info.getLabelId();
long tableId = info.getTableId();
GraphElement type = this.graphSchema.getElement(labelId);
Map<Integer, PropertyValue> propertiesMap =
buildPropertiesMap(type, items, propertiesColumnMapping);
BytesRef valRef = this.dataEncoder.encodeProperties(labelId, propertiesMap);
Map<Integer, Integer> colMap = info.getPropertiesColMap();
Map<Integer, PropertyValue> properties = Utils.buildProperties(type, items, colMap);

BytesRef valRef = dataEncoder.encodeProperties(labelId, properties);
this.outVal.set(valRef.getArray(), valRef.getOffset(), valRef.getLength());

if (type instanceof GraphVertex) {
BytesRef keyBytesRef =
this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId);
this.outKey.set(
keyBytesRef.getArray(), keyBytesRef.getOffset(), keyBytesRef.getLength());
BytesRef keyRef =
Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId);
this.outKey.set(keyRef.getArray(), keyRef.getOffset(), keyRef.getLength());
context.write(this.outKey, this.outVal);
} else if (type instanceof GraphEdge) {
int srcLabelId = columnMappingInfo.getSrcLabelId();
Map<Integer, Integer> srcPkColMap = columnMappingInfo.getSrcPkColMap();
GraphElement srcType = this.graphSchema.getElement(srcLabelId);
Map<Integer, PropertyValue> srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap);

int dstLabelId = columnMappingInfo.getDstLabelId();
Map<Integer, Integer> dstPkColMap = columnMappingInfo.getDstPkColMap();
GraphElement dstType = this.graphSchema.getElement(dstLabelId);
Map<Integer, PropertyValue> dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap);

BytesRef outEdgeKeyRef =
this.dataEncoder.encodeEdgeKey(
(GraphVertex) srcType,
srcPkMap,
(GraphVertex) dstType,
dstPkMap,
(GraphEdge) type,
propertiesMap,
tableId,
true);
this.outKey.set(
outEdgeKeyRef.getArray(), outEdgeKeyRef.getOffset(), outEdgeKeyRef.getLength());
BytesRef out =
Utils.getEdgeKeyRef(
dataEncoder, graphSchema, info, items, properties, tableId, true);
this.outKey.set(out.getArray(), out.getOffset(), out.getLength());
context.write(this.outKey, this.outVal);
BytesRef inEdgeKeyRef =
this.dataEncoder.encodeEdgeKey(
(GraphVertex) srcType,
srcPkMap,
(GraphVertex) dstType,
dstPkMap,
(GraphEdge) type,
propertiesMap,
tableId,
false);
this.outKey.set(
inEdgeKeyRef.getArray(), inEdgeKeyRef.getOffset(), inEdgeKeyRef.getLength());
BytesRef in =
Utils.getEdgeKeyRef(
dataEncoder, graphSchema, info, items, properties, tableId, false);
this.outKey.set(in.getArray(), in.getOffset(), in.getLength());
context.write(this.outKey, this.outVal);
} else {
throw new IllegalArgumentException(
"invalid label [" + labelId + "], only support VertexType and EdgeType");
}
}

/**
 * Builds the property-id to {@link PropertyValue} map for one split input line.
 *
 * <p>For each (column index, property id) entry of {@code columnMapping}, the property
 * definition is looked up on {@code typeDef}, the raw string at that column of {@code items}
 * is read, and it is wrapped in a {@link PropertyValue} of the property's data type. When
 * {@code ldbcCustomize} is set, values of properties named {@code creationDate} or
 * {@code joinDate} are passed through {@code convertDate}, and every "-" is removed from
 * values of the property named {@code birthday}.
 *
 * @param typeDef element definition used to resolve property ids
 * @param items column values of the current record
 * @param columnMapping column index -> property id
 * @return map from property id to the value built for it
 * @throws PropertyDefNotFoundException if {@code typeDef} has no property for a mapped id
 * @throws IllegalArgumentException if a mapped column index is not below {@code items.length}
 */
private Map<Integer, PropertyValue> buildPropertiesMap(
GraphElement typeDef, String[] items, Map<Integer, Integer> columnMapping) {
// Presized with the number of mapped columns.
Map<Integer, PropertyValue> operationProperties = new HashMap<>(columnMapping.size());
columnMapping.forEach(
(colIdx, propertyId) -> {
GraphProperty propertyDef = typeDef.getProperty(propertyId);
if (propertyDef == null) {
throw new PropertyDefNotFoundException(
"property ["
+ propertyId
+ "] not found in ["
+ typeDef.getLabel()
+ "]");
}
// The mapping points past the end of this record: report the mapping and the row.
if (colIdx >= items.length) {
throw new IllegalArgumentException(
"label ["
+ typeDef.getLabel()
+ "], invalid mapping ["
+ colIdx
+ "] -> ["
+ propertyId
+ "], data ["
+ Arrays.toString(items)
+ "]");
}
DataType dataType = propertyDef.getDataType();

String val = items[colIdx];
// Optional rewrites keyed on the property name; other names are left untouched.
if (ldbcCustomize) {
String name = propertyDef.getName();
switch (name) {
case "creationDate":
case "joinDate":
val = convertDate(val);
break;
case "birthday":
val = val.replace("-", "");
break;
}
}
PropertyValue propertyValue = new PropertyValue(dataType, val);
operationProperties.put(propertyId, propertyValue);
});
return operationProperties;
}

public static String convertDate(String input) {
try {
return DST_FMT.format(SRC_FMT.parse(input));
} catch (ParseException e) {
throw new RuntimeException(e);
throw new IllegalArgumentException("Invalid label " + labelId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
package com.alibaba.graphscope.groot.dataload.databuild;

import com.alibaba.graphscope.groot.common.config.DataLoadConfig;
import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException;
import com.alibaba.graphscope.groot.common.schema.api.*;
import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper;
import com.alibaba.graphscope.groot.common.schema.wrapper.DataType;
import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.MapperBase;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -66,114 +65,41 @@ public void setup(TaskContext context) throws IOException {

@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
String tableName = context.getInputTableInfo().getTableName();
ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(tableName);
if (columnMappingInfo == null) {
logger.warn(
"Mapper: ignore [{}], table info is [{}]",
tableName,
context.getInputTableInfo());
TableInfo tableInfo = context.getInputTableInfo();
String identifier = tableInfo.getTableName() + "|" + tableInfo.getPartPath();
ColumnMappingInfo info = this.fileToColumnMappingInfo.get(identifier);
if (info == null) {
logger.warn("Mapper: ignore [{}], table info: [{}]", identifier, tableInfo);
return;
}
String[] items = Utils.parseRecords(record);

int labelId = columnMappingInfo.getLabelId();
long tableId = columnMappingInfo.getTableId();
int columnCount = record.getColumnCount();
String[] items = new String[columnCount];
for (int i = 0; i < columnCount; i++) {
if (record.get(i) == null) {
// items[i] = "";
items[i] = null;
} else {
items[i] = record.get(i).toString();
}
}
Map<Integer, Integer> propertiesColumnMapping = columnMappingInfo.getPropertiesColMap();
int labelId = info.getLabelId();
long tableId = info.getTableId();
GraphElement type = this.graphSchema.getElement(labelId);
Map<Integer, PropertyValue> propertiesMap =
buildPropertiesMap(type, items, propertiesColumnMapping);
BytesRef valRef = this.dataEncoder.encodeProperties(labelId, propertiesMap);
Map<Integer, Integer> colMap = info.getPropertiesColMap();
Map<Integer, PropertyValue> properties = Utils.buildProperties(type, items, colMap);

BytesRef valRef = this.dataEncoder.encodeProperties(labelId, properties);
outVal.set(new Object[] {new String(valRef.getBytes(), charSet)});
if (type instanceof GraphVertex) {
BytesRef keyRef =
this.dataEncoder.encodeVertexKey((GraphVertex) type, propertiesMap, tableId);
Utils.getVertexKeyRef(dataEncoder, (GraphVertex) type, properties, tableId);
outKey.set(new Object[] {new String(keyRef.getBytes(), charSet)});
context.write(outKey, outVal);
} else if (type instanceof GraphEdge) {
int srcLabelId = columnMappingInfo.getSrcLabelId();
Map<Integer, Integer> srcPkColMap = columnMappingInfo.getSrcPkColMap();
GraphElement srcType = this.graphSchema.getElement(srcLabelId);
Map<Integer, PropertyValue> srcPkMap = buildPropertiesMap(srcType, items, srcPkColMap);

int dstLabelId = columnMappingInfo.getDstLabelId();
Map<Integer, Integer> dstPkColMap = columnMappingInfo.getDstPkColMap();
GraphElement dstType = this.graphSchema.getElement(dstLabelId);
Map<Integer, PropertyValue> dstPkMap = buildPropertiesMap(dstType, items, dstPkColMap);

BytesRef outEdgeKeyRef =
this.dataEncoder.encodeEdgeKey(
(GraphVertex) srcType,
srcPkMap,
(GraphVertex) dstType,
dstPkMap,
(GraphEdge) type,
propertiesMap,
tableId,
true);
outKey.set(new Object[] {new String(outEdgeKeyRef.getBytes(), charSet)});
BytesRef out =
Utils.getEdgeKeyRef(
dataEncoder, graphSchema, info, items, properties, tableId, true);
outKey.set(new Object[] {new String(out.getBytes(), charSet)});
context.write(outKey, outVal);
BytesRef inEdgeKeyRef =
this.dataEncoder.encodeEdgeKey(
(GraphVertex) srcType,
srcPkMap,
(GraphVertex) dstType,
dstPkMap,
(GraphEdge) type,
propertiesMap,
tableId,
false);
outKey.set(new Object[] {new String(inEdgeKeyRef.getBytes(), charSet)});
BytesRef in =
Utils.getEdgeKeyRef(
dataEncoder, graphSchema, info, items, properties, tableId, false);
outKey.set(new Object[] {new String(in.getBytes(), charSet)});
context.write(outKey, outVal);
} else {
throw new IllegalArgumentException(
"invalid label [" + labelId + "], only support VertexType and EdgeType");
throw new IllegalArgumentException("Invalid label " + labelId);
}
}

/**
 * Builds the property-id to {@link PropertyValue} map for one record.
 *
 * <p>For each (column index, property id) entry of {@code columnMapping}, the property
 * definition is looked up on {@code typeDef} and the string at that column of {@code items}
 * is wrapped in a {@link PropertyValue} of the property's data type. A {@code null} column
 * value is kept as a {@code null} map value for that property id rather than being skipped.
 *
 * @param typeDef element definition used to resolve property ids
 * @param items column values of the current record; entries may be {@code null}
 * @param columnMapping column index -> property id
 * @return map from property id to the value built for it (possibly {@code null})
 * @throws PropertyDefNotFoundException if {@code typeDef} has no property for a mapped id
 * @throws IllegalArgumentException if a mapped column index is not below {@code items.length}
 */
private Map<Integer, PropertyValue> buildPropertiesMap(
GraphElement typeDef, String[] items, Map<Integer, Integer> columnMapping) {
// Presized with the number of mapped columns.
Map<Integer, PropertyValue> operationProperties = new HashMap<>(columnMapping.size());
columnMapping.forEach(
(colIdx, propertyId) -> {
GraphProperty propertyDef = typeDef.getProperty(propertyId);
if (propertyDef == null) {
throw new PropertyDefNotFoundException(
"property ["
+ propertyId
+ "] not found in ["
+ typeDef.getLabel()
+ "]");
}
// The mapping points past the end of this record: report the mapping and the row.
if (colIdx >= items.length) {
throw new IllegalArgumentException(
"label ["
+ typeDef.getLabel()
+ "], invalid mapping ["
+ colIdx
+ "] -> ["
+ propertyId
+ "], data ["
+ Arrays.toString(items)
+ "]");
}
String val = items[colIdx];
// Stays null when the column value is null; the entry is still stored below.
PropertyValue propertyValue = null;
if (val != null) {
DataType dataType = propertyDef.getDataType();
propertyValue = new PropertyValue(dataType, val);
}
operationProperties.put(propertyId, propertyValue);
});
return operationProperties;
}
}
Loading

0 comments on commit bb91e5c

Please sign in to comment.