Skip to content

Commit

Permalink
fix: Don't require _every_ message to contain procedure information
Browse files Browse the repository at this point in the history
Previously we had the requirement that every single message in a stream
had to have the service name + procedure. But that's very wasteful and
also kind of awkward to be saving in other implementations.

This change does a little refactor so that the server no longer relies
on having this information in every message in a stream. In doing this
refactor, we also now can reason about what the first message is, and
can now correctly validate that the initialization message is received
first and then all future messages only comply with the regular input
type. Also fixed a bug where an overly zealous listener would
incorrectly close all streams upon the first close message.
  • Loading branch information
lhchavez committed Dec 13, 2023
1 parent 105cc11 commit 0644b6b
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 210 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@replit/river",
"sideEffects": false,
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.9.1",
"version": "0.9.2",
"type": "module",
"exports": {
".": "./dist/router/index.js",
Expand Down
72 changes: 25 additions & 47 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const streamId = nanoid();

function belongsToSameStream(msg: OpaqueTransportMessage) {
return (
msg.serviceName === serviceName &&
msg.procedureName === procName &&
msg.streamId === streamId
);
return msg.streamId === streamId;
}

if (procType === 'stream') {
Expand All @@ -205,10 +201,10 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
input as object,
serviceName,
procName,
);

// first message needs the open bit.
Expand All @@ -224,13 +220,13 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
rawIn as object,
);

if (firstMessage) {
m.serviceName = serviceName;
m.procedureName = procName;
m.controlFlags |= ControlFlags.StreamOpenBit;
firstMessage = false;
}
Expand All @@ -241,9 +237,13 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

// transport -> output
const listener = (msg: OpaqueTransportMessage) => {
if (!belongsToSameStream(msg)) {
return;
}

if (isStreamClose(msg.controlFlags)) {
outputStream.end();
} else if (belongsToSameStream(msg)) {
} else {
outputStream.push(msg.payload);
}
};
Expand All @@ -252,15 +252,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const closeHandler = () => {
inputStream.end();
outputStream.end();
transport.send(
closeStream(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
),
);
transport.send(closeStream(transport.clientId, serverId, streamId));
transport.removeEventListener('message', listener);
};

Expand All @@ -269,10 +261,10 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
input as object,
serviceName,
procName,
);

// rpc is a stream open + close
Expand All @@ -284,38 +276,32 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
input as object,
serviceName,
procName,
);
m.controlFlags |= ControlFlags.StreamOpenBit;
transport.send(m);

// transport -> output
const outputStream = pushable({ objectMode: true });
const listener = (msg: OpaqueTransportMessage) => {
if (belongsToSameStream(msg)) {
outputStream.push(msg.payload);
if (!belongsToSameStream(msg)) {
return;
}

if (isStreamClose(msg.controlFlags)) {
outputStream.end();
} else {
outputStream.push(msg.payload);
}
};

transport.addEventListener('message', listener);
const closeHandler = () => {
outputStream.end();
transport.send(
closeStream(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
),
);
transport.send(closeStream(transport.clientId, serverId, streamId));
transport.removeEventListener('message', listener);
};

Expand All @@ -328,10 +314,10 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
input as object,
serviceName,
procName,
);

// first message needs the open bit.
Expand All @@ -347,29 +333,21 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
const m = msg(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
rawIn as object,
);

if (firstMessage) {
m.controlFlags |= ControlFlags.StreamOpenBit;
m.serviceName = serviceName;
m.procedureName = procName;
firstMessage = false;
}

transport.send(m);
}

transport.send(
closeStream(
transport.clientId,
serverId,
serviceName,
procName,
streamId,
),
);
transport.send(closeStream(transport.clientId, serverId, streamId));
})();

return [inputStream, waitForMessage(transport, belongsToSameStream)];
Expand Down
Loading

0 comments on commit 0644b6b

Please sign in to comment.