Skip to content

Commit

Permalink
Sync ImageTask on ImagePipelineActor
Browse files Browse the repository at this point in the history
  • Loading branch information
kean committed Oct 27, 2024
1 parent bf79725 commit bca4607
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 41 deletions.
53 changes: 21 additions & 32 deletions Sources/Nuke/ImageTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,30 @@ import UIKit
import AppKit
#endif

// TODO: try to make another internal instance that is isolated
// - possible make `ImageTask` a struct? then no questions about retaining it

/// A task performed by the ``ImagePipeline``.
///
/// The pipeline maintains a strong reference to the task until the request
/// finishes or fails; you do not need to maintain a reference to the task unless
/// it is useful for your app.
public final class ImageTask: Hashable, @unchecked Sendable {
@ImagePipelineActor
public final class ImageTask: Hashable {
/// An identifier that uniquely identifies the task within a given pipeline.
/// Unique only within that pipeline.
public let taskId: Int64
public nonisolated let taskId: Int64

/// The original request that the task was created with.
public let request: ImageRequest
public nonisolated let request: ImageRequest

/// The priority of the task. The priority can be updated dynamically even
/// for a task that is already running.
public var priority: ImageRequest.Priority {
public nonisolated var priority: ImageRequest.Priority {
get { nonisolatedState.withLock { $0.priority } }
set { setPriority(newValue) }
}

/// Returns the current download progress. Returns zeros before the download
/// is started and the expected size of the resource is known.
public var currentProgress: Progress {
public nonisolated var currentProgress: Progress {
nonisolatedState.withLock { $0.progress }
}

Expand All @@ -62,7 +60,6 @@ public final class ImageTask: Hashable, @unchecked Sendable {
}

/// The current state of the task.
@ImagePipelineActor
public private(set) var state: State = .running

/// The state of the image task.
Expand All @@ -76,7 +73,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
}

/// Returns `true` if the task cancellation is initiated.
public var isCancelling: Bool {
public nonisolated var isCancelling: Bool {
nonisolatedState.withLock { $0.isCancelling }
}

Expand All @@ -101,7 +98,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
}

/// The stream of progress updates.
public var progress: AsyncStream<Progress> {
public nonisolated var progress: AsyncStream<Progress> {
makeStream {
if case .progress(let value) = $0 { return value }
return nil
Expand All @@ -112,7 +109,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
/// progressive decoding.
///
/// - seealso: ``ImagePipeline/Configuration-swift.struct/isProgressiveDecodingEnabled``
public var previews: AsyncStream<ImageResponse> {
public nonisolated var previews: AsyncStream<ImageResponse> {
makeStream {
if case .preview(let value) = $0 { return value }
return nil
Expand All @@ -122,7 +119,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
// MARK: - Events

/// The events sent by the pipeline during the task execution.
public var events: AsyncStream<Event> { makeStream { $0 } }
public nonisolated var events: AsyncStream<Event> { makeStream { $0 } }

/// An event produced during the runetime of the task.
public enum Event: Sendable {
Expand All @@ -141,29 +138,25 @@ public final class ImageTask: Hashable, @unchecked Sendable {

private let nonisolatedState: Mutex<ImageTaskState>
private let isDataTask: Bool
private let onEvent: ((Event, ImageTask) -> Void)?
private var task: Task<ImageResponse, Error>!
private let onEvent: (@Sendable (Event, ImageTask) -> Void)?
private nonisolated(unsafe) var task: Task<ImageResponse, Error>!
private weak var pipeline: ImagePipeline?

@ImagePipelineActor
var continuation: UnsafeContinuation<ImageResponse, Error>?

@ImagePipelineActor
var _events: PassthroughSubject<ImageTask.Event, Never>?
private var continuation: UnsafeContinuation<ImageResponse, Error>?
private var _events: PassthroughSubject<ImageTask.Event, Never>?

init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) {
nonisolated init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: (@Sendable (Event, ImageTask) -> Void)?) {
self.taskId = taskId
self.request = request
self.nonisolatedState = Mutex(ImageTaskState(priority: request.priority))
self.isDataTask = isDataTask
self.pipeline = pipeline
self.onEvent = onEvent
self.task = Task {
self.task = Task { @ImagePipelineActor in
try await perform()
}
}

@ImagePipelineActor
private func perform() async throws -> ImageResponse {
try await withUnsafeThrowingContinuation {
continuation = $0
Expand All @@ -181,7 +174,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
///
/// The pipeline will immediately cancel any work associated with a task
/// unless there is an equivalent outstanding task running.
public func cancel() {
public nonisolated func cancel() {
guard nonisolatedState.withLock({
guard !$0.isCancelling else { return false }
$0.isCancelling = true
Expand All @@ -192,7 +185,7 @@ public final class ImageTask: Hashable, @unchecked Sendable {
}
}

private func setPriority(_ newValue: ImageRequest.Priority) {
private nonisolated func setPriority(_ newValue: ImageRequest.Priority) {
guard nonisolatedState.withLock({
guard $0.priority != newValue else { return false }
$0.priority = newValue
Expand All @@ -207,15 +200,13 @@ public final class ImageTask: Hashable, @unchecked Sendable {

/// Gets called when the task is cancelled either by the user or by an
/// external event such as session invalidation.
@ImagePipelineActor
func _cancel() {
guard state == .running else { return }
state = .cancelled
dispatch(.cancelled)
}

/// Gets called when the associated task sends a new event.
@ImagePipelineActor
func process(_ event: AsyncTask<ImageResponse, ImagePipeline.Error>.Event) {
guard state == .running else { return }
switch event {
Expand All @@ -239,7 +230,6 @@ public final class ImageTask: Hashable, @unchecked Sendable {
///
/// - warning: The task needs to be fully wired (`_continuation` present)
/// before it can start sending the events.
@ImagePipelineActor
private func dispatch(_ event: Event) {
guard continuation != nil else {
return // Task isn't fully wired yet
Expand All @@ -263,19 +253,19 @@ public final class ImageTask: Hashable, @unchecked Sendable {

// MARK: Hashable

public func hash(into hasher: inout Hasher) {
public nonisolated func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
}

public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
public nonisolated static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}
}

// MARK: - ImageTask (Private)

extension ImageTask {
private func makeStream<T>(of closure: @Sendable @escaping (Event) -> T?) -> AsyncStream<T> {
private nonisolated func makeStream<T>(of closure: @Sendable @escaping (Event) -> T?) -> AsyncStream<T> {
AsyncStream { continuation in
Task { @ImagePipelineActor in
guard state == .running else {
Expand All @@ -301,7 +291,6 @@ extension ImageTask {
}
}

@ImagePipelineActor
private func makeEvents() -> PassthroughSubject<ImageTask.Event, Never> {
if _events == nil {
_events = PassthroughSubject()
Expand Down
4 changes: 2 additions & 2 deletions Sources/Nuke/Pipeline/ImagePipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public final class ImagePipeline {
Task { @ImagePipelineActor in
guard !self.isInvalidated else { return }
self.isInvalidated = true
self.tasks.keys.forEach(self.cancelImageTask)
self.tasks.keys.forEach { cancelImageTask($0) }
}
}

Expand Down Expand Up @@ -141,7 +141,7 @@ public final class ImagePipeline {

// MARK: - ImageTask (Internal)

nonisolated func makeImageTask(with request: ImageRequest, isDataTask: Bool = false, onEvent: ((ImageTask.Event, ImageTask) -> Void)? = nil) -> ImageTask {
nonisolated func makeImageTask(with request: ImageRequest, isDataTask: Bool = false, onEvent: (@Sendable (ImageTask.Event, ImageTask) -> Void)? = nil) -> ImageTask {
let task = ImageTask(taskId: nextTaskId.incremented(), request: request, isDataTask: isDataTask, pipeline: self, onEvent: onEvent)
delegate.imageTaskCreated(task, pipeline: self)
return task
Expand Down
6 changes: 3 additions & 3 deletions Tests/MockDataLoader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MockDataLoader: MockDataLoading, DataLoading, @unchecked Sendable {
set { queue.isSuspended = newValue }
}

func loadData(with request: URLRequest, didReceiveData: @escaping (Data, URLResponse) -> Void, completion: @escaping (Error?) -> Void) -> MockDataTaskProtocol {
func loadData(with request: URLRequest, didReceiveData: @Sendable @escaping (Data, URLResponse) -> Void, completion: @Sendable @escaping (Error?) -> Void) -> MockDataTaskProtocol {
let task = MockDataTask()

NotificationCenter.default.post(name: MockDataLoader.DidStartTask, object: self)
Expand Down Expand Up @@ -64,7 +64,7 @@ class MockDataLoader: MockDataLoading, DataLoading, @unchecked Sendable {

// Remove these and update to implement the actual protocol.
protocol MockDataLoading: DataLoading {
func loadData(with request: URLRequest, didReceiveData: @escaping (Data, URLResponse) -> Void, completion: @escaping (Error?) -> Void) -> MockDataTaskProtocol
func loadData(with request: URLRequest, didReceiveData: @Sendable @escaping (Data, URLResponse) -> Void, completion: @Sendable @escaping (Error?) -> Void) -> MockDataTaskProtocol
}

extension MockDataLoading where Self: DataLoading {
Expand All @@ -85,7 +85,7 @@ extension MockDataLoading where Self: DataLoading {
}
}

protocol MockDataTaskProtocol {
protocol MockDataTaskProtocol: Sendable {
func cancel()
}

2 changes: 1 addition & 1 deletion Tests/MockProgressiveDataLoader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ final class MockProgressiveDataLoader: MockDataLoading, DataLoading, @unchecked
self.chunks = Array(_createChunks(for: data, size: data.count / 3))
}

func loadData(with request: URLRequest, didReceiveData: @escaping (Data, URLResponse) -> Void, completion: @escaping (Error?) -> Void) -> MockDataTaskProtocol {
func loadData(with request: URLRequest, didReceiveData: @Sendable @escaping (Data, URLResponse) -> Void, completion: @Sendable @escaping (Error?) -> Void) -> MockDataTaskProtocol {
self.didReceiveData = didReceiveData
self.completion = completion
self.resume()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private class _MockResumableDataLoader: MockDataLoading, DataLoading, @unchecked
}
}

func loadData(with request: URLRequest, didReceiveData: @escaping (Data, URLResponse) -> Void, completion: @escaping (Error?) -> Void) -> MockDataTaskProtocol {
func loadData(with request: URLRequest, didReceiveData: @Sendable @escaping (Data, URLResponse) -> Void, completion: @Sendable @escaping (Error?) -> Void) -> MockDataTaskProtocol {
let headers = request.allHTTPHeaderFields

let completion = completion
Expand Down
4 changes: 2 additions & 2 deletions Tests/XCTestCase+Nuke.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ struct TestExpectationImagePipeline {

@discardableResult
func toLoadImage(with request: ImageRequest,
progress: ((_ intermediateResponse: ImageResponse?, _ completedUnitCount: Int64, _ totalUnitCount: Int64) -> Void)? = nil,
completion: ((Result<ImageResponse, ImagePipeline.Error>) -> Void)? = nil) -> TestRecordedImageRequest {
progress: (@Sendable (_ intermediateResponse: ImageResponse?, _ completedUnitCount: Int64, _ totalUnitCount: Int64) -> Void)? = nil,
completion: (@Sendable (Result<ImageResponse, ImagePipeline.Error>) -> Void)? = nil) -> TestRecordedImageRequest {
let record = TestRecordedImageRequest()
let expectation = test.expectation(description: "Image loaded for \(request)")
record._task = pipeline.loadImage(with: request, progress: progress) { result in
Expand Down

0 comments on commit bca4607

Please sign in to comment.