Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fetching messages from JetStream #87

Merged
merged 6 commits into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions Sources/JetStream/Consumer+Pull.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import Nats
import Nuid

/// Extension to ``Consumer`` adding pull consumer capabilities.
extension Consumer {

/// Retrieves up to a provided number of messages from a stream.
/// This method will send a single request and deliver requested messages unless time out is met earlier.
///
/// - Parameters:
/// - batch: maximum number of messages to be retrieved
/// - expires: timeout of a pull request
/// - idleHeartbeat: interval in which server should send heartbeat messages (if no user messages are available).
///
/// - Returns: ``FetchResult`` which implements ``AsyncSequence`` allowing iteration over messages.
///
/// - Throws:
/// - ``JetStreamError/FetchError`` if there was an error while fetching messages
public func fetch(
batch: Int, expires: TimeInterval = 30, idleHeartbeat: TimeInterval? = nil
) async throws -> FetchResult {
var request: PullRequest
if let idleHeartbeat {
request = PullRequest(
batch: batch, expires: NanoTimeInterval(expires),
heartbeat: NanoTimeInterval(idleHeartbeat))
} else {
request = PullRequest(batch: batch, expires: NanoTimeInterval(expires))
}

let subject = ctx.apiSubject("CONSUMER.MSG.NEXT.\(info.stream).\(info.name)")
let inbox = "_INBOX.\(nextNuid())"
let sub = try await ctx.client.subscribe(subject: inbox)
try await self.ctx.client.publish(
JSONEncoder().encode(request), subject: subject, reply: inbox)
return FetchResult(ctx: ctx, sub: sub, idleHeartbeat: idleHeartbeat, batch: batch)
}
}

/// Used to iterate over results of ``Consumer/fetch(batch:expires:idleHeartbeat:)``
public class FetchResult: AsyncSequence {
public typealias Element = JetStreamMessage
public typealias AsyncIterator = FetchIterator

private let ctx: JetStreamContext
private let sub: NatsSubscription
private let idleHeartbeat: TimeInterval?
private let batch: Int

init(ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?, batch: Int) {
self.ctx = ctx
self.sub = sub
self.idleHeartbeat = idleHeartbeat
self.batch = batch
}

public func makeAsyncIterator() -> FetchIterator {
return FetchIterator(
ctx: ctx,
sub: self.sub, idleHeartbeat: self.idleHeartbeat, remainingMessages: self.batch)
}

public struct FetchIterator: AsyncIteratorProtocol {
private let ctx: JetStreamContext
private let sub: NatsSubscription
private let idleHeartbeat: TimeInterval?
private var remainingMessages: Int
private var subIterator: NatsSubscription.AsyncIterator

init(
ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?,
remainingMessages: Int
) {
self.ctx = ctx
self.sub = sub
self.idleHeartbeat = idleHeartbeat
self.remainingMessages = remainingMessages
self.subIterator = sub.makeAsyncIterator()
}

public mutating func next() async throws -> JetStreamMessage? {
if remainingMessages <= 0 {
try await sub.unsubscribe()
return nil
}

while true {
let message: NatsMessage?

if let idleHeartbeat = idleHeartbeat {
let timeout = idleHeartbeat * 2
message = try await nextWithTimeout(timeout, subIterator)
} else {
message = try await subIterator.next()
}

guard let message else {
// the subscription has ended
try await sub.unsubscribe()
return nil
}

let status = message.status ?? .ok

switch status {
case .timeout:
try await sub.unsubscribe()
return nil
case .idleHeartbeat:
// in case of idle heartbeat error, we want to
// wait for next message on subscription
continue
case .notFound:
try await sub.unsubscribe()
return nil
case .ok:
remainingMessages -= 1
return JetStreamMessage(message: message, client: ctx.client)
case .badRequest:
try await sub.unsubscribe()
throw JetStreamError.FetchError.badRequest
case .noResponders:
try await sub.unsubscribe()
throw JetStreamError.FetchError.noResponders
case .requestTerminated:
try await sub.unsubscribe()
guard let description = message.description else {
throw JetStreamError.FetchError.invalidResponse
}

let descLower = description.lowercased()
if descLower.contains("message size exceeds maxbytes") {
return nil
} else if descLower.contains("leadership changed") {
throw JetStreamError.FetchError.leadershipChanged
} else if descLower.contains("consumer deleted") {
throw JetStreamError.FetchError.consumerDeleted
} else if descLower.contains("consumer is push based") {
throw JetStreamError.FetchError.consumerIsPush
}
default:
throw JetStreamError.FetchError.unknownStatus(status, message.description)
}

if remainingMessages == 0 {
try await sub.unsubscribe()
}

}
}

func nextWithTimeout(
_ timeout: TimeInterval, _ subIterator: NatsSubscription.AsyncIterator
) async throws -> NatsMessage? {
try await withThrowingTaskGroup(of: NatsMessage?.self) { group in
group.addTask {
return try await subIterator.next()
}
group.addTask {
try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
try await sub.unsubscribe()
return nil
}
defer {
group.cancelAll()
}
for try await result in group {
if let msg = result {
return msg
} else {
throw JetStreamError.FetchError.noHeartbeatReceived
}
}
// this should not be reachable
throw JetStreamError.FetchError.noHeartbeatReceived
}
}
}
}

