Skip to content

Commit

Permalink
Update disposables docs (#679)
Browse files Browse the repository at this point in the history
* disposables

* one more

* Update src/rpp/rpp/disposables.hpp

Co-authored-by: Markus Werle <[email protected]>

* minor update

---------

Co-authored-by: Markus Werle <[email protected]>
  • Loading branch information
victimsnino and daixtrose authored Nov 14, 2024
1 parent 37e4949 commit 90f9f2f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 45 deletions.
41 changes: 2 additions & 39 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,10 @@ See <https://reactivex.io/documentation/scheduler.html> for more details about s

### Disposable

In reactive programming, a **disposable** is an object that represents a resource that needs to be released or disposed of when it is no longer needed. This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use.
\copydoc disposables

The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner. By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.
Check API reference of @link disposables @endlink for more details

In most cases disposables are placed in observers. RPP's observer can use two types of disposables:

1. **Upstream disposable** - This is a disposable that the observable puts into the observer. The upstream disposable keeps some state or callback that should be disposed of when the observer is disposed. This ensures that any resources used by the observable are properly cleaned up when the observer obtains on_error/on_completed or disposed in any other way.

2. **External disposable** - This is a disposable that allows the observer to be disposed of from outside the observer itself. This can be useful in situations where you need to cancel an ongoing operation or release resources before the observable has completed its work.

### Exception Guarantee

Expand Down Expand Up @@ -252,38 +247,6 @@ All disposable in RPP should be created and used via `rpp::disposable_wrapper_im
- `disposable_wrapper` - wrapper over `interface_disposable`
- `composite_disposable_wrapper` - wrapper over `interface_composite_disposable`
`disposable_wrapper` is kind of smart_pointer (like std::unique_ptr) but for disposables. So, default constructed wrapper is empty wrapper.
```cpp
auto d = rpp::disposable_wrapper{};
```
Comparing to unique_ptr wrapper's methods are safe to use for empty wrapper.
To construct wrapper you have to use `make` method:
```cpp
auto d = rpp::disposable_wrapper::make<SomeSpecificDisposableType>(some_arguments, to_construct_it);
```

Wrapper has popluar methods to work with disposable: `dispose()`, `is_disposed()` and `add()`/`remove()`/`clear()` (for `interface_composite_disposable`).

In case of you want to obtain original disposable, you can use `lock()` method returning shared_ptr.

`disposable_wrapper` can be strong and weak:
- strong (it is default behavior) is keeping disposable as shared_ptr, so, such an instance of wrapper is extending life-time is underlying disposable
- weak (disposable_wrapper can be forced to weak via `as_weak()` method) is keeping disposable as weak_ptr, so, such an instance of wrapper is **NOT** extendning life-time is underlying disposable

This wrapper is needed for 2 goals:
- provide safe usage of disposables avoiding manual handling of empty/weak disposables
- automatically call `dispose()` during destruction of any disposable

To achieve desired performance RPP is avoiding to returning disposable by default. So, it is why `subscribe` method is not returning anything by default. If you want to attach disposable to observer you can use overloading method accepting disposable as first argument like this:
```cpp
auto d = rpp::composite_disposable_wrapper::make();
observable.subscribe(d, [](int v){});
```
or use `subscribe_with_disposable` method instead
```cpp
auto d = observable.subscribe_with_disposable([](int){});
```

### dynamic_* versions to keep classes as variables
Most of the classes inside rpp library including `observable`, `observer` and others are heavy-templated classes. It means, it could has a lot of template params. In most cases you shouldn't worry about it due to it is purely internal problem.
Expand Down
25 changes: 23 additions & 2 deletions src/rpp/rpp/disposables.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,29 @@

/**
* @defgroup disposables Disposables
* @brief Disposable owns some resource and provides ability to `dispose()` it: destroy/remove/disconnect and etc.
* @details In RPP it used as "inverted subscription": observable sets disposable to observer via `set_upstream(disposable)` with meaning "if you want to cancel me -> dispose this disposable"
*
* @brief Disposable is handle/resource passed from observable to observer via the `set_upstream` method. Observer disposes this disposable when it wants to unsubscribe from observable.
*
* @details In reactive programming, a **disposable** is an object that represents a resource that needs to be released or disposed of when it is no longer needed.
* This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use.
* The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner.
* By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.
*
* There are 2 main purposes of disposables:
* 1. **Upstream disposable** <br>
* This is a disposable that the observable puts into the observer.
* The upstream disposable keeps some state or callback that should be disposed of when the observer is disposed (== no longer wants to receive emissions, for example, was completed/errored or just unsubscribed)
* This ensures that any resources used by the observable are properly cleaned up when the observer obtains on_error/on_completed or disposed in any other way.
*
* 2. **External disposable** <br>
* This is a disposable that allows the observer to be disposed of from outside the observer itself.
* This can be useful in situations where you need to cancel an ongoing operation or release resources before the observable has completed its work.
* To achieve this in rpp you can pass disposable to `subscribe` method or use `subscribe_with_disposable` overload instead.
*
* @note In rpp all disposables should be created via @link rpp::disposable_wrapper_impl @endlink instead of manually.
*
* @warning From user of rpp library it is not really expected to handle disposables manually somehow **except** of case where user want to control lifetime of observable-observer connection manually.
*
* @ingroup rpp
*/

Expand Down
39 changes: 35 additions & 4 deletions src/rpp/rpp/disposables/disposable_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,35 @@ namespace rpp::details
namespace rpp
{
/**
* @brief Wrapper to keep disposable. Any disposable have to be created right from this wrapper with help of `make` function.
* @details Member functions is safe to call even if internal disposable is gone. Also it provides access to "raw" shared_ptr and it can be nullptr in case of disposable empty/ptr gone.
* @details Can keep weak_ptr in case of not owning disposable
* @brief Main RPP wrapper over @link disposables @endlink.
* @details This wrapper invented to provide safe and easy-to-use access to disposables. It has next core points:
* - disposable_wrapper is kind of smart_pointer (like std::shared_ptr) but for disposables. So, default constructed wrapper is empty wrapper.
* - disposable_wrapper shares ownership like std::shared_ptr
* - any disposable created via disposable_wrapper would have call `dispose()` during it's destruction (during destruction of last disposable_wrapper owning it)
* - disposable_wrapper's methods is safe to use over empty/gone/disposed/weak disposables.
* - as soon as disposable can be actually "any internal state" it provides access to "raw" shared_ptr and it can be nullptr in case of disposable empty/ptr gone.
* - disposable_wrapper can be strong or weak (same as std::shared_ptr). weak disposable is important, for example, when it keeps observer and this observer should keep this disposable at the same time.
* - disposable_wrapper has popluar methods to work with disposable: `dispose()`, `is_disposed()` and `add()`/`remove()`/`clear()` (for `interface_composite_disposable`).
*
* To construct wrapper you have to use `make` method:
* @code{cpp}
* auto d = rpp::disposable_wrapper::make<SomeSpecificDisposableType>(some_arguments, to_construct_it);
* @endcode
*
* To achieve desired performance RPP is avoiding to returning disposable by default. So, it is why `subscribe` method is not returning anything by default. If you want to attach disposable to observer you can use overloading method accepting disposable as first argument like this:
* @code{cpp}
* auto d = rpp::composite_disposable_wrapper::make();
* observable.subscribe(d, [](int v){});
* @endcode
* or use `subscribe_with_disposable` method instead
* @code{cpp}
* auto d = observable.subscribe_with_disposable([](int){});
* @endcode
*
* @note rpp has 2 predefined disposable_wrappers for most popular cases:
* - @link rpp::disposable_wrapper @endlink is wrapper for simple @link rpp::interface_disposable @endlink
* - @link rpp::composite_disposable_wrapper @endlink is wrapper for @link rpp::composite_disposable @endlink
*
* @ingroup disposables
*/
Expand All @@ -126,7 +152,12 @@ namespace rpp
bool operator==(const disposable_wrapper_impl&) const = default;

/**
* @brief Way to create disposable_wrapper. Passed `TTarget` type can be any type derived from `TDisposable`.
* @brief Main way to create disposable_wrapper. Passed `TTarget` type can be any type derived from `TDisposable`.
* @par Example:
*
* \code{cpp}
* rpp::disposable_wrapper<rpp::interface_composite_disposable>::make<rpp::composite_disposable>();
* \endcode
*/
template<std::derived_from<TDisposable> TTarget = TDefaultMake, typename... TArgs>
requires (std::constructible_from<TTarget, TArgs && ...>)
Expand Down

1 comment on commit 90f9f2f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 304.60 ns 1.85 ns 1.85 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 306.64 ns 1.85 ns 1.85 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 707.68 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1047.36 ns 3.42 ns 3.43 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2244.45 ns 118.49 ns 111.93 ns 1.06
defer from array of 1 - defer + create + subscribe + immediate 731.04 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2134.50 ns 59.23 ns 59.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3061.97 ns 32.40 ns 32.40 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29527.28 ns 27635.19 ns 28324.88 ns 0.98
from array of 1000 - create + as_blocking + subscribe + new_thread 39498.15 ns 50572.00 ns 51243.95 ns 0.99
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3483.15 ns 140.57 ns 130.55 ns 1.08

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1080.95 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 866.55 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 982.20 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 905.73 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1259.58 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 910.09 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1116.91 ns 17.91 ns 17.91 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 833.50 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 265.98 ns 1.55 ns 0.46 ns 3.34
current_thread scheduler create worker + schedule 364.21 ns 4.32 ns 4.32 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 812.37 ns 60.48 ns 61.17 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 867.43 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 897.10 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2344.69 ns 139.77 ns 170.56 ns 0.82
immediate_just+buffer(2)+subscribe 1572.74 ns 13.59 ns 13.59 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2426.80 ns 1359.80 ns 1325.75 ns 1.03

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 854.78 ns - - 0.00
immediate_just+take_while(true)+subscribe 846.21 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1991.10 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3428.02 ns 160.14 ns 203.76 ns 0.79
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3641.91 ns 155.79 ns 161.58 ns 0.96
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 137.99 ns 129.60 ns 1.06
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3547.97 ns 1254.32 ns 1165.96 ns 1.08
immediate_just(1) + zip(immediate_just(2)) + subscribe 2160.50 ns 210.73 ns 207.99 ns 1.01
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3184.96 ns 250.60 ns 239.54 ns 1.05

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.50 ns 14.68 ns 14.66 ns 1.00
subscribe 100 observers to publish_subject 200293.00 ns 16449.17 ns 16355.43 ns 1.01
100 on_next to 100 observers to publish_subject 27379.00 ns 17198.60 ns 19297.93 ns 0.89

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1414.65 ns 12.67 ns 12.65 ns 1.00
basic sample with immediate scheduler 1432.99 ns 5.24 ns 5.24 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 939.23 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2067.62 ns 983.42 ns 990.54 ns 0.99
create(on_error())+retry(1)+subscribe 595.87 ns 109.48 ns 115.34 ns 0.95

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 973.43 ns 0.71 ns 0.60 ns 1.18
Subscribe empty callbacks to empty observable via pipe operator 973.50 ns 0.70 ns 0.58 ns 1.21

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1927.27 ns 0.23 ns 0.26 ns 0.88
from array of 1 - create + subscribe + current_thread 2435.61 ns 33.60 ns 38.65 ns 0.87
concat_as_source of just(1 immediate) create + subscribe 5415.59 ns 315.88 ns 351.67 ns 0.90
defer from array of 1 - defer + create + subscribe + immediate 1959.26 ns 0.23 ns 0.26 ns 0.90
interval - interval + take(3) + subscribe + immediate 4905.08 ns 114.56 ns 124.54 ns 0.92
interval - interval + take(3) + subscribe + current_thread 5991.84 ns 97.86 ns 106.40 ns 0.92
from array of 1 - create + as_blocking + subscribe + new_thread 82106.50 ns 79561.79 ns 95570.83 ns 0.83
from array of 1000 - create + as_blocking + subscribe + new_thread 88282.23 ns 87249.42 ns 103187.36 ns 0.85
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8231.64 ns 359.45 ns 405.12 ns 0.89

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2880.89 ns 0.23 ns 0.23 ns 1.00
immediate_just+filter(true)+subscribe 2110.82 ns 0.23 ns 0.23 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 2779.37 ns 0.23 ns 0.23 ns 1.01
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2127.30 ns 0.47 ns 0.47 ns 1.00
immediate_just(1,2)+first()+subscribe 3191.46 ns 0.23 ns 0.23 ns 1.00
immediate_just(1,2)+last()+subscribe 2425.68 ns 0.23 ns 0.23 ns 1.00
immediate_just+take_last(1)+subscribe 3088.58 ns 0.23 ns 0.23 ns 1.01
immediate_just(1,2,3)+element_at(1)+subscribe 2167.10 ns 0.23 ns 0.29 ns 0.81

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 874.02 ns 0.93 ns 4.93 ns 0.19
current_thread scheduler create worker + schedule 1200.00 ns 34.14 ns 116.11 ns 0.29
current_thread scheduler create worker + schedule + recursive schedule 2037.66 ns 202.92 ns 226.20 ns 0.90

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2114.11 ns 4.21 ns 4.20 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 2373.75 ns 0.47 ns 0.47 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 5340.67 ns 375.74 ns 373.21 ns 1.01
immediate_just+buffer(2)+subscribe 2547.63 ns 64.18 ns 68.22 ns 0.94
immediate_just+window(2)+subscribe + subscsribe inner 5511.00 ns 2385.97 ns 2363.91 ns 1.01

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2116.04 ns - - 0.00
immediate_just+take_while(true)+subscribe 2111.65 ns 0.23 ns 0.23 ns 1.01

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4960.23 ns 4.90 ns 5.95 ns 0.82

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7782.33 ns 417.88 ns 469.30 ns 0.89
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8842.03 ns 407.91 ns 427.10 ns 0.96
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 448.18 ns 453.63 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 7999.41 ns 1881.86 ns 1945.74 ns 0.97
immediate_just(1) + zip(immediate_just(2)) + subscribe 5183.86 ns 822.09 ns 827.10 ns 0.99
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 7646.40 ns 674.38 ns 886.27 ns 0.76

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 75.34 ns 49.39 ns 53.14 ns 0.93
subscribe 100 observers to publish_subject 351456.00 ns 40905.05 ns 45833.83 ns 0.89
100 on_next to 100 observers to publish_subject 53913.00 ns 19300.43 ns 22447.80 ns 0.86

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2792.46 ns 69.43 ns 78.27 ns 0.89
basic sample with immediate scheduler 2811.06 ns 18.44 ns 21.17 ns 0.87

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2425.68 ns 0.23 ns 0.29 ns 0.80

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6685.54 ns 4118.37 ns 4350.13 ns 0.95
create(on_error())+retry(1)+subscribe 1828.54 ns 278.47 ns 288.40 ns 0.97

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 271.45 ns 0.63 ns 1.54 ns 0.41
Subscribe empty callbacks to empty observable via pipe operator 271.09 ns 0.63 ns 1.54 ns 0.41

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 574.06 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 797.23 ns 4.01 ns 4.01 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2393.93 ns 128.61 ns 129.51 ns 0.99
defer from array of 1 - defer + create + subscribe + immediate 767.68 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2203.50 ns 58.30 ns 58.26 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3207.93 ns 30.88 ns 30.88 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30899.86 ns 29074.54 ns 28296.37 ns 1.03
from array of 1000 - create + as_blocking + subscribe + new_thread 39228.36 ns 37825.45 ns 35617.48 ns 1.06
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3701.22 ns 148.28 ns 147.42 ns 1.01

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1161.26 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 847.15 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1079.92 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 868.99 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1404.67 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1004.54 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1183.25 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 858.79 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 281.02 ns 1.54 ns 0.63 ns 2.43
current_thread scheduler create worker + schedule 400.54 ns 4.01 ns 4.32 ns 0.93
current_thread scheduler create worker + schedule + recursive schedule 860.10 ns 56.03 ns 54.88 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 842.02 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 960.19 ns 0.62 ns 0.62 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2232.54 ns 140.60 ns 142.77 ns 0.98
immediate_just+buffer(2)+subscribe 1512.70 ns 14.19 ns 14.20 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2441.86 ns 918.06 ns 911.43 ns 1.01

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 837.95 ns - - 0.00
immediate_just+take_while(true)+subscribe 858.15 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1994.92 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3230.34 ns 153.10 ns 155.72 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3725.13 ns 137.52 ns 136.79 ns 1.01
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 143.00 ns 142.90 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3432.59 ns 830.65 ns 827.38 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2206.03 ns 204.17 ns 206.35 ns 0.99
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3277.11 ns 222.97 ns 224.94 ns 0.99

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.42 ns 17.91 ns 17.95 ns 1.00
subscribe 100 observers to publish_subject 211771.00 ns 16076.98 ns 16038.88 ns 1.00
100 on_next to 100 observers to publish_subject 38250.63 ns 23499.42 ns 20692.56 ns 1.14

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1348.05 ns 11.42 ns 11.42 ns 1.00
basic sample with immediate scheduler 1312.54 ns 6.18 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 984.79 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2189.35 ns 1165.87 ns 1172.90 ns 0.99
create(on_error())+retry(1)+subscribe 640.52 ns 140.12 ns 139.15 ns 1.01

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 563.30 ns 1.85 ns 2.16 ns 0.86
Subscribe empty callbacks to empty observable via pipe operator 582.06 ns 1.85 ns 2.16 ns 0.86

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1181.85 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1456.08 ns 15.75 ns 15.74 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 3845.82 ns 174.46 ns 171.26 ns 1.02
defer from array of 1 - defer + create + subscribe + immediate 1215.02 ns 5.55 ns 5.55 ns 1.00
interval - interval + take(3) + subscribe + immediate 3500.59 ns 140.85 ns 140.00 ns 1.01
interval - interval + take(3) + subscribe + current_thread 3608.83 ns 60.24 ns 59.74 ns 1.01
from array of 1 - create + as_blocking + subscribe + new_thread 118680.00 ns 114088.89 ns 110990.00 ns 1.03
from array of 1000 - create + as_blocking + subscribe + new_thread 126300.00 ns 130722.22 ns 128987.50 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5530.61 ns 200.40 ns 196.48 ns 1.02

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1874.72 ns 19.73 ns 19.43 ns 1.02
immediate_just+filter(true)+subscribe 1629.03 ns 18.81 ns 18.51 ns 1.02
immediate_just(1,2)+skip(1)+subscribe 1795.59 ns 18.51 ns 17.90 ns 1.03
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1362.61 ns 23.45 ns 20.68 ns 1.13
immediate_just(1,2)+first()+subscribe 2512.06 ns 17.28 ns 18.21 ns 0.95
immediate_just(1,2)+last()+subscribe 1829.57 ns 18.51 ns 19.14 ns 0.97
immediate_just+take_last(1)+subscribe 2082.48 ns 65.03 ns 65.06 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 1643.53 ns 21.91 ns 21.00 ns 1.04

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 478.46 ns 4.01 ns 4.01 ns 1.00
current_thread scheduler create worker + schedule 649.74 ns 11.61 ns 11.60 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1354.64 ns 104.64 ns 104.05 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1337.98 ns 18.81 ns 18.82 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1465.42 ns 20.99 ns 20.96 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 4040.55 ns 179.90 ns 204.01 ns 0.88
immediate_just+buffer(2)+subscribe 2409.45 ns 65.59 ns 65.36 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 4169.73 ns 1342.91 ns 1322.29 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1332.99 ns 17.57 ns 17.57 ns 1.00
immediate_just+take_while(true)+subscribe 1340.70 ns 18.81 ns 18.51 ns 1.02

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3271.03 ns 11.10 ns 11.11 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5318.22 ns 196.94 ns 201.39 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5730.35 ns 182.58 ns 181.89 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 207.02 ns 208.78 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6308.70 ns 968.84 ns 960.61 ns 1.01
immediate_just(1) + zip(immediate_just(2)) + subscribe 3930.38 ns 522.18 ns 510.38 ns 1.02
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 5108.12 ns 333.45 ns 336.56 ns 0.99

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 37.71 ns 20.11 ns 20.12 ns 1.00
subscribe 100 observers to publish_subject 268475.00 ns 29612.82 ns 27373.68 ns 1.08
100 on_next to 100 observers to publish_subject 51547.83 ns 35737.93 ns 35759.38 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1912.01 ns 96.09 ns 95.77 ns 1.00
basic sample with immediate scheduler 1916.31 ns 68.33 ns 68.43 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1491.77 ns 19.42 ns 19.13 ns 1.02

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2002.46 ns 358.78 ns 354.01 ns 1.01
create(on_error())+retry(1)+subscribe 1259.58 ns 139.24 ns 140.22 ns 0.99

Please sign in to comment.