Skip to content

Commit

Permalink
[GIE Compiler] fix bugs of columnId in schema
Browse files Browse the repository at this point in the history
refactor(flex): Replace the Adhoc csv reader with Arrow CSV reader (#3154)

1. Use Arrow CSV Reader to replace current adhoc csv reader, to support
more configurable options in `bulk_load.yaml`.
2. Introduce `CSVFragmentLoader`, `BasicFragmentLoader` for
`MutablePropertyFragment`.

With this PR merged, `MutablePropertyFragment` will support loading
fragment from csv with options:
- delimeter: default '|'
- header_row: default true
- quoting: default false
- quoting_char: default '"'
- escaping: default false
- escaping_char: default'\\'
- batch_size: the batch size of when reading file into memory, default
1MB.
- batch_reader: default false. If set to true,
`arrow::csv::StreamingReader` will be used to parse the input file.
Otherwise, `arrow::TableReader` will be used.

With this PR merged, the performance of graph loading will be improved.
The Adhoc Reader denote the current implemented csv parser, 1,2,4,8
denotes the parallelism of graph loading, i.e. how many labels of
vertex/edge are concurrently processed.

Note that TableReader is around 10x faster than StreamingReader. The
possible reason could be the multi-threading is used.
See [arrow-csv-doc](https://arrow.apache.org/docs/cpp/csv.html) for
details.

| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph |805s|	468s|	349s|	313s|
| Adhoc Reader | Serialization | 126s|	126s|	126s|	126s|
| Adhoc Reader  | **Total** |931s|	594s|	475s|	439s|
| Table Reader |  ReadFile | 9s	|9s	|9s|	9s|
| Table Reader | LoadGraph |455s|	280s|	211s|	182s|
| Table Reader |Serialization |126s|	126s|	126s|	126s|
| Table Reader | **Total** | 600s|	415s|	346s|	317s|
| Streaming Reader | ReadFile |91s|	91s|	91s|	91s|
| Streaming Reader | LoadGraph | 555s|	289s|	196s|	149s|
| Streaming Reader | Serialization |126s|	126s|	126s|	126s|
| Streaming Reader | **Total** | 772s|	506s|	413s|	366s|

| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph |2720s|	1548s|	1176s|	948s|
| Adhoc Reader | Serialization | 409s|	409s|	409s|	409s|
| Adhoc Reader  | **Total** | 3129s|	1957s|	1585s|	1357s|
| Table Reader |  ReadFile |24s|	24s|	24s|	24s|
| Table Reader | LoadGraph |1576s|	949s|	728s|	602s|
| Table Reader |Serialization |409s|	409s|	409s|	409s|
| Table Reader | **Total** | 2009s|	1382s|	1161s|	1035s|
| Streaming Reader | ReadFile |300s|	300s|	300s|	300s|
| Streaming Reader | LoadGraph | 1740s|	965s|	669s|	497s|
| Streaming Reader | Serialization | 409s|	409s|	409s|	409s|
| Streaming Reader | **Total** | 2539s|	1674s|	1378s|	1206s|
| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph | 8260s|	4900s	|3603s	|2999s|
| Adhoc Reader | Serialization | 1201s |	1201s|	1201s	|1201s|
| Adhoc Reader  | **Total** | 9461s|	6101s | 4804s	|4200s|
| Table Reader |  ReadFile | 73s	|73s|	96s|	96s|
| Table Reader | LoadGraph |4650s|	2768s|	2155s	|1778s|
| Table Reader |Serialization | 1201s |	1201s|	1201s	|1201s|
| Table Reader | **Total** | 5924s|	4042s|	3452s|	3075s|
| Streaming Reader | ReadFile | 889s |889s | 889s| 889s|
| Streaming Reader | LoadGraph | 5589s|	3005s|	2200s|	1712s|
| Streaming Reader | Serialization | 1201s| 1201s| 1201s |1201s |
| Streaming Reader | **Total** | 7679s	| 5095s |4290s| 	3802s|

FIx #3116

minor fix and move modern graph

fix grin test

todo: do_start

fix

fix

stash

fix

fix

make rules unique

dockerfile stash

minor change

remove plugin-dir

fix

minor fix

debug

debug

fix

fix

fix bulk_load.yaml

bash format

some fix

fix format

fix grin test

some fi

check ci

fix ci

set

fix ci

fix

dd

f

disable tmate

fix some bug

fix

fix

refactor

fix

fix

fix

minor

some fix

fix

support default src_dst primarykey mapping in bulk load

fix

fix

fix

fix

Ci

rename

fix java and add get_person_name.cypher
  • Loading branch information
shirly121 authored and zhanglei1949 committed Sep 21, 2023
1 parent 9f012b7 commit 5a3cd85
Show file tree
Hide file tree
Showing 57 changed files with 3,274 additions and 1,103 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ jobs:
cd flex/engines/graph_db/grin
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../storages/rt_mutable_graph/modern_graph/
./run_grin_test
export FLEX_DATA_DIR=../../../../interactive/examples/modern_graph/
./run_grin_test 'flex://schema_file=../../../../interactive/examples/modern_graph/modern_graph.yaml&bulk_load_file=../../../../interactive/examples/modern_graph/bulk_load.yaml'
- name: Test Graph Loading on modern graph
env:
Expand Down
30 changes: 13 additions & 17 deletions .github/workflows/hqps-db-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ jobs:
cd ${GIE_HOME}/compiler
make build
- name: Prepare dataset
- name: Prepare dataset and workspace
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
# download dataset
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
git clone -b master --single-branch --depth=1 https://github.com/zhanglei1949/gstest.git ${GS_TEST_DIR}
mkdir -p ${INTERACTIVE_WORKSPACE}/data/ldbc
GRAPH_SCHEMA_YAML=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yaml
cp ${GRAPH_SCHEMA_YAML} ${INTERACTIVE_WORKSPACE}/data/ldbc/graph.yaml
- name: Sample Query test
env:
Expand All @@ -102,25 +106,16 @@ jobs:
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
GIE_HOME=${GITHUB_WORKSPACE}/interactive_engine
# create tmp ir.compiler.properties
touch /tmp/ir.compiler.properties
echo "engine.type: hiactor" >> /tmp/ir.compiler.properties
echo "graph.schema: ${GS_TEST_DIR}/flex/ldbc-sf01-long-date/ldbc_schema_csr_ic.json" >> /tmp/ir.compiler.properties
echo "graph.store: exp" >> /tmp/ir.compiler.properties
echo "graph.planner.is.on: true" >> /tmp/ir.compiler.properties
echo "graph.planner.opt: RBO" >> /tmp/ir.compiler.properties
echo "graph.planner.rules: FilterMatchRule,NotMatchToAntiJoinRule" >> /tmp/ir.compiler.properties
cd ${GITHUB_WORKSPACE}/flex/bin
for i in 1 2 3 4 5 6 7 8 9 10 11 12;
do
cmd="./load_plan_and_gen.sh -e=hqps -i=../resources/queries/ic/adhoc/ic${i}_adhoc.cypher -w=/tmp/codgen/"
cmd=${cmd}" -o=/tmp/plugin --ir_conf=/tmp/ir.compiler.properties "
cmd=${cmd}" --graph_schema_path=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/ldbc_schema_csr_ic.json"
cmd=${cmd}" -o=/tmp/plugin --ir_conf=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/engine_config.yaml "
cmd=${cmd}" --graph_schema_path=${INTERACTIVE_WORKSPACE}/data/ldbc/graph.yaml"
cmd=${cmd}" --gie_home=${GIE_HOME}"
echo $cmd
eval ${cmd}
Expand All @@ -129,8 +124,8 @@ jobs:
for i in 1 2 3 4 5 6 7 8 9;
do
cmd="./load_plan_and_gen.sh -e=hqps -i=../resources/queries/ic/adhoc/simple_match_${i}.cypher -w=/tmp/codgen/"
cmd=${cmd}" -o=/tmp/plugin --ir_conf=/tmp/ir.compiler.properties "
cmd=${cmd}" --graph_schema_path=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/ldbc_schema_csr_ic.json"
cmd=${cmd}" -o=/tmp/plugin --ir_conf=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/engine_config.yaml "
cmd=${cmd}" --graph_schema_path=${INTERACTIVE_WORKSPACE}/data/ldbc/graph.yaml"
cmd=${cmd}" --gie_home=${GIE_HOME}"
echo $cmd
eval ${cmd}
Expand All @@ -140,9 +135,10 @@ jobs:
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
export FLEX_DATA_DIR=${GS_TEST_DIR}/flex/ldbc-sf01-long-date
export ENGINE_TYPE=hiactor
bash hqps_cypher_test.sh ${GS_TEST_DIR}
bash hqps_cypher_test.sh ${GS_TEST_DIR} ${INTERACTIVE_WORKSPACE}
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ core.*

# Flex related
flex/docs/
flex/interactive/data/*/indices/
flex/interactive/data/*/plugins/
flex/interactive/data/*
flex/interactive/logs/*
flex/interactive/examples/sf0.1-raw/
flex/interactive/.running
flex/interactive/.running
flex/interactive/.env
106 changes: 86 additions & 20 deletions flex/bin/load_plan_and_gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ fi
#fi

cypher_to_plan() {
if [ $# -ne 7 ]; then
echo "Usage: $0 <query_name> <input_file> <output_plan file> <output_yaml_file>"
echo " <ir_compiler_properties> <graph_schema_path> <gie_home>, but receive: "$#
if [ $# -ne 9 ]; then
echo "Usage: cypher_to_plan <query_name> <input_file> <output_plan file> <output_yaml_file>"
echo " <ir_compiler_properties> <graph_schema_path> <gie_home>"
echo " <procedure_name> <procedure_description>"
echo " but receive: "$#
exit 1
fi
query_name=$1
Expand All @@ -81,6 +83,10 @@ cypher_to_plan() {
graph_schema_path=$6
GIE_HOME=$7

# get procedure_name and procedure_description
procedure_name=$8
procedure_description=$9

# find java executable
echo "IR compiler properties = ${ir_compiler_properties}"
#check file exists
Expand Down Expand Up @@ -122,8 +128,8 @@ cypher_to_plan() {
exit 1
fi
# add extrac_key_value_config
extra_config="name:${query_name}"
extra_config="${extra_config},description:Autogenerated stored procedure configuration yaml for ${query_name}"
extra_config="name:${procedure_name}"
extra_config="${extra_config},description:${procedure_description}"

cmd="java -cp ${GIE_HOME}/compiler/target/libs/*:${compiler_jar}"
cmd="${cmd} -Dgraph.schema=${graph_schema_path}"
Expand All @@ -150,26 +156,43 @@ cypher_to_plan() {

compile_hqps_so() {
#check input params size eq 2 or 3
if [ $# -ne 5 ] && [ $# -ne 6 ]; then
echo "Usage: $0 <input_file> <work_dir> <ir_compiler_properties_file> <graph_schema_file> <GIE_HOME> [output_dir]"
if [ $# -gt 8 ] || [ $# -lt 5 ]; then
echo "Usage: $0 <input_file> <work_dir> <ir_compiler_properties_file>"
echo " <graph_schema_file> <GIE_HOME> "
echo " [output_dir] [stored_procedure_name] [stored_procedure_description]"
exit 1
fi
input_path=$1
work_dir=$2
ir_compiler_properties=$3
graph_schema_path=$4
gie_home=$5
if [ $# -eq 6 ]; then
if [ $# -ge 6 ]; then
output_dir=$6
else
output_dir=${work_dir}
fi

if [ $# -ge 7 ]; then
procedure_name=$7
else
procedure_name=""
fi

if [ $# -ge 8 ]; then
procedure_description=$8
else
procedure_description=""
fi

echo "Input path = ${input_path}"
echo "Work dir = ${work_dir}"
echo "ir compiler properties = ${ir_compiler_properties}"
echo "graph schema path = ${graph_schema_path}"
echo "GIE_HOME = ${gie_home}"
echo "Output dir = ${output_dir}"
echo "Procedure name = ${procedure_name}"
echo "Procedure description = ${procedure_description}"

last_file_name=$(basename ${input_path})

Expand All @@ -188,15 +211,24 @@ compile_hqps_so() {
echo "Expect a .pb or .cc file"
exit 1
fi
# if procedure_name is not set, use query_name
if [ -z "${procedure_name}" ]; then
procedure_name=${query_name}
fi
# if procedure_description is not set, use query_name
if [ -z "${procedure_description}" ]; then
procedure_description="Stored procedure for ${procedure_name}"
fi
cur_dir=${work_dir}
mkdir -p ${cur_dir}
output_cc_path="${cur_dir}/${query_name}.cc"
output_cc_path="${cur_dir}/${procedure_name}.cc"
dst_yaml_path="${output_dir}/${procedure_name}.yaml"
if [[ $(uname) == "Linux" ]]; then
output_so_path="${cur_dir}/lib${query_name}.so"
dst_so_path="${output_dir}/lib${query_name}.so"
output_so_path="${cur_dir}/lib${procedure_name}.so"
dst_so_path="${output_dir}/lib${procedure_name}.so"
elif [[ $(uname) == "Darwin" ]]; then
output_so_path="${cur_dir}/lib${query_name}.dylib"
dst_so_path="${output_dir}/lib${query_name}.dylib"
output_so_path="${cur_dir}/lib${procedure_name}.dylib"
dst_so_path="${output_dir}/lib${procedure_name}.dylib"
else
echo "Not support OS."
exit 1
Expand All @@ -209,11 +241,14 @@ compile_hqps_so() {
eval ${cmd}
echo "----------------------------"
elif [[ $last_file_name == *.cypher ]]; then
echo "Generating code from cypher query"
echo "Generating code from cypher query, procedure name: ${procedure_name}, description: ${procedure_description}"
# first do .cypher to .pb
output_pb_path="${cur_dir}/${query_name}.pb"
output_yaml_path="${cur_dir}/${query_name}.yaml"
cypher_to_plan ${query_name} ${input_path} ${output_pb_path} ${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home}
output_pb_path="${cur_dir}/${procedure_name}.pb"
output_yaml_path="${cur_dir}/${procedure_name}.yaml"
cypher_to_plan ${procedure_name} ${input_path} ${output_pb_path} \
${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home} \
${procedure_name} "${procedure_description}"

echo "----------------------------"
echo "Codegen from cypher query done."
echo "----------------------------"
Expand All @@ -235,7 +270,7 @@ compile_hqps_so() {
cp ${CMAKE_TEMPLATE_PATH} ${cur_dir}/CMakeLists.txt
# run cmake and make in output path.
pushd ${cur_dir}
cmd="cmake . -DQUERY_NAME=${query_name} -DFLEX_INCLUDE_PREFIX=${FLEX_INCLUDE} -DFLEX_LIB_DIR=${FLEX_LIB_DIR}"
cmd="cmake . -DQUERY_NAME=${procedure_name} -DFLEX_INCLUDE_PREFIX=${FLEX_INCLUDE} -DFLEX_LIB_DIR=${FLEX_LIB_DIR}"
# if CMAKE_CXX_COMPILER is set, use it.
if [ ! -z ${CMAKE_CXX_COMPILER} ]; then
cmd="${cmd} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}"
Expand Down Expand Up @@ -284,7 +319,7 @@ compile_hqps_so() {
fi
# check output_dir doesn't contains output_so_name
if [ -f ${dst_so_path} ]; then
echo "Output dir ${output_dir} already contains ${query_name}.so"
echo "Output dir ${output_dir} already contains ${procedure_name}.so"
echo "Please remove it first."
exit 1
fi
Expand All @@ -294,6 +329,12 @@ compile_hqps_so() {
echo "Copy failed, ${dst_so_path} not exists."
exit 1
fi
# copy the generated yaml
cp ${output_yaml_path} ${output_dir}
if [ ! -f ${dst_yaml_path} ]; then
echo "Copy failed, ${dst_yaml_path} not exists."
exit 1
fi
echo "Finish copying, output to ${dst_so_path}"
}

Expand Down Expand Up @@ -461,6 +502,14 @@ run() {
OUTPUT_DIR="${i#*=}"
shift # past argument=value
;;
--procedure_name=*)
PROCEDURE_NAME="${i#*=}"
shift # past argument=value
;;
--procedure_desc=*)
PROCEDURE_DESCRIPTION="${i#*=}"
shift # past argument=value
;;
-* | --*)
echo "Unknown option $i"
exit 1
Expand All @@ -477,17 +526,34 @@ run() {
echo "graph_schema_path ="${GRAPH_SCHEMA_PATH}
echo "GIE_HOME ="${GIE_HOME}
echo "Output path ="${OUTPUT_DIR}
echo "Procedure name ="${PROCEDURE_NAME}
echo "Procedure description ="${PROCEDURE_DESCRIPTION}

# check input exist
if [ ! -f ${INPUT} ]; then
echo "Input file ${INPUT} not exists."
exit 1
fi

if [ -z "${OUTPUT_DIR}" ]; then
OUTPUT_DIR=${WORK_DIR}
fi

# if engine_type equals hqps
if [ ${ENGINE_TYPE} == "hqps" ]; then
echo "Engine type is hqps, generating dynamic library for hqps engine."
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR}
# if PROCEDURE_DESCRIPTION is not set, use empty string
if [ -z ${PROCEDURE_DESCRIPTION} ]; then
PROCEDURE_DESCRIPTION="Automatic generated description for stored procedure ${PROCEDURE_NAME}."
fi
# if PROCEDURE_NAME is not set, use input file name
if [ -z "${PROCEDURE_NAME}" ]; then
#remove the suffix of input file, the suffix is .cc or .cypher
PROCEDURE_NAME=$(basename ${INPUT})
PROCEDURE_NAME="${PROCEDURE_NAME%.cc}"
PROCEDURE_NAME="${PROCEDURE_NAME%.pb}"
fi
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR} ${PROCEDURE_NAME} "${PROCEDURE_DESCRIPTION}"

# else if engine_type equals pegasus
elif [ ${ENGINE_TYPE} == "pegasus" ]; then
Expand Down
Loading

0 comments on commit 5a3cd85

Please sign in to comment.