Skip to content

Commit

Permalink
The subscriptions coalescer now publishes the final message of the in…
Browse files Browse the repository at this point in the history
…ner iterator (#3132)

# Summary

What if a custom transport _returns_ its final value? Before this PR, the coalescer would not deliver the final message, if the `done` property and the `value` came in the same iteration.
  • Loading branch information
steveluscher authored Aug 21, 2024
1 parent 75710e7 commit 2a1a729
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface TestRpcSubscriptionNotifications {
}

describe('getRpcSubscriptionsWithSubscriptionCoalescing', () => {
let asyncGenerator: jest.Mock<AsyncGenerator<unknown, void>>;
let asyncGenerator: jest.Mock<AsyncGenerator<unknown, unknown>>;
let createPendingSubscription: jest.Mock;
let getDeduplicationKey: jest.Mock;
let subscribe: jest.Mock;
Expand Down Expand Up @@ -115,6 +115,21 @@ describe('getRpcSubscriptionsWithSubscriptionCoalescing', () => {
await expect(messagePromiseA).resolves.toHaveProperty('value', 'hello');
await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello');
});
it('publishes the final message when the iterable returns', async () => {
expect.assertions(1);
asyncGenerator.mockImplementation(
// eslint-disable-next-line require-yield
async function* () {
return await Promise.resolve('hello');
},
);
const iterable = await rpcSubscriptions
.thingNotifications({ payload: 'hello' })
.subscribe({ abortSignal: new AbortController().signal });
const iterator = iterable[Symbol.asyncIterator]();
const messagePromise = iterator.next();
await expect(messagePromise).resolves.toHaveProperty('value', 'hello');
});
it('aborting a subscription causes it to return', async () => {
expect.assertions(1);
asyncGenerator.mockImplementation(async function* () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export function getRpcSubscriptionsWithSubscriptionCoalescing<TRpcSubscriptionsM
while (true) {
const iteratorResult = await safeRace([iterator.next(), abortPromise]);
if (iteratorResult.done) {
return;
return iteratorResult.value;
} else {
yield iteratorResult.value;
}
Expand Down

0 comments on commit 2a1a729

Please sign in to comment.