Skip to content

Commit

Permalink
Fix capturing the history event for the initial config setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed May 1, 2024
1 parent 5f18228 commit ee83718
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
}

override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
println("Yay env update.")
// For now we just grab the first Spark conf.
if (jobConf == None) {
jobConf = environmentUpdate.environmentDetails.get("Spark Properties").map(_.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class EventHistoryReporter(file: String, extraConf: List[(String, String)] = Lis
}
}

// TODO Unify with ReporterApp
private def getFilter(eventString: String): Boolean = {
implicit val formats = DefaultFormats
eventFilter.contains(Json4sWrapper.parse(eventString).extract[Map[String, Any]].get("Event")
Expand All @@ -81,7 +82,8 @@ class EventHistoryReporter(file: String, extraConf: List[(String, String)] = Lis
"SparkListenerJobStart",
"SparkListenerJobEnd",
"SparkListenerStageSubmitted",
"SparkListenerStageCompleted"
"SparkListenerStageCompleted",
"SparkListenerEnvironmentUpdate",
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ object ReporterApp extends App {
val replayMethod = busKlass.getMethod("replay", classOf[InputStream], classOf[String],
classOf[Boolean], classOf[(String) => Boolean])


replayMethod.invoke(bus, getDecodedInputStream(file, conf), file, boolean2Boolean(false),
getFilter _)
}
Expand All @@ -139,6 +140,7 @@ object ReporterApp extends App {

}

// TODO Unify with EventHistoryReporter
private def getFilter(eventString: String): Boolean = {
implicit val formats = DefaultFormats
eventFilter.contains(Json4sWrapper.parse(eventString).extract[Map[String, Any]].get("Event")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ class EventHistoryFileReportingSuite extends AnyFunSuite with Matchers {
fileContents: String,
appId: String,
updatedConf: Map[String, String],
dynamic: Boolean = false) = {
dynamic: Boolean = false,
constantExecs: Boolean = true) = {
fileContents should include(s"ID $appId")
if (dynamic) {
fileContents should include("i don't remember")
if (dynamic && constantExecs) {
fileContents should include("Initial execs within tolerance +-10% leaving alone")
} else {
fileContents should include("since not dynamic")
}
Expand Down

0 comments on commit ee83718

Please sign in to comment.