diff --git a/.github/free-disk-space.sh b/.github/free-disk-space.sh new file mode 100755 index 000000000000..52d311dc7385 --- /dev/null +++ b/.github/free-disk-space.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# +# The Azure provided machines typically have the following disk allocation: +# Total space: 85GB +# Allocated: 67 GB +# Free: 17 GB +# This script frees up 28 GB of disk space by deleting unneeded packages and +# large directories. +# The Flink end to end tests download and generate more than 17 GB of files, +# causing unpredictable behavior and build failures. 
+# +echo "==============================================================================" +echo "Freeing up disk space on CI system" +echo "==============================================================================" + +echo "Listing 100 largest packages" +dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100 +df -h +echo "Removing large packages" +sudo apt-get remove -y '^ghc-8.*' +sudo apt-get remove -y '^dotnet-.*' +sudo apt-get remove -y '^llvm-.*' +sudo apt-get remove -y 'php.*' +sudo apt-get remove -y azure-cli google-cloud-sdk hhvm google-chrome-stable firefox powershell mono-devel +sudo apt-get autoremove -y +sudo apt-get clean +df -h +echo "Removing large directories" +# deleting 15GB +rm -rf /usr/share/dotnet/ +sudo rm -rf /usr/share/dotnet +sudo rm -rf /usr/local/lib/android +sudo rm -rf /opt/ghc +sudo rm -rf /opt/hostedtoolcache/CodeQL +df -h diff --git a/.github/workflows/build-graphscope-images-linux.yml b/.github/workflows/build-graphscope-images-linux.yml index 74a757c15893..e2bfc585c4c8 100644 --- a/.github/workflows/build-graphscope-images-linux.yml +++ b/.github/workflows/build-graphscope-images-linux.yml @@ -45,7 +45,7 @@ jobs: - name: Build GraphScope Image run: | - curl -sSL https://raw.githubusercontent.com/sighingnow/libclang/master/.github/free-disk-space.sh | bash + bash ${GITHUB_WORKSPACE}/.github/free-disk-space.sh || true cd ${GITHUB_WORKSPACE}/k8s df -h diff --git a/.github/workflows/build-graphscope-wheels-linux.yml b/.github/workflows/build-graphscope-wheels-linux.yml index 8412571c38c0..e6a42216d079 100644 --- a/.github/workflows/build-graphscope-wheels-linux.yml +++ b/.github/workflows/build-graphscope-wheels-linux.yml @@ -107,6 +107,8 @@ jobs: - name: Build Wheel Package run: | + bash ${GITHUB_WORKSPACE}/.github/free-disk-space.sh || true + # Due to an observation of changing hostname in github runners, # append 127.0.0.1 to etc/hosts to avoid DNS lookup. 
r=`cat /etc/hosts | grep $(hostname) || true` diff --git a/.github/workflows/k8s-ci.yml b/.github/workflows/k8s-ci.yml index 3ce7fa5fde00..e3fa7e4c891a 100644 --- a/.github/workflows/k8s-ci.yml +++ b/.github/workflows/k8s-ci.yml @@ -385,7 +385,7 @@ jobs: run: | cd ${GITHUB_WORKSPACE}/python pip3 install -r requirements.txt -r requirements-dev.txt - pip3 install pytest pytest-cov pytest-timeout + pip3 install pytest pytest-cov pytest-timeout pytest-xdist # build python client proto cd ${GITHUB_WORKSPACE}/python @@ -461,7 +461,10 @@ jobs: export GS_ADDR=${NODE_IP}:${NODE_PORT} cd ${GITHUB_WORKSPACE}/python - python3 -m pytest -s -vvv ./graphscope/tests/kubernetes/test_demo_script.py -k test_helm_installation + python3 -m pytest -d --tx popen//python=python3 \ + -s -vvv \ + ./graphscope/tests/kubernetes/test_demo_script.py \ + -k test_helm_installation - uses: dashanji/kubernetes-log-export-action@v5 env: @@ -488,10 +491,11 @@ jobs: cd ${GITHUB_WORKSPACE}/python export PATH=${HOME}/.local/bin:${PATH} - python3 -m pytest --ignore=./graphscope/tests/kubernetes/test_store_service.py \ - --cov=graphscope --cov-config=.coveragerc --cov-report=xml \ - --cov-report=term -s -vvv --log-cli-level=INFO \ - ./graphscope/tests/kubernetes + python3 -m pytest -d --tx popen//python=python3 \ + --ignore=./graphscope/tests/kubernetes/test_store_service.py \ + --cov=graphscope --cov-config=.coveragerc --cov-report=xml --cov-report=term \ + -s -vvv --log-cli-level=INFO \ + ./graphscope/tests/kubernetes - name: Upload Coverage uses: codecov/codecov-action@v3 @@ -526,7 +530,10 @@ jobs: # run test cd ${GITHUB_WORKSPACE}/python - python3 -m pytest -s -vvv ./graphscope/tests/kubernetes/test_demo_script.py -k test_demo_on_hdfs + python3 -m pytest -d --tx popen//python=python3 \ + -s -vvv \ + ./graphscope/tests/kubernetes/test_demo_script.py \ + -k test_demo_on_hdfs # Check the result file have successfully written to the given location # hdfs dfs -test -e /ldbc_sample/res.csv_0 && hdfs dfs 
-test -e /ldbc_sample/res.csv_1 @@ -580,7 +587,7 @@ jobs: run: | cd ${GITHUB_WORKSPACE}/python pip3 install -r requirements.txt -r requirements-dev.txt - pip3 install pytest pytest-cov pytest-timeout + pip3 install pytest pytest-cov pytest-timeout pytest-xdist # build python client proto python3 setup.py build_proto diff --git a/.github/workflows/local-ci.yml b/.github/workflows/local-ci.yml index 2ee98d5a4ab0..b988e5296dfe 100644 --- a/.github/workflows/local-ci.yml +++ b/.github/workflows/local-ci.yml @@ -281,20 +281,22 @@ jobs: cd artifacts tar -zxf ./wheel-${{ github.sha }}/client.tar.gz pushd python/dist/wheelhouse - for f in * ; do python3 -m pip install $f || true; done + for f in * ; do python3 -m pip install --no-cache-dir $f || true; done popd # install graphscope tar -zxf ./wheel-${{ github.sha }}/graphscope.tar.gz pushd coordinator/dist - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd pushd coordinator/dist/wheelhouse - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd # install tensorflow - python3 -m pip install pytest "tensorflow" "pandas" --user + python3 -m pip install --no-cache-dir pytest pytest-xdist "tensorflow" "pandas" --user + # install pytorch + python3 -m pip install --no-cache-dir "torch" --index-url https://download.pytorch.org/whl/cpu # install java sudo apt update -y && sudo apt install openjdk-11-jdk -y @@ -306,7 +308,9 @@ jobs: env: GS_TEST_DIR: ${{ github.workspace }}/gstest run: | - python3 -m pytest -s -v $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/minitest + python3 -m pytest -d --tx popen//python=python3 \ + -s -v \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/minitest - name: Upload GIE log if: failure() @@ -343,20 +347,20 @@ jobs: cd artifacts tar -zxf ./wheel-${{ github.sha }}/client.tar.gz pushd python/dist/wheelhouse - for f in * ; do python3 -m pip install $f || true; done + for f in 
* ; do python3 -m pip install --no-cache-dir $f || true; done popd # install graphscope tar -zxf ./wheel-${{ github.sha }}/graphscope.tar.gz pushd coordinator/dist - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd pushd coordinator/dist/wheelhouse - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd # install pytest - python3 -m pip install pytest pytest-cov pytest-timeout + python3 -m pip install --no-cache-dir pytest pytest-cov pytest-timeout pytest-xdist - name: Setup tmate session uses: mxschmitt/action-tmate@v3 @@ -371,9 +375,11 @@ jobs: # download dataset git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR} - python3 -m pytest -s -v --cov=graphscope --cov-config=python/.coveragerc \ - --cov-report=xml --cov-report=term --exitfirst \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/unittest + python3 -m pytest -d --tx popen//python=python3 \ + -s -v \ + --cov=graphscope --cov-config=python/.coveragerc --cov-report=xml --cov-report=term \ + --exitfirst \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/unittest - name: Upload Coverage if: ${{ needs.changes.outputs.gae-python == 'true' || github.ref == 'refs/heads/main' }} @@ -429,20 +435,20 @@ jobs: cd artifacts tar -zxf ./wheel-${{ github.sha }}/client.tar.gz pushd python/dist/wheelhouse - for f in * ; do python3 -m pip install $f || true; done + for f in * ; do python3 -m pip install --no-cache-dir $f || true; done popd # install graphscope tar -zxf ./wheel-${{ github.sha }}/graphscope.tar.gz pushd coordinator/dist - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd pushd coordinator/dist/wheelhouse - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd # install pytest - python3 -m pip install pytest + python3 -m pip install --no-cache-dir pytest pytest-xdist # download 
dataset git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR} @@ -458,9 +464,10 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest run: | pip3 show networkx - python3 -m pytest --exitfirst -s -vvv \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests \ - --ignore=$(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests/convert + python3 -m pytest -d --tx popen//python=python3 \ + --exitfirst -s -vvv \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests \ + --ignore=$(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests/convert - name: Convert Test if: ${{ needs.changes.outputs.networkx == 'true' && steps.nx-filter.outputs.convert == 'true' }} @@ -469,8 +476,9 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest run: | pip3 show networkx - python3 -m pytest --exitfirst -s -vvv \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests/convert + python3 -m pytest -d --tx popen//python=python3 \ + --exitfirst -s -vvv \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/tests/convert networkx-algo-and-generator-test: runs-on: ubuntu-20.04 @@ -514,20 +522,20 @@ jobs: cd artifacts tar -zxf ./wheel-${{ github.sha }}/client.tar.gz pushd python/dist/wheelhouse - for f in * ; do python3 -m pip install $f || true; done + for f in * ; do python3 -m pip install --no-cache-dir $f || true; done popd # install graphscope tar -zxf ./wheel-${{ github.sha }}/graphscope.tar.gz pushd coordinator/dist - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd pushd coordinator/dist/wheelhouse - python3 -m pip install ./*.whl + python3 -m pip install --no-cache-dir ./*.whl popd # install pytest - python3 -m pip install pytest + python3 -m pip install --no-cache-dir pytest pytest-xdist # download dataset git clone -b master --single-branch --depth=1 
https://github.com/7br/gstest.git ${GS_TEST_DIR} @@ -539,8 +547,9 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest run: | pip3 show networkx - python3 -m pytest --exitfirst -s -v \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/algorithms/tests/builtin + python3 -m pytest -d --tx popen//python=python3 \ + --exitfirst -s -v \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/algorithms/tests/builtin - name: Generator test if: ${{ needs.changes.outputs.networkx == 'true' && steps.nx-filter.outputs.generator == 'true' }} @@ -549,13 +558,15 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest run: | pip3 show networkx - python3 -m pytest --exitfirst -s -v \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/generators/tests + python3 -m pytest -d --tx popen//python=python3 \ + --exitfirst -s -v \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/generators/tests - name: Readwrite test if: ${{ needs.changes.outputs.networkx == 'true' && steps.nx-filter.outputs.io == 'true' }} env: DEPLOYMENT: ${{ matrix.deployment }} run: | - python3 -m pytest --exitfirst -s -v -m "not slow" \ - $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/readwrite/tests + python3 -m pytest -d --tx popen//python=python3 \ + --exitfirst -s -v -m "not slow" \ + $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/nx/readwrite/tests diff --git a/Makefile b/Makefile index 81ecd19aee9b..9ba9ec4b9639 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,9 @@ ARCH := $(shell uname -m) VERSION := $(shell cat $(WORKING_DIR)/VERSION) +# pip installation arguments +PIP_ARGS = --timeout=1000 --no-cache-dir + ## Common .PHONY: all graphscope install clean @@ -75,7 +78,8 @@ clean: client: learning cd $(CLIENT_DIR) && \ - python3 -m pip install -r requirements.txt -r requirements-dev.txt --user && \ + python3 -m pip install ${PIP_ARGS} "torch" --index-url 
https://download.pytorch.org/whl/cpu --user && \ + python3 -m pip install ${PIP_ARGS} -r requirements.txt -r requirements-dev.txt --user && \ export PATH=$(PATH):$(HOME)/.local/bin && \ python3 setup.py build_ext --inplace --user && \ if [ $(WITH_GLTORCH) = ON ]; then \ @@ -86,7 +90,8 @@ client: learning coordinator: client cd $(COORDINATOR_DIR) && \ - python3 -m pip install -r requirements.txt -r requirements-dev.txt --user && \ + python3 -m pip install ${PIP_ARGS} "torch" --index-url https://download.pytorch.org/whl/cpu --user && \ + python3 -m pip install ${PIP_ARGS} -r requirements.txt -r requirements-dev.txt --user && \ python3 setup.py build_builtin && \ python3 -m pip install --user --editable $(COORDINATOR_DIR) && \ rm -rf $(COORDINATOR_DIR)/*.egg-info @@ -169,8 +174,9 @@ $(LEARNING_DIR)/graphlearn/built/lib/libgraphlearn_shared.$(SUFFIX): prepare-client: cd $(CLIENT_DIR) && \ - pip3 install -r requirements.txt --user && \ - pip3 install -r requirements-dev.txt --user && \ + pip3 install ${PIP_ARGS} "torch" --index-url https://download.pytorch.org/whl/cpu --user && \ + pip3 install ${PIP_ARGS} -r requirements.txt --user && \ + pip3 install ${PIP_ARGS} -r requirements-dev.txt --user && \ python3 setup.py build_proto graphscope-docs: prepare-client diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h index 4cc0fdeb0d52..cae182879263 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -17,6 +17,7 @@ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ #include +#include #include #include "grape/grape.h" @@ -46,7 +47,7 @@ constexpr int phase_one_minor_step_2 = 2; template struct LouvainNodeState { using vid_t = VID_T; - using edata_t = double; + using edata_t = float; vid_t community = 0; edata_t community_sigma_total; @@ -65,7 +66,7 @@ struct LouvainNodeState { bool use_fake_edges = false; bool is_alived_community = true; - 
std::map fake_edges; + std::vector> fake_edges; std::vector nodes_in_community; edata_t total_edge_weight; @@ -89,7 +90,7 @@ struct LouvainNodeState { template struct LouvainMessage { using vid_t = VID_T; - using edata_t = double; + using edata_t = float; vid_t community_id; edata_t community_sigma_total; @@ -102,7 +103,7 @@ struct LouvainMessage { // the community compress its member's data and make self a new vertex for // next phase. edata_t internal_weight = 0; - std::map edges; + std::vector> edges; std::vector nodes_in_self_community; LouvainMessage() @@ -123,7 +124,7 @@ struct LouvainMessage { ~LouvainMessage() = default; - // for message manager to serialize and diserialize LouvainMessage + // for message manager to serialize and deserialize LouvainMessage friend grape::InArchive& operator<<(grape::InArchive& in_archive, const LouvainMessage& u) { in_archive << u.community_id; @@ -151,7 +152,7 @@ struct LouvainMessage { }; /** - * Determine if progress is still being made or if the computaion should halt. + * Determine if progress is still being made or if the computation should halt. * * @param history change history of the pass. * @param min_progress The minimum delta X required to be considered progress. 
diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index a63755acedf6..fc29c11ccbed 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -63,12 +63,12 @@ class PregelLouvain using fragment_t = FRAG_T; using oid_t = typename fragment_t::oid_t; using vid_t = typename fragment_t::vid_t; - using edata_t = double; using vd_t = oid_t; using md_t = LouvainMessage; using compute_context_t = PregelComputeContext; using pregel_vertex_t = LouvainVertex; using state_t = LouvainNodeState; + using edata_t = typename pregel_vertex_t::edata_t; public: void Init(pregel_vertex_t& v, compute_context_t& context) override { @@ -130,7 +130,7 @@ class PregelLouvain // isolated node aggregate their quality value and exit computation on // step 1 grape::IteratorPair msgs(NULL, NULL); - double q = calculateActualQuality(v, context, msgs); + auto q = calculateActualQuality(v, context, msgs); v.context()->local_actual_quality()[v.tid()] += q; v.vote_to_halt(); return; @@ -142,7 +142,7 @@ class PregelLouvain state.changed = 0; // reset changed. if (v.context()->halt()) { // phase-1 halt, calculate current actual quality and return. - double q = calculateActualQuality(v, context, messages); + auto q = calculateActualQuality(v, context, messages); replaceNodeEdgesWithCommunityEdges(v, messages); v.context()->local_actual_quality()[v.tid()] += q; return; @@ -180,8 +180,8 @@ class PregelLouvain auto& state = v.state(); if (state.reset_total_edge_weight) { // we just aggregate the total edge weight in previous step. 
- state.total_edge_weight = context.template get_aggregated_value( - edge_weight_aggregator); + state.total_edge_weight = + context.template get_aggregated_value(edge_weight_aggregator); state.reset_total_edge_weight = false; } return state.total_edge_weight; @@ -240,8 +240,7 @@ class PregelLouvain community_map[community_id] = message; } } - - // calculate change in qulity for each potential community + // calculate change in quality for each potential community auto& state = vertex.state(); vid_t best_community_id = state.community; vid_t starting_community_id = best_community_id; @@ -273,6 +272,7 @@ class PregelLouvain state.community_sigma_total = c.community_sigma_total; state.changed = 1; // community changed. } + // send node weight to the community hub to sum in next super step md_t message(state.community, state.node_weight + state.internal_weight, 0, vertex.get_gid(), state.community); @@ -290,13 +290,15 @@ class PregelLouvain edata_t edge_weight_in_community, edata_t node_weight, edata_t internal_weight) { bool is_current_community = (curr_community_id == test_community_id); - edata_t m2 = getTotalEdgeWeight(context, v); + auto m2 = getTotalEdgeWeight(context, v); + edata_t k_i_in_L = is_current_community ? 
edge_weight_in_community + internal_weight : edge_weight_in_community; edata_t k_i_in = k_i_in_L; edata_t k_i = node_weight + internal_weight; edata_t sigma_tot = test_sigma_total; + if (is_current_community) { sigma_tot -= k_i; } @@ -306,6 +308,7 @@ class PregelLouvain double dividend = k_i * sigma_tot; delta_q = k_i_in - dividend / m2; } + return delta_q; } @@ -346,7 +349,7 @@ class PregelLouvain } k_i_in += vertex.get_edge_values(source_ids); edata_t sigma_tot = state.community_sigma_total; - edata_t m2 = getTotalEdgeWeight(context, vertex); + auto m2 = getTotalEdgeWeight(context, vertex); edata_t k_i = state.node_weight + state.internal_weight; double q = k_i_in / m2 - (sigma_tot * k_i) / pow(m2, 2); @@ -368,18 +371,18 @@ class PregelLouvain const auto& community_id = message.community_id; community_map[community_id] += message.edge_weight; } - - vertex.set_fake_edges(std::move(community_map)); + std::vector> edges(community_map.begin(), + community_map.end()); + vertex.set_fake_edges(std::move(edges)); } void sendCommunitiesInfo(pregel_vertex_t& vertex) { state_t& state = vertex.state(); md_t message; message.internal_weight = state.internal_weight; - std::map edges; assert((vertex.edge_size() == 0) || vertex.use_fake_edges()); - edges = vertex.fake_edges(); - message.edges = std::move(edges); + message.edges = std::move(vertex.move_fake_edges()); + vertex.set_fake_edges(std::vector>()); if (vertex.get_gid() != state.community) { message.nodes_in_self_community.swap(vertex.nodes_in_self_community()); } @@ -408,7 +411,9 @@ class PregelLouvain m.nodes_in_self_community.end()); } vertex.state().internal_weight = weight; - vertex.set_fake_edges(std::move(edge_map)); + std::vector> edges(edge_map.begin(), + edge_map.end()); + vertex.set_fake_edges(std::move(edges)); vertex.state().is_from_louvain_vertex_reader = false; // send self fake message to activate next round. 
diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 0a3bbb7fa50d..7ae2da7e6060 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -61,7 +61,7 @@ class LouvainAppBase using context_t = LouvainContext; using message_manager_t = grape::ParallelMessageManager; using worker_t = grape::ParallelWorker; - + using edata_t = typename context_t::edata_t; virtual ~LouvainAppBase() {} static std::shared_ptr CreateWorker(std::shared_ptr app, @@ -198,7 +198,7 @@ class LouvainAppBase if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && current_iteration % 2 == 0) { - // aggreate total change + // aggregate total change int64_t total_change = ctx.compute_context().template get_aggregated_value( change_aggregator); @@ -218,8 +218,8 @@ class LouvainAppBase << " total change: " << total_change; } else if (ctx.halt()) { // after decide_to_halt and aggregate actual quality in previous super - // step, here we check terminate computaion or start phase 2. - double actual_quality = + // step, here we check terminate computation or start phase 2. 
+ auto actual_quality = ctx.compute_context().template get_aggregated_value( actual_quality_aggregator); // after one pass if already decided halt, that means the pass yield no @@ -273,6 +273,13 @@ class LouvainAppBase } else if (ctx.compute_context().superstep() == compress_community_step) { ctx.GetVertexState(v).is_alived_community = false; } + + if (!ctx.compute_context().active(v)) { + std::vector> tmp_edges; + ctx.GetVertexState(v).fake_edges.swap(tmp_edges); + std::vector tmp_nodes; + ctx.GetVertexState(v).nodes_in_community.swap(tmp_nodes); + } }); { diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h index 445a30054cc1..45c02ce80bf7 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_context.h +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -37,13 +37,13 @@ class LouvainContext using fragment_t = FRAG_T; using oid_t = typename FRAG_T::oid_t; using vid_t = typename FRAG_T::vid_t; - using edata_t = double; using vertex_t = typename FRAG_T::vertex_t; using state_t = LouvainNodeState; using vertex_state_array_t = typename fragment_t::template vertex_array_t; public: + using edata_t = typename state_t::edata_t; explicit LouvainContext(const FRAG_T& fragment) : grape::VertexDataContext( fragment), @@ -94,8 +94,8 @@ class LouvainContext return sum; } - edata_t GetLocalEdgeWeightSum() { - edata_t sum = 0; + double GetLocalEdgeWeightSum() { + double sum = 0; for (const auto& val : local_total_edge_weight_) { sum += val; } @@ -118,7 +118,7 @@ class LouvainContext std::vector& local_change_num() { return local_change_num_; } - std::vector& local_total_edge_weight() { + std::vector& local_total_edge_weight() { return local_total_edge_weight_; } std::vector& local_actual_quality() { return local_actual_quality_; } @@ -137,9 +137,9 @@ class LouvainContext COMPUTE_CONTEXT_T compute_context_; vertex_state_array_t vertex_state_; - // members to store local aggrated value + // 
members to store local aggregated value std::vector local_change_num_; - std::vector local_total_edge_weight_; + std::vector local_total_edge_weight_; std::vector local_actual_quality_; bool halt_; // phase-1 halt diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h index 93ed5f7f0795..8c84abc2502a 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_vertex.h +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -41,7 +41,6 @@ class LouvainVertex : public PregelVertex { using fragment_t = FRAG_T; using oid_t = typename fragment_t::oid_t; using vid_t = typename fragment_t::vid_t; - using edata_t = double; using vertex_t = typename fragment_t::vertex_t; using adj_list_t = typename fragment_t::const_adj_list_t; using compute_context_t = PregelComputeContext; @@ -49,6 +48,7 @@ class LouvainVertex : public PregelVertex { using state_t = LouvainNodeState; public: + using edata_t = typename context_t::edata_t; using vd_t = VD_T; using md_t = MD_T; @@ -78,10 +78,14 @@ class LouvainVertex : public PregelVertex { return context_->GetVertexState(this->vertex_).use_fake_edges; } - const std::map& fake_edges() const { + const std::vector>& fake_edges() const { return context_->GetVertexState(this->vertex_).fake_edges; } + std::vector>&& move_fake_edges() { + return std::move(context_->GetVertexState(this->vertex_).fake_edges); + } + edata_t get_edge_value(const vid_t& dst_id) { if (!this->use_fake_edges()) { for (auto& edge : this->incoming_edges()) { @@ -95,7 +99,17 @@ class LouvainVertex : public PregelVertex { } } } else { - return this->fake_edges().at(dst_id); + auto edges = this->fake_edges(); + auto iter = std::find_if(edges.begin(), edges.end(), + [dst_id](const std::pair& p) { + return p.first == dst_id; + }); + if (iter != edges.end()) { + return iter->second; + } else { + LOG(ERROR) << "Warning: Cannot find a edge from " << this->id() + << " to " << dst_id; + } } return edata_t(); 
} @@ -117,19 +131,24 @@ class LouvainVertex : public PregelVertex { } } else { auto edges = this->fake_edges(); - for (auto gid : dst_ids) { - if (edges.find(gid) != edges.end()) { - ret += edges.at(gid); - } else { - LOG(ERROR) << "Warning: Cannot find a edge from " << this->id() - << " to " << gid; + for (auto edge : edges) { + if (dst_ids.find(edge.first) != dst_ids.end()) { + ret += edge.second; } } + // for (auto gid : dst_ids) { + // if (edges.find(gid) != edges.end()) { + // ret += edges.at(gid); + // } else { + // LOG(ERROR) << "Warning: Cannot find a edge from " << this->id() + // << " to " << gid; + // } + // } } return ret; } - void set_fake_edges(std::map&& edges) { + void set_fake_edges(std::vector>&& edges) { state_t& ref_state = this->state(); ref_state.fake_edges = std::move(edges); ref_state.use_fake_edges = true; diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 063a81baa478..00002c908384 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -504,9 +504,9 @@ engineType: "gaia" discoveryMode: "file" ## Ingestor Config -ingestorQueueBufferSize: 128 +ingestorQueueBufferSize: 1024 -ingestorSenderBufferSize: 128 +ingestorSenderBufferSize: 1024 ## Coordinator Config snapshotIncreaseIntervalMs: 1000 diff --git a/docs/index.rst b/docs/index.rst index 04a856c6ce27..a8db880c0c9c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ and the vineyard store that offers efficient in-memory data transfers. overview/intro overview/getting_started + loading_graphs overview/graph_analytics_workloads overview/graph_interactive_workloads overview/graph_learning_workloads @@ -30,7 +31,6 @@ and the vineyard store that offers efficient in-memory data transfers. overview/positioning overview/glossary release_notes - loading_graphs frequently_asked_questions .. 
toctree:: diff --git a/docs/interactive_engine/neo4j/supported_cypher.md b/docs/interactive_engine/neo4j/supported_cypher.md index 21261ad9cb65..fc502a149d54 100644 --- a/docs/interactive_engine/neo4j/supported_cypher.md +++ b/docs/interactive_engine/neo4j/supported_cypher.md @@ -100,6 +100,7 @@ Note that some Aggregator operators, such as `max()`, we listed here are impleme | List | Fold expressions into a single list | [] | [] | | | | Labels | Get label name of a vertex type | labels() | labels() | | | | Type | Get label name of an edge type | type() | type() | | | +| Extract | Get interval value from a temporal type | \.\ | \.\ | | | diff --git a/docs/loading_graph.rst b/docs/loading_graphs.rst similarity index 100% rename from docs/loading_graph.rst rename to docs/loading_graphs.rst diff --git a/flex/codegen/src/graph_types.h b/flex/codegen/src/graph_types.h index 7f5a44525b34..60fbd1820df3 100644 --- a/flex/codegen/src/graph_types.h +++ b/flex/codegen/src/graph_types.h @@ -40,9 +40,9 @@ enum class DataType { kVertexId = 8, kEdgeId = 9, kLength = 10, - kTime = 11, - kDate = 12, - kDateTime = 13, + kDate = 11, + kTime = 12, + kTimeStamp = 13, kLabelId = 14 }; @@ -78,8 +78,12 @@ static codegen::DataType common_data_type_pb_2_data_type( return codegen::DataType::kInt32Array; case common::DataType::BOOLEAN: return codegen::DataType::kBoolean; - case common::DataType::DATE: + case common::DataType::DATE32: return codegen::DataType::kDate; + case common::DataType::TIME32: + return codegen::DataType::kTime; + case common::DataType::TIMESTAMP: + return codegen::DataType::kTimeStamp; default: // LOG(FATAL) << "unknown data type"; throw std::runtime_error( @@ -106,11 +110,10 @@ static std::string single_common_data_type_pb_2_str( return "std::vector"; case common::DataType::INT32_ARRAY: return "std::vector"; - case common::DataType::DATE: + case common::DataType::DATE32: return "Date"; + // TODO: support time32 and timestamp default: - // LOG(FATAL) << "unknown data 
type"; - // return ""; throw std::runtime_error( "unknown data type when convert common data type to string:" + std::to_string(static_cast(data_type))); diff --git a/flex/codegen/src/hqps/hqps_expr_builder.h b/flex/codegen/src/hqps/hqps_expr_builder.h index ea1570cd2b5b..b8ad4e408bed 100644 --- a/flex/codegen/src/hqps/hqps_expr_builder.h +++ b/flex/codegen/src/hqps/hqps_expr_builder.h @@ -111,7 +111,7 @@ static std::string logical_to_str(const common::Logical& logical) { case common::Logical::WITHIN: return "< WithIn > "; case common::Logical::ISNULL: - return "IsNull"; + return "NONE =="; // Convert default: throw std::runtime_error("unknown logical"); } @@ -284,10 +284,21 @@ class ExprBuilder { } if (j == size) { LOG(WARNING) << "no right brace found" << j << "size: " << size; - // just add true, since the current expresion has no other expr_oprs + // just add true, since the current expresion has no other expr_ops AddExprOpr(std::string("true")); i = j; } + } else if (expr.has_extract()) { + // special case for extract + auto extract = expr.extract(); + if (i + 1 >= size) { + throw std::runtime_error("extract must have a following var/expr"); + } else if (expr_ops[i + 1].item_case() != common::ExprOpr::kVar) { + throw std::runtime_error("extract must have a following var/expr"); + } else { + AddExtractOpr(extract, expr_ops[i + 1]); + i += 2; + } } else { AddExprOpr(expr_ops[i]); ++i; @@ -398,28 +409,8 @@ class ExprBuilder { } case common::ExprOpr::kExtract: { - auto extract = opr.extract(); - auto interval = extract.interval(); - auto data_time = extract.data_time(); - CHECK(data_time.operators_size() == 1) - << "Currently only support one var in extract"; - auto expr_opr = data_time.operators(0); - CHECK(expr_opr.item_case() == common::ExprOpr::kVar) - << "Currently only support var in extract"; - auto expr_var = expr_opr.var(); - auto param_const = variable_to_param_const(expr_var, ctx_); - make_var_name_unique(param_const); - 
func_call_vars_.push_back(param_const); - - boost::format formater(EXTRACT_TEMPLATE_STR); - formater % interval_to_str(interval) % param_const.var_name; - auto extract_node_str = formater.str(); - expr_nodes_.emplace_back(extract_node_str); - - tag_selectors_.emplace_back( - variable_to_tag_id_property_selector(ctx_, expr_var)); - VLOG(10) << "extract opr: " << extract_node_str; - break; + LOG(FATAL) << "Should not reach here, Extract op should be handled " + "separately"; } default: @@ -428,6 +419,28 @@ class ExprBuilder { } } + // Add extract operator with var. Currently not support extract on a compicate + // expression. + void AddExtractOpr(const common::Extract& extract_opr, + const common::ExprOpr& expr_opr) { + CHECK(expr_opr.item_case() == common::ExprOpr::kVar) + << "Currently only support var in extract"; + auto interval = extract_opr.interval(); + auto expr_var = expr_opr.var(); + auto param_const = variable_to_param_const(expr_var, ctx_); + make_var_name_unique(param_const); + func_call_vars_.push_back(param_const); + + boost::format formater(EXTRACT_TEMPLATE_STR); + formater % interval_to_str(interval) % param_const.var_name; + auto extract_node_str = formater.str(); + expr_nodes_.emplace_back(extract_node_str); + + tag_selectors_.emplace_back( + variable_to_tag_id_property_selector(ctx_, expr_var)); + VLOG(10) << "extract opr: " << extract_node_str; + } + // get expr nodes const std::vector& GetExprNodes() const { return expr_nodes_; } diff --git a/flex/engines/hqps_db/core/null_record.h b/flex/engines/hqps_db/core/null_record.h index 5784736c052f..371d4616982e 100644 --- a/flex/engines/hqps_db/core/null_record.h +++ b/flex/engines/hqps_db/core/null_record.h @@ -88,10 +88,20 @@ bool operator==(const T& lhs, const None& rhs) { return IsNull(lhs); } +template +bool operator==(const None& lhs, const T& rhs) { + return IsNull(rhs); +} + template bool operator!=(const T& lhs, const None& rhs) { return !IsNull(lhs); } + +template +bool operator!=(const 
None& lhs, const T& rhs) { + return !IsNull(rhs); +} } // namespace gs #endif // ENGINES_HQPS_ENGINE_NULL_RECORD_H_ \ No newline at end of file diff --git a/flex/engines/hqps_db/core/sync_engine.h b/flex/engines/hqps_db/core/sync_engine.h index 8975a9b7174b..2a21fde4d2d1 100644 --- a/flex/engines/hqps_db/core/sync_engine.h +++ b/flex/engines/hqps_db/core/sync_engine.h @@ -684,7 +684,7 @@ class SyncEngine : public BaseEngine { template < int... in_col_id, typename CTX_HEAD_T, int cur_alias, int base_tag, typename... CTX_PREV, typename EXPR, typename... SELECTOR, - typename std::enable_if<((sizeof...(in_col_id) > 1) && + typename std::enable_if<((sizeof...(in_col_id) >= 1) && (sizeof...(in_col_id) == sizeof...(SELECTOR)))>::type* = nullptr, typename RES_T = Context> diff --git a/flex/tests/hqps/match_query.h b/flex/tests/hqps/match_query.h index 6b424c99151d..e41c81a650d6 100644 --- a/flex/tests/hqps/match_query.h +++ b/flex/tests/hqps/match_query.h @@ -644,7 +644,6 @@ struct MatchQuery11Expr1 { private: }; -// Auto generated query class definition class MatchQuery11 : public HqpsAppBase { public: using Engine = SyncEngine; @@ -744,5 +743,63 @@ class MatchQuery11 : public HqpsAppBase { } }; +struct MatchQuery12Expr0 { + public: + using result_t = bool; + MatchQuery12Expr0() {} + + inline auto operator()(Date var0) const { + return gs::DateTimeExtractor::extract(var0) == 7; + } + + private: +}; + +struct MatchQuery12Expr1 { + public: + using result_t = int64_t; + MatchQuery12Expr1() {} + + inline auto operator()(Date var1) const { + return gs::DateTimeExtractor::extract(var1); + } + + private: +}; + +class MatchQuery12 : public HqpsAppBase { + public: + using Engine = SyncEngine; + using label_id_t = typename gs::MutableCSRInterface::label_id_t; + using vertex_id_t = typename gs::MutableCSRInterface::vertex_id_t; + // Query function for query class + results::CollectiveResults Query(const gs::MutableCSRInterface& graph) const { + auto ctx0 = Engine::template 
ScanVertex( + graph, 1, Filter()); + + auto ctx1 = Engine::Project( + graph, std::move(ctx0), + std::tuple{gs::make_mapper_with_variable( + gs::PropertySelector("birthday"))}); + auto expr0 = gs::make_filter(MatchQuery12Expr0(), + gs::PropertySelector("None")); + auto ctx2 = Engine::template Select(graph, std::move(ctx1), + std::move(expr0)); + + auto ctx3 = Engine::Project( + graph, std::move(ctx2), + std::tuple{gs::make_mapper_with_expr<0>( + MatchQuery12Expr1(), gs::PropertySelector("None"))}); + return Engine::Sink(ctx3, std::array{2}); + } + // Wrapper query function for query class + results::CollectiveResults Query(const gs::MutableCSRInterface& graph, + Decoder& decoder) const override { + // decoding params from decoder, and call real query func + + return Query(graph); + } +}; + } // namespace gs #endif // TESTS_HQPS_MATCH_QUERY_H_ \ No newline at end of file diff --git a/flex/tests/hqps/query_test.cc b/flex/tests/hqps/query_test.cc index f59c5a73e8d9..8351574c699e 100644 --- a/flex/tests/hqps/query_test.cc +++ b/flex/tests/hqps/query_test.cc @@ -211,7 +211,20 @@ int main(int argc, char** argv) { gs::MutableCSRInterface graph(sess); query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery10 test"; + LOG(INFO) << "Finish MatchQuery11 test"; + } + + { + gs::MatchQuery12 query; + std::vector encoder_array; + gs::Encoder input_encoder(encoder_array); + std::vector output_array; + gs::Encoder output(output_array); + gs::Decoder input(encoder_array.data(), encoder_array.size()); + + gs::MutableCSRInterface graph(sess); + query.Query(graph, input); + LOG(INFO) << "Finish MatchQuery12 test"; } LOG(INFO) << "Finish context test."; diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java index 9a8b0f52418e..238b5c1f9771 100644 --- 
a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java @@ -15,10 +15,10 @@ public class IngestorConfig { public static final Config INGESTOR_QUEUE_BUFFER_MAX_COUNT = - Config.intConfig("ingsetor.queue.buffer.max.count", 128); + Config.intConfig("ingestor.queue.buffer.max.count", 1024); public static final Config INGESTOR_SENDER_BUFFER_MAX_COUNT = - Config.intConfig("ingestor.sender.buffer.max.count", 128); + Config.intConfig("ingestor.sender.buffer.max.count", 1024); public static final Config INGESTOR_SENDER_OPERATION_MAX_COUNT = Config.intConfig("ingestor.sender.operation.max.count", 8192); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index 6c364681eaf8..23f96dbb2e49 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -21,7 +21,7 @@ public class StoreConfig { Config.intConfig("store.write.thread.count", 1); public static final Config STORE_QUEUE_BUFFER_SIZE = - Config.intConfig("store.queue.buffer.size", 128); + Config.intConfig("store.queue.buffer.size", 1024); public static final Config STORE_QUEUE_WAIT_MS = Config.longConfig("store.queue.wait.ms", 3000L); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java index acd61616ed92..33d52ffa0431 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java +++ 
b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java @@ -35,7 +35,9 @@ public enum DataType { DOUBLE_LIST(13), STRING_LIST(14), BYTES_LIST(15), - DATE(16); + DATE(16), // represent DATE32 + TIME32(17), + TIMESTAMP(18); private final byte type; diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java index bdcedca48c1d..d3ef0202aeb9 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java @@ -146,6 +146,10 @@ private int getDataTypeId(DataType dataType) { return 9; case DATE: return 12; + case TIME32: + return 13; + case TIMESTAMP: + return 14; default: throw new UnsupportedOperationException( "convert from DataType " + dataType + " to ir core is unsupported yet"); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/GraphOptTable.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/GraphOptTable.java index d47111c41906..439fde816177 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/GraphOptTable.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/GraphOptTable.java @@ -125,8 +125,11 @@ private RelDataType deriveType(GraphProperty property) { case DOUBLE: return typeFactory.createSqlType(SqlTypeName.DOUBLE); case DATE: - return typeFactory.createSqlType( - SqlTypeName.DATE); // todo: support Time and DateTime in GraphSchema + return typeFactory.createSqlType(SqlTypeName.DATE); + case TIME32: + return typeFactory.createSqlType(SqlTypeName.TIME); + case TIMESTAMP: + return 
typeFactory.createSqlType(SqlTypeName.TIMESTAMP); default: throw new UnsupportedOperationException( "type " + property.getDataType().name() + " not supported"); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java index 07f3f7f8aac0..86868c7dd8e8 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java @@ -161,15 +161,14 @@ public static DataType toDataType(Object type) { return DataType.STRING; case "DT_DATE32": return DataType.DATE; + case "DT_TIME32": + return DataType.TIME32; + case "TIMESTAMP": + return DataType.TIMESTAMP; default: throw new UnsupportedOperationException( "unsupported primitive type: " + value); } - } else if ((value = typeMap.get("date")) instanceof Map) { - Object format = ((Map) value).get("date_format"); - if (format != null && format.toString().equals("DF_YYYY_MM_DD")) { - return DataType.DATE; - } } } throw new UnsupportedOperationException("unsupported type: " + type); @@ -275,6 +274,10 @@ public static final DataType toDataType(int ordinal) { return DataType.STRING_LIST; case 12: return DataType.DATE; + case 13: + return DataType.TIME32; + case 14: + return DataType.TIMESTAMP; default: throw new UnsupportedOperationException( "convert from ir core type " + ordinal + " to DataType is unsupported yet"); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java index 3e57e4e77d3f..d290a6fd3fd3 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java +++ 
b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/RexToProtoConverter.java @@ -24,6 +24,7 @@ import com.alibaba.graphscope.gaia.proto.OuterExpression; import com.google.common.base.Preconditions; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.*; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; @@ -106,63 +107,81 @@ private OuterExpression.Expression visitExtract(RexCall call) { Preconditions.checkArgument( operands.size() == 2 && operands.get(0) instanceof RexLiteral, "'EXTRACT' operator has invalid operands " + operands); - OuterExpression.Expression.Builder exprBuilder = OuterExpression.Expression.newBuilder(); - exprBuilder.addOperators( + OuterExpression.Expression.Builder builder = OuterExpression.Expression.newBuilder(); + builder.addOperators( OuterExpression.ExprOpr.newBuilder() .setExtract( OuterExpression.Extract.newBuilder() .setInterval( - Utils.protoInterval((RexLiteral) operands.get(0))) - .setDataTime(operands.get(1).accept(this))) + Utils.protoInterval((RexLiteral) operands.get(0)))) .setNodeType(Utils.protoIrDataType(call.getType(), isColumnId))); - return exprBuilder.build(); + SqlOperator operator = call.getOperator(); + RexNode operand = operands.get(1); + boolean needBrace = needBrace(operator, operand); + if (needBrace) { + builder.addOperators( + OuterExpression.ExprOpr.newBuilder() + .setBrace(OuterExpression.ExprOpr.Brace.LEFT_BRACE)); + } + builder.addAllOperators(operand.accept(this).getOperatorsList()); + if (needBrace) { + builder.addOperators( + OuterExpression.ExprOpr.newBuilder() + .setBrace(OuterExpression.ExprOpr.Brace.RIGHT_BRACE)); + } + return builder.build(); } private OuterExpression.Expression visitUnaryOperator(RexCall call) { SqlOperator operator = call.getOperator(); RexNode operand = call.getOperands().get(0); switch (operator.getKind()) { - // convert IS_NULL to unary call: IS_NULL(XX) - case IS_NULL: - return 
visitIsNullOperator(operand); // convert IS_NOT_NULL to NOT(IS_NULL(XX)) case IS_NOT_NULL: return OuterExpression.Expression.newBuilder() - .addOperators(Utils.protoOperator(GraphStdOperatorTable.NOT)) + .addOperators( + Utils.protoOperator(GraphStdOperatorTable.NOT).toBuilder() + .setNodeType( + Utils.protoIrDataType(call.getType(), isColumnId))) .addOperators( OuterExpression.ExprOpr.newBuilder() .setBrace(OuterExpression.ExprOpr.Brace.LEFT_BRACE)) - .addAllOperators(visitIsNullOperator(operand).getOperatorsList()) + .addAllOperators( + visitUnaryOperator( + GraphStdOperatorTable.IS_NULL, + operand, + call.getType()) + .getOperatorsList()) .addOperators( OuterExpression.ExprOpr.newBuilder() .setBrace(OuterExpression.ExprOpr.Brace.RIGHT_BRACE)) .build(); + case IS_NULL: case NOT: default: - return OuterExpression.Expression.newBuilder() - .addOperators(Utils.protoOperator(operator)) - .addOperators( - OuterExpression.ExprOpr.newBuilder() - .setBrace(OuterExpression.ExprOpr.Brace.LEFT_BRACE)) - .addAllOperators(operand.accept(this).getOperatorsList()) - .addOperators( - OuterExpression.ExprOpr.newBuilder() - .setBrace(OuterExpression.ExprOpr.Brace.RIGHT_BRACE)) - .build(); + return visitUnaryOperator(operator, operand, call.getType()); } } - private OuterExpression.Expression visitIsNullOperator(RexNode operand) { - return OuterExpression.Expression.newBuilder() - .addOperators(Utils.protoOperator(GraphStdOperatorTable.IS_NULL)) - .addOperators( - OuterExpression.ExprOpr.newBuilder() - .setBrace(OuterExpression.ExprOpr.Brace.LEFT_BRACE)) - .addAllOperators(operand.accept(this).getOperatorsList()) - .addOperators( - OuterExpression.ExprOpr.newBuilder() - .setBrace(OuterExpression.ExprOpr.Brace.RIGHT_BRACE)) - .build(); + private OuterExpression.Expression visitUnaryOperator( + SqlOperator operator, RexNode operand, RelDataType dataType) { + OuterExpression.Expression.Builder builder = OuterExpression.Expression.newBuilder(); + builder.addOperators( + 
Utils.protoOperator(operator).toBuilder() + .setNodeType(Utils.protoIrDataType(dataType, isColumnId))); + boolean needBrace = needBrace(operator, operand); + if (needBrace) { + builder.addOperators( + OuterExpression.ExprOpr.newBuilder() + .setBrace(OuterExpression.ExprOpr.Brace.LEFT_BRACE)); + } + builder.addAllOperators(operand.accept(this).getOperatorsList()); + if (needBrace) { + builder.addOperators( + OuterExpression.ExprOpr.newBuilder() + .setBrace(OuterExpression.ExprOpr.Brace.RIGHT_BRACE)); + } + return builder.build(); } private OuterExpression.Expression visitBinaryOperator(RexCall call) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java index 1245a82bb6d6..04e0dcb68b7a 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java @@ -229,7 +229,11 @@ public static final Common.DataType protoBasicDataType(RelDataType basicType) { + " is unsupported yet"); } case DATE: - return Common.DataType.DATE; + return Common.DataType.DATE32; + case TIME: + return Common.DataType.TIME32; + case TIMESTAMP: + return Common.DataType.TIMESTAMP; default: throw new UnsupportedOperationException( "basic type " + basicType.getSqlTypeName() + " is unsupported yet"); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java index 7b394d2df623..6b437462f46e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordParser.java @@ -27,12 +27,12 @@ 
import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.NotImplementedException; import org.checkerframework.checker.nullness.qual.Nullable; import org.neo4j.values.AnyValue; -import org.neo4j.values.storable.BooleanValue; -import org.neo4j.values.storable.Values; +import org.neo4j.values.storable.*; import org.neo4j.values.virtual.MapValue; import org.neo4j.values.virtual.NodeValue; import org.neo4j.values.virtual.RelationshipValue; @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -221,6 +222,22 @@ protected AnyValue parseValue(Common.Value value, @Nullable RelDataType dataType return Values.stringArray(value.getStrArray().getItemList().toArray(String[]::new)); case NONE: return Values.NO_VALUE; + case DATE: + Preconditions.checkArgument( + dataType.getSqlTypeName() == SqlTypeName.DATE, + "date32 value should have date type"); + return DateValue.epochDate(value.getDate().getItem()); + case TIME: + Preconditions.checkArgument( + dataType.getSqlTypeName() == SqlTypeName.TIME, + "time32 value should have time type"); + return TimeValue.time(value.getTime().getItem() * 1000_000L, ZoneOffset.UTC); + case TIMESTAMP: + Preconditions.checkArgument( + dataType.getSqlTypeName() == SqlTypeName.TIMESTAMP, + "timestamp value should have timestamp type"); + return DateTimeValue.ofEpochMillis( + Values.longValue(value.getTimestamp().getItem())); default: throw new NotImplementedException(value.getItemCase() + " is unsupported yet"); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java 
b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java index 26fd2857cbfd..d6578865b045 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java @@ -20,7 +20,6 @@ import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversal; import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest; -import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.junit.Assert; @@ -63,21 +62,21 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest { public void run_pattern_1_test() { Traversal traversal = this.get_pattern_1_test(); this.printTraversalForm(traversal); - Assert.assertEquals(155633L, traversal.next().longValue()); + Assert.assertEquals(240390L, traversal.next().longValue()); } @Test public void run_pattern_2_test() { Traversal traversal = this.get_pattern_2_test(); this.printTraversalForm(traversal); - Assert.assertEquals(55488L, traversal.next().longValue()); + Assert.assertEquals(118209L, traversal.next().longValue()); } @Test public void run_pattern_3_test() { Traversal traversal = this.get_pattern_3_test(); this.printTraversalForm(traversal); - Assert.assertEquals(568408L, traversal.next().longValue()); + Assert.assertEquals(1247146L, traversal.next().longValue()); } @Test @@ -91,7 +90,7 @@ public void run_pattern_4_test() { public void run_pattern_5_test() { Traversal traversal = this.get_pattern_5_test(); this.printTraversalForm(traversal); - Assert.assertEquals(16291L, traversal.next().longValue()); + Assert.assertEquals(5596L, traversal.next().longValue()); } @Test @@ -105,14 +104,14 @@ public void run_pattern_6_test() { 
public void run_pattern_7_test() { Traversal traversal = this.get_pattern_7_test(); this.printTraversalForm(traversal); - Assert.assertEquals(2944L, traversal.next().longValue()); + Assert.assertEquals(20858L, traversal.next().longValue()); } @Test public void run_pattern_8_test() { Traversal traversal = this.get_pattern_8_test(); this.printTraversalForm(traversal); - Assert.assertEquals(782347L, traversal.next().longValue()); + Assert.assertEquals(1247146L, traversal.next().longValue()); } @Test @@ -126,7 +125,7 @@ public void run_pattern_9_test() { public void run_pattern_10_test() { Traversal traversal = this.get_pattern_10_test(); this.printTraversalForm(traversal); - Assert.assertEquals(3019L, traversal.next().longValue()); + Assert.assertEquals(11547L, traversal.next().longValue()); } @Test @@ -154,7 +153,7 @@ public void run_pattern_13_test() { public void run_pattern_14_test() { Traversal traversal = this.get_pattern_14_test(); this.printTraversalForm(traversal); - Assert.assertEquals(55488L, traversal.next().longValue()); + Assert.assertEquals(118209L, traversal.next().longValue()); } @Test @@ -177,12 +176,7 @@ public static class Traversals extends PatternQueryTest { @Override public Traversal get_pattern_1_test() { return g.V().match( - __.as("a").out("KNOWS").as("b"), - __.as("b") - .outE("KNOWS") - .has("creationDate", P.gt(20120101000000000L)) - .inV() - .as("c")) + __.as("a").out("KNOWS").as("b"), __.as("b").outE("KNOWS").inV().as("c")) .count(); } @@ -193,7 +187,7 @@ public Traversal get_pattern_2_test() { __.as("a").out("KNOWS").as("b"), __.as("b") .hasLabel("PERSON") - .has("creationDate", P.gt(20120101000000000L)) + .has("gender", "male") .out("KNOWS") .as("c")) .count(); @@ -207,9 +201,8 @@ public Traversal get_pattern_3_test() { __.as("b").out("KNOWS").as("c"), __.as("c") .hasLabel("PERSON") - .has("creationDate", P.gt(20120101000000000L)) + .has("gender", "male") .outE("KNOWS") - .has("creationDate", P.gt(20120601000000000L)) .inV() .as("d")) 
.count(); @@ -229,21 +222,9 @@ public Traversal get_pattern_4_test() { @Override public Traversal get_pattern_5_test() { return g.V().match( - __.as("a") - .outE("KNOWS") - .has("creationDate", P.gt(20110101000000000L)) - .inV() - .as("b"), - __.as("b") - .outE("KNOWS") - .has("creationDate", P.gt(20110101000000000L)) - .inV() - .as("c"), - __.as("a") - .outE("KNOWS") - .has("creationDate", P.gt(20110101000000000L)) - .inV() - .as("c")) + __.as("a").has("gender", "male").out("KNOWS").as("b"), + __.as("b").has("gender", "female").out("KNOWS").as("c"), + __.as("a").out("KNOWS").as("c")) .count(); } @@ -264,19 +245,10 @@ public Traversal get_pattern_6_test() { public Traversal get_pattern_7_test() { return g.V().match( __.as("a").out("KNOWS").as("b"), - __.as("b") - .has("creationDate", P.gt(20120101000000000L)) - .outE("KNOWS") - .has("creationDate", P.gt(20120201000000000L)) - .inV() - .as("c"), + __.as("b").has("gender", "male").outE("KNOWS").inV().as("c"), __.as("a").out("KNOWS").as("c"), __.as("c").out("KNOWS").as("d"), - __.as("b") - .outE("KNOWS") - .has("creationDate", P.gt(20120301000000000L)) - .inV() - .as("d")) + __.as("b").outE("KNOWS").inV().as("d")) .count(); } @@ -290,7 +262,7 @@ public Traversal get_pattern_8_test() { ((IrCustomizedTraversal) __.as("c") .hasLabel("PERSON") - .has("creationDate", P.gt(20120101000000000L)) + .has("gender", "male") .out("1..2", "KNOWS")) .endV() .as("d")) @@ -316,10 +288,7 @@ public Traversal get_pattern_10_test() { .endV() .as("b"), ((IrCustomizedTraversal) - __.as("b") - .has("creationDate", P.gt(20120101000000000L)) - .outE("KNOWS") - .has("creationDate", P.gt(20120601000000000L))) + __.as("b").has("gender", "female").outE("KNOWS")) .inV() .as("c"), ((IrCustomizedTraversal) __.as("a").out("1..2", "KNOWS")) @@ -357,10 +326,7 @@ public Traversal get_pattern_13_test() { public Traversal get_pattern_14_test() { return g.V().match( __.as("a").out("KNOWS").as("b"), - __.as("b") - .hasLabel("PERSON") - .has("creationDate", 
P.gt(20120101000000000L)) - .as("b"), + __.as("b").hasLabel("PERSON").has("gender", "male").as("b"), __.as("b").out("KNOWS").as("c")) .count(); } diff --git a/interactive_engine/compiler/src/test/resources/config/modern/graph.yaml b/interactive_engine/compiler/src/test/resources/config/modern/graph.yaml index 23df4e758bde..7da7efa31252 100644 --- a/interactive_engine/compiler/src/test/resources/config/modern/graph.yaml +++ b/interactive_engine/compiler/src/test/resources/config/modern/graph.yaml @@ -46,8 +46,7 @@ schema: - property_id: 3 property_name: creationDate property_type: - date: - date_format: DF_YYYY_MM_DD + primitive_type: DT_DATE32 primary_keys: - id edge_types: diff --git a/interactive_engine/executor/common/dyn_type/src/object.rs b/interactive_engine/executor/common/dyn_type/src/object.rs index 51ca9d763f74..b15db7c25ef0 100644 --- a/interactive_engine/executor/common/dyn_type/src/object.rs +++ b/interactive_engine/executor/common/dyn_type/src/object.rs @@ -437,6 +437,53 @@ impl DateTimeFormats { } } + // the date32 is stored as YYYYMMDD, e.g., 20100102 + pub fn from_date32(date32: i32) -> Result { + NaiveDate::from_ymd_opt(date32 / 10000, ((date32 % 10000) / 100) as u32, (date32 % 100) as u32) + .map(|d| DateTimeFormats::Date(d)) + .ok_or(CastError::new::(RawType::Integer)) + } + + // the time32 is stored HHMMSSsss, e.g., 121314100 + pub fn from_time32(time32: i32) -> Result { + NaiveTime::from_hms_milli_opt( + (time32 / 10000000) as u32, + ((time32 % 10000000) / 100000) as u32, + ((time32 % 100000) / 1000) as u32, + (time32 % 1000) as u32, + ) + .map(|t| DateTimeFormats::Time(t)) + .ok_or(CastError::new::(RawType::Integer)) + } + + pub fn from_timestamp_millis(timestamp: i64) -> Result { + NaiveDateTime::from_timestamp_millis(timestamp) + .map(|dt| DateTimeFormats::DateTime(dt)) + .ok_or(CastError::new::(RawType::Long)) + } + + // we pre-assume some date/time/datetime formats according to ISO formats. 
+ pub fn from_str(str: &str) -> Result { + // `1996-12-19` + if let Ok(date) = NaiveDate::parse_from_str(str, "%Y-%m-%d") { + Ok(DateTimeFormats::Date(date)) + } + // `16:39:57.123` + else if let Ok(time) = NaiveTime::parse_from_str(str, "%H:%M:%S%.f") { + Ok(DateTimeFormats::Time(time)) + } + // `1996-12-19 16:39:57.123` + else if let Ok(dt) = NaiveDateTime::parse_from_str(str, "%Y-%m-%d %H:%M:%S%.f") { + Ok(DateTimeFormats::DateTime(dt)) + } + // `1996-12-19T16:39:57.123+08:00` + else if let Ok(dt) = DateTime::parse_from_rfc3339(str) { + Ok(DateTimeFormats::DateTimeWithTz(dt)) + } else { + Err(CastError::new::(RawType::String)) + } + } + #[inline] pub fn as_date(&self) -> Result { match self { @@ -1526,6 +1573,12 @@ impl From> for Object { } } +impl From for Object { + fn from(date_time_formats: DateTimeFormats) -> Self { + Object::DateFormat(date_time_formats) + } +} + pub enum OwnedOrRef<'a, T> { Owned(T), Ref(&'a T), diff --git a/interactive_engine/executor/ir/common/Cargo.toml b/interactive_engine/executor/ir/common/Cargo.toml index 606c6fe31da6..bb20818ca0d0 100644 --- a/interactive_engine/executor/ir/common/Cargo.toml +++ b/interactive_engine/executor/ir/common/Cargo.toml @@ -9,6 +9,7 @@ pegasus_common = { path = "../../engine/pegasus/common" } prost = "0.11" serde = { version = "1.0", features = ["derive"] } tonic = "0.8" +chrono = "0.4" [build-dependencies] prost-build = "0.11" diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index b6e807331a7f..e8ab4cc53dd3 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -18,7 +18,8 @@ use std::convert::{TryFrom, TryInto}; use std::fmt; use std::ops::Deref; -use dyn_type::{Object, Primitives}; +use chrono::Timelike; +use dyn_type::{DateTimeFormats, Object, Primitives}; use crate::error::ParsePbError; use crate::generated::algebra as pb; @@ -329,6 +330,15 @@ impl TryFrom for 
Object { } Ok(vec.into()) } + Date(date) => { + Ok((DateTimeFormats::from_date32(date.item).map_err(|e| format!("{:?}", e))?).into()) + } + Time(time) => { + Ok((DateTimeFormats::from_time32(time.item).map_err(|e| format!("{:?}", e))?).into()) + } + Timestamp(timestamp) => Ok((DateTimeFormats::from_timestamp_millis(timestamp.item) + .map_err(|e| format!("{:?}", e))?) + .into()), }; } @@ -582,6 +592,28 @@ impl From for common_pb::Value { common_pb::value::Item::PairArray(common_pb::PairArray { item: pairs }) } Object::None => common_pb::value::Item::None(common_pb::None {}), + Object::DateFormat(datetime_formats) => match datetime_formats { + DateTimeFormats::Date(date) => common_pb::value::Item::Date(common_pb::Date32 { + // convert to days since from 1970-01-01 + item: (date + .and_hms_opt(0, 0, 0) + .unwrap() // can savely unwrap since it is valid hour/min/sec + .timestamp() + / 86400) as i32, + }), + DateTimeFormats::Time(time) => common_pb::value::Item::Time(common_pb::Time32 { + // convert to milliseconds past midnight + item: (time.hour() as i32 * 3600 + time.minute() as i32 * 60 + time.second() as i32) + * 1000 + + time.nanosecond() as i32 / 1000_000, + }), + DateTimeFormats::DateTime(dt) => { + common_pb::value::Item::Timestamp(common_pb::Timestamp { item: dt.timestamp_millis() }) + } + DateTimeFormats::DateTimeWithTz(dt) => { + common_pb::value::Item::Timestamp(common_pb::Timestamp { item: dt.timestamp_millis() }) + } + }, _ => unimplemented!(), }; diff --git a/interactive_engine/executor/ir/graph_proxy/Cargo.toml b/interactive_engine/executor/ir/graph_proxy/Cargo.toml index 160eaf987a97..bfcf4a0d55b3 100644 --- a/interactive_engine/executor/ir/graph_proxy/Cargo.toml +++ b/interactive_engine/executor/ir/graph_proxy/Cargo.toml @@ -17,6 +17,7 @@ pegasus = { path = "../../engine/pegasus/pegasus" } pegasus_common = { path = "../../engine/pegasus/common" } ahash = "0.8" rand = "0.8.5" +chrono = "0.4" regex = "1.10" [features] diff --git 
a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs index c638bf93d964..217095f85148 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs @@ -18,7 +18,7 @@ use std::fmt; use std::sync::Arc; use ahash::HashMap; -use dyn_type::{BorrowObject, Object, Primitives}; +use dyn_type::{BorrowObject, DateTimeFormats, Object, Primitives}; use ir_common::{KeyId, LabelId, NameOrId}; use mcsr::columns::RefItem; use mcsr::date::Date; @@ -296,7 +296,7 @@ impl Details for LazyVertexDetails { } else { self.inner .get_property(key) - .map(|prop| PropertyValue::Borrowed(to_borrow_object(prop))) + .map(|prop| to_property_value(prop)) } } else { info!("Have not support getting property by prop_id in exp_store yet"); @@ -360,7 +360,7 @@ impl Details for LazyEdgeDetails { if let NameOrId::Str(key) = key { self.inner .get_property(key) - .map(|prop| PropertyValue::Borrowed(to_borrow_object(prop))) + .map(|prop| to_property_value(prop)) } else { info!("Have not support getting property by prop_id in experiments store yet"); None @@ -392,44 +392,62 @@ fn to_object<'a>(ref_item: RefItem<'a>) -> Object { RefItem::Int64(v) => Object::Primitive(Primitives::Long(*v)), RefItem::UInt64(v) => Object::Primitive(Primitives::Long(i64::try_from(*v).unwrap())), RefItem::Double(v) => Object::Primitive(Primitives::Float(*v)), - RefItem::Date(v) => Object::Primitive(Primitives::Long(encode_date(v))), - RefItem::DateTime(v) => Object::Primitive(Primitives::Long(encode_datetime(v))), + RefItem::Date(v) => { + if let Some(date) = encode_date(v) { + Object::DateFormat(DateTimeFormats::Date(date)) + } else { + Object::None + } + } + RefItem::DateTime(v) => { + if let Some(date_time) = encode_datetime(v) { + Object::DateFormat(DateTimeFormats::DateTime(date_time)) + } else { + 
Object::None + } + } RefItem::String(v) => Object::String(v.clone()), _ => Object::None, } } #[inline] -fn to_borrow_object<'a>(ref_item: RefItem<'a>) -> BorrowObject<'a> { +fn to_property_value<'a>(ref_item: RefItem<'a>) -> PropertyValue { match ref_item { - RefItem::Int32(v) => BorrowObject::Primitive(Primitives::Integer(*v)), - RefItem::UInt32(v) => BorrowObject::Primitive(Primitives::Integer(i32::try_from(*v).unwrap())), - RefItem::Int64(v) => BorrowObject::Primitive(Primitives::Long(*v)), - RefItem::UInt64(v) => BorrowObject::Primitive(Primitives::Long(i64::try_from(*v).unwrap())), - RefItem::Double(v) => BorrowObject::Primitive(Primitives::Float(*v)), - RefItem::Date(v) => BorrowObject::Primitive(Primitives::Long(encode_date(v))), - RefItem::DateTime(v) => BorrowObject::Primitive(Primitives::Long(encode_datetime(v))), - RefItem::String(v) => BorrowObject::String(v), - _ => BorrowObject::None, + RefItem::Int32(v) => BorrowObject::Primitive(Primitives::Integer(*v)).into(), + RefItem::UInt32(v) => { + BorrowObject::Primitive(Primitives::Integer(i32::try_from(*v).unwrap())).into() + } + RefItem::Int64(v) => BorrowObject::Primitive(Primitives::Long(*v)).into(), + RefItem::UInt64(v) => BorrowObject::Primitive(Primitives::Long(i64::try_from(*v).unwrap())).into(), + RefItem::Double(v) => BorrowObject::Primitive(Primitives::Float(*v)).into(), + RefItem::Date(v) => { + if let Some(date) = encode_date(v) { + Object::DateFormat(DateTimeFormats::Date(date)).into() + } else { + BorrowObject::None.into() + } + } + RefItem::DateTime(v) => { + if let Some(date_time) = encode_datetime(v) { + Object::DateFormat(DateTimeFormats::DateTime(date_time)).into() + } else { + BorrowObject::None.into() + } + } + RefItem::String(v) => BorrowObject::String(v).into(), + _ => BorrowObject::None.into(), } } #[inline] -fn encode_date(date: &Date) -> i64 { - date.year() as i64 * 10000000000000 - + date.month() as i64 * 100000000000 - + date.day() as i64 * 1000000000 +fn encode_date(date: 
&Date) -> Option { + chrono::NaiveDate::from_ymd_opt(date.year(), date.month(), date.day()) } #[inline] -fn encode_datetime(datetime: &DateTime) -> i64 { - datetime.year() as i64 * 10000000000000 - + datetime.month() as i64 * 100000000000 - + datetime.day() as i64 * 1000000000 - + datetime.hour() as i64 * 10000000 - + datetime.minute() as i64 * 100000 - + datetime.second() as i64 * 1000 - + datetime.millisecond() as i64 +fn encode_datetime(datetime: &DateTime) -> Option { + chrono::NaiveDateTime::from_timestamp_millis(datetime.to_i64()) } #[inline] diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/property.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/property.rs index 5df5cb4fcf8a..715dbb10a53b 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/property.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/property.rs @@ -114,6 +114,18 @@ impl<'a> PropertyValue<'a> { } } +impl<'a> From> for PropertyValue<'a> { + fn from(value: BorrowObject<'a>) -> Self { + PropertyValue::Borrowed(value) + } +} + +impl<'a> From for PropertyValue<'a> { + fn from(value: Object) -> Self { + PropertyValue::Owned(value) + } +} + pub trait Details: std::fmt::Debug + Send + Sync + AsAny { /// Get a property with given key fn get_property(&self, key: &NameOrId) -> Option; diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs index 141875e22a3a..f33f351f7355 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs @@ -54,11 +54,17 @@ pub enum Operand { VarMap(Vec), } +#[derive(Debug, Clone)] +pub enum Function { + Extract(common_pb::extract::Interval), +} + /// An inner representation of `common_pb::ExprOpr` for one-shot translation of `common_pb::ExprOpr`. 
#[derive(Debug, Clone)] pub(crate) enum InnerOpr { Logical(common_pb::Logical), Arith(common_pb::Arithmetic), + Function(Function), Operand(Operand), } @@ -68,6 +74,7 @@ impl ToString for InnerOpr { InnerOpr::Logical(logical) => format!("{:?}", logical), InnerOpr::Arith(arith) => format!("{:?}", arith), InnerOpr::Operand(item) => format!("{:?}", item), + InnerOpr::Function(func) => format!("{:?}", func), } } } @@ -160,6 +167,52 @@ fn apply_arith<'a>( }) } +pub(crate) fn apply_function<'a>( + function: &Function, a: BorrowObject<'a>, _b_opt: Option>, +) -> ExprEvalResult { + use common_pb::extract::Interval; + match function { + Function::Extract(interval) => match interval { + Interval::Year => Ok(a + .as_date_format()? + .year() + .ok_or(ExprEvalError::GetNoneFromContext)? + .into()), + Interval::Month => Ok((a + .as_date_format()? + .month() + .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .into()), + Interval::Day => Ok((a + .as_date_format()? + .day() + .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .into()), + Interval::Hour => Ok((a + .as_date_format()? + .hour() + .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .into()), + Interval::Minute => Ok((a + .as_date_format()? + .minute() + .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .into()), + Interval::Second => Ok((a + .as_date_format()? + .second() + .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .into()), + Interval::Millisecond => Ok((a + .as_date_format()? + .millisecond() + .ok_or(ExprEvalError::GetNoneFromContext)? 
+ as i32) + .into()), + }, + } +} + pub(crate) fn apply_logical<'a>( logical: &common_pb::Logical, a: BorrowObject<'a>, b_opt: Option>, ) -> ExprEvalResult { @@ -224,14 +277,17 @@ impl Evaluator { let first = _first.unwrap(); let second = _second.unwrap(); if let InnerOpr::Logical(logical) = second { - let first = match first.eval(context) { - Ok(first) => Ok(first), - Err(err) => match err { - ExprEvalError::GetNoneFromContext => Ok(Object::None), - _ => Err(err), - }, - }; + let mut first = first.eval(context); + if common_pb::Logical::Isnull.eq(logical) { + match first { + Err(ExprEvalError::GetNoneFromContext) => first = Ok(Object::None), + _ => {} + } + } Ok(apply_logical(logical, first?.as_borrow(), None)?) + } else if let InnerOpr::Function(function) = second { + let first = first.eval(context)?; + Ok(apply_function(function, first.as_borrow(), None)?) } else { if !second.is_operand() { Err(ExprEvalError::MissingOperands(second.into())) @@ -244,8 +300,8 @@ impl Evaluator { let second = _second.unwrap(); let third = _third.unwrap(); if let InnerOpr::Logical(logical) = third { - // to deal with two unary operators cases, e.g., !(!true), !(a isNull) etc. - if common_pb::Logical::Not.eq(logical) || common_pb::Logical::Isnull.eq(logical) { + if third.is_unary() { + // to deal with two unary operators cases, e.g., !(!true), !(isNull(a)),isNull(extract(a)) etc. 
if let InnerOpr::Logical(inner_logical) = second { let mut inner_first = first.eval(context); if common_pb::Logical::Isnull.eq(inner_logical) { @@ -262,11 +318,17 @@ impl Evaluator { } } return Ok(apply_logical(logical, first?.as_borrow(), None)?); + } else if let InnerOpr::Function(function) = second { + let inner_first = first.eval(context)?; + return Ok(apply_function(function, inner_first.as_borrow(), None)?); + } else { + return Err(ExprEvalError::OtherErr("invalid expression".to_string())); } + } else { + let a = first.eval(context)?; + let b = second.eval(context)?; + Ok(apply_logical(logical, a.as_borrow(), Some(b.as_borrow()))?) } - let a = first.eval(context)?; - let b = second.eval(context)?; - Ok(apply_logical(logical, a.as_borrow(), Some(b.as_borrow()))?) } else if let InnerOpr::Arith(arith) = third { let a = first.eval(context)?; let b = second.eval(context)?; @@ -364,7 +426,17 @@ impl Evaluate for Evaluator { } } } - + InnerOpr::Function(function) => { + if opr.is_unary() { + apply_function(function, first?.as_borrow(), None) + } else { + if let Some(second) = stack.pop() { + apply_function(function, second?.as_borrow(), Some(first?.as_borrow())) + } else { + Err(ExprEvalError::OtherErr("invalid expression".to_string())) + } + } + } InnerOpr::Arith(arith) => { if let Some(second) = stack.pop() { apply_arith(arith, second?.as_borrow(), first?.as_borrow()) @@ -488,6 +560,9 @@ impl TryFrom for InnerOpr { Arith(arith) => { Ok(Self::Arith(unsafe { std::mem::transmute::<_, common_pb::Arithmetic>(*arith) })) } + Extract(extract) => Ok(Self::Function(Function::Extract(unsafe { + std::mem::transmute::<_, common_pb::extract::Interval>(extract.interval) + }))), _ => Ok(Self::Operand(unit.clone().try_into()?)), } } else { @@ -620,11 +695,25 @@ impl InnerOpr { _ => false, } } + + pub fn is_unary(&self) -> bool { + match self { + InnerOpr::Logical(logical) => match logical { + common_pb::Logical::Not | common_pb::Logical::Isnull => true, + _ => false, + }, + 
InnerOpr::Function(function) => match function { + Function::Extract(_) => true, + }, + _ => false, + } + } } #[cfg(test)] mod tests { use ahash::HashMap; + use dyn_type::DateTimeFormats; use ir_common::expr_parse::str_to_expr_pb; use super::*; @@ -1000,6 +1089,155 @@ mod tests { } } + fn prepare_context_with_date() -> Vertices { + let map1: HashMap = vec![ + ( + NameOrId::from("date1".to_string()), + (DateTimeFormats::from_str("2020-08-08").unwrap()).into(), + ), + ( + NameOrId::from("date2".to_string()), + chrono::NaiveDate::from_ymd_opt(2020, 8, 8) + .unwrap() + .into(), + ), + ( + NameOrId::from("time1".to_string()), + (DateTimeFormats::from_str("10:11:12.100").unwrap()).into(), + ), + ( + NameOrId::from("time2".to_string()), + chrono::NaiveTime::from_hms_milli_opt(10, 11, 12, 100) + .unwrap() + .into(), + ), + ( + NameOrId::from("datetime1".to_string()), + (DateTimeFormats::from_str("2020-08-08T23:11:12.100-11:00").unwrap()).into(), + ), + ( + NameOrId::from("datetime2".to_string()), + (DateTimeFormats::from_str("2020-08-09 10:11:12.100").unwrap()).into(), + ), + ( + NameOrId::from("datetime3".to_string()), + chrono::NaiveDateTime::from_timestamp_millis(1602324610100) + .unwrap() + .into(), + ), // 2020-10-10 10:10:10 + ] + .into_iter() + .collect(); + Vertices { vec: vec![Vertex::new(1, Some(9.into()), DynDetails::new(map1))] } + } + + fn prepare_extract(expr_str: &str, interval: common_pb::extract::Interval) -> common_pb::Expression { + let mut operators = str_to_expr_pb(expr_str.to_string()) + .unwrap() + .operators; + let extract_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Extract(common_pb::Extract { + interval: interval as i32, + })), + }; + operators.push(extract_opr); + common_pb::Expression { operators } + } + + #[test] + fn test_eval_extract() { + let ctxt = prepare_context_with_date(); + let cases = vec![ + // date1: "2020-08-08" + prepare_extract("@0.date1", common_pb::extract::Interval::Year), + 
prepare_extract("@0.date1", common_pb::extract::Interval::Month), + prepare_extract("@0.date1", common_pb::extract::Interval::Day), + // date2: 20200808 + prepare_extract("@0.date2", common_pb::extract::Interval::Year), + prepare_extract("@0.date2", common_pb::extract::Interval::Month), + prepare_extract("@0.date2", common_pb::extract::Interval::Day), + // time1: "10:11:12.100" + prepare_extract("@0.time1", common_pb::extract::Interval::Hour), + prepare_extract("@0.time1", common_pb::extract::Interval::Minute), + prepare_extract("@0.time1", common_pb::extract::Interval::Second), + prepare_extract("@0.time1", common_pb::extract::Interval::Millisecond), + // time2: 101112100 + prepare_extract("@0.time2", common_pb::extract::Interval::Hour), + prepare_extract("@0.time2", common_pb::extract::Interval::Minute), + prepare_extract("@0.time2", common_pb::extract::Interval::Second), + prepare_extract("@0.time2", common_pb::extract::Interval::Millisecond), + // datetime1: "2020-08-08T23:11:12.100-11:00" + prepare_extract("@0.datetime1", common_pb::extract::Interval::Year), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Month), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Day), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Hour), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Minute), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Second), + prepare_extract("@0.datetime1", common_pb::extract::Interval::Millisecond), + // datetime2: "2020-08-09 10:11:12.100" + prepare_extract("@0.datetime2", common_pb::extract::Interval::Year), + prepare_extract("@0.datetime2", common_pb::extract::Interval::Month), + prepare_extract("@0.datetime2", common_pb::extract::Interval::Day), + prepare_extract("@0.datetime2", common_pb::extract::Interval::Hour), + prepare_extract("@0.datetime2", common_pb::extract::Interval::Minute), + prepare_extract("@0.datetime2", common_pb::extract::Interval::Second), + 
prepare_extract("@0.datetime2", common_pb::extract::Interval::Millisecond), + // datetime3: 1602324610100, i.e., 2020-10-10 10:10:10 + prepare_extract("@0.datetime3", common_pb::extract::Interval::Year), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Month), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Day), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Hour), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Minute), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Second), + prepare_extract("@0.datetime3", common_pb::extract::Interval::Millisecond), + ]; + + let expected = vec![ + object!(2020), + object!(8), + object!(8), + object!(2020), + object!(8), + object!(8), + object!(10), + object!(11), + object!(12), + object!(100), + object!(10), + object!(11), + object!(12), + object!(100), + object!(2020), + object!(8), + object!(8), + object!(23), + object!(11), + object!(12), + object!(100), + object!(2020), + object!(8), + object!(9), + object!(10), + object!(11), + object!(12), + object!(100), + object!(2020), + object!(10), + object!(10), + object!(10), + object!(10), + object!(10), + object!(100), + ]; + + for (case, expected) in cases.into_iter().zip(expected.into_iter()) { + let eval = Evaluator::try_from(case).unwrap(); + println!("{:?}", eval.eval::<_, Vertices>(Some(&ctxt)).unwrap()); + assert_eq!(eval.eval::<_, Vertices>(Some(&ctxt)).unwrap(), expected); + } + } + fn gen_regex_expression(to_match: &str, pattern: &str) -> common_pb::Expression { let mut regex_expr = common_pb::Expression { operators: vec![] }; let left = common_pb::ExprOpr { diff --git a/interactive_engine/executor/ir/proto/common.proto b/interactive_engine/executor/ir/proto/common.proto index 1c87785c8a53..034c6581f74a 100644 --- a/interactive_engine/executor/ir/proto/common.proto +++ b/interactive_engine/executor/ir/proto/common.proto @@ -55,6 +55,21 @@ message NameOrId { } } +message Date32 { + // int32 
days since 1970-01-01 + int32 item = 1; +} + +message Time32 { + // int32 milliseconds past midnight + int32 item = 1; +} + +message Timestamp { + // int64 milliseconds since 1970-01-01 00:00:00.000000 (in an unspecified timezone) + int64 item = 1; +} + message Value { oneof item { bool boolean = 2; @@ -69,6 +84,9 @@ message Value { StringArray str_array = 11; PairArray pair_array = 12; None none = 13; + Date32 date = 14; + Time32 time = 15; + Timestamp timestamp = 16; } } @@ -85,5 +103,7 @@ enum DataType { STRING_ARRAY = 9; PAIR_ARRAY = 10; NONE = 11; - DATE = 12; // todo: define `DATE` as message structure which contains date_format + DATE32 = 12; + TIME32 = 13; + TIMESTAMP = 14; } diff --git a/interactive_engine/executor/ir/proto/expr.proto b/interactive_engine/executor/ir/proto/expr.proto index 282149c8a058..022d1daffadb 100644 --- a/interactive_engine/executor/ir/proto/expr.proto +++ b/interactive_engine/executor/ir/proto/expr.proto @@ -151,10 +151,9 @@ message Extract { HOUR = 3; MINUTE = 4; SECOND = 5; + MILLISECOND = 6; } - Interval interval = 1; - Expression data_time = 2; } // An operator of expression is one of Logical, Arithmetic, Const and Variable. 
diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs index 8fb38c81cad1..d84ab27a0138 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs @@ -170,7 +170,7 @@ mod tests { use dyn_type::Object; use graph_proxy::apis::{DynDetails, GraphElement, Vertex}; use ir_common::expr_parse::str_to_expr_pb; - use ir_common::generated::physical as pb; + use ir_common::generated::{common as common_pb, physical as pb}; use ir_common::NameOrId; use pegasus::api::{Map, Sink}; use pegasus::result::ResultStream; @@ -181,7 +181,8 @@ mod tests { use crate::process::operator::map::FilterMapFuncGen; use crate::process::operator::tests::{ init_source, init_source_with_multi_tags, init_source_with_tag, init_vertex1, init_vertex2, - to_expr_var_pb, to_expr_vars_pb, PERSON_LABEL, TAG_A, TAG_B, TAG_C, TAG_D, TAG_E, + to_expr_var_pb, to_expr_vars_pb, to_var_pb, PERSON_LABEL, TAG_A, TAG_B, TAG_C, TAG_D, TAG_E, TAG_F, + TAG_G, }; use crate::process::record::Record; @@ -815,4 +816,112 @@ mod tests { assert!(true) } } + + #[test] + fn project_extract_from_date_test() { + // 2010-01-02 + let date_obj = Object::DateFormat(dyn_type::DateTimeFormats::from_date32(20100102).unwrap()); + // 12:34:56.100 + let time_obj = Object::DateFormat(dyn_type::DateTimeFormats::from_time32(123456100).unwrap()); + // 2020-10-10 10:10:10 + let datetime_obj = + Object::DateFormat(dyn_type::DateTimeFormats::from_timestamp_millis(1602324610100).unwrap()); + let mut r1 = Record::new(date_obj.clone(), Some(TAG_A.into())); + r1.append(time_obj, Some(TAG_B.into())); + r1.append(datetime_obj, Some(TAG_C.into())); + + let extract_date_year_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Extract(common_pb::Extract { + interval: common_pb::extract::Interval::Year as i32, 
+ })), + }; + + let extract_time_hour_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Extract(common_pb::Extract { + interval: common_pb::extract::Interval::Hour as i32, + })), + }; + + let extract_datetime_month_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Extract(common_pb::Extract { + interval: common_pb::extract::Interval::Month as i32, + })), + }; + + let extract_datetime_minute_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Extract(common_pb::Extract { + interval: common_pb::extract::Interval::Minute as i32, + })), + }; + + let tag_a_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Var(to_var_pb(Some(TAG_A.into()), None))), + }; + let tag_b_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Var(to_var_pb(Some(TAG_B.into()), None))), + }; + let tag_c_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Var(to_var_pb(Some(TAG_C.into()), None))), + }; + + let expr1 = common_pb::Expression { operators: vec![extract_date_year_opr, tag_a_opr] }; + + let expr2 = common_pb::Expression { operators: vec![extract_time_hour_opr, tag_b_opr] }; + + let expr3 = + common_pb::Expression { operators: vec![extract_datetime_month_opr, tag_c_opr.clone()] }; + + let expr4 = + common_pb::Expression { operators: vec![extract_datetime_minute_opr, tag_c_opr.clone()] }; + + let source = vec![r1]; + // To project: year of 2010-01-02, hour of 12:34:56.100, month of 2020-10-10 10:10:10, and minute of 2020-10-10 10:10:10 + let project_opr_pb = pb::Project { + mappings: vec![ + pb::project::ExprAlias { expr: Some(expr1), alias: Some(TAG_D.into()) }, + pb::project::ExprAlias { expr: Some(expr2), alias: Some(TAG_E.into()) }, + pb::project::ExprAlias { expr: Some(expr3), alias: Some(TAG_F.into()) }, + pb::project::ExprAlias { expr: Some(expr4), alias: Some(TAG_G.into()) }, + 
], + is_append: true, + }; + + let mut result = project_test(source, project_opr_pb); + let expected_results = vec![object!(2010), object!(12), object!(10), object!(10)]; + let mut results = vec![]; + while let Some(Ok(res)) = result.next() { + let year = res + .get(Some(TAG_D)) + .unwrap() + .as_object() + .unwrap(); + let hour = res + .get(Some(TAG_E)) + .unwrap() + .as_object() + .unwrap(); + let month = res + .get(Some(TAG_F)) + .unwrap() + .as_object() + .unwrap(); + let minute = res + .get(Some(TAG_G)) + .unwrap() + .as_object() + .unwrap(); + results.push(year.clone()); + results.push(hour.clone()); + results.push(month.clone()); + results.push(minute.clone()); + } + assert_eq!(results, expected_results); + } } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs index 50486014d7d7..7a4496e0697e 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs @@ -187,6 +187,8 @@ pub(crate) mod tests { pub const TAG_C: KeyId = 2; pub const TAG_D: KeyId = 3; pub const TAG_E: KeyId = 4; + pub const TAG_F: KeyId = 5; + pub const TAG_G: KeyId = 6; pub const PERSON_LABEL: LabelId = 0; diff --git a/interactive_engine/executor/ir/runtime/src/router.rs b/interactive_engine/executor/ir/runtime/src/router.rs index c7a05cb71f93..be1ff3c83345 100644 --- a/interactive_engine/executor/ir/runtime/src/router.rs +++ b/interactive_engine/executor/ir/runtime/src/router.rs @@ -57,11 +57,10 @@ impl Router for DefaultRouter { type C = C; fn route(&self, data: PartitionKeyId) -> GraphProxyResult { let partition_id = self.partition_info.get_partition_id(&data)?; - debug!("route partition id: {:?}", partition_id); let server_id = self .partition_info .get_server_id(partition_id)?; - debug!("route server id: {:?}", partition_id); + trace!("route partition id {:?}, server id: {:?}", partition_id, 
server_id); let servers_num = self.cluster_info.get_server_num()?; let magic_num = (data as u32) / servers_num; let workers_num = self.cluster_info.get_local_worker_num()?; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java index ddcc86fe90e6..12d95b1b0f3a 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java @@ -36,15 +36,14 @@ public class BatchSender implements MetricsAgent { public static final String SEND_RECORDS_PER_SECOND = "send.records.per.second"; public static final String SEND_RECORDS_TOTAL = "send.records.total"; public static final String SEND_BUFFER_BATCH_COUNT = "send.buffer.batch.count"; - public static final String SEND_CALLBACK_LATENCY_PER_SECOND_MS = - "send.callback.latency.per.second.ms"; + public static final String SEND_CALLBACK_LATENCY = "send.callback.latency.per.second.ms"; - private MetaService metaService; - private StoreWriter storeWriter; + private final MetaService metaService; + private final StoreWriter storeWriter; - private int bufferSize; - private int storeCount; - private int sendOperationLimit; + private final int bufferSize; + private final int storeCount; + private final int sendOperationLimit; private List> storeSendBuffer; private BlockingQueue sendTasks; @@ -57,7 +56,7 @@ public class BatchSender implements MetricsAgent { private AvgMetric sendRecordsMetric; private List bufferBatchCountMetrics; private List callbackLatencyMetrics; - private int receiverQueueSize; + private final int receiverQueueSize; public BatchSender( Configs configs, @@ -72,7 +71,7 @@ public BatchSender( this.sendOperationLimit = IngestorConfig.INGESTOR_SENDER_OPERATION_MAX_COUNT.get(configs); this.receiverQueueSize = 
StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); initMetrics(); - metricsCollector.register(this, () -> updateMetrics()); + metricsCollector.register(this, this::updateMetrics); } public void start() { @@ -299,7 +298,7 @@ public Map getMetrics() { .map(q -> q.size()) .collect(Collectors.toList()))); put( - SEND_CALLBACK_LATENCY_PER_SECOND_MS, + SEND_CALLBACK_LATENCY, String.valueOf( callbackLatencyMetrics.stream() .map(m -> (int) (1000 * m.getAvg())) @@ -316,7 +315,7 @@ public String[] getMetricKeys() { SEND_RECORDS_PER_SECOND, SEND_RECORDS_TOTAL, SEND_BUFFER_BATCH_COUNT, - SEND_CALLBACK_LATENCY_PER_SECOND_MS + SEND_CALLBACK_LATENCY }; } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java index 0b1bff73cd3e..0e07e5768044 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java @@ -34,15 +34,15 @@ public class SnapshotSortQueue { private static final Logger logger = LoggerFactory.getLogger(SnapshotSortQueue.class); - private long queueWaitMs; - private int queueCount; + private final long queueWaitMs; + private final int queueCount; - private List> innerQueues; - private List queueHeads; + private final List> innerQueues; + private final List queueHeads; private int currentPollQueueIdx; private long currentPollSnapshotId; - private AtomicInteger size; + private final AtomicInteger size; public SnapshotSortQueue(Configs configs, MetaService metaService) { this.currentPollSnapshotId = -1L; diff --git a/k8s/internal/Makefile b/k8s/internal/Makefile index b4ceda659675..4a54b45dd2e2 100644 --- a/k8s/internal/Makefile +++ b/k8s/internal/Makefile @@ -21,7 +21,11 @@ endif ARCH := $(shell uname -m) MACOS_WHEEL_VERSION := 12_0 
-BUILD_PROGRESS = auto +# docker build arguments +BUILD_PROGRESS = auto + +# pip installation arguments +PIP_ARGS = --timeout=1000 --no-cache-dir # Default null TARGET_SERVER_PACKAGE := @@ -67,7 +71,7 @@ MAKE_INSTALL_COMMAND=make install INSTALL_PREFIX=${INSTALL_PREFIX} graphscope-darwin-py3: # build graphscope cd $(WORKING_DIR)/../.. && \ - python3 -m pip install --upgrade setuptools && \ + python3 -m pip install ${PIP_ARGS} --upgrade setuptools && \ export INSTALL_PREFIX=/opt/graphscope && \ for _ in {1..5}; do if $(MAKE_COMMAND); then succeed=1; break; else succeed=0; fi; done && \ if [[ $$succeed == 1 ]]; then echo "Build succeed"; else echo "Build failed"; exit 1; fi && \ @@ -106,8 +110,8 @@ graphscope-manylinux2014-py3-nodocker: cd $(WORKING_DIR)/../.. && \ if [[ "${ARCH}" == "aarch64" ]]; then \ export AUDITWHEEL_PLAT=manylinux2014_${ARCH}; \ - python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ - python3 -m pip install grpcio-tools==1.49.1 --no-binary grpcio-tools; \ + python3 -m pip install ${PIP_ARGS} grpcio==1.49.1 --no-binary grpcio; \ + python3 -m pip install ${PIP_ARGS} grpcio-tools==1.49.1 --no-binary grpcio-tools; \ fi && \ sudo rm -rf ./learning_engine/graph-learn/graphlearn/built && \ for _ in {1..5}; do if $(MAKE_COMMAND); then succeed=1; break; else succeed=0; fi; done && \ @@ -152,6 +156,9 @@ graphscope-manylinux2014-py3: graphscope-client-manylinux2014-py3-nodocker: set -euxo pipefail && \ + cd $(WORKING_DIR)/../../python && \ + python3 -m pip install ${PIP_ARGS} "torch" --index-url https://download.pytorch.org/whl/cpu --user && \ + python3 -m pip install ${PIP_ARGS} -r requirements.txt -r requirements-dev.txt --user && \ cd $(WORKING_DIR)/../../learning_engine/graph-learn && \ git submodule update --init third_party/pybind11 && \ cd graphlearn && \ @@ -166,39 +173,39 @@ graphscope-client-manylinux2014-py3-nodocker: for py in cp37-cp37m cp38-cp38 cp39-cp39 cp310-cp310 cp311-cp311; do \ cd $(WORKING_DIR)/../../python; \ export 
PATH=/opt/python/$$py/bin:$$PATH; \ - python3 -m pip install -U pip; \ + python3 -m pip install ${PIP_ARGS} -U pip; \ if [[ "$$py" == "cp311-cp311" ]]; then \ if [[ "${ARCH}" == "aarch64" ]]; then \ - python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ - python3 -m pip install grpcio-tools==1.49.1 --no-binary grpcio-tools; \ + python3 -m pip install ${PIP_ARGS} grpcio==1.49.1 --no-binary grpcio; \ + python3 -m pip install ${PIP_ARGS} grpcio-tools==1.49.1 --no-binary grpcio-tools; \ else \ - python3 -m pip install "grpcio" "grpcio-tools"; \ + python3 -m pip install ${PIP_ARGS} "grpcio" "grpcio-tools"; \ fi; \ - python3 -m pip install mypy-protobuf "numpy==1.23.2" "pandas" wheel "auditwheel==5.0.0"; \ + python3 -m pip install ${PIP_ARGS} mypy-protobuf "numpy==1.23.2" "pandas" wheel "auditwheel==5.0.0"; \ elif [[ "$$py" == "cp310-cp310" ]]; then \ if [[ "${ARCH}" == "aarch64" ]]; then \ - python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ - python3 -m pip install grpcio-tools==1.49.1 --no-binary grpcio-tools; \ + python3 -m pip install ${PIP_ARGS} grpcio==1.49.1 --no-binary grpcio; \ + python3 -m pip install ${PIP_ARGS} grpcio-tools==1.49.1 --no-binary grpcio-tools; \ else \ - python3 -m pip install "grpcio>=1.49" "grpcio-tools>=1.49"; \ + python3 -m pip install ${PIP_ARGS} "grpcio>=1.49" "grpcio-tools>=1.49"; \ fi; \ - python3 -m pip install mypy-protobuf "numpy==1.21.2" "pandas" wheel "auditwheel==5.0.0"; \ + python3 -m pip install ${PIP_ARGS} mypy-protobuf "numpy==1.21.2" "pandas" wheel "auditwheel==5.0.0"; \ elif [[ "$$py" == "cp39-cp39" ]]; then \ if [[ "${ARCH}" == "aarch64" ]]; then \ - python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ - python3 -m pip install grpcio-tools==1.49.1 --no-binary grpcio-tools; \ + python3 -m pip install ${PIP_ARGS} grpcio==1.49.1 --no-binary grpcio; \ + python3 -m pip install ${PIP_ARGS} grpcio-tools==1.49.1 --no-binary grpcio-tools; \ else \ - python3 -m pip install "grpcio>=1.49" "grpcio-tools>=1.49"; \ 
+ python3 -m pip install ${PIP_ARGS} "grpcio>=1.49" "grpcio-tools>=1.49"; \ fi; \ - python3 -m pip install mypy-protobuf "numpy==1.19.3" "pandas" wheel "auditwheel==5.0.0"; \ + python3 -m pip install ${PIP_ARGS} mypy-protobuf "numpy==1.19.3" "pandas" wheel "auditwheel==5.0.0"; \ else \ if [[ "${ARCH}" == "aarch64" ]]; then \ - python3 -m pip install grpcio==1.49.1 --no-binary grpcio; \ - python3 -m pip install grpcio-tools==1.49.1 --no-binary grpcio-tools; \ + python3 -m pip install ${PIP_ARGS} grpcio==1.49.1 --no-binary grpcio; \ + python3 -m pip install ${PIP_ARGS} grpcio-tools==1.49.1 --no-binary grpcio-tools; \ else \ - python3 -m pip install "grpcio>=1.49" "grpcio-tools>=1.49"; \ + python3 -m pip install ${PIP_ARGS} "grpcio>=1.49" "grpcio-tools>=1.49"; \ fi; \ - python3 -m pip install mypy-protobuf "numpy" "pandas" wheel "auditwheel==5.0.0"; \ + python3 -m pip install ${PIP_ARGS} mypy-protobuf "numpy" "pandas" wheel "auditwheel==5.0.0"; \ fi; \ sudo rm -rf build; \ sudo rm -rf dist/*.whl; \ @@ -208,6 +215,9 @@ graphscope-client-manylinux2014-py3-nodocker: done graphscope-client-darwin-py3: + cd $(WORKING_DIR)/../../python && \ + python3 -m pip install ${PIP_ARGS} "torch" --index-url https://download.pytorch.org/whl/cpu --user && \ + python3 -m pip install ${PIP_ARGS} -r requirements.txt -r requirements-dev.txt --user && \ cd $(WORKING_DIR)/../../learning_engine/graph-learn && \ (git submodule update --init third_party/pybind11 || true) && \ export GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} && \ @@ -221,22 +231,22 @@ graphscope-client-darwin-py3: export DYLD_LIBRARY_PATH=$(WORKING_DIR)/../../learning_engine/graph-learn/graphlearn/built/lib:/usr/local/lib:${GRAPHSCOPE_HOME}/lib:$$DYLD_LIBRARY_PATH && \ cd $(WORKING_DIR)/../../python && \ py=$$(python3 -V 2>&1 | awk '{print $$2}' | awk -F '.' 
'{print $$1$$2}') && \ - pip3 install -U pip && \ + pip3 install ${PIP_ARGS} -U pip && \ if [[ "$$py" == "311" ]]; then \ - pip3 install mypy-protobuf "numpy==1.23.2" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.23.2" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ elif [[ "$$py" == "310" ]]; then \ - pip3 install mypy-protobuf "numpy==1.21.2" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.21.2" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ elif [[ "$$py" == "39" ]]; then \ if [[ "${ARCH}" == "arm64" ]]; then \ - pip3 install mypy-protobuf "numpy==1.21.0" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.21.0" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ else \ - pip3 install mypy-protobuf "numpy==1.19.3" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.19.3" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ fi; \ else \ if [[ "${ARCH}" == "arm64" ]]; then \ - pip3 install mypy-protobuf "numpy==1.21.0" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.21.0" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ else \ - pip3 install mypy-protobuf "numpy==1.18.5" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ + pip3 install ${PIP_ARGS} mypy-protobuf "numpy==1.18.5" "pandas" "grpcio>=1.49" "grpcio-tools>=1.49" delocate wheel; \ fi; \ fi; \ rm -rf build dist/*.whl || true && \ diff --git a/python/graphscope/gsctl/scripts/install_deps_command.sh b/python/graphscope/gsctl/scripts/install_deps_command.sh index 354a238fec45..5feaf23df290 100644 --- a/python/graphscope/gsctl/scripts/install_deps_command.sh +++ 
b/python/graphscope/gsctl/scripts/install_deps_command.sh @@ -133,7 +133,7 @@ ANALYTICAL_MACOS=( ) _install_apache_arrow_ubuntu() { - if ! command -v dpkg -s libarrow-dev &>/dev/null; then + if ! dpkg -s libarrow-dev &>/dev/null; then log "Installing apache-arrow." ${SUDO} apt-get install -y lsb-release # shellcheck disable=SC2046,SC2019,SC2018 diff --git a/python/graphscope/gsctl/scripts/lib/install_vineyard.sh b/python/graphscope/gsctl/scripts/lib/install_vineyard.sh index 3a971eec345f..9498618ba4e5 100644 --- a/python/graphscope/gsctl/scripts/lib/install_vineyard.sh +++ b/python/graphscope/gsctl/scripts/lib/install_vineyard.sh @@ -35,6 +35,8 @@ install_vineyard() { fi pushd ${directory} || exit + # make sure it complains loudly if installing vineyard fails + set -e cmake . -DCMAKE_PREFIX_PATH="${install_prefix}" \ -DCMAKE_INSTALL_PREFIX="${V6D_PREFIX}" \ -DBUILD_VINEYARD_TESTS=OFF \ @@ -46,6 +48,7 @@ install_vineyard() { strip "${V6D_PREFIX}"/bin/vineyard* "${V6D_PREFIX}"/lib/libvineyard* pip3 install --no-cache -i https://pypi.org/simple -U "vineyard" "vineyard-io" cp -rs "${V6D_PREFIX}"/* "${install_prefix}"/ + set +e popd || exit popd || exit cleanup_files "${workdir}/${directory}" "${workdir}/${file}" diff --git a/python/graphscope/nx/conftest.py b/python/graphscope/nx/conftest.py index 017360af8ec9..94d2729fa52f 100644 --- a/python/graphscope/nx/conftest.py +++ b/python/graphscope/nx/conftest.py @@ -17,6 +17,8 @@ # import os +import sys +from unittest.mock import patch import pytest @@ -45,3 +47,22 @@ def pytest_collection_modifyitems(items): timeout_marker = item.get_marker("timeout") if timeout_marker is None: item.add_marker(pytest.mark.timeout(600)) + + +@pytest.fixture(scope="session", autouse=True) +def patch_print_pytest_xdist(): + """ + pytest-xdist disables stdout capturing by default, which means that print() + statements are not captured and displayed in the terminal. 
+ + That's because xdist cannot support -s for technical reasons wrt the process + execution mechanism. + + See also: https://github.com/pytest-dev/pytest-xdist/issues/354 + """ + original_print = print + with patch("builtins.print") as mock_print: + mock_print.side_effect = lambda *args, **kwargs: original_print( + *args, **{"file": sys.stderr, **kwargs} + ) + yield mock_print diff --git a/python/graphscope/tests/conftest.py b/python/graphscope/tests/conftest.py index bd0e022ba044..45c563ad4678 100644 --- a/python/graphscope/tests/conftest.py +++ b/python/graphscope/tests/conftest.py @@ -17,6 +17,8 @@ # import os +import sys +from unittest.mock import patch import numpy as np import pandas as pd @@ -914,3 +916,22 @@ def pytest_collection_modifyitems(items): timeout_marker = item.get_marker("timeout") if timeout_marker is None: item.add_marker(pytest.mark.timeout(600)) + + +@pytest.fixture(scope="session", autouse=True) +def patch_print_pytest_xdist(): + """ + pytest-xdist disables stdout capturing by default, which means that print() + statements are not captured and displayed in the terminal. + + That's because xdist cannot support -s for technical reasons wrt the process + execution mechanism. 
+ + See also: https://github.com/pytest-dev/pytest-xdist/issues/354 + """ + original_print = print + with patch("builtins.print") as mock_print: + mock_print.side_effect = lambda *args, **kwargs: original_print( + *args, **{"file": sys.stderr, **kwargs} + ) + yield mock_print diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index fc1dc7981f37..36ec8074a151 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -10,6 +10,7 @@ myst-parser>=0.13.0 pylint pytest pytest-cov +pytest-xdist pytest-timeout Pygments>=2.4.1 sphinx>=7.1.2 @@ -20,3 +21,4 @@ sphinxext-opengraph tomli wheel setuptools==65.7.0 +torch diff --git a/python/requirements.txt b/python/requirements.txt index 0ad0f572e1c2..aeecca8269b6 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -24,4 +24,3 @@ tqdm click vineyard>=0.16.3;sys_platform!="win32" simple-parsing -torch==1.13.1 diff --git a/python/setup.py b/python/setup.py index efbaf4fa5594..44efa5dc197a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -264,8 +264,8 @@ def parsed_package_data(): def build_learning_engine(): ext_modules = [graphlearn_ext()] if torch and os.path.exists(os.path.join(glt_root_path, "graphlearn_torch")): - sys.path.append( - os.path.join(glt_root_path, "graphlearn_torch", "python", "utils") + sys.path.insert( + 0, os.path.join(glt_root_path, "graphlearn_torch", "python", "utils") ) from build import glt_ext_module from build import glt_v6d_ext_module @@ -284,6 +284,7 @@ def build_learning_engine(): root_path=glt_root_path, ) ) + sys.path.pop(0) return ext_modules