-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathTaskExtensions.cs
497 lines (435 loc) · 18.1 KB
/
TaskExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
namespace Orleans.Indexing
{
internal static class OrleansTaskExtentions
{
// Shared task that is already in the Canceled state; handed out by the untyped conversions for canceled inputs.
internal static readonly Task<object> CanceledTask = TaskFromCanceled<object>();

// Shared task that has already completed with a null result; handed out for non-generic tasks that ran to completion.
internal static readonly Task<object> CompletedTask = Task.FromResult<object>(null);
/// <summary>
/// Converts a non-generic <see cref="Task"/> into a <see cref="Task{Object}"/> whose result is <c>null</c>.
/// </summary>
/// <param name="task">The task to convert.</param>
/// <returns>
/// The shared completed or canceled task when <paramref name="task"/> has already finished in that state,
/// a new faulted task carrying the same inner exceptions when it has faulted, and otherwise a task that
/// finishes once <paramref name="task"/> does.
/// </returns>
public static Task<object> ToUntypedTask(this Task task)
{
    var status = task.Status;

    if (status == TaskStatus.RanToCompletion)
    {
        return CompletedTask;
    }

    if (status == TaskStatus.Faulted)
    {
        return TaskFromFaulted(task);
    }

    if (status == TaskStatus.Canceled)
    {
        return CanceledTask;
    }

    // Not finished yet: wait for it and then surface a null result.
    return AwaitThenNull(task);

    async Task<object> AwaitThenNull(Task pending)
    {
        await pending;
        return null;
    }
}
/// <summary>
/// Converts a <see cref="Task{T}"/> into a <see cref="Task{Object}"/> carrying the same result as an <see cref="object"/>.
/// </summary>
/// <typeparam name="T">The result type of <paramref name="task"/>.</typeparam>
/// <param name="task">The task to convert.</param>
/// <returns>
/// <paramref name="task"/> itself when <typeparamref name="T"/> is <see cref="object"/>; otherwise a task that mirrors
/// the outcome (result, fault or cancellation) of <paramref name="task"/>.
/// </returns>
public static Task<object> ToUntypedTask<T>(this Task<T> task)
{
    // A Task<object> is already untyped, so it is passed straight through.
    if (typeof(T) == typeof(object))
    {
        return task as Task<object>;
    }

    var status = task.Status;

    if (status == TaskStatus.RanToCompletion)
    {
        object boxed = GetResult(task);
        return Task.FromResult(boxed);
    }

    if (status == TaskStatus.Faulted)
    {
        return TaskFromFaulted(task);
    }

    if (status == TaskStatus.Canceled)
    {
        return CanceledTask;
    }

    // Not finished yet: await it and box the result when it arrives.
    return BoxWhenDone(task);

    async Task<object> BoxWhenDone(Task<T> pending)
    {
        T value = await pending.ConfigureAwait(false);
        return value;
    }
}
/// <summary>
/// Returns a <see cref="Task{T}"/> for the provided <see cref="Task{Object}"/>.
/// </summary>
/// <typeparam name="T">The type that the result of <paramref name="task"/> is converted to.</typeparam>
/// <param name="task">The task.</param>
/// <returns>
/// <paramref name="task"/> itself when <typeparamref name="T"/> is <see cref="object"/>; otherwise a task that mirrors
/// the outcome of <paramref name="task"/> with its result cast to <typeparamref name="T"/>.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The result of <paramref name="task"/> is <c>null</c> but <typeparamref name="T"/> cannot hold null. Thrown directly
/// when <paramref name="task"/> has already completed, and through the returned task otherwise.
/// </exception>
internal static Task<T> ToTypedTask<T>(this Task<object> task)
{
    if (typeof(T) == typeof(object))
        return task as Task<T>;

    switch (task.Status)
    {
        case TaskStatus.RanToCompletion:
            return Task.FromResult(Unbox(GetResult(task)));
        case TaskStatus.Faulted:
            return TaskFromFaulted<T>(task);
        case TaskStatus.Canceled:
            return TaskFromCanceled<T>();
        default:
            return ConvertAsync(task);
    }

    async Task<T> ConvertAsync(Task<object> asyncTask)
    {
        return Unbox(await asyncTask.ConfigureAwait(false));
    }

    // Shared by the completed and pending paths so that a null result for a non-nullable T
    // always yields the descriptive InvalidOperationException rather than a bare
    // NullReferenceException from the unboxing cast.
    T Unbox(object result)
    {
        if (result is null)
        {
            if (!NullabilityHelper<T>.IsNullableType)
            {
                ThrowInvalidTaskResultType(typeof(T));
            }

            return default;
        }

        return (T)result;
    }
}
private static class NullabilityHelper<T>
{
    /// <summary>
    /// True when <typeparamref name="T" /> can hold null, that is, when it is a reference type or a <see cref="Nullable{T}"/>; false for every other value type.
    /// Computed once per closed generic type.
    /// </summary>
    public static readonly bool IsNullableType = ComputeIsNullable();

