-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathamazon_review_tfidf_normalized.py
86 lines (62 loc) · 2.89 KB
/
amazon_review_tfidf_normalized.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
__author__ = 'hanhanw'
import sys
from pyspark import SparkConf, SparkContext
import nltk
import string
from nltk.corpus import stopwords
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Normalizer
import json
import time
# Spark driver setup: one SparkContext shared by all functions below.
conf = SparkConf().setAppName("733 A2 Q1")
sc = SparkContext(conf=conf)
# NOTE(review): this is a lexicographic string comparison, not a version
# comparison -- e.g. '1.10.0' < '1.5.1' as strings; verify on newer Spark.
assert sc.version >= '1.5.1'
inputs = sys.argv[1]           # path to input file of JSON review lines
output_training = sys.argv[2]  # output directory for the pre-2014 training set
output_testing = sys.argv[3]   # output directory for the 2014 testing set
def clean_review(review_line, stopwords):
    """Parse one JSON review record and tokenize its review text.

    Replaces every punctuation character with a space, splits on whitespace,
    lowercases each token, drops stop words, and stores the resulting token
    list back under the 'reviewText' key.

    :param review_line: one JSON-encoded review (must contain 'reviewText')
    :param stopwords: set of lowercase words to discard
    :return: the parsed record dict with 'reviewText' replaced by the tokens
    """
    pyline = json.loads(review_line)
    review_text = str(pyline['reviewText'])
    # Map all punctuation to spaces in a single C-level pass.
    # (str.maketrans replaces the Python-2-only string.maketrans.)
    replace_punctuation = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    tokens = review_text.translate(replace_punctuation).split()
    # Bug fix: lowercase BEFORE the stop-word test. The stop-word set is
    # lowercase, so capitalized stop words ("This", "The") used to slip
    # through the filter and then be kept in lowercased form.
    review_words = [w for w in (t.lower() for t in tokens) if w not in stopwords]
    pyline['reviewText'] = review_words
    return pyline
def get_tfidf_features(txt):
    """Turn an RDD of token lists into an RDD of TF-IDF feature vectors.

    Each document is hashed into a term-frequency vector, then reweighted
    by inverse document frequency fitted over the whole input.
    """
    hasher = HashingTF()
    term_freq = hasher.transform(txt)
    # Cached because it is traversed twice: once by IDF().fit and once
    # by the transform below.
    term_freq.cache()
    idf_model = IDF().fit(term_freq)
    return idf_model.transform(term_freq)
def get_output(in_data):
    """Build normalized TF-IDF (rating, vector) pairs from raw review lines.

    Reads JSON review lines from `in_data`, cleans and tokenizes them, and
    produces L1-normalized TF-IDF features zipped with ratings. Records are
    split by review year: before 2014 -> training, exactly 2014 -> testing.

    :param in_data: input path understood by sc.textFile
    :return: (training_output, testing_output) -- two single-partition RDDs
             of (rating, normalized TF-IDF SparseVector) pairs
    """
    text = sc.textFile(in_data)
    nltk_data_path = "[your nltk data path]"  # maybe changed to the sfu server path
    nltk.data.path.append(nltk_data_path)
    stop_words = set(stopwords.words("english"))
    cleaned_review = text.map(lambda review_line: clean_review(review_line, stop_words))
    # Triples of (token list, rating, parsed review date).
    data_set = cleaned_review.map(lambda cl: (cl['reviewText'], cl['overall'],
                                              time.strptime(cl['reviewTime'], '%m %d, %Y')))
    nor = Normalizer(1)  # L1-normalize each feature vector

    def _build_output(rows):
        """Shared pipeline: rows of (text, rating, date) -> (rating, features).

        Fix: the original used `lambda (a, b, c): ...` tuple-parameter
        unpacking, which is Python-2-only (removed by PEP 3113); indexing
        works under both interpreters. The duplicated training/testing
        pipeline is also factored into this one helper.
        """
        rows = rows.cache()  # reused for both the ratings and features passes
        ratings = rows.map(lambda r: r[1])
        reviews = rows.map(lambda r: r[0])
        features = nor.transform(get_tfidf_features(reviews))
        # zip pairs elements positionally; both sides derive from `rows`
        # and preserve its ordering.
        return ratings.zip(features).coalesce(1)

    training_output = _build_output(data_set.filter(lambda r: r[2].tm_year < 2014))
    testing_output = _build_output(data_set.filter(lambda r: r[2].tm_year == 2014))
    return training_output, testing_output
def main():
    """Compute the (rating, SparseVector) pairs and save both splits as text."""
    train_rdd, test_rdd = get_output(inputs)
    train_rdd.saveAsTextFile(output_training)
    test_rdd.saveAsTextFile(output_testing)
# Run only when executed as a script (spark-submit), not when imported.
if __name__ == '__main__':
    main()