Skip to content

Commit

Permalink
Spark partitioning: map works, although wrong results are returned
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Kelbert committed Jun 29, 2017
1 parent c4cbbe4 commit dacc996
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 37 deletions.
21 changes: 0 additions & 21 deletions core/src/main/scala/org/apache/spark/sgx/SgxMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,9 @@ object SgxMain {
oos.writeUnshared(SgxDone)
oos.flush()

// println(ois.readUnshared().getClass.getName)
// println(ois.readUnshared().getClass.getName)
// println(ois.readUnshared().getClass.getName)

// val a = ois.readUnshared().asInstanceOf[Array[Any]]
// println("reading: " + a + " (" + a.size + ")")
// a.foreach { x => println(" " + x) }

// val x = obj.f(obj.partIndex, List("a","b","c","a").iterator)
// try {
// x.foreach { a => Try(println(a)) }
// } catch {
// case e: ClassCastException => None
// }

// Send ack
// oos.writeUnshared(SgxAck)
// oos.flush()

socket.close()
}

server.close()
}
}


14 changes: 0 additions & 14 deletions core/src/main/scala/org/apache/spark/sgx/SgxMapPartitionsRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ class SgxMapPartitionsRDD[U: ClassTag, T: ClassTag] {
val oos = new ObjectOutputStream(socket.getOutputStream())
val ois = new ObjectInputStreamWithCustomClassLoader(socket.getInputStream())

// val (it1,it2) = it.duplicate

// Send object
println("Sending: " + SgxMapPartitionsRDDObject(f, partIndex))
oos.writeUnshared(SgxMapPartitionsRDDObject(f, partIndex))
Expand All @@ -52,17 +50,6 @@ class SgxMapPartitionsRDD[U: ClassTag, T: ClassTag] {
oos.writeUnshared(SgxDone)
oos.flush()

// println("Sending: " + a + " (" + a.size + ")")
// a.foreach { x => println(" " + x) }
// oos.writeUnshared(a)
// oos.flush()
//
// // Receive reply
// ois.readUnshared() match {
// case SgxAck => println("Remote execution of " + f.getClass.getName + " succeeded")
// case SgxFail => println("Remote execution of " + f.getClass.getName + " failed")
// }

var list = new ListBuffer[Any]()
breakable {
while(true) {
Expand All @@ -81,7 +68,6 @@ class SgxMapPartitionsRDD[U: ClassTag, T: ClassTag] {
ois.close()
socket.close()

// f(partIndex, it1)
println("end of compute: returning iterator of size " + list.size)
list.iterator.asInstanceOf[Iterator[U]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -63,7 +64,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
public interface Writable extends Serializable {
/**
* Serialize the fields of this object to <code>out</code>.
*
Expand Down
10 changes: 10 additions & 0 deletions install-maven-dependencies.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
# Install the locally built hadoop-common jar into the local Maven
# repository so that dependent builds can resolve it as
# org.apache.hadoop:hadoop-common:2.6.5.

# Abort on any command failure, on use of unset variables, and on
# failures inside pipelines — otherwise a failed `mvn` would still
# print "Done." and hide the error.
set -euo pipefail

JAR="hadoop-2.6.5-src/hadoop-common-project/hadoop-common/target/hadoop-common-2.6.5.jar"

# Fail fast with a clear message if the jar has not been built yet.
if [ ! -f "$JAR" ]; then
    echo "Error: $JAR not found; build hadoop-common first." >&2
    exit 1
fi

mvn install:install-file \
    -Dfile="$JAR" \
    -DgroupId=org.apache.hadoop \
    -DartifactId=hadoop-common \
    -Dversion=2.6.5 \
    -Dpackaging=jar

echo "Done."

0 comments on commit dacc996

Please sign in to comment.