-
Notifications
You must be signed in to change notification settings - Fork 466
/
bcw.txt
106 lines (106 loc) · 6.63 KB
/
bcw.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
{"cells": [{"cell_type": "code", "execution_count": null, "id":
"aacedd99-7685-41e7-b2d3-fd78bc68163a", "metadata": {}, "outputs": [],
"source": "PROJECT=!gcloud config get-value
project\nPROJECT=PROJECT[0]\nBUCKET = PROJECT + '-dsongcp'\nimport os\n# Reuse BUCKET so the Python variable and the exported environment variable cannot drift apart.\nos.environ['BUCKET'] = BUCKET"}, {"cell_type": "code",
"execution_count": null, "id": "87a2f043-1286-47ca-822a-25a60aaf35c3",
"metadata": {}, "outputs": [], "source": "from pyspark.sql import
SparkSession\nfrom pyspark import SparkConf, SparkContext\n\n# getOrCreate() keeps this cell re-runnable: constructing SparkContext(...) a second time in the same kernel raises ValueError.\nsc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('logistic'))\nspark = SparkSession \\\n    .builder \\\n    .appName(\"Logistic regression w/ Spark ML\") \\\n    .getOrCreate()"},
{"cell_type": "code", "execution_count": null, "id":
"e6f57a61-cd5f-463c-8a95-f507a6118d22", "metadata": {}, "outputs": [],
"source": "from pyspark.mllib.classification import
LogisticRegressionWithLBFGS\nfrom pyspark.mllib.regression import
LabeledPoint"}, {"cell_type": "code", "execution_count": null, "id":
"200516cb-672f-411d-a681-63fe8e058b13", "metadata": {}, "outputs": [],
"source": "traindays = spark.read \\\n .option(\"header\", \"true\")
\\\n
.csv('gs://{}/flights/trainday.csv'.format(BUCKET))"},
{"cell_type": "code", "execution_count": null, "id":
"3b381982-a10d-453f-9a96-66ea2524b318", "metadata": {}, "outputs": [],
"source": "traindays.createOrReplaceTempView('traindays')"}, {"cell_type":
"code", "execution_count": null, "id":
"99d7b624-84ff-4e48-b15e-7558c6f198cb", "metadata": {}, "outputs": [],
"source": "spark.sql(\"SELECT * from traindays LIMIT 5\").show()"},
{"cell_type": "code", "execution_count": null, "id":
"b10b0011-024f-4c81-b42f-df2e88356ae4", "metadata": {}, "outputs": [],
"source": "inputs =
'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)"},
{"cell_type": "code", "execution_count": null, "id":
"944de939-f0de-443c-b208-6884ced2797d", "metadata": {}, "outputs": [],
"source": "flights =
spark.read.json(inputs)\nflights.createOrReplaceTempView('flights')"},
{"cell_type": "code", "execution_count": null, "id":
"f768e187-f4c8-4199-ae72-b077e5bda450", "metadata": {}, "outputs": [],
"source": "trainquery = \"\"\"\nSELECT\n DEP_DELAY, TAXI_OUT, ARR_DELAY,
DISTANCE\nFROM flights f\nJOIN traindays t\nON f.FL_DATE ==
t.FL_DATE\nWHERE\n t.is_train_day == 'True'\n\"\"\"\ntraindata =
spark.sql(trainquery)"}, {"cell_type": "code", "execution_count": null,
"id": "6e82a419-802f-402a-b9a8-de96a8503e38", "metadata": {}, "outputs":
[], "source": "print(traindata.head(2))"}, {"cell_type": "code",
"execution_count": null, "id": "3c1f6612-cf46-4791-bc12-e82257944a6a",
"metadata": {}, "outputs": [], "source": "traindata.describe().show()"},
{"cell_type": "code", "execution_count": null, "id":
"8d2c86a8-aec9-441b-a270-884e01113c39", "metadata": {}, "outputs": [],
"source": "trainquery = \"\"\"\nSELECT\nDEP_DELAY, TAXI_OUT, ARR_DELAY,
DISTANCE\nFROM flights f\nJOIN traindays t\nON f.FL_DATE ==
t.FL_DATE\nWHERE\nt.is_train_day == 'True' AND\nf.dep_delay IS NOT NULL
AND \nf.arr_delay IS NOT NULL\n\"\"\"\ntraindata =
spark.sql(trainquery)\ntraindata.describe().show()"}, {"cell_type":
"code", "execution_count": null, "id":
"b35cfd30-6bec-44ba-95c6-2c15c83995ef", "metadata": {}, "outputs": [],
"source": "trainquery = \"\"\"\nSELECT\n DEP_DELAY, TAXI_OUT, ARR_DELAY,
DISTANCE\nFROM flights f\nJOIN traindays t\nON f.FL_DATE ==
t.FL_DATE\nWHERE\n t.is_train_day == 'True' AND\n f.CANCELLED == 'False'
AND \n f.DIVERTED == 'False'\n\"\"\"\ntraindata =
spark.sql(trainquery)\ntraindata.describe().show()"}, {"cell_type":
"code", "execution_count": null, "id":
"5384d891-0b6e-4379-a745-80c9c9a1945e", "metadata": {}, "outputs": [],
"source": "def to_example(fields):\n    \"\"\"Convert one row of the training query into an MLlib LabeledPoint.\n\n    The label is 1.0 when ARR_DELAY is below 15 and 0.0 otherwise; the features\n    are DEP_DELAY, TAXI_OUT and DISTANCE, in that order.\n    \"\"\"\n    on_time = float(fields['ARR_DELAY'] < 15)\n    features = [fields[column] for column in ('DEP_DELAY', 'TAXI_OUT', 'DISTANCE')]\n    return LabeledPoint(on_time, features)"}, {"cell_type": "code",
"execution_count": null, "id": "9bd990b2-2611-4ddd-8416-22277d8a9550",
"metadata": {}, "outputs": [], "source": "examples =
traindata.rdd.map(to_example)"}, {"cell_type": "code", "execution_count":
null, "id": "f8f6679f-23c1-4223-a46d-77b43d84b1ef", "metadata": {},
"outputs": [], "source": "lrmodel =
LogisticRegressionWithLBFGS.train(examples, intercept=True)"},
{"cell_type": "code", "execution_count": null, "id":
"1174a6ec-daec-46e1-88a2-572f78430501", "metadata": {}, "outputs": [],
"source": "print(lrmodel.weights,lrmodel.intercept)"}, {"cell_type":
"code", "execution_count": null, "id":
"0289348e-cd01-4f0b-99a5-1ddcc9686311", "metadata": {}, "outputs": [],
"source": "print(lrmodel.predict([6.0,12.0,594.0]))"}, {"cell_type":
"code", "execution_count": null, "id":
"4b3c1a1c-54da-4503-a416-33aef1db3910", "metadata": {}, "outputs": [],
"source": "print(lrmodel.predict([36.0,12.0,594.0]))"}, {"cell_type":
"code", "execution_count": null, "id":
"bbcc041d-703a-4df7-9acf-523d8e171602", "metadata": {}, "outputs": [],
"source":
"lrmodel.clearThreshold()\nprint(lrmodel.predict([6.0,12.0,594.0]))\nprint(lrmodel.predict([36.0,12.0,594.0]))"},
{"cell_type": "code", "execution_count": null, "id":
"6f8efbda-c85d-4863-ba65-47976f3e6ec9", "metadata": {}, "outputs": [],
"source": "lrmodel.setThreshold(0.7)
\nprint(lrmodel.predict([6.0,12.0,594.0]))\nprint(lrmodel.predict([36.0,12.0,594.0]))"},
{"cell_type": "code", "execution_count": null, "id":
"9b037490-0cdb-4d3e-a766-9d4d2e8c2adf", "metadata": {}, "outputs": [],
"source": "MODEL_FILE='gs://' + BUCKET +
'/flights/sparkmloutput/model'\nos.system('gsutil -m rm -r ' +
MODEL_FILE)"}, {"cell_type": "code", "execution_count": null, "id":
"f80fc2f1-8ea8-4f50-a112-edd4e3c3219e", "metadata": {}, "outputs": [],
"source": "lrmodel.save(sc, MODEL_FILE)\nprint('{}
saved'.format(MODEL_FILE))"}, {"cell_type": "code", "execution_count":
null, "id": "fc5ad0e1-eb9d-4a37-90a9-4c47d9684d51", "metadata": {},
"outputs": [], "source": "lrmodel = 0\nprint(lrmodel)"}, {"cell_type":
"code", "execution_count": null, "id":
"b55c5fa9-6c61-4132-8b13-08ddc454bc46", "metadata": {}, "outputs": [],
"source": "from pyspark.mllib.classification import
LogisticRegressionModel\nlrmodel = LogisticRegressionModel.load(sc,
MODEL_FILE)\nlrmodel.setThreshold(0.7)"}], "metadata": {"kernelspec":
{"display_name": "Python 3", "language": "python", "name": "python3"},
"language_info": {"codemirror_mode": {"name": "ipython", "version": 3},
"file_extension": ".py", "mimetype": "text/x-python", "name": "python",
"nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version":
"3.8.15"}}, "nbformat": 4, "nbformat_minor": 5}