Skip to content

Commit

Permalink
feat: add default kafka timestamp behavior to python API
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Nov 25, 2024
1 parent a4b3408 commit f1eebdf
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
6 changes: 5 additions & 1 deletion py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ requires-python = ">=3.12"
classifiers = []
dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml
description = "Embeddable stream processing engine"
dependencies = [
    "pyarrow>=17.0.0",
    "datafusion>=40.1.0",
    # NOTE(review): pip as a *runtime* dependency is unusual — it is typically
    # only needed at install time. Presumably added by the package manager as a
    # side effect; confirm it is actually imported at runtime before keeping it.
    "pip>=24.3.1",
]

[project.optional-dependencies]
tests = ["pytest"]
Expand Down
36 changes: 30 additions & 6 deletions py-denormalized/python/denormalized/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,58 @@


class Context:
    """A context manager for handling data stream operations.

    This class provides functionality to create and manage data streams
    from various sources like Kafka topics.
    """

    def __init__(self) -> None:
        """Initializes a new Context instance wrapping the Rust PyContext."""
        self.ctx = PyContext()

    def __repr__(self):
        """Returns the string representation of the underlying PyContext.

        Returns:
            str: String representation of the underlying PyContext.
        """
        return self.ctx.__repr__()

    def __str__(self):
        """Returns the string representation of the underlying PyContext.

        Returns:
            str: String representation of the underlying PyContext.
        """
        return self.ctx.__str__()

    def from_topic(
        self,
        topic: str,
        sample_json: str,
        bootstrap_servers: str,
        timestamp_column: str | None = None,
        group_id: str = "default_group",
    ) -> DataStream:
        """Creates a new DataStream from a Kafka topic.

        Args:
            topic: The name of the Kafka topic to consume from.
            sample_json: A sample JSON string representing the expected
                message format; the schema is inferred from it.
            bootstrap_servers: Comma-separated list of Kafka broker addresses.
            timestamp_column: Optional column name containing message
                timestamps. If not specified, the Kafka timestamp at which
                each message was received is used instead.
            group_id: Kafka consumer group ID, defaults to "default_group".

        Returns:
            DataStream: A new DataStream instance connected to the specified
                topic.
        """
        # Argument order must match the Rust pyo3 signature:
        # (topic, sample_json, bootstrap_servers, group_id, timestamp_column).
        py_ds = self.ctx.from_topic(
            topic,
            sample_json,
            bootstrap_servers,
            group_id,
            timestamp_column,
        )
        return DataStream(py_ds)
14 changes: 12 additions & 2 deletions py-denormalized/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,33 @@ impl PyContext {
Ok("PyContext".to_string())
}

#[pyo3(signature = (
topic,
sample_json,
bootstrap_servers,
group_id,
timestamp_column = None
))]
pub fn from_topic(
&self,
py: Python,
topic: String,
sample_json: String,
bootstrap_servers: String,
timestamp_column: String,
group_id: String,
timestamp_column: Option<String>,
) -> PyResult<PyDataStream> {
let context = self.context.clone();
let rt = &get_tokio_runtime(py).0;
let fut: JoinHandle<denormalized::common::error::Result<DataStream>> =
rt.spawn(async move {
let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

if let Some(ts_col) = timestamp_column {
topic_builder.with_timestamp(ts_col, TimestampUnit::Int64Millis);
}

let source_topic = topic_builder
.with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
.with_encoding("json")?
.with_topic(topic)
.infer_schema_from_json(sample_json.as_str())?
Expand Down

0 comments on commit f1eebdf

Please sign in to comment.