Test/flink/conf #1

Merged: 37 commits, Aug 22, 2023

Commits
55a84f4
build(flink): update extra requirements and update
deepyaman Aug 8, 2023
4fdbb87
feat(flink): implement a minimal PyFlink `Backend`
deepyaman Aug 10, 2023
7685791
fix(flink): pass unqualified name to DatabaseTable
deepyaman Aug 11, 2023
e9b2e60
fix(flink): ignore `limit` to fix `Expr.execute()`
deepyaman Aug 12, 2023
a432b9b
test(flink): configure `pytest` plugin for PyFlink
deepyaman Aug 14, 2023
ec3bef0
chore: merge branch 'master' into feat/flink/backend
deepyaman Aug 14, 2023
00abd96
ci(flink): exclude Flink backend from some linting
deepyaman Aug 14, 2023
dafbb51
ci(flink): post-install `apache-flink` for testing
deepyaman Aug 14, 2023
69b4452
ci(flink): test Python 3.10 (supported by PyFlink)
deepyaman Aug 14, 2023
5487e39
test(flink): add `TestConf`, and enable test suite
deepyaman Aug 16, 2023
d07f356
test(flink): re-install pandas~=1.5 for ArrowDtype
deepyaman Aug 16, 2023
5bd34f7
feat(flink): implement `has_operation` for backend
deepyaman Aug 17, 2023
527fd9e
ci(flink): remove the separate `test_flink` config
deepyaman Aug 17, 2023
13c5bb4
chore(flink): remove join tests and snapshot files
deepyaman Aug 17, 2023
7f0ce4c
test(flink): mark unnest operation not implemented
deepyaman Aug 17, 2023
57a5fff
fix(flink): `deps` should be import names, not pip
deepyaman Aug 17, 2023
b207f2d
test(flink): ignore deprecation warning, from Beam
deepyaman Aug 17, 2023
9959f47
ci(flink): quote reserved words year, month, table
deepyaman Aug 17, 2023
802f0bc
test(flink): make warnings filter match more beams
deepyaman Aug 17, 2023
9b0d1d0
ci(flink): mark Flink backend tests as serial-only
deepyaman Aug 17, 2023
ad387c1
ci: turn on verbose mode in serial backend testing
deepyaman Aug 17, 2023
543a2d1
test(flink): remove header for Flink backend tests
deepyaman Aug 17, 2023
cd548f9
ci: use platform-agnostic way to delete first line
deepyaman Aug 18, 2023
f5496a7
chore: merge branch 'upstream' into test/flink/conf
deepyaman Aug 18, 2023
eea1de3
ci: swap double with single quotes for here-string
deepyaman Aug 18, 2023
a0c2b5e
ci(flink): exclude Python 3.11 tests for `Backend`
deepyaman Aug 18, 2023
a17346c
revert: turn off verbose mode in serial backend testing
deepyaman Aug 18, 2023
9734faf
test(flink): mark STRUCT (i.e. ROW) type `notimpl`
deepyaman Aug 18, 2023
fc2cc07
test(flink): specify unimplemented join operations
deepyaman Aug 19, 2023
39cda23
chore: exclude flink from the operation support matrix for now
cpcloud Aug 21, 2023
77aecbc
feat(flink): implement a minimal PyFlink `Backend`
deepyaman Aug 8, 2023
3c66055
chore(flink): merge remote-tracking branch 'deepyaman/feat/flink/back…
deepyaman Aug 21, 2023
ac37474
ci: don't unnecessarily generate Python 3.10 stuff
deepyaman Aug 21, 2023
8a75879
chore(flink): merge remote-tracking branch 'deepyaman/test/flink/conf…
deepyaman Aug 21, 2023
d9d34b2
test(flink): mark `notyet` for semi and anti joins
deepyaman Aug 21, 2023
a7fac8e
chore: undo artifacts of merge
deepyaman Aug 21, 2023
2b6e5d5
chore: merge branch 'upstream' into test/flink/conf
deepyaman Aug 21, 2023
119 changes: 44 additions & 75 deletions .github/workflows/ibis-backends.yml
@@ -200,6 +200,15 @@ jobs:
- oracle
services:
- oracle
- name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
even_more_deps:
- pandas~=1.5
exclude:
- os: windows-latest
backend:
@@ -296,6 +305,17 @@ jobs:
- oracle
services:
- oracle
- python-version: "3.11"
backend:
name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
even_more_deps:
- pandas~=1.5
steps:
- name: update and install system dependencies
if: matrix.os == 'ubuntu-latest' && matrix.backend.sys-deps != null
@@ -329,6 +349,25 @@ jobs:
with:
python-version: ${{ matrix.python-version }}

- name: delete header rows on ubuntu
if: matrix.os == 'ubuntu-latest' && matrix.backend.name == 'flink'
run: |
python -c "exec(\"TEST_FILES = ['functional_alltypes', 'diamonds', 'batting', 'awards_players']\nfor file in TEST_FILES:\n with open(f'ci/ibis-testing-data/csv/{file}.csv', 'r+') as f:\n f.readline()\n data=f.read()\n f.seek(0)\n f.write(data)\n f.truncate()\n\")"

- name: delete header rows on windows
if: matrix.os == 'windows-latest' && matrix.backend.name == 'flink'
run: |
python -c @"
TEST_FILES = ['functional_alltypes', 'diamonds', 'batting', 'awards_players']
for file in TEST_FILES:
with open(f'ci/ibis-testing-data/csv/{file}.csv', 'r+') as f:
f.readline()
data=f.read()
f.seek(0)
f.write(data)
f.truncate()
"@

- name: download poetry lockfile
uses: actions/download-artifact@v3
with:
@@ -363,6 +402,11 @@ jobs:
if: matrix.backend.additional_deps != null
run: poetry run pip install "${{ join(matrix.backend.additional_deps, ' ') }}"

# FIXME(deepyaman)
- name: install even more deps
if: matrix.backend.even_more_deps != null
run: poetry run pip install "${{ join(matrix.backend.even_more_deps, ' ') }}"

- name: show installed deps
run: poetry run pip list

@@ -774,80 +818,6 @@ jobs:
with:
flags: backend,${{ matrix.backend.name }},${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

test_flink:
name: Flink ${{ matrix.os }} python-${{ matrix.python-version }}
runs-on: ${{ matrix.os }}
needs:
- gen_lockfile_backends
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
- windows-latest
python-version:
- "3.9"
- "3.10"
steps:
- name: checkout
uses: actions/checkout@v3

- name: install python
uses: actions/setup-python@v4
id: install_python
with:
python-version: ${{ matrix.python-version }}

- name: download poetry lockfile
uses: actions/download-artifact@v3
with:
name: backend-deps-${{ matrix.python-version }}
path: deps

- name: pull out lockfile
shell: bash
run: |
set -euo pipefail

mv -f deps/* .
rm -r deps

- uses: syphar/restore-pip-download-cache@v1
with:
requirement_files: poetry.lock
custom_cache_key_element: ${{ steps.install_python.outputs.python-version }}

- name: install poetry
run: python -m pip install --upgrade pip 'poetry<1.4'

- uses: syphar/restore-virtualenv@v1
with:
requirement_files: poetry.lock
custom_cache_key_element: flink-${{ steps.install_python.outputs.python-version }}

- name: install ibis
run: poetry install --without dev --without docs --extras flink

# TODO(deepyaman): Remove step once Ibis and Flink are compatible.
- name: install pyflink
run: poetry run pip install apache-flink

- name: show installed deps
run: poetry run pip list

- name: "run parallel tests: flink"
run: poetry run pytest --junitxml=junit.xml --cov=ibis --cov-report=xml:coverage.xml ibis/backends/flink/tests --numprocesses auto --dist=loadgroup

- name: check that no untracked files were produced
shell: bash
run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .

- name: upload code coverage
if: success()
uses: codecov/codecov-action@v3
with:
flags: backend,flink,${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

backends:
# this job exists so that we can use a single job from this workflow to gate merging
runs-on: ubuntu-latest
@@ -856,6 +826,5 @@ jobs:
- test_backends
- test_backends_sqlalchemy2
- test_pyspark
- test_flink
steps:
- run: exit 0
90 changes: 90 additions & 0 deletions ci/schema/flink.sql
@@ -0,0 +1,90 @@
DROP TABLE IF EXISTS functional_alltypes;

CREATE TABLE functional_alltypes (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col VARCHAR,
string_col VARCHAR,
timestamp_col TIMESTAMP,
`year` INT,
`month` INT
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/functional_alltypes.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds (
carat DOUBLE,
cut VARCHAR,
color VARCHAR,
clarity VARCHAR,
depth DOUBLE,
`table` DOUBLE,
price BIGINT,
x DOUBLE,
y DOUBLE,
z DOUBLE
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/diamonds.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS batting;

CREATE TABLE batting (
playerID VARCHAR,
yearID BIGINT,
stint BIGINT,
teamID VARCHAR,
lgID VARCHAR,
G BIGINT,
AB BIGINT,
R BIGINT,
H BIGINT,
X2B BIGINT,
X3B BIGINT,
HR BIGINT,
RBI BIGINT,
SB BIGINT,
CS BIGINT,
BB BIGINT,
SO BIGINT,
IBB BIGINT,
HBP BIGINT,
SH BIGINT,
SF BIGINT,
GIDP BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/batting.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS awards_players;

CREATE TABLE awards_players (
playerID VARCHAR,
awardID VARCHAR,
yearID BIGINT,
lgID VARCHAR,
tie VARCHAR,
notes VARCHAR
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/awards_players.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);
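
Each 'path' option above contains a {data_dir} placeholder that the test harness fills in before executing the DDL (see _load_data in conftest.py below). A minimal sketch of that substitution, assuming the schema file lives at ci/schema/flink.sql and splitting naively on semicolons (the real harness iterates a pre-parsed ddl_script):

from pathlib import Path

ddl = Path("ci/schema/flink.sql").read_text()
for stmt in ddl.split(";"):  # naive statement split, for illustration only
    if stmt.strip():
        print(stmt.format(data_dir="/tmp/ibis-testing-data"))
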
11 changes: 11 additions & 0 deletions ibis/backends/flink/__init__.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from functools import lru_cache
from typing import TYPE_CHECKING, Any

import pyflink.version
@@ -267,3 +268,13 @@ def drop_view(
raise exc.IntegrityError(f"View {name} does not exist.")

# TODO(deepyaman): Support (and differentiate) permanent views.

@classmethod
@lru_cache
def _get_operations(cls):
translator = cls.compiler.translator_class
return translator._registry.keys()

@classmethod
def has_operation(cls, operation: type[ops.Value]) -> bool:
return operation in cls._get_operations()
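
A quick usage sketch for has_operation (the specific operations and their support status here are illustrative assumptions, not asserted by this PR):

import ibis.expr.operations as ops

from ibis.backends.flink import Backend

# True if the operation appears in the Flink compiler's translation registry.
print(Backend.has_operation(ops.Sum))
print(Backend.has_operation(ops.ArrayColumn))
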
32 changes: 20 additions & 12 deletions ibis/backends/flink/tests/conftest.py
@@ -1,10 +1,28 @@
from __future__ import annotations

from typing import Any

import pytest

import ibis
import ibis.expr.types as ir
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest, RoundAwayFromZero


class TestConf(BackendTest, RoundAwayFromZero):
supports_structs = False
deps = ("pyflink",)

@staticmethod
def connect(*, tmpdir, worker_id, **kw: Any):
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
return ibis.flink.connect(table_env, **kw)

def _load_data(self, **_: Any) -> None:
for stmt in self.ddl_script:
self.connection._t_env.execute_sql(stmt.format(data_dir=self.data_dir))
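
Outside the test suite, connecting follows the same pattern as TestConf.connect; a minimal sketch, assuming PyFlink is installed:

from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis

# Create a batch TableEnvironment and hand it to Ibis.
table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
con = ibis.flink.connect(table_env)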


@pytest.fixture
@@ -27,13 +45,3 @@ def simple_schema():
@pytest.fixture
def simple_table(simple_schema):
return ibis.table(simple_schema, name="table")


@pytest.fixture
def batting() -> ir.Table:
return ibis.table(schema=TEST_TABLES["batting"], name="batting")


@pytest.fixture
def awards_players() -> ir.Table:
return ibis.table(schema=TEST_TABLES["awards_players"], name="awards_players")

This file was deleted.

This file was deleted.

This file was deleted.
