Skip to content

Commit

Permalink
Add fetching messages from JetStream
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
Co-authored-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
piotrpio and Jarema authored Oct 31, 2024
1 parent 4c6f357 commit 436ca4a
Show file tree
Hide file tree
Showing 10 changed files with 1,038 additions and 12 deletions.
220 changes: 220 additions & 0 deletions Sources/JetStream/Consumer+Pull.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import Nats
import Nuid

/// Extension to ``Consumer`` adding pull consumer capabilities.
extension Consumer {

/// Retrieves up to a provided number of messages from a stream.
/// This method will send a single request and deliver requested messages unless time out is met earlier.
///
/// - Parameters:
/// - batch: maximum number of messages to be retrieved
/// - expires: timeout of a pull request
/// - idleHeartbeat: interval in which server should send heartbeat messages (if no user messages are available).
///
/// - Returns: ``FetchResult`` which implements ``AsyncSequence`` allowing iteration over messages.
///
/// - Throws:
/// - ``JetStreamError/FetchError`` if there was an error while fetching messages
public func fetch(
batch: Int, expires: TimeInterval = 30, idleHeartbeat: TimeInterval? = nil
) async throws -> FetchResult {
var request: PullRequest
if let idleHeartbeat {
request = PullRequest(
batch: batch, expires: NanoTimeInterval(expires),
heartbeat: NanoTimeInterval(idleHeartbeat))
} else {
request = PullRequest(batch: batch, expires: NanoTimeInterval(expires))
}

let subject = ctx.apiSubject("CONSUMER.MSG.NEXT.\(info.stream).\(info.name)")
let inbox = "_INBOX.\(nextNuid())"
let sub = try await ctx.client.subscribe(subject: inbox)
try await self.ctx.client.publish(
JSONEncoder().encode(request), subject: subject, reply: inbox)
return FetchResult(ctx: ctx, sub: sub, idleHeartbeat: idleHeartbeat, batch: batch)
}
}

/// Used to iterate over results of ``Consumer/fetch(batch:expires:idleHeartbeat:)``
public class FetchResult: AsyncSequence {
public typealias Element = JetStreamMessage
public typealias AsyncIterator = FetchIterator

private let ctx: JetStreamContext
private let sub: NatsSubscription
private let idleHeartbeat: TimeInterval?
private let batch: Int

init(ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?, batch: Int) {
self.ctx = ctx
self.sub = sub
self.idleHeartbeat = idleHeartbeat
self.batch = batch
}

public func makeAsyncIterator() -> FetchIterator {
return FetchIterator(
ctx: ctx,
sub: self.sub, idleHeartbeat: self.idleHeartbeat, remainingMessages: self.batch)
}

public struct FetchIterator: AsyncIteratorProtocol {
private let ctx: JetStreamContext
private let sub: NatsSubscription
private let idleHeartbeat: TimeInterval?
private var remainingMessages: Int
private var subIterator: NatsSubscription.AsyncIterator

init(
ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?,
remainingMessages: Int
) {
self.ctx = ctx
self.sub = sub
self.idleHeartbeat = idleHeartbeat
self.remainingMessages = remainingMessages
self.subIterator = sub.makeAsyncIterator()
}

public mutating func next() async throws -> JetStreamMessage? {
if remainingMessages <= 0 {
try await sub.unsubscribe()
return nil
}

while true {
let message: NatsMessage?

if let idleHeartbeat = idleHeartbeat {
let timeout = idleHeartbeat * 2
message = try await nextWithTimeout(timeout, subIterator)
} else {
message = try await subIterator.next()
}

guard let message else {
// the subscription has ended
try await sub.unsubscribe()
return nil
}

let status = message.status ?? .ok

switch status {
case .timeout:
try await sub.unsubscribe()
return nil
case .idleHeartbeat:
// in case of idle heartbeat error, we want to
// wait for next message on subscription
continue
case .notFound:
try await sub.unsubscribe()
return nil
case .ok:
remainingMessages -= 1
return JetStreamMessage(message: message, client: ctx.client)
case .badRequest:
try await sub.unsubscribe()
throw JetStreamError.FetchError.badRequest
case .noResponders:
try await sub.unsubscribe()
throw JetStreamError.FetchError.noResponders
case .requestTerminated:
try await sub.unsubscribe()
guard let description = message.description else {
throw JetStreamError.FetchError.invalidResponse
}

let descLower = description.lowercased()
if descLower.contains("message size exceeds maxbytes") {
return nil
} else if descLower.contains("leadership changed") {
throw JetStreamError.FetchError.leadershipChanged
} else if descLower.contains("consumer deleted") {
throw JetStreamError.FetchError.consumerDeleted
} else if descLower.contains("consumer is push based") {
throw JetStreamError.FetchError.consumerIsPush
}
default:
throw JetStreamError.FetchError.unknownStatus(status, message.description)
}

if remainingMessages == 0 {
try await sub.unsubscribe()
}

}
}

func nextWithTimeout(
_ timeout: TimeInterval, _ subIterator: NatsSubscription.AsyncIterator
) async throws -> NatsMessage? {
try await withThrowingTaskGroup(of: NatsMessage?.self) { group in
group.addTask {
return try await subIterator.next()
}
group.addTask {
try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
try await sub.unsubscribe()
return nil
}
defer {
group.cancelAll()
}
for try await result in group {
if let msg = result {
return msg
} else {
throw JetStreamError.FetchError.noHeartbeatReceived
}
}
// this should not be reachable
throw JetStreamError.FetchError.noHeartbeatReceived
}
}
}
}

