feat(flink): implement a minimal PyFlink `Backend`
* build(flink): update extra requirements

* feat(flink): implement a minimal PyFlink `Backend`

* fix(flink): pass unqualified name to DatabaseTable

* fix(flink): ignore `limit` to fix `Expr.execute()`

* test(flink): configure `pytest` plugin for PyFlink

* ci(flink): exclude Flink backend from some linting

* ci(flink): post-install `apache-flink` for testing

* ci(flink): test Python 3.10 (supported by PyFlink)

* test(flink): add `TestConf`, and enable test suite

* test(flink): re-install pandas~=1.5 for ArrowDtype

* feat(flink): implement `has_operation` for backend

* ci(flink): remove the separate `test_flink` config

* chore(flink): remove join tests and snapshot files

* test(flink): mark unnest operation not implemented

* fix(flink): `deps` should be import names, not pip

* test(flink): ignore deprecation warning, from Beam

* ci(flink): quote reserved words year, month, table

* test(flink): make warnings filter match more beams

* ci(flink): mark Flink backend tests as serial-only

* ci: turn on verbose mode in serial backend testing

* test(flink): remove header for Flink backend tests

* ci: use platform-agnostic way to delete first line

* ci: swap double with single quotes for here-string

* ci(flink): exclude Python 3.11 tests for `Backend`

* revert: turn off verbose mode in serial backend testing

Refs: ad387c1

* test(flink): mark STRUCT (i.e. ROW) type `notimpl`

* test(flink): specify unimplemented join operations

* chore: exclude flink from the operation support matrix for now

* feat(flink): implement a minimal PyFlink `Backend`

* ci: don't unnecessarily generate Python 3.10 stuff

* test(flink): mark `notyet` for semi and anti joins

* chore: undo artifacts of merge

---------

Co-authored-by: Phillip Cloud <[email protected]>
deepyaman and cpcloud authored Aug 22, 2023
1 parent 46d0e33 commit 74f69fa
Showing 15 changed files with 173 additions and 254 deletions.
119 changes: 44 additions & 75 deletions .github/workflows/ibis-backends.yml
@@ -200,6 +200,15 @@ jobs:
- oracle
services:
- oracle
- name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
even_more_deps:
- pandas~=1.5
exclude:
- os: windows-latest
backend:
@@ -296,6 +305,17 @@ jobs:
- oracle
services:
- oracle
- python-version: "3.11"
backend:
name: flink
title: Flink
serial: true
extras:
- flink
additional_deps:
- apache-flink
even_more_deps:
- pandas~=1.5
steps:
- name: update and install system dependencies
if: matrix.os == 'ubuntu-latest' && matrix.backend.sys-deps != null
@@ -329,6 +349,25 @@ jobs:
with:
python-version: ${{ matrix.python-version }}

- name: delete header rows on ubuntu
if: matrix.os == 'ubuntu-latest' && matrix.backend.name == 'flink'
run: |
python -c "exec(\"TEST_FILES = ['functional_alltypes', 'diamonds', 'batting', 'awards_players']\nfor file in TEST_FILES:\n with open(f'ci/ibis-testing-data/csv/{file}.csv', 'r+') as f:\n f.readline()\n data=f.read()\n f.seek(0)\n f.write(data)\n f.truncate()\n\")"
- name: delete header rows on windows
if: matrix.os == 'windows-latest' && matrix.backend.name == 'flink'
run: |
python -c @"
TEST_FILES = ['functional_alltypes', 'diamonds', 'batting', 'awards_players']
for file in TEST_FILES:
with open(f'ci/ibis-testing-data/csv/{file}.csv', 'r+') as f:
f.readline()
data=f.read()
f.seek(0)
f.write(data)
f.truncate()
"@
- name: download poetry lockfile
uses: actions/download-artifact@v3
with:
@@ -363,6 +402,11 @@ jobs:
if: matrix.backend.additional_deps != null
run: poetry run pip install "${{ join(matrix.backend.additional_deps, ' ') }}"

# FIXME(deepyaman)
- name: install even more deps
if: matrix.backend.even_more_deps != null
run: poetry run pip install "${{ join(matrix.backend.even_more_deps, ' ') }}"

- name: show installed deps
run: poetry run pip list

@@ -774,80 +818,6 @@ jobs:
with:
flags: backend,${{ matrix.backend.name }},${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

test_flink:
name: Flink ${{ matrix.os }} python-${{ matrix.python-version }}
runs-on: ${{ matrix.os }}
needs:
- gen_lockfile_backends
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
- windows-latest
python-version:
- "3.9"
- "3.10"
steps:
- name: checkout
uses: actions/checkout@v3

- name: install python
uses: actions/setup-python@v4
id: install_python
with:
python-version: ${{ matrix.python-version }}

- name: download poetry lockfile
uses: actions/download-artifact@v3
with:
name: backend-deps-${{ matrix.python-version }}
path: deps

- name: pull out lockfile
shell: bash
run: |
set -euo pipefail
mv -f deps/* .
rm -r deps
- uses: syphar/restore-pip-download-cache@v1
with:
requirement_files: poetry.lock
custom_cache_key_element: ${{ steps.install_python.outputs.python-version }}

- name: install poetry
run: python -m pip install --upgrade pip 'poetry<1.4'

- uses: syphar/restore-virtualenv@v1
with:
requirement_files: poetry.lock
custom_cache_key_element: flink-${{ steps.install_python.outputs.python-version }}

- name: install ibis
run: poetry install --without dev --without docs --extras flink

# TODO(deepyaman): Remove step once Ibis and Flink are compatible.
- name: install pyflink
run: poetry run pip install apache-flink

- name: show installed deps
run: poetry run pip list

- name: "run parallel tests: flink"
run: poetry run pytest --junitxml=junit.xml --cov=ibis --cov-report=xml:coverage.xml ibis/backends/flink/tests --numprocesses auto --dist=loadgroup

- name: check that no untracked files were produced
shell: bash
run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .

- name: upload code coverage
if: success()
uses: codecov/codecov-action@v3
with:
flags: backend,flink,${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

backends:
# this job exists so that we can use a single job from this workflow to gate merging
runs-on: ubuntu-latest
@@ -856,6 +826,5 @@
- test_backends
- test_backends_sqlalchemy2
- test_pyspark
- test_flink
steps:
- run: exit 0
90 changes: 90 additions & 0 deletions ci/schema/flink.sql
@@ -0,0 +1,90 @@
DROP TABLE IF EXISTS functional_alltypes;

CREATE TABLE functional_alltypes (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col VARCHAR,
string_col VARCHAR,
timestamp_col TIMESTAMP,
`year` INT,
`month` INT
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/functional_alltypes.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds (
carat DOUBLE,
cut VARCHAR,
color VARCHAR,
clarity VARCHAR,
depth DOUBLE,
`table` DOUBLE,
price BIGINT,
x DOUBLE,
y DOUBLE,
z DOUBLE
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/diamonds.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS batting;

CREATE TABLE batting (
playerID VARCHAR,
yearID BIGINT,
stint BIGINT,
teamID VARCHAR,
lgID VARCHAR,
G BIGINT,
AB BIGINT,
R BIGINT,
H BIGINT,
X2B BIGINT,
X3B BIGINT,
HR BIGINT,
RBI BIGINT,
SB BIGINT,
CS BIGINT,
BB BIGINT,
SO BIGINT,
IBB BIGINT,
HBP BIGINT,
SH BIGINT,
SF BIGINT,
GIDP BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/batting.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);

DROP TABLE IF EXISTS awards_players;

CREATE TABLE awards_players (
playerID VARCHAR,
awardID VARCHAR,
yearID BIGINT,
lgID VARCHAR,
tie VARCHAR,
notes VARCHAR
) WITH (
'connector' = 'filesystem',
'path' = 'file:///{data_dir}/csv/awards_players.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);
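Note the `{data_dir}` placeholder in each `path` option: the DDL is a template, not directly runnable SQL. A minimal sketch of executing it against a local batch `TableEnvironment` — the file and data paths here are assumptions, and the test suite does this via the `_load_data` hook shown in `conftest.py` below:

```python
from pathlib import Path

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Assumed locations of the schema template and the extracted testing data.
schema = Path("ci/schema/flink.sql").read_text()
data_dir = Path("ci/ibis-testing-data").resolve()

for stmt in schema.split(";"):
    if stmt.strip():
        # Fill in the {data_dir} placeholder before executing each statement.
        table_env.execute_sql(stmt.format(data_dir=data_dir))
```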
11 changes: 11 additions & 0 deletions ibis/backends/flink/__init__.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from functools import lru_cache
from typing import TYPE_CHECKING, Any

import pyflink.version
@@ -267,3 +268,13 @@ def drop_view(
raise exc.IntegrityError(f"View {name} does not exist.")

# TODO(deepyaman): Support (and differentiate) permanent views.

@classmethod
@lru_cache
def _get_operations(cls):
translator = cls.compiler.translator_class
return translator._registry.keys()

@classmethod
def has_operation(cls, operation: type[ops.Value]) -> bool:
return operation in cls._get_operations()
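Because `has_operation` only probes the compiler's translator registry, operation support can be queried without a running Flink cluster. A small usage sketch — the specific operations are illustrative, and actual results depend on what the registry contains at this commit:

```python
import ibis.expr.operations as ops
from ibis.backends.flink import Backend

# Common aggregations are expected to be registered ...
print(Backend.has_operation(ops.Sum))     # True (assuming Sum is registered)

# ... while unimplemented operations simply aren't in the registry.
print(Backend.has_operation(ops.Unnest))  # False (marked not implemented above)
```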
32 changes: 20 additions & 12 deletions ibis/backends/flink/tests/conftest.py
@@ -1,10 +1,28 @@
from __future__ import annotations

from typing import Any

import pytest

import ibis
import ibis.expr.types as ir
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest, RoundAwayFromZero


class TestConf(BackendTest, RoundAwayFromZero):
supports_structs = False
deps = ("pyflink",)

@staticmethod
def connect(*, tmpdir, worker_id, **kw: Any):
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
return ibis.flink.connect(table_env, **kw)

def _load_data(self, **_: Any) -> None:
for stmt in self.ddl_script:
self.connection._t_env.execute_sql(stmt.format(data_dir=self.data_dir))


@pytest.fixture
@@ -27,13 +45,3 @@ def simple_schema():
@pytest.fixture
def simple_table(simple_schema):
return ibis.table(simple_schema, name="table")


@pytest.fixture
def batting() -> ir.Table:
return ibis.table(schema=TEST_TABLES["batting"], name="batting")


@pytest.fixture
def awards_players() -> ir.Table:
return ibis.table(schema=TEST_TABLES["awards_players"], name="awards_players")
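Outside the test harness, connecting follows the same pattern as `TestConf.connect` above — a minimal sketch, where the `list_tables` call is illustrative:

```python
import ibis
from pyflink.table import EnvironmentSettings, TableEnvironment

# Stand up a local batch TableEnvironment and hand it to the Ibis backend.
table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
con = ibis.flink.connect(table_env)

# Illustrative: list the tables registered in the current catalog/database.
print(con.list_tables())
```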

This file was deleted.

This file was deleted.

This file was deleted.
