From 43c6e6b676a45116086b2363c8170e62404d33bd Mon Sep 17 00:00:00 2001 From: Elijah Quartey Date: Wed, 29 May 2024 15:20:13 -0500 Subject: [PATCH] chore: pull latest amplify swift --- .../InitialSync/InitialSyncOperation.swift | 43 ++++++++-------- .../OutgoingMutationQueue.swift | 11 ++-- .../SyncMutationToCloudOperation.swift | 4 +- ...omingAsyncSubscriptionEventPublisher.swift | 38 +++++++------- .../Auth/AWSAuthModeStrategy.swift | 51 ++++++++----------- .../Auth/AmplifyAuthorizationType.swift | 29 +++++++++++ .../Operation/RetryableGraphQLOperation.swift | 8 +-- 7 files changed, 105 insertions(+), 79 deletions(-) create mode 100644 packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AmplifyAuthorizationType.swift diff --git a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift index 780524075e4..096e6aa8004 100644 --- a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift +++ b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift @@ -168,26 +168,29 @@ final class InitialSyncOperation: AsynchronousOperation { let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize)) let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize let authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read) - .publisher() - .map { Optional.some($0) } // map to optional to have nil as element - .replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided - .map { authType in { [weak self] in - guard let self, let api = self.api else { - throw APIError.operationError("Operation cancelled", "") - } - - return try await api.query(request: GraphQLRequest.syncQuery( - modelSchema: self.modelSchema, - where: self.syncPredicate, - limit: limit, - nextToken: nextToken, - lastSync: lastSyncTime, - authType: authType - )) - }} - .eraseToAnyPublisher() - - switch await RetryableGraphQLOperation(requestStream: authTypes).run() { + let queryRequestsStream = AsyncStream { continuation in + for authType in authTypes { + continuation.yield({ [weak self] in + guard let self, let api = self.api else { + throw APIError.operationError( + "The initial synchronization process can no longer be accessed or referred to", + "The initial synchronization process may be cancelled or terminated" + ) + } + + return try await api.query(request: GraphQLRequest.syncQuery( + modelSchema: self.modelSchema, + where: self.syncPredicate, + limit: limit, + nextToken: nextToken, + lastSync: lastSyncTime, + authType: authType.awsAuthType + )) + }) + } + continuation.finish() + } + switch await RetryableGraphQLOperation(requestStream: queryRequestsStream).run() { case .success(let graphQLResult): await handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult) case .failure(let apiError): diff --git a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 82ac706dce6..a2f5e1fe4b0 100644 --- a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -25,10 +25,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { private let operationQueue: OperationQueue /// A DispatchQueue for synchronizing state on the mutation queue - private let mutationDispatchQueue = DispatchQueue( - label: "com.amazonaws.OutgoingMutationQueue", - target: DispatchQueue.global() - ) + private let mutationDispatchQueue = TaskQueue() private weak var api: APICategoryGraphQLBehavior? private weak var reconciliationQueue: IncomingEventReconciliationQueue? @@ -53,7 +50,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { let operationQueue = OperationQueue() operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue" - operationQueue.underlyingQueue = mutationDispatchQueue + operationQueue.qualityOfService = .default operationQueue.maxConcurrentOperationCount = 1 operationQueue.isSuspended = true @@ -137,6 +134,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { queryMutationEventsFromStorage { [weak self] in guard let self = self else { return } + guard case .starting = self.stateMachine.state else { + self.log.debug("Unexpected state transition while performing `doStart()` during `.starting` state. Current state: \(self.stateMachine.state).") + return + } self.operationQueue.isSuspended = false // State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)` diff --git a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift index a428a3b9911..947c3ad04f0 100644 --- a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift +++ b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift @@ -56,7 +56,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { override func main() { log.verbose(#function) - sendMutationToCloud(withAuthType: authTypesIterator?.next()) + sendMutationToCloud(withAuthType: authTypesIterator?.next()?.awsAuthType) } override func cancel() { @@ -251,7 +251,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { resolveReachabilityPublisher(request: request) if let pluginOptions = request.options?.pluginOptions as? AWSAPIPluginDataStoreOptions, pluginOptions.authType != nil, let nextAuthType = authTypesIterator?.next() { - scheduleRetry(advice: advice, withAuthType: nextAuthType) + scheduleRetry(advice: advice, withAuthType: nextAuthType.awsAuthType) } else { scheduleRetry(advice: advice) } diff --git a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift index 499ea6ab431..8ce0e56fb3b 100644 --- a/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift +++ b/packages/amplify_datastore/ios/internal/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift @@ -122,24 +122,26 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { ) return RetryableGraphQLSubscriptionOperation( - requestStream: authTypeProvider.publisher() - .map { Optional.some($0) } // map to optional to have nil as element - .replaceEmpty(with: nil) // use a nil element to trigger default auth if no auth provided - .map { authType in { [weak self] in - guard let self else { - throw APIError.operationError("GraphQL subscription cancelled", "") - } - - return api.subscribe(request: await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest( - for: modelSchema, - subscriptionType: subscriptionType.subscriptionType, - api: api, - auth: auth, - authType: authType, - awsAuthService: self.awsAuthService - )) - }} - .eraseToAnyPublisher() + requestStream: AsyncStream { continuation in + for authType in authTypeProvider { + continuation.yield({ [weak self] in + guard let self else { + throw APIError.operationError("GraphQL subscription cancelled", "") + } + + return api.subscribe(request: await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest( + for: modelSchema, + subscriptionType: subscriptionType.subscriptionType, + api: api, + auth: auth, + authType: authType.awsAuthType, + awsAuthService: self.awsAuthService + )) + }) + } + continuation.finish() + } + ) } diff --git a/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift b/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift index 6f6b6e5aea8..87490f2c0d6 100644 --- a/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift +++ b/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift @@ -64,19 +64,23 @@ public protocol AuthorizationTypeIterator { } /// AuthorizationTypeIterator for values of type `AWSAuthorizationType` -public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator { - public typealias AuthorizationType = AWSAuthorizationType +public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator, Sequence, IteratorProtocol { + public typealias AuthorizationType = AmplifyAuthorizationType - private var values: IndexingIterator<[AWSAuthorizationType]> + private var values: IndexingIterator<[AmplifyAuthorizationType]> private var _count: Int private var _position: Int - public init(withValues values: [AWSAuthorizationType]) { + public init(withValues values: [AmplifyAuthorizationType]) { self.values = values.makeIterator() self._count = values.count self._position = 0 } + public init(withValues values: [AmplifyAuthorizationType], valuesOnEmpty defaults: [AmplifyAuthorizationType]) { + self.init(withValues: values.isEmpty ? defaults : values) + } + public var count: Int { _count } @@ -85,7 +89,7 @@ public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator { _position < _count } - public mutating func next() -> AWSAuthorizationType? { + public mutating func next() -> AmplifyAuthorizationType? { if let value = values.next() { _position += 1 return value @@ -95,19 +99,6 @@ public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator { } } -extension AuthorizationTypeIterator { - public func publisher() -> AnyPublisher { - var it = self - return Deferred { - var authTypes = [AuthorizationType]() - while let authType = it.next() { - authTypes.append(authType) - } - return Publishers.MergeMany(authTypes.map { Just($0) }) - }.eraseToAnyPublisher() - } -} - // MARK: - AWSDefaultAuthModeStrategy /// AWS default auth mode strategy. @@ -120,12 +111,12 @@ public class AWSDefaultAuthModeStrategy: AuthModeStrategy { public func authTypesFor(schema: ModelSchema, operation: ModelOperation) -> AWSAuthorizationTypeIterator { - return AWSAuthorizationTypeIterator(withValues: []) + return AWSAuthorizationTypeIterator(withValues: [.inferred]) } public func authTypesFor(schema: ModelSchema, operations: [ModelOperation]) -> AWSAuthorizationTypeIterator { - return AWSAuthorizationTypeIterator(withValues: []) + return AWSAuthorizationTypeIterator(withValues: [.inferred]) } } @@ -140,20 +131,18 @@ public class AWSMultiAuthModeStrategy: AuthModeStrategy { required public init() {} private static func defaultAuthTypeFor(authStrategy: AuthStrategy) -> AWSAuthorizationType { - var defaultAuthType: AWSAuthorizationType switch authStrategy { case .owner: - defaultAuthType = .amazonCognitoUserPools + return .amazonCognitoUserPools case .groups: - defaultAuthType = .amazonCognitoUserPools + return .amazonCognitoUserPools case .private: - defaultAuthType = .amazonCognitoUserPools + return .amazonCognitoUserPools case .public: - defaultAuthType = .apiKey + return .apiKey case .custom: - defaultAuthType = .function + return .function } - return defaultAuthType } /// Given an auth rule, returns the corresponding AWSAuthorizationType @@ -247,10 +236,12 @@ public class AWSMultiAuthModeStrategy: AuthModeStrategy { return rule.allow == .public || rule.allow == .custom } } - let applicableAuthTypes = sortedRules.map { + + let applicableAuthTypes: [AmplifyAuthorizationType] = sortedRules.map { AWSMultiAuthModeStrategy.authTypeFor(authRule: $0) - } - return AWSAuthorizationTypeIterator(withValues: applicableAuthTypes) + }.map { .designated($0) } + + return AWSAuthorizationTypeIterator(withValues: applicableAuthTypes, valuesOnEmpty: [.inferred]) } } diff --git a/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AmplifyAuthorizationType.swift b/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AmplifyAuthorizationType.swift new file mode 100644 index 00000000000..18a90e31063 --- /dev/null +++ b/packages/amplify_datastore/ios/internal/AWSPluginsCore/Auth/AmplifyAuthorizationType.swift @@ -0,0 +1,29 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + + +import Foundation + +/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly +/// by host applications. The behavior of this may change without warning. +public enum AmplifyAuthorizationType { + + /// Determine the authorization method based on the amplifyconfiguration. + case inferred + + /// Specify the authentication method. + case designated(AWSAuthorizationType) + + public var awsAuthType: AWSAuthorizationType? { + switch self { + case .inferred: return nil + case .designated(let authType): return authType + } + } +} + +extension AmplifyAuthorizationType: Equatable { } diff --git a/packages/amplify_datastore/ios/internal/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/packages/amplify_datastore/ios/internal/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift index f40229d9f9e..d746ba49057 100644 --- a/packages/amplify_datastore/ios/internal/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift +++ b/packages/amplify_datastore/ios/internal/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift @@ -16,10 +16,10 @@ public final class RetryableGraphQLOperation { private let nondeterminsticOperation: NondeterminsticOperation.Success> public init( - requestStream: AnyPublisher<() async throws -> GraphQLTask.Success, Never> + requestStream: AsyncStream<() async throws -> GraphQLTask.Success> ) { self.nondeterminsticOperation = NondeterminsticOperation( - operationStream: requestStream, + operations: requestStream, shouldTryNextOnError: Self.onError(_:) ) } @@ -80,9 +80,9 @@ public final class RetryableGraphQLSubscriptionOperation { private let nondeterminsticOperation: NondeterminsticOperation> public init( - requestStream: AnyPublisher<() async throws -> AmplifyAsyncThrowingSequence, Never> + requestStream: AsyncStream<() async throws -> AmplifyAsyncThrowingSequence> ) { - self.nondeterminsticOperation = NondeterminsticOperation(operationStream: requestStream) + self.nondeterminsticOperation = NondeterminsticOperation(operations: requestStream) } deinit {