-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathAssignment1-Spark.java
98 lines (75 loc) · 2.85 KB
/
Assignment1-Spark.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
 * Spark job that, for every pair of movies rated by the same user, collects the ratings that
 * user gave to both movies.
 *
 * <p>Input lines have the form {@code user::movie::rating[::...]}; any fields after the third are
 * ignored. Each output record is a movie pair key {@code (mA,mB)}, where {@code mA} compares
 * greater than {@code mB} as a string. The key is grouped with every {@link Triple}
 * {@code (user, ratingOfMA, ratingOfMB)} contributed by users who rated both movies.
 */
public class Assignment1 {

    /** Default input file, used when no input path argument is given. */
    private static final String DEFAULT_INPUT = "movies-ratings.txt";

    /** Default output directory, used when no output path argument is given. */
    private static final String DEFAULT_OUTPUT = "output";

    /** One user's ratings for the two movies of a movie pair. */
    public static class Triple implements Serializable {
        private static final long serialVersionUID = 1L;

        /** The user who rated both movies. */
        final String user;
        /** Rating given to the first movie of the pair key. */
        final Integer r1;
        /** Rating given to the second movie of the pair key. */
        final Integer r2;

        public Triple(String user, Integer r1, Integer r2) {
            this.user = user;
            this.r1 = r1;
            this.r2 = r2;
        }

        /** Renders as {@code (user,r1,r2)}; this is the text written to the output files. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append('(').append(user).append(',').append(r1).append(',').append(r2).append(')');
            return sb.toString();
        }
    }

    /**
     * Parses one input line into {@code (user, (movie, rating))}.
     *
     * @param line a line of the form {@code user::movie::rating[::...]}
     * @return the user as key and the (movie, rating) pair as value
     * @throws IllegalArgumentException if the line has fewer than three {@code ::}-separated fields
     * @throws NumberFormatException if the rating field is not an integer
     */
    private static Tuple2<String, Tuple2<String, Integer>> parseRating(String line) {
        String[] parts = line.split("::");
        if (parts.length < 3) {
            throw new IllegalArgumentException(
                    "Malformed rating line (expected user::movie::rating): " + line);
        }
        String user = parts[0];
        String movie = parts[1];
        Integer rating = Integer.parseInt(parts[2].trim());
        return new Tuple2<>(user, new Tuple2<>(movie, rating));
    }

    /**
     * Emits one record per unordered pair of one user's rated movies.
     *
     * <p>The pair key is ordered so that its first movie compares greater (by
     * {@link String#compareTo}) than its second. The ratings in the {@link Triple} are ordered to
     * match the key. When the two movie IDs compare equal (the user rated the same movie twice),
     * the key is {@code (m, m)}.
     *
     * @param user the user whose ratings are being paired
     * @param ratings that user's {@code (movie, rating)} entries
     * @return an iterator over {@code ((mA, mB), Triple(user, ratingOfMA, ratingOfMB))}
     */
    private static Iterator<Tuple2<Tuple2<String, String>, Triple>> moviePairs(
            String user, Iterable<Tuple2<String, Integer>> ratings) {
        // Materialize the grouped ratings so they can be indexed for the pairwise loop.
        List<Tuple2<String, Integer>> movies = new ArrayList<>();
        ratings.forEach(movies::add);

        List<Tuple2<Tuple2<String, String>, Triple>> pairs = new ArrayList<>();
        int n = movies.size();
        for (int i = 0; i < n - 1; i++) {
            Tuple2<String, Integer> a = movies.get(i);
            for (int j = i + 1; j < n; j++) {
                Tuple2<String, Integer> b = movies.get(j);
                // Canonical key order keeps (x, y) and (y, x) from forming separate groups.
                if (a._1.compareTo(b._1) > 0) {
                    pairs.add(new Tuple2<>(new Tuple2<>(a._1, b._1), new Triple(user, a._2, b._2)));
                } else {
                    pairs.add(new Tuple2<>(new Tuple2<>(b._1, a._1), new Triple(user, b._2, a._2)));
                }
            }
        }
        return pairs.iterator();
    }

    /**
     * Runs the job.
     *
     * @param args optional {@code [inputPath [outputPath]]}. They default to
     *     {@code movies-ratings.txt} and {@code output}. The Spark master defaults to {@code local}
     *     unless {@code spark.master} is already configured, for example by {@code spark-submit}.
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        SparkConf conf = new SparkConf()
                .setAppName("Assignment 1")
                .setIfMissing("spark.master", "local");

        // try-with-resources guarantees the context is stopped even if the job fails.
        try (JavaSparkContext context = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Tuple2<String, Integer>> ratingsByUser = context
                    .textFile(inputPath)
                    .filter(line -> !line.trim().isEmpty())
                    .mapToPair(Assignment1::parseRating);

            ratingsByUser
                    .groupByKey()
                    .flatMapToPair(userRatings -> moviePairs(userRatings._1, userRatings._2))
                    .groupByKey()
                    .saveAsTextFile(outputPath);
        }
    }
}