-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSpark250Gross.py
65 lines (46 loc) · 1.94 KB
/
Spark250Gross.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, Row, LongType,DoubleType
import csv
# Build (or reuse) a local SparkSession with 2 worker threads for this job.
spark = SparkSession.builder.appName("demo").master("local[2]").getOrCreate()
# NOTE(review): sc is never referenced in this file — presumably kept for
# interactive use; confirm before removing.
sc = spark.sparkContext
def mapFunction(data):
    """Turn a CSV row into a (gross, first-column) pair.

    data[2] is the gross-revenue string (e.g. "$123,456,789"); every
    non-digit character is dropped before converting to int so the pair
    can be sorted numerically by gross.
    """
    gross = int("".join(ch for ch in data[2] if ch.isdigit()))
    return (gross, data[0])
def mapFunctionFull(data):
    """Turn a CSV row into a (gross, col0, col1) triple.

    Same digit-extraction as mapFunction, but keeps both of the first
    two columns so the full record survives the key sort.
    """
    gross = int("".join(ch for ch in data[2] if ch.isdigit()))
    return (gross, data[0], data[1])
if __name__ == "__main__":
    inputFile = "./data/top250gross.csv"
    # No header row and no explicit schema: column types are inferred.
    # Assumed column layout (matches the schema built below — TODO confirm
    # against the actual CSV): col0 = grade, col1 = name, col2 = gross string.
    df = spark.read.csv(inputFile, encoding='utf-8', inferSchema=True)

    # (gross, grade) pairs, sorted by gross descending.
    pairRdd = df.rdd.map(mapFunction)
    sortRdd = pairRdd.sortByKey(ascending=False)
    # (gross, grade, name) triples, same descending sort by gross.
    pairRddFull = df.rdd.map(mapFunctionFull)
    sortRddFull = pairRddFull.sortByKey(ascending=False)

    ls = sortRdd.collect()
    print(ls)
    print(sortRdd.count())

    # Sample every 8th record of the sorted pairs for the relation file.
    selectList = ls[::8]
    print(len(selectList))
    # newline='' is required by the csv module to avoid blank rows on
    # Windows; plain 'w' suffices — the file is only written, never read.
    with open('relation.csv', 'w', newline='') as f:
        csv.writer(f).writerows(selectList)

    # Schema for the full sorted data; field order must match the tuples
    # produced by mapFunctionFull.
    schema = StructType([
        StructField("gross", LongType(), True),
        StructField("grade", DoubleType(), True),
        StructField("name", StringType(), True),
    ])
    # Apply the schema to the RDD to build a DataFrame.
    customerinfo_df = spark.createDataFrame(sortRddFull, schema)

    # JDBC connection properties.
    database_conf = {
        "user": "root",
        "password": "fengyunjia",
        # Fixed: was misspelled "dirver", so the driver class was never
        # passed to the JDBC writer.
        "driver": "com.mysql.cj.jdbc.Driver",
    }
    customerinfo_df.write.jdbc(
        url="jdbc:mysql://localhost:3306/imdbmovie?serverTimezone=Asia/Shanghai&useSSL=false",
        table="top250", mode="append", properties=database_conf)