feat: class based refactor for SDK #129

Merged 78 commits on Jan 24, 2024
Changes from 66 commits
e4df766
refactor init
kohlisid Jan 4, 2024
382bdf2
init
kohlisid Jan 4, 2024
87420ca
move infor
kohlisid Jan 4, 2024
136da24
move infor
kohlisid Jan 4, 2024
58b6496
await change
kohlisid Jan 4, 2024
83d366a
await change
kohlisid Jan 4, 2024
ceee882
change to create task
kohlisid Jan 4, 2024
b828204
change to create task
kohlisid Jan 4, 2024
837de07
ev loop
kohlisid Jan 5, 2024
c4f0376
cleanup
kohlisid Jan 5, 2024
b385e25
cleanup
kohlisid Jan 5, 2024
a22673b
cleanup
kohlisid Jan 5, 2024
fe52e9f
server on same thread
kohlisid Jan 5, 2024
e2c315d
server on same thread
kohlisid Jan 5, 2024
28b5c70
server on same thread
kohlisid Jan 5, 2024
da9d1db
Export class
kohlisid Jan 5, 2024
27967bf
Class refactor
kohlisid Jan 5, 2024
a0ac382
multiproc info fix
kohlisid Jan 6, 2024
05a2cfe
modular
kohlisid Jan 8, 2024
3f6049f
modular
kohlisid Jan 8, 2024
b642b17
modular
kohlisid Jan 8, 2024
40bb295
modular
kohlisid Jan 8, 2024
e3b7090
mapstream
kohlisid Jan 8, 2024
88bb474
mapstream
kohlisid Jan 8, 2024
e187f90
mapstream
kohlisid Jan 8, 2024
d7ff4a1
reduce
kohlisid Jan 8, 2024
32d2975
reduce
kohlisid Jan 9, 2024
3e3238a
sink
kohlisid Jan 9, 2024
e35b156
sink
kohlisid Jan 9, 2024
1ba673d
sink
kohlisid Jan 9, 2024
069d681
sink
kohlisid Jan 9, 2024
285a1ae
sink
kohlisid Jan 9, 2024
e86fbe1
proto
kohlisid Jan 9, 2024
98dd013
transform
kohlisid Jan 9, 2024
9e268f8
source
kohlisid Jan 9, 2024
b517b6c
source
kohlisid Jan 9, 2024
9c814c6
source
kohlisid Jan 9, 2024
111684a
source
kohlisid Jan 9, 2024
39788cd
source
kohlisid Jan 9, 2024
d7fe470
tests
kohlisid Jan 10, 2024
7b96725
source
kohlisid Jan 10, 2024
0a3c004
tests
kohlisid Jan 10, 2024
50fd5c3
tests
kohlisid Jan 10, 2024
c76de42
examples
kohlisid Jan 10, 2024
2aa9b30
examples
kohlisid Jan 10, 2024
49c8b8d
cleanup
kohlisid Jan 10, 2024
bfde535
lint
kohlisid Jan 10, 2024
ebecebf
lint
kohlisid Jan 11, 2024
b11d086
README
kohlisid Jan 11, 2024
b2e7be3
SideInput
kohlisid Jan 11, 2024
617f63a
tests
kohlisid Jan 11, 2024
9bb16a7
tests
kohlisid Jan 11, 2024
e4656d5
tests
kohlisid Jan 11, 2024
7c1b276
add uvloop
kohlisid Jan 11, 2024
74efb87
seperate mappers
kohlisid Jan 16, 2024
fe6ec20
seperate mappers
kohlisid Jan 16, 2024
0575d2d
seperate mapstream
kohlisid Jan 16, 2024
c1256f6
seperate reducer
kohlisid Jan 17, 2024
c7b8a84
seperate servers
kohlisid Jan 17, 2024
c0a025c
examples
kohlisid Jan 17, 2024
678ec60
examples
kohlisid Jan 17, 2024
cdb97c1
examples
kohlisid Jan 17, 2024
a25387f
examples
kohlisid Jan 17, 2024
dffdc5f
examples
kohlisid Jan 17, 2024
2742b56
examples
kohlisid Jan 17, 2024
6bf2c4a
examples
kohlisid Jan 17, 2024
6e44a6c
README
kohlisid Jan 17, 2024
cbf47bf
REDUCER INSTANCE
kohlisid Jan 18, 2024
d88ed5c
REDUCER INSTANCE
kohlisid Jan 18, 2024
3961f95
deep copy
kohlisid Jan 19, 2024
4065717
deep copy
kohlisid Jan 22, 2024
d2c844d
deep copy
kohlisid Jan 22, 2024
d3b59ff
lint
kohlisid Jan 22, 2024
6b56383
add reducer test
kohlisid Jan 22, 2024
02afc01
change reduce signature
kohlisid Jan 23, 2024
d8bcefe
comments
kohlisid Jan 23, 2024
d8ce1ce
comments
kohlisid Jan 24, 2024
59d6550
comments
kohlisid Jan 24, 2024
8 changes: 1 addition & 7 deletions .codecov.yml
@@ -11,10 +11,4 @@ coverage:

ignore:
- "examples/"
- "pynumaflow/mapper/proto/*"
- "pynumaflow/sinker/proto/*"
- "pynumaflow/mapstreamer/proto/*"
- "pynumaflow/reducer/proto/*"
- "pynumaflow/sourcetransformer/proto/*"
- "pynumaflow/sideinput/proto/*"
- "pynumaflow/sourcer/proto/*"
- "pynumaflow/proto/*"
12 changes: 8 additions & 4 deletions .coveragerc
@@ -5,14 +5,18 @@ source = pynumaflow
omit =
pynumaflow/tests/*
examples/*
pynumaflow/proto/*
pynumaflow/shared/server.py

[report]
exclude_lines =
def start
def start_async
def __serve_async
def start_multiproc
def sync_server_start
def _run_server
def start_multiproc_server
async def start_async_server
def _reserve_port
if os.getenv("PYTHONDEBUG"):
_LOGGER.setLevel(logging.DEBUG)
def exec_multiproc
def exec
async def aexec
16 changes: 8 additions & 8 deletions Makefile
@@ -27,13 +27,13 @@ setup:


proto:
python3 -m grpc_tools.protoc -I=pynumaflow/sinker/proto --python_out=pynumaflow/sinker/proto --grpc_python_out=pynumaflow/sinker/proto pynumaflow/sinker/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapper/proto --python_out=pynumaflow/mapper/proto --grpc_python_out=pynumaflow/mapper/proto pynumaflow/mapper/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapstreamer/proto --python_out=pynumaflow/mapstreamer/proto --grpc_python_out=pynumaflow/mapstreamer/proto pynumaflow/mapstreamer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/reducer/proto --python_out=pynumaflow/reducer/proto --grpc_python_out=pynumaflow/reducer/proto pynumaflow/reducer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransformer/proto --python_out=pynumaflow/sourcetransformer/proto --grpc_python_out=pynumaflow/sourcetransformer/proto pynumaflow/sourcetransformer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sideinput/proto --python_out=pynumaflow/sideinput/proto --grpc_python_out=pynumaflow/sideinput/proto pynumaflow/sideinput/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcer/proto --python_out=pynumaflow/sourcer/proto --grpc_python_out=pynumaflow/sourcer/proto pynumaflow/sourcer/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sinker --python_out=pynumaflow/proto/sinker --grpc_python_out=pynumaflow/proto/sinker pynumaflow/proto/sinker/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/mapper --python_out=pynumaflow/proto/mapper --grpc_python_out=pynumaflow/proto/mapper pynumaflow/proto/mapper/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/mapstreamer --python_out=pynumaflow/proto/mapstreamer --grpc_python_out=pynumaflow/proto/mapstreamer pynumaflow/proto/mapstreamer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/reducer --python_out=pynumaflow/proto/reducer --grpc_python_out=pynumaflow/proto/reducer pynumaflow/proto/reducer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto


sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
113 changes: 95 additions & 18 deletions README.md
Review comment (Member): Simplify the README: give an overall gist and point readers to the examples. Take a look at how Go's is written.

@@ -46,18 +46,25 @@ pre-commit install
### Map

```python
from pynumaflow.mapper import Messages, Message, Datum, Mapper
from pynumaflow.mapper import Messages, Message, Datum, MapServer


def my_handler(keys: list[str], datum: Datum) -> Messages:
def handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
return Messages(Message(value=val, keys=keys))
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
grpc_server = MapServer(handler)
grpc_server.start()
```
### SourceTransformer - Map with event time assignment capability
@@ -66,7 +73,7 @@ SourceTransformer is only supported at source vertex to enable (a) early data fi

```python
from datetime import datetime
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer


def transform_handler(keys: list[str], datum: Datum) -> Messages:
@@ -78,21 +85,18 @@ def transform_handler(keys: list[str], datum: Datum) -> Messages:


if __name__ == "__main__":
grpc_server = SourceTransformer(handler=transform_handler)
grpc_server = SourceTransformServer(transform_handler)
grpc_server.start()
```

### Reduce

```python
import aiorun
from typing import Iterator, List
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer
from typing import AsyncIterable
from pynumaflow.reducer import Messages, Message, Datum, Metadata, ReduceAsyncServer


async def my_handler(
keys: List[str], datums: Iterator[Datum], md: Metadata
) -> Messages:
async def reduce_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
interval_window = md.interval_window
counter = 0
async for _ in datums:
@@ -101,12 +105,12 @@ async def my_handler(
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(str.encode(msg), keys))
return Messages(Message(str.encode(msg), keys=keys))


if __name__ == "__main__":
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
grpc_server = ReduceAsyncServer(reduce_handler)
grpc_server.start()
```

### Sample Image
@@ -117,7 +121,7 @@ under [examples](examples/map/forward_message).

```python
from typing import Iterator
from pynumaflow.sinker import Datum, Responses, Response, Sinker
from pynumaflow.sinker import Datum, Responses, Response, SinkServer


def my_handler(datums: Iterator[Datum]) -> Responses:
@@ -129,11 +133,84 @@ def my_handler(datums: Iterator[Datum]) -> Responses:


if __name__ == "__main__":
grpc_server = Sinker(my_handler)
grpc_server = SinkServer(my_handler)
grpc_server.start()
```

### Sample Image

A sample UDSink [Dockerfile](examples/sink/log/Dockerfile) is provided
under [examples](examples/sink/log).
under [examples](examples/sink/log).

## Class based handlers

We can also implement UDFs and UDSinks using class based handlers.

The class based handlers are useful when we want to maintain state across multiple invocations of the handler.

Here we can pass the class instance to the server and the server will invoke the handler methods on the instance.

To use a class-based handler, the user needs to inherit from the base class of the UDF/UDSink and implement the required methods in the class.

For example, for Mapper, the user needs to inherit from the [Mapper](pynumaflow/mapper/_dtypes.py#170) class and then implement the [handler](pynumaflow/mapper/_dtypes.py#170) method.

### Map

```python
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper


class MyHandler(Mapper):
def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages


if __name__ == "__main__":
class_instance = MyHandler()
grpc_server = MapServer(class_instance)
grpc_server.start()
```
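
To illustrate the point about keeping state across invocations, here is a minimal sketch that runs without the SDK. `FakeDatum`, `CountingHandler`, `echo`, and `invoke` are hypothetical stand-ins invented for this illustration; they are not pynumaflow names, and the real `Mapper` base class and `MapServer` dispatch may differ.

```python
from dataclasses import dataclass


@dataclass
class FakeDatum:
    """Hypothetical stand-in for the SDK's Datum; it only carries a payload."""

    value: bytes


class CountingHandler:
    """Hypothetical class-based handler that remembers how many datums it saw."""

    def __init__(self) -> None:
        self.seen = 0  # state that survives across invocations

    def handler(self, keys: list[str], datum: FakeDatum) -> list[bytes]:
        self.seen += 1
        # Prefix each payload with its running sequence number.
        return [f"{self.seen}:".encode() + datum.value]


def echo(keys: list[str], datum: FakeDatum) -> list[bytes]:
    """Function-based handler: stateless, returns the payload unchanged."""
    return [datum.value]


def invoke(target, keys, datum):
    """Assumed dispatch rule: use `.handler` on an instance, else call the function."""
    fn = target.handler if hasattr(target, "handler") else target
    return fn(keys, datum)


instance = CountingHandler()
first = invoke(instance, ["k"], FakeDatum(b"a"))
second = invoke(instance, ["k"], FakeDatum(b"b"))
plain = invoke(echo, ["k"], FakeDatum(b"c"))
```

The counter lives on the instance, so the second call sees the effect of the first, while the function-based handler has nowhere to keep such state short of a global.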


## Server Types

Different server types are supported for different kinds of UDFs and UDSinks.

Each offers different functionality and suits different use cases.

Currently we support the following server types:
1) SyncServer
2) AsyncServer
3) MultiProcessServer

Not all of the above are supported for all UDFs and UDSinks.



### SyncServer
```
grpc_server = MapServer(handler)
```

### AsyncServer
```
grpc_server = MapAsyncServer(handler)
```

### MultiProcessServer
```
grpc_server = MapMultiProcServer(handler)
```
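
One visible difference between the sync and async variants in this README is the handler's shape: the Map examples pass a plain function, while the Reduce example passes an `async def` to `ReduceAsyncServer`. The sketch below shows the two shapes side by side using only the standard library. `run_handler` is a hypothetical helper for illustration, and it is an assumption that `MapAsyncServer` expects a coroutine function in the same way; `MapMultiProcServer` is not covered here.

```python
import asyncio
import inspect


def sync_handler(keys, value):
    """Plain function: the handler shape passed to MapServer in this README."""
    return [value.upper()]


async def async_handler(keys, value):
    """Coroutine function: the handler shape passed to ReduceAsyncServer in this README."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return [value.upper()]


def run_handler(handler, keys, value):
    """Hypothetical helper: run either handler shape to completion."""
    if inspect.iscoroutinefunction(handler):
        # A coroutine function must be driven by an event loop.
        return asyncio.run(handler(keys, value))
    return handler(keys, value)


sync_result = run_handler(sync_handler, ["k"], b"abc")
async_result = run_handler(async_handler, ["k"], b"abc")
```

Both calls produce the same result; the async shape only pays off when the handler awaits something that would otherwise block.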


16 changes: 7 additions & 9 deletions examples/developer_guide/example.py
@@ -1,29 +1,27 @@
import aiorun
from collections.abc import Iterator
from collections.abc import AsyncIterable


from pynumaflow.reducer import (
Messages,
Message,
Datum,
Metadata,
AsyncReducer,
ReduceAsyncServer,
)


async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) -> Messages:
# count the number of events
async def reduce_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
interval_window = md.interval_window
counter = 0
async for _ in datums:
counter += 1

msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(keys=keys, value=str.encode(msg)))
return Messages(Message(str.encode(msg), keys=keys))


if __name__ == "__main__":
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
grpc_server = ReduceAsyncServer(reduce_handler)
grpc_server.start()
4 changes: 2 additions & 2 deletions examples/map/even_odd/example.py
@@ -1,4 +1,4 @@
from pynumaflow.mapper import Messages, Message, Datum, Mapper
from pynumaflow.mapper import Messages, Message, Datum, MapServer


def my_handler(keys: list[str], datum: Datum) -> Messages:
@@ -22,5 +22,5 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
grpc_server = MapServer(my_handler)
grpc_server.start()
2 changes: 1 addition & 1 deletion examples/map/even_odd/pyproject.toml
@@ -6,7 +6,7 @@ authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.6.0"
pynumaflow = "~0.7.0"

[tool.poetry.dev-dependencies]

27 changes: 14 additions & 13 deletions examples/map/flatmap/example.py
@@ -1,20 +1,21 @@
from pynumaflow.mapper import Messages, Message, Datum, Mapper
from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper


def my_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
class Flatmap(Mapper):
def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
strs = val.decode("utf-8").split(",")
messages = Messages()
if len(strs) == 0:
messages.append(Message.to_drop())
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages
for s in strs:
messages.append(Message(str.encode(s)))
return messages


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
grpc_server = MapServer(Flatmap())
grpc_server.start()
2 changes: 1 addition & 1 deletion examples/map/flatmap/pipeline.yaml
@@ -15,7 +15,7 @@ spec:
- name: flatmap
udf:
container:
image: "quay.io/numaio/numaflow-python/map-flatmap:v0.5.0"
image: "quay.io/numaio/numaflow-python/map-flatmap:v0.7.0"
env:
- name: PYTHONDEBUG
value: "true"
2 changes: 1 addition & 1 deletion examples/map/flatmap/pyproject.toml
@@ -6,7 +6,7 @@ authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.6.0"
pynumaflow = "~0.7.0"


[tool.poetry.dev-dependencies]
2 changes: 1 addition & 1 deletion examples/map/forward_message/Makefile
@@ -1,6 +1,6 @@
.PHONY: image
image:
docker build -t "quay.io/numaio/numaflow-python/map-forward-message:v0.5.0" .
docker build -t "quay.io/numaio/numaflow-python/map-forward-message:v0.7.0" .
# Github CI runner uses platform linux/amd64. If your local environment doesn't, the image built by the command above might not work
# under the CI E2E test environment.
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
24 changes: 22 additions & 2 deletions examples/map/forward_message/example.py
@@ -1,4 +1,16 @@
from pynumaflow.mapper import Messages, Message, Datum, Mapper
import os

from pynumaflow.mapper import Messages, Message, Datum, MapServer, Mapper


class Example(Mapper):
def handler(self, keys: list[str], datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages()
messages.append(Message(value=val, keys=keys))
return messages


def my_handler(keys: list[str], datum: Datum) -> Messages:
@@ -11,5 +23,13 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:


if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
invoke = os.getenv("INVOKE", "handler")
# Use the class based approach or function based handler
# based on the env variable
# Both can be used and passed directly to the server class
if invoke == "class":
handler = Example()
else:
handler = my_handler
grpc_server = MapServer(handler)
grpc_server.start()
2 changes: 1 addition & 1 deletion examples/map/forward_message/pyproject.toml
@@ -6,7 +6,7 @@ authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = "~0.6.0"
pynumaflow = "~0.7.0"

[tool.poetry.dev-dependencies]
