diff --git a/src/sqlite/SQLiteResultPublisher.swift b/src/sqlite/SQLiteResultPublisher.swift index 83db4a68c..0d645d3c8 100644 --- a/src/sqlite/SQLiteResultPublisher.swift +++ b/src/sqlite/SQLiteResultPublisher.swift @@ -8,7 +8,7 @@ enum SQLiteSubscriptionType { final class SQLiteSubscription: Workspace.Subscription { private let ofType: SQLiteSubscriptionType - var cancelled = ManagedAtomic(false) + var cancelled = UnsafeAtomic.Storage(false) let identifier: ObjectIdentifier weak var workspace: SQLiteWorkspace? init(ofType: SQLiteSubscriptionType, identifier: ObjectIdentifier, workspace: SQLiteWorkspace) { @@ -17,11 +17,15 @@ final class SQLiteSubscription: Workspace.Subscription { self.workspace = workspace } deinit { - cancelled.store(true, ordering: .releasing) + withUnsafeMutablePointer(to: &cancelled) { + UnsafeAtomic(at: $0).store(true, ordering: .releasing) + } workspace?.cancel(ofType: ofType, identifier: identifier) } public func cancel() { - cancelled.store(true, ordering: .releasing) + withUnsafeMutablePointer(to: &cancelled) { + UnsafeAtomic(at: $0).store(true, ordering: .releasing) + } workspace?.cancel(ofType: ofType, identifier: identifier) } } @@ -40,7 +44,7 @@ final class SQLiteResultPublisher: ResultPublisher { let rowid = object._rowid objectSubscribers[rowid, default: [ObjectIdentifier: (_: UpdatedObject) -> Void]()][subscription.identifier] = { [weak self, weak subscription] updatedObject in guard let subscription = subscription else { return } - guard !subscription.cancelled.load(ordering: .acquiring) else { return } + guard !(withUnsafeMutablePointer(to: &subscription.cancelled) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return } guard let self = self else { return } switch updatedObject { case .deleted(let rowid): @@ -161,7 +165,7 @@ final class SQLiteResultPublisher: ResultPublisher { } resultPublisher.subscribers[subscription.identifier] = { [weak subscription] fetchedResult in guard let subscription = subscription else { return } - guard !subscription.cancelled.load(ordering: .acquiring) else { return } + guard !(withUnsafeMutablePointer(to: &subscription.cancelled) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return } changeHandler(fetchedResult) } } diff --git a/src/sqlite/SQLiteWorkspace.swift b/src/sqlite/SQLiteWorkspace.swift index 36bec559f..e3557cddf 100644 --- a/src/sqlite/SQLiteWorkspace.swift +++ b/src/sqlite/SQLiteWorkspace.swift @@ -1,3 +1,4 @@ +import Atomics import Dflat import SQLite3 import Dispatch @@ -100,11 +101,11 @@ public final class SQLiteWorkspace: Workspace { // MARK - Management public func shutdown(completion: (() -> Void)?) { - guard !state.shutdown.load(ordering: .acquiring) else { + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { completion?() return } - state.shutdown.store(true, ordering: .releasing) + withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).store(true, ordering: .releasing) } var tableSpaces: [SQLiteTableSpace]? = nil state.serial { tableSpaces = Array(self.tableSpaces.values) @@ -132,7 +133,7 @@ public final class SQLiteWorkspace: Workspace { // MARK - Mutation public func performChanges(_ transactionalObjectTypes: [Any.Type], changesHandler: @escaping Workspace.ChangesHandler, completionHandler: Workspace.CompletionHandler? = nil) { - guard !state.shutdown.load(ordering: .acquiring) else { + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { completionHandler?(false) return } @@ -202,7 +203,7 @@ public final class SQLiteWorkspace: Workspace { } public func fetch(for ofType: Element.Type) -> QueryBuilder { - guard !state.shutdown.load(ordering: .acquiring) else { + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return SQLiteQueryBuilder(reader: SQLiteConnectionPool.Borrowed(pointee: nil, pool: nil), workspace: self, transactionContext: nil, changesTimestamp: 0) } if let txnContext = SQLiteTransactionContext.current { @@ -218,7 +219,7 @@ public final class SQLiteWorkspace: Workspace { if let snapshot = Self.snapshot { return SQLiteQueryBuilder(reader: snapshot.reader, workspace: self, transactionContext: nil, changesTimestamp: snapshot.changesTimestamp) } - let changesTimestamp = state.changesTimestamp.load(ordering: .acquiring) + let changesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) } return SQLiteQueryBuilder(reader: readerPool.borrow(), workspace: self, transactionContext: nil, changesTimestamp: changesTimestamp) } @@ -229,7 +230,7 @@ public final class SQLiteWorkspace: Workspace { } // Require a consistent snapshot by starting a transaction. let reader = readerPool.borrow() - let changesTimestamp = state.changesTimestamp.load(ordering: .acquiring) + let changesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) } Self.snapshot = Snapshot(reader: reader, changesTimestamp: changesTimestamp) guard let pointee = reader.pointee else { let retval = closure() @@ -251,7 +252,7 @@ public final class SQLiteWorkspace: Workspace { let fetchedResult = fetchedResult as! SQLiteFetchedResult let identifier = ObjectIdentifier(fetchedResult.query) let subscription = SQLiteSubscription(ofType: .fetchedResult(Element.self, identifier), identifier: ObjectIdentifier(changeHandler as AnyObject), workspace: self) - guard !state.shutdown.load(ordering: .acquiring) else { + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return subscription } let objectType = ObjectIdentifier(Element.self) @@ -297,7 +298,7 @@ public final class SQLiteWorkspace: Workspace { public func subscribe(object: Element, changeHandler: @escaping (_: SubscribedObject) -> Void) -> Workspace.Subscription where Element: Equatable { let subscription = SQLiteSubscription(ofType: .object(Element.self, object._rowid), identifier: ObjectIdentifier(changeHandler as AnyObject), workspace: self) - guard !state.shutdown.load(ordering: .acquiring) else { + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return subscription } let objectType = ObjectIdentifier(Element.self) @@ -314,7 +315,9 @@ public final class SQLiteWorkspace: Workspace { // Since the object is out of date, now we need to check whether we need to call changeHandler immediately. let fetchedObject = SQLiteObjectRepository.object(connection, ofType: Element.self, for: .rowid(object._rowid)) guard let updatedObject = fetchedObject else { - subscription.cancelled.store(true, ordering: .releasing) + withUnsafeMutablePointer(to: &subscription.cancelled) { + UnsafeAtomic(at: $0).store(true, ordering: .releasing) + } changeHandler(.deleted) return } @@ -338,7 +341,7 @@ public final class SQLiteWorkspace: Workspace { } func cancel(ofType: SQLiteSubscriptionType, identifier: ObjectIdentifier) { - guard !state.shutdown.load(ordering: .acquiring) else { return } + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return } switch ofType { case let .fetchedResult(atomType, fetchedResult): let objectType = ObjectIdentifier(atomType) @@ -457,7 +460,8 @@ public final class SQLiteWorkspace: Workspace { } private func invokeChangesHandler(_ transactionalObjectTypes: [ObjectIdentifier], connection: SQLiteConnection, resultPublishers: [ObjectIdentifier: ResultPublisher], tableState: SQLiteTableState, changesHandler: Workspace.ChangesHandler) -> Bool { - let txnContext = SQLiteTransactionContext(state: tableState, objectTypes: transactionalObjectTypes, changesTimestamp: state.changesTimestamp.load(ordering: .acquiring), connection: connection) + let oldChangesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) } + let txnContext = SQLiteTransactionContext(state: tableState, objectTypes: transactionalObjectTypes, changesTimestamp: oldChangesTimestamp, connection: connection) changesHandler(txnContext) let updatedObjects = txnContext.objectRepository.updatedObjects txnContext.destroy() @@ -483,7 +487,7 @@ public final class SQLiteWorkspace: Workspace { precondition(status == SQLITE_DONE) } var reader: SQLiteConnectionPool.Borrowed? = nil - let newChangesTimestamp = state.changesTimestamp.loadThenWrappingIncrement(by: 1, ordering: .releasing) + 1 // Return the previously hold timestamp, thus, the new timestamp need + 1 + let newChangesTimestamp = (withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).loadThenWrappingIncrement(by: 1, ordering: .releasing) }) + 1 // Return the previously hold timestamp, thus, the new timestamp need + 1 state.setTableTimestamp(newChangesTimestamp, for: updatedObjects.keys) for (identifier, updates) in updatedObjects { guard let resultPublisher = resultPublishers[identifier] else { continue } @@ -530,7 +534,7 @@ extension SQLiteWorkspace { } func beginRebuildIndex(_ ofType: Element.Type, fields: S) where S.Element == String { - guard !state.shutdown.load(ordering: .acquiring) else { return } + guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return } let objectType = ObjectIdentifier(Element.self) let tableSpace = self.tableSpace(for: objectType) // We don't need to bump the priority for this. diff --git a/src/sqlite/SQLiteWorkspaceState.swift b/src/sqlite/SQLiteWorkspaceState.swift index 54866d89f..dd6362fc0 100644 --- a/src/sqlite/SQLiteWorkspaceState.swift +++ b/src/sqlite/SQLiteWorkspaceState.swift @@ -5,8 +5,8 @@ import Dispatch final class SQLiteWorkspaceState { private var lock = os_unfair_lock() private var tableTimestamps = [ObjectIdentifier: Int64]() - var changesTimestamp = ManagedAtomic(0) - var shutdown = ManagedAtomic(false) + var changesTimestamp = UnsafeAtomic.Storage(0) + var shutdown = UnsafeAtomic.Storage(false) func serial(_ closure: () -> T) -> T { os_unfair_lock_lock(&lock)