0.4.0 (#40)
* save

* save

* start_workers

* worker

* output queues

* test worker

* save

* Stage

* FromIterable

* Map

* flat_map

* filter

* each

* finish process

* thread

* init task

* finish task queue

* run_coroutine_threadsafe

* supervisor

* stage

* cleanup

* initial map implementation

* each

* run=True mistery solved

* finish async tests

* add todo

* fix typo

* test_from_to_iterable

* refactor

* port process filter

* finish port

* port thread

* update task to new api

* simplify build on Stage

* task: port Worker + Stage

* rename process tests

* initial task migration

* finish process + thread + task

* partial sync port

* finish sync port

* implement sync sorted using sorted

* type iter_dependencies

* update readme

* docs init

* clean up docs

* docs tweaks

* remove old code

* update docs

* 0.4.0
cgarciae authored Jun 21, 2020
1 parent 9e85df4 commit 142a9f2
Showing 170 changed files with 11,136 additions and 5,799 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -108,4 +108,5 @@ XDG_CACHE_HOME

# custom
.vscode
/scratch.py
/scratch.py
/test*.*
15 changes: 11 additions & 4 deletions CHANGELOG.md
@@ -1,19 +1,26 @@
# Changelog

## [0.4.0] - 2020-06-21
* Big internal refactor:
    * Reduces the risk of potential zombie workers
    * New internal Worker and Supervisor classes which make code more readable / maintainable.
    * Code is now split into individual files for each API function to make contribution easier and improve maintainability.
* API Reference docs are now shown per function and a new Overview page was created per module.

#### Breaking Changes
* `maxsize` argument is removed from all `from_iterable` functions as it was not used.
* `worker_constructor` parameter was removed from all `from_iterable` functions in favor of the simpler `use_thread` argument.

## [0.3.0] - 2020-04-05
### Adds
* `ordered` function in all modules; it orders output elements based on their order of creation in the source iterable.
* Additional options and rules for the dependency injection mechanism. See [Advanced Usage](https://cgarciae.github.io/pypeln/advanced/#dependency-injection).
* All `pl.*.Stage` classes now inherit from `pl.BaseStage`.
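
To make the `ordered` entry above concrete, here is a minimal standard-library sketch of the general idea: tag each element with its position in the source iterable, let workers finish in any order, then sort by that position. This is not Pypeln's implementation; `slow_square` and `run_ordered` are hypothetical names used only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    # Hypothetical workload: later elements sleep less, so they finish first.
    time.sleep(0.01 * (5 - x))
    return x * x


def run_ordered(f, iterable, workers):
    # Hypothetical helper, not part of Pypeln.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Remember the source index of every submitted element.
        futures = {pool.submit(f, x): index for index, x in enumerate(iterable)}
        # Results arrive in completion order, tagged with their source index.
        tagged = [(futures[fut], fut.result()) for fut in as_completed(futures)]
    # Restore the order in which elements were created on the source iterable.
    return [value for _, value in sorted(tagged, key=lambda pair: pair[0])]


ordered_result = run_ordered(slow_square, range(5), workers=5)
print(ordered_result)
```

Sorting after the fact requires collecting every result first, so a sketch like this only suits finite inputs.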

## [0.2.7] - 2020-03-14
### Adds
* `timeout` parameter to most functions in all modules; this stops code execution after a given amount of time if the task has not been completed.

## [0.2.6] - 2020-03-04
### Adds
* `sync` module which follows Pypeln's API but executes everything synchronously using python generators, meant for debugging purposes.

## [0.2.5] - 2020-03-03
### Fixes
* Fixed critical bug (#29) related to `**kwarg` arguments created in `on_start` not being passed to `on_done`.
27 changes: 26 additions & 1 deletion README.md
@@ -94,7 +94,32 @@ stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
```
Conceptually similar but everything is running in a single thread and Task workers are created dynamically.
Conceptually similar but everything is running in a single thread and Task workers are created dynamically. If the code is running inside an async task, you can use `await` on the stage instead to avoid blocking:

```python
import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3


async def main():
    data = range(10) # [0, 1, 2, ..., 9]

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage # e.g. [5, 6, 9, 4, 8, 10, 7]

asyncio.run(main())
```
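
For readers who want to see the underlying idea without Pypeln, the following is a rough standard-library sketch of a map-then-filter pipeline with a bounded number of concurrent tasks. `bounded_map` and `pipeline` are hypothetical helpers, not Pypeln functions. Unlike Pypeln's stages, this version finishes each stage before starting the next and keeps the input order, so it only approximates the behaviour shown above.

```python
import asyncio


async def slow_add1(x):
    await asyncio.sleep(0.01)  # stand-in for some slow computation
    return x + 1


async def slow_gt3(x):
    await asyncio.sleep(0.01)  # stand-in for some slow computation
    return x > 3


async def bounded_map(f, items, workers):
    # Hypothetical helper: run f on every item, at most `workers` at a time.
    semaphore = asyncio.Semaphore(workers)

    async def run_one(item):
        async with semaphore:
            return await f(item)

    # gather returns results in the same order as its arguments.
    return await asyncio.gather(*(run_one(item) for item in items))


async def pipeline(data):
    mapped = await bounded_map(slow_add1, data, workers=3)
    keep = await bounded_map(slow_gt3, mapped, workers=2)
    return [x for x, ok in zip(mapped, keep) if ok]


result = asyncio.run(pipeline(range(10)))
print(result)
```

The semaphore plays the role of the `workers` argument: it caps how many coroutines are in flight at once while everything still runs on a single thread.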

## Mixed Pipelines
You can create pipelines using different worker types such that each type is the best for its given task so you can get the maximum performance out of your code:
65 changes: 0 additions & 65 deletions docs/advanced.md
@@ -123,71 +123,6 @@ stage = pl.process.map(f, stage, workers=3, on_start=on_start, on_end=on_end)

Any element in the dictionary returned by `on_start` can be consumed as an argument by `f` and `on_done`.

## Asyncio Integration

While you can consume `task` stages synchronously as you've seen, there are 2 ways to consume them using python `async` syntax:

#### await
You can call `await` on any `task.Stage` to get back the results of its computation:

```python
import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

async def main():
    data = range(10) # [0, 1, 2, ..., 9]

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage # e.g. [5, 6, 9, 4, 8, 10, 7]
```
When calling `await` on a stage you will get back the same result as if you had called `list` on it, with the big difference that you won't block the current thread while waiting for the computation to materialize.

!!! note
    In this example you are responsible for running the `main` task in the event loop yourself.

#### async for
`task` stages are asynchronous generators, so you can iterate through them using `async for` to access each new element as soon as it becomes available:

```python
import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

async def main():
    data = range(10) # [0, 1, 2, ..., 9]

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    async for element in stage:
        print(element) # 5 6 9 4 8 10 7
```
When iterating a stage using `async for` you will get back the same results as if you had used a normal `for` on it, with the big difference that you won't block the current thread while waiting for the next element.

!!! note
    In this example you are responsible for running the `main` task in the event loop yourself.

### Event Loop
When you run a `task` stage synchronously, the tasks run on `pypeln`'s own event loop; however, if you integrate them with other async code via `await` or `async for`, these tasks will run on the current loop defined by `asyncio.get_event_loop()`.
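
As a rough illustration of how synchronous code can drive coroutines on a loop it owns, the sketch below starts an event loop in a background thread and submits work to it with `asyncio.run_coroutine_threadsafe`. Whether Pypeln manages its own loop exactly this way is an assumption; `start_background_loop` and `add1` are hypothetical names.

```python
import asyncio
import threading


def start_background_loop():
    # Hypothetical helper: create a private loop and run it in a daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def add1(x):
    await asyncio.sleep(0.01)  # stand-in for some slow computation
    return x + 1


loop, thread = start_background_loop()

# Submit the coroutine from synchronous code; the returned
# concurrent.futures.Future lets the caller block until the result is ready.
future = asyncio.run_coroutine_threadsafe(add1(1), loop)
value = future.result(timeout=5)

# Shut the private loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
print(value)
```

Code that is already inside a running loop does not need any of this: it can simply `await` the coroutine on the current loop.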

## Pipe Operator

3 changes: 0 additions & 3 deletions docs/api/process.md

This file was deleted.

20 changes: 20 additions & 0 deletions docs/api/process/Overview.md
@@ -0,0 +1,20 @@

# pl.process

::: pypeln.process
    selection:
        members:
            - na


### Members
* [concat](concat.md)
* [each](each.md)
* [filter](filter.md)
* [flat_map](flat_map.md)
* [from_iterable](from_iterable.md)
* [map](map.md)
* [ordered](ordered.md)
* [run](run.md)
* [to_iterable](to_iterable.md)

4 changes: 4 additions & 0 deletions docs/api/process/concat.md
@@ -0,0 +1,4 @@

# pl.process.concat

::: pypeln.process.concat
4 changes: 4 additions & 0 deletions docs/api/process/each.md
@@ -0,0 +1,4 @@

# pl.process.each

::: pypeln.process.each
4 changes: 4 additions & 0 deletions docs/api/process/filter.md
@@ -0,0 +1,4 @@

# pl.process.filter

::: pypeln.process.filter
4 changes: 4 additions & 0 deletions docs/api/process/flat_map.md
@@ -0,0 +1,4 @@

# pl.process.flat_map

::: pypeln.process.flat_map
4 changes: 4 additions & 0 deletions docs/api/process/from_iterable.md
@@ -0,0 +1,4 @@

# pl.process.from_iterable

::: pypeln.process.from_iterable
4 changes: 4 additions & 0 deletions docs/api/process/map.md
@@ -0,0 +1,4 @@

# pl.process.map

::: pypeln.process.map
4 changes: 4 additions & 0 deletions docs/api/process/ordered.md
@@ -0,0 +1,4 @@

# pl.process.ordered

::: pypeln.process.ordered
4 changes: 4 additions & 0 deletions docs/api/process/run.md
@@ -0,0 +1,4 @@

# pl.process.run

::: pypeln.process.run
4 changes: 4 additions & 0 deletions docs/api/process/to_iterable.md
@@ -0,0 +1,4 @@

# pl.process.to_iterable

::: pypeln.process.to_iterable
3 changes: 0 additions & 3 deletions docs/api/sync.md

This file was deleted.

20 changes: 20 additions & 0 deletions docs/api/sync/Overview.md
@@ -0,0 +1,20 @@

# pl.sync

::: pypeln.sync
    selection:
        members:
            - na


### Members
* [concat](concat.md)
* [each](each.md)
* [filter](filter.md)
* [flat_map](flat_map.md)
* [from_iterable](from_iterable.md)
* [map](map.md)
* [ordered](ordered.md)
* [run](run.md)
* [to_iterable](to_iterable.md)

4 changes: 4 additions & 0 deletions docs/api/sync/concat.md
@@ -0,0 +1,4 @@

# pl.sync.concat

::: pypeln.sync.concat
4 changes: 4 additions & 0 deletions docs/api/sync/each.md
@@ -0,0 +1,4 @@

# pl.sync.each

::: pypeln.sync.each
4 changes: 4 additions & 0 deletions docs/api/sync/filter.md
@@ -0,0 +1,4 @@

# pl.sync.filter

::: pypeln.sync.filter
4 changes: 4 additions & 0 deletions docs/api/sync/flat_map.md
@@ -0,0 +1,4 @@

# pl.sync.flat_map

::: pypeln.sync.flat_map
4 changes: 4 additions & 0 deletions docs/api/sync/from_iterable.md
@@ -0,0 +1,4 @@

# pl.sync.from_iterable

::: pypeln.sync.from_iterable
4 changes: 4 additions & 0 deletions docs/api/sync/map.md
@@ -0,0 +1,4 @@

# pl.sync.map

::: pypeln.sync.map
4 changes: 4 additions & 0 deletions docs/api/sync/ordered.md
@@ -0,0 +1,4 @@

# pl.sync.ordered

::: pypeln.sync.ordered
4 changes: 4 additions & 0 deletions docs/api/sync/run.md
@@ -0,0 +1,4 @@

# pl.sync.run

::: pypeln.sync.run
4 changes: 4 additions & 0 deletions docs/api/sync/to_iterable.md
@@ -0,0 +1,4 @@

# pl.sync.to_iterable

::: pypeln.sync.to_iterable
3 changes: 0 additions & 3 deletions docs/api/task.md

This file was deleted.

20 changes: 20 additions & 0 deletions docs/api/task/Overview.md
@@ -0,0 +1,20 @@

# pl.task

::: pypeln.task
    selection:
        members:
            - na


### Members
* [concat](concat.md)
* [each](each.md)
* [filter](filter.md)
* [flat_map](flat_map.md)
* [from_iterable](from_iterable.md)
* [map](map.md)
* [ordered](ordered.md)
* [run](run.md)
* [to_iterable](to_iterable.md)

4 changes: 4 additions & 0 deletions docs/api/task/concat.md
@@ -0,0 +1,4 @@

# pl.task.concat

::: pypeln.task.concat
4 changes: 4 additions & 0 deletions docs/api/task/each.md
@@ -0,0 +1,4 @@

# pl.task.each

::: pypeln.task.each
4 changes: 4 additions & 0 deletions docs/api/task/filter.md
@@ -0,0 +1,4 @@

# pl.task.filter

::: pypeln.task.filter
4 changes: 4 additions & 0 deletions docs/api/task/flat_map.md
@@ -0,0 +1,4 @@

# pl.task.flat_map

::: pypeln.task.flat_map
4 changes: 4 additions & 0 deletions docs/api/task/from_iterable.md
@@ -0,0 +1,4 @@

# pl.task.from_iterable

::: pypeln.task.from_iterable
4 changes: 4 additions & 0 deletions docs/api/task/map.md
@@ -0,0 +1,4 @@

# pl.task.map

::: pypeln.task.map
4 changes: 4 additions & 0 deletions docs/api/task/ordered.md
@@ -0,0 +1,4 @@

# pl.task.ordered

::: pypeln.task.ordered
4 changes: 4 additions & 0 deletions docs/api/task/run.md
@@ -0,0 +1,4 @@

# pl.task.run

::: pypeln.task.run
4 changes: 4 additions & 0 deletions docs/api/task/to_iterable.md
@@ -0,0 +1,4 @@

# pl.task.to_iterable

::: pypeln.task.to_iterable
3 changes: 0 additions & 3 deletions docs/api/thread.md

This file was deleted.

20 changes: 20 additions & 0 deletions docs/api/thread/Overview.md
@@ -0,0 +1,20 @@

# pl.thread

::: pypeln.thread
    selection:
        members:
            - na


### Members
* [concat](concat.md)
* [each](each.md)
* [filter](filter.md)
* [flat_map](flat_map.md)
* [from_iterable](from_iterable.md)
* [map](map.md)
* [ordered](ordered.md)
* [run](run.md)
* [to_iterable](to_iterable.md)

4 changes: 4 additions & 0 deletions docs/api/thread/concat.md
@@ -0,0 +1,4 @@

# pl.thread.concat

::: pypeln.thread.concat
4 changes: 4 additions & 0 deletions docs/api/thread/each.md
@@ -0,0 +1,4 @@

# pl.thread.each

::: pypeln.thread.each