-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSpark.html
1612 lines (1180 loc) · 218 KB
/
Spark.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE HTML>
<html lang="zh-hans" >
<head>
<meta charset="UTF-8">
<title>Spark · 大数据技术与算法Checklist</title>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="description" content="">
<meta name="generator" content="GitBook 3.2.3">
<meta name="author" content="powerlee">
<link rel="stylesheet" href="gitbook/style.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-search-pro/search.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-prism/prism.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-expandable-chapters/expandable-chapters.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-chapter-fold/chapter-fold.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-splitter/splitter.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-back-to-top-button/plugin.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-advanced-emoji/emoji-website.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-insert-logo/plugin.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-pageview-count/plugin.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-flexible-alerts/style.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-katex-new/katex.min.css">
<link rel="stylesheet" href="https://unpkg.com/gitalk/dist/gitalk.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-fontsettings/website.css">
<link rel="stylesheet" href="gitbook/gitbook-plugin-theme-comscore/test.css">
<link rel="stylesheet" href="styles/website.css">
<meta name="HandheldFriendly" content="true"/>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="apple-mobile-web-app-capable" content="yes">
<meta name="apple-mobile-web-app-status-bar-style" content="black">
<link rel="apple-touch-icon-precomposed" sizes="152x152" href="gitbook/images/apple-touch-icon-precomposed-152.png">
<link rel="shortcut icon" href="gitbook/images/favicon.ico" type="image/x-icon">
<link rel="next" href="Flink.md" />
<link rel="prev" href="Kafka.md" />
</head>
<body>
<div class="book">
<div class="book-summary">
<div id="book-search-input" role="search">
<input type="text" placeholder="输入并搜索" />
</div>
<nav role="navigation">
<ul class="summary">
<li class="chapter " data-level="1.1" data-path="./">
<a href="./">
Introduction
</a>
</li>
<li class="chapter " data-level="1.2" data-path="GradientDescent.html">
<a href="GradientDescent.html">
梯度下降
</a>
</li>
<li class="chapter " data-level="1.3" data-path="LinearRegression.html">
<a href="LinearRegression.html">
线性回归
</a>
</li>
<li class="chapter " data-level="1.4" data-path="LogisticRegression.html">
<a href="LogisticRegression.html">
逻辑回归
</a>
</li>
<li class="chapter " data-level="1.5" data-path="DecisionTree.html">
<a href="DecisionTree.html">
决策树
</a>
</li>
<li class="chapter " data-level="1.6" data-path="ABTest.html">
<a href="ABTest.html">
A/B实验
</a>
</li>
<li class="chapter " data-level="1.7" data-path="TimeSeries.html">
<a href="TimeSeries.html">
时间序列
</a>
</li>
<li class="chapter " data-level="1.8" data-path="DimensionalModeling.html">
<a href="DimensionalModeling.html">
维度建模
</a>
</li>
<li class="chapter " data-level="1.9" data-path="UserBehaviorsAnalysisPlatform.html">
<a href="UserBehaviorsAnalysisPlatform.html">
用户行为分析平台
</a>
</li>
<li class="chapter " data-level="1.10" data-path="SQLKeypoints.html">
<a href="SQLKeypoints.html">
SQL要点
</a>
</li>
<li class="chapter " data-level="1.11" data-path="SQLCases.html">
<a href="SQLCases.html">
SQL案例
</a>
</li>
<li class="chapter " data-level="1.12" data-path="Kafka.md">
<span>
Kafka
</span>
</li>
<li class="chapter active" data-level="1.13" data-path="Spark.html">
<a href="Spark.html">
Spark
</a>
</li>
<li class="chapter " data-level="1.14" data-path="Flink.md">
<span>
Flink
</span>
</li>
<li class="chapter " data-level="1.15" data-path="ClickHouse.md">
<span>
ClickHouse
</span>
</li>
<li class="chapter " data-level="1.16" data-path="ProbabilityAndStatistics.html">
<a href="ProbabilityAndStatistics.html">
概率论与数理统计
</a>
</li>
<li class="chapter " data-level="1.17" data-path="PythonVirtualEnv.html">
<a href="PythonVirtualEnv.html">
Python虚拟环境
</a>
</li>
<li class="chapter " data-level="1.18" data-path="SparkLocalInstall.html">
<a href="SparkLocalInstall.html">
Spark本地安装部署
</a>
</li>
<li class="chapter " data-level="1.19" data-path="GitCheatSheet.html">
<a href="GitCheatSheet.html">
Git Cheat Sheet
</a>
</li>
<li class="divider"></li>
<li>
<a href="https://www.gitbook.com" target="_blank" rel="noopener noreferrer" class="gitbook-link">
本书使用 GitBook 发布
</a>
</li>
</ul>
</nav>
</div>
<div class="book-body">
<div class="body-inner">
<div class="book-header" role="navigation">
<!-- Title -->
<h1>
<i class="fa fa-circle-o-notch fa-spin"></i>
<a href="." >Spark</a>
</h1>
</div>
<div class="page-wrapper" tabindex="-1" role="main">
<div class="page-inner">
<div id="book-search-results">
<div class="search-noresults">
<section class="normal markdown-section">
<h1 id="spark">Spark</h1>
<h2 id="spark-vs-flink">Spark VS Flink</h2>
<h3 id="相同点">相同点</h3>
<ul>
<li>都基于内存计算;</li>
<li>都有统一的批处理和流处理API,都支持类似SQL的编程接口;</li>
<li>都支持很多相同的转换操作,编程都是用类似于Scala Collection API的函数式编程模式;</li>
<li>都有完善的错误恢复机制;</li>
<li>都支持Exactly once的语义一致性。</li>
</ul>
<h3 id="不同点">不同点</h3>
<ul>
<li>从流处理的角度来讲,Spark基于微批量处理,把流数据看成是一个个小的批处理数据块分别处理,所以延迟只能做到秒级。而Flink基于事件逐条处理,每当有新的数据输入都会立刻处理,是真正的流式计算,延迟可达毫秒级。同样由于这个原因,Spark只支持基于时间的窗口操作(处理时间或者事件时间),而Flink支持的窗口操作则非常灵活,不仅支持时间窗口,还支持基于数据本身的窗口,开发者可以自由定义想要的窗口操作。</li>
<li>从SQL功能的角度来讲,Spark和Flink分别通过SparkSQL和Table API提供SQL交互支持。两者相比较,Spark对SQL支持更好,相应的优化、扩展和性能更好,而Flink在SQL支持方面还有很大提升空间。</li>
<li>从迭代计算的角度来讲,Spark对机器学习的支持很好,因为可以在内存中缓存中间计算结果来加速机器学习算法的运行。但是大部分机器学习算法其实是一个有环的数据流,而在Spark中却是用有向无环图(DAG)来表示的。Flink则在运行时支持有环数据流,从而可以更高效地运行机器学习等迭代算法。</li>
<li>从相应的生态系统角度来讲,Spark 的社区无疑更加活跃。Spark可以说有着Apache旗下最多的开源贡献者,而且有很多不同的库来用在不同场景。而Flink由于较新,现阶段的开源社区不如Spark活跃,各种库的功能也不如Spark全面。但是Flink还在不断发展,各种功能也在逐渐完善。</li>
</ul>
<h2 id="spark架构与运行模式">Spark架构与运行模式</h2>
<h3 id="架构">架构</h3>
<p>可以拿Spark的架构和Yarn进行对比,两者比较类似,如下图所示,左边是Yarn,右边是Spark:</p>
<p><img src="pics/Spark/YarnAndSpark.png" alt="Yarn架构(左)与Spark架构(右)对比图" style="zoom:80%;"></p>
<p>由上图可以看出,Spark的架构主要由以下四个部分组成:</p>
<ul>
<li>Master:集群资源管理(类同ResourceManager)</li>
<li>Worker:单机资源管理(类同NodeManager)</li>
<li>Driver:单任务管理者(类同ApplicationMaster)</li>
<li>Executor:单任务执行者(类同YARN容器内的Task)</li>
</ul>
<p>实际上在Spark on Yarn模式下,Spark的Master和Worker会被Yarn中的ResourceManager和NodeManager替代。</p>
<h3 id="运行模式">运行模式</h3>
<p>Spark的运行模式有以下几种:</p>
<ul>
<li>Local模式</li>
<li>StandAlone模式</li>
<li>Yarn模式</li>
<li>K8S模式</li>
<li>云服务模式</li>
</ul>
<h4 id="local模式">Local模式</h4>
<p>Local模式就是以一个独立进程及其内部的多个线程来提供完整的Spark运行时环境。Local模式下Master/Worker/Driver由Local进程本身担任,Executor由Local进程内启动的线程执行。Local模式主要用来学习和测试。以下三种方式均可实现Local模式,核心在于<code>--master</code>需要使用<code>local</code>参数。</p>
<ul>
<li><p><code>bin/spark-submit --master local[N]</code></p>
<p><code>--master</code>参数采用:<code>local[N]</code> 或<code>local[*]</code>,其中<code>N</code>代表可以使用N个线程,<code>*</code>表示按照机器CPU的核数设置线程数</p>
</li>
<li><p><code>bin/pyspark</code>:提供一个交互式的Python解释器环境,如<code>./bin/pyspark --master local[*]</code>。是否是Local模式,主要看<code>--master</code>参数(注意:pyspark等交互式Shell不支持<code>--deploy-mode cluster</code>)</p>
</li>
<li><p><code>bin/spark-shell</code>:这是一个Scala语言的交互式环境</p>
</li>
</ul>
<p><img src="pics/Spark/LocalMode.png" alt="Spark Local模式运行示意图" style="zoom:80%;"></p>
<h4 id="standalone模式">StandAlone模式</h4>
<p>StandAlone模式是Spark自带的一种集群模式,不同于前面的Local模式在单个进程内用多个线程来模拟集群的环境,StandAlone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。</p>
<p>Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。Spark提供了两种<strong>高可用HA</strong>的解决方案:</p>
<ol>
<li><p>基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。</p>
</li>
<li><p>基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。</p>
<p><img src="pics/Spark/StandaloneHAZooKeeper.png" alt="基于ZooKeeper的Standalone Master高可用架构图" style="zoom:80%;"></p>
</li>
</ol>
<p>在<strong>高可用HA</strong>模式下,多个Master只有一个Active,其它的都Standby。</p>
<h4 id="yarn模式">Yarn模式</h4>
<p>对于Spark On YARN, 无需部署Spark集群, 只要找一台服务器, 充当Spark的客户端, 即可提交任务到YARN集群中运行。
Master角色由YARN的ResourceManager担任。Worker角色由YARN的NodeManager担任。Driver角色运行在YARN容器内或提交任务的客户端进程中。真正干活的Executor运行在YARN提供的容器内。</p>
<p><img src="pics/Spark/SparkOnYarn.png" alt="Spark On YARN运行架构图" style="zoom:80%;"></p>
<p>Spark On YARN有两种运行模式,一种是Cluster模式,一种是Client模式。这两种模式的区别就是Driver运行的位置。</p>
<ul>
<li><p>client:Driver在客户端进程内,比如Driver运行在spark-submit程序的进程中</p>
<p><img src="pics/Spark/DeployModeClient.png" alt="YARN Client模式:Driver运行在客户端进程中的示意图" style="zoom:80%;"></p>
</li>
<li><p>cluster:Driver在容器内,和ApplicationMaster在同一个容器内</p>
<p><img src="pics/Spark/DeployModeCluster.png" alt="YARN Cluster模式:Driver与ApplicationMaster运行在同一容器中的示意图" style="zoom:80%;"></p>
</li>
</ul>
<table>
<thead>
<tr>
<th></th>
<th><strong>Cluster</strong>模式</th>
<th><strong>Client</strong>模式</th>
</tr>
</thead>
<tbody>
<tr>
<td>Driver运行位置</td>
<td>YARN容器内</td>
<td>客户端进程内</td>
</tr>
<tr>
<td>通讯效率</td>
<td>高</td>
<td>低于Cluster模式</td>
</tr>
<tr>
<td>日志查看</td>
<td>日志输出在容器内,查看不方便</td>
<td>日志输出在客户端的标准输出流中,方便查看</td>
</tr>
<tr>
<td>生产可用</td>
<td>推荐</td>
<td>不推荐</td>
</tr>
<tr>
<td>稳定性</td>
<td>稳定</td>
<td>基于客户端进程,受到客户端进程影响</td>
</tr>
</tbody>
</table>
<h3 id="运行端口">运行端口</h3>
<ul>
<li><p>4040:是一个Application在运行的过程中临时绑定的WEB端口,用以查看当前任务的状态。4040是个临时端口,程序运行完成后会被注销。</p>
</li>
<li><p>8080:是StandAlone模式下,Master进程的WEB端口,用以查看当前Master(集群)的状态。</p>
</li>
<li><p>18080:历史服务绑定的WEB端口。由于每个程序运行完成后,4040端口就被注销了。在以后想回看某个程序的运行状态就可以通过历史服务器查看。历史服务器长期稳定运行,可供随时查看被记录的程序的运行过程。</p>
</li>
</ul>
<h3 id="程序运行层次结构">程序运行层次结构</h3>
<ul>
<li><p>Application</p>
<ul>
<li><p>每个Application包含若干个Job。一般Spark中的action操作(如save、collect),会生成一个Job。</p>
<ul>
<li><p>每个Job有多个Stage。</p>
<ul>
<li>每个Stage由多个task(线程)执行,每个task处理单个Partition上的数据。Task是Spark的最小执行单位。</li>
</ul>
</li>
</ul>
</li>
</ul>
</li>
</ul>
<h2 id="rdd编程">RDD编程</h2>
<h3 id="程序入口">程序入口</h3>
<ul>
<li><p><code>SparkContext</code>:RDD 编程的程序入口对象是<code>SparkContext</code>对象(不论何种编程语言)。只有构建出<code>SparkContext</code>,基于它才能执行后续的API调用和计算。<code>SparkContext</code>对象的构建以及Spark程序的退出,由Driver负责执行。</p>
<ul>
<li>非数据处理的部分由Driver执行</li>
<li>数据处理的部分(干活)由Executor执行</li>
</ul>
<pre class="language-"><code class="lang-python"><span class="token keyword">from</span> pyspark <span class="token keyword">import</span> SparkContext<span class="token punctuation">,</span>SparkConf
conf <span class="token operator">=</span> SparkConf<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>setAppName<span class="token punctuation">(</span><span class="token string">"tutorial"</span><span class="token punctuation">)</span><span class="token punctuation">.</span>setMaster<span class="token punctuation">(</span><span class="token string">"local[*]"</span><span class="token punctuation">)</span>
sc<span class="token operator">=</span>SparkContext<span class="token punctuation">(</span>conf<span class="token operator">=</span>conf<span class="token punctuation">)</span>
</code></pre>
</li>
<li><p><code>SparkSession</code>:Spark2.0之后作为Spark程序的统一入口</p>
<ul>
<li>RDD编程,可以从<code>SparkSession</code>对象中获取到<code>SparkContext</code></li>
<li><p>SparkSQL编程的直接入口</p>
<pre class="language-"><code class="lang-python"><span class="token keyword">from</span> pyspark<span class="token punctuation">.</span>sql <span class="token keyword">import</span> SparkSession
spark <span class="token operator">=</span> SparkSession<span class="token punctuation">.</span>builder<span class="token punctuation">.</span>appName<span class="token punctuation">(</span><span class="token string">"dataframe"</span><span class="token punctuation">)</span><span class="token punctuation">.</span>master<span class="token punctuation">(</span><span class="token string">"local[*]"</span><span class="token punctuation">)</span><span class="token punctuation">.</span>getOrCreate<span class="token punctuation">(</span><span class="token punctuation">)</span>
sc <span class="token operator">=</span> spark<span class="token punctuation">.</span>sparkContext
</code></pre>
</li>
</ul>
</li>
</ul>
<h3 id="python-on-spark-执行原理">Python On Spark 执行原理</h3>
<p>PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。</p>
<p>程序运行时,Driver将Python代码翻译成JVM可执行的代码;Executor不翻译,直接跑Python代码。所以需要在Executor所在的所有机器上安装需要的Python库。</p>
<p><img src="pics/Spark/PythonOnSpark.png" alt="PySpark运行时架构图" style="zoom:70%;"></p>
<h3 id="words-count">Word Count</h3>
<pre class="language-"><code class="lang-python"><span class="token keyword">from</span> pyspark <span class="token keyword">import</span> SparkContext<span class="token punctuation">,</span>SparkConf
conf <span class="token operator">=</span> SparkConf<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>setAppName<span class="token punctuation">(</span><span class="token string">"tutorial"</span><span class="token punctuation">)</span><span class="token punctuation">.</span>setMaster<span class="token punctuation">(</span><span class="token string">"local[*]"</span><span class="token punctuation">)</span>
sc<span class="token operator">=</span>SparkContext<span class="token punctuation">(</span>conf<span class="token operator">=</span>conf<span class="token punctuation">)</span>
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>textFile<span class="token punctuation">(</span><span class="token string">"words.txt"</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd1<span class="token punctuation">.</span>getNumPartitions<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token comment"># 获取RDD分区数</span>
rdd2 <span class="token operator">=</span> rdd1<span class="token punctuation">.</span>flatMap<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span>x<span class="token punctuation">.</span>split<span class="token punctuation">(</span><span class="token string">' '</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd3 <span class="token operator">=</span> rdd2<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span><span class="token punctuation">(</span>x<span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd4 <span class="token operator">=</span> rdd3<span class="token punctuation">.</span>reduceByKey<span class="token punctuation">(</span><span class="token keyword">lambda</span> a<span class="token punctuation">,</span> b<span class="token punctuation">:</span> a<span class="token operator">+</span>b<span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd4<span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>2
[('Hello', 3), ('world', 1), ('Spark', 2), ('Flink', 1), ('and', 1)]
</code></pre><h3 id="算子">算子</h3>
<h4 id="transformation算子">Transformation算子</h4>
<p>转换算子。输入为RDD,返回值也为RDD的算子。Transformation算子是懒加载(Lazy加载),也就是说:没有Action算子的话Transformation算子不会运行。</p>
<pre class="language-"><code class="lang-python"><span class="token comment"># Transformation算子</span>
<span class="token comment"># map算子:将RDD中的每一条数据按照传入的逻辑进行处理,返回新的RDD</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"map: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> x<span class="token operator">*</span><span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># flatMap算子:先对RDD执行map操作,然后再进行解除嵌套操作</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token punctuation">[</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token punctuation">[</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">]</span><span class="token punctuation">]</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"flatMap: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> x<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"flatMap: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>flatMap<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> x<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"flatMap: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>flatMap<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token punctuation">[</span>i<span class="token operator">*</span><span class="token number">10</span> <span class="token keyword">for</span> i <span class="token keyword">in</span> x<span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># mapValues算子:对KV型RDD中的Value执行map操作</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"mapValue: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>mapValues<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span>x<span class="token operator">*</span><span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># reduceByKey算子:针对KV型RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(KV中的value)的聚合操作</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"reduceByKey: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>reduceByKey<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">,</span>y<span class="token punctuation">:</span> x<span class="token operator">+</span>y<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># groupBy算子:对RDD数据进行分组</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> rdd<span class="token punctuation">.</span>groupBy<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token string">'even'</span> <span class="token keyword">if</span> <span class="token punctuation">(</span>x<span class="token operator">%</span><span class="token number">2</span> <span class="token operator">==</span> <span class="token number">0</span><span class="token punctuation">)</span> <span class="token keyword">else</span> <span class="token string">'odd'</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"groupBy: "</span><span class="token punctuation">,</span> rdd2<span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"groupBy: "</span><span class="token punctuation">,</span> rdd2<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token punctuation">(</span>x<span class="token punctuation">[</span><span class="token number">0</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token builtin">list</span><span class="token punctuation">(</span>x<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># groupByKey算子:针对KV型RDD,自动按照key分组</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> rdd<span class="token punctuation">.</span>groupByKey<span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"groupByKey: "</span><span class="token punctuation">,</span> rdd2<span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"groupByKey: "</span><span class="token punctuation">,</span> rdd2<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token punctuation">(</span>x<span class="token punctuation">[</span><span class="token number">0</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token builtin">list</span><span class="token punctuation">(</span>x<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># filter算子:过滤不想要的数据,保留想要的数据</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"filter: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span><span class="token builtin">filter</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> x<span class="token operator">%</span><span class="token number">2</span> <span class="token operator">==</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token comment"># 过滤偶数,保留奇数</span>
<span class="token comment"># distinct算子:对RDD进行去重</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"distinct: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>distinct<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># glom算子:将RDD的数据加上嵌套,这个嵌套按照分区来进行</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"glom: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># sortBy算子:对RDD数据进行排序。numPartitions大于1时,Spark先按排序key的范围划分分区再在分区内排序,collect的结果仍然全局有序;设置为1则排序结果集中在一个分区中</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'d'</span><span class="token punctuation">,</span> <span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">7</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'f'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token 
punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"sortBy: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>sortBy<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span>x<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">,</span> ascending<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># sortByKey算子:针对KV型RDD,按照key进行排序。numPartitions大于1时按key的范围分区再在分区内排序,collect的结果依然全局有序;这里设置为1,结果集中在一个分区中</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">6</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'A'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'A'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'d'</span><span class="token punctuation">,</span> <span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'f'</span><span class="token punctuation">,</span> <span class="token number">4</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
<span class="token punctuation">(</span><span class="token string">'B'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'B'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'e'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'E'</span><span class="token punctuation">,</span> <span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"sortByKey: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"sortByKey: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">,</span> keyfunc<span class="token operator">=</span><span class="token keyword">lambda</span> key<span class="token punctuation">:</span> <span class="token builtin">str</span><span class="token punctuation">(</span>key<span class="token punctuation">)</span><span class="token punctuation">.</span>lower<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># union算子:将2个RDD合并成1个RDD</span>
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'d'</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"union: "</span><span class="token punctuation">,</span> rdd1<span class="token punctuation">.</span>union<span class="token punctuation">(</span>rdd2<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># join算子:对两个KV型的RDD执行类似SQL语句中的JOIN操作</span>
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'a'</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token string">'X'</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token string">'Y'</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'d'</span><span class="token punctuation">,</span> <span class="token string">'Z'</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"join: "</span><span class="token punctuation">,</span> rdd1<span class="token punctuation">.</span>join<span class="token punctuation">(</span>rdd2<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"leftOuterJoin: "</span><span class="token punctuation">,</span> rdd1<span class="token punctuation">.</span>leftOuterJoin<span class="token punctuation">(</span>rdd2<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"rightOuterJoin: "</span><span class="token punctuation">,</span> rdd1<span class="token punctuation">.</span>rightOuterJoin<span class="token punctuation">(</span>rdd2<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># intersection算子:求2个RDD的交集</span>
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"intersection: "</span><span class="token punctuation">,</span> rdd1<span class="token punctuation">.</span>intersection<span class="token punctuation">(</span>rdd2<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>map: [10, 20, 30, 40, 50]
flatMap: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flatMap: [1, 2, 3, 4, 5, 6, 7, 8, 9]
flatMap: [10, 20, 30, 40, 50, 60, 70, 80, 90]
mapValue: [('a', 10), ('a', 20), ('a', 30), ('b', 10), ('b', 20)]
reduceByKey: [('b', 3), ('a', 6)]
groupBy: [('even', &lt;pyspark.resultiterable.ResultIterable object at 0x7f8c2b1d3e50&gt;), ('odd', &lt;pyspark.resultiterable.ResultIterable object at 0x7f8c2b1e9400&gt;)]
groupBy: [('even', [2, 4]), ('odd', [1, 3, 5])]
groupByKey: [('b', &lt;pyspark.resultiterable.ResultIterable object at 0x7f8c2b1d39d0&gt;), ('a', &lt;pyspark.resultiterable.ResultIterable object at 0x7f8c2b1e9670&gt;)]
groupByKey: [('b', [1, 2]), ('a', [1, 2, 3])]
filter: [1, 3, 5]
distinct: [3, 1, 4, 2, 5]
glom: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sortBy: [('a', 7), ('d', 5), ('c', 5), ('b', 3), ('f', 2), ('a', 2), ('c', 1), ('b', 1)]
sortByKey: [('f', 4), ('e', 2), ('d', 5), ('c', 2), ('b', 3), ('a', 6), ('E', 5), ('B', 3), ('B', 2), ('A', 3), ('A', 1)]
sortByKey: [('f', 4), ('e', 2), ('E', 5), ('d', 5), ('c', 2), ('B', 3), ('b', 3), ('B', 2), ('a', 6), ('A', 3), ('A', 1)]
union: [1, 1, 3, 3, 5, 'a', 'a', 'c', 'c', 'd']
join: [('c', (3, 'Y')), ('b', (2, 'X'))]
leftOuterJoin: [('c', (3, 'Y')), ('b', (2, 'X')), ('a', (1, None))]
rightOuterJoin: [('d', (None, 'Z')), ('c', (3, 'Y')), ('b', (2, 'X'))]
intersection: [2, 3, 4, 5]
</code></pre><ul>
<li><p><code>groupByKey+聚合</code>的性能比直接使用<code>reduceByKey</code>差一些:<code>reduceByKey</code>会在shuffle之前先在每个分区内对相同key的数据做一次预聚合,shuffle时需要通过网络传输的数据就少很多;而<code>groupByKey</code>会先把全部原始数据shuffle分组,之后才能聚合。</p>
<ul>
<li><p>如下图所示为<code>groupByKey+聚合</code>:</p>
<p><img src="pics/Spark/groupByKey.png" alt="groupByKey+聚合的执行过程示意图" style="zoom:40%;"></p>
</li>
<li><p>如下图所示为<code>reduceByKey</code>:</p>
<p><img src="pics/Spark/reduceByKey.png" alt="reduceByKey的执行过程示意图" style="zoom:40%;"></p>
</li>
</ul>
</li>
</ul>
<h4 id="action算子">Action算子</h4>
<p>动作(行动)算子,即返回值不是RDD的算子。Transformation算子是惰性执行的,只有调用Action算子时才会真正触发作业的计算。</p>
<p><img src="pics/Spark/TransformationAndAction.png" alt="Transformation算子与Action算子示意图" style="zoom:80%;"></p>
<pre class="language-"><code class="lang-python"><span class="token comment"># Action算子</span>
<span class="token comment"># collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List</span>
<span class="token comment"># 用collect算子结果数据集不能太大,以免将Driver内存撑爆</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"collect: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># reduce算子:对RDD的数据按照传入的逻辑进行聚合,返回一个聚合结果</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"reduce: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span><span class="token builtin">reduce</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> a<span class="token punctuation">,</span>b<span class="token punctuation">:</span> a<span class="token operator">+</span>b<span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># fold算子:也是RDD根据传入逻辑进行聚合</span>
<span class="token comment"># 但是,各个分区先内部聚合且带上初始值,再做分区间的聚合</span>
<span class="token comment"># 分区间的聚合也会带上初始值</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"fold: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>fold<span class="token punctuation">(</span><span class="token number">10</span><span class="token punctuation">,</span> <span class="token keyword">lambda</span> a<span class="token punctuation">,</span>b<span class="token punctuation">:</span> a<span class="token operator">+</span>b<span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># first算子:取出RDD的第一个元素</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"first: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>first<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># take算子:取RDD的前n个元素,组合成list返回</span>
<span class="token comment"># 预期结果集需要比较小,因为所有数据都会load进内存</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"take: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>take<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># top算子:对RDD进行降序排列,取前n个组成List返回</span>
<span class="token comment"># 预期结果集需要比较小,因为所有数据都会load进内存</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"top: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>top<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># takeOrdered算子:对RDD进行升序排列或者按照给定逻辑排序,取前n个</span>
<span class="token comment"># 预期结果集需要比较小,因为所有数据都会load进内存</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">0</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"takeOrdered: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>takeOrdered<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"takeOrdered: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>takeOrdered<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">,</span> <span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token operator">-</span>x<span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># count算子:计算RDD有多少条数据,返回条数的数值</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"count: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>count<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># takeSample算子:随机抽样RDD的数据</span>
<span class="token comment"># withReplacement参数:True表示允许取同一位置的数据。注意:只是位置是否相同,不代表数值是否相同。</span>
<span class="token comment"># num参数:抽样个数,seed参数:随机数种子</span>
<span class="token comment"># 预期结果集需要比较小,因为结果会全部加载到Driver的内存中</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"takeSample: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>takeSample<span class="token punctuation">(</span><span class="token boolean">True</span><span class="token punctuation">,</span> <span class="token number">5</span><span class="token punctuation">,</span> seed<span class="token operator">=</span><span class="token number">16</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token comment"># foreach算子:对RDD的每个元素执行提供的逻辑(类似map)</span>
<span class="token comment"># 但这个方法没有返回值。如果需要输出,则需要放到foreach的输入参数func中</span>
<span class="token comment"># 输出会输出到各Executor的标准输出中</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
r <span class="token operator">=</span> rdd<span class="token punctuation">.</span>foreach<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token keyword">print</span><span class="token punctuation">(</span>x<span class="token operator">*</span><span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"foreach: "</span><span class="token punctuation">,</span> r<span class="token punctuation">)</span>
<span class="token comment"># saveAsTextFile算子:将RDD的数据写入文件系统中</span>
<span class="token comment"># 支持本地写入,也支持HDFS等文件系统</span>
<span class="token comment"># 各个分区所在的Executor分别写入到目标文件系统中,有几个分区就会产生几个文件</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd<span class="token punctuation">.</span>saveAsTextFile<span class="token punctuation">(</span><span class="token string">"RDDTest"</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>collect: [1, 2, 3, 4, 5, 6, 7, 8, 9]
reduce: 45
fold: 85
first: 1
take: [1, 2, 3, 4, 5]
top: [9, 8, 7, 6, 5]
takeOrdered: [0, 1, 1, 2, 3]
takeOrdered: [8, 6, 5, 4, 3]
count: 9
takeSample: [4, 2, 6, 9, 6]
foreach: None
</code></pre><ul>
<li><code>foreach</code>和<code>saveAsTextFile</code>两个算子的执行是跳过了Driver,直接由分区所在的Executor来执行。其它的Action算子都是会将结果发送给Driver。</li>
</ul>
<h4 id="分区操作算子">分区操作算子</h4>
<h5 id="mappartitions算子"><code>mapPartitions</code>算子</h5>
<p>功能和<code>map</code>算子一样都是Transformation算子。但<code>map</code>算子计算一次只传递一个数据,而<code>mapPartitions</code>算子计算时以迭代器对象的形式一次传递一整个分区的数据。理论上,<code>mapPartitions</code>算子因为避免了多次数据传输,性能上应该会更好一些。但实际中还是需要具体问题具体分析。</p>
<pre class="language-"><code class="lang-python"><span class="token comment"># mapPartitions算子:类似map,但一次传递一整个分区的数据</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">def</span> <span class="token function">func</span><span class="token punctuation">(</span><span class="token builtin">iter</span><span class="token punctuation">)</span><span class="token punctuation">:</span>
result <span class="token operator">=</span> <span class="token builtin">list</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">for</span> it <span class="token keyword">in</span> <span class="token builtin">iter</span><span class="token punctuation">:</span>
result<span class="token punctuation">.</span>append<span class="token punctuation">(</span>it <span class="token operator">*</span> <span class="token number">10</span><span class="token punctuation">)</span>
<span class="token keyword">return</span> result
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"mapPartitions: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>mapPartitions<span class="token punctuation">(</span>func<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>mapPartitions: [10, 20, 30, 40, 50, 60, 70, 80, 90]
</code></pre><h5 id="foreachpartition算子"><code>foreachPartition</code>算子</h5>
<p>Action算子,功能和foreach一致,只是一次处理一整个分区的数据。没有返回值。</p>
<pre class="language-"><code class="lang-python"><span class="token comment"># foreachPartition算子:Action算子,类似foreach,但一次传递一整个分区的数据</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">def</span> <span class="token function">func</span><span class="token punctuation">(</span><span class="token builtin">iter</span><span class="token punctuation">)</span><span class="token punctuation">:</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"---"</span><span class="token punctuation">)</span>
result <span class="token operator">=</span> <span class="token builtin">list</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">for</span> it <span class="token keyword">in</span> <span class="token builtin">iter</span><span class="token punctuation">:</span>
result<span class="token punctuation">.</span>append<span class="token punctuation">(</span>it <span class="token operator">*</span> <span class="token number">10</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>result<span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"foreachPartition: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>foreachPartition<span class="token punctuation">(</span>func<span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>foreachPartition: None
---
[40, 50, 60]
---
[70, 80, 90]
---
[10, 20, 30]
</code></pre><h5 id="partitionby算子"><code>partitionBy</code>算子</h5>
<p>Transformation算子。能够对KV型RDD按照K进行自定义分区:第一个参数为分区数量,第二个参数(可选)为分区函数,它接收K并返回一个整数作为分区编号。</p>
<pre class="language-"><code class="lang-python"><span class="token comment"># partitionBy算子:Transformation算子,能够对KV型RDD按照K进行自定义分区操作</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">)</span><span class="token punctuation">,</span><span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">)</span><span class="token punctuation">,</span><span class="token punctuation">(</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">)</span><span class="token punctuation">,</span><span class="token punctuation">(</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token string">'d'</span><span class="token punctuation">)</span><span class="token punctuation">,</span><span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token string">'e'</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> rdd<span class="token punctuation">.</span>partitionBy<span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"partitionBy: "</span><span class="token punctuation">,</span> rdd2<span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd3 <span class="token operator">=</span> rdd<span class="token punctuation">.</span>partitionBy<span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">,</span> <span class="token keyword">lambda</span> x<span class="token punctuation">:</span> <span class="token number">0</span> <span class="token keyword">if</span> x <span class="token operator">></span> <span class="token number">3</span> <span class="token keyword">else</span> <span class="token number">1</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"partitionBy: "</span><span class="token punctuation">,</span> rdd3<span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>partitionBy: [[(2, 'b'), (4, 'd')], [(1, 'a'), (3, 'c'), (5, 'e')]]
partitionBy: [[(4, 'd'), (5, 'e')], [(1, 'a'), (2, 'b'), (3, 'c')]]
</code></pre><h5 id="repartition算子"><code>repartition</code>算子</h5>
<p>Transformation算子。只传入一个分区数量n,对RDD进行重新分区。</p>
<p>在实际生产环境中,对分区数量进行操作一定要慎重。因为分区数量变化会影响并行计算(内存迭代的并行管道数量),而且增加分区数量必然需要shuffle(<code>repartition</code>算子无论增加还是减少分区都会进行shuffle)。</p>
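<p>由于<code>repartition</code>算子的实现,在小数据集上,重新分区很可能不太均匀;数据量越大,这种不均匀就越不明显。原因是PySpark在shuffle前会把每个分区的数据按小批次(每批最多10条)重新序列化,重新分区时是以批次而不是单条数据为单位进行分配的,所以下面1000条数据的示例中各分区的数据条数都是10的倍数。具体可以参考<code>rdd.py</code>中相关代码(实际在<code>coalesce</code>算子的实现中)。</p>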
<pre class="language-"><code class="lang-python"><span class="token comment"># repartition算子:Transformation算子,对RDD进行重分区</span>
<span class="token comment"># 在小数据集下,重分区可能会数据不均匀</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"repartition: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>repartition<span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"repartition: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>repartition<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">1000</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span>
a <span class="token operator">=</span> rdd<span class="token punctuation">.</span>repartition<span class="token punctuation">(</span><span class="token number">20</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"repartition lens: "</span><span class="token punctuation">,</span> <span class="token punctuation">[</span><span class="token builtin">len</span><span class="token punctuation">(</span>item<span class="token punctuation">)</span> <span class="token keyword">for</span> item <span class="token keyword">in</span> a<span class="token punctuation">]</span><span class="token punctuation">)</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">10000</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">10</span><span class="token punctuation">)</span>
a <span class="token operator">=</span> rdd<span class="token punctuation">.</span>repartition<span class="token punctuation">(</span><span class="token number">20</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"repartition lens: "</span><span class="token punctuation">,</span> <span class="token punctuation">[</span><span class="token builtin">len</span><span class="token punctuation">(</span>item<span class="token punctuation">)</span> <span class="token keyword">for</span> item <span class="token keyword">in</span> a<span class="token punctuation">]</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>repartition: [[0, 1, 2, 6, 7, 8, 9], [3, 4, 5]]
repartition: [[], [0, 1, 2], [6, 7, 8, 9], [3, 4, 5], []]
repartition lens: [70, 70, 60, 50, 40, 30, 30, 30, 30, 20, 30, 30, 40, 50, 60, 70, 70, 70, 70, 80]
repartition lens: [500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500]
</code></pre><h5 id="coalesce算子"><code>coalesce</code>算子</h5>
<p>Transformation算子。仍然是对RDD进行重新分区。相比<code>repartition</code>算子,多一个<code>shuffle</code>参数。</p>
<p>如果<code>shuffle=True</code>则跟<code>repartition</code>算子完全一样,具体参考<code>repartition</code>算子在<code>rdd.py</code>中的实现。所以也存在<code>repartition</code>算子在小数据集上划分不均匀的问题。</p>
<p>如果<code>shuffle=False</code>,则只能减少分区,无法增加分区:指定的分区数量大于当前分区数量时,分区数量保持不变(见下面<code>coalesce(5, shuffle=False)</code>的输出)。</p>
<pre class="language-"><code class="lang-python"><span class="token comment"># coalesce算子:Transformation算子,对RDD进行重分区</span>
<span class="token comment"># shuffle参数表示是否要shuffle。如果shuffle=False,则无法增加分区</span>
<span class="token comment"># shuffle=True即为repartition算子</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token builtin">range</span><span class="token punctuation">(</span><span class="token number">10</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"coalesce: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>coalesce<span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">,</span> shuffle<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"coalesce: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>coalesce<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">,</span> shuffle<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"coalesce: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>coalesce<span class="token punctuation">(</span><span class="token number">2</span><span class="token punctuation">,</span> shuffle<span class="token operator">=</span><span class="token boolean">True</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"coalesce: "</span><span class="token punctuation">,</span> rdd<span class="token punctuation">.</span>coalesce<span class="token punctuation">(</span><span class="token number">5</span><span class="token punctuation">,</span> shuffle<span class="token operator">=</span><span class="token boolean">True</span><span class="token punctuation">)</span><span class="token punctuation">.</span>glom<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>coalesce: [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]
coalesce: [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
coalesce: [[0, 1, 2, 6, 7, 8, 9], [3, 4, 5]]
coalesce: [[], [0, 1, 2], [6, 7, 8, 9], [3, 4, 5], []]
</code></pre><h3 id="rdd持久化">RDD持久化</h3>
<p>RDD是过程数据,一旦处理完(代码中的Action算子执行完,例如:<code>collect</code>)就会从内存中清理掉;所以如果某个RDD会被后续流程再次使用,这个RDD就会被重新计算。</p>
<p><img src="pics/Spark/RDDRegenerate.png" alt="RDDRegenerate" style="zoom:80%;"></p>
<p>如果需要避免RDD的重复计算,就需要用到RDD持久化的API。</p>
<h4 id="缓存">缓存</h4>
<p>缓存主要使用<code>cache</code>或者<code>persist</code>接口。<code>cache</code>是<code>persist</code>的简化写法,对RDD的缓存<code>cache</code>等价于<code>persist(MEMORY_ONLY)</code>,对于DataFrame的缓存<code>cache</code>等价于<code>persist(MEMORY_AND_DISK)</code>。</p>
<pre class="language-"><code class="lang-python"><span class="token keyword">from</span> pyspark<span class="token punctuation">.</span>storagelevel <span class="token keyword">import</span> StorageLevel
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'d'</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> rdd1<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span><span class="token punctuation">(</span>x<span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd3 <span class="token operator">=</span> rdd2<span class="token punctuation">.</span>reduceByKey<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">,</span>y<span class="token punctuation">:</span> x<span class="token operator">+</span>y<span class="token punctuation">)</span>
rdd3<span class="token punctuation">.</span>cache<span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token comment"># 缓存,persist简化写法</span>
<span class="token comment"># 注意:一个RDD的存储级别只能设置一次,已经缓存后再用不同的级别persist会报错</span>
<span class="token comment"># 以下是其它可选的存储级别,实际使用时只选其中一种(替换上面的cache())</span>
<span class="token comment"># rdd3.persist(StorageLevel.MEMORY_ONLY) # 内存缓存</span>
<span class="token comment"># rdd3.persist(StorageLevel.MEMORY_ONLY_2) # 内存缓存,2副本</span>
<span class="token comment"># rdd3.persist(StorageLevel.DISK_ONLY) # 磁盘缓存</span>
<span class="token comment"># rdd3.persist(StorageLevel.DISK_ONLY_2) # 磁盘缓存,2副本</span>
<span class="token comment"># rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 先存内存,不够存磁盘</span>
<span class="token comment"># rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 先存内存,不够存磁盘,2副本</span>
<span class="token comment"># rdd3.persist(StorageLevel.OFF_HEAP) # 堆外内存(系统内存)</span>
rdd4 <span class="token operator">=</span> rdd3<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">True</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd4<span class="token punctuation">.</span>first<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd3<span class="token punctuation">.</span>is_cached<span class="token punctuation">)</span> <span class="token comment"># 查看是否被缓存了</span>
rdd5 <span class="token operator">=</span> rdd3<span class="token punctuation">.</span><span class="token builtin">filter</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span>x<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token operator">></span><span class="token number">1</span><span class="token punctuation">)</span>
rdd6 <span class="token operator">=</span> rdd5<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd6<span class="token punctuation">.</span>first<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd3<span class="token punctuation">.</span>unpersist<span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token comment"># 释放缓存</span>
</code></pre>
<p><code>cache</code>和<code>persist</code>这两个操作被认为是设计上不安全的,可能存在数据丢失风险,所以额外<strong>保存了RDD的血缘(依赖)关系</strong>,数据一旦丢失还可以重新计算。</p>
<p>各个分区的数据实际上是被<strong>分散存储</strong>在各自Executor的内存和硬盘上。</p>
<h4 id="checkpoint">CheckPoint</h4>
<p>CheckPoint也是数据持久化技术。它被认为是设计上安全的,它<strong>仅支持硬盘存储</strong>,且<strong>不保留血缘关系</strong>。数据存储前需要指定存储的路径,支持HDFS等分布式文件存储系统,所以属于<strong>集中存储</strong>。Local模式下可以使用本地文件系统。</p>
<pre class="language-"><code class="lang-python">sc<span class="token punctuation">.</span>setCheckpointDir<span class="token punctuation">(</span><span class="token string">'checkpoint'</span><span class="token punctuation">)</span> <span class="token comment"># 设置CheckPoint文件存储路径</span>
<span class="token comment"># sc.setCheckpointDir('hdfs://node1:8020/output/checkpoint')</span>
rdd1 <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'a'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'b'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'c'</span><span class="token punctuation">,</span><span class="token string">'d'</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
rdd2 <span class="token operator">=</span> rdd1<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span><span class="token punctuation">(</span>x<span class="token punctuation">,</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
rdd3 <span class="token operator">=</span> rdd2<span class="token punctuation">.</span>reduceByKey<span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">,</span>y<span class="token punctuation">:</span> x<span class="token operator">+</span>y<span class="token punctuation">)</span>
rdd3<span class="token punctuation">.</span>checkpoint<span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token comment"># 保存数据</span>
rdd4 <span class="token operator">=</span> rdd3<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">True</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd4<span class="token punctuation">.</span>first<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd3<span class="token punctuation">.</span>isCheckpointed<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token comment"># 查看是否被持久化了</span>
rdd5 <span class="token operator">=</span> rdd3<span class="token punctuation">.</span><span class="token builtin">filter</span><span class="token punctuation">(</span><span class="token keyword">lambda</span> x<span class="token punctuation">:</span>x<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token operator">></span><span class="token number">1</span><span class="token punctuation">)</span>
rdd6 <span class="token operator">=</span> rdd5<span class="token punctuation">.</span>sortByKey<span class="token punctuation">(</span>ascending<span class="token operator">=</span><span class="token boolean">False</span><span class="token punctuation">,</span> numPartitions<span class="token operator">=</span><span class="token number">1</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd6<span class="token punctuation">.</span>first<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<h5 id="缓存和checkpoint对比">缓存和CheckPoint对比</h5>
<ul>
<li>CheckPoint的风险与分区数量无关;缓存的分区越多,丢失数据的风险越高</li>
<li>CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,CheckPoint被认为是安全的</li>
<li>CheckPoint不支持内存,缓存可以,缓存写内存性能比CheckPoint要好一些</li>
<li>CheckPoint不保留血缘关系;缓存会保存血缘关系</li>
</ul>
<p>从任务运行的DAG图中也可以看到不同持久化的差别。左边是无持久化;中间是缓存,用绿点表示;右边直接就显示了checkpoint。</p>
<p><img src="pics/Spark/CheckPointDAG.png" alt="任务DAG对比:左为无持久化,中为缓存(绿点标记),右为CheckPoint"></p>
<h3 id="共享变量">共享变量</h3>
<h4 id="广播变量">广播变量</h4>
<p>对于broadcast变量,每个Executor进程只会获取一次,Executor进程内不同分区任务共享broadcast数据。如果是普通的本地数据集,它会随函数闭包被发送给每个分区任务,同一Executor进程内的多个任务各自持有一份副本,导致重复传输和内存资源浪费。</p>
<p>通俗理解就是,spark的Driver将本地的数据封装成broadcast变量,然后广播给所有Executor,Executor将广播的数据提供给自己内部不同分区任务共享。分区任务不再需要各自单独去获取相关数据。</p>
<pre class="language-"><code class="lang-python">broadcast_value <span class="token operator">=</span> <span class="token punctuation">{</span><span class="token string">'a'</span><span class="token punctuation">:</span> <span class="token number">1</span><span class="token punctuation">,</span> <span class="token string">'b'</span><span class="token punctuation">:</span> <span class="token number">2</span><span class="token punctuation">,</span> <span class="token string">'c'</span><span class="token punctuation">:</span> <span class="token number">3</span><span class="token punctuation">}</span>
broadcast <span class="token operator">=</span> sc<span class="token punctuation">.</span>broadcast<span class="token punctuation">(</span>broadcast_value<span class="token punctuation">)</span> <span class="token comment"># 将本地数据标记为广播变量</span>
rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token punctuation">(</span><span class="token string">'b'</span><span class="token punctuation">,</span> <span class="token string">'X'</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'c'</span><span class="token punctuation">,</span> <span class="token string">'Y'</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">'d'</span><span class="token punctuation">,</span> <span class="token string">'Z'</span><span class="token punctuation">)</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
<span class="token keyword">def</span> <span class="token function">func</span><span class="token punctuation">(</span>item<span class="token punctuation">)</span><span class="token punctuation">:</span>
    <span class="token builtin">id</span> <span class="token operator">=</span> item<span class="token punctuation">[</span><span class="token number">0</span><span class="token punctuation">]</span>
    value <span class="token operator">=</span> broadcast<span class="token punctuation">.</span>value <span class="token comment"># 获取广播变量内的数据</span>
    <span class="token keyword">if</span> <span class="token builtin">id</span> <span class="token keyword">in</span> value<span class="token punctuation">.</span>keys<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">:</span>
        <span class="token keyword">return</span> <span class="token punctuation">(</span>item<span class="token punctuation">[</span><span class="token number">0</span><span class="token punctuation">]</span><span class="token punctuation">,</span> item<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">,</span> value<span class="token punctuation">[</span><span class="token builtin">id</span><span class="token punctuation">]</span><span class="token punctuation">)</span>
    <span class="token keyword">else</span><span class="token punctuation">:</span>
        <span class="token keyword">return</span> <span class="token punctuation">(</span>item<span class="token punctuation">[</span><span class="token number">0</span><span class="token punctuation">]</span><span class="token punctuation">,</span> item<span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span>rdd<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span>func<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>[('b', 'X', 2), ('c', 'Y', 3), ('d', 'Z', 0)]
</code></pre><h4 id="累加器">累加器</h4>
<p>考虑如下代码,本意是希望利用全局变量<code>count</code>做全局计数累加,但最终结果为0。因为<code>count</code>只是Driver中的本地变量,Executor中的分区任务拿到的只是随函数闭包序列化过去的一份<code>count</code>副本,只能在各自的副本上累加,累加结果不会回传给Driver。最后一句<code>print</code>打印的是Driver中的<code>count</code>值,因此仍为0。</p>
<pre class="language-"><code class="lang-python">rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">,</span><span class="token number">10</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
count <span class="token operator">=</span> <span class="token number">0</span>
<span class="token keyword">def</span> <span class="token function">func</span><span class="token punctuation">(</span>item<span class="token punctuation">)</span><span class="token punctuation">:</span>
    <span class="token keyword">global</span> count
    count <span class="token operator">+=</span> <span class="token number">1</span>
    <span class="token keyword">print</span><span class="token punctuation">(</span>count<span class="token punctuation">)</span>
    <span class="token keyword">return</span> item
rdd<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span>func<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"count: "</span><span class="token punctuation">,</span> count<span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>count: 0
1
1
2
3
2
3
4
1
2
3
</code></pre><p>为了解决以上问题,Spark引入了累加器,见如下代码:</p>
<pre class="language-"><code class="lang-python">rdd <span class="token operator">=</span> sc<span class="token punctuation">.</span>parallelize<span class="token punctuation">(</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">,</span><span class="token number">2</span><span class="token punctuation">,</span><span class="token number">3</span><span class="token punctuation">,</span><span class="token number">4</span><span class="token punctuation">,</span><span class="token number">5</span><span class="token punctuation">,</span><span class="token number">6</span><span class="token punctuation">,</span><span class="token number">7</span><span class="token punctuation">,</span><span class="token number">8</span><span class="token punctuation">,</span><span class="token number">9</span><span class="token punctuation">,</span><span class="token number">10</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token number">3</span><span class="token punctuation">)</span>
count <span class="token operator">=</span> sc<span class="token punctuation">.</span>accumulator<span class="token punctuation">(</span><span class="token number">0</span><span class="token punctuation">)</span> <span class="token comment"># 设置全局累加器</span>
<span class="token keyword">def</span> <span class="token function">func</span><span class="token punctuation">(</span>item<span class="token punctuation">)</span><span class="token punctuation">:</span>
    <span class="token keyword">global</span> count
    count <span class="token operator">+=</span> <span class="token number">1</span>
    <span class="token keyword">print</span><span class="token punctuation">(</span>count<span class="token punctuation">)</span>
    <span class="token keyword">return</span> item
rdd<span class="token punctuation">.</span><span class="token builtin">map</span><span class="token punctuation">(</span>func<span class="token punctuation">)</span><span class="token punctuation">.</span>collect<span class="token punctuation">(</span><span class="token punctuation">)</span>
<span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">"count: "</span><span class="token punctuation">,</span> count<span class="token punctuation">)</span>
</code></pre>
<pre class="language-"><code>count: 10
1
2
3
4
1
2
3
1