diff --git a/README.md b/README.md index 747eaf6..fbe429e 100644 --- a/README.md +++ b/README.md @@ -210,7 +210,7 @@ Pipelines typically consist of a sequence of non-blocking channel transformation The general rule is: any error occurring anywhere in a pipeline is propagated down to the final stage, where it's caught by some blocking function and returned to the caller. -Rill provides a wide selection of blocking functions. Some of them are: +Rill provides a wide selection of blocking functions. Here are some commonly used ones: - **ForEach:** Concurrently applies a user function to each item in the stream. [Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach) @@ -233,6 +233,7 @@ all goroutines feeding the stream are allowed to complete. Rill is context-agnostic, meaning that it does not enforce any specific context usage. However, it's recommended to make user-defined pipeline stages context-aware. This is especially important for the initial stage, as it allows to stop feeding the pipeline with new items after the context cancellation. +In practice the first stage is often naturally context-aware through Go's standard APIs for databases, HTTP clients, and other external sources. In the example below the `CheckAllUsersExist` function uses several concurrent workers to check if all users from the given list exist. When an error occurs (like a non-existent user), the function returns that error @@ -244,7 +245,7 @@ func main() { ctx := context.Background() // ID 999 doesn't exist, so fetching will stop after hitting it. - err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10}) + err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15}) fmt.Printf("Check result: %v\n", err) } @@ -273,6 +274,22 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { } ``` +In the example above only the second stage (`mockapi.GetUser`) of the pipeline is context-aware. 
+**FromSlice** works well here since the input is small, iteration is fast and context cancellation prevents expensive API calls regardless. +The following code demonstrates how to replace **FromSlice** with **Generate** when full context awareness becomes important. + +```go +idsStream := rill.Generate(func(send func(int), sendErr func(error)) { + for _, id := range ids { + if ctx.Err() != nil { + return + } + send(id) + } +}) +``` + + ## Order Preservation (Ordered Fan-In) Concurrent processing can boost performance, but since tasks take different amounts of time to complete, @@ -299,12 +316,12 @@ func main() { // The string to search for in the downloaded files needle := []byte("26") - // Start with a stream of numbers from 0 to 999 - fileIDs := streamNumbers(ctx, 0, 1000) - - // Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt - urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) { - return fmt.Sprintf("https://example.com/file-%d.txt", id), nil + // Generate a stream of URLs from https://example.com/file-0.txt + // to https://example.com/file-999.txt + urls := rill.Generate(func(send func(string), sendErr func(error)) { + for i := 0; i < 1000 && ctx.Err() == nil; i++ { + send(fmt.Sprintf("https://example.com/file-%d.txt", i)) + } }) // Download and process the files @@ -335,22 +352,6 @@ func main() { fmt.Println("Not found") } } - -// helper function that creates a stream of numbers [start, end) and respects the context -func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] { - out := make(chan rill.Try[int]) - go func() { - defer close(out) - for i := start; i < end; i++ { - select { - case <-ctx.Done(): - return - case out <- rill.Try[int]{Value: i}: - } - } - }() - return out -} ``` @@ -389,24 +390,21 @@ func main() { } // StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. -// It iterates through all listing pages and returns a stream of users. 
+// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream. // This function is useful both on its own and as part of larger pipelines. func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { - res := make(chan rill.Try[*mockapi.User]) - - if query == nil { - query = &mockapi.UserQuery{} - } - - go func() { - defer close(res) + return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) { + var currentQuery mockapi.UserQuery + if query != nil { + currentQuery = *query + } for page := 0; ; page++ { - query.Page = page + currentQuery.Page = page - users, err := mockapi.ListUsers(ctx, query) + users, err := mockapi.ListUsers(ctx, &currentQuery) if err != nil { - res <- rill.Wrap[*mockapi.User](nil, err) + sendErr(err) return } @@ -415,12 +413,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[ } for _, user := range users { - res <- rill.Wrap(user, nil) + send(user) } } - }() - - return res + }) } ``` diff --git a/example_test.go b/example_test.go index 96e478f..b37f351 100644 --- a/example_test.go +++ b/example_test.go @@ -170,12 +170,12 @@ func Example_ordering() { // The string to search for in the downloaded files needle := []byte("26") - // Start with a stream of numbers from 0 to 999 - fileIDs := streamNumbers(ctx, 0, 1000) - - // Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt - urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) { - return fmt.Sprintf("https://example.com/file-%d.txt", id), nil + // Generate a stream of URLs from https://example.com/file-0.txt + // to https://example.com/file-999.txt + urls := rill.Generate(func(send func(string), sendErr func(error)) { + for i := 0; i < 1000 && ctx.Err() == nil; i++ { + send(fmt.Sprintf("https://example.com/file-%d.txt", i)) + } }) // Download and process the files @@ -267,24 +267,21 @@ func
Example_flatMap() { } // StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. -// It iterates through all listing pages and returns a stream of users. +// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream. // This function is useful both on its own and as part of larger pipelines. func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { - res := make(chan rill.Try[*mockapi.User]) - - if query == nil { - query = &mockapi.UserQuery{} - } - - go func() { - defer close(res) + return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) { + var currentQuery mockapi.UserQuery + if query != nil { + currentQuery = *query + } for page := 0; ; page++ { - query.Page = page + currentQuery.Page = page - users, err := mockapi.ListUsers(ctx, query) + users, err := mockapi.ListUsers(ctx, &currentQuery) if err != nil { - res <- rill.Wrap[*mockapi.User](nil, err) + sendErr(err) return } @@ -293,12 +290,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[ } for _, user := range users { - res <- rill.Wrap(user, nil) + send(user) } } - }() - - return res + }) } // This example demonstrates how to gracefully stop a pipeline on the first error. @@ -308,7 +303,7 @@ func Example_context() { ctx := context.Background() // ID 999 doesn't exist, so fetching will stop after hitting it.
- err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10}) + err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15}) fmt.Printf("Check result: %v\n", err) } @@ -319,7 +314,15 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { defer cancel() // Convert the slice into a stream - idsStream := rill.FromSlice(ids, nil) + // Use Generate instead of FromSlice to make the first stage context-aware + idsStream := rill.Generate(func(send func(int), sendErr func(error)) { + for _, id := range ids { + if ctx.Err() != nil { + return + } + send(id) + } + }) // Fetch users concurrently. users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) { @@ -615,6 +618,33 @@ func ExampleForEach_ordered() { fmt.Println("Error:", err) } +// Generate a stream of URLs from https://example.com/file-0.txt to https://example.com/file-9.txt +func ExampleGenerate() { + urls := rill.Generate(func(send func(string), sendErr func(error)) { + for i := 0; i < 10; i++ { + send(fmt.Sprintf("https://example.com/file-%d.txt", i)) + } + }) + + printStream(urls) +} + +// Generate an infinite stream of natural numbers (1, 2, 3, ...). 
+// New numbers are sent to the stream every 500ms until the context is canceled +func ExampleGenerate_context() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + numbers := rill.Generate(func(send func(int), sendErr func(error)) { + for i := 1; ctx.Err() == nil; i++ { + send(i) + time.Sleep(500 * time.Millisecond) + } + }) + + printStream(numbers) +} + func ExampleMap() { // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) @@ -746,22 +776,6 @@ func square(x int) int { return x * x } -// helper function that creates a stream of numbers [start, end) and respects the context -func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] { - out := make(chan rill.Try[int]) - go func() { - defer close(out) - for i := start; i < end; i++ { - select { - case <-ctx.Done(): - return - case out <- rill.Try[int]{Value: i}: - } - } - }() - return out -} - // printStream prints all items from a stream (one per line) and an error if any. func printStream[A any](stream <-chan rill.Try[A]) { fmt.Println("Result:") diff --git a/mockapi/users.go b/mockapi/users.go index 8160618..335589f 100644 --- a/mockapi/users.go +++ b/mockapi/users.go @@ -1,5 +1,6 @@ // Package mockapi provides a very basic mock API for examples and demos. // It's intentionally kept public to enable running and experimenting with examples in the Go Playground. +// The implementation is naive and uses full scan for all operations. 
package mockapi import ( @@ -20,20 +21,27 @@ type User struct { } // don't use pointers here, to make sure that raw data is not accessible from outside -var departments = []string{"HR", "IT", "Finance", "Marketing", "Sales", "Support", "Engineering", "Management"} -var users = make(map[int]User) +var departments []string +var users []User var mu sync.RWMutex func init() { + const usersCount = 100 + var adjs = []string{"Big", "Small", "Fast", "Slow", "Smart", "Happy", "Sad", "Funny", "Serious", "Angry"} var nouns = []string{"Dog", "Cat", "Bird", "Fish", "Mouse", "Elephant", "Lion", "Tiger", "Bear", "Wolf"} mu.Lock() defer mu.Unlock() + departments = []string{"HR", "IT", "Finance", "Marketing", "Sales", "Support", "Engineering", "Management"} + // Generate users - for i := 1; i <= 100; i++ { + // Use deterministic values for all fields to make examples reproducible + users = make([]User, 0, usersCount) + + for i := 1; i <= usersCount; i++ { user := User{ ID: i, Name: adjs[hash(i, "name1")%len(adjs)] + " " + nouns[hash(i, "name2")%len(nouns)], // adj + noun @@ -42,7 +50,7 @@ func init() { IsActive: hash(i, "active")%100 < 60, // 60% } - users[i] = user + users = append(users, user) } } @@ -57,37 +65,38 @@ func GetUser(ctx context.Context, id int) (*User, error) { if err := ctx.Err(); err != nil { return nil, err } - randomSleep(ctx, 500*time.Millisecond) mu.RLock() defer mu.RUnlock() - user, ok := users[id] - if !ok { - return nil, fmt.Errorf("user not found") + idx, err := getUserIndex(id) + if err != nil { + return nil, err } + user := users[idx] return &user, nil } // GetUsers returns a list of users by IDs. // If a user is not found, nil is returned in the corresponding position. 
func GetUsers(ctx context.Context, ids []int) ([]*User, error) { - randomSleep(ctx, 1000*time.Millisecond) if err := ctx.Err(); err != nil { return nil, err } + randomSleep(ctx, 1000*time.Millisecond) mu.RLock() defer mu.RUnlock() res := make([]*User, 0, len(ids)) for _, id := range ids { - user, ok := users[id] - if !ok { + idx, err := getUserIndex(id) + if err != nil { res = append(res, nil) } else { + user := users[idx] res = append(res, &user) } } @@ -102,15 +111,16 @@ type UserQuery struct { // ListUsers returns a paginated list of users optionally filtered by department. func ListUsers(ctx context.Context, query *UserQuery) ([]*User, error) { - randomSleep(ctx, 1000*time.Millisecond) if err := ctx.Err(); err != nil { return nil, err } + randomSleep(ctx, 1000*time.Millisecond) const pageSize = 10 if query == nil { query = &UserQuery{} } + offset := query.Page * pageSize mu.RLock() @@ -131,7 +141,8 @@ func ListUsers(ctx context.Context, query *UserQuery) ([]*User, error) { break } - res = append(res, &user) + userCopy := user + res = append(res, &userCopy) } return res, nil @@ -139,10 +150,10 @@ func ListUsers(ctx context.Context, query *UserQuery) ([]*User, error) { // SaveUser saves a user. func SaveUser(ctx context.Context, user *User) error { - randomSleep(ctx, 1000*time.Millisecond) if err := ctx.Err(); err != nil { return err } + randomSleep(ctx, 1000*time.Millisecond) if user == nil { return fmt.Errorf("user is nil") @@ -158,10 +169,26 @@ func SaveUser(ctx context.Context, user *User) error { mu.Lock() defer mu.Unlock() - users[user.ID] = *user + idx, err := getUserIndex(user.ID) + if err != nil { + users = append(users, *user) + } else { + users[idx] = *user + } + return nil } +func getUserIndex(id int) (int, error) { + for i, u := range users { + if u.ID == id { + return i, nil + } + } + + return -1, fmt.Errorf("user not found") +} + func hash(input ...any) int { hasher := fnv.New32() fmt.Fprintln(hasher, input...) 
diff --git a/wrap.go b/wrap.go index 648e419..a3da74d 100644 --- a/wrap.go +++ b/wrap.go @@ -169,3 +169,40 @@ func ToChans[A any](in <-chan Try[A]) (<-chan A, <-chan error) { return out, errs } + +// Generate is a shorthand for creating streams. +// It provides a more ergonomic way of sending both values and errors to a stream, and manages the goroutine and channel lifecycle for the caller. +// +// stream := rill.Generate(func(send func(int), sendErr func(error)) { +// for i := 0; i < 100; i++ { +// send(i) +// } +// sendErr(someError) +// }) +// +// Here's how the same code would look without Generate: +// +// stream := make(chan rill.Try[int]) +// go func() { +// defer close(stream) +// for i := 0; i < 100; i++ { +// stream <- rill.Try[int]{Value: i} +// } +// stream <- rill.Try[int]{Error: someError} +// }() +func Generate[A any](f func(send func(A), sendErr func(error))) <-chan Try[A] { + out := make(chan Try[A]) + go func() { + defer close(out) + + send := func(a A) { + out <- Try[A]{Value: a} + } + sendErr := func(err error) { + out <- Try[A]{Error: err} + } + + f(send, sendErr) + }() + return out +} diff --git a/wrap_test.go b/wrap_test.go index 790ac94..7df5bf8 100644 --- a/wrap_test.go +++ b/wrap_test.go @@ -180,3 +180,20 @@ func TestFromChans(t *testing.T) { runTest("values and errors", makeSlice(10000), makeErrSlice(10000)) runTest("values and nil errors", makeSlice(10), []error{nil, nil, fmt.Errorf("err"), nil}) } + +func TestGenerate(t *testing.T) { + in := Generate(func(send func(int), sendErr func(error)) { + for i := 0; i < 10; i++ { + if i%2 == 0 { + send(i) + } else { + sendErr(fmt.Errorf("err%d", i)) + } + } + }) + + outSlice, errSlice := toSliceAndErrors(in) + + th.ExpectSlice(t, outSlice, []int{0, 2, 4, 6, 8}) + th.ExpectSlice(t, errSlice, []string{"err1", "err3", "err5", "err7", "err9"}) +}