-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathmatrix_multiply_sparse.py
64 lines (48 loc) · 1.67 KB
/
matrix_multiply_sparse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
__author__ = 'hanhanw'
import sys
from pyspark import SparkConf, SparkContext
import numpy
from scipy.sparse import *
from scipy import *
import operator
# Module-level Spark setup: one SparkContext is created at import time and
# shared by every function in this script.
conf = SparkConf().setAppName("matrix multiply sparse")
sc = SparkContext(conf=conf)
# NOTE(review): this is a lexicographic string comparison — a version like
# '1.10.0' would compare *less* than '1.5.1'. Fine for 1.5.x-1.9.x, but
# worth confirming against the Spark versions actually deployed.
assert sc.version >= '1.5.1'
inputs = sys.argv[1]  # input path: one sparse row per line ("col:value" tokens)
output = sys.argv[2]  # output directory for saveAsTextFile
s = 100  # fixed dimension of the s x s result matrix
def get_single_outproduct(line):
    """Parse one sparse row and return its dense outer product.

    Args:
        line: whitespace-separated "index:value" tokens describing the
            nonzero entries of one sparse row vector ``a``.

    Returns:
        (dense_matrix, row_indexs): ``numpy.matrix`` of shape (n, n) holding
        the outer product a * a^T over the n nonzero entries, and the list of
        their integer indices (the positions in the full s-dimensional row).
    """
    elems = line.split()
    a = [float(e.split(':')[1]) for e in elems]
    # Parse indices as ints: they are used to index numpy arrays downstream,
    # and the original float parse breaks numpy integer indexing.
    row_indexs = [int(e.split(':')[0]) for e in elems]
    # outer(a, a) directly yields the (n, n) outer product. The original
    # zip(a) transpose trick was Python-2-only (zip is an iterator in Py3)
    # and produced 1-tuples, giving a (n, 1, n)-shaped result instead.
    single_out_product = numpy.multiply.outer(a, a)
    dense_matrix = numpy.matrix(single_out_product)
    return dense_matrix, row_indexs
def to_s2_matrix(dense_matrix, row_indexes, size=100):
    """Scatter a dense (n, n) outer-product block into a size x size matrix.

    Element (p, q) of ``dense_matrix`` lands at global position
    (row_indexes[p], row_indexes[q]); all other cells are zero.

    Args:
        dense_matrix: (n, n) array-like block from get_single_outproduct.
        row_indexes: the n global indices of the block's rows/columns;
            accepted as floats for backward compatibility, cast to int.
        size: dimension of the full output matrix. Defaults to 100, the
            module-level ``s`` this function previously read implicitly.

    Returns:
        A (size, size) numpy array containing the scattered block.
    """
    # ravel() flattens row-major, matching the (i-outer, j-inner) loop order.
    dense_arr = numpy.asarray(dense_matrix).ravel()
    s2_matrix = numpy.zeros((size, size))
    count = 0
    for i in row_indexes:
        for j in row_indexes:
            # int() cast: callers may pass float indices, which numpy rejects.
            s2_matrix[int(i)][int(j)] = dense_arr[count]
            count += 1
    return s2_matrix
def main():
    """Sum the per-row outer products of a sparse matrix on Spark.

    Reads ``inputs`` (one sparse row per line as "col:value" tokens),
    computes the sum over all rows of a * a^T — i.e. A^T * A — as a dense
    s x s matrix via a map + reduce(operator.add), then writes it to
    ``output`` with saveAsTextFile, one "col:value col:value ... " line
    per matrix row (note the trailing space, preserved from the original
    format).
    """
    text = sc.textFile(inputs)
    # The original used `lambda (a, b): ...` tuple-unpacking, which is
    # Python-2-only syntax (SyntaxError on Python 3); unpack explicitly.
    outproducts = text.map(get_single_outproduct) \
                      .map(lambda pair: to_s2_matrix(pair[0], pair[1]))
    output_matrix = outproducts.reduce(operator.add)
    output_matrix_arr = numpy.array(output_matrix)
    output_list = []
    for row in output_matrix_arr:
        # join builds each line in O(n); the original += loop was quadratic.
        # '+ " "' keeps the trailing space the original format emitted.
        row_str = ' '.join(str(c) + ':' + str(row[c]) for c in range(s)) + ' '
        output_list.append(row_str)
    output_data = sc.parallelize(output_list)
    output_data.saveAsTextFile(output)

if __name__ == '__main__':
    main()