feat: automatic register of pipe, add | chaining, merge => mix, readme examples
flavioschneider committed Jul 20, 2023
1 parent 7caa8d9 commit c319580
Showing 25 changed files with 274 additions and 181 deletions.
253 changes: 162 additions & 91 deletions README.md
@@ -14,6 +14,28 @@ pipe = Pipe(1, 2, 3, 4, 5)
pipe = Pipe([1, 2, 3, 4, 5])
```

_Chaining (dot-based)_
```py
pipe = Pipe(1, 2, 3, 4, 5).map(lambda x: x + 1).map(lambda x: x * 2)
list(pipe) == [4, 6, 8, 10, 12]
```

_Chaining (pipe-based)_
```py
from pipd import Pipe, Map
pipe = Pipe(1, 2, 3, 4, 5) | Map(lambda x: x + 1) | Map(lambda x: x * 2)
list(pipe) == [4, 6, 8, 10, 12]
```
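As an illustration of how `|` chaining can work in plain Python, here is a minimal sketch built on the `__or__` operator. The names `Stream` and `mapper` are hypothetical stand-ins invented for this example; this is not pipd's actual implementation.

```python
class Stream:
    """Hypothetical stand-in for a pipeline that is bound to data."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def __or__(self, stage):
        # `stream | stage` wraps the data in the stage lazily;
        # nothing runs until the result is iterated.
        return Stream(stage(self._iterable))


def mapper(fn):
    """Hypothetical stand-in for `Map`: builds a stage (iterable -> generator)."""

    def stage(items):
        for item in items:
            yield fn(item)

    return stage


result = Stream([1, 2, 3, 4, 5]) | mapper(lambda x: x + 1) | mapper(lambda x: x * 2)
chained = list(result)
print(chained)
```

Because `|` is left-associative, each stage receives the output of everything to its left, which mirrors the order of the dot-based chain.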

_Use a meta pipeline (i.e. functional only)_
```py
pipe = Pipe.map(lambda x: x + 1).map(lambda x: x * 2)
# or
pipe = Map(lambda x: x + 1) | Map(lambda x: x * 2)

list(pipe([1, 2, 3, 4, 5])) == [4, 6, 8, 10, 12]
```
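A "functional only" pipeline can be thought of as a composition of stages that is not yet bound to any data. The sketch below shows that idea with plain functions; `compose` and `map_stage` are hypothetical helpers made up for this example and are not part of pipd.

```python
from functools import reduce


def map_stage(fn):
    """Hypothetical stage factory: returns a function from iterable to generator."""
    return lambda items: (fn(item) for item in items)


def compose(*stages):
    """Combine stages into one reusable callable, applied left to right."""

    def pipeline(items):
        return reduce(lambda acc, stage: stage(acc), stages, items)

    return pipeline


plus_one_then_double = compose(
    map_stage(lambda x: x + 1),
    map_stage(lambda x: x * 2),
)

# The same composed pipeline can be applied to different inputs.
first = list(plus_one_then_double([1, 2, 3, 4, 5]))
second = list(plus_one_then_double(range(3)))
print(first, second)
```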

_Iterate over the pipeline_
```py
for item in pipe:
@@ -27,63 +49,33 @@ next(it)
next(it)
```

_Use a meta pipeline (i.e. functional only)_
```py
pipe = Pipe.map(lambda x: x + 1)

pipe([1, 2, 3, 4, 5]).list() == [2, 3, 4, 5, 6]
```

## Functions
### `map`
```py
from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2)
print(list(pipe))
list(pipe) == [2, 4, 6]
```

<details> <summary> Show output </summary>

```py
[2, 4, 6]
```

</details>

_Map items to parallel workers_
```py
from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2, num_workers=2) # parallel map (note: order is not guaranteed)
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
[2, 4, 6]
list(pipe) == [2, 4, 6]
```

</details>
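To see why a parallel map may not preserve order, consider this sketch using the standard library's thread pool. It is only an illustration under the assumption that results are yielded as soon as each worker finishes; how pipd schedules its workers is not shown in this README, and `parallel_map` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_map(fn, items, num_workers=2):
    """Apply `fn` to every item on a thread pool, yielding results as they finish."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # All items are submitted up front, so the input is consumed eagerly.
        futures = [pool.submit(fn, item) for item in items]
        for future in as_completed(futures):
            yield future.result()


results = list(parallel_map(lambda x: x * 2, [1, 2, 3], num_workers=2))
# The order depends on which worker finishes first, so compare sorted values.
print(sorted(results))
```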

### `filter`

```py
from pipd import Pipe

pipe = Pipe(1, 2, 3).filter(lambda x: x != 1)
print(list(pipe))
list(pipe) == [2, 3]
```

<details> <summary> Show output </summary>

```py
[2, 3]
```

</details>

### `side`

Applies a function on each item in the pipeline without changing the item, useful for logging, saving state, etc.
@@ -112,39 +104,18 @@ side 2
from pipd import Pipe

pipe = Pipe(1, 2, 3, 4, 5).batch(2)
it = iter(pipe)
print(next(it))
print(next(it))
print(next(it))
```

<details> <summary> Show output </summary>

```py
[1, 2]
[3, 4]
[5]
list(pipe) == [[1, 2], [3, 4], [5]]
```

</details>

### `unbatch`

```py
from pipd import Pipe

pipe = Pipe([1, 2], [3], [4, 5]).unbatch()
print(list(pipe))
```

<details> <summary> Show output </summary>

```py
[1, 2, 3, 4, 5]
list(pipe) == [1, 2, 3, 4, 5]
```

</details>
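The behaviour shown for `batch` and `unbatch` can be reproduced with `itertools`. This is a minimal sketch of the technique, not pipd's code; the free functions `batch` and `unbatch` below are written only for this example.

```python
from itertools import chain, islice


def batch(items, size):
    """Group items into lists of at most `size`; the last list may be shorter."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def unbatch(batches):
    """Flatten an iterable of batches back into single items."""
    return chain.from_iterable(batches)


batches = list(batch([1, 2, 3, 4, 5], 2))
flat = list(unbatch(batches))
print(batches, flat)
```

In this sketch `unbatch` undoes `batch`, so batching and then unbatching returns the original items in order.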

### `log`

```py
@@ -176,21 +147,9 @@ list(pipe) # runs the pipeline
from pipd import Pipe

pipe = Pipe(range(10)).limit(5).log()
list(pipe) # runs the pipeline
list(pipe) == [0, 1, 2, 3, 4]
```

<details> <summary> Show output </summary>

```py
0
1
2
3
4
```

</details>

### `sleep`
Useful for debugging a pipeline that runs too fast.
```py
@@ -219,7 +178,6 @@ Shows a progress bar for the pipeline, useful to check how many `s/it` a pipelin
from pipd import Pipe

pipe = Pipe(range(5)).sleep(1).tqdm()

print(list(pipe))
```

@@ -238,51 +196,164 @@ print(list(pipe))

</details>

### `shuffle`
### `read_files`
### `read_lines`
### `write_lines`
### `filter_cached`

### `mix`

## Pipes
Mixes several iterators or pipes into one, taking items either in interleaved order or, with `random=True`, from a randomly chosen source each time.

### `MergedPipe`
```py
from pipd import Pipe, MergedPipe
from pipd import Pipe, Mix

pipe = MergedPipe(
pipe = Pipe.mix([1, 2, 3], ['a', 'b', 'c'])
# or
pipe = Mix([1, 2, 3], ['a', 'b', 'c'])
list(pipe) == [1, 'a', 2, 'b', 3, 'c']
```

```py
from pipd import Pipe

pipe = Pipe.mix(
Pipe(1, 2, 3),
Pipe('a', 'b', 'c')
Pipe('a', 'b', 'c'),
random=True # randomize from which iterator/pipe to take the next item
)
list(pipe) == [1, 2, 'a', 'b', 3, 'c']  # one possible order; the result is random
```
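The two mixing modes can be sketched with plain iterators. This is an illustration only: the function name `mix`, the `randomize` flag (standing in for `random=True`), and the `seed` parameter are assumptions made for this example, and pipd may select sources differently.

```python
import random


def mix(*iterables, randomize=False, seed=None):
    """Merge several iterables, round-robin by default or by random choice."""
    rng = random.Random(seed)
    iterators = [iter(it) for it in iterables]
    while iterators:
        if randomize:
            # Pick one source at random; drop it once it is exhausted.
            index = rng.randrange(len(iterators))
            try:
                yield next(iterators[index])
            except StopIteration:
                iterators.pop(index)
        else:
            # Take one item from each remaining source in turn.
            for iterator in list(iterators):
                try:
                    yield next(iterator)
                except StopIteration:
                    iterators.remove(iterator)


interleaved = list(mix([1, 2, 3], ["a", "b", "c"]))
shuffled = list(mix([1, 2, 3], ["a", "b", "c"], randomize=True, seed=0))
print(interleaved)
```

In the random mode the relative order within each source is still preserved; only the choice of which source to draw from next is random.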

print(list(pipe))
### `map_key`
```py
from pipd import Pipe

pipe = Pipe([{'a': 1, 'b': 2}, {'a': 2, 'b': 4}]).map_key('a', lambda x: x + 1)
list(pipe) == [{'a': 2, 'b': 2}, {'a': 3, 'b': 4}]
```
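For intuition, the example above can be reproduced with a small generator. This sketch assumes each item is a dict and returns a new dict rather than mutating the input; whether pipd copies or mutates is not stated here, and the free function `map_key` is written only for illustration.

```python
def map_key(items, key, fn):
    """Apply `fn` to the value stored under `key` in every dict."""
    for item in items:
        # Build a copy so the caller's dicts are left untouched.
        yield {**item, key: fn(item[key])}


rows = [{"a": 1, "b": 2}, {"a": 2, "b": 4}]
mapped = list(map_key(rows, "a", lambda x: x + 1))
print(mapped)
```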


### `shuffle`
```py
from pipd import Pipe

pipe = Pipe(range(10)).shuffle(size=5) # Shuffle buffer has size 5
list(pipe) == [3, 5, 6, 0, 2, 4, 1, 7, 8, 9]  # one possible order; the result is random
```
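A common way to shuffle a stream without loading it all into memory is a fixed-size shuffle buffer. The sketch below shows that general technique; it is an assumption that pipd's `shuffle(size=...)` behaves similarly, and the `seed` parameter is added here only to make the example repeatable.

```python
import random


def shuffle(items, size, seed=None):
    """Approximate shuffle of a stream using a buffer of at most `size` items."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) >= size:
            # Swap a random element to the end and emit it.
            index = rng.randrange(len(buffer))
            buffer[index], buffer[-1] = buffer[-1], buffer[index]
            yield buffer.pop()
    # Input exhausted: emit whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled_items = list(shuffle(range(10), size=5, seed=0))
print(shuffled_items)
```

A larger buffer gives a more thorough shuffle at the cost of more memory; with a buffer at least as large as the input, this reduces to a full shuffle.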

### `read_files`
```py
from pipd import Pipe

pipe = Pipe(['*.md']).read_files()
list(pipe) == ['README.md']
```

### `read_lines`
```py
from pipd import Pipe

pipe = Pipe(['.gitignore', 'setup.py']).read_lines()
list(pipe)
```

<details> <summary> Show output </summary>

```py
[1, 'a', 2, 'b', 3, 'c']
['__pycache__',
'.mypy_cache',
'.DS_Store',
'TODO.md',
'*.ipynb',
'from setuptools import find_packages, setup',
'',
'setup(',
'name="pipd",',
'packages=find_packages(exclude=[]),',
'version="0.2.1",',
'description="Utility functions for python data pipelines.",',
'long_description_content_type="text/markdown",',
'author="ElevenLabs",',
'url="https://github.com/elevenlabs/pipd",',
'keywords=["data processing", "pipeline"],',
'install_requires=[],',
'classifiers=[],',
')']
```

</details>

### `write_lines`
```py
from pipd import Pipe, MergedPipe
from pipd import Pipe

pipe = MergedPipe(
Pipe(1, 2, 3),
Pipe('a', 'b', 'c'),
random=True # randomize from which pipe to take the next item
)
pipe = Pipe([0, 1, 2, 3]).write_lines('test.txt')
list(pipe) == [0, 1, 2, 3]
```

print(list(pipe))
<details> <summary> test.txt </summary>

```py
0
1
2
3
```

<details> <summary> Show output </summary>
</details>
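The pass-through behaviour of `write_lines` (items are written and also forwarded) can be sketched with a generator. This is an illustration under assumptions: the free functions below are hypothetical, the file is opened in overwrite mode, and only the trailing newline is stripped when reading; pipd's own choices may differ.

```python
import os
import tempfile


def write_lines(items, filepath):
    """Write each item on its own line and pass the item through unchanged."""
    with open(filepath, "w") as file:
        for item in items:
            file.write(f"{item}\n")
            yield item


def read_lines(filepaths):
    """Yield the lines of each file in turn, without the trailing newline."""
    for path in filepaths:
        with open(path) as file:
            for line in file:
                yield line.rstrip("\n")


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "test.txt")
    passed_through = list(write_lines([0, 1, 2, 3], target))
    read_back = list(read_lines([target]))

print(passed_through, read_back)
```

Note that the values read back are strings, since the numbers were serialized as text.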

### `filter_cached`

Records each item that passes through in the cache file at `filepath`, so that when the pipeline runs again, items already in the cache are filtered out.
The optional `key` function can be used to specify a custom key for the cache.

```py
[1, 2, 'a', 'b', 3, 'c']
from pipd import Pipe

pipe = Pipe(range(5)).filter_cached(filepath='./cache.txt', key=lambda x: x)

list(pipe) == [0, 1, 2, 3, 4]
list(pipe) == []
```

<details> <summary> cache.txt </summary>

```py
0
1
2
3
4
```

</details>
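The caching behaviour described above can be sketched as follows. This is not pipd's implementation: it assumes the cache stores one key per line as text, and the free function `filter_cached` exists only for this example.

```python
import os
import tempfile


def filter_cached(items, filepath, key=str):
    """Yield only items whose key is not yet in the cache file, recording new keys."""
    seen = set()
    if os.path.exists(filepath):
        with open(filepath) as file:
            seen = {line.rstrip("\n") for line in file}
    with open(filepath, "a") as cache:
        for item in items:
            item_key = str(key(item))
            if item_key in seen:
                continue  # already processed in this or an earlier run
            seen.add(item_key)
            cache.write(item_key + "\n")
            cache.flush()
            yield item


with tempfile.TemporaryDirectory() as tmp:
    cache_path = os.path.join(tmp, "cache.txt")
    first_run = list(filter_cached(range(5), cache_path))
    second_run = list(filter_cached(range(5), cache_path))

print(first_run, second_run)
```

Flushing after every write keeps the cache usable even if the pipeline is interrupted midway, which is the main reason to cache per item rather than at the end.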

### `read_csv`
### `write_csv`

## Create custom `Pipe` object

```py
from pipd import Pipe

class PlusOne(Pipe):

    def __call__(self, items):
        for item in items:
            yield item + 1

# Usage

pipe = Pipe([0,1,2]).plus_one()
list(pipe) == [1,2,3]

# or

pipe = Pipe([0,1,2]) | PlusOne()
list(pipe) == [1,2,3]

# or

pipe = PlusOne()
list(pipe([0,1,2])) == [1,2,3]

# multi

pipe = Pipe([0,1,2]).plus_one().plus_one()
list(pipe) == [2,3,4]
```
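One way a subclass such as `PlusOne` could become available as `.plus_one()` without explicit registration is Python's `__init_subclass__` hook. The sketch below illustrates that mechanism with a hypothetical `Pipeline` base class; it is an assumption about how automatic registration might be done, not a description of pipd's internals.

```python
import re


class Pipeline:
    """Hypothetical stand-in for `Pipe` with automatic method registration."""

    def __init__(self, items=(), stages=()):
        self.items = items
        self.stages = list(stages)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Convert CamelCase to snake_case, e.g. PlusOne -> plus_one.
        name = re.sub(r"(?<!^)(?=[A-Z])", "_", cls.__name__).lower()

        def method(self, *args, **kw):
            # Return a new pipeline with one more stage appended.
            return Pipeline(self.items, self.stages + [cls(*args, **kw)])

        setattr(Pipeline, name, method)

    def __iter__(self):
        items = self.items
        for stage in self.stages:
            items = stage(items)
        return iter(items)


class PlusOne(Pipeline):
    def __call__(self, items):
        for item in items:
            yield item + 1


once = list(Pipeline([0, 1, 2]).plus_one())
twice = list(Pipeline([0, 1, 2]).plus_one().plus_one())
print(once, twice)
```

Defining the subclass is enough to trigger the hook, so no decorator or registry call is needed at the point of use.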