Handling Backpressure with Special Subjects #6334
Replies: 7 comments 1 reply
-
Wow, just read your article. Looks really tiny and promising, I gonna try it soon |
Beta Was this translation helpful? Give feedback.
-
As this issue was not touched until several months now I suggest to close it. |
Beta Was this translation helpful? Give feedback.
-
Good to have some "official statement" on Backpressure in RxJS to understand the direction. |
Beta Was this translation helpful? Give feedback.
-
There are many upvotes. |
Beta Was this translation helpful? Give feedback.
-
I’ve also created a library/pattern for lossless back-pressure in RxJS: https://www.npmjs.com/package/rx-flowable I have not yet done a proper comparison with the OP's approach, but since this is a discussion now I thought I'd mention. |
Beta Was this translation helpful? Give feedback.
-
I guess here's the thing, in modern JavaScript, AsyncIterable and Observable have enough interop that you can do some pretty slick things without much fuss or any additional libraries: Let's say you were going to read the lines of a file and kick off some RxJS-related expensive thing: // Read some file into a stream
const stream = fs.createReadStream(
path.join(__dirname, "./file.txt")
);
// Throw any errors that happen while waiting for it to open
await once(stream, "open");
// Get a line-by-line reader (This implements Symbol.asyncIterator)
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
// This will effectively manage backpressure but provide whatever RxJS functionality you wanted:
for await(const line of rl) {
await getSomeObservable(line)
.pipe(
...someOperatorsHere
)
.forEach(doSomethingWithTheValues);
} If you wanted that as an observable, there's a lot of ways to go now, for example, you can convert back and forth from an async iterable using a library like rxjs-for-await: import { eachValueFrom } from "rxjs-for-await";
// ... snip ...
// In RxJS 7+ `defer`, et al, will convert an async iterator to an observable.
const results$ = defer(async function* () {
for await (const line of rl) {
// here we convert the observable to an async iterable using rxjs-for-await
yield* eachValueFrom(getSomeObservable(line).pipe(...someOperatorsHere));
}
}); Weirdly, this almost makes a solid use case for converting an Observable to an async iterator natively without the function, we just had not added it out of fear of what people might misunderstand about how it works (buffering, etc), then again, people haven't run into too many issues with |
Beta Was this translation helpful? Give feedback.
-
Core Team meeting: 👍 (by quorum of one and utter silence besides). |
Beta Was this translation helpful? Give feedback.
-
Feature Request
Special Subject used for controlled lossless backpressure in RxJS.
Is your feature request related to a problem? Please describe.
Currently, there's no official solution in RxJS to deal with Generators, Iterators, Node.js streams, and other pull-based systems in a controlled way.
#71
Describe the solution you'd like
I'd like a native solution to deal with pull-based systems instead of relying on IxJS.
WebSocketSubject
.Describe alternatives you've considered
I wrote an article about my solutions here:
https://itnext.io/lossless-backpressure-in-rxjs-b6de30a1b6d4
I created functional wrappers around a Node.js transform stream and a Generator to allow a 2-push system to simulate a pull system.
Example implementations:
Iterator - https://gist.github.com/Sawtaytoes/7b6bf968510fe88df3ec98846a641779
Node.js Stream - https://gist.github.com/Sawtaytoes/4896e9ae6091392bebfc6ba027e27bd0
These examples aren't in TypeScript and don't follow the class extension pattern in RxJS's codebase, but they reveal the same API that's used by
WebSocketSubject
. From a practical standpoint, they could be used as temporary measures until official implementations are written.(If this is new operator request) describe reason it should be core operator
There's absolutely no reason another library needs to be used to handle what RxJS should be able to do with its existing tooling.
Beta Was this translation helpful? Give feedback.
All reactions