Skip to content

Commit

Permalink
Adds support for biasing aggregation signatures.
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpai committed Apr 9, 2024
1 parent 41bed84 commit 113502c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
85 changes: 82 additions & 3 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ jobs:
presto_error: ${{ steps.sig-check.outputs.presto_error }}
spark_bias: ${{ steps.sig-check.outputs.spark_functions }}
spark_error: ${{ steps.sig-check.outputs.spark_error }}
presto_aggregate_bias: ${{ steps.sig-check.outputs.presto_aggregate_functions }}
presto_aggregate_error: ${{ steps.sig-check.outputs.presto_aggregate_error }}

steps:

Expand All @@ -95,7 +97,7 @@ jobs:
id: get-sig
with:
path: /tmp/signatures
key: function-signatures
key: function-signatures-with-aggregation

- name: Restore ccache
uses: assignUser/stash/restore@v1
Expand Down Expand Up @@ -142,13 +144,14 @@ jobs:
python3 -m pip install deepdiff
python3 scripts/signature.py export --spark /tmp/signatures/spark_signatures_main.json
python3 scripts/signature.py export --presto /tmp/signatures/presto_signatures_main.json
python3 scripts/signature.py export_aggregates --presto /tmp/signatures/presto_aggregate_signatures_main.json
- name: Save Function Signature Stash
if: ${{ steps.get-sig.outputs.stash-hit != 'true' }}
uses: assignUser/stash/save@v1
with:
path: /tmp/signatures
key: function-signatures
key: function-signatures-with-aggregation

- name: Checkout Contender
uses: actions/checkout@v4
Expand Down Expand Up @@ -191,6 +194,10 @@ jobs:
source .venv/bin/activate
python3 -m pip install deepdiff
python3 scripts/signature.py gh_bias_check presto spark
python3 scripts/signature.py export_aggregates --presto /tmp/signatures/presto_aggregate_signatures_contendor.json
python3 scripts/signature.py bias_aggregates /tmp/signatures/presto_aggregate_signatures_main.json \
/tmp/signatures/presto_aggregate_signatures_contendor.json /tmp/signatures/presto_aggregate_bias_functions \
/tmp/signatures/presto_aggregate_errors
- name: Upload Signature Artifacts
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -650,6 +657,77 @@ jobs:
path: |
/tmp/aggregate_fuzzer_repro
/tmp/server.log
presto-bias-java-aggregation-fuzzer-run:
name: Biased Aggregation Fuzzer with Presto as source of truth
needs: compile
runs-on: ubuntu-latest
container: ghcr.io/facebookincubator/velox-dev:presto-java
timeout-minutes: 120
if: ${{ needs.compile.outputs.presto_aggregate_bias == 'true' }}
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache/"
LINUX_DISTRO: "centos"
steps:

- name: Download aggregation fuzzer
uses: actions/download-artifact@v4
with:
name: aggregation

- name: "Checkout Repo"
uses: actions/checkout@v4
with:
path: velox
submodules: 'recursive'
ref: "${{ inputs.ref }}"

- name: Fix git permissions
# Usually actions/checkout does this but as we run in a container
# it doesn't work
run: git config --global --add safe.directory /__w/velox/velox/velox

- name: Download Signatures
uses: actions/download-artifact@v4
with:
name: signatures
path: /tmp/signatures

