From cd992c557b39c58626c5dc4e290a4a32d97df282 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 9 Sep 2024 11:32:37 +0200 Subject: [PATCH 1/3] Add jetstream consumer management Signed-off-by: Piotr Piotrowski --- Sources/JetStream/Consumer.swift | 377 ++++++++++++++++++ .../JetStream/JetStreamContext+Consumer.swift | 322 +++++++++++++++ .../JetStream/JetStreamContext+Stream.swift | 8 +- Sources/JetStream/Stream+Consumer.swift | 118 ++++++ .../Integration/JetStreamTests.swift | 313 +++++++++++++++ 5 files changed, 1134 insertions(+), 4 deletions(-) create mode 100644 Sources/JetStream/Consumer.swift create mode 100644 Sources/JetStream/JetStreamContext+Consumer.swift create mode 100644 Sources/JetStream/Stream+Consumer.swift diff --git a/Sources/JetStream/Consumer.swift b/Sources/JetStream/Consumer.swift new file mode 100644 index 0000000..52f2d20 --- /dev/null +++ b/Sources/JetStream/Consumer.swift @@ -0,0 +1,377 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import CryptoKit +import Foundation +import Nuid + +public class Consumer { + + private static var rdigits: [UInt8] = Array( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".utf8) + + /// Contains information about the consumer. + /// Note that this may be out of date and reading it does not query the server. + /// For up-to-date stream info use ``Consumer/info()`` + public internal(set) var info: ConsumerInfo + internal let ctx: JetStreamContext + + init(ctx: JetStreamContext, info: ConsumerInfo) { + self.ctx = ctx + self.info = info + } + + /// Retrieves information about the consumer + /// This also refreshes ``Consumer/info``. + /// + /// - Returns ``ConsumerInfo`` from the server. + /// + /// > **Throws:** + /// > - ``JetStreamRequestError`` if the request was unsuccessful. + /// > - ``JetStreamError`` if the server responded with an API error. + public func info() async throws -> ConsumerInfo { + let subj = "CONSUMER.INFO.\(info.stream).\(info.config.name!)" + let info: Response = try await ctx.request(subj) + switch info { + case .success(let info): + self.info = info + return info + case .error(let apiResponse): + throw apiResponse.error + } + } + + internal static func validate(name: String) throws { + guard !name.isEmpty else { + throw JetStreamError.StreamError.nameRequired + } + + let invalidChars = CharacterSet(charactersIn: ">*. /\\") + if name.rangeOfCharacter(from: invalidChars) != nil { + throw JetStreamError.StreamError.invalidStreamName(name) + } + } + + internal static func generateConsumerName() -> String { + let name = nextNuid() + + let hash = SHA256.hash(data: Data(name.utf8)) + let hashData = Data(hash) + + // Convert the first 8 bytes of the hash to the required format. + let base: UInt8 = 36 + + var result = [UInt8]() + for i in 0..<8 { + let index = Int(hashData[i] % base) + result.append(Consumer.rdigits[index]) + } + + // Convert the result array to a string and return it. + return String(bytes: result, encoding: .utf8)! + } +} + +/// `ConsumerInfo` is the detailed information about a JetStream consumer. +public struct ConsumerInfo: Codable { + /// The name of the stream that the consumer is bound to. + public let stream: String + + /// The unique identifier for the consumer. + public let name: String + + /// The timestamp when the consumer was created. + public let created: String + + /// The configuration settings of the consumer, set when creating or updating the consumer. + public let config: ConsumerConfig + + /// Information about the most recently delivered message, including its sequence numbers and timestamp. + public let delivered: SequenceInfo + + /// Indicates the message before the first unacknowledged message. + public let ackFloor: SequenceInfo + + /// The number of messages that have been delivered but not yet acknowledged. + public let numAckPending: Int + + /// The number of messages that have been redelivered and not yet acknowledged. + public let numRedelivered: Int + + /// The count of active pull requests (relevant for pull-based consumers). + public let numWaiting: Int + + /// The number of messages that match the consumer's filter but have not been delivered yet. + public let numPending: UInt64 + + /// Information about the cluster to which this consumer belongs (if applicable). + public let cluster: ClusterInfo? + + /// Indicates whether at least one subscription exists for the delivery subject of this consumer (only for push-based consumers). + public let pushBound: Bool? + + /// The timestamp indicating when this information was gathered by the server. + public let timeStamp: String + + enum CodingKeys: String, CodingKey { + case stream = "stream_name" + case name + case created + case config + case delivered + case ackFloor = "ack_floor" + case numAckPending = "num_ack_pending" + case numRedelivered = "num_redelivered" + case numWaiting = "num_waiting" + case numPending = "num_pending" + case cluster + case pushBound = "push_bound" + case timeStamp = "ts" + } +} + +/// `ConsumerConfig` is the configuration of a JetStream consumer. +public struct ConsumerConfig: Codable, Equatable { + /// Optional name for the consumer. + public var name: String? + + /// Optional durable name for the consumer. + public var durable: String? + + /// Optional description of the consumer. + public var description: String? + + /// Defines from which point to start delivering messages from the stream. + public var deliverPolicy: DeliverPolicy + + /// Optional sequence number from which to start message delivery. + public var optStartSeq: UInt64? + + /// Optional time from which to start message delivery. + public var optStartTime: String? + + /// Defines the acknowledgment policy for the consumer. + public var ackPolicy: AckPolicy + + /// Defines how long the server will wait for an acknowledgment before resending a message. + public var ackWait: NanoTimeInterval? + + /// Defines the maximum number of delivery attempts for a message. + public var maxDeliver: Int? + + /// Specifies the optional back-off intervals for retrying message delivery after a failed acknowledgment. + public var backOff: [NanoTimeInterval]? + + /// Can be used to filter messages delivered from the stream. + public var filterSubject: String? + + /// Defines the rate at which messages are sent to the consumer. + public var replayPolicy: ReplayPolicy + + /// Specifies an optional maximum rate of message delivery in bits per second. + public var rateLimit: UInt64? + + /// Optional frequency for sampling acknowledgments for observability. + public var sampleFrequency: String? + + /// Maximum number of pull requests waiting to be fulfilled. + public var maxWaiting: Int? + + /// Maximum number of outstanding unacknowledged messages. + public var maxAckPending: Int? + + /// Indicates whether only headers of messages should be sent (and no payload). + public var headersOnly: Bool? + + /// Optional maximum batch size a single pull request can make. + public var maxRequestBatch: Int? + + /// Maximum duration a single pull request will wait for messages to be available to pull. + public var maxRequestExpires: NanoTimeInterval? + + /// Optional maximum total bytes that can be requested in a given batch. + public var maxRequestMaxBytes: Int? + + /// Duration which instructs the server to clean up the consumer if it has been inactive for the specified duration. + public var inactiveThreshold: NanoTimeInterval? + + /// Number of replicas for the consumer's state. + public var replicas: Int + + /// Flag to force the consumer to use memory storage rather than inherit the storage type from the stream. + public var memoryStorage: Bool? + + /// Allows filtering messages from a stream by subject. + public var filterSubjects: [String]? + + /// A set of application-defined key-value pairs for associating metadata on the consumer. + public var metadata: [String: String]? + + public init( + name: String? = nil, + durable: String? = nil, + description: String? = nil, + deliverPolicy: DeliverPolicy = .all, + optStartSeq: UInt64? = nil, + optStartTime: String? = nil, + ackPolicy: AckPolicy = .explicit, + ackWait: NanoTimeInterval? = nil, + maxDeliver: Int? = nil, + backOff: [NanoTimeInterval]? = nil, + filterSubject: String? = nil, + replayPolicy: ReplayPolicy = .instant, + rateLimit: UInt64? = nil, + sampleFrequency: String? = nil, + maxWaiting: Int? = nil, + maxAckPending: Int? = nil, + headersOnly: Bool? = nil, + maxRequestBatch: Int? = nil, + maxRequestExpires: NanoTimeInterval? = nil, + maxRequestMaxBytes: Int? = nil, + inactiveThreshold: NanoTimeInterval? = nil, + replicas: Int = 1, + memoryStorage: Bool? = nil, + filterSubjects: [String]? = nil, + metadata: [String: String]? = nil + ) { + self.name = name + self.durable = durable + self.description = description + self.deliverPolicy = deliverPolicy + self.optStartSeq = optStartSeq + self.optStartTime = optStartTime + self.ackPolicy = ackPolicy + self.ackWait = ackWait + self.maxDeliver = maxDeliver + self.backOff = backOff + self.filterSubject = filterSubject + self.replayPolicy = replayPolicy + self.rateLimit = rateLimit + self.sampleFrequency = sampleFrequency + self.maxWaiting = maxWaiting + self.maxAckPending = maxAckPending + self.headersOnly = headersOnly + self.maxRequestBatch = maxRequestBatch + self.maxRequestExpires = maxRequestExpires + self.maxRequestMaxBytes = maxRequestMaxBytes + self.inactiveThreshold = inactiveThreshold + self.replicas = replicas + self.memoryStorage = memoryStorage + self.filterSubjects = filterSubjects + self.metadata = metadata + } + + enum CodingKeys: String, CodingKey { + case name + case durable = "durable_name" + case description + case deliverPolicy = "deliver_policy" + case optStartSeq = "opt_start_seq" + case optStartTime = "opt_start_time" + case ackPolicy = "ack_policy" + case ackWait = "ack_wait" + case maxDeliver = "max_deliver" + case backOff = "backoff" + case filterSubject = "filter_subject" + case replayPolicy = "replay_policy" + case rateLimit = "rate_limit_bps" + case sampleFrequency = "sample_freq" + case maxWaiting = "max_waiting" + case maxAckPending = "max_ack_pending" + case headersOnly = "headers_only" + case maxRequestBatch = "max_batch" + case maxRequestExpires = "max_expires" + case maxRequestMaxBytes = "max_bytes" + case inactiveThreshold = "inactive_threshold" + case replicas = "num_replicas" + case memoryStorage = "mem_storage" + case filterSubjects = "filter_subjects" + case metadata + } +} + +/// `SequenceInfo` has both the consumer and the stream sequence and last activity. +public struct SequenceInfo: Codable, Equatable { + /// Consumer sequence number. + public let consumer: UInt64 + + /// Stream sequence number. + public let stream: UInt64 + + /// Last activity timestamp. + public let last: String? + + enum CodingKeys: String, CodingKey { + case consumer = "consumer_seq" + case stream = "stream_seq" + case last = "last_active" + } +} + +/// `DeliverPolicy` determines from which point to start delivering messages. +public enum DeliverPolicy: String, Codable { + /// DeliverAllPolicy starts delivering messages from the very beginning of stream. This is the default. + case all + + /// DeliverLastPolicy will start the consumer with the last received. + case last + + /// DeliverNewPolicy will only deliver new messages that are sent after consumer is created. + case new + + /// DeliverByStartSequencePolicy will deliver messages starting from a sequence configured with OptStartSeq in ConsumerConfig. + case byStartSequence = "by_start_sequence" + + /// DeliverByStartTimePolicy will deliver messages starting from a given configured with OptStartTime in ConsumerConfig. + case byStartTime = "by_start_time" + + /// DeliverLastPerSubjectPolicy will start the consumer with the last for all subjects received. + case lastPerSubject = "last_per_subject" +} + +/// `AckPolicy` determines how the consumer should acknowledge delivered messages. +public enum AckPolicy: String, Codable { + /// AckNonePolicy requires no acks for delivered messages./ + case none + + /// AckAllPolicy when acking a sequence number, this implicitly acks sequences below this one as well. + case all + + /// AckExplicitPolicy requires ack or nack for all messages. + case explicit +} + +/// `ReplayPolicy` determines how the consumer should replay messages it already has queued in the stream. +public enum ReplayPolicy: String, Codable { + /// ReplayInstantPolicy will replay messages as fast as possible./ + case instant + + /// ReplayOriginalPolicy will maintain the same timing as the messages received. + case original +} + +internal struct CreateConsumerRequest: Codable { + internal let stream: String + internal let config: ConsumerConfig + internal let action: String? + + enum CodingKeys: String, CodingKey { + case stream = "stream_name" + case config + case action + } +} + +struct ConsumerDeleteResponse: Codable { + let success: Bool +} diff --git a/Sources/JetStream/JetStreamContext+Consumer.swift b/Sources/JetStream/JetStreamContext+Consumer.swift new file mode 100644 index 0000000..ebad426 --- /dev/null +++ b/Sources/JetStream/JetStreamContext+Consumer.swift @@ -0,0 +1,322 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +extension JetStreamContext { + + /// Creates a consumer with the specified configuration. + /// + /// - Parameters: + /// - stream: name of the stream the consumer will be created on + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error creating the consumer. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid. + /// > - ``JetStreamError/ConsumerError/consumerNameExist(_:)``: if attempting to overwrite an existing consumer (with different configuration) + /// > - ``JetStreamError/ConsumerError/maximumConsumersLimit(_:)``: if a max number of consumers (specified on stream/account level) has been reached. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func createConsumer(stream: String, cfg: ConsumerConfig) async throws -> Consumer { + try Stream.validate(name: stream) + return try await upsertConsumer(stream: stream, cfg: cfg, action: "create") + } + + /// Updates an existing consumer using specified config. + /// + /// - Parameters: + /// - stream: name of the stream the consumer will be updated on + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error updating the consumer. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid or atteppting to update an illegal property + /// > - ``JetStreamError/ConsumerError/consumerDoesNotExist(_:)``: if attempting to update a non-existing consumer + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func updateConsumer(stream: String, cfg: ConsumerConfig) async throws -> Consumer { + try Stream.validate(name: stream) + return try await upsertConsumer(stream: stream, cfg: cfg, action: "update") + } + + /// Creates a consumer with the specified configuration or updates an existing consumer. + /// + /// - Parameters: + /// - stream: name of the stream the consumer will be created on + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error creating or updatig the consumer. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid or atteppting to update an illegal property + /// > - ``JetStreamError/ConsumerError/maximumConsumersLimit(_:)``: if a max number of consumers (specified on stream/account level) has been reached. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func createOrUpdateConsumer(stream: String, cfg: ConsumerConfig) async throws -> Consumer + { + try Stream.validate(name: stream) + return try await upsertConsumer(stream: stream, cfg: cfg) + } + + /// Retrieves a consumer with given name from a stream. + /// + /// - Parameters: + /// - stream: name of the stream the consumer is retrieved from + /// - name: name of the stream + /// + /// - Returns a ``Stream`` object containing ``StreamInfo`` and exposing operations on the stream or nil if stream with given name does not exist. + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError/consumerNotFound(_:)``: if the consumer with given name does not exist on a given stream. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func getConsumer(stream: String, name: String) async throws -> Consumer? { + try Stream.validate(name: stream) + try Consumer.validate(name: name) + + let subj = "CONSUMER.INFO.\(stream).\(name)" + let info: Response = try await request(subj) + switch info { + case .success(let info): + return Consumer(ctx: self, info: info) + case .error(let apiResponse): + if apiResponse.error.errorCode == .consumerNotFound { + return nil + } + if let consumerError = JetStreamError.ConsumerError(from: apiResponse.error) { + throw consumerError + } + throw apiResponse.error + } + } + + /// Deletes a consumer from a stream. + /// + /// - Parameters: + /// - stream: name of the stream the consumer will be created on + /// - name: consumer name + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError/consumerNotFound(_:)``: if the consumer with given name does not exist on a given stream. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func deleteConsumer(stream: String, name: String) async throws { + try Stream.validate(name: stream) + try Consumer.validate(name: name) + + let subject = "CONSUMER.DELETE.\(stream).\(name)" + let resp: Response = try await request(subject) + + switch resp { + case .success(_): + return + case .error(let apiResponse): + if let streamErr = JetStreamError.ConsumerError(from: apiResponse.error) { + throw streamErr + } + throw apiResponse.error + } + } + + internal func upsertConsumer( + stream: String, cfg: ConsumerConfig, action: String? = nil + ) async throws -> Consumer { + let consumerName = cfg.name ?? cfg.durable ?? Consumer.generateConsumerName() + + try Consumer.validate(name: consumerName) + + let createReq = CreateConsumerRequest(stream: stream, config: cfg, action: action) + let req = try! JSONEncoder().encode(createReq) + + var subject: String + if let filterSubject = cfg.filterSubject { + subject = "CONSUMER.CREATE.\(stream).\(consumerName).\(filterSubject)" + } else { + subject = "CONSUMER.CREATE.\(stream).\(consumerName)" + } + + let info: Response = try await request(subject, message: req) + + switch info { + case .success(let info): + return Consumer(ctx: self, info: info) + case .error(let apiResponse): + if let consumerError = JetStreamError.ConsumerError(from: apiResponse.error) { + throw consumerError + } + throw apiResponse.error + } + } + + /// Used to list consumer names. + /// + /// - Parameters: + /// - stream: the name of the strem to list the consumers from. + /// + /// - Returns ``Consumers`` which implements AsyncSequence allowing iteration over stream infos. + public func consumers(stream: String) async -> Consumers { + return Consumers(ctx: self, stream: stream) + } + + /// Used to list consumer names. + /// + /// - Parameters: + /// - stream: the name of the strem to list the consumers from. + /// + /// - Returns ``ConsumerNames`` which implements AsyncSequence allowing iteration over consumer names. + public func consumerNames(stream: String) async -> ConsumerNames { + return ConsumerNames(ctx: self, stream: stream) + } +} + +internal struct ConsumersPagedRequest: Codable { + let offset: Int +} + +/// Used to iterate over consumer names when using ``JetStreamContext/consumerNames(stream:)`` +public struct ConsumerNames: AsyncSequence { + public typealias Element = String + public typealias AsyncIterator = ConsumerNamesIterator + + private let ctx: JetStreamContext + private let stream: String + private var buffer: [String] + private var offset: Int + private var total: Int? + + private struct ConsumerNamesPage: Codable { + let total: Int + let consumers: [String]? + } + + init(ctx: JetStreamContext, stream: String) { + self.stream = stream + self.ctx = ctx + self.buffer = [] + self.offset = 0 + } + + public func makeAsyncIterator() -> ConsumerNamesIterator { + return ConsumerNamesIterator(seq: self) + } + + public mutating func next() async throws -> Element? { + if let consumer = buffer.first { + buffer.removeFirst() + return consumer + } + + if let total = self.total, self.offset >= total { + return nil + } + + // poll consumers + let request = ConsumersPagedRequest(offset: offset) + + let res: Response = try await ctx.request( + "CONSUMER.NAMES.\(self.stream)", message: JSONEncoder().encode(request)) + switch res { + case .success(let names): + guard let consumers = names.consumers else { + return nil + } + self.offset += consumers.count + self.total = names.total + buffer.append(contentsOf: consumers) + return try await self.next() + case .error(let err): + throw err.error + } + + } + + public struct ConsumerNamesIterator: AsyncIteratorProtocol { + var seq: ConsumerNames + + public mutating func next() async throws -> Element? { + try await seq.next() + } + } +} + +/// Used to iterate over consumers when listing consumer infos using ``JetStreamContext/consumers(stream:)`` +public struct Consumers: AsyncSequence { + public typealias Element = ConsumerInfo + public typealias AsyncIterator = ConsumersIterator + + private let ctx: JetStreamContext + private let stream: String + private var buffer: [ConsumerInfo] + private var offset: Int + private var total: Int? + + private struct ConsumersPage: Codable { + let total: Int + let consumers: [ConsumerInfo]? + } + + init(ctx: JetStreamContext, stream: String) { + self.stream = stream + self.ctx = ctx + self.buffer = [] + self.offset = 0 + } + + public func makeAsyncIterator() -> ConsumersIterator { + return ConsumersIterator(seq: self) + } + + public mutating func next() async throws -> Element? { + if let consumer = buffer.first { + buffer.removeFirst() + return consumer + } + + if let total = self.total, self.offset >= total { + return nil + } + + // poll consumers + let request = ConsumersPagedRequest(offset: offset) + + let res: Response = try await ctx.request( + "CONSUMER.LIST.\(self.stream)", message: JSONEncoder().encode(request)) + switch res { + case .success(let infos): + guard let consumers = infos.consumers else { + return nil + } + self.offset += consumers.count + self.total = infos.total + buffer.append(contentsOf: consumers) + return try await self.next() + case .error(let err): + throw err.error + } + + } + + public struct ConsumersIterator: AsyncIteratorProtocol { + var seq: Consumers + + public mutating func next() async throws -> Element? { + try await seq.next() + } + } +} diff --git a/Sources/JetStream/JetStreamContext+Stream.swift b/Sources/JetStream/JetStreamContext+Stream.swift index 2e63a0e..313bafe 100644 --- a/Sources/JetStream/JetStreamContext+Stream.swift +++ b/Sources/JetStream/JetStreamContext+Stream.swift @@ -83,7 +83,7 @@ extension JetStreamContext { /// - Returns: ``Stream`` object containing ``StreamInfo`` and exposing operations on the stream /// /// > **Throws:** - /// > - ``JetStreamError/StreamError`` if there was am error creating the stream. + /// > - ``JetStreamError/StreamError`` if there was am error updating the stream. /// > There are several errors which may occur, most common being: /// > - ``JetStreamError/StreamError/nameRequired`` if the provided stream name is empty. /// > - ``JetStreamError/StreamError/invalidStreamName(_:)`` if the provided stream name is not valid. @@ -144,7 +144,7 @@ extension JetStreamContext { return Streams(ctx: self, subject: subject) } - /// Used to list stream infos. + /// Used to list stream names. /// /// - Returns an ``StreamNames`` which implements AsyncSequence allowing iteration over stream names. /// @@ -170,7 +170,7 @@ public struct Streams: AsyncSequence { private var offset: Int private var total: Int? - struct StreamsInfoPage: Codable { + private struct StreamsInfoPage: Codable { let total: Int let streams: [StreamInfo]? } @@ -235,7 +235,7 @@ public struct StreamNames: AsyncSequence { private var offset: Int private var total: Int? - struct StreamNamesPage: Codable { + private struct StreamNamesPage: Codable { let total: Int let streams: [String]? } diff --git a/Sources/JetStream/Stream+Consumer.swift b/Sources/JetStream/Stream+Consumer.swift new file mode 100644 index 0000000..bc5f91f --- /dev/null +++ b/Sources/JetStream/Stream+Consumer.swift @@ -0,0 +1,118 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +extension Stream { + + /// Creates a consumer with the specified configuration. + /// + /// - Parameters: + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error creating the stream. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid. + /// > - ``JetStreamError/ConsumerError/consumerNameExist(_:)``: if attempting to overwrite an existing consumer (with different configuration) + /// > - ``JetStreamError/ConsumerError/maximumConsumersLimit(_:)``: if a max number of consumers (specified on stream/account level) has been reached. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func createConsumer(cfg: ConsumerConfig) async throws -> Consumer { + return try await ctx.upsertConsumer(stream: info.config.name, cfg: cfg, action: "create") + } + + /// Updates an existing consumer using specified config. + /// + /// - Parameters: + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error creating the stream. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid or atteppting to update an illegal property + /// > - ``JetStreamError/ConsumerError/consumerDoesNotExist(_:)``: if attempting to update a non-existing consumer + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func updateConsumer(cfg: ConsumerConfig) async throws -> Consumer { + return try await ctx.upsertConsumer(stream: info.config.name, cfg: cfg, action: "update") + } + + /// Creates a consumer with the specified configuration or updates an existing consumer. + /// + /// - Parameters: + /// - stream: name of the stream the consumer will be created on + /// - cfg: consumer config + /// + /// - Returns: ``Consumer`` object containing ``ConsumerConfig`` and exposing operations on the consumer + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError``: if there was am error creating the stream. There are several errors which may occur, most common being: + /// > - ``JetStreamError/ConsumerError/invalidConfig(_:)``: if the provided configuration is not valid or atteppting to update an illegal property + /// > - ``JetStreamError/ConsumerError/maximumConsumersLimit(_:)``: if a max number of consumers (specified on stream/account level) has been reached. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func createOrUpdateConsumer(cfg: ConsumerConfig) async throws -> Consumer { + return try await ctx.upsertConsumer(stream: info.config.name, cfg: cfg) + } + + /// Retrieves a consumer with given name from a stream. + /// + /// - Parameters: + /// - name: name of the stream + /// + /// - Returns a ``Stream`` object containing ``StreamInfo`` and exposing operations on the stream or nil if stream with given name does not exist. + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError/consumerNotFound(_:)``: if the consumer with given name does not exist on a given stream. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func getConsumer(name: String) async throws -> Consumer? { + return try await ctx.getConsumer(stream: info.config.name, name: name) + } + + /// Deletes a consumer from a stream. + /// + /// - Parameters: + /// - name: consumer name + /// + /// > **Throws:** + /// > - ``JetStreamError/ConsumerError/consumerNotFound(_:)``: if the consumer with given name does not exist on a given stream. + /// > - ``JetStreamError/RequestError``: if the request fails if e.g. JetStream is not enabled. + /// > - ``JetStreamError/APIError``: if there was a different API error returned from JetStream. + public func deleteConsumer(name: String) async throws { + try await ctx.deleteConsumer(stream: info.config.name, name: name) + } + + /// Used to list consumer names. + /// + /// - Parameters: + /// - stream: the name of the strem to list the consumers from. + /// + /// - Returns ``Consumers`` which implements AsyncSequence allowing iteration over stream infos. + public func consumers() async -> Consumers { + return Consumers(ctx: ctx, stream: info.config.name) + } + + /// Used to list consumer names. + /// + /// - Parameters: + /// - stream: the name of the strem to list the consumers from. + /// + /// - Returns ``ConsumerNames`` which implements AsyncSequence allowing iteration over consumer names. + public func consumerNames() async -> ConsumerNames { + return ConsumerNames(ctx: ctx, stream: info.config.name) + } +} diff --git a/Tests/JetStreamTests/Integration/JetStreamTests.swift b/Tests/JetStreamTests/Integration/JetStreamTests.swift index b3f8a66..b7acec4 100644 --- a/Tests/JetStreamTests/Integration/JetStreamTests.swift +++ b/Tests/JetStreamTests/Integration/JetStreamTests.swift @@ -34,6 +34,12 @@ class JetStreamTests: XCTestCase { ("testPurge", testPurge), ("testPurgeSequence", testPurgeSequence), ("testPurgeKeepm", testPurgeKeep), + ("testJetStreamContextConsumerCRUD", testJetStreamContextConsumerCRUD), + ("testStreamConsumerCRUD", testStreamConsumerCRUD), + ("testConsumerConfig", testConsumerConfig), + ("testCreateEphemeralConsumer", testCreateEphemeralConsumer), + ("testConsumerInfo", testConsumerInfo), + ("testListConsumers", testListConsumers), ] var natsServer = NatsServer() @@ -656,4 +662,311 @@ class JetStreamTests: XCTestCase { info = try await stream.info() XCTAssertEqual(info.state.messages, 10) } + + func testStreamConsumerCRUD() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + var expectedConfig = ConsumerConfig( + name: "test", durable: nil, description: nil, deliverPolicy: .all, optStartSeq: nil, + optStartTime: nil, ackPolicy: .explicit, ackWait: NanoTimeInterval(30), maxDeliver: -1, + backOff: nil, filterSubject: nil, replayPolicy: .instant, rateLimit: nil, + sampleFrequency: nil, maxWaiting: 512, maxAckPending: 1000, headersOnly: nil, + maxRequestBatch: nil, maxRequestExpires: nil, maxRequestMaxBytes: nil, + inactiveThreshold: NanoTimeInterval(5), replicas: 1, memoryStorage: nil, + filterSubjects: nil, metadata: nil) + // minimal config + var cfg = ConsumerConfig(name: "test") + _ = try await stream.createConsumer(cfg: cfg) + + // attempt overwriting existing consumer + var errOk = false + do { + _ = try await stream.createConsumer( + cfg: ConsumerConfig(name: "test", description: "cannot update with create")) + } catch JetStreamError.ConsumerError.consumerNameExist(_) { + errOk = true + // success + } + XCTAssertTrue(errOk, "Expected consumer exists error") + + // get a consumer + guard var cons = try await stream.getConsumer(name: "test") else { + XCTFail("Expected a stream, got nil") + return + } + XCTAssertEqual(expectedConfig, cons.info.config) + + // get a non-existing consumer + errOk = false + if let cons = try await stream.getConsumer(name: "bad") { + XCTFail("Expected consumer not found, got: \(cons)") + } + + // update the stream + cfg.description = "updated" + cons = try await stream.updateConsumer(cfg: cfg) + expectedConfig.description = "updated" + + XCTAssertEqual(expectedConfig, cons.info.config) + + // attempt to update illegal consumer property + cfg.memoryStorage = true + errOk = false + do { + _ = try await stream.updateConsumer(cfg: cfg) + } catch JetStreamError.ConsumerError.invalidConfig(_) { + // success + errOk = true + } + + // attempt updating non-existing consumer + errOk = false + do { + _ = try await stream.updateConsumer(cfg: ConsumerConfig(name: "bad")) + } catch JetStreamError.ConsumerError.consumerDoesNotExist(_) { + // success + errOk = true + } + XCTAssertTrue(errOk, "Expected consumer not found error") + + // delete the consumer + try await stream.deleteConsumer(name: "test") + + // make sure the consumer no longer exists + if let _ = try await stream.getConsumer(name: "test") { + XCTFail("Expected consumer not found") + } + } + + func testJetStreamContextConsumerCRUD() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo"]) + _ = try await ctx.createStream(cfg: streamCfg) + + var expectedConfig = ConsumerConfig( + name: "test", durable: nil, description: nil, deliverPolicy: .all, optStartSeq: nil, + optStartTime: nil, ackPolicy: .explicit, ackWait: NanoTimeInterval(30), maxDeliver: -1, + backOff: nil, filterSubject: nil, replayPolicy: .instant, rateLimit: nil, + sampleFrequency: nil, maxWaiting: 512, maxAckPending: 1000, headersOnly: nil, + maxRequestBatch: nil, maxRequestExpires: nil, maxRequestMaxBytes: nil, + inactiveThreshold: NanoTimeInterval(5), replicas: 1, memoryStorage: nil, + filterSubjects: nil, metadata: nil) + // minimal config + var cfg = ConsumerConfig(name: "test") + _ = try await ctx.createConsumer(stream: "test", cfg: cfg) + + // attempt overwriting existing consumer + var errOk = false + do { + _ = try await ctx.createConsumer( + stream: "test", + cfg: ConsumerConfig(name: "test", description: "cannot update with create")) + } catch JetStreamError.ConsumerError.consumerNameExist(_) { + errOk = true + // success + } + XCTAssertTrue(errOk, "Expected consumer exists error") + + // get a consumer + guard var cons = try await ctx.getConsumer(stream: "test", name: "test") else { + XCTFail("Expected a stream, got nil") + return + } + XCTAssertEqual(expectedConfig, cons.info.config) + + // get a non-existing consumer + errOk = false + if let cons = try await ctx.getConsumer(stream: "test", name: "bad") { + XCTFail("Expected consumer not found, got: \(cons)") + } + + // update the stream + cfg.description = "updated" + cons = try await ctx.updateConsumer(stream: "test", cfg: cfg) + expectedConfig.description = "updated" + + XCTAssertEqual(expectedConfig, cons.info.config) + + // attempt to update illegal consumer property + cfg.memoryStorage = true + errOk = false + do { + _ = try await ctx.updateConsumer(stream: "test", cfg: cfg) + } catch JetStreamError.ConsumerError.invalidConfig(_) { + // success + errOk = true + } + + // attempt updating non-existing consumer + errOk = false + do { + _ = try await ctx.updateConsumer(stream: "test", cfg: ConsumerConfig(name: "bad")) + } catch JetStreamError.ConsumerError.consumerDoesNotExist(_) { + // success + errOk = true + } + XCTAssertTrue(errOk, "Expected consumer not found error") + + // delete the consumer + try await ctx.deleteConsumer(stream: "test", name: "test") + + // make sure the consumer no longer exists + if let _ = try await ctx.getConsumer(stream: "test", name: "test") { + XCTFail("Expected consumer not found") + } + } + + func testConsumerConfig() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let cfg = ConsumerConfig( + name: "test", durable: nil, description: "consumer", deliverPolicy: .byStartSequence, + optStartSeq: 10, optStartTime: nil, ackPolicy: .none, ackWait: NanoTimeInterval(5), + maxDeliver: 100, backOff: [NanoTimeInterval(5), NanoTimeInterval(10)], + filterSubject: "FOO.A", replayPolicy: .original, rateLimit: nil, sampleFrequency: "50", + maxWaiting: 20, maxAckPending: 20, headersOnly: true, maxRequestBatch: 5, + maxRequestExpires: NanoTimeInterval(120), maxRequestMaxBytes: 1024, + inactiveThreshold: NanoTimeInterval(30), replicas: 1, memoryStorage: true, + filterSubjects: nil, metadata: ["a": "b"]) + + let stream = try await ctx.createStream( + cfg: StreamConfig(name: "stream", subjects: ["FOO.*"])) + + let cons = try await stream.createConsumer(cfg: cfg) + + XCTAssertEqual(cfg, cons.info.config) + } + + func testCreateEphemeralConsumer() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + let stream = try await ctx.createStream( + cfg: StreamConfig(name: "stream", subjects: ["FOO.*"])) + + let cons = try await stream.createConsumer(cfg: ConsumerConfig()) + + XCTAssertEqual(cons.info.name.count, 8) + } + + func testConsumerInfo() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let stream = try await ctx.createStream(cfg: StreamConfig(name: "test", subjects: ["foo"])) + + let cfg = ConsumerConfig(name: "cons") + let consumer = try await stream.createConsumer(cfg: cfg) + + let info = try await consumer.info() + XCTAssertEqual(info.config.name, "cons") + + // simulate external update of consumer + let updateJSON = """ + { + "stream_name": "test", + "config": { + "name": "cons", + "description": "updated", + "ack_policy": "explicit" + }, + "action": "update" + } + """ + let data = updateJSON.data(using: .utf8)! + + _ = try await client.request(data, subject: "$JS.API.CONSUMER.CREATE.test.cons") + + XCTAssertNil(consumer.info.config.description) + + let newInfo = try await consumer.info() + XCTAssertEqual(newInfo.config.description, "updated") + XCTAssertEqual(consumer.info.config.description, "updated") + } + + func testListConsumers() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let stream = try await ctx.createStream( + cfg: StreamConfig(name: "test", subjects: ["foo.*"])) + + for i in 0..<260 { + let cfg = ConsumerConfig(name: "CONSUMER-\(i)") + let _ = try await stream.createConsumer(cfg: cfg) + } + + // list all consumers + var consumers = await stream.consumers() + + var i = 0 + for try await _ in consumers { + i += 1 + } + XCTAssertEqual(i, 260) + + let names = await stream.consumerNames() + i = 0 + for try await _ in names { + i += 1 + } + XCTAssertEqual(i, 260) + + // list consumers on non-existing stream + consumers = await ctx.consumers(stream: "bad") + i = 0 + for try await _ in consumers { + i += 1 + } + XCTAssertEqual(i, 0) + } + } From 61f01f967170c872fe8bb3f22a8bc78078435699 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 25 Sep 2024 15:47:56 +0200 Subject: [PATCH 2/3] Fixed comparing stream and consumer metadata in tests Signed-off-by: Piotr Piotrowski --- .../Integration/JetStreamTests.swift | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/Tests/JetStreamTests/Integration/JetStreamTests.swift b/Tests/JetStreamTests/Integration/JetStreamTests.swift index b7acec4..80aeb40 100644 --- a/Tests/JetStreamTests/Integration/JetStreamTests.swift +++ b/Tests/JetStreamTests/Integration/JetStreamTests.swift @@ -181,6 +181,9 @@ class JetStreamTests: XCTestCase { consumerLimits: StreamConsumerLimits(inactiveThreshold: nil, maxAckPending: nil), metadata: nil) + // we need to set the metadata to whatever was set by the server as it contains e.g. server version + expectedConfig.metadata = stream.info.config.metadata + XCTAssertEqual(expectedConfig, stream.info.config) // attempt overwriting existing stream @@ -255,7 +258,7 @@ class JetStreamTests: XCTestCase { let ctx = JetStreamContext(client: client) - let cfg = StreamConfig( + var cfg = StreamConfig( name: "full", description: "desc", subjects: ["bar"], retention: .interest, maxConsumers: 50, maxMsgs: 100, maxBytes: 1000, discard: .new, discardNewPerSubject: true, maxAge: NanoTimeInterval(300), maxMsgsPerSubject: 50, @@ -269,6 +272,9 @@ class JetStreamTests: XCTestCase { let stream = try await ctx.createStream(cfg: cfg) + // we need to set the metadata to whatever was set by the server as it contains e.g. server version + cfg.metadata = stream.info.config.metadata + XCTAssertEqual(stream.info.config, cfg) } @@ -705,6 +711,10 @@ class JetStreamTests: XCTestCase { XCTFail("Expected a stream, got nil") return } + + // we need to set the metadata to whatever was set by the server as it contains e.g. server version + expectedConfig.metadata = cons.info.config.metadata + XCTAssertEqual(expectedConfig, cons.info.config) // get a non-existing consumer @@ -792,6 +802,8 @@ class JetStreamTests: XCTestCase { XCTFail("Expected a stream, got nil") return } + // we need to set the metadata to whatever was set by the server as it contains e.g. server version + expectedConfig.metadata = cons.info.config.metadata XCTAssertEqual(expectedConfig, cons.info.config) // get a non-existing consumer @@ -847,7 +859,7 @@ class JetStreamTests: XCTestCase { let ctx = JetStreamContext(client: client) - let cfg = ConsumerConfig( + var cfg = ConsumerConfig( name: "test", durable: nil, description: "consumer", deliverPolicy: .byStartSequence, optStartSeq: 10, optStartTime: nil, ackPolicy: .none, ackWait: NanoTimeInterval(5), maxDeliver: 100, backOff: [NanoTimeInterval(5), NanoTimeInterval(10)], @@ -861,6 +873,7 @@ class JetStreamTests: XCTestCase { cfg: StreamConfig(name: "stream", subjects: ["FOO.*"])) let cons = try await stream.createConsumer(cfg: cfg) + cfg.metadata = cons.info.config.metadata XCTAssertEqual(cfg, cons.info.config) } From 07f3ce21db014b13d4b4322329bded7aced731df Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 7 Oct 2024 13:38:06 +0200 Subject: [PATCH 3/3] Move filterSubject and filterSubjects next to each other Signed-off-by: Piotr Piotrowski --- Sources/JetStream/Consumer.swift | 12 +++++++----- .../Integration/JetStreamTests.swift | 16 +++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/Sources/JetStream/Consumer.swift b/Sources/JetStream/Consumer.swift index 52f2d20..8fe8db2 100644 --- a/Sources/JetStream/Consumer.swift +++ b/Sources/JetStream/Consumer.swift @@ -172,9 +172,14 @@ public struct ConsumerConfig: Codable, Equatable { /// Specifies the optional back-off intervals for retrying message delivery after a failed acknowledgment. public var backOff: [NanoTimeInterval]? - /// Can be used to filter messages delivered from the stream. + /// Can be used to filter messages delivered from the stream by given subject. + /// It is exclusive with ``ConsumerConfig/filterSubjects`` public var filterSubject: String? + /// Can be used to filter messages delivered from the stream by given subjects. + /// It is exclusive with ``ConsumerConfig/filterSubject`` + public var filterSubjects: [String]? + /// Defines the rate at which messages are sent to the consumer. public var replayPolicy: ReplayPolicy @@ -211,9 +216,6 @@ public struct ConsumerConfig: Codable, Equatable { /// Flag to force the consumer to use memory storage rather than inherit the storage type from the stream. public var memoryStorage: Bool? - /// Allows filtering messages from a stream by subject. - public var filterSubjects: [String]? - /// A set of application-defined key-value pairs for associating metadata on the consumer. public var metadata: [String: String]? @@ -229,6 +231,7 @@ public struct ConsumerConfig: Codable, Equatable { maxDeliver: Int? = nil, backOff: [NanoTimeInterval]? = nil, filterSubject: String? = nil, + filterSubjects: [String]? = nil, replayPolicy: ReplayPolicy = .instant, rateLimit: UInt64? = nil, sampleFrequency: String? = nil, @@ -241,7 +244,6 @@ public struct ConsumerConfig: Codable, Equatable { inactiveThreshold: NanoTimeInterval? = nil, replicas: Int = 1, memoryStorage: Bool? = nil, - filterSubjects: [String]? = nil, metadata: [String: String]? = nil ) { self.name = name diff --git a/Tests/JetStreamTests/Integration/JetStreamTests.swift b/Tests/JetStreamTests/Integration/JetStreamTests.swift index 80aeb40..ff5e4b2 100644 --- a/Tests/JetStreamTests/Integration/JetStreamTests.swift +++ b/Tests/JetStreamTests/Integration/JetStreamTests.swift @@ -686,11 +686,11 @@ class JetStreamTests: XCTestCase { var expectedConfig = ConsumerConfig( name: "test", durable: nil, description: nil, deliverPolicy: .all, optStartSeq: nil, optStartTime: nil, ackPolicy: .explicit, ackWait: NanoTimeInterval(30), maxDeliver: -1, - backOff: nil, filterSubject: nil, replayPolicy: .instant, rateLimit: nil, + backOff: nil, filterSubject: nil, filterSubjects: nil, replayPolicy: .instant, + rateLimit: nil, sampleFrequency: nil, maxWaiting: 512, maxAckPending: 1000, headersOnly: nil, maxRequestBatch: nil, maxRequestExpires: nil, maxRequestMaxBytes: nil, - inactiveThreshold: NanoTimeInterval(5), replicas: 1, memoryStorage: nil, - filterSubjects: nil, metadata: nil) + inactiveThreshold: NanoTimeInterval(5), replicas: 1, memoryStorage: nil, metadata: nil) // minimal config var cfg = ConsumerConfig(name: "test") _ = try await stream.createConsumer(cfg: cfg) @@ -776,11 +776,12 @@ class JetStreamTests: XCTestCase { var expectedConfig = ConsumerConfig( name: "test", durable: nil, description: nil, deliverPolicy: .all, optStartSeq: nil, optStartTime: nil, ackPolicy: .explicit, ackWait: NanoTimeInterval(30), maxDeliver: -1, - backOff: nil, filterSubject: nil, replayPolicy: .instant, rateLimit: nil, + backOff: nil, filterSubject: nil, filterSubjects: nil, replayPolicy: .instant, + rateLimit: nil, sampleFrequency: nil, maxWaiting: 512, maxAckPending: 1000, headersOnly: nil, maxRequestBatch: nil, maxRequestExpires: nil, maxRequestMaxBytes: nil, inactiveThreshold: NanoTimeInterval(5), replicas: 1, memoryStorage: nil, - filterSubjects: nil, metadata: nil) + metadata: nil) // minimal config var cfg = ConsumerConfig(name: "test") _ = try await ctx.createConsumer(stream: "test", cfg: cfg) @@ -863,11 +864,12 @@ class JetStreamTests: XCTestCase { name: "test", durable: nil, description: "consumer", deliverPolicy: .byStartSequence, optStartSeq: 10, optStartTime: nil, ackPolicy: .none, ackWait: NanoTimeInterval(5), maxDeliver: 100, backOff: [NanoTimeInterval(5), NanoTimeInterval(10)], - filterSubject: "FOO.A", replayPolicy: .original, rateLimit: nil, sampleFrequency: "50", + filterSubject: "FOO.A", filterSubjects: nil, replayPolicy: .original, rateLimit: nil, + sampleFrequency: "50", maxWaiting: 20, maxAckPending: 20, headersOnly: true, maxRequestBatch: 5, maxRequestExpires: NanoTimeInterval(120), maxRequestMaxBytes: 1024, inactiveThreshold: NanoTimeInterval(30), replicas: 1, memoryStorage: true, - filterSubjects: nil, metadata: ["a": "b"]) + metadata: ["a": "b"]) let stream = try await ctx.createStream( cfg: StreamConfig(name: "stream", subjects: ["FOO.*"]))