Commit
merge main
emgeee committed Nov 25, 2024
2 parents f1eebdf + f652689 commit c6bac4c
Showing 15 changed files with 2,446 additions and 370 deletions.
65 changes: 65 additions & 0 deletions .github/workflows/python-docs.yml
@@ -0,0 +1,65 @@
name: python-docs

on:
  push:
    branches: ["main"]
    paths:
      - "py-denormalized/**"
  pull_request:
    branches: ["main"]
    paths:
      - "py-denormalized/**"

# security: restrict permissions for CI jobs.
permissions:
  contents: read

jobs:
  # Build the documentation and upload the static HTML files as an artifact.
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v3
        with:
          version: "0.5.1"
          enable-cache: true
          cache-dependency-glob: "py-denormalized/uv.lock"

      - name: "Set up Python"
        uses: actions/setup-python@v5
        with:
          python-version-file: "py-denormalized/pyproject.toml"

      - name: Install the project
        working-directory: ./py-denormalized
        run: uv sync --no-dev --group docs --extra feast

      - name: Build the docs
        working-directory: ./py-denormalized
        run: |
          source .venv/bin/activate
          pdoc -t pdocs/ python/denormalized/ -o pdocs/_build

      - uses: actions/upload-pages-artifact@v3
        with:
          path: py-denormalized/pdocs/_build

  # Deploy the artifact to GitHub pages.
  # This is a separate job so that only actions/deploy-pages has the necessary permissions.
  deploy:
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'

    needs: build
    runs-on: ubuntu-latest
    permissions:
      pages: write
      id-token: write
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    steps:
      - id: deployment
        uses: actions/deploy-pages@v4
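The build job drives pdoc through its CLI. For local debugging, the same build can be reproduced from Python; a minimal sketch, assuming it runs from `py-denormalized/` with the docs dependency group installed (the `pdocs/` template directory is the Jinja2 file shown later in this diff):

```python
# Sketch: reproduce the workflow's
#   pdoc -t pdocs/ python/denormalized/ -o pdocs/_build
# step via pdoc's Python API. Assumes the current directory is
# py-denormalized/ and that `uv sync --group docs` has been run.
from pathlib import Path

import pdoc

# Use the custom template directory (pdocs/module.html.jinja2 below).
pdoc.render.configure(template_directory=Path("pdocs"))

# Render static HTML for the package into pdocs/_build.
pdoc.pdoc("python/denormalized", output_directory=Path("pdocs/_build"))
```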
1 change: 1 addition & 0 deletions py-denormalized/.gitignore
@@ -61,6 +61,7 @@ coverage.xml

# Sphinx documentation
docs/_build/
pdocs/_build/

# PyCharm
.idea/
12 changes: 6 additions & 6 deletions py-denormalized/README.md
@@ -1,5 +1,4 @@
denormalized-python
===
## Denormalized Python

Python bindings for [denormalized](https://github.com/probably-nothing-labs/denormalized)

@@ -9,19 +8,20 @@ Denormalized is a single node stream processing engine written in Rust. This dir

1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example
3. Copy the [stream_aggregate.py](./python/examples/stream_aggregate.py) example

This script will connect to the Kafka instance running in Docker and aggregate the metrics in real time.

There are several other examples in the [examples/ folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs.
There are several other examples in the [examples folder](./python/examples/) that demonstrate other capabilities including stream joins and UDAFs.

[API Docs](https://probably-nothing-labs.github.io/denormalized/denormalized.html)

## Development

Make sure you're in the `py-denormalized/` directory.

We currently use [rye](https://rye.astral.sh/) to manage python dependencies.
`rye sync` to create/update the virtual environment
We use [uv](https://docs.astral.sh/uv/) to manage python dependencies.
`uv sync` to create/update the virtual environment

We use [maturin](https://www.maturin.rs/) for developing and building:
- `maturin develop` - build and install the python bindings into the current venv
7 changes: 7 additions & 0 deletions py-denormalized/pdocs/module.html.jinja2
@@ -0,0 +1,7 @@
{% extends "default/module.html.jinja2" %}

{% block nav_title %}
<a href="https://github.com/probably-nothing-labs/denormalized">
<img src="https://raw.githubusercontent.com/probably-nothing-labs/denormalized/refs/heads/main/docs/images/denormalized_logo.png" alt="Denormalized Logo" class="logo">
</a>
{% endblock %}
24 changes: 14 additions & 10 deletions py-denormalized/pyproject.toml
@@ -15,25 +15,28 @@ dependencies = [
]

[project.optional-dependencies]
tests = ["pytest"]
feast = ["feast"]
dev = []

[tool.maturin]
python-source = "python"
features = ["pyo3/extension-module"]
module-name = "denormalized._d_internal"

[tool.rye]
dev-dependencies = [
"pip>=24.2",
[dependency-groups]
dev = [
"pdoc>=15.0.0",
"ipython>=8.26.0",
"pytest>=8.3.2",
"maturin>=1.7.4",
"pyarrow-stubs>=17.11",
"pandas>=2.2.3",
"jupyterlab>=4.3.0",
"pdoc>=15.0.0",
"pip>=24.3.1",
]
docs = [
"pdoc>=15.0.0",
]

[tool.maturin]
python-source = "python"
features = ["pyo3/extension-module"]
module-name = "denormalized._d_internal"

# Enable docstring linting using the google style guide
[tool.ruff.lint]
@@ -50,3 +53,4 @@ max-doc-length = 88
include = ["python"]
exclude = ["src"]
typeCheckingMode = "standard"
reportMissingImports = false
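The move from `[tool.rye]` dev-dependencies to the standard `[dependency-groups]` table is what lets the workflow above run `uv sync --no-dev --group docs`. A minimal sketch of sanity-checking the groups, assuming Python 3.11+ for the stdlib `tomllib` parser and `pyproject.toml` in the current directory:

```python
# Sketch: inspect the PEP 735 [dependency-groups] table declared above.
# Assumes Python 3.11+ (tomllib is stdlib) and that this runs from
# py-denormalized/ next to pyproject.toml.
import tomllib

with open("pyproject.toml", "rb") as f:
    config = tomllib.load(f)

# `uv sync --group docs` resolves this list alongside the project deps.
print(config["dependency-groups"]["docs"])  # e.g. ['pdoc>=15.0.0']
```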
54 changes: 54 additions & 0 deletions py-denormalized/python/denormalized/__init__.py
@@ -1,11 +1,65 @@
"""
[Denormalized](https://www.denormalized.io/) is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀
1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
```python
import json
import pprint as pp

from denormalized import Context, col
from denormalized import Functions as f

sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

def print_batch(rb):
    pp.pprint(rb.to_pydict())

ds = Context().from_topic(
    "temperature",
    json.dumps(sample_event),
    "localhost:9092",
    "occurred_at_ms",
)

ds.window(
    [col("sensor_name")],
    [
        f.count(col("reading"), distinct=False, filter=None).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
    ],
    1000,
    None,
).sink(print_batch)
```
Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions.
"""

from .context import Context
from .data_stream import DataStream
from .datafusion import col, column
from .datafusion import functions as Functions
from .datafusion import lit, literal, udaf, udf
from .datafusion.expr import Expr

__all__ = [
    "Context",
    "DataStream",
    "col",
    "column",
    "Expr",
    "Functions",
    "lit",
    "literal",
    "udaf",
    "udf",
]

__docformat__ = "google"

try:
    from .feast_data_stream import FeastDataStream

10 changes: 4 additions & 6 deletions py-denormalized/python/denormalized/context.py
@@ -1,11 +1,9 @@
from denormalized._d_internal import PyContext

from .data_stream import DataStream


class Context:
"""A context manager for handling data stream operations.
This class provides functionality to create and manage data streams
from various sources like Kafka topics.
"""
@@ -16,15 +14,15 @@ def __init__(self) -> None:

    def __repr__(self):
        """Returns the string representation of the PyContext object.
        Returns:
            str: String representation of the underlying PyContext.
        """
        return self.ctx.__repr__()

    def __str__(self):
        """Returns the string representation of the PyContext object.
        Returns:
            str: String representation of the underlying PyContext.
        """
@@ -44,7 +42,7 @@ def from_topic(
            topic: The name of the Kafka topic to consume from.
            sample_json: A sample JSON string representing the expected message format.
            bootstrap_servers: Comma-separated list of Kafka broker addresses.
            timestamp_column: Optional column name containing message timestamps. If this is not specified, it will default to the Kafka timestamp at which the message was received.
            timestamp_column: Optional column name containing message timestamps.
            group_id: Kafka consumer group ID, defaults to "default_group".
        Returns:
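To tie the arguments together, a hedged usage sketch of `from_topic`; the topic name, sample payload, and broker address are illustrative values borrowed from the docker example in the README above, not defaults of the API:

```python
import json

from denormalized import Context

# Illustrative values matching the kafka_emit_measurements docker image
# referenced earlier in this diff.
sample_event = {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}

ds = Context().from_topic(
    "temperature",             # topic
    json.dumps(sample_event),  # sample_json: schema inferred from this payload
    "localhost:9092",          # bootstrap_servers
    "occurred_at_ms",          # timestamp_column (optional)
)
```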
