Clarify context usage in the readme and examples (#45)
destel authored Nov 25, 2024
1 parent 3d0ba14 commit 4c89497
Showing 2 changed files with 28 additions and 12 deletions.
22 changes: 15 additions & 7 deletions README.md
@@ -44,13 +44,16 @@ does not grow with the input size.


## Quick Start
-Let's look at a practical example: fetch users from an API, activate them, and save the changes back.
-It shows how to control concurrency at each step while keeping the code clean and manageable.
+Let's look at a practical example: fetch users from an API, activate them, and save the changes back.
+It shows how to control concurrency at each step while keeping the code clean and manageable.
+**ForEach** returns on the first error, and context cancellation via defer stops all remaining fetches.


[Try it](https://pkg.go.dev/github.com/destel/rill#example-package)
```go
func main() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Convert a slice of user IDs into a channel
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
@@ -99,7 +102,8 @@ to fetch multiple users in a single call, instead of making individual `GetUser`
[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Batching)
```go
func main() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Convert a slice of user IDs into a channel
ids := rill.FromSlice([]int{
@@ -305,19 +309,22 @@ Downloading all files at once would consume too much memory, processing them seq
and traditional concurrency patterns do not preserve the order of files, making it challenging to find the first match.

The combination of **OrderedFilter** and **First** functions solves this elegantly,
-while downloading and keeping in memory at most 5 files at a time.
+while downloading and keeping in memory at most 5 files at a time. **First** returns on the first match,
+which triggers the context cancellation via defer, stopping URL generation and file downloads.

[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering)

```go
func main() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// The string to search for in the downloaded files
needle := []byte("26")

// Generate a stream of URLs from https://example.com/file-0.txt
// to https://example.com/file-999.txt
+// Stop generating URLs if the context is canceled
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
@@ -370,7 +377,8 @@ This wrapper can be useful both on its own and as part of larger pipelines.
[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-FlatMap)
```go
func main() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Start with a stream of department names
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)
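Taken together, the README hunks above apply one pattern in every example: derive a cancelable context, defer cancel, and let an early return from the terminal stage (ForEach) stop the upstream stages. A minimal, self-contained sketch of that shape is below; mockUser and fetchUser are illustrative placeholders rather than part of rill, and the rill.Map and rill.ForEach signatures are assumed from the library's public API.

```go
package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
)

// mockUser and fetchUser are illustrative placeholders, not part of rill
// or of the API used in the README.
type mockUser struct {
	ID     int
	Active bool
}

func fetchUser(ctx context.Context, id int) (mockUser, error) {
	// Respect cancellation: once ForEach has returned and the deferred
	// cancel has fired, pending fetches bail out here.
	if err := ctx.Err(); err != nil {
		return mockUser{}, err
	}
	return mockUser{ID: id}, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Convert a slice of user IDs into a stream
	ids := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)

	// Fetch users with up to 3 concurrent calls
	users := rill.Map(ids, 3, func(id int) (mockUser, error) {
		return fetchUser(ctx, id)
	})

	// ForEach returns on the first error; the deferred cancel then
	// signals any fetches that are still in flight
	err := rill.ForEach(users, 2, func(u mockUser) error {
		fmt.Println("processed user", u.ID)
		return nil
	})
	if err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
```

The same skeleton appears in all four README examples; only the middle stages change.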
18 changes: 13 additions & 5 deletions example_test.go
@@ -19,9 +19,11 @@ import (

// This example demonstrates a Rill pipeline that fetches users from an API,
// updates their status to active and saves them back.
-// Both operations are performed concurrently
+// Both operations are performed concurrently.
+// [ForEach] returns on the first error, and context cancellation via defer stops all remaining fetches.
func Example() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Convert a slice of user IDs into a stream
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
@@ -58,7 +60,8 @@ func Example() {
// and updates their status to active and saves them back.
// Users are fetched concurrently and in batches to reduce the number of API calls.
func Example_batching() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Convert a slice of user IDs into a stream
ids := rill.FromSlice([]int{
@@ -164,14 +167,18 @@ func updateUserTimestampWorker() {
//
// The combination of [OrderedFilter] and [First] functions solves the problem,
// while downloading and holding in memory at most 5 files at the same time.
+// [First] returns on the first match, which triggers the context cancellation via defer,
+// stopping URL generation and file downloads.
func Example_ordering() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// The string to search for in the downloaded files
needle := []byte("26")

// Generate a stream of URLs from https://example.com/file-0.txt
// to https://example.com/file-999.txt
+// Stop generating URLs if the context is canceled
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
@@ -247,7 +254,8 @@ func sendMessage(message string, server string) error {
// This example demonstrates using [FlatMap] to fetch users from multiple departments concurrently.
// Additionally, it demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function
func Example_flatMap() {
-ctx := context.Background()
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()

// Start with a stream of department names
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)
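The Example_ordering change relies on the same idea from the consumer side: First returns as soon as the earliest match is known, and the deferred cancel then stops Generate and any in-flight downloads. A rough sketch of that flow, with downloadFile as a local stand-in for the real HTTP download, and the rill.OrderedFilter and rill.First signatures assumed from the library's public API:

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/destel/rill"
)

// downloadFile is a stand-in for the HTTP download in the real example;
// it just fabricates file contents locally.
func downloadFile(ctx context.Context, url string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "contents of " + url, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	needle := "file-26"

	// Generate URLs until the context is canceled
	urls := rill.Generate(func(send func(string), sendErr func(error)) {
		for i := 0; i < 1000 && ctx.Err() == nil; i++ {
			send(fmt.Sprintf("https://example.com/file-%d.txt", i))
		}
	})

	// Download up to 5 files at a time, keeping only the ones that
	// contain the needle and preserving the original order
	matched := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
		body, err := downloadFile(ctx, url)
		if err != nil {
			return false, err
		}
		return strings.Contains(body, needle), nil
	})

	// First returns on the earliest match; the deferred cancel then
	// stops URL generation and any downloads still in progress
	url, found, err := rill.First(matched)
	fmt.Println(url, found, err)
}
```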

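The Example_flatMap doc comment also mentions a reusable streaming wrapper over paginated API calls (StreamUsers). A hypothetical version of that idea, with user, fetchDepartmentPage, and streamUsers as made-up stand-ins and the rill.FlatMap signature assumed from the library's public API:

```go
package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
)

// user and fetchDepartmentPage are illustrative stand-ins for the User type
// and paginated API from the example; they are not part of rill.
type user struct {
	Name       string
	Department string
}

// fetchDepartmentPage pretends to return one page of users and reports
// whether more pages remain.
func fetchDepartmentPage(ctx context.Context, dep string, page int) ([]user, bool, error) {
	if err := ctx.Err(); err != nil {
		return nil, false, err
	}
	if page >= 2 {
		return nil, false, nil // no more pages
	}
	return []user{{Name: fmt.Sprintf("%s-user-%d", dep, page), Department: dep}}, true, nil
}

// streamUsers wraps the paginated calls into a single stream, in the spirit
// of the StreamUsers wrapper mentioned in the example.
func streamUsers(ctx context.Context, dep string) <-chan rill.Try[user] {
	return rill.Generate(func(send func(user), sendErr func(error)) {
		for page := 0; ctx.Err() == nil; page++ {
			users, more, err := fetchDepartmentPage(ctx, dep, page)
			if err != nil {
				sendErr(err)
				return
			}
			for _, u := range users {
				send(u)
			}
			if !more {
				return
			}
		}
	})
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start with a stream of department names
	departments := rill.FromSlice([]string{"IT", "Finance", "Marketing"}, nil)

	// FlatMap fans out to up to 3 departments at a time and merges their
	// user streams into one
	users := rill.FlatMap(departments, 3, func(dep string) <-chan rill.Try[user] {
		return streamUsers(ctx, dep)
	})

	// Print users; an error here would stop the pipeline via the deferred cancel
	err := rill.ForEach(users, 1, func(u user) error {
		fmt.Println(u.Department, u.Name)
		return nil
	})
	if err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
```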