This SDK provides the interface for writing UDFs and UDSinks in Python.
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer
def map_handler(key: str, datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages()
messages.append(Message.to_vtx(key, val))
return messages
if __name__ == "__main__":
grpc_server = UserDefinedFunctionServicer(map_handler)
grpc_server.start()
from typing import List
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer
def udsink_handler(datums: List[Datum]) -> Responses:
responses = Responses()
for msg in datums:
print("User Defined Sink", msg)
responses.append(Response.as_success(msg.id))
return responses
if __name__ == "__main__":
grpc_server = UserDefinedSinkServicer(udsink_handler)
grpc_server.start()
A sample UDSink Dockerfile is provided under examples.