Release 0.3.4
Co-authored-by: Michał Bartoszkiewicz <[email protected]>
Co-authored-by: Jan Chorowski <[email protected]>
Co-authored-by: Xavier Gendre <[email protected]>
Co-authored-by: Adrian Kosowski <[email protected]>
Co-authored-by: Jakub Kowalski <[email protected]>
Co-authored-by: Sergey Kulik <[email protected]>
Co-authored-by: Mateusz Lewandowski <[email protected]>
Co-authored-by: Mohamed Malhou <[email protected]>
Co-authored-by: Krzysztof Nowicki <[email protected]>
Co-authored-by: Richard Pelgrim <[email protected]>
Co-authored-by: Kamil Piechowiak <[email protected]>
Co-authored-by: Paweł Podhajski <[email protected]>
Co-authored-by: Olivier Ruas <[email protected]>
Co-authored-by: Przemysław Uznański <[email protected]>
Co-authored-by: Sebastian Włudzik <[email protected]>
GitOrigin-RevId: 8bf711247a07692d3403c797159218d1f5c0aa2d
16 people committed Sep 18, 2023
1 parent 1fbd828 commit 29cf81a
Showing 40 changed files with 556 additions and 164 deletions.
39 changes: 21 additions & 18 deletions .github/workflows/release.yml
@@ -122,7 +122,7 @@ jobs:
with:
name: pathway-arm64
path: ./target/wheels/

- name: Upload artifact
if: ${{ matrix.os == needs.start-runner.outputs.label }}
uses: actions/upload-artifact@v3
@@ -153,13 +153,14 @@ jobs:
- name: Install and verify Linux package
run: |
set -ex
ENV_NAME="env_"${{ matrix.python-version }}""
rm -rf $ENV_NAME
python -m venv ${ENV_NAME}
source ${ENV_NAME}/bin/activate
pip install --prefer-binary ./wheels/pathway-*.whl
pip"${{ matrix.python-version }}" install py
python -m py --confcutdir $ENV_NAME --pyargs pathway
ENV_NAME="testenv_${{ matrix.python-version }}"
rm -rf "${ENV_NAME}"
python -m venv "${ENV_NAME}"
source "${ENV_NAME}/bin/activate"
WHEEL=(public/pathway/target/wheels/pathway-*.whl)
pip install --prefer-binary "${WHEEL}[tests]"
# --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py
python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway
Verify_ARM_ARCH:
needs:
@@ -196,18 +196,20 @@ jobs:
run: |
set -ex
# Pathway http monitoring set port
ENV_NAME="env_${{ matrix.python-version }}"
rm -rf $ENV_NAME
python"${{ matrix.python-version }}" -m venv ${ENV_NAME}
source ${ENV_NAME}/bin/activate
pip"${{ matrix.python-version }}" install --prefer-binary ./wheels/pathway-*.whl
pip"${{ matrix.python-version }}" install py
python"${{ matrix.python-version }}" -m py --confcutdir $ENV_NAME --pyargs pathway
source .github/workflows/bash_scripts/PATHWAY_MONITORING_HTTP_PORT.sh
ENV_NAME="testenv_${{ matrix.python-version }}"
rm -rf "${ENV_NAME}"
python"${{ matrix.python-version }}" -m venv "${ENV_NAME}"
source "${ENV_NAME}/bin/activate"
WHEEL=(public/pathway/target/wheels/pathway-*.whl)
pip install --prefer-binary "${WHEEL}[tests]"
# --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py
python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway
env:
MACOSX_DEPLOYMENT_TARGET: "10.15"
DEVELOPER_DIR: /Library/Developer/CommandLineTools
SDKROOT: /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk

- name: post cleanup
run: rm -rf ./wheels

@@ -257,7 +260,7 @@ jobs:
path: .

- name: Create Release
-uses: ncipollo/[email protected]
+uses: ncipollo/[email protected]
with:
draft: true
artifacts: "./wheels/*.whl"
@@ -270,7 +273,7 @@ jobs:
with:
password: ${{ secrets.PYPI_TOKEN }}
packages-dir: './wheels/'

- name: Publish package to s3
uses: prewk/s3-cp-action@v2
with:
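The verify steps above now install the wheel together with the new `tests` extra and run the packaged test suite; `--confcutdir` keeps pytest from picking up the repository's top-level `conftest.py`. A rough Python equivalent of the final command, as a sketch (the virtualenv name below is a placeholder, not a value from this workflow):

import pytest

# Mirrors: python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway
# "testenv_3.11" stands in for the virtualenv created by the workflow.
raise SystemExit(
    pytest.main(
        ["--confcutdir", "testenv_3.11", "--doctest-modules", "--pyargs", "pathway"]
    )
)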
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

## [0.3.4] - 2023-09-18

### Fixed
- Incompatible `beartype` version is now excluded from dependencies.

## [0.3.3] - 2023-09-14

### Added
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pathway"
version = "0.3.3"
version = "0.3.4"
edition = "2021"
publish = false
rust-version = "1.71.0"
72 changes: 72 additions & 0 deletions integration_tests/s3/test_s3_interops.py
@@ -4,6 +4,7 @@
import os
import pathlib
import time
import uuid

import boto3
import pandas as pd
@@ -300,3 +301,74 @@ class InputSchema(pw.Schema):
output_contents = read_jsonlines_fields(output_path, ["key", "value"])
output_contents.sort(key=lambda entry: entry["key"])
assert output_contents == third_input_part


def test_s3_bytes_read(tmp_path: pathlib.Path):
input_path = (
f"integration_tests/test_s3_bytes_read/{time.time()}-{uuid.uuid4()}/input.txt"
)
input_full_contents = "abc\n\ndef\nghi\njkl"
output_path = tmp_path / "output.json"

put_aws_object(input_path, input_full_contents)
table = pw.io.s3.read(
input_path,
aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
bucket_name="aws-integrationtest",
access_key="AKIAX67C7K343BP4QUWN",
secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
region="eu-central-1",
),
format="binary",
mode="static",
autocommit_duration_ms=1000,
)
pw.io.jsonlines.write(table, output_path)
pw.run()

with open(output_path, "r") as f:
result = json.load(f)
assert result["data"] == [ord(c) for c in input_full_contents]


def test_s3_empty_bytes_read(tmp_path: pathlib.Path):
base_path = (
f"integration_tests/test_s3_empty_bytes_read/{time.time()}-{uuid.uuid4()}/"
)

put_aws_object(base_path + "input", "")
put_aws_object(base_path + "input2", "")

table = pw.io.s3.read(
base_path,
aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
bucket_name="aws-integrationtest",
access_key="AKIAX67C7K343BP4QUWN",
secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
region="eu-central-1",
),
format="binary",
mode="static",
autocommit_duration_ms=1000,
)

rows = []

def on_change(key, row, time, is_addition):
rows.append(row)

def on_end(*args, **kwargs):
pass

pw.io.subscribe(table, on_change=on_change, on_end=on_end)
pw.run()

assert (
rows
== [
{
"data": b"",
}
]
* 2
)
8 changes: 7 additions & 1 deletion pyproject.toml
@@ -23,12 +23,18 @@ dependencies = [
"pyarrow >= 10.0.0",
"requests >= 2.31.0",
"python-sat >= 0.1.8.dev",
"beartype >= 0.14.0",
"beartype >= 0.14.0, < 0.16.0",
"rich >= 12.6.0",
"diskcache >= 5.2.1",
"exceptiongroup >= 1.1.3; python_version < '3.11'",
]

+[project.optional-dependencies]
+tests = [
+    "pytest == 7.4.0",
+    "pytest-xdist == 3.3.1",
+]

[project.urls]
"Homepage" = "https://pathway.com/"
"Source code" = "https://github.com/pathwaycom/pathway/"
17 changes: 17 additions & 0 deletions python/pathway/conftest.py
@@ -11,3 +11,20 @@
def parse_graph_teardown():
yield
parse_graph.G.clear()


@pytest.fixture(autouse=True)
def environment_variables(monkeypatch):
monkeypatch.setenv("KAFKA_USERNAME", "pathway")
monkeypatch.setenv("KAFKA_PASSWORD", "Pallas'sCat")
monkeypatch.setenv("BEARER_TOKEN", "42")
monkeypatch.setenv("MINIO_S3_ACCESS_KEY", "Otocolobus")
monkeypatch.setenv("MINIO_S3_SECRET_ACCESS_KEY", "manul")
monkeypatch.setenv("S3_ACCESS_KEY", "Otocolobus")
monkeypatch.setenv("S3_SECRET_ACCESS_KEY", "manul")
monkeypatch.setenv("DO_S3_ACCESS_KEY", "Otocolobus")
monkeypatch.setenv("DO_S3_SECRET_ACCESS_KEY", "manul")
monkeypatch.setenv("WASABI_S3_ACCESS_KEY", "Otocolobus")
monkeypatch.setenv("WASABI_S3_SECRET_ACCESS_KEY", "manul")
monkeypatch.setenv("OVH_S3_ACCESS_KEY", "Otocolobus")
monkeypatch.setenv("OVH_S3_SECRET_ACCESS_KEY", "manul")
4 changes: 4 additions & 0 deletions python/pathway/engine.pyi
@@ -35,6 +35,10 @@ class ConnectorMode(Enum):
SIMPLE_STREAMING: ConnectorMode
STREAMING_WITH_DELETIONS: ConnectorMode

class ReadMethod(Enum):
BY_LINE: ReadMethod
FULL: ReadMethod

class Universe:
@property
def id_column(self) -> Column: ...
1 change: 1 addition & 0 deletions python/pathway/internals/api.py
@@ -17,6 +17,7 @@
int,
float,
str,
bytes,
bool,
BasePointer,
datetime.datetime,
12 changes: 11 additions & 1 deletion python/pathway/io/_utils.py
@@ -10,7 +10,7 @@
from pathway.internals import api
from pathway.internals import dtype as dt
from pathway.internals._io_helpers import _form_value_fields
-from pathway.internals.api import ConnectorMode, PathwayType
+from pathway.internals.api import ConnectorMode, PathwayType, ReadMethod
from pathway.internals.schema import ColumnDefinition, Schema, SchemaProperties

STATIC_MODE_NAME = "static"
@@ -28,6 +28,7 @@
"plaintext": "identity",
"json": "jsonlines",
"raw": "identity",
"binary": "identity",
}

_PATHWAY_TYPE_MAPPING: Dict[PathwayType, Any] = {
@@ -49,6 +50,7 @@
"json",
"plaintext",
"raw",
"binary",
]
)

@@ -91,6 +93,12 @@ def internal_connector_mode(mode: str | api.ConnectorMode) -> api.ConnectorMode:
return internal_mode


def internal_read_method(format: str) -> ReadMethod:
if format == "binary":
return ReadMethod.FULL
return ReadMethod.BY_LINE


class CsvParserSettings:
"""Class representing settings for the CSV parser."""

@@ -248,6 +256,7 @@ def construct_schema_and_data_format(
format_type=data_format_type,
key_field_names=None,
value_fields=[api.ValueField("data", PathwayType.ANY)],
parse_utf8=(format != "binary"),
)
schema, api_schema = read_schema(
schema=schema,
@@ -300,6 +309,7 @@ def construct_s3_data_storage(
path=path,
aws_s3_settings=rust_engine_s3_settings,
mode=internal_connector_mode(mode),
read_method=internal_read_method(format),
persistent_id=persistent_id,
)

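Together, these changes wire the new "binary" format through the Python layer: it maps to ReadMethod.FULL, so each object is read whole rather than line by line, and UTF-8 parsing is disabled. A minimal usage sketch modeled on the integration tests in this commit; the prefix, bucket, and credentials are placeholders:

import os

import pathway as pw

table = pw.io.s3.read(
    "some/prefix/",
    aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
        bucket_name="my-bucket",  # placeholder
        access_key="MY_ACCESS_KEY_ID",  # placeholder
        secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
        region="eu-central-1",
    ),
    format="binary",  # each object arrives whole, as raw bytes
    mode="static",
)
# The resulting table has a single "data" column holding each object's bytes.
pw.io.jsonlines.write(table, "output.jsonl")
pw.run()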
5 changes: 1 addition & 4 deletions python/pathway/io/csv/__init__.py
@@ -83,16 +83,14 @@ def read(
use the `pw.io.csv.read` method:
>>> import pathway as pw
...
>>> class InputSchema(pw.Schema):
... owner: str
... pet: str
...
>>> t = pw.io.csv.read("dataset.csv", schema=InputSchema, mode="static")
Then, you can output the table in order to check the correctness of the read:
>>> pw.debug.compute_and_print(t, include_id=False)
>>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP
owner pet
Alice dog
Bob dog
@@ -119,7 +117,6 @@ def read(
>>> class InputSchema(pw.Schema):
... ip: str
... login: str
...
>>> t = pw.io.csv.read("logs/", schema=InputSchema, mode="static")
The only difference is that you specified the name of the directory instead of the
5 changes: 2 additions & 3 deletions python/pathway/io/debezium/__init__.py
@@ -100,13 +100,12 @@ def read(
Now, using the settings you can set up a connector. It is as simple as:
>>> import pathway as pw
>>> class InputSchema(pw.Schema):
... id: str = pw.column_definition(primary_key=True)
... age: int
... owner: str
... pet: str
>>> t = pw.io.debezium.read(
... rdkafka_settings,
... topic_name="pets",
@@ -123,7 +122,7 @@
data_storage = api.DataStorage(
storage_type="kafka",
rdkafka_settings=rdkafka_settings,
-topics=[topic_name],
+topic=topic_name,
persistent_id=persistent_id,
)
schema, data_format_definition = read_schema(
7 changes: 4 additions & 3 deletions python/pathway/io/elasticsearch/__init__.py
@@ -74,18 +74,19 @@ def write(table: Table, host: str, auth: ElasticSearchAuth, index_name: str) ->
Now suppose we want to send a Pathway table pets to this local instance of
Elasticsearch.
>>> import pathway as pw
>>> pets = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat")
It can be done as follows:
>>> import pathway as pw
>>> t = pw.io.elasticsearch.write(
>>> pw.io.elasticsearch.write(
... table=pets,
... host="http://localhost:9200",
... auth=pw.io.elasticsearch.ElasticSearchAuth.basic("admin", "admin"),
... index_name="animals",
... )
All the updates of table t will be indexed to "animals" as well.
All the updates of table ```pets`` will be indexed to "animals" as well.
"""

data_storage = api.DataStorage(