-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultipreduce.py
160 lines (140 loc) · 4.58 KB
/
multipreduce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import pickle
import time
from threading import Thread, Lock
import logging
class Reducer(object):
    def __init__(self, f, tasks=None, max_worker=10, encode=None, decode=None):
        '''Compute ``reduce(f, tasks)`` cooperatively across child processes.

        Two items are popped from the task list and handed to a forked child,
        which applies ``f`` and writes the encoded result back through a pipe.
        The parent decodes the result and appends it to the task list for the
        next round, until a single value remains.

        f          -- binary function applied to pairs of tasks
        tasks      -- initial iterable of tasks (copied into a list)
        max_worker -- maximum number of concurrent child processes
        encode     -- serializer for results (default: pickle.dumps)
        decode     -- deserializer for results (default: pickle.loads)

        PS The stdlib multiprocess is too hard to use.
        '''
        self.f = f
        self.max_worker = max_worker
        self.encode = encode or pickle.dumps
        self.decode = decode or pickle.loads
        if tasks:
            self.tasks = list(tasks)
        else:
            self.tasks = []
        self.workers = {}  # pid -> read-end file object of that child's pipe
        # The lock mainly lets get_result() observe a consistent state between
        # _loop rounds; it exists to support feed(). In other words, neither
        # Thread nor Lock would be needed if feed were removed.
        self.lock = Lock()
        self._stop = False
        self.result = None
        self._start()
    def process(self):
        '''Generator: each send() forks workers for available task pairs.

        Pauses (yields) while the pool is full or fewer than two tasks
        remain; otherwise pops two tasks and forks a child that computes
        f(x, y) and writes the encoded result into a pipe.
        '''
        while True:
            if len(self.workers) >= self.max_worker or len(self.tasks) < 2:
                yield
                # BUGFIX: re-check both limits after resuming. The original
                # fell through and forked even with the pool at max_worker.
                continue
            x = self.tasks.pop()
            y = self.tasks.pop()
            r, w = os.pipe()
            pid = os.fork()
            if pid > 0:
                # Parent: keep only the read end of the pipe.
                os.close(w)
                self.workers[pid] = os.fdopen(r, 'rb')
            elif pid == 0:
                # Child: handle all possible exceptions (could be narrower).
                try:
                    os.close(r)
                    z = self.f(x, y)
                    w = os.fdopen(w, 'wb')
                    w.write(self.encode(z))
                    w.close()
                except Exception as e:
                    logging.warning('worker error: %s', e)
                # BUGFIX: os._exit instead of exit() so the forked child never
                # runs the parent's atexit handlers or flushes its inherited
                # stdio buffers.
                os._exit(0)
    def collect(self):
        '''Generator: each send() reaps one finished worker.

        Waits for a child to exit, reads the encoded result from its pipe,
        decodes it and pushes it back onto the task list. An empty read means
        the child died without producing a result (i.e. f raised); that pair
        of tasks is dropped, matching the original best-effort design.
        '''
        while True:
            if len(self.workers) == 0:
                yield
            if len(self.workers) > 0:
                pid, _ = os.wait()
                if pid not in self.workers:
                    continue
                p = self.workers.pop(pid)
                # handle all possible exceptions (could be narrower)
                try:
                    s = p.read()
                    p.close()
                    # BUGFIX: read() returns bytes on a 'rb' stream, so the
                    # original `s == ''` guard could never fire in Python 3
                    # and pickle.loads(b'') raised instead.
                    if not s:
                        logging.warning('collect read None')
                        continue
                    self.result = self.decode(s)
                    self.tasks.append(self.result)
                    yield
                except Exception as e:
                    logging.warning('collect error: %s', e)
    def _loop(self):
        '''Drive one process/collect step per round until stopped.'''
        p = self.process()
        c = self.collect()
        while not self._stop:
            with self.lock:
                p.send(None)
                c.send(None)
            # Idle politely when there is nothing left to fork or reap.
            if self._is_done():
                time.sleep(.1)
    def _start(self):
        '''Run _loop in a daemon thread so the interpreter can still exit.'''
        t = Thread(target=self._loop)
        t.daemon = True
        t.start()
    def _is_done(self):
        '''True when no merge can run and no worker is outstanding.'''
        return len(self.tasks) < 2 and len(self.workers) == 0
    def feed(self, t):
        '''Append one more task; it joins the next available merge round.'''
        self.tasks.append(t)
    def get_result(self, now=False):
        '''Return the reduction result.

        now=False (default) blocks until all merges finish; now=True returns
        the latest intermediate result immediately. Returns the sole
        remaining task when no merge ever ran, or None when there was none.
        '''
        if not now:
            while True:
                with self.lock:
                    if self._is_done():
                        break
                time.sleep(.1)
        if self.result is not None:
            return self.result
        try:
            return self.tasks[0]
        except IndexError:
            return None
    def stop(self):
        '''Ask the _loop thread to exit after its current round.'''
        self._stop = True
def reduce(function, sequence, initial=None, **kwargs):
    '''Multiprocess drop-in for functools.reduce, built on Reducer.

    function -- binary callable applied to pairs of items
    sequence -- any iterable of items (copied; never mutated)
    initial  -- optional starting value, merged in like an extra item
    kwargs   -- forwarded to Reducer (e.g. max_worker, encode, decode)

    Raises TypeError if function is not callable, sequence is not iterable,
    or sequence is empty with no initial value.

    NOTE: unlike functools.reduce, f is applied in an unspecified order, so
    function should be associative and commutative.
    '''
    if not callable(function):
        raise TypeError("%s object is not callable" % function)
    # Copy into a list: raises TypeError for non-iterables, accepts un-sized
    # iterables (generators), and never mutates the caller's sequence
    # (the original appended initial onto it in place).
    items = list(sequence)
    # BUGFIX: test against None, not truthiness — falsy initials such as
    # 0 or '' are perfectly valid starting values.
    if not items:
        if initial is None:
            raise TypeError('reduce() of empty sequence with no initial value')
        return initial
    if initial is not None:
        items.append(initial)
    # BUGFIX: pre-bind r so the finally clause cannot raise NameError when
    # Reducer's constructor itself fails.
    r = None
    try:
        r = Reducer(function, items, **kwargs)
        return r.get_result()
    finally:
        if r is not None:
            r.stop()
if __name__ == '__main__':
    import random

    # Benchmark: multiprocess reduce vs the builtin/functools reduce on a
    # deliberately slow binary operation.
    seq = list(range(15))

    def add(x, y):
        # Simulate 0-1s of real work per merge so parallelism is visible.
        time.sleep(random.random())
        return x + y

    t0 = time.time()
    r = reduce(add, seq)
    t1 = time.time()
    print(r, reduce, t1 - t0)

    # Rebind to the sequential reduce for comparison. BUGFIX: catch only
    # AttributeError (the actual failure on Python 3, where reduce is not a
    # builtin) instead of a bare except that also swallowed SystemExit and
    # KeyboardInterrupt.
    try:
        reduce = __builtins__.reduce
    except AttributeError:
        from functools import reduce

    t0 = time.time()
    r = reduce(add, seq)
    t1 = time.time()
    print(r, reduce, t1 - t0)