Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscriptions (server-streams) + observable #22

Merged
merged 6 commits into from
Dec 7, 2023

Conversation

jackyzha0
Copy link
Member

@jackyzha0 jackyzha0 commented Dec 7, 2023

supercedes #16

Subscriptions

We'd like the ability to express other procedure types aside from RPC and bidi streams. This PR introduces a new type of procedure type: subscriptions.

Types of procedures available:

  • RPC: msg in, msg out
  • Stream: stream in, stream out
  • Subscription: msg in, stream out

To enable this, we needed to refactor how we type procedure calls. Currently, it looks like client.<service name>.<procedure name>() and then the client figures out whether it's actually an rpc/stream based on the number of arguments we call the proxy object with

  • 0 arguments indicates its a stream open
  • 1 argument indicates its an rpc

Unfortunately, this isn't scaleable / is kind of hacky as this only provides type-level safety. Because the client only has access to the structure of the server at compile-time (via the passed generic) to ensure safety, we have no knowledge of the server structure at run-time.

Turns out tRPC had it right and we should just use qualifiers to indicate the type of procedure so that the client can handle it properly:

  • .rpc(msg) for rpc
  • .stream() for stream
  • .subscribe(msg) for subscription

Observables

With subscriptions, we'd like a way to be able to watch something in a service's state for changes. We can do this pretty easily with EventEmitters but I think River is in a good place to start adding data structure primitives that allow for common usage patterns.

For example, here is a service that has a single count state entry that sends an update to all subscribed clients every time it's updated:

export const SubscribableServiceConstructor = () =>
  ServiceBuilder.create('subscribable')
    .initialState({
      count: new Observable<number>(0),
    })
    .defineProcedure('add', {
      type: 'rpc',
      input: Type.Object({ n: Type.Number() }),
      output: Type.Object({ result: Type.Number() }),
      errors: Type.Never(),
      async handler(ctx, msg) {
        const { n } = msg.payload;
        ctx.state.count.set((prev) => prev + n);
        return reply(msg, Ok({ result: ctx.state.count.get() }));
      },
    })
    .defineProcedure('value', {
      type: 'subscription',
      input: Type.Object({}),
      output: Type.Object({ result: Type.Number() }),
      errors: Type.Never(),
      async handler(ctx, msg, returnStream) {
        ctx.state.count.observe((count) => {
          returnStream.push(reply(msg, Ok({ result: count })));
        });
      },
    })
    .finalize();

@@ -0,0 +1 @@
export { Observable } from './observable';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious what your vision is here? i wouldn't really expect observable to need to be included in the river library

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having common state abstractions that are likely to be useful to service builders make sense to include in a sort of 'standard library'. this of course has fuzzy limits on what should be 'useful' enough to consider but I don't think it was entirely obvious to me how to do something like a subscribable state thing so having a toolbox to reach for is nice

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we had a smol chat irl. having state abstractions sounds great, but it's probably for the best if we keep those in another repository so that River can concern itself only / mostly with how we RPC.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after convo with @lhchavez, we decided to just make this a test fixture (it helps simplify subscription testing but lets not kitchen sink a bunch of data types :))

* Represents an observable value that can be subscribed to for changes.
* @template T - The type of the value being observed.
*/
export class Observable<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zawinski's law but for javascript libraries is that every eventually has its own observable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me when i observe all the things

@@ -126,6 +131,64 @@ describe.each(codecs)(
close();
});

test('subscription', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dig the name: subscription > server-stream.

@@ -0,0 +1 @@
export { Observable } from './observable';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we had a smol chat irl. having state abstractions sounds great, but it's probably for the best if we keep those in another repository so that River can concern itself only / mostly with how we RPC.

router/client.ts Outdated
: ProcType<Router, ProcName> extends 'subscription' // subscription
? {
subscribe: (
input: Static<ProcInput<Router, ProcName>>, // input
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vvvv optional, but it's exceedingly obvious that this is an input without the comment.

Suggested change
input: Static<ProcInput<Router, ProcName>>, // input
input: Static<ProcInput<Router, ProcName>>,

@@ -192,5 +216,34 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit;
transport.send(m);
return waitForMessage(transport, belongsToSameStream);
} else if (procType === 'subscribe') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💆‍♂️

router/server.ts Outdated
@@ -140,11 +141,12 @@ export async function createServer<Services extends Record<string, AnyService>>(
} else if (procedure.type === 'rpc') {
openPromises.push(
(async () => {
for await (const inputMessage of incoming) {
const inputMessage = await incoming.next();
if (inputMessage.value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the input message is empty? how can we communicate this to the peer?

also nit, can we use the guard clause pattern for less indentation?:

Suggested change
if (inputMessage.value) {
if (!inputMessage.value) {
return;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inputs are never empty (but they can be the empty object {})

.value is only empty on the end of the stream in this case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decided to guard on if (inputMessage.done) instead 👍

router/server.ts Outdated Show resolved Hide resolved
@jackyzha0 jackyzha0 merged commit b28dd9d into main Dec 7, 2023
4 checks passed
@jackyzha0 jackyzha0 deleted the jackyzha0/subscriptions branch December 7, 2023 18:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants