Skip to content

Commit

Permalink
Wrapping functions
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Mar 20, 2024
1 parent d6a73e9 commit 2e7a2c5
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 140 deletions.
196 changes: 105 additions & 91 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ without getting bogged down by the complexity of concurrency.
- **Error Handling**: provides a structured way to handle errors in concurrent apps
- **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint
- **Order Preservation**: offers functions that preserve the original order of data, while still allowing for concurrent processing
- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows
- **Efficient Resource Use**: the number of goroutines and allocations does not depend on data size
- **Generic**: all operations are type-safe and can be used with any data type

- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows

## Installation
```bash
Expand All @@ -33,59 +33,68 @@ type KV struct {
Value string
}

func printValues(ctx context.Context, urls []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error or early exit, this ensures all http and redis operations are canceled

// Convert URLs into a channel
urlsChan := echans.FromSlice(urls)

// Fetch and stream keys from each URL concurrently
keys := echans.FlatMap(urlsChan, 10, func(url string) <-chan echans.Try[string] {
return streamKeys(ctx, url)
})

// Exclude any empty keys from the stream
keys = echans.Filter(keys, 5, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := echans.Batch(keys, 10, 1*time.Second)

// Fetch values from Redis for each batch of keys
resultBatches := echans.Map(keyBatches, 5, func(keys []string) ([]KV, error) {
values, err := redisMGet(ctx, keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back to a single items for final processing
results := echans.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = echans.Filter(results, 5, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := echans.ForEach(results, 1, func(kv KV) error {
fmt.Println(kv.Key, "=>", kv.Value)
cnt++
return nil
})
fmt.Println("Total keys:", cnt)

return err

func printValuesFromRedis(ctx context.Context, urls []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all http and redis operations are canceled

// Convert urls into a channel
urlsChan := rill.WrapSlice(urls)

// Fetch and stream keys from each URL concurrently
keys := rill.FlatMap(urlsChan, 10, func(url string) <-chan rill.Try[string] {
return streamKeys(ctx, url)
})

// Exclude any empty keys from the stream
keys = rill.Filter(keys, 5, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := rill.Batch(keys, 10, 1*time.Second)

// Fetch values from Redis for each batch of keys
resultBatches := rill.Map(keyBatches, 5, func(keys []string) ([]KV, error) {
values, err := redisMGet(ctx, keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back to a single items for final processing
results := rill.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = rill.Filter(results, 5, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := rill.ForEach(results, 1, func(kv KV) error {
fmt.Println(kv.Key, "=>", kv.Value)
cnt++
return nil
})
if err != nil {
return err
}

fmt.Println("Total keys:", cnt)
return nil
}

// streamKeys reads a file from the given URL line by line and returns a channel of lines/keys
func streamKeys(ctx context.Context, url string) <-chan rill.Try[string] {
// ...
}


Expand All @@ -96,9 +105,9 @@ func printValues(ctx context.Context, urls []string) error {

## Design philosophy
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure.
Such channels can be created manually or through utilities like **FromSlice**, **Wrap**, and **WrapAsync**, and then transformed via operations
Such channels can be created manually or through utilities like **WrapSlice** or **WrapChan**, and then transformed via operations
such as **Map**, **Filter**, **FlatMap** and others. Finally when all processing stages are completed, the data can be consumed by
**ForEach**, **ToSlice** or manually by iterating over the resulting channel.
**ForEach**, **UnwrapToSlice** or manually by iterating over the resulting channel.



Expand Down Expand Up @@ -151,39 +160,44 @@ type Measurement struct {
}

func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error or early exit, this ensures all http are canceled

// Make a channel that emits all the days between startDate and endDate
days := make(chan echans.Try[time.Time])
go func() {
defer close(days)
for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
days <- echans.Try[time.Time]{V: date}
}
}()

// Download the temperature for each day in parallel and in order
measurements := echans.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(ctx, city, date)
return Measurement{Date: date, Temp: temp}, err
})

// Calculate the temperature movements. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
measurements = echans.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Movement = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the movements
err := echans.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
prev = m
return nil
})

return err
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all pending operations are canceled

// Make a channel that emits all the days between startDate and endDate
days := make(chan rill.Try[time.Time])
go func() {
defer close(days)
for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
days <- rill.Wrap(date, nil)
}
}()

// Download the temperature for each day in parallel and in order
measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(ctx, city, date)
return Measurement{Date: date, Temp: temp}, err
})

// Calculate the temperature movements. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Movement = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the movements
err := rill.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
prev = m
return nil
})

return err
}

// getTemperature does a network request to fetch the temperature for a given city and date.
func getTemperature(ctx context.Context, city string, date time.Time) (float64, error) {
// ...
}
```
9 changes: 5 additions & 4 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
// Zero timeout is not supported and will panic.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
values, errs := Unwrap(in)
values, errs := UnwrapToChanAndErrs(in)
batches := core.Batch(values, n, timeout)
return WrapAsync(batches, errs)
return WrapChanAndErrs(batches, errs)
}

// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] {
batches, errs := Unwrap(in)
batches, errs := UnwrapToChanAndErrs(in)
values := core.Unbatch(batches)
return WrapAsync(values, errs)
return WrapChanAndErrs(values, errs)
}
2 changes: 1 addition & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestBatch(t *testing.T) {
// most logic is covered by the chans pkg tests

t.Run("correctness", func(t *testing.T) {
in := Wrap(th.FromRange(0, 10), fmt.Errorf("err0"))
in := WrapChan(th.FromRange(0, 10), fmt.Errorf("err0"))
in = replaceWithError(in, 5, fmt.Errorf("err5"))
in = replaceWithError(in, 7, fmt.Errorf("err7"))

Expand Down
26 changes: 13 additions & 13 deletions core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMap(t *testing.T) {
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20), nil)
in := WrapChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 15, fmt.Errorf("err15"))

out := universalMap(ord, in, n, func(x int) (string, error) {
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestMap(t *testing.T) {
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20000), nil)
in := WrapChan(th.FromRange(0, 20000), nil)

out := universalMap(ord, in, n, func(x int) (int, error) {
if x%2 == 0 {
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestFilter(t *testing.T) {
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20), nil)
in := WrapChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 15, fmt.Errorf("err15"))

out := universalFilter(ord, in, n, func(x int) (bool, error) {
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestFilter(t *testing.T) {
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20000), nil)
in := WrapChan(th.FromRange(0, 20000), nil)

out := universalFilter(ord, in, n, func(x int) (bool, error) {
switch x % 3 {
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestFlatMap(t *testing.T) {
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20), nil)
in := WrapChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 5, fmt.Errorf("err05"))
in = replaceWithError(in, 15, fmt.Errorf("err15"))

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestFlatMap(t *testing.T) {
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20000), nil)
in := WrapChan(th.FromRange(0, 20000), nil)
in = OrderedMap(in, 1, func(x int) (int, error) {
if x%2 == 0 {
return x, fmt.Errorf("err%06d", x)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestCatch(t *testing.T) {
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20), nil)
in := WrapChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 5, fmt.Errorf("err05"))
in = replaceWithError(in, 10, fmt.Errorf("err10"))
in = replaceWithError(in, 15, fmt.Errorf("err15"))
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestCatch(t *testing.T) {
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20000), nil)
in := WrapChan(th.FromRange(0, 20000), nil)
in = OrderedMap(in, 1, func(x int) (int, error) {
if x%2 == 0 {
return x, fmt.Errorf("err%06d", x)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestForEach(t *testing.T) {
for _, n := range []int{1, 5} {

t.Run(th.Name("no errors", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 10), nil)
in := WrapChan(th.FromRange(0, 10), nil)

sum := int64(0)
err := ForEach(in, n, func(x int) error {
Expand All @@ -335,7 +335,7 @@ func TestForEach(t *testing.T) {

t.Run(th.Name("error in input", n), func(t *testing.T) {
th.ExpectNotHang(t, 10*time.Second, func() {
in := Wrap(th.FromRange(0, 1000), nil)
in := WrapChan(th.FromRange(0, 1000), nil)
in = replaceWithError(in, 100, fmt.Errorf("err100"))

cnt := int64(0)
Expand All @@ -359,7 +359,7 @@ func TestForEach(t *testing.T) {

t.Run(th.Name("error in func", n), func(t *testing.T) {
th.ExpectNotHang(t, 10*time.Second, func() {
in := Wrap(th.FromRange(0, 1000), nil)
in := WrapChan(th.FromRange(0, 1000), nil)

cnt := int64(0)
err := ForEach(in, n, func(x int) error {
Expand All @@ -385,7 +385,7 @@ func TestForEach(t *testing.T) {
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := Wrap(th.FromRange(0, 20000), nil)
in := WrapChan(th.FromRange(0, 20000), nil)

var mu sync.Mutex
outSlice := make([]int, 0, 20000)
Expand All @@ -408,7 +408,7 @@ func TestForEach(t *testing.T) {
}

t.Run("deterministic when n=1", func(t *testing.T) {
in := Wrap(th.FromRange(0, 100), nil)
in := WrapChan(th.FromRange(0, 100), nil)

in = replaceWithError(in, 10, fmt.Errorf("err10"))
in = replaceWithError(in, 11, fmt.Errorf("err11"))
Expand Down
Loading

0 comments on commit 2e7a2c5

Please sign in to comment.