- name: "Run Bias Aggregate Fuzzer"
run: |
cd velox
cp ./scripts/etc/hive.properties $PRESTO_HOME/etc/catalog
ls -lR $PRESTO_HOME/etc
$PRESTO_HOME/bin/launcher run -v > /tmp/server.log 2>&1 &
# Sleep for 60 seconds to allow Presto server to start.
sleep 60
/opt/presto-cli --server 127.0.0.1:8080 --execute 'CREATE SCHEMA hive.tpch;'
cd -
mkdir -p /tmp/aggregate_fuzzer_repro/
rm -rfv /tmp/aggregate_fuzzer_repro/*
chmod -R 777 /tmp/aggregate_fuzzer_repro
chmod +x velox_aggregation_fuzzer_test
cat /tmp/signatures/presto_aggregate_bias_functions
./velox_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec $DURATION \
--logtostderr=1 \
--minloglevel=1 \
--repro_persist_path=/tmp/aggregate_fuzzer_repro \
--enable_sorted_aggregations=true \
--only=$(cat /tmp/signatures/presto_aggregate_bias_functions)
--presto_url=http://127.0.0.1:8080 \
&& echo -e "\n\nAggregation fuzzer run finished successfully."
- name: Archive aggregate production artifacts
if: ${{ !cancelled() }}
uses: actions/upload-artifact@v4
with:
name: presto-sot-aggregate-fuzzer-failure-artifacts
path: |
/tmp/aggregate_fuzzer_repro
/tmp/server.log
surface-signature-errors:
name: Signature Changes
Expand All @@ -663,8 +741,9 @@ jobs:
path: /tmp/signatures

- name: Surface Presto function signature errors
if: ${{ needs.compile.outputs.presto_error == 'true' }}
if: ${{ needs.compile.outputs.presto_error == 'true' || needs.compile.outputs.presto_aggregate_error == 'true' }}
run: |
cat /tmp/signatures/presto_errors
cat /tmp/signatures/presto_aggregate_errors
exit 1
51 changes: 51 additions & 0 deletions scripts/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import argparse
import json
import os
import re
import sys
from typing import Any

Expand All @@ -33,6 +34,9 @@ class bcolors:
BOLD = "\033[1m"


aggregate_pattern = re.compile("(.*)(_merge|_merge_extract|_partial)")


def get_error_string(error_message):
return f"""
Incompatible changes in function signatures have been detected.
Expand Down Expand Up @@ -210,6 +214,46 @@ def bias_signatures(base_signatures, contender_signatures, tickets, error_path):
return "", status


def bias_aggregates(args):
"""
Finds and exports aggregates whose signatures have been modified agasint a baseline.
Saves the results to a file and sets a Github Actions Output.
Currently this is hardcoded to presto aggregates.
"""
with open(args.base) as f:
base_signatures = json.load(f)

with open(args.contender) as f:
contender_signatures = json.load(f)

delta, status = diff_signatures(
base_signatures, contender_signatures, args.error_path
)

set_gh_output(f"presto_aggregate_error", status == 1)

if not delta:
print(f"{bcolors.BOLD} No changes detected: Nothing to do!")
return "", status

function_set = set()
for items in delta.values():
for item in items:
fn_name = item.get_root_key()
pattern = aggregate_pattern.match(fn_name)
if pattern:
function_set.add(pattern.group(1))
else:
function_set.add(fn_name)

if function_set:
biased_functions = ",".join(function_set)
with open(args.output_path, "w") as f:
print(f"{biased_functions}", file=f, end="")

set_gh_output(f"presto_aggregate_functions", True)


def gh_bias_check(args):
"""
Exports signatures for the given group(s) and checks them for changes compared to a baseline.
Expand Down Expand Up @@ -294,6 +338,7 @@ def parse_args(args):
"ticket_value", type=get_tickets, default=10, nargs="?"
)
bias_command_parser.add_argument("error_path", type=str, default="")

gh_command_parser = command.add_parser("gh_bias_check")
gh_command_parser.add_argument(
"group",
Expand All @@ -314,6 +359,12 @@ def parse_args(args):
"--output_postfix", type=str, default="_bias_functions"
)

bias_aggregate_command_parser = command.add_parser("bias_aggregates")
bias_aggregate_command_parser.add_argument("base", type=str)
bias_aggregate_command_parser.add_argument("contender", type=str)
bias_aggregate_command_parser.add_argument("output_path", type=str)
bias_aggregate_command_parser.add_argument("error_path", type=str, default="")

parser.set_defaults(command="help")

return parser.parse_args(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,6 @@ exec::AggregateRegistrationResult registerMinMaxBy(
.argumentType("C")
.build());
const std::vector<std::string> supportedCompareTypes = {
"boolean",
"tinyint",
"smallint",
"integer",
Expand Down

0 comments on commit 113502c

Please sign in to comment.