Merge pull request #1 from lmnr-ai:flow
Flow
skull8888888 authored Dec 1, 2024
2 parents 094c4c3 + 29a9463 commit d85172a
Showing 12 changed files with 2,173 additions and 0 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,26 @@
name: Publish Python Package

on:
  push:
    branches: ["main"]

permissions:
  contents: read

jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install uv
        uses: astral-sh/setup-uv@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      - name: Install the project
        run: uv sync --all-extras --dev
      - name: Build package
        run: uv build
      - name: Publish package
        uses: pypa/gh-action-pypi-publish@release/v1
164 changes: 164 additions & 0 deletions .gitignore
@@ -0,0 +1,164 @@
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
9 changes: 9 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,9 @@
{
  "[python]": {
    "editor.codeActionsOnSave": {
      "source.fixAll": "explicit",
      "source.organizeImports": "explicit"
    },
    "editor.defaultFormatter": "charliermarsh.ruff"
  }
}
139 changes: 139 additions & 0 deletions README.md
@@ -0,0 +1,139 @@
# Laminar Flow

A lightweight task engine for building AI agents that prioritizes simplicity and flexibility.

## Core Concept

Unlike traditional node-based workflows, Laminar Flow uses a dynamic task queue system built on three simple principles:

1. **Concurrent Execution** - Tasks run in parallel automatically
2. **Dynamic Scheduling** - Tasks can schedule new tasks at runtime
3. **Smart Dependencies** - Tasks can await results from previous operations

Results of all tasks are stored in a thread-safe `Context`.
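To illustrate what "thread-safe" means here, the sketch below shows a minimal lock-guarded key-value store with `get`/`set` methods like the ones the README examples use. This is a hypothetical stdlib-only illustration (`MiniContext` is an invented name), not Flow's actual `Context` implementation:

```python
import threading

class MiniContext:
    """Hypothetical sketch of a thread-safe key-value store,
    illustrating the role Flow's Context plays. Not the real implementation."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def set(self, key, value):
        # Writes are serialized by the lock, so concurrently
        # running tasks cannot race on the shared dict
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)
```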

This task-based architecture makes complex workflows surprisingly simple:

**What's Possible:**
- [x] Parallel task execution without explicit threading code
- [x] Self-modifying dynamic workflows and cycles
- [x] Conditional branching and control flow
- [x] Streaming of task execution
- [x] Automatic state management and persistence

Flow is extremely lightweight, clearly written, and has no external dependencies for the engine.

## Getting started

### Basic Usage
```python
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Context, Flow, TaskOutput

flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))

# Simple task that returns a result
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!", next=None)

flow.add_task("greet", my_task)
result = flow.run("greet")  # Returns {"greet": "Hello World!"}
```

### Task Chaining
```python
# Tasks can trigger other tasks
def task1(context: Context) -> TaskOutput:
return TaskOutput(output="result1", next=["task2"])

def task2(context: Context) -> TaskOutput:
# Access results from previous tasks
t1_result = context.get("task1") # Gets "result1"
return TaskOutput(output="result2", next=None)

flow.add_task("task1", task1)
flow.add_task("task2", task2)
flow.run("task1") # Returns {"task2": "result2"}
```

### Parallel Execution
```python
import time

def starter(context: Context) -> TaskOutput:
    # Launch multiple tasks in parallel
    return TaskOutput(output="started", next=["slow_task1", "slow_task2"])

def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1", next=None)

def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2", next=None)

# Both slow tasks execute in parallel, taking ~1 second total
flow.add_task("starter", starter)
flow.add_task("slow_task1", slow_task1)
flow.add_task("slow_task2", slow_task2)
flow.run("starter")
```

### Streaming Results
```python
def streaming_task(context: Context) -> TaskOutput:
    # Stream intermediate results
    stream = context.get_stream()
    for i in range(3):
        stream.put(("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final", next=None)

flow.add_task("streaming_task", streaming_task)

# Get results as they arrive
for task_id, output in flow.stream("streaming_task"):
    print(f"{task_id}: {output}")
# Prints:
# streaming_task: interim_0
# streaming_task: interim_1
# streaming_task: interim_2
# streaming_task: final
```

### Dynamic Workflows
```python
def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)

    if count >= 3:
        return TaskOutput(output="done", next=["finish"])

    context.set("count", count + 1)
    return TaskOutput(output=f"iteration_{count}", next=["conditional_task"])

# Task will loop 3 times before finishing
flow.add_task("conditional_task", conditional_task)
flow.add_task("finish", lambda ctx: TaskOutput("completed", None))
flow.run("conditional_task")
```

### Input Parameters
```python
def parameterized_task(context: Context) -> TaskOutput:
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!", next=None)

flow.add_task("greet", parameterized_task)
result = flow.run("greet", inputs={"user_name": "Alice"})
# Returns {"greet": "Hello Alice!"}
```

## Advanced Features

- **Context Sharing**: All tasks share the same context, allowing for complex data flows
- **Error Handling**: Exceptions in tasks are properly propagated
- **Thread Safety**: All operations are thread-safe
- **Minimal Dependencies**: Core engine has zero external dependencies
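The error-handling behavior listed above mirrors how the standard library's `concurrent.futures` surfaces worker exceptions: `Future.result()` re-raises whatever the task raised on the caller's thread. The sketch below is a stdlib-only illustration of that propagation pattern (hypothetical `run_task` helper, not Flow's internals):

```python
from concurrent.futures import ThreadPoolExecutor

def failing_task():
    raise ValueError("task failed")

def run_task(executor, fn):
    # Future.result() re-raises any exception the task raised,
    # which is how a pool-based engine can propagate task errors
    # back to the thread that started the run
    future = executor.submit(fn)
    return future.result()

with ThreadPoolExecutor(max_workers=2) as pool:
    try:
        run_task(pool, failing_task)
    except ValueError as e:
        print(f"caught: {e}")
```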

## Roadmap
- [ ] Add async
- [ ] Serverless deployment
- [ ] Timetravel UI
22 changes: 22 additions & 0 deletions pyproject.toml
@@ -0,0 +1,22 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "lmnr-flow"
version = "0.1.0b1"
description = "Lightweight task engine for building AI agents"
readme = "README.md"
requires-python = ">=3.10"

dependencies = [
    "lmnr>=0.4.43",
    "pytest>=8.3.3",
]

[project.urls]
"Homepage" = "https://github.com/lmnr-ai/flow"
"Bug Tracker" = "https://github.com/lmnr-ai/flow/issues"

[project.license]
file = "LICENSE"
3 changes: 3 additions & 0 deletions src/lmnr_flow/__init__.py
@@ -0,0 +1,3 @@
from .context import Context
from .flow import Flow, TaskOutput
from .state import State