Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added various concurrent implementations of fakeps #19

Merged
merged 2 commits into from
Mar 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ scalaVersion := "2.11.7"
scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked")

libraryDependencies ++= Seq(
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4" % Test,
"org.scala-lang.modules" %% "scala-xml" % "1.0.4" % Test,
"org.scalatest" %% "scalatest" % "2.2.6" % Test,
"com.storm-enroute" %% "scalameter" % "0.7" % Test
"com.storm-enroute" %% "scalameter" % "0.7" % Test,
"org.scala-stm" %% "scala-stm" % "0.7" % Test
)

parallelExecution in Test := false
Expand Down
13 changes: 8 additions & 5 deletions src/test/scala/fakeps/Benchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.scalameter.api._

object Benchmark extends Bench.LocalTime {

val sizes: Gen[Int] = Gen.exponential("processes")(10, 10000, 10)
val sizes: Gen[Int] = Gen.exponential("processes")(1000, 1000000, 10)

def measureMethod(f: Int => Iterator[(Int, Int)], label: String): Unit =
measure method label in {
Expand All @@ -14,8 +14,11 @@ object Benchmark extends Bench.LocalTime {
}
}

measureMethod(fakePsMutable, "fakePsMutable")
measureMethod(fakePsFold, "fakePsFold")
measureMethod(fakePsFoldSlow, "fakePsFoldSlow")
measureMethod(fakePsArray, "fakePsFoldArray")
// measureMethod(fakePsMutable, "fakePsMutable")
// measureMethod(fakePsFold, "fakePsFold")
// measureMethod(fakePsFoldSlow, "fakePsFoldSlow")
measureMethod(fakePsArray, "fakePsArray")
measureMethod(fakePsArrayPar, "fakePsArrayPar")
measureMethod(fakePsArrayTrie, "fakePsArrayTrie")
measureMethod(fakePsArraySTM, "fakePsArraySTM")
}
59 changes: 54 additions & 5 deletions src/test/scala/fakeps/fakeps.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package edu.luc.etl.osdi.processtree.scala

import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import common.Process

Expand All @@ -11,7 +11,8 @@ package object fakeps {

/**
* Generates a barebones process tree (ppid -> pid*) of size n
* using an immutable implementation.
* using an immutable implementation. Unacceptably slow
* because of a bug in Map.+(vararg).
*/
def fakePsFoldSlow(n: Int): Iterator[(Int, Int)] = reverseEdges {
(2 to n).foldLeft {
Expand Down Expand Up @@ -40,7 +41,8 @@ package object fakeps {
* using a mutable implementation.
*/
def fakePsMutable(n: Int): Iterator[(Int, Int)] = reverseEdges {
val ps = MMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer.empty[Int])
import scala.collection.mutable.Map
val ps = Map(0 -> ArrayBuffer(1), 1 -> ArrayBuffer.empty[Int])
(2 to n) foreach { nextPid =>
val randomPid = 1 + Random.nextInt(nextPid - 1)
ps(nextPid) = ArrayBuffer.empty
Expand All @@ -54,7 +56,7 @@ package object fakeps {
* using a mutable implementation.
*/
def fakePsArray(n: Int): Iterator[(Int, Int)] = {
val ps = ArrayBuffer.fill(n + 1)(ArrayBuffer.empty[Int])
val ps = Vector.fill(n + 1)(ArrayBuffer.empty[Int])
ps(0) += 1
(2 to n) foreach { nextPid =>
val randomPid = 1 + Random.nextInt(nextPid - 1)
Expand All @@ -63,6 +65,53 @@ package object fakeps {
for (ppid <- ps.indices.iterator ; pid <- ps(ppid).iterator) yield (pid, ppid)
}

/**
* Generates a barebones process tree (ppid -> pid*) of size n
* using a mutable implementation with a parallel range and a
* concurrent queue (from Java).
*/
def fakePsArrayPar(n: Int): Iterator[(Int, Int)] = {
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConversions._
val ps = Vector.fill(n + 1)(new ConcurrentLinkedQueue[Int])
ps(0) add 1
(2 to n).par foreach { nextPid =>
val randomPid = 1 + Random.nextInt(nextPid - 1)
ps(randomPid) add nextPid
}
for (ppid <- ps.indices.iterator ; pid <- ps(ppid).iterator) yield (pid, ppid)
}

/**
* Generates a barebones process tree (ppid -> pid*) of size n
* using a mutable implementation with a parallel range and a lock-free trie.
*/
def fakePsArrayTrie(n: Int): Iterator[(Int, Int)] = {
import scala.collection.concurrent.TrieMap
val ps = Vector.fill(n + 1)(TrieMap.empty[Int, Unit])
ps(0) += (1 -> (()))
(2 to n).par foreach { nextPid =>
val randomPid = 1 + Random.nextInt(nextPid - 1)
ps(randomPid) += (nextPid -> (()))
}
for (ppid <- ps.indices.iterator ; (pid, _) <- ps(ppid).iterator) yield (pid, ppid)
}

/**
* Generates a barebones process tree (ppid -> pid*) of size n
* using a mutable implementation with a parallel range and STM.
*/
def fakePsArraySTM(n: Int): Iterator[(Int, Int)] = {
import scala.concurrent.stm._
val ps = Vector.fill(n + 1)(TSet.empty[Int])
atomic { implicit tx => ps(0) += 1 }
(2 to n).par foreach { nextPid =>
val randomPid = 1 + Random.nextInt(nextPid - 1)
atomic { implicit tx => ps(randomPid) += nextPid }
}
for (ppid <- ps.indices.iterator ; pid <- ps(ppid).single.iterator) yield (pid, ppid)
}

/** Converts a tree (ppid -> pid*) into an iterator of pid -> ppid edges. */
def reverseEdges(m: Map[Int, Iterable[Int]]): Iterator[(Int, Int)] =
for (ppid <- m.keys.iterator ; pid <- m(ppid).iterator) yield (pid, ppid)
Expand All @@ -75,5 +124,5 @@ package object fakeps {
def addCmd(i: Iterator[(Int, Int)]): Iterator[Process] = addCmd(i, "Fake Process")

/** Generates the fake ps command output. */
def fakePs(n: Int) = addCmd(fakePsFold(n))
def fakePs(n: Int) = addCmd(fakePsArraySTM(n))
}