From 60bb1dd0d4aedb62a3bdc08cfbb98660d5d09c6a Mon Sep 17 00:00:00 2001 From: rui-mo Date: Mon, 12 Aug 2024 13:53:18 +0800 Subject: [PATCH] Enable Spark query runner in aggregate fuzzer test --- .github/workflows/experimental.yml | 44 +++++++++++++------ .github/workflows/scheduled.yml | 8 +++- velox/exec/fuzzer/AggregationFuzzerBase.h | 2 + .../fuzzer/SparkAggregationFuzzerTest.cpp | 37 +++++++--------- 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/.github/workflows/experimental.yml b/.github/workflows/experimental.yml index 9c55c338f678d..2cf44c87e5f8e 100644 --- a/.github/workflows/experimental.yml +++ b/.github/workflows/experimental.yml @@ -168,37 +168,55 @@ jobs: /tmp/aggregate_fuzzer_repro /tmp/server.log - linux-spark-fuzzer-run: - runs-on: ubuntu-latest - needs: compile + spark-java-aggregation-fuzzer-run: + runs-on: 8-core-ubuntu + container: ghcr.io/facebookincubator/velox-dev:spark-server timeout-minutes: 120 + env: + CCACHE_DIR: "/__w/velox/velox/.ccache/" + LINUX_DISTRO: "centos" steps: + - name: "Restore ccache" + uses: actions/cache@v3 + with: + path: "${{ env.CCACHE_DIR }}" + # We are using the benchmark ccache as it has all + # required features enabled, so no need to create a new one + key: ccache-spark-${{ github.sha }} + restore-keys: | + ccache-spark- + - name: "Checkout Repo" uses: actions/checkout@v3 with: + path: velox + submodules: 'recursive' ref: "${{ inputs.ref || 'main' }}" - - name: "Install dependencies" - run: source ./scripts/setup-ubuntu.sh && install_apt_deps - - - name: Download spark aggregation fuzzer - uses: actions/download-artifact@v3 - with: - name: spark_aggregation_fuzzer + - name: "Build" + run: | + cd velox + source /opt/rh/gcc-toolset-12/enable + make debug NUM_THREADS="${{ inputs.numThreads || 8 }}" MAX_HIGH_MEM_JOBS="${{ inputs.maxHighMemJobs || 8 }}" MAX_LINK_JOBS="${{ inputs.maxLinkJobs || 4 }}" EXTRA_CMAKE_FLAGS="-DVELOX_ENABLE_ARROW=ON ${{ inputs.extraCMakeFlags }}" + ccache -s - name: "Run 
Spark Aggregate Fuzzer" run: | + cd velox + bash /opt/start-spark.sh + # Sleep for 60 seconds to allow Spark server to start. + sleep 60 mkdir -p /tmp/spark_aggregate_fuzzer_repro/ + rm -rfv /tmp/spark_aggregate_fuzzer_repro/* chmod -R 777 /tmp/spark_aggregate_fuzzer_repro - chmod +x spark_aggregation_fuzzer_test - ./spark_aggregation_fuzzer_test \ + _build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test \ --seed ${RANDOM} \ --duration_sec 1800 \ --logtostderr=1 \ --minloglevel=0 \ --repro_persist_path=/tmp/spark_aggregate_fuzzer_repro \ - --enable_sorted_aggregations=true \ + --enable_sorted_aggregations=false \ && echo -e "\n\nSpark Aggregation Fuzzer run finished successfully." - name: Archive Spark aggregate production artifacts diff --git a/.github/workflows/scheduled.yml b/.github/workflows/scheduled.yml index 4b1370fb00367..f4ab29f8625e4 100644 --- a/.github/workflows/scheduled.yml +++ b/.github/workflows/scheduled.yml @@ -454,9 +454,9 @@ jobs: spark-aggregate-fuzzer-run: name: Spark Aggregate Fuzzer runs-on: ubuntu-latest - container: ghcr.io/facebookincubator/velox-dev:centos9 + container: ghcr.io/facebookincubator/velox-dev:spark-server needs: compile - timeout-minutes: 60 + timeout-minutes: 120 steps: - name: Download spark aggregation fuzzer @@ -466,12 +466,16 @@ jobs: - name: Run Spark Aggregate Fuzzer run: | + bash /opt/start-spark.sh + # Sleep for 60 seconds to allow Spark server to start. 
+ sleep 60 mkdir -p /tmp/spark_aggregate_fuzzer_repro/logs/ chmod -R 777 /tmp/spark_aggregate_fuzzer_repro chmod +x spark_aggregation_fuzzer_test ./spark_aggregation_fuzzer_test \ --seed ${RANDOM} \ --duration_sec $DURATION \ + --enable_sorted_aggregations=false \ --minloglevel=0 \ --stderrthreshold=2 \ --log_dir=/tmp/spark_aggregate_fuzzer_repro/logs \ diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 3a314605db52b..0c5c9030cee2e 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -76,6 +76,8 @@ class AggregationFuzzerBase { vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} { filesystems::registerLocalFileSystem(); auto configs = hiveConfigs; + // Make sure not to run out of open file descriptors. + configs[connector::hive::HiveConfig::kNumCacheFileHandles] = "1000"; auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) diff --git a/velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp b/velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp index e5c1c3e17981a..122947dc7b2b7 100644 --- a/velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp +++ b/velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp @@ -21,10 +21,10 @@ #include "velox/exec/fuzzer/AggregationFuzzerOptions.h" #include "velox/exec/fuzzer/AggregationFuzzerRunner.h" -#include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/fuzzer/TransformResultVerifier.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/functions/sparksql/aggregates/Register.h" +#include "velox/functions/sparksql/fuzzer/SparkQueryRunner.h" DEFINE_int64( seed, @@ -53,10 +53,13 @@ int main(int argc, char** argv) { facebook::velox::functions::prestosql::registerInternalFunctions(); facebook::velox::memory::MemoryManager::initialize({}); - // TODO: List of the functions 
that at some point crash or fail and need to - // be fixed before we can enable. Constant argument of bloom_filter_agg cause - // fuzzer test fail. - std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"}; + // Spark does not provide user-accessible aggregate functions with the + // following names. + std::unordered_set<std::string> skipFunctions = { + "bloom_filter_agg", + "first_ignore_null", + "last_ignore_null", + "regr_replacement"}; using facebook::velox::exec::test::TransformResultVerifier; @@ -95,21 +98,9 @@ int main(int argc, char** argv) { size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed; std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool{ facebook::velox::memory::memoryManager()->addRootPool()}; - auto duckQueryRunner = - std::make_unique<facebook::velox::exec::test::DuckQueryRunner>( - rootPool.get()); - duckQueryRunner->disableAggregateFunctions( - {// https://github.com/facebookincubator/velox/issues/7677 - "max_by", - "min_by", - // The skewness functions of Velox and DuckDB use different - // algorithms. - // https://github.com/facebookincubator/velox/issues/4845 - "skewness", - // Spark's kurtosis uses Pearson's formula for calculating the kurtosis - // coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation - // formula. The results from the two methods are completely different.
- "kurtosis"}); + auto sparkQueryRunner = std::make_unique< + facebook::velox::functions::sparksql::fuzzer::SparkQueryRunner>( + rootPool.get(), "localhost:15002", "fuzzer", "aggregate"); using Runner = facebook::velox::exec::test::AggregationFuzzerRunner; using Options = facebook::velox::exec::test::AggregationFuzzerOptions; @@ -119,5 +110,9 @@ int main(int argc, char** argv) { options.skipFunctions = skipFunctions; options.customVerificationFunctions = customVerificationFunctions; options.orderableGroupKeys = true; - return Runner::run(initialSeed, std::move(duckQueryRunner), options); + options.timestampPrecision = + facebook::velox::VectorFuzzer::Options::TimestampPrecision::kMicroSeconds; + options.hiveConfigs = { + {facebook::velox::connector::hive::HiveConfig::kReadTimestampUnit, "6"}}; + return Runner::run(initialSeed, std::move(sparkQueryRunner), options); }