-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathdata-transportation.Rmd
1375 lines (1064 loc) · 47.1 KB
/
data-transportation.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# 数据搬运 {#chap-data-transportation}
导入数据与导出数据,各种数据格式,数据库
处理 Excel 2003 (XLS) 和 Excel 2007 (XLSX) 文件还可以使用 [WriteXLS](https://github.com/marcschwartz/WriteXLS) 包,不过它依赖于 Perl,另一个 R 包 [xlsx]( https://github.com/dragua/rexcel) 与之功能类似,依赖 Java 环境。Jennifer Bryan 和 Hadley Wickham 开发的 [readxl](https://github.com/tidyverse/readxl) 包和 Jeroen Ooms 开发的 [writexl](https://github.com/ropensci/writexl) 包专门处理 xlsx 格式并且无任何系统依赖。
## 导入数据 {#import-data}
Base R 针对不同的数据格式文件,提供了大量的数据导入和导出函数,不愧是专注数据分析20余年的优秀统计软件。 除了函数 `write.ftable` 和 `read.ftable` 来自 stats 包,都来自 base 和 utils 包
```{r}
# 当前环境的搜索路径
searchpaths()
# 返回匹配结果及其所在路径的编号
apropos("^(read|write)", where = TRUE, mode = "function")
```
### `scan` {#scan-file}
```{r,eval=FALSE}
scan(file = "", what = double(), nmax = -1, n = -1, sep = "",
quote = if(identical(sep, "\n")) "" else "'\"", dec = ".",
skip = 0, nlines = 0, na.strings = "NA",
flush = FALSE, fill = FALSE, strip.white = FALSE,
quiet = FALSE, blank.lines.skip = TRUE, multi.line = TRUE,
comment.char = "", allowEscapes = FALSE,
fileEncoding = "", encoding = "unknown", text, skipNul = FALSE)
```
首先让我们用 `cat` 函数创建一个练习数据集 `ex.data`
```{r}
cat("TITLE extra line", "2 3 5 7", "11 13 17")
cat("TITLE extra line", "2 3 5 7", "11 13 17", file = "data/ex.data", sep = "\n")
```
以此练习数据集,介绍 `scan` 函数最常用的参数
```{r,error=TRUE}
scan("data/ex.data")
```
从上面的报错信息,我们发现 `scan` 函数只能读取同一类型的数据,如布尔型 logical, 整型 integer,数值型 numeric(double), 复数型 complex,字符型 character,raw 和列表 list。所以我们设置参数 `skip = 1` 把第一行跳过,就成功读取了数据
```{r}
scan("data/ex.data", skip = 1)
```
如果设置参数 `quiet = TRUE` 就不会报告读取的数据量
```{r}
scan("data/ex.data", skip = 1, quiet = TRUE)
```
参数 `nlines = 1` 表示只读取一行数据
```{r}
scan("data/ex.data", skip = 1, nlines = 1) # only 1 line after the skipped one
```
默认参数 `flush = TRUE` 表示读取最后一个请求的字段后,刷新到行尾,下面对比一下读取的结果
```{r}
scan("data/ex.data", what = list("", "", "")) # flush is F -> read "7"
scan("data/ex.data", what = list("", "", ""), flush = TRUE)
```
临时文件 ex.data 用完了,我们调用 `unlink` 函数将其删除,以免留下垃圾文件
```{r}
unlink("data/ex.data") # tidy up
```
### `read.table` {#read-write-table}
```{r,eval=FALSE}
read.table(file,
header = FALSE, sep = "", quote = "\"'",
dec = ".", numerals = c("allow.loss", "warn.loss", "no.loss"),
row.names, col.names, as.is = !stringsAsFactors,
na.strings = "NA", colClasses = NA, nrows = -1,
skip = 0, check.names = TRUE, fill = !blank.lines.skip,
strip.white = FALSE, blank.lines.skip = TRUE,
comment.char = "#",
allowEscapes = FALSE, flush = FALSE,
stringsAsFactors = default.stringsAsFactors(),
fileEncoding = "", encoding = "unknown", text, skipNul = FALSE
)
read.csv(file,
header = TRUE, sep = ",", quote = "\"",
dec = ".", fill = TRUE, comment.char = "", ...
)
read.csv2(file,
header = TRUE, sep = ";", quote = "\"",
dec = ",", fill = TRUE, comment.char = "", ...
)
read.delim(file,
header = TRUE, sep = "\t", quote = "\"",
dec = ".", fill = TRUE, comment.char = "", ...
)
read.delim2(file,
header = TRUE, sep = "\t", quote = "\"",
dec = ",", fill = TRUE, comment.char = "", ...
)
```
变量名是不允许以下划线开头的,同样在数据框里,列名也不推荐使用下划线开头。默认情况下,`read.table` 都会通过参数 `check.names` 检查列名的有效性,该参数实际调用了函数 `make.names` 去检查。如果想尽量保持数据集原来的样子可以设置参数 `check.names = FALSE, stringsAsFactors = FALSE`。 默认情形下,`read.table` 还会将字符串转化为因子变量,这是 R 的历史原因,作为一门统计学家的必备语言,在统计模型中,字符常用来描述类别,而类别变量在 R 环境中常用因子类型来表示,而且大量内置的统计模型也是将它们视为因子变量,如 `lm` 、`glm` 等
```{r}
dat1 = read.table(header = TRUE, check.names = TRUE, text = "
_a _b _c
1 2 a1
3 4 a2
")
dat1
dat2 = read.table(header = TRUE, check.names = FALSE, text = "
_a _b _c
1 2 a1
3 4 a2
")
dat2
dat3 <- read.table(header = TRUE, check.names = FALSE,
stringsAsFactors = FALSE, text = "
_a _b _c
1 2 a1
3 4 a2
"
)
dat3
```
### `readLines` {#read-write-lines}
```{r,eval=FALSE}
readLines(con = stdin(), n = -1L, ok = TRUE, warn = TRUE,
encoding = "unknown", skipNul = FALSE)
```
让我们折腾一波,读进来又写出去,只有 R 3.5.3 以上才能保持原样的正确输入输出,因为这里有一个之前版本包含的 BUG
```{r}
writeLines(readLines(system.file("DESCRIPTION", package = "splines")), "data/DESCRIPTION")
# 比较一下
identical(
readLines(system.file("DESCRIPTION", package = "splines")),
readLines("data/DESCRIPTION")
)
```
这次我们创建一个真的临时文件,因为重新启动 R 这个文件和文件夹就没有了,回收掉了
```{r}
fil <- tempfile(fileext = ".data")
cat("TITLE extra line", "2 3 5 7", "", "11 13 17", file = fil,
sep = "\n")
fil
```
设置参数 `n = -1` 表示将文件 fil 的内容从头读到尾
```{r}
readLines(fil, n = -1)
```
作为拥有良好习惯的 R 用户,这种垃圾文件最好用后即焚
```{r}
unlink(fil) # tidy up
```
再举个例子,我们创建一个新的临时文件 `fil`,文件内容只有
```{r}
cat("123\nabc")
```
```{r}
fil <- tempfile("test")
cat("123\nabc\n", file = fil, append = TRUE)
fil
readLines(fil)
```
这次读取文件的过程给出了警告,原因是 fil 没有以空行结尾,`warn = TRUE` 表示这种情况要给出警告,如果设置参数 `warn = FALSE` 就没有警告。我们还是建议大家尽量遵循规范。
再举一个例子,从一个连接读取数据,建立连接的方式有很多,参见 `?file`,下面设置参数 `blocking`
```{r}
con <- file(fil, "r", blocking = FALSE)
readLines(con)
cat(" def\n", file = fil, append = TRUE)
readLines(con)
# 关闭连接
close(con)
# 清理垃圾文件
unlink(fil)
```
### `readRDS` {#read-save-rds}
序列化数据操作,Mark Klik 开发的 [fst](https://github.com/fstpackage/fst) 和 [Travers Ching](https://travers.im/) 开发的 [qs](https://github.com/traversc/qs), Hadley Wickham 开发的 [feather](https://github.com/wesm/feather/tree/master/R) 包实现跨语言环境快速的读写数据
Table: (\#tab:fst-vs-others) fst 序列化数据框对象性能比较 BaseR、 data.table 和 feather [^fst-performance]
| Method | Format | Time (ms) | Size (MB) | Speed (MB/s) | N |
| :------------- | :------ | :-------- | :-------- | :----------- | :------ |
| readRDS | bin | 1577 | 1000 | 633 | 112 |
| saveRDS | bin | 2042 | 1000 | 489 | 112 |
| fread | csv | 2925 | 1038 | 410 | 232 |
| fwrite | csv | 2790 | 1038 | 358 | 241 |
| read\_feather | bin | 3950 | 813 | 253 | 112 |
| write\_feather | bin | 1820 | 813 | 549 | 112 |
| **read\_fst** | **bin** | **457** | **303** | **2184** | **282** |
| **write\_fst** | **bin** | **314** | **303** | **3180** | **291** |
目前比较好的是 qs 和 fst 包
[^fst-performance]: https://www.fstpackage.org/
## 其它数据格式 {#other-data-source}
来自其它格式的数据形式,如 JSON、XML、YAML 需要转化清理成 R 中数据框的形式 data.frame
1. [Data Rectangling with jq](https://www.carlboettiger.info/2017/12/11/data-rectangling-with-jq/)
1. [Mongolite User Manual](https://jeroen.github.io/mongolite/) introduction to using MongoDB with the mongolite client in R
[jsonlite](https://github.com/jeroen/jsonlite) 读取 `*.json` 格式的文件,`jsonlite::write_json` 函数将 R对象保存为 JSON 文件,`jsonlite::fromJSON` 将 json 字符串或文件转化为 R 对象,`jsonlite::toJSON` 函数正好与之相反
```{r}
library(jsonlite)
# 从 json 格式的文件导入
# jsonlite::read_json(path = "path/to/filename.json")
# A JSON array of primitives
json <- '["Mario", "Peach", null, "Bowser"]'
# 简化为原子向量atomic vector
fromJSON(json)
# 默认返回一个列表
fromJSON(json, simplifyVector = FALSE)
```
yaml 包读取 `*.yml` 格式文件,返回一个列表,`yaml::write_yaml` 函数将 R 对象写入 yaml 格式
```{r}
library(yaml)
yaml::read_yaml(file = '_bookdown.yml')
```
Table: (\#tab:other-softwares) 导入来自其它数据分析软件产生的数据集
| 统计软件 | R函数 | R包
|:------------------|:------------------|:------------------
|ERSI ArcGIS | `read.shapefile` | shapefiles
|Matlab | `readMat` | R.matlab
|minitab | `read.mtp` | foreign
|SAS (permanent data)| `read.ssd` | foreign
|SAS (XPORT format)| `read.xport` | foreign
|SPSS | `read.spss` | foreign
|Stata | `read.dta` | foreign
|Systat | `read.systat` | foreign
|Octave | `read.octave` | foreign
Table: (\#tab:other-read-functions) 导入来自其它格式的数据集
| 文件格式 | R函数 | R包
|:------------------|:------------------|:------------------
| 列联表数据 | `read.ftable` | stats
| 二进制数据 | `readBin` | base
| 字符串数据 | `readChar` | base
| 剪贴板数据 | `readClipboard` | utils
`read.dcf` 函数读取 Debian 控制格式文件,这种类型的文件以人眼可读的形式在存储数据,如 R 包的 DESCRIPTION 文件或者包含所有 CRAN 上 R 包描述的文件 <https://cran.r-project.org/src/contrib/PACKAGES>
```{r}
x <- read.dcf(file = system.file("DESCRIPTION", package = "splines"),
fields = c("Package", "Version", "Title"))
x
```
最后要提及拥有瑞士军刀之称的 [rio](https://github.com/leeper/rio) 包,它集合了当前 R 可以读取的所有统计分析软件导出的数据。
## 导入大数据集 {#import-large-dataset}
在不使用数据库的情况下,从命令行导入大数据集,如几百 M 或几个 G 的 csv 文件。利用 data.table 包的 `fread` 去读取
<https://stackoverflow.com/questions/1727772/>
## 从数据库导入 {#import-data-from-database}
[Hands-On Programming with R](https://rstudio-education.github.io/hopr) 数据读写章节[^dataio] 以及 [R, Databases and Docker](https://smithjd.github.io/sql-pet/)
将大量的 txt 文本存进 MySQL 数据库中,通过操作数据库来聚合文本,极大降低内存消耗 [^txt-to-mysql],而 ODBC 与 DBI 包是其它数据库接口的基础,knitr 提供了一个支持 SQL 代码的引擎,它便是基于 DBI,因此可以在 R Markdown 文档中直接使用 SQL 代码块 [^sql-engine]。这里制作一个归纳表格,左边数据库右边对应其 R 接口,两边都包含链接,如表 \@ref(tab:dbi) 所示
```{r dbi,echo=FALSE}
db2r <- data.frame(
db = c("MySQL", "SQLite", "PostgreSQL", "MariaDB"),
db_urls = c(
"https://www.mysql.com/",
"https://www.sqlite.org",
"https://www.postgresql.org/",
"https://mariadb.org/"),
dbi = c("RMySQL", "RSQLite", "RPostgres", "RMariaDB"),
dbi_urls = c(
"https://github.com/r-dbi/RMySQL",
"https://github.com/r-dbi/RSQLite",
"https://github.com/r-dbi/RPostgres",
"https://github.com/r-dbi/RMariaDB"
)
)
# db <- paste0("[", db2r$db, "](", db2r$db_urls, ")")
# dbi <- paste0("[", db2r$dbi, "](", db2r$dbi_urls, ")")
knitr::kable(db2r, col.names = c("数据库","官网","R接口","开发仓"),
caption = "数据库接口")
```
### PostgreSQL
[odbc](https://github.com/r-dbi/odbc) 可以支持很多数据库,下面以连接 [PostgreSQL](https://www.postgresql.org/) 数据库为例介绍其过程
首先在某台机器上,拉取 PostgreSQL 的 Docker 镜像
```{bash,eval=FALSE}
docker pull postgres
```
在 Docker 上运行 PostgreSQL,主机端口号 8181 映射给数据库 PostgreSQL 的默认端口号 5432(或其它你的 DBA 分配给你的端口)
```{bash,eval=FALSE}
docker run --name psql -d -p 8181:5432 -e ROOT=TRUE \
-e USER=xiangyun -e PASSWORD=cloud postgres
```
在主机 Ubuntu 上配置
```{bash,eval=FALSE}
sudo apt-get install unixodbc unixodbc-dev odbc-postgresql
```
端口 5432 是分配给 PostgreSQL 的默认端口,`host` 可以是云端的地址,如 你的亚马逊账户下的 PostgreSQL 数据库地址 `<ec2-54-83-201-96.compute-1.amazonaws.com>`,也可以是本地局域网IP地址,如`<192.168.1.200>`。通过参数 `dbname` 连接到指定的 PostgreSQL 数据库,如 Heroku,这里作为演示就以默认的数据库 `postgres` 为例
查看配置系统文件路径
```bash
odbcinst -j
```
```
unixODBC 2.3.6
DRIVERS............: /etc/odbcinst.ini
SYSTEM DATA SOURCES: /etc/odbc.ini
FILE DATA SOURCES..: /etc/ODBCDataSources
USER DATA SOURCES..: /root/.odbc.ini
SQLULEN Size.......: 8
SQLLEN Size........: 8
SQLSETPOSIROW Size.: 8
```
不推荐修改全局配置文件,可设置 `ODBCSYSINI` 环境变量指定配置文件路径,如 `ODBCSYSINI=~/ODBC` <http://www.unixodbc.org/odbcinst.html>
安装完驱动程序,`/etc/odbcinst.ini` 文件内容自动更新,我们可以不必修改,如果你想自定义不妨手动修改,我们查看在 R 环境中注册的数据库,可以看到 PostgreSQL 的驱动已经配置好
```r
odbc::odbcListDrivers()
```
```
name attribute value
1 PostgreSQL ANSI Description PostgreSQL ODBC driver (ANSI version)
2 PostgreSQL ANSI Driver psqlodbca.so
3 PostgreSQL ANSI Setup libodbcpsqlS.so
4 PostgreSQL ANSI Debug 0
5 PostgreSQL ANSI CommLog 1
6 PostgreSQL ANSI UsageCount 1
7 PostgreSQL Unicode Description PostgreSQL ODBC driver (Unicode version)
8 PostgreSQL Unicode Driver psqlodbcw.so
9 PostgreSQL Unicode Setup libodbcpsqlS.so
10 PostgreSQL Unicode Debug 0
11 PostgreSQL Unicode CommLog 1
12 PostgreSQL Unicode UsageCount 1
```
系统配置文件 `/etc/odbcinst.ini` 已经包含有 PostgreSQL 的驱动配置,无需再重复配置
```
[PostgreSQL ANSI]
Description=PostgreSQL ODBC driver (ANSI version)
Driver=psqlodbca.so
Setup=libodbcpsqlS.so
Debug=0
CommLog=1
UsageCount=1
[PostgreSQL Unicode]
Description=PostgreSQL ODBC driver (Unicode version)
Driver=psqlodbcw.so
Setup=libodbcpsqlS.so
Debug=0
CommLog=1
UsageCount=1
```
只需将如下内容存放在 `~/.odbc.ini` 文件中,
```
[PostgreSQL]
Driver = PostgreSQL Unicode
Database = postgres
Servername = 172.17.0.1
UserName = postgres
Password = default
Port = 8080
```
最后,一行命令 DNS 配置连接 <https://github.com/r-dbi/odbc> 这样就实现了代码中无任何敏感信息,这里为了展示这个配置过程故而把相关信息公开。
> 注意下面的内容需要在容器中运行, Windows 环境下的配置 PostgreSQL 的驱动有点麻烦就不搞了,意义也不大,现在数据库基本都是跑在 Linux 系统上
`docker-machine.exe ip default` 可以获得本地 Docker 的 IP,比如 192.168.99.101。 Travis 上 `ip addr` 可以查看 Docker 的 IP,如 172.17.0.1
```r
library(DBI)
con <- dbConnect(RPostgres::Postgres(),
dbname = "postgres",
host = ifelse(is_on_travis, Sys.getenv("DOCKER_HOST_IP"), "192.168.99.101"),
port = 8080,
user = "postgres",
password = "default"
)
```
```r
library(DBI)
con <- dbConnect(odbc::odbc(), "PostgreSQL")
```
列出数据库中的所有表
```r
dbListTables(con)
```
第一次启动从 Docker Hub 上下载的镜像,默认的数据库是 postgres 里面没有任何表,所以将 R 环境中的 mtcars 数据集写入 postgres 数据库
将数据集 mtcars 写入 PostgreSQL 数据库中,基本操作,写入表的操作也不能缓存,即不能缓存数据库中的表 mtcars
```r
dbWriteTable(con, "mtcars", mtcars, overwrite = TRUE)
```
现在可以看到数据表 mtcars 的各个字段
```r
dbListFields(con, "mtcars")
```
最后执行一条 SQL 语句
```r
res <- dbSendQuery(con, "SELECT * FROM mtcars WHERE cyl = 4") # 发送 SQL 语句
dbFetch(res) # 获取查询结果
dbClearResult(res) # 清理查询通道
```
或者一条命令搞定
```r
dbGetQuery(con, "SELECT * FROM mtcars WHERE cyl = 4")
```
再复杂一点的 SQL 查询操作
```r
dbGetQuery(con, "SELECT cyl, AVG(mpg) AS mpg FROM mtcars GROUP BY cyl ORDER BY cyl")
aggregate(mpg ~ cyl, data = mtcars, mean)
```
得益于 knitr [@xie2015] 开发的钩子,这里直接写 SQL 语句块,值得注意的是 SQL 代码块不能启用缓存,数据库连接通道也不能缓存,如果数据库中还没有写入表,那么写入表的操作也不能缓存, `tab.cap = "表格标题"` 输出的内容是一个表格
```sql
SELECT cyl, AVG(mpg) AS mpg FROM mtcars GROUP BY cyl ORDER BY cyl
```
如果将查询结果导出到变量,在 Chunk 设置 `output.var = "agg_cyl"` 可以使用缓存,下面将 mpg 按 cyl 分组聚合的结果打印出来,`ref.label = "mtcars"` 引用上一个 SQL 代码块的内容
这种基于 odbc 的方式的好处就不需要再安装 R 包 RPostgres 和相关系统依赖,最后关闭连接通道
```r
dbDisconnect(con)
```
### MySQL
MySQL 是一个很常见,应用也很广泛的数据库,数据分析的常见环境是在一个R Notebook 里,我们可以在正文之前先设定数据库连接信息
````{r echo = FALSE, comment = NA}
cat("```{r setup}
library(DBI)
# 指定数据库连接信息
db <- dbConnect(RMySQL::MySQL(),
dbname = 'dbtest',
username = 'user_test',
password = 'password',
host = '10.10.101.10',
port = 3306
)
# 创建默认连接
knitr::opts_chunk$set(connection = 'db')
# 设置字符编码,以免中文查询乱码
DBI::dbSendQuery(db, 'SET NAMES utf8')
# 设置日期变量,以运用在SQL中
idate <- '2019-05-03'
```")
````
SQL 代码块中使用 R 环境中的变量,并将查询结果输出为R环境中的数据框
````{r echo = FALSE, comment = NA}
cat("```{sql, output.var='data_output'}
SELECT * FROM user_table where date_format(created_date,'%Y-%m-%d')>=?idate
```")
````
以上代码会将 SQL 的运行结果存在 `data_output` 这是数据库中,idate 取之前设置的日期`2019-05-03`,`user_table` 是 MySQL 数据库中的表名,`created_date` 是创建`user_table`时,指定的日期名。
如果 SQL 比较长,为了代码美观,把带有变量的 SQL 保存为`demo.sql`脚本,只需要在 SQL 的 chunk 中直接读取 SQL 文件[^sql-chunck]。
````{r echo = FALSE, comment = NA}
cat("```{sql, code=readLines('demo.sql'), output.var='data_output'}
```")
````
如果我们需要每天或者按照指定的日期重复地运行这个 R Markdown 文件,可以在 YAML 部分引入参数[^params-knit]
```markdown
---
params:
date: "2019-05-03" # 参数化日期
---
```
````{r echo = FALSE, comment = NA}
cat("```{r setup, include=FALSE}
idate = params$date # 将参数化日期传递给 idate 变量
```")
````
我们将这个 Rmd 文件命名为 `MyDocument.Rmd`,运行这个文件可以从 R 控制台执行或在 RStudio 点击 knit。
```{r, eval=FALSE}
rmarkdown::render("MyDocument.Rmd", params = list(
date = "2019-05-03"
))
```
如果在文档的 YAML 位置已经指定日期,这里可以不指定。注意在这里设置日期会覆盖 YAML 处指定的参数值,这样做的好处是可以批量化操作。
### Spark
当数据分析报告遇上 Spark 时,就需要 [SparkR](https://github.com/apache/spark/tree/master/R)、 [sparklyr](https://github.com/rstudio/sparklyr)、 [arrow](https://github.com/apache/arrow/tree/master/r) 或 [rsparking](https://github.com/h2oai/sparkling-water/tree/master/r) 接口了, Javier Luraschi 写了一本书 [The R in Spark: Learning Apache Spark with R](https://therinspark.com/) 详细介绍了相关扩展和应用
首先安装 sparklyr 包,RStudio 公司 Javier Lurasch 开发了 sparklyr 包,作为 Spark 与 R 语言之间的接口,安装完 sparklyr 包,还是需要 Spark 和 Hadoop 环境
```{r,eval=FALSE}
install.packages('sparklyr')
library(sparklyr)
spark_install()
# Installing Spark 2.4.0 for Hadoop 2.7 or later.
# Downloading from:
# - 'https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz'
# Installing to:
# - '~/spark/spark-2.4.0-bin-hadoop2.7'
# trying URL 'https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz'
# Content type 'application/x-gzip' length 227893062 bytes (217.3 MB)
# ==================================================
# downloaded 217.3 MB
#
# Installation complete.
```
既然 sparklyr 已经安装了 Spark 和 Hadoop 环境,安装 SparkR 后,只需配置好路径,就可以加载 SparkR 包
```{r,eval=FALSE}
install.packages('SparkR')
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "~/spark/spark-2.4.0-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
```
[rscala](https://github.com/dbdahl/rscala) 架起了 R 和 Scala 两门语言之间交流的桥梁,使得彼此之间可以互相调用
> 是否存在这样的可能, Spark 提供了大量的 MLib 库的调用接口,R 的功能支持是最少的,Java/Scala 是原生的,那么要么自己开发新的功能整合到 SparkR 中,要么借助 rscala 将 scala 接口代码封装进来
在本地,Windows 主机上,可以在 `.Rprofile` 中给 Spark 添加环境变量 `SPARK_HOME` 指定其安装路径,
```{r,eval=FALSE}
# Windows 平台默认安装路径
Sys.setenv(SPARK_HOME = "C:/Users/XiangYun/AppData/Local/spark/spark-2.4.3-bin-hadoop2.7")
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.4")
```
将 R 环境中的数据集 mtcars 传递到 Spark 上
```{r,eval=FALSE}
cars <- copy_to(sc, mtcars)
cars
```
```
# Source: spark<mtcars> [?? x 11]
mpg cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 21 6 160 110 3.9 2.62 16.5 0 1 4 4
2 21 6 160 110 3.9 2.88 17.0 0 1 4 4
3 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
4 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
5 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
6 18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
# ... with more rows
```
监控和分析命令执行的情况,可以在浏览器中,见图 \@ref(fig:spark-web)
```{r,eval=FALSE}
spark_web(sc)
```
```{r spark-web,echo=FALSE,fig.cap="Spark Web 接口"}
knitr::include_graphics(path = "screenshots/spark-start.png")
```
传递 SQL 查询语句,比如数据集 mtcars 有多少行
```{r,eval=FALSE}
library(DBI)
dbGetQuery(sc, "SELECT count(*) FROM mtcars")
```
```
count(1)
1 32
```
进一步地,我们可以调用 dplyr 包来写数据操作,避免写复杂逻辑的 SQL 语句,
```{r,eval=FALSE}
# library(dplyr) # 数据操作
library(tidyverse) # 提供更多功能,包括数据可视化
count(cars)
```
再举一个稍复杂的操作,先从数据集 cars 中选择两个字段 hp 和 mpg
```{r,eval=FALSE}
select(cars, hp, mpg) %>%
sample_n(100) |> # 随机选择 100 行
collect() |> # 执行 SQL 查询,将结果返回到本地
ggplot(aes(hp, mpg)) + # 绘图
geom_point()
```
数据查询和结果可视化,见图 \@ref(fig:spark-mtcars)
```{r spark-mtcars,echo=FALSE,fig.cap="数据聚合和可视化"}
knitr::include_graphics(path = "screenshots/spark-mtcars.png")
```
用完要记得关闭连接
```{r,eval=FALSE}
spark_disconnect(sc)
```
::: rmdwarning
不要使用 SparkR 接口,要使用 sparklyr, 后者的功能已经全面覆盖前者,生态方面更是更是已经远远超越,它有大厂 RStudio 支持,是公司支持的旗舰项目。但是 sparklyr 的版本稍微比最新的 Spark 低一两个版本,这是开发周期和出于稳定性的考虑,无伤大雅!
:::
Spark 提供了官方 R 语言接口 SparkR。Spark JVM 和 SparkR 包版本要匹配,比如从 CRAN 上安装了最新版的 SparkR,比如版本 2.4.4 就要去 Spark 官网下载最新版的预编译文件 spark-2.4.4-bin-hadoop2.7,解压到本地磁盘,比如 `D:/spark-2.4.4-bin-hadoop2.7`
```{r,eval=FALSE}
Sys.setenv(SPARK_HOME = "D:/spark-2.4.4-bin-hadoop2.7")
# Sys.setenv(R_HOME = "C:/Program Files/R/R-3.6.1/")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]",
sparkConfig = list(spark.driver.memory = "4g"),
enableHiveSupport = TRUE)
```
从数据集 mtcars(数据类型是 R 的 data.frame) 创建 Spark 的 DataFrame 类型数据
```{r,eval=FALSE}
cars <- as.DataFrame(mtcars)
# 显示 SparkDataFrame 的前几行
head(cars)
```
```
mpg cyl disp hp drat wt qsec vs am gear carb
1 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
2 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
3 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
4 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
5 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
6 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
```
打印数据集 cars 的 schema 各个字段的
```{r,eval=FALSE}
printSchema(cars)
```
```
root
|-- mpg: double (nullable = true)
|-- cyl: double (nullable = true)
|-- disp: double (nullable = true)
|-- hp: double (nullable = true)
|-- drat: double (nullable = true)
|-- wt: double (nullable = true)
|-- qsec: double (nullable = true)
|-- vs: double (nullable = true)
|-- am: double (nullable = true)
|-- gear: double (nullable = true)
|-- carb: double (nullable = true)
```
从本地 JSON 文件创建 DataFrame
```{r,eval=FALSE}
path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
peopleDF <- read.json(path)
printSchema(peopleDF)
```
```
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
```
```{r,eval=FALSE}
peopleDF
```
```
SparkDataFrame[age:bigint, name:string]
```
```{r,eval=FALSE}
showDF(peopleDF)
```
```
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
```
peopleDF 转成 Hive 中的表 people
```{r,eval=FALSE}
createOrReplaceTempView(peopleDF, "people")
```
调用 `sql` 传递 SQL 语句查询数据,启动 `sparkR.session` 时,设置 `enableHiveSupport = TRUE`,就是执行不出来,报错,不知道哪里配置存在问题
```{r,eval=FALSE}
teenagers <- SparkR::sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
show(people)
```
```
Error in handleErrors(returnStatus, conn) :
org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.io.IOException: (null) entry in command string: null chmod 0733 F:\tmp\hive;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:214)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:114)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:102)
at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:141)
at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:136)
at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$2.apply(HiveSessionStateBuilder.scala:55)
at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$2.apply(HiveSessionStateBuilder.scala:55)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.gl
```
调用 `collect` 函数执行查询,并将结果返回到本地 `data.frame` 类型
```{r,eval=FALSE}
teenagersLocalDF <- collect(teenagers)
```
查看数据集 teenagersLocalDF 的属性
```{r,eval=FALSE}
print(teenagersLocalDF)
```
最后,关闭 SparkSession 会话
```{r,eval=FALSE}
sparkR.session.stop()
```
[^sql-chunck]: https://d.cosx.org/d/419974
[^txt-to-mysql]: https://brucezhaor.github.io/blog/2016/08/04/batch-process-txt-to-mysql
[^params-knit]: https://bookdown.org/yihui/rmarkdown/params-knit.html
[^dataio]: https://rstudio-education.github.io/hopr/dataio.html
[^sql-engine]: https://bookdown.org/yihui/rmarkdown/language-engines.html#sql
[rstudio-spark]: https://spark.rstudio.com/
[rmarkdown-teaching-demo]: https://stackoverflow.com/questions/35459166
## 批量导入数据 {#batch-import-data}
```{r,eval=FALSE, echo=TRUE,R.options=list(tidyverse.quiet = TRUE)}
library(tidyverse)
```
```{r,eval=FALSE}
read_list <- function(list_of_datasets, read_func) {
read_and_assign <- function(dataset, read_func) {
dataset_name <- as.name(dataset)
dataset_name <- read_func(dataset)
}
# invisible is used to suppress the unneeded output
output <- invisible(
sapply(list_of_datasets,
read_and_assign,
read_func = read_func, simplify = FALSE, USE.NAMES = TRUE
)
)
# Remove the extension at the end of the data set names
names_of_datasets <- c(unlist(strsplit(list_of_datasets, "[.]"))[c(T, F)])
names(output) <- names_of_datasets
return(output)
}
```
批量导入文件扩展名为 `.csv` 的数据文件,即逗号分割的文件
```{r,eval=FALSE}
data_files <- list.files(path = "path/to/csv/dir",
pattern = ".csv", full.names = TRUE)
print(data_files)
```
相比于 Base R 提供的 `read.csv` 函数,使用 readr 包的 `read_csv` 函数可以更快地读取csv格式文件,特别是在读取GB级数据文件时,效果特别明显。
```{r,eval=FALSE}
list_of_data_sets <- read_list(data_files, readr::read_csv)
```
使用 tibble 包的`glimpse`函数可以十分方便地对整个数据集有一个大致的了解,展示方式和信息量相当于 `str` 加 `head` 函数
```{r,eval=FALSE}
tibble::glimpse(list_of_data_sets)
```
## 批量导出数据 {#batch-export-data}
假定我们有一个列表,其每个元素都是一个数据框,现在要把每个数据框分别存入 xlsx 表的工作薄中,以 mtcars 数据集为例,将其按分类变量 cyl 分组拆分,获得一个列表 list
```{r}
dat <- split(mtcars, mtcars$cyl)
dat
```
将 xlsx 表格初始化,创建空白的工作薄, [openxlsx](https://github.com/awalker89/openxlsx) 包不依赖 Java 环境,读写效率也高
```{r,eval=FALSE}
## 加载 openxlsx 包
library(openxlsx)
## 创建空白的工作薄
wb <- createWorkbook()
```
将列表里的每张表分别存入 xlsx 表格的每个 worksheet,worksheet 的名字就是分组变量的名字
```{r,eval=FALSE}
Map(function(data, name){
addWorksheet(wb, name)
writeData(wb, name, data)
}, dat, names(dat))
```
最后保存数据到磁盘,见图 \@ref(fig:batch-export-xlsx)
```{r,eval=FALSE}
saveWorkbook(wb, file = "data/matcars.xlsx", overwrite = TRUE)
```
```{r batch-export-xlsx,fig.cap="批量导出数据",echo=FALSE}
knitr::include_graphics(path = 'screenshots/dm-batch-export-xlsx.png')
```
## 导出数据 {#export-data}
### 导出运行结果 {#export-output}
```{r,eval=FALSE}
capture.output(..., file = NULL, append = FALSE,
type = c("output", "message"), split = FALSE)
```
`capture.output` 将一段R代码执行结果,保存到文件,参数为表达式。`capture.output` 和 `sink` 的关系相当于 `with` 和 `attach` 的关系。
```{r}
glmout <- capture.output(summary(glm(case ~ spontaneous + induced,
data = infert, family = binomial()
)), file = "data/capture.txt")
capture.output(1 + 1, 2 + 2)
capture.output({
1 + 1
2 + 2
})
```
`sink` 函数将控制台输出结果保存到文件,只将 `outer` 函数运行的结果保存到 `ex-sink.txt` 文件,outer 函数计算的是直积,在这里相当于 `seq(10) %*% t(seq(10))`,而在 R 语言中,更加有效的计算方式是 `tcrossprod(seq(10),seq(10))`
```{r}
sink("data/ex-sink.txt")
i <- 1:10
outer(i, i, "*")
sink()
```
<!-- 记得删除文件 capture.txt 和 ex-sink.txt -->
### 导出数据对象 {#export-data-object}
```{r,eval=FALSE}
load(file, envir = parent.frame(), verbose = FALSE)
save(..., list = character(),
file = stop("'file' must be specified"),
ascii = FALSE, version = NULL, envir = parent.frame(),
compress = isTRUE(!ascii), compression_level,
eval.promises = TRUE, precheck = TRUE)
save.image(file = ".RData", version = NULL, ascii = FALSE,
compress = !ascii, safe = TRUE)
```
`load` 和`save` 函数加载或保存包含工作环境信息的数据对象,`save.image` 保存当前工作环境到磁盘,即保存工作空间中所有数据对象,数据格式为 `.RData`,即相当于
```{r,eval=FALSE}
save(list = ls(all.names = TRUE), file = ".RData", envir = .GlobalEnv)
```
`dump` 保存数据对象 AirPassengers 到文件 `AirPassengers.txt`,文件内容是 R 命令,可把`AirPassengers.txt`看作代码文档执行,dput 保存数据对象内容到文件`AirPassengers.dat`,文件中不包含变量名 AirPassengers。注意到 `dump` 输入是一个字符串,而 `dput` 要求输入数据对象的名称,`source` 函数与 `dump` 对应,而 `dget` 与 `dput`对应。
```{r}
# 加载数据
data(AirPassengers, package = "datasets")
# 将数据以R代码块的形式保存到文件
dump('AirPassengers', file = 'data/AirPassengers.txt')
# source(file = 'data/AirPassengers.txt')
```
接下来,我们读取 `AirPassengers.txt` 的文件内容,可见它是一段完整的 R 代码,可以直接复制到 R 的控制台中运行,并且得到一个与原始 AirPassengers 变量一样的结果
```{r}
cat(readLines('data/AirPassengers.txt'), sep = "\n")
```
`dput` 函数类似 `dump` 函数,保存数据对象到磁盘文件
```{r,comment=NA}
# 将 R 对象保存/导出到磁盘
dput(AirPassengers, file = 'data/AirPassengers.dat')
AirPassengers
# dget 作用与 dput 相反
AirPassengers2 <- dget(file = 'data/AirPassengers.dat')
AirPassengers2
```
同样地,现在我们观察 `dput` 函数保存的文件 `AirPassengers.dat` 内容,和`dump` 函数保存的文件 `AirPassengers.txt`相比,就缺一个赋值变量
```{r,comment=NA}
cat(readLines('data/AirPassengers.dat'), sep = "\n")
```
<!-- 记得删除文件 AirPassengers.txt 和 AirPassengers.dat -->
[openxlsx](https://github.com/ycphs/openxlsx) 可以读写 XLSX 文档
美团使用的大数据工具有很多,最常用的 Hive、Spark、Kylin、Impala、Presto 等,详见 <https://tech.meituan.com/2018/08/02/mt-r-practice.html>。下面主要介绍如何在 R 中连接 MySQL、Presto 和 Spark。
[sparklyr.flint](https://github.com/r-spark/sparklyr.flint) 支持 Spark 的时间序列库 [flint](https://github.com/twosigma/flint),[sparkxgb](https://github.com/rstudio/sparkxgb) 为 Spark 上的 XGBoost 提供 R 接口,[sparkwarc](https://github.com/r-spark/sparkwarc) 支持加载 Web ARChive 文件到 Spark 里
[sparkavro](https://github.com/chezou/sparkavro) 支持从 Apache Avro (<https://avro.apache.org/>) 读取文件到 Spark 里,[sparkbq](https://github.com/miraisolutions/sparkbq) 是一个 sparkly 扩展包,集成 Google BigQuery 服务,[geospark](https://github.com/harryprince/geospark) 提供 GeoSpark 库的 R 接口,并且以 sf 的数据操作方式,[rsparkling](https://github.com/h2oai/sparkling-water/tree/master/r) H2O Sparkling Water 机器学习库的 R 接口。
Spark 性能优化,参考三篇博文