-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCompletableFuture.drawio
196 lines (196 loc) · 54.2 KB
/
CompletableFuture.drawio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
<mxfile host="Electron" modified="2024-11-12T15:37:16.297Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/21.6.5 Chrome/114.0.5735.243 Electron/25.3.1 Safari/537.36" etag="N7HBbq9JW7_uPY1DffAd" version="21.6.5" type="device">
<diagram name="第 1 页" id="1pW2kvjGfipRLgd_ogxb">
<mxGraphModel dx="-2282" dy="598" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-1" value="<h1 style="font-size: 16px;"><font style="font-size: 16px;">CompletableFuture 工作原理</font></h1><div style="font-size: 10px;"><font style="font-size: 10px;">这里主要看任务编排原理,主要是理清数据结构。</font></div><div style="font-size: 10px;"><font style="font-size: 10px;"><br></font></div><div style="font-size: 10px;"><font style="font-size: 10px;">测试代码: java-async/asy</font><font style="font-size: 10px;">nc-future/src/main/java/top.kwseeker.async.future.jdAsyncTool.ThreadExhaustionTest.java<br>这个测试研究使用CompletableFuture默认的ForkJoinPool,为何不会因为线程被同步等待的任务占尽导致任务超时?</font></div><div style="font-size: 10px;">看源码发现,同步等待结果阶段(get()), 可能会通过ForkJoinPool.managedBlock(q)<b>突破并行度的限制创建额外的工作者线程</b>。</div><div style="font-size: 11px;"><font style="font-size: 11px;"><br></font></div>" style="text;html=1;strokeColor=none;fillColor=none;spacing=5;spacingTop=-20;whiteSpace=wrap;overflow=hidden;rounded=0;verticalAlign=top;" parent="1" vertex="1">
<mxGeometry x="5000" y="20" width="760" height="100" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-4" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;fontSize=10;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-2" target="Jc6cqmUtGkEXaGz-iM8d-3" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-6" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;fontSize=10;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-2" target="Jc6cqmUtGkEXaGz-iM8d-5" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-2" value="CompletableFuture&lt;Void&gt; future = CompletableFuture.<b style="font-size: 10px;">runAsync</b>(() -&gt; head.work(forkJoinPool), forkJoinPool);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="5000" y="260" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-9" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;fontSize=10;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-3" target="Jc6cqmUtGkEXaGz-iM8d-8" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-3" value="return <b>asyncRunStage</b>(<br>screenExecutor(executor), runnable);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="5240" y="260" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-5" value="future.get();" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="5000" y="420" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-7" value="CompletableFuture" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="5285" y="230" width="110" height="30" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-11" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;fontSize=10;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-8" target="Jc6cqmUtGkEXaGz-iM8d-10" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-8" value="CompletableFuture&lt;Void&gt; d = new CompletableFuture&lt;Void&gt;();" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="5480" y="260" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-33" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-10" target="Jc6cqmUtGkEXaGz-iM8d-32" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-10" value="e.<b>execute</b>(new <b>AsyncRun</b>(d, f));<br style="font-size: 10px;">return d;<br><font style="font-size: 9px;" color="#007fff">将任务业务逻辑封装成AsyncRun任务是ForJoinTask的子类,提交给ForkJoinPool执行</font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="5480" y="340" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-19" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;dashed=1;endArrow=block;endFill=1;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-12" target="Jc6cqmUtGkEXaGz-iM8d-17" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-20" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;dashed=1;endArrow=block;endFill=1;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-12" target="Jc6cqmUtGkEXaGz-iM8d-18" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-12" value="<p style="margin: 4px 0px 0px; text-align: center; font-size: 10px;"><span style="background-color: initial; font-size: 10px;"><b>CompletableFuture</b>&lt;T&gt; implements Future&lt;T&gt;, CompletionStage&lt;T&gt;</span><br style="font-size: 10px;"></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><span style="color: rgb(0, 127, 255); background-color: initial;">//AltResult封装异常结果</span><br></p><p style="margin: 0px 0px 0px 4px;">static final AltResult NIL = new AltResult(null);</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//是否使用ForkJoinPool commonPool实现,并行度大于1就使用commonPool</font></p><p style="margin: 0px 0px 0px 4px;">private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() &gt; 1);</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//CF默认使用的线程池</font></p><p style="margin: 0px 0px 0px 4px;">private static final Executor <b>asyncPool</b> = useCommonPool ?&nbsp;</p><p style="margin: 0px 0px 0px 4px;">&nbsp; &nbsp; ForkJoinPool.<b>commonPool</b>() : new ThreadPerTaskExecutor();</p><p style="margin: 0px 0px 0px 4px;">static final int SYNC&nbsp; &nbsp;=&nbsp; 0;</p><p style="margin: 0px 0px 0px 4px;">static final int ASYNC&nbsp; =&nbsp; 1;</p><p style="margin: 0px 0px 0px 4px;">static final int NESTED = -1;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//</font></p><p style="margin: 0px 0px 0px 4px;">private static final int SPINS = (Runtime.getRuntime().availableProcessors() &gt; 1 ? 1 &lt;&lt; 8 : 0);</p><p style="margin: 0px 0px 0px 4px;">private static final sun.misc.Unsafe UNSAFE;</p><p style="margin: 0px 0px 0px 4px;">private static final long RESULT;</p><p style="margin: 0px 0px 0px 4px;">private static final long STACK;</p><p style="margin: 0px 0px 0px 4px;">private static final long NEXT;</p><p style="margin: 0px 0px 0px 4px;"><br></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//Either the result or boxed AltResult</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//存储执行结果</font></p><p style="margin: 0px 0px 0px 4px;">volatile Object <b>result</b>;</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//Completion是依赖的任务,通过链表组成栈结构,stack指向栈顶任务</font></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//即每个CompletableFuture内部包含一个任务栈</font></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;">volatile </span><b style="background-color: initial;">Completion</b><span style="background-color: initial;"> </span><b style="background-color: initial;">stack</b><span style="background-color: initial;">;</span><br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px; font-size: 10px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4480" y="240" width="440" height="360" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-17" value="<p style="margin: 4px 0px 0px; text-align: center;">Future&lt;V&gt;<br style="font-size: 10px;"></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">同步等待读取异步处理结果的接口</font></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4720" y="160" width="200" height="40" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-18" value="<p style="margin: 4px 0px 0px; text-align: center;">CompletionStage&lt;T&gt;<br style="font-size: 10px;"></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">任务编排接口实现</font></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4480" y="160" width="200" height="40" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-27" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-25" target="Jc6cqmUtGkEXaGz-iM8d-26" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-28" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-25" target="Jc6cqmUtGkEXaGz-iM8d-2" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="5100" y="260" as="targetPoint" />
</mxGeometry>
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-25" value="测试初始化" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="5000" y="160" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-26" value="static final ExecutorService forkJoinPool = new ForkJoinPool(2);<br><font color="#007fff">参考 forkjoinpool 的工作流程图</font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1">
<mxGeometry x="5240" y="160" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-31" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>AsyncRun</b>&nbsp;extends ForkJoinTask&lt;Void&gt;<br>implements Runnable, AsynchronousCompletionTask<br style="font-size: 10px;"></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><font color="#007fff">封装了CompletableFuture的ForkJoinTask,实现很简单只是在ForkJoinTask基础上对run()后添加了completeNull() completeThrowable() postComplete() 等操作</font></span></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><font color="#007fff"><br></font></span></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;">CompletableFuture&lt;Void&gt; <b>dep</b>;&nbsp;</span><br></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//任务的业务方法</font></p><p style="margin: 0px 0px 0px 4px;">Runnable <b>fn</b>;<br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px; font-size: 10px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4000" y="240" width="440" height="160" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-2" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="Jc6cqmUtGkEXaGz-iM8d-32" target="sVAW3ZzdPzbu4NNdgHBk-1" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-3" value="<font color="#007fff">参考ForkJoinPool的流程图</font>" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="sVAW3ZzdPzbu4NNdgHBk-2" vertex="1" connectable="0">
<mxGeometry x="-0.0643" y="-2" relative="1" as="geometry">
<mxPoint as="offset" />
</mxGeometry>
</mxCell>
<mxCell id="Jc6cqmUtGkEXaGz-iM8d-32" value="ForkJoinPool#<b>execute</b>(Runnable task)<br><font color="#007fff">这里task是AsyncRun实例</font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1">
<mxGeometry x="5720" y="340" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-5" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-1" target="sVAW3ZzdPzbu4NNdgHBk-4" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-1" value="CompletableFuture#<b>run</b>()" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="6200" y="340" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-8" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-4" target="sVAW3ZzdPzbu4NNdgHBk-7" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-4" value="<div>if (d.result == null) {</div><div>&nbsp; &nbsp; try {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; f.<b>run</b>(); <font color="#007fff">//执行自定义任务</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; d.<b>completeNull</b>();</div><div>&nbsp; &nbsp; } catch (Throwable ex) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; d.completeThrowable(ex);</div><div>&nbsp; &nbsp; }</div><div>}</div><div style="">d.<b>postComplete</b>();</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=5;" parent="1" vertex="1">
<mxGeometry x="6440" y="310" width="200" height="120" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-6" value="<font color="#007fff">任务会被提交到ForkJoinPool偶数工作队列<br>然后唤醒线程偷取任务执行</font>" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="5930" y="383" width="250" height="40" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-10" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-7" target="sVAW3ZzdPzbu4NNdgHBk-9" edge="1">
<mxGeometry relative="1" as="geometry">
<Array as="points">
<mxPoint x="6820" y="450" />
<mxPoint x="5820" y="450" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-7" value="<div>CompletableFuture&lt;?&gt;[] futures =&nbsp;</div><div>new CompletableFuture[next.size()];</div><div>for (int i = 0; i &lt; next.size(); i++) {</div><div>&nbsp; &nbsp; Task task = next.get(i);</div><div>&nbsp; &nbsp; CompletableFuture&lt;Void&gt; future = CompletableFuture</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<b>runAsync</b>(() -&gt; task.work(executor), executor);</div><div>&nbsp; &nbsp; futures[i] = future;</div><div>}</div><div><font color="#007fff">任务里面提交新任务</font></div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=5;" parent="1" vertex="1">
<mxGeometry x="6680" y="310" width="280" height="120" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-12" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-9" target="sVAW3ZzdPzbu4NNdgHBk-11" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-25" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-9" target="sVAW3ZzdPzbu4NNdgHBk-27" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="5960" y="830" as="targetPoint" />
<Array as="points">
<mxPoint x="5940" y="610" />
<mxPoint x="5940" y="830" />
</Array>
</mxGeometry>
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-9" value="CompletableFuture.<b>allOf</b>(futures)<br>.<b>get</b>();<br><font color="#007fff">阻塞等待提交的所有新任务执行完毕<br>会阻塞当前工作者线程<br></font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=center;arcSize=13;fillColor=#ffe6cc;strokeColor=#d79b00;" parent="1" vertex="1">
<mxGeometry x="5720" y="580" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-14" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-11" target="sVAW3ZzdPzbu4NNdgHBk-13" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-11" value="return andTree(cfs, 0, cfs.length - 1);<br><font color="#007fff">递归构建Completion树,返回新建的CompletableFuture</font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=center;arcSize=13;" parent="1" vertex="1">
<mxGeometry x="5960" y="580" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-13" value="<div>static CompletableFuture&lt;Void&gt; andTree(CompletableFuture&lt;?&gt;[] cfs,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int lo, int hi) {</div><div>&nbsp; &nbsp; CompletableFuture&lt;Void&gt; d = new CompletableFuture&lt;Void&gt;();</div><div>&nbsp; &nbsp; if (lo &gt; hi) // empty</div><div>&nbsp; &nbsp; &nbsp; &nbsp; d.result = NIL;</div><div>&nbsp; &nbsp; else {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture&lt;?&gt; a, b;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; int mid = (lo + hi) &gt;&gt;&gt; 1;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if ((a = (<b>lo == mid</b> ? cfs[lo] :</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <b>andTree</b>(cfs, lo, mid))) == null ||</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (b = (lo == hi ? a : (<b>hi == mid+1</b>) ? cfs[hi] :</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <b>andTree</b>(cfs, mid+1, hi)))&nbsp; == null)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NullPointerException();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (!d.<b>biRelay</b>(a, b)) {<font color="#007fff"> //a,b都已经执行完毕且当前CF result为空会返回true</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; BiRelay&lt;?,?&gt; c = new <b>BiRelay</b>&lt;&gt;(d, a, b);</div><div><font color="#007fff"><span style="white-space: pre;">	</span>&nbsp; &nbsp; //将c推送到a、b的栈顶位置,方法较简单不细讲了<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; a.<b>bipush</b>(b, c);</div><div><span style="white-space: pre;">	</span>&nbsp; &nbsp; <font color="#007fff">//</font><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.<b>tryFire</b>(SYNC);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; return d;</div><div>}</div><div><font color="#007fff">relay: 是中继的意思</font></div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;align=left;arcSize=2;" parent="1" vertex="1">
<mxGeometry x="6200" y="460" width="320" height="300" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-16" value="<font color="#007fff" style="font-size: 10px;">比如有4个CompletableFuture,&nbsp;<br>第一层递归拆分2个 andTree(0,1) andTree(2,3)<br>第二层判断 d.biRelay(cfs[0], cfs[1]) d.biRelay(cfs[2], cfs[3])&nbsp;<br>创建 BiRelay<br></font>" style="text;html=1;strokeColor=none;fillColor=none;align=left;verticalAlign=middle;whiteSpace=wrap;rounded=0;" parent="1" vertex="1">
<mxGeometry x="6530" y="520" width="270" height="150" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-23" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=0;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-17" target="sVAW3ZzdPzbu4NNdgHBk-21" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-17" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>BiCompletion</b><b style="background-color: initial;">&lt;T,U,Void&gt;</b></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">A Completion for an action with two sources</font><br></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><br></font></p><p style="margin: 0px 0px 0px 4px;">CompletableFuture&lt;U&gt; <b>snd</b>;&nbsp;<br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px; font-size: 10px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4000" y="800" width="440" height="100" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-19" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;endArrow=block;endFill=0;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-18" target="sVAW3ZzdPzbu4NNdgHBk-17" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-18" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>BiRelay&lt;T,U&gt;</b><br style="font-size: 10px;"></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//本质是实现ForkJoinTask的exec方法</font></p><p style="margin: 0px 0px 0px 4px;">final CompletableFuture&lt;Void&gt; <b>tryFire</b>(int mode)<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4000" y="960" width="440" height="80" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-20" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Completion </b>extends ForkJoinTask&lt;Void&gt;&nbsp;<br>implements Runnable, AsynchronousCompletionTask<br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">Completion本质就是一个ForkJoinTask</font></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><br></span></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><font color="#007fff">//指向栈中的下一个元素,即用链表实现的栈</font></span></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;">volatile Completion <b>next</b>;</span><br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;">abstract CompletableFuture&lt;?&gt; <b>tryFire</b>(int mode);<br></p><p style="margin: 0px 0px 0px 4px;">abstract boolean <b>isLive</b>();<br></p><p style="margin: 0px 0px 0px 4px;">public final void run()&nbsp; { tryFire(ASYNC); }<br></p><p style="margin: 0px 0px 0px 4px;">public final boolean <b>exec</b>() { tryFire(ASYNC); return true; }<br></p><p style="margin: 0px 0px 0px 4px;">public final Void getRawResult()&nbsp; { return null; }<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4000" y="440" width="440" height="160" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-22" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;endArrow=block;endFill=0;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-21" target="sVAW3ZzdPzbu4NNdgHBk-20" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-21" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>UniCompletion&lt;T,V&gt;</b><br></p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">A Completion with a source, dependent, and executor.</font><br></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><font color="#007fff"><br></font></span></p><p style="margin: 0px 0px 0px 4px;">Executor <b>executor</b>;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// executor to use (null if none)</p><p style="margin: 0px 0px 0px 4px;">CompletableFuture&lt;V&gt; <b>dep</b>;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the dependent to complete</p><p style="margin: 0px 0px 0px 4px;">CompletableFuture&lt;T&gt; <b>src</b>;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // source for action</p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px; font-size: 10px;"><br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4000" y="640" width="440" height="120" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-26" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-27" target="sVAW3ZzdPzbu4NNdgHBk-28" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-27" value="return reportGet((r = result) == null ? <b>waitingGet</b>(true) : r);" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="5960" y="800" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-33" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-28" target="sVAW3ZzdPzbu4NNdgHBk-32" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-28" value="CompletableFuture#<br><b>waitingGet</b>(boolean interruptible)<br><font color="#007fff">自旋等待result写入结果</font>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;" parent="1" vertex="1">
<mxGeometry x="6200" y="800" width="200" height="60" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-31" value="<p style="margin: 4px 0px 0px; text-align: center;"><b>Signaller </b>extends Completion</p><p style="margin: 4px 0px 0px; text-align: center;">implements ForkJoinPool.ManagedBlocker</p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff"><b>实现了ManagedBlocker用于避免阻塞操作在ForkJoinPool中堆积引起饥饿,</b>signaller是“信号员”的意思,感觉不太契合它的功能。</font></p><p style="margin: 0px 0px 0px 4px;"><br></p><p style="margin: 0px 0px 0px 4px;"><span style="background-color: initial;"><font color="#007fff">//指向栈中的下一个元素,即用链表实现的栈</font></span></p><p style="margin: 0px 0px 0px 4px;">long <b>nanos</b>;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // wait time if timed</p><p style="margin: 0px 0px 0px 4px;">final long <b>deadline</b>;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// non-zero if timed</p><p style="margin: 0px 0px 0px 4px;">volatile int <b>interruptControl</b>; // &gt; 0: interruptible, &lt; 0: interrupted</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//初始为当前线程</font></p><p style="margin: 0px 0px 0px 4px;">volatile Thread <b>thread</b>;</p><hr style="font-size: 10px;"><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//thread不为空就唤醒线程</font></p><p style="margin: 0px 0px 0px 4px;">final CompletableFuture&lt;?&gt; <b>tryFire</b>(int ignore)</p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//任务是否可以被释放,thread为空、线程被中断、deadline超时都可以释放</font></p><p style="margin: 0px 0px 0px 4px;">public boolean isReleasable()<br></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//任务还不可以被释放的情况下,就挂起当前线程,被唤醒后再返回是否可以被释放</font></p><p style="margin: 0px 0px 0px 4px;">public boolean block()<br></p><p style="margin: 0px 0px 0px 4px;"><font color="#007fff">//thread是否非空</font></p><p style="margin: 0px 0px 0px 4px;">final boolean isLive()</p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=10;fontFamily=Helvetica;html=1;whiteSpace=wrap;" parent="1" vertex="1">
<mxGeometry x="4480" y="640" width="440" height="280" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-36" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-32" target="sVAW3ZzdPzbu4NNdgHBk-35" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-32" value="<div>private Object <b>waitingGet</b>(boolean interruptible) {</div><div>&nbsp; &nbsp; Signaller q = null;</div><div>&nbsp; &nbsp; boolean queued = false;</div><div>&nbsp; &nbsp; int spins = -1;</div><div>&nbsp; &nbsp; Object r;</div><div>&nbsp; &nbsp; <b>while</b> ((r = result) == null) {</div><div><span style="white-space: pre;">	</span><font color="#007fff">//1 自旋等待</font><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (spins &lt; 0)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; spins = SPINS;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (spins &gt; 0) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ThreadLocalRandom.nextSecondarySeed() &gt;= 0)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; --spins;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//2 创建Signaller并推送到当前等待的CF的栈顶<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (q == null)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; q = new <b>Signaller</b>(interruptible, 0L, 0L);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (!queued)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queued = tryPushStack(q);</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//3 如果Signaller被中断,执行清理操作<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (interruptible &amp;&amp; q.<b>interruptControl</b> &lt; 0) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; q.thread = null;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cleanStack();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return null;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//4 注释说:运行可能阻塞的任务q</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (q.thread != null &amp;&amp; result == null) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {</div><div><font color="#007fff"><span style="white-space: pre;">	</span><span style="white-space: pre;">	</span>//主要就是封装阻塞,拓展功能:可以避免因为阻塞导致工作者线程被占尽<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ForkJoinPool.<b>managedBlock</b>(q);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (InterruptedException ie) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; q.interruptControl = -1;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; }</div><div><br></div><div>&nbsp; &nbsp; if (q != null) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; q.thread = null;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (q.interruptControl &lt; 0) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (interruptible)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r = null; // report interruption</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.currentThread().interrupt();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; postComplete();</div><div>&nbsp; &nbsp; return r;</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;arcSize=2;align=left;" parent="1" vertex="1">
<mxGeometry x="6440" y="800" width="280" height="580" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-39" value="" style="edgeStyle=orthogonalEdgeStyle;rounded=1;orthogonalLoop=1;jettySize=auto;html=1;" parent="1" source="sVAW3ZzdPzbu4NNdgHBk-35" target="sVAW3ZzdPzbu4NNdgHBk-38" edge="1">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-35" value="<div><font color="#007fff">//managedBlock() 即将阻塞条件封装了起来并提供了补偿处理</font></div><div>public static void <b>managedBlock</b>(ManagedBlocker blocker)</div><div>&nbsp; &nbsp; throws InterruptedException {</div><div>&nbsp; &nbsp; ForkJoinPool p;</div><div>&nbsp; &nbsp; ForkJoinWorkerThread wt;</div><div>&nbsp; &nbsp; Thread t = Thread.currentThread();</div><div>&nbsp; &nbsp; if ((t instanceof ForkJoinWorkerThread) &amp;&amp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; WorkQueue <b>w</b> = wt.workQueue;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; while (!blocker.isReleasable()) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (p.<b>tryCompensate</b>(w)) { <font color="#007fff">//最主要的是里面可能创建额外的工作者线程</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {</div><div><font color="#007fff"><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;</span></span>&nbsp; &nbsp; <span style="white-space: pre;">	</span>&nbsp; &nbsp; //当前任务不可释放就进入等待,被唤醒后如果仍不可释放就继续等待<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; do {} while (!blocker.isReleasable() &amp;&amp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; !blocker.<b>block</b>());<font color="#007fff">&nbsp;</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } finally {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; U.getAndAddLong(p, CTL, AC_UNIT);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; else {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; do {} while (!blocker.isReleasable() &amp;&amp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; !blocker.<b>block</b>());</div><div>&nbsp; &nbsp; }</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;arcSize=2;align=left;" parent="1" vertex="1">
<mxGeometry x="6760" y="800" width="280" height="360" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-37" value="ForkJoinPool" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="6855" y="770" width="90" height="30" as="geometry" />
</mxCell>
<mxCell id="sVAW3ZzdPzbu4NNdgHBk-38" value="<div><font color="#007fff">//尝试缩减活动线程数量,可能释放或创建一个补偿线程</font></div><div>private boolean <b>tryCompensate</b>(WorkQueue w) {</div><div>&nbsp; &nbsp; boolean canBlock;</div><div>&nbsp; &nbsp; WorkQueue[] ws; long c; int m, pc, sp;</div><div>&nbsp; &nbsp; if (w == null || w.qlock &lt; 0 ||&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// caller terminating</div><div>&nbsp; &nbsp; &nbsp; &nbsp; (ws = workQueues) == null || (m = ws.length - 1) &lt;= 0 ||</div><div>&nbsp; &nbsp; &nbsp; &nbsp; (pc = config &amp; SMASK) == 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// parallelism disabled</div><div>&nbsp; &nbsp; &nbsp; &nbsp; canBlock = false;</div><div>&nbsp; &nbsp; else if ((sp = (int)(c = ctl)) != 0)&nbsp; &nbsp; &nbsp; <font color="#007fff">// 存在空闲的线程,释放空闲的线程</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; canBlock = <b>tryRelease</b>(c, ws[sp &amp; m], 0L);</div><div>&nbsp; &nbsp; else {</div><div><font color="#007fff"><span style=""><span style="white-space: pre;">&nbsp;&nbsp;&nbsp;&nbsp;<span style="white-space: pre;">	</span></span></span>//pc是并行度<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; int <b>ac</b> = (int)(c &gt;&gt; AC_SHIFT) + pc;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; int <b>tc</b> = (short)(c &gt;&gt; TC_SHIFT) + pc; <font color="#007fff"><b>//TC段是“-并行度”,再+pc即实际已经创建的工作者线程数量</b></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; int <b>nbusy</b> = 0;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // validate saturation</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//遍历奇数索引的工作队列(即与线程绑定的工作队列),统计处于繁忙状态(即正在执行任务)的工作队列的数量<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i &lt;= m; ++i) {&nbsp; &nbsp; &nbsp; &nbsp; // two passes of odd indices</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; WorkQueue v;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ((v = ws[((i &lt;&lt; 1) | 1) &amp; m]) != null) {<span style="white-space: pre;">	</span><font color="#007fff">//只统计奇数索引的队列,每个统计两次</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ((v.scanState &amp; SCANNING) != 0)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ++nbusy;<span style="white-space: pre;">	</span><font color="#007fff">//繁忙状态,统计+1</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><span style="white-space: pre;">	</span><font color="#007fff">//<b>如果 nbusy == tc&lt;&lt;1, 说明所有工作者线程都繁忙</b></font><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; if (nbusy != (tc &lt;&lt; 1) || ctl != c)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; canBlock = false;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// unstable or stale</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//所有工作者线程都繁忙且实际线程数量已经超出并行度限制<br></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (tc &gt;= pc &amp;&amp; ac &gt; 1 &amp;&amp; w.isEmpty()) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long nc = ((AC_MASK &amp; (c - AC_UNIT)) |</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (~AC_MASK &amp; c));&nbsp; &nbsp; &nbsp; &nbsp;// uncompensated, <font color="#007fff">AC段“-1”</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; canBlock = U.compareAndSwapLong(this, CTL, c, nc);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><font color="#007fff"><span style="white-space: pre;">	</span>//超出最大线程数量</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; else if (tc &gt;= MAX_CAP ||</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (this == common &amp;&amp; tc &gt;= pc + commonMaxSpares))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new RejectedExecutionException(</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "Thread limit exceeded replacing blocked worker");</div><div>&nbsp; &nbsp; &nbsp; &nbsp; else {<span style="white-space: pre;">	</span><font color="#007fff">// similar to tryAddWorker,<b>突破并行度限制继续创建线程</b></font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; boolean add = false; int rs;&nbsp; &nbsp; &nbsp; // CAS within lock</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long nc = ((AC_MASK &amp; c) |</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (TC_MASK &amp; (c + TC_UNIT)));</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (((rs = lockRunState()) &amp; STOP) == 0)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; add = U.compareAndSwapLong(this, CTL, c, nc);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unlockRunState(rs, rs &amp; ~RSLOCK);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; canBlock = add &amp;&amp; <b>createWorker</b>(); <font color="#007fff">//!!!</font></div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp; return canBlock;</div><div>}</div>" style="rounded=1;whiteSpace=wrap;html=1;fontSize=10;arcSize=1;align=left;" parent="1" vertex="1">
<mxGeometry x="7080" y="800" width="360" height="640" as="geometry" />
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>