Commit

updated actions
skull8888888 committed Dec 1, 2024
1 parent d85172a commit 479cc03
Showing 4 changed files with 78 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/publish.yml
@@ -10,6 +10,11 @@ permissions:
jobs:
  publish:
    runs-on: ubuntu-latest
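    # The pypi environment and the id-token permission below set up OIDC-based trusted publishing to PyPI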
    environment:
      name: pypi
      url: https://pypi.org/p/lmnr/
    permissions:
      id-token: write
    steps:
      - uses: actions/checkout@v4
      - name: Install uv
65 changes: 55 additions & 10 deletions README.md
@@ -15,13 +15,23 @@ Results of all tasks are stored in a thread-safe `Context`.
This task-based architecture makes complex workflows surprisingly simple:

**What's Possible:**
- [x] Parallel task execution without explicit threading code
- [x] Self-modifying dynamic workflows and cycles
- [x] Conditional branching and control flow
- [x] Streaming of tasks execution
- [x] Automatic state management and persistence
- [x] Parallel task execution without explicit threading code
- [x] Self-modifying dynamic workflows and cycles
- [x] Conditional branching and control flow
- [x] Streaming of task execution
- [x] Automatic state management and persistence

Flow is extremely lightweight, clearly written and has not external dependencies for the engine.
Flow is extremely lightweight, clearly written, and has no external dependencies for the engine. It is designed and maintained by the [Laminar](https://github.com/lmnr-ai) team.

## Auto-instrumentation
Flow comes with auto-instrumentation for tracing using [Laminar](https://github.com/lmnr-ai/lmnr). To enable tracing, initialize the Laminar SDK before using Flow.

```python
from lmnr import Laminar
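
# Call this once at startup, before constructing or running any Flow,
# so each task execution is captured in the trace automatically.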
Laminar.initialize(project_api_key="...")
```

> Tracing is extremely useful for debugging and state reconstruction. When tracing is enabled, Flow will automatically capture the state at each step. During debugging, you can load the captured state and inspect the context. To learn more about tracing, see the [Laminar docs](https://docs.lmnr.ai).
## Getting started

@@ -30,6 +40,7 @@ Flow is extremely lightweight, clearly written and has not external dependencies
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput

# thread pool executor is optional, defaults to 4 workers
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

# Simple task that returns a result
@@ -48,7 +59,7 @@ def task1(context: Context) -> TaskOutput:

def task2(context: Context) -> TaskOutput:
    # Access results from previous tasks
    t1_result = context.get("task1")  # Gets "result1"
    t1_result = context.get("task1")  # waits for task1 to complete
    return TaskOutput(output="result2", next=None)

flow.add_task("task1", task1)
@@ -126,6 +137,40 @@ result = flow.run("greet", inputs={"user_name": "Alice"})
# Returns {"greet": "Hello Alice!"}
```
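
Part of the example above is collapsed in this diff, so here is a self-contained sketch of the same two-task chain. It assumes `Context` can be imported from `lmnr_flow` (the tests import it from `lmnr_flow.flow`) and that `task1` hands off to `task2` via `next`; treat it as an illustration rather than the exact hidden code.

```python
from concurrent.futures import ThreadPoolExecutor

from lmnr_flow import Context, Flow, TaskOutput  # Context import path is an assumption

flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

def task1(context: Context) -> TaskOutput:
    # Assumed body: produce a result and schedule task2 next
    return TaskOutput(output="result1", next=["task2"])

def task2(context: Context) -> TaskOutput:
    t1_result = context.get("task1")  # waits for task1 and returns "result1"
    return TaskOutput(output=f"got {t1_result}", next=None)

flow.add_task("task1", task1)
flow.add_task("task2", task2)

result = flow.run("task1")
# Expected to return the terminal task's output, e.g. {"task2": "got result1"}
```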

### Dynamic Routing
```python
def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    routes = {
        "process": ["process_task"],
        "analyze": ["analyze_task"],
        "report": ["report_task"]
    }
    return TaskOutput(output=f"routing to {task_type}", next=routes.get(task_type, []))

def process_task(context: Context) -> TaskOutput:
    return TaskOutput(output="processed data", next=None)

flow.add_task("router", router)
flow.add_task("process_task", process_task)
result = flow.run("router", inputs={"type": "process"})
# Returns {"process_task": "processed data"}
```
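
The feature list above promises parallel task execution without explicit threading code. Below is a hedged sketch of how that could look, assuming that listing several task names in `next` schedules them concurrently on the flow's thread pool; `fan_out`, `slow_a`, and `slow_b` are illustrative names, and `flow` is the instance from the earlier examples.

```python
import time

def fan_out(context: Context) -> TaskOutput:
    # Assumption: multiple entries in `next` run concurrently on the thread pool
    return TaskOutput(output="started", next=["slow_a", "slow_b"])

def slow_a(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="a done", next=None)

def slow_b(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="b done", next=None)

flow.add_task("fan_out", fan_out)
flow.add_task("slow_a", slow_a)
flow.add_task("slow_b", slow_b)

start = time.time()
result = flow.run("fan_out")
# If the two slow tasks ran in parallel, this takes roughly 1s rather than 2s
print(result, round(time.time() - start, 1))
```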

## State Management

```python
context = Context()
context.from_dict({"task1": "result1"})

flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2", None))
flow.run("task2")

assert flow.context.get("task1") == "result1" # True, because it was set in the context
assert flow.context.get("task2") == "result2"
```

## Advanced Features

- **Context Sharing**: All tasks share the same context, allowing for complex data flows
@@ -134,6 +179,6 @@ result = flow.run("greet", inputs={"user_name": "Alice"})
- **Minimal Dependencies**: Core engine has zero external dependencies

## Roadmap
- [ ] Add async
- [ ] Serverless deployment
- [ ] Timetravel UI
- [ ] Add async support
- [ ] Serverless deployment

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "lmnr-flow"
version = "0.1.0b1"
version = "0.1.0"
description = "Lightweight task engine for building AI agents"
readme = "README.md"
requires-python = ">=3.10"
18 changes: 17 additions & 1 deletion tests/test_flow.py
Expand Up @@ -7,7 +7,7 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from src.lmnr_flow.flow import Flow, TaskOutput
from src.lmnr_flow.flow import Context, Flow, TaskOutput


@pytest.fixture(autouse=True)
@@ -59,6 +59,14 @@ def thread_pool():
def flow(thread_pool):
    return Flow(thread_pool)

@pytest.fixture
def flow_with_state(thread_pool):
    context = Context()
    context.from_dict({
        "task1": "result1"
    })

    return Flow(thread_pool, context)

def test_simple_task_execution(flow):
    # Test single task that returns no next tasks
@@ -291,3 +299,11 @@ def task3(ctx):

    result = flow.run("task1", inputs={"count": 0})
    assert result == {"task3": "final"}

def test_state_loading(flow_with_state):
    # Test that state is loaded correctly
    flow_with_state.add_task("task2", lambda ctx: TaskOutput("result2", None))
    flow_with_state.run("task2")

    assert flow_with_state.context.get("task1") == "result1"
    assert flow_with_state.context.get("task2") == "result2"
