Skip to content

Commit

Permalink
Use UnsafeAtomic.
Browse files Browse the repository at this point in the history
  • Loading branch information
liuliu committed Oct 2, 2020
1 parent 3fe1dbc commit bbde9a4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
14 changes: 9 additions & 5 deletions src/sqlite/SQLiteResultPublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ enum SQLiteSubscriptionType {

final class SQLiteSubscription: Workspace.Subscription {
private let ofType: SQLiteSubscriptionType
var cancelled = ManagedAtomic<Bool>(false)
var cancelled = UnsafeAtomic<Bool>.Storage(false)
let identifier: ObjectIdentifier
weak var workspace: SQLiteWorkspace?
init(ofType: SQLiteSubscriptionType, identifier: ObjectIdentifier, workspace: SQLiteWorkspace) {
Expand All @@ -17,11 +17,15 @@ final class SQLiteSubscription: Workspace.Subscription {
self.workspace = workspace
}
deinit {
cancelled.store(true, ordering: .releasing)
withUnsafeMutablePointer(to: &cancelled) {
UnsafeAtomic(at: $0).store(true, ordering: .releasing)
}
workspace?.cancel(ofType: ofType, identifier: identifier)
}
public func cancel() {
cancelled.store(true, ordering: .releasing)
withUnsafeMutablePointer(to: &cancelled) {
UnsafeAtomic(at: $0).store(true, ordering: .releasing)
}
workspace?.cancel(ofType: ofType, identifier: identifier)
}
}
Expand All @@ -40,7 +44,7 @@ final class SQLiteResultPublisher<Element: Atom>: ResultPublisher {
let rowid = object._rowid
objectSubscribers[rowid, default: [ObjectIdentifier: (_: UpdatedObject) -> Void]()][subscription.identifier] = { [weak self, weak subscription] updatedObject in
guard let subscription = subscription else { return }
guard !subscription.cancelled.load(ordering: .acquiring) else { return }
guard !(withUnsafeMutablePointer(to: &subscription.cancelled) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return }
guard let self = self else { return }
switch updatedObject {
case .deleted(let rowid):
Expand Down Expand Up @@ -161,7 +165,7 @@ final class SQLiteResultPublisher<Element: Atom>: ResultPublisher {
}
resultPublisher.subscribers[subscription.identifier] = { [weak subscription] fetchedResult in
guard let subscription = subscription else { return }
guard !subscription.cancelled.load(ordering: .acquiring) else { return }
guard !(withUnsafeMutablePointer(to: &subscription.cancelled) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return }
changeHandler(fetchedResult)
}
}
Expand Down
30 changes: 17 additions & 13 deletions src/sqlite/SQLiteWorkspace.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Atomics
import Dflat
import SQLite3
import Dispatch
Expand Down Expand Up @@ -100,11 +101,11 @@ public final class SQLiteWorkspace: Workspace {
// MARK - Management

public func shutdown(completion: (() -> Void)?) {
guard !state.shutdown.load(ordering: .acquiring) else {
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else {
completion?()
return
}
state.shutdown.store(true, ordering: .releasing)
withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).store(true, ordering: .releasing) }
var tableSpaces: [SQLiteTableSpace]? = nil
state.serial {
tableSpaces = Array(self.tableSpaces.values)
Expand Down Expand Up @@ -132,7 +133,7 @@ public final class SQLiteWorkspace: Workspace {
// MARK - Mutation

public func performChanges(_ transactionalObjectTypes: [Any.Type], changesHandler: @escaping Workspace.ChangesHandler, completionHandler: Workspace.CompletionHandler? = nil) {
guard !state.shutdown.load(ordering: .acquiring) else {
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else {
completionHandler?(false)
return
}
Expand Down Expand Up @@ -202,7 +203,7 @@ public final class SQLiteWorkspace: Workspace {
}

public func fetch<Element: Atom>(for ofType: Element.Type) -> QueryBuilder<Element> {
guard !state.shutdown.load(ordering: .acquiring) else {
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else {
return SQLiteQueryBuilder<Element>(reader: SQLiteConnectionPool.Borrowed(pointee: nil, pool: nil), workspace: self, transactionContext: nil, changesTimestamp: 0)
}
if let txnContext = SQLiteTransactionContext.current {
Expand All @@ -218,7 +219,7 @@ public final class SQLiteWorkspace: Workspace {
if let snapshot = Self.snapshot {
return SQLiteQueryBuilder<Element>(reader: snapshot.reader, workspace: self, transactionContext: nil, changesTimestamp: snapshot.changesTimestamp)
}
let changesTimestamp = state.changesTimestamp.load(ordering: .acquiring)
let changesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }
return SQLiteQueryBuilder<Element>(reader: readerPool.borrow(), workspace: self, transactionContext: nil, changesTimestamp: changesTimestamp)
}

Expand All @@ -229,7 +230,7 @@ public final class SQLiteWorkspace: Workspace {
}
// Require a consistent snapshot by starting a transaction.
let reader = readerPool.borrow()
let changesTimestamp = state.changesTimestamp.load(ordering: .acquiring)
let changesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }
Self.snapshot = Snapshot(reader: reader, changesTimestamp: changesTimestamp)
guard let pointee = reader.pointee else {
let retval = closure()
Expand All @@ -251,7 +252,7 @@ public final class SQLiteWorkspace: Workspace {
let fetchedResult = fetchedResult as! SQLiteFetchedResult<Element>
let identifier = ObjectIdentifier(fetchedResult.query)
let subscription = SQLiteSubscription(ofType: .fetchedResult(Element.self, identifier), identifier: ObjectIdentifier(changeHandler as AnyObject), workspace: self)
guard !state.shutdown.load(ordering: .acquiring) else {
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else {
return subscription
}
let objectType = ObjectIdentifier(Element.self)
Expand Down Expand Up @@ -297,7 +298,7 @@ public final class SQLiteWorkspace: Workspace {

public func subscribe<Element: Atom>(object: Element, changeHandler: @escaping (_: SubscribedObject<Element>) -> Void) -> Workspace.Subscription where Element: Equatable {
let subscription = SQLiteSubscription(ofType: .object(Element.self, object._rowid), identifier: ObjectIdentifier(changeHandler as AnyObject), workspace: self)
guard !state.shutdown.load(ordering: .acquiring) else {
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else {
return subscription
}
let objectType = ObjectIdentifier(Element.self)
Expand All @@ -314,7 +315,9 @@ public final class SQLiteWorkspace: Workspace {
// Since the object is out of date, now we need to check whether we need to call changeHandler immediately.
let fetchedObject = SQLiteObjectRepository.object(connection, ofType: Element.self, for: .rowid(object._rowid))
guard let updatedObject = fetchedObject else {
subscription.cancelled.store(true, ordering: .releasing)
withUnsafeMutablePointer(to: &subscription.cancelled) {
UnsafeAtomic(at: $0).store(true, ordering: .releasing)
}
changeHandler(.deleted)
return
}
Expand All @@ -338,7 +341,7 @@ public final class SQLiteWorkspace: Workspace {
}

func cancel(ofType: SQLiteSubscriptionType, identifier: ObjectIdentifier) {
guard !state.shutdown.load(ordering: .acquiring) else { return }
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return }
switch ofType {
case let .fetchedResult(atomType, fetchedResult):
let objectType = ObjectIdentifier(atomType)
Expand Down Expand Up @@ -457,7 +460,8 @@ public final class SQLiteWorkspace: Workspace {
}

private func invokeChangesHandler(_ transactionalObjectTypes: [ObjectIdentifier], connection: SQLiteConnection, resultPublishers: [ObjectIdentifier: ResultPublisher], tableState: SQLiteTableState, changesHandler: Workspace.ChangesHandler) -> Bool {
let txnContext = SQLiteTransactionContext(state: tableState, objectTypes: transactionalObjectTypes, changesTimestamp: state.changesTimestamp.load(ordering: .acquiring), connection: connection)
let oldChangesTimestamp: Int64 = withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }
let txnContext = SQLiteTransactionContext(state: tableState, objectTypes: transactionalObjectTypes, changesTimestamp: oldChangesTimestamp, connection: connection)
changesHandler(txnContext)
let updatedObjects = txnContext.objectRepository.updatedObjects
txnContext.destroy()
Expand All @@ -483,7 +487,7 @@ public final class SQLiteWorkspace: Workspace {
precondition(status == SQLITE_DONE)
}
var reader: SQLiteConnectionPool.Borrowed? = nil
let newChangesTimestamp = state.changesTimestamp.loadThenWrappingIncrement(by: 1, ordering: .releasing) + 1 // Return the previously hold timestamp, thus, the new timestamp need + 1
let newChangesTimestamp = (withUnsafeMutablePointer(to: &state.changesTimestamp) { UnsafeAtomic(at: $0).loadThenWrappingIncrement(by: 1, ordering: .releasing) }) + 1 // Return the previously hold timestamp, thus, the new timestamp need + 1
state.setTableTimestamp(newChangesTimestamp, for: updatedObjects.keys)
for (identifier, updates) in updatedObjects {
guard let resultPublisher = resultPublishers[identifier] else { continue }
Expand Down Expand Up @@ -530,7 +534,7 @@ extension SQLiteWorkspace {
}

func beginRebuildIndex<Element: Atom, S: Sequence>(_ ofType: Element.Type, fields: S) where S.Element == String {
guard !state.shutdown.load(ordering: .acquiring) else { return }
guard !(withUnsafeMutablePointer(to: &state.shutdown) { UnsafeAtomic(at: $0).load(ordering: .acquiring) }) else { return }
let objectType = ObjectIdentifier(Element.self)
let tableSpace = self.tableSpace(for: objectType)
// We don't need to bump the priority for this.
Expand Down
4 changes: 2 additions & 2 deletions src/sqlite/SQLiteWorkspaceState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import Dispatch
final class SQLiteWorkspaceState {
private var lock = os_unfair_lock()
private var tableTimestamps = [ObjectIdentifier: Int64]()
var changesTimestamp = ManagedAtomic<Int64>(0)
var shutdown = ManagedAtomic<Bool>(false)
var changesTimestamp = UnsafeAtomic<Int64>.Storage(0)
var shutdown = UnsafeAtomic<Bool>.Storage(false)

func serial<T>(_ closure: () -> T) -> T {
os_unfair_lock_lock(&lock)
Expand Down

0 comments on commit bbde9a4

Please sign in to comment.