diff --git a/janet/src/main/java/io/techery/janet/ActionPipe.java b/janet/src/main/java/io/techery/janet/ActionPipe.java index 42f825a..9c7fb1c 100644 --- a/janet/src/main/java/io/techery/janet/ActionPipe.java +++ b/janet/src/main/java/io/techery/janet/ActionPipe.java @@ -70,7 +70,7 @@ public Observable> observe() { * @see Observable#replay(int) */ public Observable> observeWithReplay() { - return cachedPipeline; + return cachedPipeline.asObservable(); } /** @@ -89,7 +89,7 @@ public Observable observeSuccess() { * @see ActionPipe#observeSuccess() */ public Observable observeSuccessWithReplay() { - return cachedSuccessPipeline; + return cachedSuccessPipeline.asObservable(); } /** diff --git a/janet/src/main/java/io/techery/janet/helper/ActionStateSubscriber.java b/janet/src/main/java/io/techery/janet/helper/ActionStateSubscriber.java index aa156e3..d01d820 100644 --- a/janet/src/main/java/io/techery/janet/helper/ActionStateSubscriber.java +++ b/janet/src/main/java/io/techery/janet/helper/ActionStateSubscriber.java @@ -2,6 +2,7 @@ import io.techery.janet.ActionState; import rx.Subscriber; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Action2; @@ -69,5 +70,7 @@ public ActionStateSubscriber afterEach(Action1> afterEach) { @Override public void onCompleted() { } - @Override public void onError(Throwable e) { } + @Override public void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } } \ No newline at end of file