internal struct PullRequest: Codable {
let batch: Int
let expires: NanoTimeInterval
let maxBytes: Int?
let noWait: Bool?
let heartbeat: NanoTimeInterval?

internal init(
batch: Int, expires: NanoTimeInterval, maxBytes: Int? = nil, noWait: Bool? = nil,
heartbeat: NanoTimeInterval? = nil
) {
self.batch = batch
self.expires = expires
self.maxBytes = maxBytes
self.noWait = noWait
self.heartbeat = heartbeat
}

enum CodingKeys: String, CodingKey {
case batch
case expires
case maxBytes = "max_bytes"
case noWait = "no_wait"
case heartbeat = "idle_heartbeat"
}
}
2 changes: 1 addition & 1 deletion Sources/JetStream/Consumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class Consumer {
/// > - ``JetStreamRequestError`` if the request was unsuccessful.
/// > - ``JetStreamError`` if the server responded with an API error.
public func info() async throws -> ConsumerInfo {
let subj = "CONSUMER.INFO.\(info.stream).\(info.config.name!)"
let subj = "CONSUMER.INFO.\(info.stream).\(info.name)"
let info: Response<ConsumerInfo> = try await ctx.request(subj)
switch info {
case .success(let info):
Expand Down
4 changes: 2 additions & 2 deletions Sources/JetStream/JetStreamContext+Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ extension JetStreamContext {

/// Used to list stream infos.
///
/// - Returns an ``Streams`` which implements AsyncSequence allowing iteration over streams.
/// - Returns ``Streams`` which implements AsyncSequence allowing iteration over streams.
///
/// - Parameter subject: if provided will be used to filter out returned streams
public func streams(subject: String? = nil) async -> Streams {
Expand All @@ -146,7 +146,7 @@ extension JetStreamContext {

/// Used to list stream names.
///
/// - Returns an ``StreamNames`` which implements AsyncSequence allowing iteration over stream names.
/// - Returns ``StreamNames`` which implements AsyncSequence allowing iteration over stream names.
///
/// - Parameter subject: if provided will be used to filter out returned stream names
public func streamNames(subject: String? = nil) async -> StreamNames {
Expand Down
10 changes: 7 additions & 3 deletions Sources/JetStream/JetStreamContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import Nuid

/// A context which can perform jetstream scoped requests.
public class JetStreamContext {
private var client: NatsClient
internal var client: NatsClient
private var prefix: String = "$JS.API"
private var timeout: TimeInterval = 5.0

Expand Down Expand Up @@ -86,7 +86,7 @@ extension JetStreamContext {
let data = message ?? Data()
do {
let response = try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
data, subject: apiSubject(subject), timeout: self.timeout)
let decoder = JSONDecoder()
guard let payload = response.payload else {
throw JetStreamError.RequestError.emptyResponsePayload
Expand All @@ -108,7 +108,7 @@ extension JetStreamContext {
let data = message ?? Data()
do {
return try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
data, subject: apiSubject(subject), timeout: self.timeout)
} catch let err as NatsError.RequestError {
switch err {
case .noResponders:
Expand All @@ -120,6 +120,10 @@ extension JetStreamContext {
}
}
}

internal func apiSubject(_ subject: String) -> String {
return "\(self.prefix).\(subject)"
}
}

public struct JetStreamAPIResponse: Codable {
Expand Down
68 changes: 68 additions & 0 deletions Sources/JetStream/JetStreamError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,74 @@ public enum JetStreamError {
}
}

public enum MessageMetadataError: JetStreamErrorProtocol {
case noReplyInMessage
case invalidPrefix
case invalidTokenNum
case invalidTokenValue

public var description: String {
switch self {
case .noReplyInMessage:
return "nats: did not fund reply subject in message"
case .invalidPrefix:
return "nats: invalid reply subject prefix"
case .invalidTokenNum:
return "nats: invalid token count"
case .invalidTokenValue:
return "nats: invalid token value"
}
}
}

public enum FetchError: JetStreamErrorProtocol {
case noHeartbeatReceived
case consumerDeleted
case badRequest
case noResponders
case consumerIsPush
case invalidResponse
case leadershipChanged
case unknownStatus(StatusCode, String?)

public var description: String {
switch self {
case .noHeartbeatReceived:
return "nats: no heartbeat received"
case .consumerDeleted:
return "nats: consumer deleted"
case .badRequest:
return "nats: bad request"
case .noResponders:
return "nats: no responders"
case .consumerIsPush:
return "nats: consumer is push based"
case .invalidResponse:
return "nats: no description in status response"
case .leadershipChanged:
return "nats: leadership changed"
case .unknownStatus(let status, let description):
if let description {
return "nats: unknown response status: \(status): \(description)"
} else {
return "nats: unknown response status: \(status)"
}
}
}

}

public enum AckError: JetStreamErrorProtocol {
case noReplyInMessage

public var description: String {
switch self {
case .noReplyInMessage:
return "nats: did not fund reply subject in message"
}
}
}

public enum StreamError: JetStreamErrorProtocol {
case nameRequired
case invalidStreamName(String)
Expand Down
Loading

0 comments on commit 436ca4a

Please sign in to comment.