-
Notifications
You must be signed in to change notification settings - Fork 17
/
Async and parallel programming in .net 4= John hamel, plural-sight; Note = Erxin.txt
469 lines (449 loc) · 20.2 KB
/
Async and parallel programming in .net 4= John hamel, plural-sight; Note = Erxin.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
Async and parallel programming in .net 4= John hamel, plural-sight; Note = Erxin
## Async and parallel programming
- base on tasks and task parallel library(TPL)
* avaliable in .net 4
* coming soon with silverlight 5
- already have in previous .net version
* thread
* async programming model, (e.g async delegate invocation)
* event-based async pattern, (e.g backgroundWorker class)
* queueUserWorkItem
- new model evolutionary
* cancelling
* easy exception handling
* hight-level constructs
* more...
- async model is base on task
- create task
using system.threading.tasks;
task t = new task(code);
t.start([specify_the_thread_context]);
* ex. keep the thread under the main GUI thread
task t = new task(()=>{some_time_consuming_job});
t.start();
* ex. continue work thread
task t = new task(code);
task t2 = t.continueWith((antecedent)=>{}, TaskScheduler.FromCurrentSynchronizeContent());
t.Start();
- lambda expression
compiler turn the lambda expresion to class + delegate
some_function(()=>{})
()=>{} turn to
some_function(delegate);
class sealed class_name
{
public void function_name()
{}
}
- closure == code + supporting data environment
all the parameter and the referenced outside variable are all pass by reference, in this case if in the multiple thread app it means shared variable
closure variable are stored with compiler-generated class
- type of tasks
* the term code tasks, are thread, require thread to execute
* facade over existing operation, such tasks can be threadless, e.g. HW(hard ware) is performin operation
var existingOp = new TaskCompleteSource<ResultType>();
Task T_facade = existingOp.Task;
existingOp.SetResult(); //when task is complete
- reference
http://msdn.microsoft.com/concurrency
http://tinyurl.com/pp-on-msdn
parallel programming with microsoft.net:design pattern for decomposition and coordination on multicore architecture, by Campbell, R.Johson
http://tinyurl.com/tpl-book
Concurrent programming on windows
win32 multithreaded programming
just any textbook on operating system
## creating, waiting, and harvesting result
- technical in .net 4, async / parallel components of .net4
task parallel library(TPL), concurrent data structure, parallel linq
task scheduler
resource manager
- review task
* Task t = Task.Factory.StartNew(()=>{}); //status, result, exception
* facade task
var op = new TaskCompletionSource<T>();
- facade task
Task t = Task.Factory.StartNew(some_method);
t.wait(); //wat the specify task end
TaskCompletionSource tc = new TaskCompletionSource(some_iAsyncResult)
tc.Task; //get the source's task
Task.waitAny(task_list); //wait any specify task complete and go on
- parallel computing need to care the task sequence of the argument. race condition
- result harvesting, use this type of the code will change the task act as function
Task<result_type> t = Task.Factory.StartNew(()=>{return value;});
t.result; //get the result, when use the result will implicit to wait the task complete
- Wait on multiple tasks
* Task.WaitAll(task_list)
* Task.WaitAny(task_list) will return the fist task's index
* WaitAllOneByOne pattern
List<Task<TReturn> task_list = new List<Task<TReturn>>();
for_each tasks.Add(Task.Factory.StartNew<TResult>(code));
while(tasks.Count>0)
{
int index = Task.WaitAny(tasks);
...
tasks.RemoveAt(index);
}
- Task.ContinueWith((antecedent)=>{});
Task.ContinueWhenAny(task_list), Task.ContinueWhenAll()
## exception handling
- if a task throw a exception there are several scenarios
* task is terminated
* E is caught, saved as part of an AggregateException(AE), and store in the task's exception property
* AE is re-thrown upon .Wait, .Result, or, WaitAll
* ex.
try
{
var r = taskObj.Result;
}
catch(AggregateException ae)
{
ae.innerException; //is the original task exception
}
the ae may be a exception tree. to travel throw all the exceptions will need to use travel three or use .net support method
ae.Flatten(); //will flatten the exception tree and then make it support iterate by for each
foreach(var e in ae.InnerException)
...
- observe the exception before the garbage collector, this need to touch each task's result before GC
* call .wait or try to get .result
* call task.WaitAll to make any unthrow exception to rethrow
* check the task.exception property after task complete
* subscribe to TaskScheduler.UnobervedTaskException, use to be the backup of the above three exception, it's also could be use to handle the exception in a third part lib which use TPL.
TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(hander_function);
static void hander_function(object s, arg e)
{
...// treat the exception
e.SetObserved(); // to info the .net core the exception is observed to prevent it throw by platform
}
* Task.WaitAny doesn't throw any exception
- task cancellation
* CancellationToken, it is create by the cancellation source object, it is used to place into the task run function to make the task monitor the token
* CancellationSource, use to send the task cancel event, it will monitor by the task
* if cancel the task, it will throw task cancellationException when try to get the task result. The cancellationSource creator should handle this exception
* ex.
cts =new CancellationSource()
* CancellationSource is sticky, when the task change the state it need to create a new one
- Task priorities?
* the TPL don't add notion of priority by default.
* we could custom it by task scheduler
- Task may form parent-children relationship
* child task attach to parent task
Task parent = Task.Factory.StartNew(()=>{
Task child0 = Task.Factory.StartNew(()=>{},
TaskCreateOptions.AttachedToParent);
Task child1 = Task.Factory.StartNew(()=>{},
TaskCreateOptions.AttachedToParent);
});
* parent task doesn't complete till all child tasks complete, event one or more child get exception the parent task will wait till all child tasks complete, all the child's exception could be handle by the parent task(surround with try catch)
- Pass parameter to task
for(int i = 0; i < 10; i++)
{
Task t = Task.Factory.StartNew((arg)=>{
int taskId = (int)arg;
},
i
);
}
- Use atomic output to output the result to avoid lock, lock global object
lock(console.out)
{}
- Lock, avoid use lock as possible as you can
## Understand the dangers of Concurrency
- beware of pitfalls of concurrency
race condition, starvation, livelock, deadlock, optimizing compiler, optimizing hardware
- artical, tools and techniques to identify concurrency issues, R. Patil and B. George, MSDN magazine 2008
http://www.msdn.microsoft.com/en-us/magazine/cc546569.aspx
- TPL could handle the most of the concurrency issue without race condition.
- code should provide two guarantees
safety, liveness
- important terminology
* race condition, out come depending on the timing of event
* critical section, smallest region of code involved in race condition, as small as possible
- the danger of shared resources
* beware of concurrent write
- solve race condition
* replace shared variable by local variable
* use thread safe entity to access the resource for you
* TPL offered, thread safe data structure
concurrentBag, concurrentDictionary, BlockingCollection, ConcurrentStack, ConcurrentQueue
* synchronization to control
locks, interlocks
- locks, use to lock the common objects which could be reference by all the thread
* create a common object
* lock the object for each thread/task as small as possible
- interlocking, hardware based interlocking for arithmetic critical sections
* ex
System.Threading.Interlocked.Add(ref sum, t)
* compare to the software locking is a little more effective.
- system internal tool ramMap, use to move the cached files from system memory for performance test
- lock-free, use local var to change the share resource
* lock free design is not means no lock, it is means remove all the arbitrary waiting locks
* ex.
Task t = Task.Factory.StartNew<return_type>(()=>{
...
return value;
});
t.Result; //To harvest the result
* the lock-free pattern could make the caching of data in memory more effective than lock or interlock
- plinq, parallel linq computing sum
* ex.
using system.linq;
(from var in some_collection select t.result).AsParallel().Sum();
* reference
http://msdn.microsoft.com/en-us/library/dd460688.aspx
* When it finds such shapes, PLINQ by default falls back to sequential mode.
+ Queries that contain a Select, indexed Where, indexed SelectMany, or ElementAt clause after an ordering or filtering operator that has removed or rearranged original indices.
+ Queries that contain a Take, TakeWhile, Skip, SkipWhile operator and where indices in the source sequence are not in the original order.
+ Queries that contain Zip or SequenceEquals, unless one of the data sources has an originally ordered index and the other data source is indexable
+ Queries that contain Concat, unless it is applied to indexable data sources.
+ Queries that contain Reverse, unless applied to an indexable data source.
- Another danger of race condition, manage shared object throw non thread safe class
solution
* locking
* replace the collection to the thread safe version, such as ConcurrentQueue
* if using random sequence, beware of each thread may get the same random number due to parallel code event the random generator is create in each of task
+ solution, use .net server RNDCryptoServiceProvider(); to get random number for the random class for seed
* during parallel programming make sure each class using in the task is thread safe or eliminate the race condition for the non safe class
- Synchronization primitives in brief
* Monitor, general purpose .net synchronization class
* Lock, enforce one at a time semantics using monitor class
* Mutex, win32 lock suitable for inter-process sync('mutual exclusion')
* Interlocked, hardware based lock for simple, arithmetic operation
* Semaphore, enforces N-at-a-time semantics via win32
* SpinLock / SpinWait, lock-like mechanisms that loop('spin') instead of yield CPU
* Barrier, allows tasks to synchronize('syn-up') before start of next phase
* CountdownEvent, ManualResetEvent, AutoResetEvent, additional ways for tasks to synchronize with one another
- About performance
* I/O is hard to parallel
* instrument your app
* beware of Amdahl's law, performance is limited by sequential component
* try different ideas
* event the thread-safe class could be use in multiple task condition, it is good to make this kind of class as local variables. This could improve the performance when they are used many time during the parallel methods
- Task granularity, minimum granularity of task, at least 200-300 CPU circle
- Task, Threads, Cores
* typically .net TPL create two thread per CPU core
* structure
+ Task scheduler, TPL library
+ global work queue
+ resource manager, window operation system
- Task scheduler, add task priorities and use threads from your own thread pool
TaskScheduler, TaskFactory
- Default task scheduler
* instead of global task queue, TPL use local queue
* when task wait the correspond thread is block and wait. In this time TPL will inject additional thread to the thread pool to keep the task complete at reasonable rate
* dynamic balance the workload
* work stealing
for load balance, the program thread will create global task queue and all worker thread will get task from the queue and save it into each local queue, if one of the worker thread's local queue is empty, it will still the remain task from the other worker thread's queue, FIFO, but the local queue will get FILO by the local worker
* task assumptions
+ tasks are short live, at most 1-2 seconds
+ execution order of task is random
- Task execution order
* want to execute the tasks in order of creation by create task will fairness option
Task.Factory.StartNew(
()=>{},
TaskCreationOptions.Fairness
);
note:
+ this will make the task start as the create order but no guarantee all of the task will finish as the order
+ this will also need higher synchronization overhead than the normal tasks
- long-running tasks
* create with long running option
Task.Factory.StartNew(
()=>{},
TaskCreationOption.LongRunning
);
* .net will create a non worker thread for this kinds of task. This will take more CPU times
* TPL consumption
+ long running CPU intensive tasks
+ don't over subscribe system
+ start t task one for per core
+ ex. get system processor count
System.Environment.ProcessorCount
WaitTaskOneByOnePattern to make the long running jobs done
+ high level solution for this kinds of task use parallel.For with option constrain
var options = new ParallelOptions();
options.MaxDegreeOfParallelism = System.Environment.ProcessCount
Parallel.For(0, N, options, (i)=>{});
- Common types of parallelism
* data, task, dataflow, embarrassingly parallel
* dataflow parallelism, pipline, not always works well
* embarrassing parallelism reference to delightfully parallel, all the data is execute independent
* parallel for format
Parallel.For(start, end, (i, ...)=>{});
Parallel.Foreach(collection, (element, ...)=>{});
Parallel.Invorke(
()=>task0;
()=>task1;
...
);
- Custom data partitioning
* System.Collection.Concurrent, partitioning simpler loop bodies for efficient parallelization
Parallel.Foreach(Partitioner.Create(0, N), (range)=>{
for(int i = range.item1; i < range.item2; i ++)
{}
});
The parallel for will automatic splite the 0 to N into the number of cores of current system
- Exception handling for Parallel for foreach will throw AggregateException, directly use try ... catch surround with Parallel.For
- Break out of loop
ParallelLoopResult lr = Parallel.For(0, N, (i, loopControl)=>{... loopControl.Stop(); ...});
lr.IsComplete; // to check the result
Stop(); method will stop the loop immediately
Break(); method will stop till the current task sequence finished
- Cancel loop, Task finish current iteration then exit, it is cancelled by another thread
this object is in the System.Threading lib
void DoLoop(CancellationTokenSource cts)
{
var option = new ParallelOptions();
options.CancellationToken = cts.Token;
try
{
Parallel.For(0, N, options, (i))=>{DoWork(i);});
}
catch(OperationCanceledException oce){}
catch(AggregateException ae){}
}
void StartButton_Click()
{
var cts = new CancellationTokenSource();
Task.Factory.StartNew(()=>{DoLoop(cts);});
}
void CancelButton_Click()
{
cts.Cancel();
}
- references
http://www.microsoft.com/concurrency
http://tinyurl.com/pp-on-msdn
## design pattern for parallel computing
- 100 tasks wait for running
* solution, create task for each core and finish and start the task one by one
- 20 io operation
* solution, create one facade task per download using fromAsync + APM pattern(web object's begin/end methods, start download and return thread to pool, potentially allowing all 20+ downloads to start)
* create 1 task per core, explicitly or via MaxDegreeOfParallelism
* using producer-consumer pattern, dedicate 1-2 task to download & remainder to processing
* logging task runs for the duration of your app, logging say every 30 seconds
+ create with long-running option
+ before app close need to join with task and catch any exception
+ design clean and shutdown task
+ if task can crash, design a way to monitor and restart the task by a timer or app message loop
- Pattern
pipline, dataflow, concurrent data structures, producer-consumer, map-reduce, parallel linq, speculative execution, asynchronous programming model
+ pipline
create first task and follow with create task continuewith
+ dataflow, many to one and one to many
create tasks t1, t2, t3
t4.continueWhenAll(t1,t2,t3)
use the thread-safe data structure with pipline and dataflow will help increase the parallelism
+ concurrent data structure
* ConcurrentBag<T>, ConccurentQueue, ConcurrentStack, ConcurrentDictionary, are unbounded in size
* BlockingCollection, are bound in size
+ concurrent queque, enqueue is thread safe and normal dequeue is not should you tryDequeue insteal check count and dequeue
foreach and toArray are thread safe but it's a snapshot of the queue of the exact moment
- demo
+ concurrent dictionary, replace foreach with Parallel.Foreach and use with ConcurrentDictionary
replace the check and update by concurrentDictionaryObj.AddorUpdate(check_value, add_value, function);
reference the namespace System.Collection.Concurrent;
+ performance is dependent on the problem scenario on parallel programming
+ concurrent dictionary should be use in the scenario less contention
- producer-consumer, is better fit for long-running workloads where data generation speed is very different from data consumption speed
+ base on blockingCollection<>
blocking producer if collection is full
blocking consumer if collection is empty
fix size collection is more realistic, use the size to throttle the consumer numbers
+ ex.
int maxCapacity = ...;
var workQ = new BlockingCollection<T>(maxCapacity);
var tf = new TaskFactory(TaskCreationOptions.longRunning, TaskContinuationOptions.None);
Task producer = tf.StartNew(()=>{
for(...)
{
workQ.Add(work);
workQ.CompleteAdding();
}
});
Task consumer = tf.StartNew(()=>{
while(workQ.IsCompleted)
{
try
{
T work = workQ.Take();
...
}
//no more work
catch(ObjectDisposedException){}
catch(InvalidOperationException){}
}
});
+ save the thread data locally and merge all the result together and create consumer base on the CPU core number and use wait one by one pattern
- MapReduce pattern, is wild use embarrassingly parallel search and data mining application, this is the technology drive their search engine
+ map the data to intermedia results and reduce the results to final
+ implements
* fire off n tasks and then use wait for one by one pattern
for(int i = 0; i < N; i ++)
{
tasks.Add(Task.Factory.StartNew<T>((data)=>{return map(data);})
}
while(tasks.count>0)
{
int index = Task.WaitAny(tasks.ToArray());
reduce(tasks[index].Result);
tasks.removeAt(index);
}
* use parallel.For with task.localStorage
Parallel.Foreach(datasource,
()=>{return new TLS();}, //tls initializer
(data, _, tls)=>{ //tls, task local storage //task body
map(datum, tls);
return tls;
},
(tls)=>{reduce(tls))}); //finalizer
* real code
Dicationary<,> result = new Dictionary<,>();
Parallel.ForEach(File.Readlines(infile),
()=>{return new Dictionary<,>();},
(line, loopControl, localID)=>{
//use localID
},
(localID)=>{local(result){merge(result, localID);}});
* if it the result could accept approximate result, then could ellipsis some sick data to increase parallel processing
- Parallel linq
+ plinq, support for parallelism and mapreduce
+ change linq to plinq is just need asParallel()
+ ex.
var query = (from x in values.AsParallel()
where filters(x)
select compute(x)).sum();
+ linq enable querying IEnumerable data sources
- Speculative Execution
+ when you have multiple sources for generating a result... and get the first return and cancel the rest
+ example
var cts = new CancellationTokenSource();
var token = cts.Token;
for(int i = 0; i < N; i++)
{
tasks.Add(Task.Factory.StartNew<T>(()=>{...},
token);
}
int winner = Task.WaitAny(tasks.ToArray());
var result = tasks[winner].Result;
cts.Cancel();
- Asynchronous Programming Model
+ begin, end, already implement in previous .net class
FileStreamIO, HTTPRequest, ...
+ TPL.FromAsync is a wrapper of this pattern
+ ex
request = (HttpWebRequest)HttpWebRequest.Create(url);
var webTask = Task.Factory.FromAsync<WebResponse>(
request.BeginGetResponse,
request.EndGetResponse,
null); //will get the fasade task
var resultTask = webTask.ContinueWith<returnType>(antecedent=>{
return data;
});
and then use waitFirstEnd pattern, don't forget to hook the task exception when cancel the task
check the task.Exception == null property to make sure the task is not canceled and have the result
TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskUnobservedException_Handler);
then use the event arg to set observed
- reference
http://msdn.microsoft.com/concurrency
http://code.msdn.microsoft.com/101-LINQ-Samples-3fg811b
http://blogs.msdn.com/ parallel.Foreach