    private static bool ComputeIsNullable()
    {
        var type = typeof(T);

        // Anything that is not a value type can be null.
        if (!type.IsValueType)
        {
            return true;
        }

        // Among value types only Nullable<> can represent null.
        if (!type.IsConstructedGenericType)
        {
            return false;
        }

        return type.GetGenericTypeDefinition() == typeof(Nullable<>);
    }
}
/// <summary>
/// Throws the <see cref="InvalidOperationException"/> that reports a null task result where a value of
/// <paramref name="type"/> was expected. Marked <see cref="MethodImplOptions.NoInlining"/>, presumably to keep
/// the throw path out of the callers' bodies.
/// </summary>
/// <param name="type">The result type that was expected.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowInvalidTaskResultType(Type type)
{
    throw new InvalidOperationException(
        $"Expected result of type {type} but encountered a null value. This may be caused by a grain call filter swallowing an exception.");
}
/// <summary>
/// Identity overload: a <see cref="Task{Object}"/> is already untyped, so it is returned unchanged.
/// </summary>
/// <param name="task">The task.</param>
/// <returns>The same <paramref name="task"/> instance.</returns>
public static Task<object> ToUntypedTask(this Task<object> task) => task;
/// <summary>
/// Builds a faulted <see cref="Task{Object}"/> that carries the inner exceptions of an already-faulted task.
/// </summary>
/// <param name="task">A faulted task; its <see cref="Task.Exception"/> is read without a null check.</param>
private static Task<object> TaskFromFaulted(Task task)
{
    var failures = task.Exception.InnerExceptions;
    var source = new TaskCompletionSource<object>();
    source.SetException(failures);
    return source.Task;
}
/// <summary>
/// Builds a faulted <see cref="Task{T}"/> that carries the inner exceptions of an already-faulted task.
/// </summary>
/// <typeparam name="T">The result type of the task to create.</typeparam>
/// <param name="task">A faulted task; its <see cref="Task.Exception"/> is read without a null check.</param>
private static Task<T> TaskFromFaulted<T>(Task task)
{
    var failures = task.Exception.InnerExceptions;
    var source = new TaskCompletionSource<T>();
    source.SetException(failures);
    return source.Task;
}
/// <summary>
/// Creates a new <see cref="Task{T}"/> that is already in the Canceled state.
/// </summary>
/// <typeparam name="T">The result type of the task to create.</typeparam>
private static Task<T> TaskFromCanceled<T>()
{
    var source = new TaskCompletionSource<T>();
    source.SetCanceled();

    Task<T> canceled = source.Task;
    return canceled;
}
/// <summary>
/// Runs an asynchronous delegate so that an exception is never thrown at the call site. Because this method is
/// <c>async</c>, anything <paramref name="action"/> throws, even before its first await, is captured in the returned task.
/// </summary>
/// <param name="action">The asynchronous delegate to run.</param>
public static async Task SafeExecute(Func<Task> action)
{
    Task pending = action();
    await pending;
}
/// <summary>
/// Runs an asynchronous delegate and discards any exception it produces, so the returned task always
/// completes successfully.
/// </summary>
/// <param name="action">The asynchronous delegate to run.</param>
public static async Task ExecuteAndIgnoreException(Func<Task> action)
{
    try
    {
        Task pending = action();
        await pending;
    }
    catch (Exception)
    {
        // Swallowing the failure is the whole purpose of this helper; nothing is rethrown.
    }
}
/// <summary>
/// Formats a task as <c>[Id=..., Status=...]</c>, or as the text <c>null</c> for a null reference.
/// Note that <c>t.ToString()</c> binds to the instance method inherited from <see cref="object"/>, so this
/// helper only runs when invoked with static-call syntax.
/// </summary>
/// <param name="t">The task to describe; may be null.</param>
internal static String ToString(this Task t)
{
    if (t == null)
    {
        return "null";
    }

    var statusName = Enum.GetName(typeof(TaskStatus), t.Status);
    return string.Format("[Id={0}, Status={1}]", t.Id, statusName);
}
/// <summary>
/// Formats a generic task as <c>[Id=..., Status=...]</c>, or as the text <c>null</c> for a null reference.
/// Note that <c>t.ToString()</c> binds to the instance method inherited from <see cref="object"/>, so this
/// helper only runs when invoked with static-call syntax.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="t">The task to describe; may be null.</param>
internal static String ToString<T>(this Task<T> t)
{
    if (t == null)
    {
        return "null";
    }

    var statusName = Enum.GetName(typeof(TaskStatus), t.Status);
    return string.Format("[Id={0}, Status={1}]", t.Id, statusName);
}
/// <summary>
/// Blocks the calling thread until <paramref name="task"/> finishes or <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="task">The task to wait for.</param>
/// <param name="timeout">The maximum time to block.</param>
/// <exception cref="TimeoutException">The task did not finish within <paramref name="timeout"/>.</exception>
internal static void WaitWithThrow(this Task task, TimeSpan timeout)
{
    var finishedInTime = task.Wait(timeout);
    if (finishedInTime)
    {
        return;
    }

    throw new TimeoutException($"Task.WaitWithThrow has timed out after {timeout}.");
}
/// <summary>
/// Blocks the calling thread until <paramref name="task"/> finishes or <paramref name="timeout"/> elapses,
/// then returns the task's result.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="task">The task to wait for.</param>
/// <param name="timeout">The maximum time to block.</param>
/// <returns>The result of <paramref name="task"/>.</returns>
/// <exception cref="TimeoutException">The task did not finish within <paramref name="timeout"/>.</exception>
internal static T WaitForResultWithThrow<T>(this Task<T> task, TimeSpan timeout)
{
    var finishedInTime = task.Wait(timeout);
    if (finishedInTime)
    {
        return task.Result;
    }

    throw new TimeoutException($"Task<T>.WaitForResultWithThrow has timed out after {timeout}.");
}
/// <summary>
/// Applies a timeout to a task, allowing the caller to stop waiting early. The underlying task is not stopped.
/// </summary>
/// <param name="taskToComplete">The task to wait for.</param>
/// <param name="timeout">Amount of time to wait before timing out.</param>
/// <param name="exceptionMessage">Text to use as the timeout exception message; a default message is used when null.</param>
/// <exception cref="TimeoutException"><paramref name="taskToComplete"/> did not finish within <paramref name="timeout"/>.</exception>
/// <returns>A task that finishes with the outcome of <paramref name="taskToComplete"/>, or faults with a <see cref="TimeoutException"/>.</returns>
public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeout, string exceptionMessage = null)
{
    // Fast path: nothing to time out, just propagate the outcome.
    if (taskToComplete.IsCompleted)
    {
        await taskToComplete;
        return;
    }

    // The source is disposed on every exit path so its resources are always released.
    using (var timeoutCancellationTokenSource = new CancellationTokenSource())
    {
        var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeout, timeoutCancellationTokenSource.Token));

