From 29cf81a383288a73e5b55bf190708bda49b145f9 Mon Sep 17 00:00:00 2001 From: Manul from Pathway Date: Mon, 18 Sep 2023 10:47:12 +0200 Subject: [PATCH] Release 0.3.4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Bartoszkiewicz Co-authored-by: Jan Chorowski Co-authored-by: Xavier Gendre Co-authored-by: Adrian Kosowski Co-authored-by: Jakub Kowalski Co-authored-by: Sergey Kulik Co-authored-by: Mateusz Lewandowski Co-authored-by: Mohamed Malhou Co-authored-by: Krzysztof Nowicki Co-authored-by: Richard Pelgrim Co-authored-by: Kamil Piechowiak Co-authored-by: Paweł Podhajski Co-authored-by: Olivier Ruas Co-authored-by: Przemysław Uznański Co-authored-by: Sebastian Włudzik GitOrigin-RevId: 8bf711247a07692d3403c797159218d1f5c0aa2d --- .github/workflows/release.yml | 39 ++++---- CHANGELOG.md | 5 + Cargo.lock | 2 +- Cargo.toml | 2 +- integration_tests/s3/test_s3_interops.py | 72 +++++++++++++++ pyproject.toml | 8 +- python/pathway/conftest.py | 17 ++++ python/pathway/engine.pyi | 4 + python/pathway/internals/api.py | 1 + python/pathway/io/_utils.py | 12 ++- python/pathway/io/csv/__init__.py | 5 +- python/pathway/io/debezium/__init__.py | 5 +- python/pathway/io/elasticsearch/__init__.py | 7 +- python/pathway/io/fs/__init__.py | 7 +- python/pathway/io/http/__init__.py | 18 ++-- python/pathway/io/jsonlines/__init__.py | 16 ++-- python/pathway/io/kafka/__init__.py | 17 ++-- python/pathway/io/logstash/__init__.py | 2 +- python/pathway/io/minio/__init__.py | 2 - python/pathway/io/null/__init__.py | 2 +- python/pathway/io/postgres/__init__.py | 10 +- python/pathway/io/redpanda/__init__.py | 22 ++--- python/pathway/io/s3/__init__.py | 11 +-- python/pathway/io/s3_csv/__init__.py | 5 +- python/pathway/tests/test_io.py | 50 ++++++++++ src/connectors/data_format.rs | 69 ++++++++------ src/connectors/data_storage.rs | 96 ++++++++++++++------ src/engine/value.rs | 14 +++ src/python_api.rs | 61 +++++++++++-- tests/data/binary | Bin 0 -> 1024 bytes tests/data/empty | 0 tests/data/empty_files/1 | 0 tests/data/empty_files/2 | 0 tests/data/empty_files/3 | 0 tests/test_bytes.rs | 96 ++++++++++++++++++++ tests/test_connector_field_defaults.rs | 6 +- tests/test_debezium.rs | 3 +- tests/test_dsv.rs | 10 +- tests/test_jsonlines.rs | 13 ++- tests/test_seek.rs | 11 ++- 40 files changed, 556 insertions(+), 164 deletions(-) create mode 100644 tests/data/binary create mode 100644 tests/data/empty create mode 100644 tests/data/empty_files/1 create mode 100644 tests/data/empty_files/2 create mode 100644 tests/data/empty_files/3 create mode 100644 tests/test_bytes.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a2143e61..300aef18 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -122,7 +122,7 @@ jobs: with: name: pathway-arm64 path: ./target/wheels/ - + - name: Upload artifact if: ${{ matrix.os == needs.start-runner.outputs.label }} uses: actions/upload-artifact@v3 @@ -153,13 +153,14 @@ jobs: - name: Install and verify Linux package run: | set -ex - ENV_NAME="env_"${{ matrix.python-version }}"" - rm -rf $ENV_NAME - python -m venv ${ENV_NAME} - source ${ENV_NAME}/bin/activate - pip install --prefer-binary ./wheels/pathway-*.whl - pip"${{ matrix.python-version }}" install py - python -m py --confcutdir $ENV_NAME --pyargs pathway + ENV_NAME="testenv_${{ matrix.python-version }}" + rm -rf "${ENV_NAME}" + python -m venv "${ENV_NAME}" + source "${ENV_NAME}/bin/activate" + WHEEL=(public/pathway/target/wheels/pathway-*.whl) + pip install --prefer-binary "${WHEEL}[tests]" + # --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py + python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway Verify_ARM_ARCH: needs: @@ -196,18 +197,20 @@ jobs: run: | set -ex # Pathway http monitoring set port - ENV_NAME="env_${{ matrix.python-version }}" - rm -rf $ENV_NAME - python"${{ matrix.python-version }}" -m venv ${ENV_NAME} - source ${ENV_NAME}/bin/activate - pip"${{ matrix.python-version }}" install --prefer-binary ./wheels/pathway-*.whl - pip"${{ matrix.python-version }}" install py - python"${{ matrix.python-version }}" -m py --confcutdir $ENV_NAME --pyargs pathway + source .github/workflows/bash_scripts/PATHWAY_MONITORING_HTTP_PORT.sh + ENV_NAME="testenv_${{ matrix.python-version }}" + rm -rf "${ENV_NAME}" + python"${{ matrix.python-version }}" -m venv "${ENV_NAME}" + source "${ENV_NAME}/bin/activate" + WHEEL=(public/pathway/target/wheels/pathway-*.whl) + pip install --prefer-binary "${WHEEL}[tests]" + # --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py + python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway env: MACOSX_DEPLOYMENT_TARGET: "10.15" DEVELOPER_DIR: /Library/Developer/CommandLineTools SDKROOT: /Library/Developer/CommandLineTools/SDKs/MacOSX.sdk - + - name: post cleanup run: rm -rf ./wheels @@ -257,7 +260,7 @@ jobs: path: . - name: Create Release - uses: ncipollo/release-action@v1.12.0 + uses: ncipollo/release-action@v1.12.0 with: draft: true artifacts: "./wheels/*.whl" @@ -270,7 +273,7 @@ jobs: with: password: ${{ secrets.PYPI_TOKEN }} packages-dir: './wheels/' - + - name: Publish package to s3 uses: prewk/s3-cp-action@v2 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 92fb0abd..02ad4272 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.3.4] - 2023-09-18 + +### Fixed +- Incompatible `beartype` version is now excluded from dependencies. + ## [0.3.3] - 2023-09-14 ### Added diff --git a/Cargo.lock b/Cargo.lock index 74186311..e9a91bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1446,7 +1446,7 @@ dependencies = [ [[package]] name = "pathway" -version = "0.3.3" +version = "0.3.4" dependencies = [ "arc-swap", "arcstr", diff --git a/Cargo.toml b/Cargo.toml index ada39a8e..12546056 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathway" -version = "0.3.3" +version = "0.3.4" edition = "2021" publish = false rust-version = "1.71.0" diff --git a/integration_tests/s3/test_s3_interops.py b/integration_tests/s3/test_s3_interops.py index 35001ce5..cd039916 100644 --- a/integration_tests/s3/test_s3_interops.py +++ b/integration_tests/s3/test_s3_interops.py @@ -4,6 +4,7 @@ import os import pathlib import time +import uuid import boto3 import pandas as pd @@ -300,3 +301,74 @@ class InputSchema(pw.Schema): output_contents = read_jsonlines_fields(output_path, ["key", "value"]) output_contents.sort(key=lambda entry: entry["key"]) assert output_contents == third_input_part + + +def test_s3_bytes_read(tmp_path: pathlib.Path): + input_path = ( + f"integration_tests/test_s3_bytes_read/{time.time()}-{uuid.uuid4()}/input.txt" + ) + input_full_contents = "abc\n\ndef\nghi\njkl" + output_path = tmp_path / "output.json" + + put_aws_object(input_path, input_full_contents) + table = pw.io.s3.read( + input_path, + aws_s3_settings=pw.io.s3_csv.AwsS3Settings( + bucket_name="aws-integrationtest", + access_key="AKIAX67C7K343BP4QUWN", + secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], + region="eu-central-1", + ), + format="binary", + mode="static", + autocommit_duration_ms=1000, + ) + pw.io.jsonlines.write(table, output_path) + pw.run() + + with open(output_path, "r") as f: + result = json.load(f) + assert result["data"] == [ord(c) for c in input_full_contents] + + +def test_s3_empty_bytes_read(tmp_path: pathlib.Path): + base_path = ( + f"integration_tests/test_s3_empty_bytes_read/{time.time()}-{uuid.uuid4()}/" + ) + + put_aws_object(base_path + "input", "") + put_aws_object(base_path + "input2", "") + + table = pw.io.s3.read( + base_path, + aws_s3_settings=pw.io.s3_csv.AwsS3Settings( + bucket_name="aws-integrationtest", + access_key="AKIAX67C7K343BP4QUWN", + secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], + region="eu-central-1", + ), + format="binary", + mode="static", + autocommit_duration_ms=1000, + ) + + rows = [] + + def on_change(key, row, time, is_addition): + rows.append(row) + + def on_end(*args, **kwargs): + pass + + pw.io.subscribe(table, on_change=on_change, on_end=on_end) + pw.run() + + assert ( + rows + == [ + { + "data": b"", + } + ] + * 2 + ) diff --git a/pyproject.toml b/pyproject.toml index 3299ae3f..c3a1500d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,12 +23,18 @@ dependencies = [ "pyarrow >= 10.0.0", "requests >= 2.31.0", "python-sat >= 0.1.8.dev", - "beartype >= 0.14.0", + "beartype >= 0.14.0, < 0.16.0", "rich >= 12.6.0", "diskcache >= 5.2.1", "exceptiongroup >= 1.1.3; python_version < '3.11'", ] +[project.optional-dependencies] +tests = [ + "pytest == 7.4.0", + "pytest-xdist == 3.3.1", +] + [project.urls] "Homepage" = "https://pathway.com/" "Source code" = "https://github.com/pathwaycom/pathway/" diff --git a/python/pathway/conftest.py b/python/pathway/conftest.py index 92d70df5..e360b2f5 100644 --- a/python/pathway/conftest.py +++ b/python/pathway/conftest.py @@ -11,3 +11,20 @@ def parse_graph_teardown(): yield parse_graph.G.clear() + + +@pytest.fixture(autouse=True) +def environment_variables(monkeypatch): + monkeypatch.setenv("KAFKA_USERNAME", "pathway") + monkeypatch.setenv("KAFKA_PASSWORD", "Pallas'sCat") + monkeypatch.setenv("BEARER_TOKEN", "42") + monkeypatch.setenv("MINIO_S3_ACCESS_KEY", "Otocolobus") + monkeypatch.setenv("MINIO_S3_SECRET_ACCESS_KEY", "manul") + monkeypatch.setenv("S3_ACCESS_KEY", "Otocolobus") + monkeypatch.setenv("S3_SECRET_ACCESS_KEY", "manul") + monkeypatch.setenv("DO_S3_ACCESS_KEY", "Otocolobus") + monkeypatch.setenv("DO_S3_SECRET_ACCESS_KEY", "manul") + monkeypatch.setenv("WASABI_S3_ACCESS_KEY", "Otocolobus") + monkeypatch.setenv("WASABI_S3_SECRET_ACCESS_KEY", "manul") + monkeypatch.setenv("OVH_S3_ACCESS_KEY", "Otocolobus") + monkeypatch.setenv("OVH_S3_SECRET_ACCESS_KEY", "manul") diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index 68c53f5a..db120334 100644 --- a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -35,6 +35,10 @@ class ConnectorMode(Enum): SIMPLE_STREAMING: ConnectorMode STREAMING_WITH_DELETIONS: ConnectorMode +class ReadMethod(Enum): + BY_LINE: ReadMethod + FULL: ReadMethod + class Universe: @property def id_column(self) -> Column: ... diff --git a/python/pathway/internals/api.py b/python/pathway/internals/api.py index 8159bfca..da902b96 100644 --- a/python/pathway/internals/api.py +++ b/python/pathway/internals/api.py @@ -17,6 +17,7 @@ int, float, str, + bytes, bool, BasePointer, datetime.datetime, diff --git a/python/pathway/io/_utils.py b/python/pathway/io/_utils.py index 154a854e..9b143a28 100644 --- a/python/pathway/io/_utils.py +++ b/python/pathway/io/_utils.py @@ -10,7 +10,7 @@ from pathway.internals import api from pathway.internals import dtype as dt from pathway.internals._io_helpers import _form_value_fields -from pathway.internals.api import ConnectorMode, PathwayType +from pathway.internals.api import ConnectorMode, PathwayType, ReadMethod from pathway.internals.schema import ColumnDefinition, Schema, SchemaProperties STATIC_MODE_NAME = "static" @@ -28,6 +28,7 @@ "plaintext": "identity", "json": "jsonlines", "raw": "identity", + "binary": "identity", } _PATHWAY_TYPE_MAPPING: Dict[PathwayType, Any] = { @@ -49,6 +50,7 @@ "json", "plaintext", "raw", + "binary", ] ) @@ -91,6 +93,12 @@ def internal_connector_mode(mode: str | api.ConnectorMode) -> api.ConnectorMode: return internal_mode +def internal_read_method(format: str) -> ReadMethod: + if format == "binary": + return ReadMethod.FULL + return ReadMethod.BY_LINE + + class CsvParserSettings: """Class representing settings for the CSV parser.""" @@ -248,6 +256,7 @@ def construct_schema_and_data_format( format_type=data_format_type, key_field_names=None, value_fields=[api.ValueField("data", PathwayType.ANY)], + parse_utf8=(format != "binary"), ) schema, api_schema = read_schema( schema=schema, @@ -300,6 +309,7 @@ def construct_s3_data_storage( path=path, aws_s3_settings=rust_engine_s3_settings, mode=internal_connector_mode(mode), + read_method=internal_read_method(format), persistent_id=persistent_id, ) diff --git a/python/pathway/io/csv/__init__.py b/python/pathway/io/csv/__init__.py index 40a4dd8e..1feffccc 100644 --- a/python/pathway/io/csv/__init__.py +++ b/python/pathway/io/csv/__init__.py @@ -83,16 +83,14 @@ def read( use the `pw.io.csv.read` method: >>> import pathway as pw - ... >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.csv.read("dataset.csv", schema=InputSchema, mode="static") Then, you can output the table in order to check the correctness of the read: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice dog Bob dog @@ -119,7 +117,6 @@ def read( >>> class InputSchema(pw.Schema): ... ip: str ... login: str - ... >>> t = pw.io.csv.read("logs/", schema=InputSchema, mode="static") The only difference is that you specified the name of the directory instead of the diff --git a/python/pathway/io/debezium/__init__.py b/python/pathway/io/debezium/__init__.py index cb6db618..6647f315 100644 --- a/python/pathway/io/debezium/__init__.py +++ b/python/pathway/io/debezium/__init__.py @@ -100,13 +100,12 @@ def read( Now, using the settings you can set up a connector. It is as simple as: - + >>> import pathway as pw >>> class InputSchema(pw.Schema): ... id: str = pw.column_definition(primary_key=True) ... age: int ... owner: str ... pet: str - >>> t = pw.io.debezium.read( ... rdkafka_settings, ... topic_name="pets", @@ -123,7 +122,7 @@ def read( data_storage = api.DataStorage( storage_type="kafka", rdkafka_settings=rdkafka_settings, - topics=[topic_name], + topic=topic_name, persistent_id=persistent_id, ) schema, data_format_definition = read_schema( diff --git a/python/pathway/io/elasticsearch/__init__.py b/python/pathway/io/elasticsearch/__init__.py index 75ffba78..1c653703 100644 --- a/python/pathway/io/elasticsearch/__init__.py +++ b/python/pathway/io/elasticsearch/__init__.py @@ -74,18 +74,19 @@ def write(table: Table, host: str, auth: ElasticSearchAuth, index_name: str) -> Now suppose we want to send a Pathway table pets to this local instance of Elasticsearch. + >>> import pathway as pw + >>> pets = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat") It can be done as follows: - >>> import pathway as pw - >>> t = pw.io.elasticsearch.write( + >>> pw.io.elasticsearch.write( ... table=pets, ... host="http://localhost:9200", ... auth=pw.io.elasticsearch.ElasticSearchAuth.basic("admin", "admin"), ... index_name="animals", ... ) - All the updates of table t will be indexed to "animals" as well. + All the updates of table ```pets`` will be indexed to "animals" as well. """ data_storage = api.DataStorage( diff --git a/python/pathway/io/fs/__init__.py b/python/pathway/io/fs/__init__.py index 56808797..67db0a09 100644 --- a/python/pathway/io/fs/__init__.py +++ b/python/pathway/io/fs/__init__.py @@ -17,6 +17,7 @@ construct_connector_properties, construct_schema_and_data_format, internal_connector_mode, + internal_read_method, ) SUPPORTED_OUTPUT_FORMATS: Set[str] = set( @@ -106,16 +107,14 @@ def read( use the ``pw.io.fs.read`` method: >>> import pathway as pw - ... >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.fs.read("dataset.csv", format="csv", schema=InputSchema) Then, you can output the table in order to check the correctness of the read: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice dog Bob dog @@ -156,7 +155,6 @@ def read( >>> class InputSchema(pw.Schema): ... ip: str ... login: str - ... >>> t = pw.io.fs.read("logs/", format="csv", schema=InputSchema) The only difference is that you specified the name of the directory instead of the @@ -216,6 +214,7 @@ def read( storage_type="fs", path=fspath(path), mode=internal_connector_mode(mode), + read_method=internal_read_method(format), persistent_id=persistent_id, ) diff --git a/python/pathway/io/http/__init__.py b/python/pathway/io/http/__init__.py index b29afd2c..3e380eaf 100644 --- a/python/pathway/io/http/__init__.py +++ b/python/pathway/io/http/__init__.py @@ -87,11 +87,12 @@ def read( Raw format: + >>> import os >>> import pathway as pw - ... table = pw.io.http.read( + >>> table = pw.io.http.read( ... "https://localhost:8000/stream", ... method="GET", - ... headers={"Authorization": f"Bearer {BEARER_TOKEN}"}, + ... headers={"Authorization": f"Bearer {os.environ['BEARER_TOKEN']}"}, ... format="raw", ... ) @@ -103,15 +104,13 @@ def read( >>> def mapper(msg: bytes) -> bytes: ... result = json.loads(msg.decode()) ... return json.dumps({"key": result["id"], "text": result["data"]}).encode() - ... >>> class InputSchema(pw.Schema): ... key: int ... text: str - ... >>> t = pw.io.http.read( ... "https://localhost:8000/stream", ... method="GET", - ... headers={"Authorization": f"Bearer {BEARER_TOKEN}"}, + ... headers={"Authorization": f"Bearer {os.environ['BEARER_TOKEN']}"}, ... schema=InputSchema, ... response_mapper=mapper ... ) @@ -204,10 +203,7 @@ def write( just two columns: the pet and the owner's name. >>> import pathway as pw - >>> pw.debug.compute_and_print(t, include_id=False) - owner pet - Alice cat - Bob dog + >>> pets = pw.debug.parse_to_table("owner pet \\n Alice dog \\n Bob cat \\n Alice cat") Consider that there is a need to send the stream of changes on such table to the external API endpoint (let's pick some exemplary URL for the sake of demonstation). @@ -216,7 +212,7 @@ def write( are sent in POST requests. Then, the communication can be done with a simple code snippet: - >>> t = pw.io.http.write(pets, "http://www.example.com/api/event") + >>> pw.io.http.write(pets, "http://www.example.com/api/event") Now let's do something more custom. Suppose that the API endpoint requires us to communicate via PUT method and to pass the values as CGI-parameters. In this case, @@ -239,7 +235,7 @@ def write( ... "time={table.time}", ... "diff={table.diff}", ... ] - ... message_template = "\\t".join(message_template_tokens) + >>> message_template = "\\t".join(message_template_tokens) Now, we can use this template and the custom format, this way: diff --git a/python/pathway/io/jsonlines/__init__.py b/python/pathway/io/jsonlines/__init__.py index 221d6b64..28105b39 100644 --- a/python/pathway/io/jsonlines/__init__.py +++ b/python/pathway/io/jsonlines/__init__.py @@ -88,21 +88,19 @@ def read( use the ``pw.io.jsonlines.read`` method: >>> import pathway as pw - >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.jsonlines.read("dataset.jsonlines", schema=InputSchema, mode="static") Then, you can output the table in order to check the correctness of the read: - >>> pw.debug.compute_and_print(t, include_id=False) - id | owner | pet - 1 | Alice | dog - 2 | Bob | dog - 3 | Bob | cat - 4 | Bob | cat + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP + owner | pet + Alice | dog + Bob | dog + Bob | cat + Bob | cat Now let's try something different. Consider you have site access logs stored in a @@ -126,7 +124,6 @@ def read( >>> class InputSchema(pw.Schema): ... ip: str ... login: str - ... >>> t = pw.io.jsonlines.read("logs/", schema=InputSchema, mode="static") The only difference is that you specified the name of the directory instead of the @@ -143,7 +140,6 @@ def read( >>> class InputSchema(pw.Schema): ... ip: str ... login: str - ... >>> t = pw.io.jsonlines.read("logs/", schema=InputSchema, mode="streaming") With this method, you obtain a table updated dynamically. The changes in the logs would incur diff --git a/python/pathway/io/kafka/__init__.py b/python/pathway/io/kafka/__init__.py index 07e3f142..6a4a8a3d 100644 --- a/python/pathway/io/kafka/__init__.py +++ b/python/pathway/io/kafka/__init__.py @@ -136,7 +136,6 @@ def read( >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.kafka.read( ... rdkafka_settings, ... topic="animals", @@ -159,7 +158,7 @@ def read( This way, you get a table which looks as follows: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice cat Bob dog @@ -184,7 +183,7 @@ def read( This way, you get a table which looks as follows: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice cat Bob dog @@ -210,17 +209,15 @@ def read( use JSON Pointer and do a connector, which gets the data as follows: >>> import pathway as pw - >>> class InputSchema(pw.Schema): ... pet_name: str ... pet_height: int - ... >>> t = pw.io.kafka.read( ... rdkafka_settings, ... topic="animals", ... format="json", ... schema=InputSchema, - ... column_paths={ + ... json_field_paths={ ... "pet_name": "/pet/name", ... "pet_height": "/pet/measurements/1" ... }, @@ -513,13 +510,17 @@ def write( ... } You want to send a Pathway table t to the Kafka instance. + + >>> import pathway as pw + >>> t = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat") + To connect to the topic "animals" and send messages, the connector must be used \ as follows, depending on the format: JSON version: - >>> import pathway as pw - >>> t = pw.io.kafka.write( + >>> pw.io.kafka.write( + ... t, ... rdkafka_settings, ... "animals", ... format="json", diff --git a/python/pathway/io/logstash/__init__.py b/python/pathway/io/logstash/__init__.py index 068263ce..a8bccda2 100644 --- a/python/pathway/io/logstash/__init__.py +++ b/python/pathway/io/logstash/__init__.py @@ -60,7 +60,7 @@ def write( Now, with the pipeline configured, you can stream the changed into Logstash as simple as: - >>> pw.io.logstash.write(table, "http://localhost:8012") + >>> pw.io.logstash.write(table, "http://localhost:8012") # doctest: +SKIP """ http_write( diff --git a/python/pathway/io/minio/__init__.py b/python/pathway/io/minio/__init__.py index b7981863..fec86fa3 100644 --- a/python/pathway/io/minio/__init__.py +++ b/python/pathway/io/minio/__init__.py @@ -107,11 +107,9 @@ def read( >>> import os >>> import pathway as pw - ... >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.minio.read( ... "animals/", ... minio_settings=pw.io.minio.MinIOSettings( diff --git a/python/pathway/io/null/__init__.py b/python/pathway/io/null/__init__.py index 20005625..e196fe32 100644 --- a/python/pathway/io/null/__init__.py +++ b/python/pathway/io/null/__init__.py @@ -30,7 +30,7 @@ def write(table: Table) -> None: If the table is ``table``, the null output can be configured in the following way: - >>> pw.io.null.write(table) + >>> pw.io.null.write(table) # doctest: +SKIP """ data_storage = api.DataStorage(storage_type="null") diff --git a/python/pathway/io/postgres/__init__.py b/python/pathway/io/postgres/__init__.py index 1e7e8e06..641b1583 100644 --- a/python/pathway/io/postgres/__init__.py +++ b/python/pathway/io/postgres/__init__.py @@ -82,10 +82,10 @@ def write(table: Table, postgres_settings: dict, table_name: str) -> None: Now, having done all the preparation, one can simply call: >>> pw.io.postgres.write( - t, - connection_string_parts, - "pets", - ) + ... t, + ... connection_string_parts, + ... "pets", + ... ) """ data_storage = api.DataStorage( storage_type="postgres", @@ -150,7 +150,7 @@ def write_snapshot( After the table is created, all you need is just to set up the output connector: >>> import pathway as pw - >>> pw.io.postgres.write_snapshot( + >>> pw.io.postgres.write_snapshot( # doctest: +SKIP ... stats, ... { ... "host": "localhost", diff --git a/python/pathway/io/redpanda/__init__.py b/python/pathway/io/redpanda/__init__.py index cab2138f..a25bc6cb 100644 --- a/python/pathway/io/redpanda/__init__.py +++ b/python/pathway/io/redpanda/__init__.py @@ -108,17 +108,17 @@ def read( CSV version: >>> import pathway as pw - ... + >>> >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... + >>> >>> t = pw.io.redpanda.read( - rdkafka_settings, - topic="animals", - format="csv", - schema=InputSchema, - ) + ... rdkafka_settings, + ... topic="animals", + ... format="csv", + ... schema=InputSchema, + ... ) In case of CSV format, the first message must be the header: @@ -135,7 +135,7 @@ def read( This way, you get a table which looks as follows: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice cat Bob dog @@ -160,7 +160,7 @@ def read( This way, you get a table which looks as follows: - >>> pw.debug.compute_and_print(t, include_id=False) + >>> pw.debug.compute_and_print(t, include_id=False) # doctest: +SKIP owner pet Alice cat Bob dog @@ -187,17 +187,15 @@ def read( >>> import pathway as pw - >>> class InputSchema(pw.Schema): ... pet_name: str ... pet_height: int - ... >>> t = pw.io.redpanda.read( ... rdkafka_settings, ... topic="animals", ... format="json", ... schema=InputSchema, - ... column_paths={ + ... json_field_paths={ ... "pet_name": "/pet/name", ... "pet_height": "/pet/measurements/1" ... }, diff --git a/python/pathway/io/s3/__init__.py b/python/pathway/io/s3/__init__.py index 5478f959..8e3aa776 100644 --- a/python/pathway/io/s3/__init__.py +++ b/python/pathway/io/s3/__init__.py @@ -169,7 +169,6 @@ def read( >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.s3.read( ... "animals/", ... aws_s3_settings=pw.io.s3.AwsS3Settings( @@ -293,10 +292,9 @@ def read_from_digital_ocean( >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... - >>> t = pw.io.s3_csv.read_from_digital_ocean( + >>> t = pw.io.s3.read_from_digital_ocean( ... "animals/", - ... do_s3_settings=pw.io.s3_csv.DigitalOceanS3Settings( + ... do_s3_settings=pw.io.s3.DigitalOceanS3Settings( ... bucket_name="datasets", ... region="ams3", ... access_key=os.environ["DO_S3_ACCESS_KEY"], @@ -402,10 +400,9 @@ def read_from_wasabi( >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... - >>> t = pw.io.s3_csv.read_from_wasabi( + >>> t = pw.io.s3.read_from_wasabi( ... "animals/", - ... wasabi_s3_settings=pw.io.s3_csv.WasabiS3Settings( + ... wasabi_s3_settings=pw.io.s3.WasabiS3Settings( ... bucket_name="datasets", ... region="us-west-1", ... access_key=os.environ["WASABI_S3_ACCESS_KEY"], diff --git a/python/pathway/io/s3_csv/__init__.py b/python/pathway/io/s3_csv/__init__.py index d3a8afd2..a775a3a8 100644 --- a/python/pathway/io/s3_csv/__init__.py +++ b/python/pathway/io/s3_csv/__init__.py @@ -91,10 +91,9 @@ def read( >>> class InputSchema(pw.Schema): ... owner: str ... pet: str - ... >>> t = pw.io.s3_csv.read( ... "animals/", - ... aws_s3_settings=pw.io.s3_csv.AwsS3Settings.AwsS3Settings( + ... aws_s3_settings=pw.io.s3_csv.AwsS3Settings( ... bucket_name="datasets", ... region="eu-west-3", ... access_key=os.environ["S3_ACCESS_KEY"], @@ -113,7 +112,7 @@ def read( >>> import pathway as pw >>> t = pw.io.s3_csv.read( ... "animals/", - ... aws_s3_settings=pw.io.s3_csv.AwsS3Settings.AwsS3Settings( + ... aws_s3_settings=pw.io.s3_csv.AwsS3Settings( ... bucket_name="datasets", ... region="rbx", ... endpoint="s3.rbx.io.cloud.ovh.net", diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py index 21683507..d6c858fc 100644 --- a/python/pathway/tests/test_io.py +++ b/python/pathway/tests/test_io.py @@ -1331,3 +1331,53 @@ class InputSchema(pw.Schema): inputs_thread.start() assert wait_result_with_checker(CsvLinesNumberChecker(output_path, 4), 30) + + +def test_bytes_read(tmp_path: pathlib.Path): + input_path = tmp_path / "input.txt" + input_full_contents = "abc\n\ndef\nghi" + output_path = tmp_path / "output.json" + write_lines(input_path, input_full_contents) + + table = pw.io.fs.read( + input_path, + format="binary", + mode="static", + autocommit_duration_ms=1000, + ) + pw.io.jsonlines.write(table, output_path) + pw.run() + + with open(output_path, "r") as f: + result = json.load(f) + assert result["data"] == [ord(c) for c in input_full_contents] + + +def test_binary_data_in_subscribe(tmp_path: pathlib.Path): + input_path = tmp_path / "input.txt" + input_full_contents = "abc\n\ndef\nghi" + write_lines(input_path, input_full_contents) + + table = pw.io.fs.read( + input_path, + format="binary", + mode="static", + autocommit_duration_ms=1000, + ) + + rows = [] + + def on_change(key, row, time, is_addition): + rows.append(row) + + def on_end(*args, **kwargs): + pass + + pw.io.subscribe(table, on_change=on_change, on_end=on_end) + pw.run() + + assert rows == [ + { + "data": input_full_contents.encode("utf-8"), + } + ] diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs index 90a64b55..fd20f562 100644 --- a/src/connectors/data_format.rs +++ b/src/connectors/data_format.rs @@ -6,7 +6,6 @@ use std::fmt::Display; use std::io::Write; use std::iter::zip; use std::mem::take; -use std::str::FromStr; use std::str::{from_utf8, Utf8Error}; use crate::connectors::ReaderContext::{Diff, KeyValue, RawBytes, TokenizedEntries}; @@ -19,6 +18,8 @@ use serde::ser::{SerializeMap, Serializer}; use serde_json::json; use serde_json::Value as JsonValue; +const COMMIT_LITERAL: &str = "*COMMIT*"; + #[derive(Clone, Debug, Eq, PartialEq)] pub enum ParsedEvent { AdvanceTime, @@ -315,7 +316,7 @@ impl DsvParser { return Ok(Vec::new()); } - if line == "*COMMIT*" { + if line == COMMIT_LITERAL { return Ok(vec![ParsedEvent::AdvanceTime]); } @@ -343,7 +344,7 @@ impl DsvParser { fn parse_tokenized_entries(&mut self, event: DataEventType, tokens: &[String]) -> ParseResult { if tokens.len() == 1 { let line = &tokens[0]; - if line == "*COMMIT*" { + if line == COMMIT_LITERAL { return Ok(vec![ParsedEvent::AdvanceTime]); } } @@ -408,48 +409,55 @@ impl Parser for DsvParser { } } -pub struct IdentityParser {} +pub struct IdentityParser { + parse_utf8: bool, +} impl IdentityParser { - pub fn new() -> IdentityParser { - Self {} + pub fn new(parse_utf8: bool) -> IdentityParser { + Self { parse_utf8 } } -} -impl Default for IdentityParser { - fn default() -> Self { - Self::new() + fn prepare_bytes(&self, bytes: &[u8]) -> Result { + if self.parse_utf8 { + Ok(Value::String(prepare_plaintext_string(bytes)?.into())) + } else { + Ok(Value::Bytes(bytes.into())) + } } } impl Parser for IdentityParser { fn parse(&mut self, data: &ReaderContext) -> ParseResult { - let (event, key, line) = match data { - RawBytes(event, raw_bytes) => (*event, None, prepare_plaintext_string(raw_bytes)?), + let (event, key, value) = match data { + RawBytes(event, raw_bytes) => (*event, None, self.prepare_bytes(raw_bytes)?), KeyValue((_key, value)) => match value { - Some(bytes) => ( - DataEventType::Insert, - None, - prepare_plaintext_string(bytes)?, - ), + Some(bytes) => (DataEventType::Insert, None, self.prepare_bytes(bytes)?), None => return Err(ParseError::EmptyKafkaPayload), }, Diff((addition, key, values)) => ( *addition, key.as_ref().map(|k| vec![k.clone()]), - prepare_plaintext_string(values)?, + self.prepare_bytes(values)?, ), TokenizedEntries(_, _) => return Err(ParseError::UnsupportedReaderContext), }; - let event = match line.as_str() { - "*COMMIT*" => ParsedEvent::AdvanceTime, - line => { - let values = vec![Value::from_str(line).unwrap()]; - match event { - DataEventType::Insert => ParsedEvent::Insert((key, values)), - DataEventType::Delete => ParsedEvent::Delete((key, values)), - } + let is_commit = { + if let Value::String(arc_str) = &value { + arc_str.as_str() == COMMIT_LITERAL + } else { + false + } + }; + + let event = if is_commit { + ParsedEvent::AdvanceTime + } else { + let values = vec![value]; + match event { + DataEventType::Insert => ParsedEvent::Insert((key, values)), + DataEventType::Delete => ParsedEvent::Delete((key, values)), } }; @@ -579,6 +587,13 @@ fn serialize_value_to_json(value: &Value) -> Result { } Ok(JsonValue::Array(items)) } + Value::Bytes(b) => { + let mut items = Vec::with_capacity(b.len()); + for item in b.iter() { + items.push(json!(item)); + } + Ok(JsonValue::Array(items)) + } Value::IntArray(a) => { let mut items = Vec::with_capacity(a.len()); for item in a.iter() { @@ -855,7 +870,7 @@ impl Parser for JsonLinesParser { return Ok(vec![]); } - if line == "*COMMIT*" { + if line == COMMIT_LITERAL { return Ok(vec![ParsedEvent::AdvanceTime]); } diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index 02526f48..f485ceee 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -201,7 +201,7 @@ impl StorageType { StorageType::CsvFilesystem => CsvFilesystemReader::merge_two_frontiers(lhs, rhs), StorageType::Kafka => KafkaReader::merge_two_frontiers(lhs, rhs), StorageType::Python => PythonReader::merge_two_frontiers(lhs, rhs), - StorageType::S3Lines => S3LinesReader::merge_two_frontiers(lhs, rhs), + StorageType::S3Lines => S3GenericReader::merge_two_frontiers(lhs, rhs), } } } @@ -372,8 +372,27 @@ impl FileWriter { } } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ReadMethod { + ByLine, + Full, +} + +impl ReadMethod { + fn read_next_bytes(self, reader: &mut R, buf: &mut Vec) -> Result + where + R: BufRead, + { + match &self { + ReadMethod::ByLine => Ok(reader.read_until(b'\n', buf)?), + ReadMethod::Full => Ok(reader.read_to_end(buf)?), + } + } +} + pub struct FilesystemReader { persistent_id: Option, + read_method: ReadMethod, reader: Option>, filesystem_scanner: FilesystemScanner, @@ -385,6 +404,7 @@ impl FilesystemReader { path: impl Into, streaming_mode: ConnectorMode, persistent_id: Option, + read_method: ReadMethod, ) -> io::Result { let filesystem_scanner = FilesystemScanner::new(path, persistent_id, streaming_mode)?; @@ -394,6 +414,7 @@ impl FilesystemReader { reader: None, filesystem_scanner, total_entries_read: 0, + read_method, }) } } @@ -431,9 +452,9 @@ impl Reader for FilesystemReader { fn read(&mut self) -> Result { loop { if let Some(reader) = &mut self.reader { - let mut line = String::new(); - let len = reader.read_line(&mut line)?; - if len > 0 { + let mut line = Vec::new(); + let len = self.read_method.read_next_bytes(reader, &mut line)?; + if len > 0 || self.read_method == ReadMethod::Full { self.total_entries_read += 1; let offset = ( @@ -448,14 +469,17 @@ impl Reader for FilesystemReader { bytes_offset: reader.stream_position().unwrap(), }, ); + let data_event_type = self + .filesystem_scanner + .data_event_type() + .expect("scanner action can't be empty"); + + if self.read_method == ReadMethod::Full { + self.reader = None; + } return Ok(ReadResult::Data( - ReaderContext::from_raw_bytes( - self.filesystem_scanner - .data_event_type() - .expect("scanner action can't be empty"), - line.into_bytes(), - ), + ReaderContext::from_raw_bytes(data_event_type, line), offset, )); } @@ -1271,6 +1295,12 @@ pub trait PsqlSerializer { fn to_postgres_output(&self) -> String; } +impl PsqlSerializer for u8 { + fn to_postgres_output(&self) -> String { + self.to_string() + } +} + impl PsqlSerializer for i64 { fn to_postgres_output(&self) -> String { self.to_string() @@ -1307,6 +1337,7 @@ impl PsqlSerializer for Value { Value::String(s) => s.to_string(), Value::Pointer(value) => format!("{value:?}"), Value::Tuple(vals) => to_postgres_array(vals.iter()), + Value::Bytes(array) => to_postgres_array(array.iter()), Value::IntArray(array) => to_postgres_array(array.iter()), Value::FloatArray(array) => to_postgres_array(array.iter()), Value::DateTimeNaive(date_time) => date_time.to_string(), @@ -1320,6 +1351,7 @@ impl Writer for PsqlWriter { fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> { let strs = Arena::new(); let strings = Arena::new(); + let bytes = Arena::new(); let chrono_datetimes = Arena::new(); let chrono_utc_datetimes = Arena::new(); @@ -1333,6 +1365,7 @@ impl Writer for PsqlWriter { Value::Int(i) => Ok(i), Value::Float(f) => Ok(f.as_ref()), Value::String(s) => Ok(strs.alloc(&**s)), + Value::Bytes(b) => Ok(bytes.alloc(&**b)), Value::DateTimeNaive(dt) => Ok(chrono_datetimes.alloc(dt.as_chrono_datetime())), Value::DateTimeUtc(dt) => { Ok(chrono_utc_datetimes.alloc(dt.as_chrono_datetime().and_utc())) @@ -1821,28 +1854,31 @@ impl Writer for NullWriter { } } -pub struct S3LinesReader { +pub struct S3GenericReader { s3_scanner: S3Scanner, poll_new_objects: bool, + read_method: ReadMethod, - line_reader: Option>, + reader: Option>, persistent_id: Option, total_entries_read: u64, current_bytes_read: u64, } -impl S3LinesReader { +impl S3GenericReader { pub fn new( bucket: S3Bucket, objects_prefix: impl Into, poll_new_objects: bool, persistent_id: Option, - ) -> S3LinesReader { - S3LinesReader { + read_method: ReadMethod, + ) -> S3GenericReader { + S3GenericReader { s3_scanner: S3Scanner::new(bucket, objects_prefix), poll_new_objects, + read_method, - line_reader: None, + reader: None, persistent_id, total_entries_read: 0, current_bytes_read: 0, @@ -1852,7 +1888,7 @@ impl S3LinesReader { fn stream_next_object(&mut self) -> Result { if let Some(pipe_reader) = self.s3_scanner.stream_next_object()? { self.current_bytes_read = 0; - self.line_reader = Some(BufReader::new(pipe_reader)); + self.reader = Some(BufReader::new(pipe_reader)); Ok(true) } else { Ok(false) @@ -1864,7 +1900,7 @@ impl S3LinesReader { } } -impl Reader for S3LinesReader { +impl Reader for S3GenericReader { fn seek(&mut self, frontier: &OffsetAntichain) -> Result<(), ReadError> { let offset_value = frontier.get_offset(&OffsetKey::Empty); let Some(OffsetValue::S3ObjectPosition { @@ -1887,8 +1923,10 @@ impl Reader for S3LinesReader { let mut reader = BufReader::new(pipe_reader); let mut bytes_read = 0; while bytes_read < *bytes_offset { - let mut current_line = String::new(); - let len = reader.read_line(&mut current_line)?; + let mut current_line = Vec::new(); + let len = self + .read_method + .read_next_bytes(&mut reader, &mut current_line)?; if len == 0 { break; } @@ -1905,18 +1943,18 @@ impl Reader for S3LinesReader { self.total_entries_read = *total_entries_read; self.current_bytes_read = bytes_read; - self.line_reader = Some(reader); + self.reader = Some(reader); Ok(()) } fn read(&mut self) -> Result { loop { - match &mut self.line_reader { - Some(line_reader) => { - let mut line = String::new(); - let len = line_reader.read_line(&mut line)?; - if len > 0 { + match &mut self.reader { + Some(reader) => { + let mut line = Vec::new(); + let len = self.read_method.read_next_bytes(reader, &mut line)?; + if len > 0 || self.read_method == ReadMethod::Full { self.total_entries_read += 1; self.current_bytes_read += len as u64; @@ -1929,8 +1967,12 @@ impl Reader for S3LinesReader { }, ); + if self.read_method == ReadMethod::Full { + self.reader = None; + } + return Ok(ReadResult::Data( - ReaderContext::from_raw_bytes(DataEventType::Insert, line.into_bytes()), // Currently no deletions for S3 + ReaderContext::from_raw_bytes(DataEventType::Insert, line), // Currently no deletions for S3 offset, )); } diff --git a/src/engine/value.rs b/src/engine/value.rs index 2d4631a6..3e5db363 100644 --- a/src/engine/value.rs +++ b/src/engine/value.rs @@ -149,6 +149,7 @@ pub enum Value { Float(OrderedFloat), Pointer(Key), String(ArcStr), + Bytes(Arc<[u8]>), Tuple(Arc<[Self]>), IntArray(Handle>), FloatArray(Handle>), @@ -259,6 +260,7 @@ impl Display for Value { Self::Float(OrderedFloat(f)) => write!(fmt, "{f}"), Self::Pointer(p) => write!(fmt, "{p:#}"), Self::String(s) => write!(fmt, "\"{}\"", s.escape_default()), + Self::Bytes(b) => write!(fmt, "{b:?}"), Self::Tuple(vals) => write!(fmt, "({})", vals.iter().format(", ")), Self::IntArray(array) => write!(fmt, "{array}"), Self::FloatArray(array) => write!(fmt, "{array}"), @@ -305,6 +307,12 @@ impl From<&str> for Value { } } +impl From<&[u8]> for Value { + fn from(b: &[u8]) -> Self { + Self::Bytes(b.into()) + } +} + impl From for Value { fn from(s: ArcStr) -> Self { Self::String(s) @@ -359,6 +367,8 @@ impl From for Value { } } +// Please only append to this list, as the values here are used in hashing, +// so changing them will result in changed IDs #[repr(u8)] #[derive(Debug, Copy, Clone)] pub enum SimpleType { @@ -374,6 +384,7 @@ pub enum SimpleType { DateTimeNaive, DateTimeUtc, Duration, + Bytes, } #[derive(Debug, Default, Clone, Copy)] @@ -385,6 +396,7 @@ pub enum Type { Float, Pointer, String, + Bytes, DateTimeNaive, DateTimeUtc, Duration, @@ -401,6 +413,7 @@ impl Value { Self::Float(_) => SimpleType::Float, Self::Pointer(_) => SimpleType::Pointer, Self::String(_) => SimpleType::String, + Self::Bytes(_) => SimpleType::Bytes, Self::Tuple(_) => SimpleType::Tuple, Self::IntArray(_) => SimpleType::IntArray, Self::FloatArray(_) => SimpleType::FloatArray, @@ -541,6 +554,7 @@ impl HashInto for Value { Self::Float(f) => f.hash_into(hasher), Self::Pointer(p) => p.hash_into(hasher), Self::String(s) => s.hash_into(hasher), + Self::Bytes(b) => b.hash_into(hasher), Self::Tuple(vals) => vals.hash_into(hasher), Self::IntArray(handle) => handle.hash_into(hasher), Self::FloatArray(handle) => handle.hash_into(hasher), diff --git a/src/python_api.rs b/src/python_api.rs index 0a623f43..d54c6c06 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -28,7 +28,7 @@ use pyo3::exceptions::{ PyValueError, PyZeroDivisionError, }; use pyo3::pyclass::CompareOp; -use pyo3::types::{PyBool, PyDict, PyFloat, PyInt, PyString, PyTuple, PyType}; +use pyo3::types::{PyBool, PyBytes, PyDict, PyFloat, PyInt, PyString, PyTuple, PyType}; use pyo3::{AsPyPointer, PyTypeInfo}; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::producer::{DefaultProducerContext, ThreadedProducer}; @@ -56,8 +56,8 @@ use crate::connectors::data_format::{ }; use crate::connectors::data_storage::{ ConnectorMode, CsvFilesystemReader, ElasticSearchWriter, FileWriter, FilesystemReader, - KafkaReader, KafkaWriter, NullWriter, PsqlWriter, PythonReaderBuilder, ReaderBuilder, - S3CsvReader, S3LinesReader, Writer, + KafkaReader, KafkaWriter, NullWriter, PsqlWriter, PythonReaderBuilder, ReadMethod, + ReaderBuilder, S3CsvReader, S3GenericReader, Writer, }; use crate::engine::dataflow::config_from_env; use crate::engine::error::{DynError, DynResult, Trace as EngineTrace}; @@ -227,6 +227,12 @@ impl<'source> FromPyObject<'source> for Value { .expect("type conversion should work for str") .to_str()?, )) + } else if PyBytes::is_exact_type_of(ob) { + Ok(Value::from( + ob.downcast::() + .expect("type conversion should work for bytes") + .as_bytes(), + )) } else if PyInt::is_exact_type_of(ob) { Ok(Value::Int( ob.extract::() @@ -272,6 +278,8 @@ impl<'source> FromPyObject<'source> for Value { Ok(Value::Pointer(k)) } else if let Ok(s) = ob.downcast::() { Ok(s.to_str()?.into()) + } else if let Ok(bytes) = ob.downcast::() { + Ok(Value::Bytes(bytes.as_bytes().into())) } else if let Ok(t) = ob.extract::>() { Ok(Value::from(t.as_slice())) } else { @@ -311,6 +319,7 @@ impl ToPyObject for Value { Self::Float(f) => f.into_py(py), Self::Pointer(k) => k.into_py(py), Self::String(s) => s.into_py(py), + Self::Bytes(b) => PyBytes::new(py, b).into(), Self::Tuple(t) => PyTuple::new(py, t.iter()).into(), Self::IntArray(a) => PyArray::from_array(py, a).into(), Self::FloatArray(a) => PyArray::from_array(py, a).into(), @@ -351,6 +360,18 @@ impl IntoPy for Type { } } +impl<'source> FromPyObject<'source> for ReadMethod { + fn extract(ob: &'source PyAny) -> PyResult { + Ok(ob.extract::>()?.0) + } +} + +impl IntoPy for ReadMethod { + fn into_py(self, py: Python<'_>) -> PyObject { + PyReadMethod(self).into_py(py) + } +} + impl<'source> FromPyObject<'source> for ConnectorMode { fn extract(ob: &'source PyAny) -> PyResult { Ok(ob.extract::>()?.0) @@ -1102,6 +1123,17 @@ impl PathwayType { pub const ARRAY: Type = Type::Array; } +#[pyclass(module = "pathway.engine", frozen, name = "ReadMethod")] +pub struct PyReadMethod(ReadMethod); + +#[pymethods] +impl PyReadMethod { + #[classattr] + pub const BY_LINE: ReadMethod = ReadMethod::ByLine; + #[classattr] + pub const FULL: ReadMethod = ReadMethod::Full; +} + #[pyclass(module = "pathway.engine", frozen, name = "ConnectorMode")] pub struct PyConnectorMode(ConnectorMode); @@ -2978,6 +3010,7 @@ pub struct DataStorage { connection_string: Option, csv_parser_settings: Option>, mode: ConnectorMode, + read_method: ReadMethod, aws_s3_settings: Option>, elasticsearch_params: Option>, parallel_readers: Option, @@ -3082,6 +3115,7 @@ pub struct DataFormat { table_name: Option, column_paths: Option>, field_absence_is_error: bool, + parse_utf8: bool, } #[pymethods] @@ -3095,6 +3129,7 @@ impl DataStorage { connection_string = None, csv_parser_settings = None, mode = ConnectorMode::SimpleStreaming, + read_method = ReadMethod::ByLine, aws_s3_settings = None, elasticsearch_params = None, parallel_readers = None, @@ -3110,6 +3145,7 @@ impl DataStorage { connection_string: Option, csv_parser_settings: Option>, mode: ConnectorMode, + read_method: ReadMethod, aws_s3_settings: Option>, elasticsearch_params: Option>, parallel_readers: Option, @@ -3124,6 +3160,7 @@ impl DataStorage { connection_string, csv_parser_settings, mode, + read_method, aws_s3_settings, elasticsearch_params, parallel_readers, @@ -3145,7 +3182,9 @@ impl DataFormat { table_name = None, column_paths = None, field_absence_is_error = true, + parse_utf8 = true, ))] + #[allow(clippy::too_many_arguments)] fn new( format_type: String, key_field_names: Option>, @@ -3154,6 +3193,7 @@ impl DataFormat { table_name: Option, column_paths: Option>, field_absence_is_error: bool, + parse_utf8: bool, ) -> Self { DataFormat { format_type, @@ -3163,6 +3203,7 @@ impl DataFormat { table_name, column_paths, field_absence_is_error, + parse_utf8, } } } @@ -3314,16 +3355,21 @@ impl DataStorage { fn construct_reader(&self, py: pyo3::Python) -> PyResult<(Box, usize)> { match self.storage_type.as_ref() { "fs" => { - let storage = - FilesystemReader::new(self.path()?, self.mode, self.internal_persistent_id())?; + let storage = FilesystemReader::new( + self.path()?, + self.mode, + self.internal_persistent_id(), + self.read_method, + )?; Ok((Box::new(storage), 1)) } "s3" => { - let storage = S3LinesReader::new( + let storage = S3GenericReader::new( self.s3_bucket(py)?, self.path()?, self.mode.is_polling_enabled(), self.internal_persistent_id(), + self.read_method, ); Ok((Box::new(storage), 1)) } @@ -3560,7 +3606,7 @@ impl DataFormat { ); Ok(Box::new(parser)) } - "identity" => Ok(Box::new(IdentityParser::new())), + "identity" => Ok(Box::new(IdentityParser::new(self.parse_utf8))), _ => Err(PyValueError::new_err("Unknown data format")), } } @@ -3823,6 +3869,7 @@ fn module(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/tests/data/binary b/tests/data/binary new file mode 100644 index 0000000000000000000000000000000000000000..6f0b9c0f74d2344b122fcfe92076065e399ac8db GIT binary patch literal 1024 zcmV+b1poUy1}L+S@b_|)+O{7S%KrXELrmlCn*~7Z)n;MmTpo74sea>B}U1`*7aX#YsctB;J!w6nv zy4I_He!h+$B%&5_o|^TTK*jWBH35iKEKFpSBDr?EHT$|3fujotUs|H?;W33^#BkX(%0zeFD*d$T~Aq@Mm#R*F2qViK2YuulcD$OX7L~r|k#4az-popi8 zf=$X=&3}jFY;VygV`>G6+I;x!+w82tGv_nu%`l-r_-~;lv(?QJyz_G>l$b;6UL3cZ zr@hK`%>A9=dcN)b1uz4HE+>&T^`VQ$$a ziCnMvCX)o+pYpWs-npagikSmGD}8|#rV|(Mg9hKxiVtbG8Qxm4sf{qNf02>pYcOTN zWE@&Cw$i9pxVsv)bX80x5@0K~)XJw~-@RP*T#UI}?3=m|_J>^{Oa8pT|6@j%?sf`C zJkUM!J~}Y^%45TfB@Tio4B!wRndplAKKg(58Plshwzr@h2uYO&+MF`F)ExYj*Q^Kq zd?Hy&|4FPe|I~DYSVrszR!R?^ELLF_x9Qn&UI3@=b$8a!t;rNs&Pn&L(GpX)G;5PZ z2z03fr6un{W=^?Z*=%UwvJz2vk-D3nuq7p@=^wntg+L-zEg^01Acy*H6YO-xaU<2% zgRXClLX&5a?<_`xN+bT~Q)|z3C@bLN_dmOW2Kk5;P{oxTZ(+PcMZ&wr7Sb}qdDQ!R zE>81Ax&|JEWOz(G$H3||q)sHT(FYPBg};Ss5@$X3|L^pRY%!)FZvl3C!}`%jrG&6y zqkm>tRoF6oDlcTw$!X+~n-V!q!mXWbN9keaEyXXS@I+lkjiGW7wVvmm(rWm!fKC>j z8(|7+P339YL5!IuC5KZ1PBIJK&;&OgzOB@A8}(MOgtgcp1w9B8C(9*&MfiQeT3s?B zRg_(7O3l|`vi6^T`}fM3&Xs05s>?~MljHBk;gap39$yp_7sZyw+Ri791yg`7;51iN uEg7!sEEJojslIG+$l}T4U`xE&7*M&`aJ6tIMoC*K{ literal 0 HcmV?d00001 diff --git a/tests/data/empty b/tests/data/empty new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/empty_files/1 b/tests/data/empty_files/1 new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/empty_files/2 b/tests/data/empty_files/2 new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/empty_files/3 b/tests/data/empty_files/3 new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs new file mode 100644 index 00000000..a6426dc4 --- /dev/null +++ b/tests/test_bytes.rs @@ -0,0 +1,96 @@ +use std::path::PathBuf; + +use pathway_engine::connectors::data_format::{IdentityParser, ParseResult, ParsedEvent, Parser}; +use pathway_engine::connectors::data_storage::{ + ConnectorMode, FilesystemReader, ReadMethod, ReadResult, Reader, +}; +use pathway_engine::engine::Value; + +fn read_bytes_from_path(path: &str) -> eyre::Result> { + let mut reader = FilesystemReader::new( + PathBuf::from(path), + ConnectorMode::Static, + None, + ReadMethod::Full, + )?; + let mut parser = IdentityParser::new(false); + let mut events = Vec::new(); + + loop { + let read_result = reader.read()?; + match read_result { + ReadResult::Data(bytes, _) => { + let row_parse_result: ParseResult = parser.parse(&bytes); + assert!(row_parse_result.is_ok()); + + for event in row_parse_result.expect("entries should parse correctly") { + events.push(event); + } + } + ReadResult::Finished => break, + ReadResult::NewSource => continue, + } + } + + Ok(events) +} + +#[test] +fn test_bytes_read_from_file() -> eyre::Result<()> { + let events = read_bytes_from_path("tests/data/binary")?; + assert_eq!(events.len(), 1); + Ok(()) +} + +#[test] +fn test_empty() -> eyre::Result<()> { + let events = read_bytes_from_path("tests/data/empty")?; + assert_eq!( + events, + vec![ParsedEvent::Insert(( + None, + vec![Value::Bytes(Vec::new().into())] + )),] + ); + + Ok(()) +} + +#[test] +fn test_empty_files_folder() -> eyre::Result<()> { + let events = read_bytes_from_path("tests/data/empty_files/")?; + assert_eq!( + events, + vec![ + ParsedEvent::Insert((None, vec![Value::Bytes(Vec::new().into())])), + ParsedEvent::Insert((None, vec![Value::Bytes(Vec::new().into())])), + ParsedEvent::Insert((None, vec![Value::Bytes(Vec::new().into())])), + ] + ); + + Ok(()) +} + +#[test] +fn test_bytes_read_from_folder() -> eyre::Result<()> { + let events = read_bytes_from_path("tests/data/csvdir")?; + assert_eq!( + events, + vec![ + ParsedEvent::Insert(( + None, + vec![Value::Bytes(b"key,foo\n1,abc\n2,def\n".to_vec().into())] + )), + ParsedEvent::Insert(( + None, + vec![Value::Bytes(b"key,foo\n3,ghi\n4,jkl\n".to_vec().into())] + )), + ParsedEvent::Insert(( + None, + vec![Value::Bytes(b"key,foo\n5,mno\n6,pqr\n".to_vec().into())] + )) + ] + ); + + Ok(()) +} diff --git a/tests/test_connector_field_defaults.rs b/tests/test_connector_field_defaults.rs index 07c7735e..baf5f391 100644 --- a/tests/test_connector_field_defaults.rs +++ b/tests/test_connector_field_defaults.rs @@ -8,7 +8,7 @@ use pathway_engine::connectors::data_format::{ DsvParser, DsvSettings, InnerSchemaField, JsonLinesParser, ParsedEvent, }; use pathway_engine::connectors::data_storage::{ - ConnectorMode, CsvFilesystemReader, FilesystemReader, + ConnectorMode, CsvFilesystemReader, FilesystemReader, ReadMethod, }; use pathway_engine::engine::{Type, Value}; @@ -205,6 +205,7 @@ fn test_jsonlines_fails_without_default() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -231,6 +232,7 @@ fn test_jsonlines_with_default() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines_with_skips.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -281,6 +283,7 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines_with_skips.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -325,6 +328,7 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines_with_skips_and_nulls.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), diff --git a/tests/test_debezium.rs b/tests/test_debezium.rs index 16179ec8..86479165 100644 --- a/tests/test_debezium.rs +++ b/tests/test_debezium.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use std::str::FromStr; use pathway_engine::connectors::data_format::{DebeziumMessageParser, ParsedEvent}; -use pathway_engine::connectors::data_storage::{ConnectorMode, FilesystemReader}; +use pathway_engine::connectors::data_storage::{ConnectorMode, FilesystemReader, ReadMethod}; use pathway_engine::engine::Value; #[test] @@ -14,6 +14,7 @@ fn test_debezium_reads_ok() -> eyre::Result<()> { PathBuf::from("tests/data/sample_debezium.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = DebeziumMessageParser::new( Some(vec!["id".to_string()]), diff --git a/tests/test_dsv.rs b/tests/test_dsv.rs index 220ce6d6..e9f374dd 100644 --- a/tests/test_dsv.rs +++ b/tests/test_dsv.rs @@ -10,7 +10,7 @@ use pathway_engine::connectors::data_format::{ DsvParser, DsvSettings, InnerSchemaField, ParseResult, ParsedEvent, Parser, }; use pathway_engine::connectors::data_storage::{ - ConnectorMode, FilesystemReader, ReadResult, ReadResult::Data, Reader, + ConnectorMode, FilesystemReader, ReadMethod, ReadResult, ReadResult::Data, Reader, }; use pathway_engine::engine::{Key, Type, Value}; @@ -20,6 +20,7 @@ fn test_dsv_read_ok() -> eyre::Result<()> { PathBuf::from("tests/data/sample.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -61,6 +62,7 @@ fn test_dsv_column_does_not_exist() -> eyre::Result<()> { PathBuf::from("tests/data/sample.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["c".to_string()], ','), @@ -82,6 +84,7 @@ fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> { PathBuf::from("tests/data/sample_str_int.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -116,6 +119,7 @@ fn test_dsv_not_enough_columns() -> eyre::Result<()> { PathBuf::from("tests/data/sample_bad_lines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -159,6 +163,7 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> { PathBuf::from("tests/data/sample.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new(None, vec!["a".to_string(), "b".to_string()], ','), @@ -199,6 +204,7 @@ fn test_dsv_composite_pkey() -> eyre::Result<()> { PathBuf::from("tests/data/sample_composite_pkey.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new( @@ -259,6 +265,7 @@ fn test_dsv_read_schema_ok() -> eyre::Result<()> { PathBuf::from("tests/data/schema.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new( @@ -326,6 +333,7 @@ fn test_dsv_read_schema_nonparsable() -> eyre::Result<()> { PathBuf::from("tests/data/schema.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut parser = DsvParser::new( DsvSettings::new( diff --git a/tests/test_jsonlines.rs b/tests/test_jsonlines.rs index 2598fdb5..ba130691 100644 --- a/tests/test_jsonlines.rs +++ b/tests/test_jsonlines.rs @@ -7,7 +7,7 @@ use std::str::FromStr; use std::sync::Arc; use pathway_engine::connectors::data_format::{JsonLinesParser, ParsedEvent}; -use pathway_engine::connectors::data_storage::{ConnectorMode, FilesystemReader}; +use pathway_engine::connectors::data_storage::{ConnectorMode, FilesystemReader, ReadMethod}; use pathway_engine::engine::Value; #[test] @@ -16,6 +16,7 @@ fn test_jsonlines_ok() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -53,6 +54,7 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -77,6 +79,7 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -98,6 +101,7 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -122,6 +126,7 @@ fn test_jsonlines_types_parsing() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines_types.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -170,6 +175,7 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> { PathBuf::from("tests/data/json_complex_paths.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut routes = HashMap::new(); @@ -224,6 +230,7 @@ fn test_jsonlines_complex_paths_error() -> eyre::Result<()> { PathBuf::from("tests/data/json_complex_paths.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut routes = HashMap::new(); @@ -263,6 +270,7 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> { PathBuf::from("tests/data/json_complex_paths.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let mut routes = HashMap::new(); @@ -299,6 +307,7 @@ fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -326,6 +335,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> { PathBuf::from("tests/data/jsonlines.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -350,6 +360,7 @@ fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> { PathBuf::from("tests/data/json_complex_paths.txt"), ConnectorMode::Static, None, + ReadMethod::ByLine, )?; let parser = JsonLinesParser::new( None, diff --git a/tests/test_seek.rs b/tests/test_seek.rs index e260aebf..755e005b 100644 --- a/tests/test_seek.rs +++ b/tests/test_seek.rs @@ -12,7 +12,7 @@ use pathway_engine::connectors::data_format::{ }; use pathway_engine::connectors::data_storage::ReaderBuilder; use pathway_engine::connectors::data_storage::{ - ConnectorMode, CsvFilesystemReader, FilesystemReader, + ConnectorMode, CsvFilesystemReader, FilesystemReader, ReadMethod, }; use pathway_engine::engine::Value; use pathway_engine::persistence::sync::SharedWorkersPersistenceCoordinator; @@ -45,8 +45,13 @@ fn csv_reader_parser_pair(input_path: &Path) -> (Box, Box (Box, Box) { - let reader = - FilesystemReader::new(input_path.to_path_buf(), ConnectorMode::Static, Some(1)).unwrap(); + let reader = FilesystemReader::new( + input_path.to_path_buf(), + ConnectorMode::Static, + Some(1), + ReadMethod::ByLine, + ) + .unwrap(); let parser = JsonLinesParser::new( Some(vec!["key".to_string()]), vec!["value".to_string()],