-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumers.scala
193 lines (147 loc) · 4.4 KB
/
consumers.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import io.threadcso._
import io.threadcso.semaphore._
/*
This implementation is somewhat more efficient than the previous implementation, since the critical sections of the
put / get methods are reduced to only the parts that manipulate the buffers.
Output from test run:
Producer 3 inserts 0
Producer 1 inserts 9
Producer 2 inserts 0
Producer 4 inserts 8
Producer 2 inserts 8
Producer 5 inserts 2
Producer 2 inserts 6
Producer 3 inserts 3
Producer 1 inserts 1
Producer 1 inserts 5
Producer 2 inserts 1
Producer 4 inserts 3
Consumer gets ListBuffer(3000, 1009, 2000, 4008, 2008, 5002, 2006, 3003, 1001, 1005) at 161
Producer 3 inserts 4
Producer 1 inserts 1
Producer 5 inserts 5
Producer 2 inserts 9
Producer 2 inserts 4
Producer 3 inserts 9
Producer 1 inserts 2
Producer 5 inserts 3
Consumer gets ListBuffer(2001, 4003, 5005, 3004, 1001, 2009, 2004, 3009, 1002, 5003) at 312
*/
object ConsumerM {
/** Double-buffered hand-off of `Int`s from producers to a consumer.
  *
  * Two fixed arrays of `BufferMaxSize` slots exist; `phase` selects the one that
  * `put` currently fills. `get` hands the filled array to the caller and flips
  * `phase`, so subsequent `put`s go to the other array.
  *
  * The array returned by `get` is the internal storage itself, not a copy: it
  * becomes the write target again after the following `get`, so a caller must
  * be finished with it before calling `get` again.
  *
  * Waiting for "full" / "not full" happens outside the mutex; only the buffer
  * manipulation itself is inside the critical section. Because of that, the
  * fullness condition is always re-checked once the mutex is held.
  */
object Buffer {
  /** Identifies which of the two arrays is currently being written. */
  object WritingPhase extends Enumeration {
    type WritingPhase = Value
    val Alpha, Beta = Value
  }
  import WritingPhase._

  // Boolean semaphore used as a mutex around the buffer state.
  private val fakeMutex = new BooleanSemaphore(available = true)
  // Signalled when a put/get leaves the active buffer full / not full.
  // These are hints only: every waiter re-tests the real condition in a loop.
  private val activeBufferIsFull = new BooleanSemaphore(available = false)
  private val activeBufferIsNotFull = new BooleanSemaphore(available = true)

  private val BufferMaxSize = 10
  private val bufferAlpha = new Array[Int](BufferMaxSize)
  private val bufferBeta = new Array[Int](BufferMaxSize)

  // Written only while holding the mutex, but read without it by the
  // wait/signal helpers below; @volatile so those unlocked reads are not stale.
  @volatile private var bufferSize = 0
  private var phase: WritingPhase = Alpha

  private def lockMutex(): Unit = fakeMutex.acquire()
  private def unlockMutex(): Unit = fakeMutex.release()

  /** Makes the other array the active one and marks it empty. Call with the mutex held. */
  private def swapPhase(): Unit = {
    phase = if (phase == Alpha) Beta else Alpha
    bufferSize = 0
  }

  /** True when the active array has no free slot left. */
  private def isActiveBufferFull(): Boolean = bufferSize >= BufferMaxSize

  /** The array selected by the current `phase`. */
  private def getActiveBuffer(): Array[Int] =
    if (phase == Alpha) bufferAlpha else bufferBeta

  /** Appends `x` to the active buffer, blocking while that buffer is full.
    *
    * @param x the value to store
    */
  @scala.annotation.tailrec
  def put(x: Int): Unit = {
    waitUntilBufferIsNotFull()
    lockMutex()
    val stored =
      try {
        // Re-check under the mutex: another producer may have taken the last
        // slot between our unlocked wait and acquiring the mutex. Without this
        // check the write below could index past the end of the array.
        val hasRoom = !isActiveBufferFull()
        if (hasRoom) {
          getActiveBuffer()(bufferSize) = x
          bufferSize += 1
        }
        hasRoom
      } finally unlockMutex()
    // Lost the race for the last slot: go back to waiting and try again.
    if (stored) signalBufferStatus() else put(x)
  }

  /** Blocks until the active buffer is full, then returns it and switches
    * writers to the other buffer.
    *
    * @return the filled internal array (not a copy); it is reused as the write
    *         target after the next call to `get`
    */
  @scala.annotation.tailrec
  def get(): Array[Int] = {
    waitUntilBufferIsFull()
    lockMutex()
    val taken =
      try {
        // Re-check under the mutex so that a concurrent `get` that already
        // swapped the buffers cannot make us hand out a partially filled array.
        if (isActiveBufferFull()) {
          val full = getActiveBuffer()
          swapPhase()
          Some(full)
        } else None
      } finally unlockMutex()
    taken match {
      case Some(full) =>
        signalBufferStatus()
        full
      case None => get()
    }
  }

  /** Blocks until the active buffer is observed to be full. */
  private def waitUntilBufferIsFull(): Unit =
    while (!isActiveBufferFull()) {
      activeBufferIsFull.acquire()
    }

  /** Blocks until the active buffer is observed to have room. */
  private def waitUntilBufferIsNotFull(): Unit =
    while (isActiveBufferFull()) {
      activeBufferIsNotFull.acquire()
    }

  /** Releases the semaphore matching the buffer's current state. */
  private def signalBufferStatus(): Unit =
    if (isActiveBufferFull()) activeBufferIsFull.release()
    else activeBufferIsNotFull.release()
}
// Random source shared by all producer processes.
val random = new scala.util.Random

/** Builds a never-terminating producer process.
  *
  * Each round it sleeps for a random number of milliseconds below 100, draws a
  * random item in 0..9, and puts `me*1000+item` into [[Buffer]], so the
  * producer id can be read off the stored value.
  *
  * @param me id of this producer; used in the process name and in the log line
  */
def Producer(me: Int) = proc("Producer"+me) {
  while (true) {
    val pause = random.nextInt(100)*milliSec
    sleep(pause)
    val item = random.nextInt(10)
    val tagged = me*1000+item
    Buffer.put(tagged)
    // Printed only after put returns, i.e. once the item is actually stored.
    println(s"Producer ${me} inserts ${item}")
  }
}
/** Never-terminating consumer process.
  *
  * Repeatedly takes a full buffer from [[Buffer]], copies its contents into a
  * fresh ListBuffer, prints them together with the elapsed `milliTime` since
  * the process started (divided by 10), then sleeps before the next round.
  */
val Consumer = proc("Consumer") {
  val start = milliTime
  while (true) {
    val batch = Buffer.get()
    // Copy straight away: the array handed out by Buffer.get() is shared storage.
    val res = new scala.collection.mutable.ListBuffer[Int]
    batch.foreach(res += _)
    println(s"Consumer gets ${res} at ${(milliTime - start)/10}")
    sleep(milliSec * 1500)
  }
}
// Number of producer processes to run.
val prods = 5

// The whole system: the consumer in parallel with producers numbered 1..prods.
val System = Consumer || (|| ((1 to prods).map(id => Producer(id))))

/** Program entry point: runs the composed system. */
def main(args: Array[String]): Unit = System()
}