Skip to content

Commit

Permalink
fix: producer async_wait error return (#500)
Browse files Browse the repository at this point in the history
* chore: python 3.8 EOL

* fix: produce output result misconversion always resulted in None

break out modules as first refactor out of giant file monoliths

* chore: make lint refresh and fmt fixes

* chore: update CHANGELOG.md
  • Loading branch information
digikata authored Oct 31, 2024
1 parent 53ee177 commit c2cc372
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 101 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macOS-latest] # TODO: add windows-2019,
python-version: ["38", "39", "310", "311", "312"]
python-version: ["39", "310", "311", "312"]
env:
CIBW_SKIP: "cp36-* pp* *-win32"
CIBW_ARCHS_MACOS: x86_64 universal2
Expand Down Expand Up @@ -132,7 +132,7 @@ jobs:
matrix:
os: [ubuntu-latest]
rust: [stable]
python-version: ["3.11"]
python-version: ["3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/cache@v4
Expand Down Expand Up @@ -231,7 +231,7 @@ jobs:
matrix:
os: [macos-latest]
rust: [stable]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Install rust ${{ matrix.rust }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest]
rust: [stable]
python-version: ["3.8", "3.12"]
python-version: ["3.9", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/cache@v4
Expand Down
81 changes: 81 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,87 @@

All notable changes to this project will be documented in this file.

## [0.16.5]

### Bug Fixes

- Produce output async_wait result misconversion always resulted in None

### Miscellaneous Tasks

- Python 3.8 EOL
- Update CHANGELOG.md
- README.md update
- Make lint refresh and fmt fixes

## [0.16.4] - 2024-10-11

### Miscellaneous Tasks

- Update to fluvio 0.12.0 (#495)
- Fix readme examples (#465)
- Bump pypa/gh-action-pypi-publish from 1.9.0 to 1.10.0 (#469)
- Update cc requirement from =1.1.12 to =1.1.15 (#467)

## [0.16.3] - 2024-08-21

### Bug Fixes

- Switch to rustls (#376)
- Wrap multiple-partition-consumer correctly
- Change to recommended dev build (#396)
- Bindgen dependency when cross compilation (#463)
- Publish build wheels (#464)

### Features

- Release GIL for most "run_block_on" calls
- Add python realization of ProduceOutput and RecordMetadata

### Miscellaneous Tasks

- Ci update outdated deps (#372)
- Use fixed version of cc (#379)
- Fluvio-cloud-ci fix profile cfg
- Update to fluvio 0.11.5
- Bump cargo.toml minor ver
- Bump black from 24.1.1 to 24.3.0 (#395)
- Bump setuptools-rust from 1.8.1 to 1.9.0 (#374)
- Bump cibuildwheel from 2.16.0 to 2.17.0 (#391)
- Bump pypa/cibuildwheel from 2.16.5 to 2.17.0 (#392)
- Update async-lock requirement from 2.4.0 to 3.3.0 (#387)
- Bump black from 24.3.0 to 24.4.0 (#401)
- Bump fluvio-types from v0.11.5 to v0.11.6 (#402)
- Update webbrowser requirement from 0.8.2 to 1.0.0 (#405)
- Update to fluvio 0.11.8
- Ci, publish, add toolchain
- Bump pip version
- Bump cibuildwheel from 2.18.1 to 2.19.0 (#428)
- Bump pypa/cibuildwheel from 2.18.1 to 2.19.0 (#427)
- Bump fluvio-types from v0.11.8 to v0.11.9 (#425)
- Update fluvio (#432)
- Update cc requirement from =1.0.83 to =1.1.0 (#433)
- Refresh docs
- Ci, push docs on tag event
- Bump black from 24.4.0 to 24.8.0 (#450)
- Bump setuptools-rust from 1.9.0 to 1.10.1 (#449)
- Bump cibuildwheel from 2.19.0 to 2.20.0 (#446)
- Bump pypa/cibuildwheel from 2.19.0 to 2.20.0 (#451)
- Fluvio update to 0.11.11 (#445)
- Ci cloud fix (#456)
- Update cc requirement from =1.1.5 to =1.1.11 (#459)
- Update cc requirement from =1.1.11 to =1.1.12 (#460)
- Use admin module instead of cli in tests (#457)

### Build

- Bump pypa/gh-action-pypi-publish from 1.8.11 to 1.9.0 (#431)
- Update cc requirement from =1.1.0 to =1.1.5 (#434)

## [0.16.1] [v0.16.2]

- publishing fix releases

## [0.16.0] - 2023-01-31

### Miscellaneous Tasks
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.1
fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" }

# transitive version selection
parking_lot = "0.12.3"
parking_lot = "0.12.3"
bytes = { version = "1.8.0" }
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ for i in stream:

# Developer Notes

This project uses [flapigen](https://github.com/Dushistov/flapigen-rs) to
genate the C static library and
[setuptools-rust](https://github.com/PyO3/setuptools-rust) to bundle it into a
This project uses [PyO3](https://pyo3.rs) to wrap the fluvio crate.

[setuptools-rust](https://github.com/PyO3/setuptools-rust) bundles it into a
python package. For cross platform builds,
[cibuildwheel](https://github.com/joerick/cibuildwheel) is used.
[cibuildwheel](https://github.com/joerick/cibuildwheel) is used.

Running the tests locally require having already setup a [fluvio
locally](https://www.fluvio.io/docs/getting-started/fluvio-local/) or on
Expand Down
6 changes: 2 additions & 4 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def wait(self) -> typing.Optional[RecordMetadata]:
This is a blocking call and may only return a `RecordMetadata` once.
Any subsequent call to `wait` will return a `None` value.
Errors will be raised as exceptions of type `FluvioError`.
"""
res = self._inner.wait()
if res is None:
Expand All @@ -109,10 +110,7 @@ async def async_wait(self) -> typing.Optional[RecordMetadata]:
This may only return a `RecordMetadata` once.
Any subsequent call to `wait` will return a `None` value.
"""
res = await self._inner.async_wait()
if res is None:
return None
return RecordMetadata(res)
return await self._inner.async_wait()


class Offset:
Expand Down
40 changes: 0 additions & 40 deletions integration-tests/test_fluvio_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,46 +390,6 @@ def test_consume_with_array_map_example(self):


class TestAsyncFluvioMethods(CommonAsyncFluvioSmartModuleTestCase):
async def test_async_produce(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)
for i in range(10):
await producer.async_send_string("FOOBAR %s " % i)

async def test_async_produce_record_metadata(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)

msg_strings = ["Foobar %s" % i for i in range(10)]
produce_outputs = [await producer.async_send_string(msg) for msg in msg_strings]

records = [
(("%s" % i).encode(), msg_string.encode())
for i, msg_string in enumerate(msg_strings)
]
produce_outputs.extend(await producer.async_send_all(records))

record_metadata = [
await produce_output.async_wait() for produce_output in produce_outputs
]

partition_id = 0
consumer = fluvio.partition_consumer(self.topic, partition_id)
messages = list(
itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs))
)

for metadata, msg in zip(record_metadata, messages):
self.assertNotEqual(metadata, None)
self.assertEqual(metadata.partition_id(), partition_id)
self.assertEqual(metadata.offset(), msg.offset())

for produce_output in produce_outputs:
# subsequent calls to po.wait shall return None
self.assertEqual(produce_output.wait(), None)

async def test_consumer_with_interator(self):
fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
Expand Down
159 changes: 159 additions & 0 deletions integration-tests/test_produce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from fluvio import Fluvio, Offset, FluvioConfig
from fluvio import FluvioAdmin
import unittest
import uuid
import itertools
import time


def create_smartmodule(sm_name, sm_path):
# Normally it would be this code, but bare wasm smartmodules
# are used in the python client testing, & only the cli supports it so far
# dry_run = False
# fluvio_admin = FluvioAdmin.connect()
# fluvio_admin.create_smartmodule(sm_name, sm_path, dry_run)
import subprocess

subprocess.run(
f"fluvio smartmodule create {sm_name} --wasm-file {sm_path}", shell=True
).check_returncode()


class CommonFluvioSetup(unittest.TestCase):
def common_setup(self, sm_path=None):
"""Optionally create a smartmodule if sm_path is provided"""
self.admin = FluvioAdmin.connect()
self.topic = str(uuid.uuid4())
self.sm_name = str(uuid.uuid4())
self.sm_path = sm_path

if sm_path is not None:
create_smartmodule(self.sm_name, sm_path)

try:
self.admin.create_topic(self.topic)
except Exception as err:
print("Retrying after create_topic error {}", err)
time.sleep(5)
self.admin.create_topic(self.topic)

# list topics to verify topic was created
max_retries = 100
while max_retries > 0:
topic = self.admin.list_topics([self.topic])
if len(topic) > 0:
break
max_retries -= 1
if max_retries == 0:
self.fail("setup: Failed to create topic")
time.sleep(0.1)

def setUp(self):
self.common_setup()

def tearDown(self):
self.admin.delete_topic(self.topic)
time.sleep(1)
if self.sm_path is not None:
self.admin.delete_smartmodule(self.sm_name)


class TestFluvioProduce(CommonFluvioSetup):
def test_connect(self):
# A very simple test
Fluvio.connect()

def test_connect_with_config(self):
config = FluvioConfig.load()
Fluvio.connect_with_config(config)

def test_produce(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)
for i in range(10):
producer.send_string("FOOBAR %s " % i)

def test_produce_record_metadata(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)

msg_strings = ["Foobar %s" % i for i in range(10)]
produce_outputs = [producer.send_string(msg) for msg in msg_strings]

records = [
(("%s" % i).encode(), msg_string.encode())
for i, msg_string in enumerate(msg_strings)
]
produce_outputs.extend(producer.send_all(records))

record_metadata = [produce_output.wait() for produce_output in produce_outputs]

partition_id = 0
consumer = fluvio.partition_consumer(self.topic, partition_id)
messages = list(
itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs))
)

for metadata, msg in zip(record_metadata, messages):
self.assertNotEqual(metadata, None)
self.assertEqual(metadata.partition_id(), partition_id)
self.assertEqual(metadata.offset(), msg.offset())

for produce_output in produce_outputs:
# subsequent calls to po.wait shall return None
self.assertEqual(produce_output.wait(), None)


class TestFluvioProduceAsync(unittest.IsolatedAsyncioTestCase, CommonFluvioSetup):
async def test_async_produce(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)
for i in range(10):
ret = await producer.async_send_string("FOOBAR %s " % i)
ret.wait()

async def test_async_produce_async_wait(self):
"""simple test, test_async_produce_record_metadata is more comprehensive"""
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)
future = await producer.async_send_string("FOOBAR async async")
out_future = future.async_wait()
result = await out_future
self.assertNotEqual(result, None)

async def test_async_produce_record_metadata(self):
fluvio = Fluvio.connect()

producer = fluvio.topic_producer(self.topic)

msg_strings = ["Foobar %s" % i for i in range(10)]
produce_outputs = [await producer.async_send_string(msg) for msg in msg_strings]

records = [
(("%s" % i).encode(), msg_string.encode())
for i, msg_string in enumerate(msg_strings)
]
produce_outputs.extend(await producer.async_send_all(records))

record_metadata = [
await produce_output.async_wait() for produce_output in produce_outputs
]

partition_id = 0
consumer = fluvio.partition_consumer(self.topic, partition_id)
messages = list(
itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs))
)

for metadata, msg in zip(record_metadata, messages):
self.assertNotEqual(metadata, None)
self.assertEqual(metadata.partition_id(), partition_id)
self.assertEqual(metadata.offset(), msg.offset())

for produce_output in produce_outputs:
# subsequent calls to po.wait shall return None
self.assertEqual(produce_output.wait(), None)
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
flake8==7.0.0
flake8==7.1.1
mccabe==0.7.0
msgpack==1.0.4
pycodestyle==2.11.1
pycodestyle==2.12.1
pyflakes==3.2.0
black==24.8.0
semantic-version==2.10.0
Expand Down
Loading

0 comments on commit c2cc372

Please sign in to comment.