-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor AsyncStream #704
Refactor AsyncStream #704
Conversation
🦋 Changeset detectedLatest commit: 764d6c0 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
Warning Rate limit exceeded@rygine has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 11 minutes and 21 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe changes in this pull request primarily focus on the Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
sdks/node-sdk/test/AsyncStream.test.ts (3)
7-28
: Enhance test coverage for value handling.While the basic value streaming is tested, consider these improvements:
- Test the new
return()
method (mentioned in PR objectives) by properly closing the stream- Validate that no additional values are processed after stream completion
- Simplify the test using an array of expected values
Consider this approach:
it("should return values from callbacks", async () => { const stream = new AsyncStream<number>(); + const expected = [1, 2, 3]; stream.callback(null, 1); stream.callback(null, 2); stream.callback(null, 3); - let count = 0; + const received = []; for await (const value of stream) { - if (count === 0) { - expect(value).toBe(1); - } - if (count === 1) { - expect(value).toBe(2); - } - if (count === 2) { - expect(value).toBe(3); - break; - } - count++; + received.push(value); + if (received.length === expected.length) { + await stream.return(); + break; + } } + expect(received).toEqual(expected); + + // Verify no more values are processed after return + stream.callback(null, 4); + for await (const value of stream) { + throw new Error(`Unexpected value: ${value}`); + } });
30-43
: Add cleanup validation after error.The error handling test is good but could be enhanced to verify proper cleanup after the error.
Consider adding:
it("should catch an error thrown in the for..await loop", async () => { const stream = new AsyncStream<number>(); stream.callback(null, 1); try { for await (const value of stream) { expect(value).toBe(1); throw testError; } } catch (error) { expect(error).toBe(testError); expect((error as Error).message).toBe("test"); + + // Verify stream is properly closed after error + stream.callback(null, 2); + for await (const value of stream) { + throw new Error(`Unexpected value: ${value}`); + } } });
45-61
: Simplify error assertions and add state verification.The test has redundant error assertions and could be enhanced to verify stream state after error.
Consider this approach:
it("should catch an error passed to callback", async () => { - const runTest = async () => { - const stream = new AsyncStream<number>(); - stream.callback(testError, 1); - try { - for await (const _value of stream) { - // this block should never be reached - } - } catch (error) { - expect(error).toBe(testError); - expect((error as Error).message).toBe("test"); - } - }; + const stream = new AsyncStream<number>(); + stream.callback(testError, 1); - await expect(runTest()).rejects.toThrow(testError); + let loopExecuted = false; + await expect(async () => { + for await (const _value of stream) { + loopExecuted = true; + } + }).rejects.toThrow(testError); + + // Verify loop was never executed + expect(loopExecuted).toBe(false); + + // Verify stream is properly closed after error + stream.callback(null, 2); + for await (const value of stream) { + throw new Error(`Unexpected value: ${value}`); + } });sdks/node-sdk/test/Conversations.test.ts (1)
336-341
: LGTM! Consider enhancing test coverageThe implementation correctly uses the async iteration pattern. Consider adding these test cases to improve coverage:
- Verify that group messages are properly filtered out
- Add error handling scenarios to test the new error propagation functionality mentioned in the PR objectives
Example test case:
it("should properly handle errors in DM streams", async () => { const stream = client3.conversations.streamDms(); // Simulate an error condition // Verify that the error is properly propagated await expect(async () => { for await (const convo of stream) { // Force an error condition } }).rejects.toThrow(); });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
sdks/node-sdk/src/AsyncStream.ts
(3 hunks)sdks/node-sdk/src/Conversation.ts
(1 hunks)sdks/node-sdk/src/Conversations.ts
(6 hunks)sdks/node-sdk/test/AsyncStream.test.ts
(1 hunks)sdks/node-sdk/test/Conversation.test.ts
(0 hunks)sdks/node-sdk/test/Conversations.test.ts
(3 hunks)
💤 Files with no reviewable changes (1)
- sdks/node-sdk/test/Conversation.test.ts
✅ Files skipped from review due to trivial changes (1)
- sdks/node-sdk/src/Conversations.ts
🔇 Additional comments (6)
sdks/node-sdk/test/AsyncStream.test.ts (1)
1-4
: LGTM: Clean test setup with appropriate imports.
The test setup follows testing best practices with clear imports and a well-defined test error.
sdks/node-sdk/src/Conversation.ts (1)
122-122
: LGTM! Property rename aligns with async iterator pattern.
The change from stopCallback
to onReturn
correctly implements the AsyncStream refactor, better aligning with standard async iterator patterns where return()
is used for cleanup.
sdks/node-sdk/test/Conversations.test.ts (1)
Line range hint 305-314
: LGTM! Test correctly implements async iteration pattern
The test properly demonstrates the new streaming pattern using for-await-of loop, which automatically handles stream cleanup through the standard return() method.
sdks/node-sdk/src/AsyncStream.ts (3)
15-15
: LGTM!
The addition of the onReturn
property is appropriate and aligns with the intended functionality.
29-29
: LGTM!
Changing the error handling to throw errors ensures that errors are properly propagated to the caller.
65-72
: LGTM!
The addition of the return
method correctly implements the async iterator protocol, allowing for proper stream completion.
Deploying xmtp-js-docs with
|
Latest commit: |
764d6c0
|
Status: | ✅ Deploy successful! |
Preview URL: | https://a513f7ac.xmtp-js.pages.dev |
Branch Preview URL: | https://rygine-fix-async-stream.xmtp-js.pages.dev |
Summary
This PR refactors
AsyncStream
to be more compliant with the async iterator pattern.stop()
withreturn()
stopCallback
property toonReturn
The
return()
method of an async iterator will always be called when the iteration is complete. This allows shutting down streams without manually callingstop()
.With these changes,
for..await const..of
loops will properly throw errors passed into the stream callback.Summary by CodeRabbit
New Features
return
in theAsyncStream
class for improved stream completion handling.stopCallback
toonReturn
for clearer callback functionality in theAsyncStream
class.AsyncStream
and enhanced test coverage for theConversation
class, including member management and message handling.Bug Fixes
AsyncStream
class, allowing errors to propagate to the caller.Tests
Conversation
class, including message sending and updating conversation properties.