Skip to content

Commit

Permalink
Extract Event/AsyncContext handling in a separate method
Browse files Browse the repository at this point in the history
  • Loading branch information
kean committed Apr 28, 2024
1 parent 13f665a commit 2a61bd1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
27 changes: 18 additions & 9 deletions Sources/Nuke/ImageTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
}

let isDataTask: Bool
var task: Task<ImageResponse, Swift.Error>?
var continuation: UnsafeContinuation<ImageResponse, Error>?
var onEvent: ((Event, ImageTask) -> Void)?
weak var pipeline: ImagePipeline?

Expand Down Expand Up @@ -104,9 +106,6 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
}
}

var task: Task<ImageResponse, Swift.Error>?
var continuation: UnsafeContinuation<ImageResponse, Error>?

/// The events sent by the pipeline during the task execution.
public var events: AsyncStream<Event> {
os_unfair_lock_lock(lock)
Expand Down Expand Up @@ -207,11 +206,25 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
// MARK: Events

func process(_ event: Event) {
let context = sync { _context }
context.events?.1.yield(event)
switch event {
case .progress(let progress):
currentProgress = progress
case .finished:
// TODO: do we need to check state?
_ = setState(.completed)
default:
break
}

process(event, in: sync { _context })
onEvent?(event, self)
pipeline?.imageTask(self, didProcessEvent: event)
}

private func process(_ event: Event, in context: AsyncContext) {
context.events?.1.yield(event)
switch event {
case .progress(let progress):
context.progress?.1.yield(progress)
case .preview(let response):
context.stream?.1.yield(response)
Expand All @@ -221,16 +234,12 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
context.stream?.1.finish(throwing: CancellationError())
continuation?.resume(throwing: CancellationError())
case .finished(let result):
_ = setState(.completed)
let result = result.mapError { $0 as Error }
context.events?.1.finish()
context.progress?.1.finish()
context.stream?.1.yield(with: result)
continuation?.resume(with: result)
}

onEvent?(event, self)
pipeline?.imageTask(self, didProcessEvent: event)
}

/// An event produced during the runetime of the task.
Expand Down
7 changes: 6 additions & 1 deletion Sources/Nuke/Pipeline/ImagePipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final class ImagePipeline: @unchecked Sendable {
queue.async {
guard !self.isInvalidated else { return }
self.isInvalidated = true
self.tasks.keys.forEach { self.cancel($0) }
self.tasks.keys.forEach(self.cancel)
}
}

Expand Down Expand Up @@ -288,6 +288,7 @@ public final class ImagePipeline: @unchecked Sendable {
private func makeStartedImageTask(with request: ImageRequest, isDataTask: Bool = false) -> ImageTask {
let task = ImageTask(taskId: nextTaskId, request: request, isDataTask: isDataTask)
task.pipeline = self
// TODO: we've added a dealy to startImageTask and that can be a problem
task.task = Task {
try await withUnsafeThrowingContinuation { continuation in
queue.async {
Expand All @@ -305,6 +306,10 @@ public final class ImagePipeline: @unchecked Sendable {
return task.process(.finished(.failure(.pipelineInvalidated)))
}
// TODO: Check this and other .cancelled callbacks
// The problem is that if cancelled is called before that
// `startImageTask` is called, then the tasks[task] is empty
// in `ImageTask` callback and cancelation logic fails to run
// This can happen if the task is cancelled from `imageTaskCreated`
guard task.state != .cancelled else {
return task.process(.cancelled)
}
Expand Down

0 comments on commit 2a61bd1

Please sign in to comment.