Skip to content

Commit

Permalink
feat: streaming transformer (numaproj#196)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Oct 23, 2024
1 parent 6ad59bd commit a1d3eb5
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 229 deletions.
32 changes: 25 additions & 7 deletions pynumaflow/proto/sourcetransformer/transform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@ service SourceTransform {
// SourceTransformFn applies a function to each request element.
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
* SourceTransformerRequest represents a request element.
*/
message SourceTransformRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a transform request
string id = 6;
}
Request request = 1;
optional Handshake handshake = 2;
}

/**
Expand All @@ -37,11 +51,15 @@ message SourceTransformResponse {
repeated string tags = 4;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
}
34 changes: 19 additions & 15 deletions pynumaflow/proto/sourcetransformer/transform_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 52 additions & 24 deletions pynumaflow/proto/sourcetransformer/transform_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,58 @@ from typing import (

DESCRIPTOR: _descriptor.FileDescriptor

class Handshake(_message.Message):
__slots__ = ("sot",)
SOT_FIELD_NUMBER: _ClassVar[int]
sot: bool
def __init__(self, sot: bool = ...) -> None: ...

class SourceTransformRequest(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "headers")
__slots__ = ("request", "handshake")

class Request(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "headers", "id")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
headers: _containers.ScalarMap[str, str]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
headers: _containers.ScalarMap[str, str]
id: str
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
id: _Optional[str] = ...,
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: SourceTransformRequest.Request
handshake: Handshake
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
request: _Optional[_Union[SourceTransformRequest.Request, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class SourceTransformResponse(_message.Message):
__slots__ = ("results",)
__slots__ = ("results", "id", "handshake")

class Result(_message.Message):
__slots__ = ("keys", "value", "event_time", "tags")
Expand All @@ -63,9 +84,16 @@ class SourceTransformResponse(_message.Message):
tags: _Optional[_Iterable[str]] = ...,
) -> None: ...
RESULTS_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
results: _containers.RepeatedCompositeFieldContainer[SourceTransformResponse.Result]
id: str
handshake: Handshake
def __init__(
self, results: _Optional[_Iterable[_Union[SourceTransformResponse.Result, _Mapping]]] = ...
self,
results: _Optional[_Iterable[_Union[SourceTransformResponse.Result, _Mapping]]] = ...,
id: _Optional[str] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class ReadyResponse(_message.Message):
Expand Down
12 changes: 6 additions & 6 deletions pynumaflow/proto/sourcetransformer/transform_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, channel):
Args:
channel: A grpc.Channel.
"""
self.SourceTransformFn = channel.unary_unary(
self.SourceTransformFn = channel.stream_stream(
"/sourcetransformer.v1.SourceTransform/SourceTransformFn",
request_serializer=transform__pb2.SourceTransformRequest.SerializeToString,
response_deserializer=transform__pb2.SourceTransformResponse.FromString,
Expand All @@ -30,7 +30,7 @@ def __init__(self, channel):
class SourceTransformServicer(object):
"""Missing associated documentation comment in .proto file."""

def SourceTransformFn(self, request, context):
def SourceTransformFn(self, request_iterator, context):
"""SourceTransformFn applies a function to each request element.
In addition to map function, SourceTransformFn also supports assigning a new event time to response.
SourceTransformFn can be used only at source vertex by source data transformer.
Expand All @@ -48,7 +48,7 @@ def IsReady(self, request, context):

def add_SourceTransformServicer_to_server(servicer, server):
rpc_method_handlers = {
"SourceTransformFn": grpc.unary_unary_rpc_method_handler(
"SourceTransformFn": grpc.stream_stream_rpc_method_handler(
servicer.SourceTransformFn,
request_deserializer=transform__pb2.SourceTransformRequest.FromString,
response_serializer=transform__pb2.SourceTransformResponse.SerializeToString,
Expand All @@ -71,7 +71,7 @@ class SourceTransform(object):

@staticmethod
def SourceTransformFn(
request,
request_iterator,
target,
options=(),
channel_credentials=None,
Expand All @@ -82,8 +82,8 @@ def SourceTransformFn(
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
return grpc.experimental.stream_stream(
request_iterator,
target,
"/sourcetransformer.v1.SourceTransform/SourceTransformFn",
transform__pb2.SourceTransformRequest.SerializeToString,
Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/sourcetransformer/multiproc_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pynumaflow.info.types import ServerInfo, MINIMUM_NUMAFLOW_VERSION, ContainerType
from pynumaflow.sourcetransformer.servicer.server import SourceTransformServicer
from pynumaflow.sourcetransformer.servicer._servicer import SourceTransformServicer

from pynumaflow.shared.server import start_multiproc_server

Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/sourcetransformer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pynumaflow.shared import NumaflowServer
from pynumaflow.shared.server import sync_server_start
from pynumaflow.sourcetransformer._dtypes import SourceTransformCallable
from pynumaflow.sourcetransformer.servicer.server import SourceTransformServicer
from pynumaflow.sourcetransformer.servicer._servicer import SourceTransformServicer


class SourceTransformServer(NumaflowServer):
Expand Down
Loading

0 comments on commit a1d3eb5

Please sign in to comment.