Skip to content

Commit

Permalink
Add FIlterMap function (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Jul 21, 2024
1 parent a7c2c3f commit 81185b0
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 16 deletions.
35 changes: 35 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,41 @@ func ExampleOrderedFilter() {
printStream(evens)
}

func ExampleFilterMap() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Keep only odd numbers and square them
// Concurrency = 3; Unordered
squares := rill.FilterMap(numbers, 3, func(x int) (int, bool, error) {
if x%2 == 0 {
return 0, false, nil
}

randomSleep(1000 * time.Millisecond) // simulate some additional work
return x * x, true, nil
})

printStream(squares)
}

// The same example as for the [FilterMap], but using ordered versions of functions.
func ExampleOrderedFilterMap() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Keep only odd numbers and square them
// Concurrency = 3; Ordered
squares := rill.OrderedFilterMap(numbers, 3, func(x int) (int, bool, error) {
if x%2 == 0 {
return 0, false, nil
}

randomSleep(1000 * time.Millisecond) // simulate some additional work
return x * x, true, nil
})

printStream(squares)
}

func ExampleFirst() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/core/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (
}

// Phase 1: Map
mapped := MapOrFilter(in, nm, func(a A) (keyValue[K, V], bool) {
mapped := FilterMap(in, nm, func(a A) (keyValue[K, V], bool) {
k, v := mapper(a)
return keyValue[K, V]{k, v}, true
})
Expand Down
4 changes: 2 additions & 2 deletions internal/core/transform.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package core

func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func FilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if in == nil {
return nil
}
Expand All @@ -17,7 +17,7 @@ func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
return out
}

func OrderedMapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func OrderedFilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if in == nil {
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions internal/core/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import (
"github.com/destel/rill/internal/th"
)

func universalMapOrFilter[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func universalFilterMap[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if ord {
return OrderedMapOrFilter(in, n, f)
return OrderedFilterMap(in, n, f)
}
return MapOrFilter(in, n, f)
return FilterMap(in, n, f)
}

func TestMapOrFilter(t *testing.T) {
func TestFilterMap(t *testing.T) {
th.TestBothOrderings(t, func(t *testing.T, ord bool) {
for _, n := range []int{1, 5} {

t.Run(th.Name("nil", n), func(t *testing.T) {
out := universalMapOrFilter(ord, nil, n, func(x int) (int, bool) { return x, true })
out := universalFilterMap(ord, nil, n, func(x int) (int, bool) { return x, true })
th.ExpectValue(t, out, nil)
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := th.FromRange(0, 20)
out := universalMapOrFilter(ord, in, n, func(x int) (string, bool) {
out := universalFilterMap(ord, in, n, func(x int) (string, bool) {
return fmt.Sprintf("%03d", x), x%2 == 0
})

Expand All @@ -46,7 +46,7 @@ func TestMapOrFilter(t *testing.T) {
t.Run(th.Name("ordering", n), func(t *testing.T) {
in := th.FromRange(0, 20000)

out := universalMapOrFilter(ord, in, n, func(x int) (int, bool) {
out := universalFilterMap(ord, in, n, func(x int) (int, bool) {
return x, x%2 == 0
})

Expand Down
51 changes: 45 additions & 6 deletions transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}
Expand All @@ -28,7 +28,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]

// OrderedMap is the ordered version of [Map].
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}
Expand All @@ -50,7 +50,7 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
return a, true // never filter out errors
}
Expand All @@ -66,7 +66,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[

// OrderedFilter is the ordered version of [Filter].
func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
return a, true // never filter out errors
}
Expand All @@ -80,6 +80,45 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch
})
}

// FilterMap takes a stream of items of type A, applies a function f that can filter and transform them into items of type B.
// Returns a new stream of transformed items that passed the filter. This operation is equivalent to a
// [Filter] followed by a [Map].
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedFilterMap], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func FilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}

b, keep, err := f(a.Value)
if err != nil {
return Try[B]{Error: err}, true
}

return Try[B]{Value: b}, keep
})
}

// OrderedFilterMap is the ordered version of [FilterMap].
func OrderedFilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}

b, keep, err := f(a.Value)
if err != nil {
return Try[B]{Error: err}, true
}

return Try[B]{Value: b}, keep
})
}

// FlatMap takes a stream of items of type A and transforms each item into a new sub-stream of items of type B using a function f.
// Those sub-streams are then flattened into a single output stream, which is returned.
//
Expand Down Expand Up @@ -146,7 +185,7 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B])
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
return a, true
}
Expand All @@ -162,7 +201,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {

// OrderedCatch is the ordered version of [Catch].
func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
return a, true
}
Expand Down
79 changes: 79 additions & 0 deletions transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,85 @@ func TestFilter(t *testing.T) {
})
}

func universalFilterMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
if ord {
return OrderedFilterMap(in, n, f)
}
return FilterMap(in, n, f)
}

func TestFilterMap(t *testing.T) {
th.TestBothOrderings(t, func(t *testing.T, ord bool) {
for _, n := range []int{1, 5} {

t.Run(th.Name("nil", n), func(t *testing.T) {
out := universalFilterMap(ord, nil, n, func(x int) (int, bool, error) { return x, true, nil })
th.ExpectValue(t, out, nil)
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := FromChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 15, fmt.Errorf("err15"))

out := universalFilterMap(ord, in, n, func(x int) (string, bool, error) {
if x == 5 {
return "", false, fmt.Errorf("err05")
}
if x == 6 {
return "", true, fmt.Errorf("err06")
}

return fmt.Sprintf("%03d", x), x%2 == 0, nil
})

outSlice, errSlice := toSliceAndErrors(out)

expectedSlice := make([]string, 0, 20)
for i := 0; i < 20; i++ {
if i == 5 || i == 6 || i == 15 || i%2 == 1 {
continue
}
expectedSlice = append(expectedSlice, fmt.Sprintf("%03d", i))
}

sort.Strings(outSlice)
sort.Strings(errSlice)

th.ExpectSlice(t, outSlice, expectedSlice)
th.ExpectSlice(t, errSlice, []string{"err05", "err06", "err15"})
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := FromChan(th.FromRange(0, 20000), nil)

out := universalFilterMap(ord, in, n, func(x int) (int, bool, error) {
switch x % 3 {
case 2:
return x, false, fmt.Errorf("err%06d", x)
case 1:
return x, false, nil
default:
return x, true, nil

}
})

outSlice, errSlice := toSliceAndErrors(out)

if ord || n == 1 {
th.ExpectSorted(t, outSlice)
th.ExpectSorted(t, errSlice)
} else {
th.ExpectUnsorted(t, outSlice)
th.ExpectUnsorted(t, errSlice)
}

})

}
})
}

func universalFlatMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
if ord {
return OrderedFlatMap(in, n, f)
Expand Down

0 comments on commit 81185b0

Please sign in to comment.