        // The task finished before the timeout.
        if (taskToComplete == completedTask)
        {
            // Stop the pending delay; it is no longer needed.
            timeoutCancellationTokenSource.Cancel();

            // Await the task so that its exception, if any, propagates correctly.
            await taskToComplete;
            return;
        }
    }

    // Timed out: fire and forget the original task so that any exception it later produces is observed.
    taskToComplete.Ignore();
    var errorMessage = exceptionMessage ?? $"WithTimeout has timed out after {timeout}";
    throw new TimeoutException(errorMessage);
}
/// <summary>
/// Applies a timeout to a task, allowing the caller to stop waiting early. The underlying task is not stopped.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="taskToComplete">The task to wait for.</param>
/// <param name="timeSpan">Amount of time to wait before timing out.</param>
/// <param name="exceptionMessage">Text to use as the timeout exception message; a default message is used when null.</param>
/// <exception cref="TimeoutException"><paramref name="taskToComplete"/> did not finish within <paramref name="timeSpan"/>.</exception>
/// <returns>The value of the completed task.</returns>
public static async Task<T> WithTimeout<T>(this Task<T> taskToComplete, TimeSpan timeSpan, string exceptionMessage = null)
{
    // Fast path: nothing to time out, just propagate the outcome.
    if (taskToComplete.IsCompleted)
    {
        return await taskToComplete;
    }

    // The source is disposed on every exit path so its resources are always released.
    using (var timeoutCancellationTokenSource = new CancellationTokenSource())
    {
        var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeSpan, timeoutCancellationTokenSource.Token));

        // The task finished before the timeout.
        if (taskToComplete == completedTask)
        {
            // Stop the pending delay; it is no longer needed.
            timeoutCancellationTokenSource.Cancel();

            // Await the task so that its exception, if any, propagates correctly.
            return await taskToComplete;
        }
    }

    // Timed out: fire and forget the original task so that any exception it later produces is observed.
    taskToComplete.Ignore();
    var errorMessage = exceptionMessage ?? $"WithTimeout has timed out after {timeSpan}";
    throw new TimeoutException(errorMessage);
}
/// <summary>
/// Waits for a task that cannot itself be cancelled, giving up on the wait when the token is cancelled.
/// A <see cref="TaskCanceledException"/> raised by the wait is rethrown wrapped in a new one that carries <paramref name="message"/>.
/// </summary>
/// <param name="taskToComplete">The task to wait for unless cancelled.</param>
/// <param name="cancellationToken">A cancellation token for cancelling the wait.</param>
/// <param name="message">Message to set on the rethrown exception.</param>
/// <returns>A task that finishes when <paramref name="taskToComplete"/> does or the wait is cancelled.</returns>
/// <exception cref="TaskCanceledException">The wait was cancelled.</exception>
internal static async Task WithCancellation(this Task taskToComplete, CancellationToken cancellationToken, string message)
{
    try
    {
        Task cancellable = taskToComplete.WithCancellation(cancellationToken);
        await cancellable;
    }
    catch (TaskCanceledException inner)
    {
        // Keep the original exception as InnerException and attach the caller's message.
        throw new TaskCanceledException(message, inner);
    }
}
/// <summary>
/// Waits for a task that cannot itself be cancelled, giving up on the wait when the token is cancelled.
/// </summary>
/// <param name="taskToComplete">The task to wait for unless cancelled.</param>
/// <param name="cancellationToken">A cancellation token for cancelling the wait.</param>
/// <returns>
/// <paramref name="taskToComplete"/> itself when it has already finished or the token can never be cancelled;
/// an already-canceled task when cancellation has already been requested; otherwise a task that finishes with
/// whichever comes first, the original task or the cancellation.
/// </returns>
internal static Task WithCancellation(this Task taskToComplete, CancellationToken cancellationToken)
{
    // Nothing to race against: hand back the original task unchanged.
    if (taskToComplete.IsCompleted || !cancellationToken.CanBeCanceled)
    {
        return taskToComplete;
    }

    // Cancellation was requested before the wait even started.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<object>(cancellationToken);
    }

    return MakeCancellable(taskToComplete, cancellationToken);
}
/// <summary>
/// Races <paramref name="task"/> against cancellation of <paramref name="cancellationToken"/> and finishes
/// with the outcome of whichever completes first.
/// </summary>
/// <param name="task">The task to wait for.</param>
/// <param name="cancellationToken">The token whose cancellation ends the wait.</param>
private static async Task MakeCancellable(Task task, CancellationToken cancellationToken)
{
    // RunContinuationsAsynchronously keeps the code awaiting this source from running inline on the
    // thread that calls Cancel() on the token's source.
    var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The registration is disposed as soon as the race is decided.
    using (cancellationToken.Register(() =>
        tcs.TrySetCanceled(cancellationToken), useSynchronizationContext: false))
    {
        var firstToComplete = await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);

        if (firstToComplete != task)
        {
            // Cancellation won: the original task is abandoned via Ignore(), presumably so that any
            // exception it later produces is still observed.
            task.Ignore();
        }

        // Awaiting the winner propagates either the task's own outcome or the cancellation.
        await firstToComplete.ConfigureAwait(false);
    }
}
/// <summary>
/// Waits for a task that cannot itself be cancelled, giving up on the wait when the token is cancelled.
/// A <see cref="TaskCanceledException"/> raised by the wait is rethrown wrapped in a new one that carries <paramref name="message"/>.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="taskToComplete">The task to wait for unless cancelled.</param>
/// <param name="cancellationToken">A cancellation token for cancelling the wait.</param>
/// <param name="message">Message to set on the rethrown exception.</param>
/// <returns>The result of <paramref name="taskToComplete"/>.</returns>
/// <exception cref="TaskCanceledException">The wait was cancelled.</exception>
internal static async Task<T> WithCancellation<T>(this Task<T> taskToComplete, CancellationToken cancellationToken, string message)
{
    try
    {
        Task<T> cancellable = taskToComplete.WithCancellation(cancellationToken);
        return await cancellable;
    }
    catch (TaskCanceledException inner)
    {
        // Keep the original exception as InnerException and attach the caller's message.
        throw new TaskCanceledException(message, inner);
    }
}
/// <summary>
/// Waits for a task that cannot itself be cancelled, giving up on the wait when the token is cancelled.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="taskToComplete">The task to wait for unless cancelled.</param>
/// <param name="cancellationToken">A cancellation token for cancelling the wait.</param>
/// <returns>
/// <paramref name="taskToComplete"/> itself when it has already finished or the token can never be cancelled;
/// an already-canceled task when cancellation has already been requested; otherwise a task that finishes with
/// whichever comes first, the original task or the cancellation.
/// </returns>
internal static Task<T> WithCancellation<T>(this Task<T> taskToComplete, CancellationToken cancellationToken)
{
    // Nothing to race against: hand back the original task unchanged.
    if (taskToComplete.IsCompleted || !cancellationToken.CanBeCanceled)
    {
        return taskToComplete;
    }

    // Cancellation was requested before the wait even started.
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<T>(cancellationToken);
    }

    return MakeCancellable(taskToComplete, cancellationToken);
}
/// <summary>
/// Races <paramref name="task"/> against cancellation of <paramref name="cancellationToken"/> and finishes
/// with the outcome of whichever completes first.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="task">The task to wait for.</param>
/// <param name="cancellationToken">The token whose cancellation ends the wait.</param>
/// <returns>The result of <paramref name="task"/> when it finishes before cancellation.</returns>
private static async Task<T> MakeCancellable<T>(Task<T> task, CancellationToken cancellationToken)
{
    // RunContinuationsAsynchronously keeps the code awaiting this source from running inline on the
    // thread that calls Cancel() on the token's source.
    var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The registration is disposed as soon as the race is decided.
    using (cancellationToken.Register(() =>
        tcs.TrySetCanceled(cancellationToken), useSynchronizationContext: false))
    {
        var firstToComplete = await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);

        if (firstToComplete != task)
        {
            // Cancellation won: the original task is abandoned via Ignore(), presumably so that any
            // exception it later produces is still observed.
            task.Ignore();
        }

        // Awaiting the winner yields the task's result or propagates the cancellation.
        return await firstToComplete.ConfigureAwait(false);
    }
}
/// <summary>
/// Runs a synchronous delegate and reports its outcome as a task instead of letting an exception escape.
/// </summary>
/// <param name="action">The delegate to run.</param>
/// <returns>
/// <see cref="Task.CompletedTask"/> when <paramref name="action"/> returns normally; otherwise a faulted task
/// that carries the exception it threw.
/// </returns>
internal static Task WrapInTask(Action action)
{
    Task outcome;
    try
    {
        action();
        outcome = Task.CompletedTask;
    }
    catch (Exception failure)
    {
        outcome = Task.FromException<object>(failure);
    }

    return outcome;
}
/// <summary>
/// Re-exposes a task through a fresh <see cref="TaskCompletionSource{T}"/>, copying its outcome once it is known.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="task">The task to mirror; may be null. A task that has not been started yet is started here.</param>
/// <returns>
/// A completed task holding <c>default(T)</c> when <paramref name="task"/> is null; otherwise a new task that
/// receives the result or the inner exceptions of <paramref name="task"/>. A canceled source is reported as a
/// fault carrying a <see cref="TaskCanceledException"/>, not as a canceled task.
/// </returns>
internal static Task<T> ConvertTaskViaTcs<T>(Task<T> task)
{
    if (task == null)
    {
        return Task.FromResult(default(T));
    }

    var resolver = new TaskCompletionSource<T>();

    if (task.IsCompleted)
    {
        // Outcome already known: copy it across immediately.
        Transfer(task);
    }
    else
    {
        // A cold task would never finish unless somebody starts it.
        if (task.Status == TaskStatus.Created)
        {
            task.Start();
        }

        task.ContinueWith(finished => Transfer(finished));
    }

    return resolver.Task;

    // Copies the final state of a finished task onto the resolver.
    void Transfer(Task<T> finished)
    {
        if (finished.IsFaulted)
        {
            resolver.TrySetException(finished.Exception.InnerExceptions);
            return;
        }

        if (finished.IsCanceled)
        {
            resolver.TrySetException(new TaskCanceledException(finished));
            return;
        }

        resolver.TrySetResult(finished.GetResult());
    }
}
// GetAwaiter().GetResult() is used instead of .Result; the rationale
// is presented at https://github.com/aspnet/Security/issues/59.
/// <summary>
/// Synchronously obtains the result of <paramref name="task"/> through its awaiter.
/// </summary>
/// <typeparam name="T">The result type of the task.</typeparam>
/// <param name="task">The task to read the result from.</param>
/// <returns>The result of <paramref name="task"/>.</returns>
internal static T GetResult<T>(this Task<T> task) => task.GetAwaiter().GetResult();
/// <summary>
/// Synchronously waits for <paramref name="task"/> through its awaiter, rethrowing the task's exception if it failed.
/// </summary>
/// <param name="task">The task to wait for.</param>
internal static void GetResult(this Task task) => task.GetAwaiter().GetResult();
/// <summary>
/// Returns a task that completes successfully once <paramref name="token"/> is cancelled.
/// </summary>
/// <param name="token">The token to observe.</param>
/// <returns>
/// <see cref="Task.CompletedTask"/> when cancellation has already been requested; otherwise a task that is
/// completed by a callback registered on <paramref name="token"/>.
/// </returns>
internal static Task WhenCancelled(this CancellationToken token)
{
    if (!token.IsCancellationRequested)
    {
        var signal = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The source travels as the callback state, so the lambda captures nothing.
        token.Register(state => ((TaskCompletionSource<object>)state).TrySetResult(null), signal);

        return signal.Task;
    }

    return Task.CompletedTask;
}
}
}
namespace Orleans
{
/// <summary>
/// Legacy holder for a <see cref="Task"/> that has already run to completion.
/// </summary>
public static class TaskDone
{
    /// <summary>
    /// Gets <see cref="Task.CompletedTask"/>. Marked obsolete in favour of using that property directly.
    /// </summary>
    [Obsolete("Use Task.CompletedTask")]
    public static Task Done
    {
        get { return Task.CompletedTask; }
    }
}
}