test(flink): add TestConf (without common tests)
* build(flink): update extra requirements and update

* feat(flink): implement a minimal PyFlink `Backend`

* fix(flink): pass unqualified name to DatabaseTable

* fix(flink): ignore `limit` to fix `Expr.execute()`

* test(flink): configure `pytest` plugin for PyFlink

* ci(flink): exclude Flink backend from some linting

* ci(flink): post-install `apache-flink` for testing

* ci(flink): test Python 3.10 (supported by PyFlink)

* test(flink): add `TestConf`, and enable test suite

* test(flink): re-install pandas~=1.5 for ArrowDtype

* feat(flink): implement `has_operation` for backend

* ci(flink): remove the separate `test_flink` config

* chore(flink): remove join tests and snapshot files

* test(flink): mark unnest operation not implemented

* fix(flink): `deps` should be import names, not pip

* test(flink): ignore deprecation warning, from Beam

* ci(flink): quote reserved words year, month, table

* test(flink): make warnings filter match more beams

* ci(flink): mark Flink backend tests as serial-only

* ci: turn on verbose mode in serial backend testing

* test(flink): remove header for Flink backend tests

* ci: use platform-agnostic way to delete first line

* ci: swap double with single quotes for here-string

* ci(flink): exclude Python 3.11 tests for `Backend`

* revert: turn off verbose mode in serial backend testing

Refs: ad387c1

* test(flink): mark STRUCT (i.e. ROW) type `notimpl`

* test(flink): specify unimplemented join operations

* chore: exclude flink from the operation support matrix for now

* feat(flink): implement a minimal PyFlink `Backend`

* ci: don't unnecessarily generate Python 3.10 stuff

* test(flink): mark `notyet` for semi and anti joins

* chore: undo artifacts of merge

---------

Co-authored-by: Phillip Cloud <[email protected]>

test(flink): mark `dot_sql_notimpl` for `Backend`

test(flink): mark `ROWID` test as `notimpl` (operation isn't defined)

test(flink): mark `to_pyarrow`/`to_pyarrow_batches` `notimpl`

test(flink): mark rest of export operations not implemented

test(flink): don't run `test_array` until memtable

test(flink): add information to make `test_network` work

test(flink): don't test dataframe interchange (no to_pyarrow)

test(flink): mark `test_examples.py` `notimpl`

feat(flink): implement support for UUID literals

ci(flink): exclude common tests from `TestConf` PR

chore(flink): undo changes in common test directories

refactor(flink): rename `t_env` arg to `table_env`

test(flink): load from parquet with pandas instead of from CSV

ci(flink): remove header-removal steps from workflow
deepyaman committed Aug 23, 2023
1 parent 476a659 commit 7cdab98
Showing 13 changed files with 88 additions and 270 deletions.
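For context, the net effect of this commit is a connectable PyFlink backend plus a pytest `TestConf` that hooks it into the shared backend test suite. A minimal sketch of the entry point, mirroring the `do_connect` docstring in the diff below:

    import ibis
    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Build a PyFlink table environment and hand it to Ibis.
    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    con = ibis.flink.connect(table_env)
    con.list_tables()  # delegates to the wrapped TableEnvironment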
108 changes: 32 additions & 76 deletions .github/workflows/ibis-backends.yml
@@ -200,6 +200,15 @@ jobs:
               - oracle
             services:
               - oracle
+          - name: flink
+            title: Flink
+            serial: true
+            extras:
+              - flink
+            additional_deps:
+              - apache-flink
+            even_more_deps:
+              - pandas~=1.5
         exclude:
           - os: windows-latest
             backend:
@@ -296,6 +305,17 @@ jobs:
                 - oracle
               services:
                 - oracle
+          - python-version: "3.11"
+            backend:
+              name: flink
+              title: Flink
+              serial: true
+              extras:
+                - flink
+              additional_deps:
+                - apache-flink
+              even_more_deps:
+                - pandas~=1.5
     steps:
       - name: update and install system dependencies
         if: matrix.os == 'ubuntu-latest' && matrix.backend.sys-deps != null
@@ -363,6 +383,11 @@ jobs:
         if: matrix.backend.additional_deps != null
         run: poetry run pip install "${{ join(matrix.backend.additional_deps, ' ') }}"

+      # FIXME(deepyaman)
+      - name: install even more deps
+        if: matrix.backend.even_more_deps != null
+        run: poetry run pip install "${{ join(matrix.backend.even_more_deps, ' ') }}"
+
       - name: show installed deps
         run: poetry run pip list

@@ -382,7 +407,13 @@ jobs:
           IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}

       - name: "run serial tests: ${{ matrix.backend.name }}"
-        if: matrix.backend.serial && matrix.backend.name != 'impala'
+        if: matrix.backend.serial && matrix.backend.name == 'flink'
+        run: just ci-check -m ${{ matrix.backend.name }} ibis/backends/flink/tests
+        env:
+          IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}
+
+      - name: "run serial tests: ${{ matrix.backend.name }}"
+        if: matrix.backend.serial && matrix.backend.name != 'impala' && matrix.backend.name != 'flink'
         run: just ci-check -m ${{ matrix.backend.name }}
         env:
           IBIS_EXAMPLES_DATA: ${{ runner.temp }}/examples-${{ matrix.backend.name }}-${{ matrix.os }}-${{ steps.install_python.outputs.python-version }}
@@ -774,80 +805,6 @@ jobs:
         with:
           flags: backend,${{ matrix.backend.name }},${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}

-  test_flink:
-    name: Flink ${{ matrix.os }} python-${{ matrix.python-version }}
-    runs-on: ${{ matrix.os }}
-    needs:
-      - gen_lockfile_backends
-    strategy:
-      fail-fast: false
-      matrix:
-        os:
-          - ubuntu-latest
-          - windows-latest
-        python-version:
-          - "3.9"
-          - "3.10"
-    steps:
-      - name: checkout
-        uses: actions/checkout@v3
-
-      - name: install python
-        uses: actions/setup-python@v4
-        id: install_python
-        with:
-          python-version: ${{ matrix.python-version }}
-
-      - name: download poetry lockfile
-        uses: actions/download-artifact@v3
-        with:
-          name: backend-deps-${{ matrix.python-version }}
-          path: deps
-
-      - name: pull out lockfile
-        shell: bash
-        run: |
-          set -euo pipefail
-          mv -f deps/* .
-          rm -r deps
-
-      - uses: syphar/restore-pip-download-cache@v1
-        with:
-          requirement_files: poetry.lock
-          custom_cache_key_element: ${{ steps.install_python.outputs.python-version }}
-
-      - name: install poetry
-        run: python -m pip install --upgrade pip 'poetry<1.4'
-
-      - uses: syphar/restore-virtualenv@v1
-        with:
-          requirement_files: poetry.lock
-          custom_cache_key_element: flink-${{ steps.install_python.outputs.python-version }}
-
-      - name: install ibis
-        run: poetry install --without dev --without docs --extras flink
-
-      # TODO(deepyaman): Remove step once Ibis and Flink are compatible.
-      - name: install pyflink
-        run: poetry run pip install apache-flink
-
-      - name: show installed deps
-        run: poetry run pip list
-
-      - name: "run parallel tests: flink"
-        run: poetry run pytest --junitxml=junit.xml --cov=ibis --cov-report=xml:coverage.xml ibis/backends/flink/tests --numprocesses auto --dist=loadgroup
-
-      - name: check that no untracked files were produced
-        shell: bash
-        run: git checkout poetry.lock pyproject.toml && ! git status --porcelain | tee /dev/stderr | grep .
-
-      - name: upload code coverage
-        if: success()
-        uses: codecov/codecov-action@v3
-        with:
-          flags: backend,flink,${{ runner.os }},python-${{ steps.install_python.outputs.python-version }}
-
   backends:
     # this job exists so that we can use a single job from this workflow to gate merging
     runs-on: ubuntu-latest
@@ -856,6 +813,5 @@ jobs:
       - test_backends
       - test_backends_sqlalchemy2
       - test_pyspark
-      - test_flink
     steps:
       - run: exit 0
41 changes: 26 additions & 15 deletions ibis/backends/flink/__init__.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+from functools import lru_cache
 from typing import TYPE_CHECKING, Any

 import pyflink.version
@@ -29,37 +30,37 @@ class Backend(BaseBackend, CanListDatabases):
     supports_temporary_tables = True
     supports_python_udfs = True

-    def do_connect(self, t_env: TableEnvironment) -> None:
+    def do_connect(self, table_env: TableEnvironment) -> None:
         """Create a Flink `Backend` for use with Ibis.

         Parameters
         ----------
-        t_env
+        table_env
             A table environment

         Examples
         --------
         >>> import ibis
         >>> from pyflink.table import EnvironmentSettings, TableEnvironment
-        >>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
-        >>> ibis.flink.connect(t_env)
+        >>> table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        >>> ibis.flink.connect(table_env)
         <ibis.backends.flink.Backend at 0x...>
         """
-        self._t_env = t_env
+        self._table_env = table_env

     def list_databases(self, like: str | None = None) -> list[str]:
-        databases = self._t_env.list_databases()
+        databases = self._table_env.list_databases()
         return self._filter_with_like(databases, like)

     @property
     def current_database(self) -> str:
-        return self._t_env.get_current_database()
+        return self._table_env.get_current_database()

     def list_tables(
         self, like: str | None = None, database: str | None = None
     ) -> list[str]:
-        tables = self._t_env._j_tenv.listTables(
-            self._t_env.get_current_catalog(), database or self.current_database
+        tables = self._table_env._j_tenv.listTables(
+            self._table_env.get_current_catalog(), database or self.current_database
         )
         return self._filter_with_like(tables, like)

@@ -111,7 +112,7 @@ def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema
             Ibis schema
         """
         qualified_name = self._fully_qualified_name(table_name, database)
-        table = self._t_env.from_path(qualified_name)
+        table = self._table_env.from_path(qualified_name)
         schema = table.get_schema()
         return sch.Schema.from_pyarrow(
             create_arrow_schema(schema.get_field_names(), schema.get_field_data_types())
@@ -137,7 +138,7 @@ def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
         """Execute an expression."""
         table_expr = expr.as_table()
         sql = self.compile(table_expr, **kwargs)
-        df = self._t_env.sql_query(sql).to_pandas()
+        df = self._table_env.sql_query(sql).to_pandas()

         # TODO: remove the extra conversion
         return expr.__pandas_result__(table_expr.__pandas_result__(df))
@@ -187,9 +188,9 @@ def create_table(
             obj = obj.to_pandas()
         if isinstance(obj, pd.DataFrame):
             qualified_name = self._fully_qualified_name(name, database)
-            table = self._t_env.from_pandas(obj)
+            table = self._table_env.from_pandas(obj)
             # FIXME(deepyaman): Create a catalog table, not a temp view.
-            self._t_env.create_temporary_view(qualified_name, table)
+            self._table_env.create_temporary_view(qualified_name, table)
         else:
             raise NotImplementedError  # TODO(deepyaman)

@@ -214,7 +215,7 @@ def drop_table(
         If `False`, an exception is raised if the table does not exist.
         """
         qualified_name = self._fully_qualified_name(name, database)
-        if not (self._t_env.drop_temporary_table(qualified_name) or force):
+        if not (self._table_env.drop_temporary_table(qualified_name) or force):
             raise exc.IntegrityError(f"Table {name} does not exist.")

         # TODO(deepyaman): Support (and differentiate) permanent tables.
@@ -263,7 +264,17 @@ def drop_view(
         If `False`, an exception is raised if the view does not exist.
         """
         qualified_name = self._fully_qualified_name(name, database)
-        if not (self._t_env.drop_temporary_view(qualified_name) or force):
+        if not (self._table_env.drop_temporary_view(qualified_name) or force):
             raise exc.IntegrityError(f"View {name} does not exist.")

         # TODO(deepyaman): Support (and differentiate) permanent views.
+
+    @classmethod
+    @lru_cache
+    def _get_operations(cls):
+        translator = cls.compiler.translator_class
+        return translator._registry.keys()
+
+    @classmethod
+    def has_operation(cls, operation: type[ops.Value]) -> bool:
+        return operation in cls._get_operations()
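The `has_operation` hook added above just probes the compiler's translator registry, with the lookup cached via `lru_cache`. A quick sketch of querying it (the operation class here is only illustrative):

    import ibis.expr.operations as ops
    from ibis.backends.flink import Backend

    # True only if the Flink translator registry has a rule for this operation.
    print(Backend.has_operation(ops.Sum))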
34 changes: 23 additions & 11 deletions ibis/backends/flink/tests/conftest.py
@@ -1,10 +1,32 @@
 from __future__ import annotations

+from typing import Any
+
 import pytest

 import ibis
 import ibis.expr.types as ir
 from ibis.backends.conftest import TEST_TABLES
 from ibis.backends.tests.base import BackendTest, RoundAwayFromZero


+class TestConf(BackendTest, RoundAwayFromZero):
+    supports_structs = False
+    deps = "pandas", "pyflink"
+
+    @staticmethod
+    def connect(*, tmpdir, worker_id, **kw: Any):
+        from pyflink.table import EnvironmentSettings, TableEnvironment
+
+        env_settings = EnvironmentSettings.in_batch_mode()
+        table_env = TableEnvironment.create(env_settings)
+        return ibis.flink.connect(table_env, **kw)
+
+    def _load_data(self, **_: Any) -> None:
+        import pandas as pd
+
+        for table_name in TEST_TABLES:
+            path = self.data_dir / "parquet" / f"{table_name}.parquet"
+            self.connection.create_table(table_name, pd.read_parquet(path))
+
+
 @pytest.fixture
@@ -27,13 +49,3 @@ def simple_schema():
 @pytest.fixture
 def simple_table(simple_schema):
     return ibis.table(simple_schema, name="table")
-
-
-@pytest.fixture
-def batting() -> ir.Table:
-    return ibis.table(schema=TEST_TABLES["batting"], name="batting")
-
-
-@pytest.fixture
-def awards_players() -> ir.Table:
-    return ibis.table(schema=TEST_TABLES["awards_players"], name="awards_players")
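Outside the pytest harness, the `TestConf._load_data` path above can be reproduced by hand. A minimal sketch, assuming the standard test parquet files sit under a local `data/parquet/` directory (the directory name is hypothetical):

    from pathlib import Path

    import pandas as pd
    from pyflink.table import EnvironmentSettings, TableEnvironment

    import ibis

    # Batch mode matches TestConf.connect above.
    table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    con = ibis.flink.connect(table_env)

    data_dir = Path("data")  # hypothetical location of the ibis test data
    for table_name in ("batting", "awards_players"):  # a subset of TEST_TABLES
        df = pd.read_parquet(data_dir / "parquet" / f"{table_name}.parquet")
        con.create_table(table_name, df)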

(The remaining changed files are not shown here; the four deleted files are, per the commit messages above, the removed join tests and snapshot files.)