internal struct PullRequest: Codable {
let batch: Int
let expires: NanoTimeInterval
let maxBytes: Int?
let noWait: Bool?
let heartbeat: NanoTimeInterval?

internal init(
batch: Int, expires: NanoTimeInterval, maxBytes: Int? = nil, noWait: Bool? = nil,
heartbeat: NanoTimeInterval? = nil
) {
self.batch = batch
self.expires = expires
self.maxBytes = maxBytes
self.noWait = noWait
self.heartbeat = heartbeat
}

enum CodingKeys: String, CodingKey {
case batch
case expires
case maxBytes = "max_bytes"
case noWait = "no_wait"
case heartbeat = "idle_heartbeat"
}
}
2 changes: 1 addition & 1 deletion Sources/JetStream/Consumer.swift
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ public class Consumer {
/// > - ``JetStreamRequestError`` if the request was unsuccessful.
/// > - ``JetStreamError`` if the server responded with an API error.
public func info() async throws -> ConsumerInfo {
let subj = "CONSUMER.INFO.\(info.stream).\(info.config.name!)"
let subj = "CONSUMER.INFO.\(info.stream).\(info.name)"
let info: Response<ConsumerInfo> = try await ctx.request(subj)
switch info {
case .success(let info):
4 changes: 2 additions & 2 deletions Sources/JetStream/JetStreamContext+Stream.swift
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ extension JetStreamContext {

/// Used to list stream infos.
///
/// - Returns an ``Streams`` which implements AsyncSequence allowing iteration over streams.
/// - Returns ``Streams`` which implements AsyncSequence allowing iteration over streams.
///
/// - Parameter subject: if provided will be used to filter out returned streams
public func streams(subject: String? = nil) async -> Streams {
@@ -146,7 +146,7 @@ extension JetStreamContext {

/// Used to list stream names.
///
/// - Returns an ``StreamNames`` which implements AsyncSequence allowing iteration over stream names.
/// - Returns ``StreamNames`` which implements AsyncSequence allowing iteration over stream names.
///
/// - Parameter subject: if provided will be used to filter out returned stream names
public func streamNames(subject: String? = nil) async -> StreamNames {
10 changes: 7 additions & 3 deletions Sources/JetStream/JetStreamContext.swift
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ import Nuid

/// A context which can perform jetstream scoped requests.
public class JetStreamContext {
private var client: NatsClient
internal var client: NatsClient
private var prefix: String = "$JS.API"
private var timeout: TimeInterval = 5.0

@@ -86,7 +86,7 @@ extension JetStreamContext {
let data = message ?? Data()
do {
let response = try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
data, subject: apiSubject(subject), timeout: self.timeout)
let decoder = JSONDecoder()
guard let payload = response.payload else {
throw JetStreamError.RequestError.emptyResponsePayload
@@ -108,7 +108,7 @@ extension JetStreamContext {
let data = message ?? Data()
do {
return try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
data, subject: apiSubject(subject), timeout: self.timeout)
} catch let err as NatsError.RequestError {
switch err {
case .noResponders:
@@ -120,6 +120,10 @@ extension JetStreamContext {
}
}
}

internal func apiSubject(_ subject: String) -> String {
return "\(self.prefix).\(subject)"
}
}

public struct JetStreamAPIResponse: Codable {
68 changes: 68 additions & 0 deletions Sources/JetStream/JetStreamError.swift
Original file line number Diff line number Diff line change
@@ -49,6 +49,74 @@ public enum JetStreamError {
}
}

public enum MessageMetadataError: JetStreamErrorProtocol {
case noReplyInMessage
case invalidPrefix
case invalidTokenNum
case invalidTokenValue

public var description: String {
switch self {
case .noReplyInMessage:
return "nats: did not fund reply subject in message"
case .invalidPrefix:
return "nats: invalid reply subject prefix"
case .invalidTokenNum:
return "nats: invalid token count"
case .invalidTokenValue:
return "nats: invalid token value"
}
}
}

public enum FetchError: JetStreamErrorProtocol {
case noHeartbeatReceived
case consumerDeleted
case badRequest
case noResponders
case consumerIsPush
case invalidResponse
case leadershipChanged
case unknownStatus(StatusCode, String?)

public var description: String {
switch self {
case .noHeartbeatReceived:
return "nats: no heartbeat received"
case .consumerDeleted:
return "nats: consumer deleted"
case .badRequest:
return "nats: bad request"
case .noResponders:
return "nats: no responders"
case .consumerIsPush:
return "nats: consumer is push based"
case .invalidResponse:
return "nats: no description in status response"
case .leadershipChanged:
return "nats: leadership changed"
case .unknownStatus(let status, let description):
if let description {
return "nats: unknown response status: \(status): \(description)"
} else {
return "nats: unknown response status: \(status)"
}
}
}

}

public enum AckError: JetStreamErrorProtocol {
case noReplyInMessage

public var description: String {
switch self {
case .noReplyInMessage:
return "nats: did not fund reply subject in message"
}
}
}

public enum StreamError: JetStreamErrorProtocol {
case nameRequired
case invalidStreamName(String)
193 changes: 193 additions & 0 deletions Sources/JetStream/JetStreamMessage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import Nats

/// Representation of NATS message in the context of JetStream.
/// It exposes message properties (payload, headers etc.) and various methods for acknowledging delivery.
/// It also allows for checking message metadata.
public struct JetStreamMessage {
private let message: NatsMessage

/// Message payload.
public var payload: Data? { message.payload }

/// Message headers.
public var headers: NatsHeaderMap? { message.headers }

/// The subject the message was published on.
public var subject: String { message.subject }

/// Reply subject used for acking a message.
public var reply: String? { message.replySubject }

internal let client: NatsClient

private let emptyPayload = "".data(using: .utf8)!

internal init(message: NatsMessage, client: NatsClient) {
self.message = message
self.client = client
}

/// Sends an acknowledgement of given kind to the server.
///
/// - Parameter ackType: the type of acknowledgement being sent (defaults to ``AckKind/ack``. For details, see ``AckKind``.
/// - Throws:
/// - ``JetStreamError/AckError`` if there was an error sending the acknowledgement.
public func ack(ackType: AckKind = .ack) async throws {
guard let subject = message.replySubject else {
throw JetStreamError.AckError.noReplyInMessage
}
try await client.publish(ackType.payload(), subject: subject)
}

/// Parses the reply subject of the message, exposing JetStream message metadata.
///
/// - Returns ``MessageMetadata``
///
/// - Throws:
/// - ``JetStreamError/MessageMetadataError`` when there is an error parsing metadata.
public func metadata() throws -> MessageMetadata {
let prefix = "$JS.ACK."
guard let subject = message.replySubject else {
throw JetStreamError.MessageMetadataError.noReplyInMessage
}
if !subject.starts(with: prefix) {
throw JetStreamError.MessageMetadataError.invalidPrefix
}

let startIndex = subject.index(subject.startIndex, offsetBy: prefix.count)
let parts = subject[startIndex...].split(separator: ".")

return try MessageMetadata(tokens: parts)
}
}

/// Represents various types of JetStream message acknowledgement.
public enum AckKind {
/// Normal acknowledgemnt
case ack
/// Negative ack, message will be redelivered (immediately or after given delay)
case nak(delay: TimeInterval? = nil)
/// Marks the message as being processed, resets ack wait timer delaying evential redelivery.
case inProgress
/// Marks the message as terminated, it will never be redelivered.
case term(reason: String? = nil)

func payload() -> Data {
switch self {
case .ack:
return "+ACK".data(using: .utf8)!
case .nak(let delay):
if let delay {
let delayStr = String(Int64(delay * 1_000_000_000))
return "-NAK {\"delay\":\(delayStr)}".data(using: .utf8)!
} else {
return "-NAK".data(using: .utf8)!
}
case .inProgress:
return "+WPI".data(using: .utf8)!
case .term(let reason):
if let reason {
return "+TERM \(reason)".data(using: .utf8)!
} else {
return "+TERM".data(using: .utf8)!
}
}
}
}

/// Metadata of a JetStream message.
public struct MessageMetadata {
/// The domain this message was received on.
public let domain: String?

/// Optional account hash, present in servers post-ADR-15.
public let accountHash: String?

/// Name of the stream the message is delivered from.
public let stream: String

/// Name of the consumer the mesasge is delivered from.
public let consumer: String

/// Number of delivery attempts of this message.
public let delivered: UInt64

/// Stream sequence associated with this message.
public let streamSequence: UInt64

/// Consumer sequence associated with this message.
public let consumerSequence: UInt64

/// The time this message was received by the server from the publisher.
public let timestamp: String

/// The number of messages known by the server to be pending to this consumer.
public let pending: UInt64

private let v1TokenCount = 7
private let v2TokenCount = 9

init(tokens: [Substring]) throws {
if tokens.count >= v2TokenCount {
self.domain = String(tokens[0])
self.accountHash = String(tokens[1])
self.stream = String(tokens[2])
self.consumer = String(tokens[3])
guard let delivered = UInt64(tokens[4]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.delivered = delivered
guard let sseq = UInt64(tokens[5]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.streamSequence = sseq
guard let cseq = UInt64(tokens[6]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.consumerSequence = cseq
self.timestamp = String(tokens[7])
guard let pending = UInt64(tokens[8]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.pending = pending
} else if tokens.count == v1TokenCount {
self.domain = nil
self.accountHash = nil
self.stream = String(tokens[0])
self.consumer = String(tokens[1])
guard let delivered = UInt64(tokens[2]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.delivered = delivered
guard let sseq = UInt64(tokens[3]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.streamSequence = sseq
guard let cseq = UInt64(tokens[4]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.consumerSequence = cseq
self.timestamp = String(tokens[5])
guard let pending = UInt64(tokens[6]) else {
throw JetStreamError.MessageMetadataError.invalidTokenValue
}
self.pending = pending
} else {
throw JetStreamError.MessageMetadataError.invalidTokenNum
}
}
}
18 changes: 12 additions & 6 deletions Sources/Nats/NatsMessage.swift
Original file line number Diff line number Diff line change
@@ -24,15 +24,21 @@ public struct NatsMessage {
}

public struct StatusCode: Equatable {
public static let idleHeartbeat = StatusCode(100)
public static let ok = StatusCode(200)
public static let notFound = StatusCode(404)
public static let timeout = StatusCode(408)
public static let noResponders = StatusCode(503)
public static let requestTerminated = StatusCode(409)
public static let idleHeartbeat = StatusCode(value: 100)
public static let ok = StatusCode(value: 200)
public static let badRequest = StatusCode(value: 400)
public static let notFound = StatusCode(value: 404)
public static let timeout = StatusCode(value: 408)
public static let noResponders = StatusCode(value: 503)
public static let requestTerminated = StatusCode(value: 409)

let value: UInt16

// non-optional initializer for static status codes
private init(value: UInt16) {
self.value = value
}

init?(_ value: UInt16) {
if !(100..<1000 ~= value) {
return nil
415 changes: 415 additions & 0 deletions Tests/JetStreamTests/Integration/ConsumerTests.swift

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions Tests/JetStreamTests/Unit/MessageTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import XCTest

@testable import JetStream
@testable import Nats

class JetStreamMessageTests: XCTestCase {

static var allTests = [
("testValidOldFormatMessage", testValidOldFormatMessage),
("testValidNewFormatMessage", testValidNewFormatMessage),
("testMissingTokens", testMissingTokens),
("testInvalidTokenValues", testInvalidTokenValues),
("testInvalidPrefix", testInvalidPrefix),
("testNoReplySubject", testNoReplySubject),
]

func testValidOldFormatMessage() async throws {
let replySubject = "$JS.ACK.myStream.myConsumer.10.20.30.1234567890.5"
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil,
status: nil, description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())

let metadata = try jetStreamMessage.metadata()

XCTAssertNil(metadata.domain)
XCTAssertNil(metadata.accountHash)
XCTAssertEqual(metadata.stream, "myStream")
XCTAssertEqual(metadata.consumer, "myConsumer")
XCTAssertEqual(metadata.delivered, 10)
XCTAssertEqual(metadata.streamSequence, 20)
XCTAssertEqual(metadata.consumerSequence, 30)
XCTAssertEqual(metadata.timestamp, "1234567890")
XCTAssertEqual(metadata.pending, 5)
}

func testValidNewFormatMessage() async throws {
let replySubject = "$JS.ACK.domain.accountHash123.myStream.myConsumer.10.20.30.1234567890.5"
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil,
status: nil, description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())
let metadata = try jetStreamMessage.metadata()

XCTAssertEqual(metadata.domain, "domain")
XCTAssertEqual(metadata.accountHash, "accountHash123")
XCTAssertEqual(metadata.stream, "myStream")
XCTAssertEqual(metadata.consumer, "myConsumer")
XCTAssertEqual(metadata.delivered, 10)
XCTAssertEqual(metadata.streamSequence, 20)
XCTAssertEqual(metadata.consumerSequence, 30)
XCTAssertEqual(metadata.timestamp, "1234567890")
XCTAssertEqual(metadata.pending, 5)
}

func testMissingTokens() async throws {
let replySubject = "$JS.ACK.myStream.myConsumer"
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil,
status: nil, description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())
do {
_ = try jetStreamMessage.metadata()
} catch JetStreamError.MessageMetadataError.invalidTokenNum {
return
}
}

func testInvalidTokenValues() async throws {
let replySubject = "$JS.ACK.myStream.myConsumer.invalid.20.30.1234567890.5"
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil,
status: nil, description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())
do {
_ = try jetStreamMessage.metadata()
} catch JetStreamError.MessageMetadataError.invalidTokenValue {
return
}
}

func testInvalidPrefix() async throws {
let replySubject = "$JS.WRONG.myStream.myConsumer.10.20.30.1234567890.5"
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil,
status: nil, description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())
do {
_ = try jetStreamMessage.metadata()
} catch JetStreamError.MessageMetadataError.invalidPrefix {
return
}
}

func testNoReplySubject() async throws {
let natsMessage = NatsMessage(
payload: nil, subject: "", replySubject: nil, length: 0, headers: nil, status: nil,
description: nil)
let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient())
do {
_ = try jetStreamMessage.metadata()
} catch JetStreamError.MessageMetadataError.noReplyInMessage {
return
}
}
}
1 change: 1 addition & 0 deletions Tests/NatsTests/Integration/ConnectionTests.swift
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ class CoreNatsTests: XCTestCase {
("testRequest", testRequest),
("testRequest_noResponders", testRequest_noResponders),
("testRequest_permissionDenied", testRequest_permissionDenied),
("testRequest_timeout", testRequest_timeout),
("testPublishOnClosedConnection", testPublishOnClosedConnection),
("testCloseClosedConnection", testCloseClosedConnection),
("testSuspendClosedConnection", testSuspendClosedConnection),