
Commit

typo
Donghan Zhang committed Feb 16, 2024
1 parent 78dd02a commit 4da290e
Showing 2 changed files with 3 additions and 3 deletions.
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -250,7 +250,7 @@ class Analyzer(tableUtils: TableUtils,
   }
 
   def analyzeJoin(joinConf: api.Join,
-                  joinPartOnly: Option[List[String]] = None,
+                  selectedJoinParts: Option[List[String]] = None,
                   enableHitter: Boolean = false,
                   validationAssert: Boolean = false): (Map[String, DataType], ListBuffer[AggregationMetadata]) = {
     val name = "joins/" + joinConf.metaData.name
@@ -278,7 +278,7 @@ class Analyzer(tableUtils: TableUtils,
       .getOrElse(Seq.empty)
 
     joinConf.joinParts.toScala
-      .filter(part => joinPartOnly.isDefined && joinPartOnly.get.contains(part.groupBy.metaData.name))
+      .filter(part => selectedJoinParts.isDefined && selectedJoinParts.get.contains(part.groupBy.metaData.name))
       .foreach { part =>
        val (aggMetadata, gbKeySchema) =
          analyzeGroupBy(part.groupBy, part.fullPrefix, includeOutputTableName = true, enableHitter = enableHitter)
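
A note on the renamed parameter above: with the predicate selectedJoinParts.isDefined && selectedJoinParts.get.contains(...), a None selection keeps no join parts, while Some(names) keeps only the named ones. Below is a minimal, self-contained sketch of that Option-based selection, using made-up part names; it illustrates the predicate shape only and is not Chronon's API.

    // Sketch only: hypothetical part names, not taken from the Chronon repo.
    object SelectedPartsFilterSketch {
      def main(args: Array[String]): Unit = {
        val allParts = List("user_features", "item_features", "context_features")

        // Same predicate shape as the diff: keep a part only when a selection
        // is provided and the part's name appears in it.
        def select(selectedJoinParts: Option[List[String]]): List[String] =
          allParts.filter(p => selectedJoinParts.isDefined && selectedJoinParts.get.contains(p))

        println(select(Some(List("user_features")))) // List(user_features)
        println(select(None))                        // List() -- None selects nothing
      }
    }

An equivalent, slightly more idiomatic predicate would be selectedJoinParts.exists(_.contains(p)), which avoids the explicit isDefined/get pair.
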
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinBase.scala
@@ -157,7 +157,7 @@ abstract class JoinBase(joinConf: api.Join,
         // Cache join part data into intermediate table
         if (filledDf.isDefined) {
           logger.info(s"Writing to join part table: $partTable for partition range $unfilledRange")
-          filledDf.get.save(partTable, tablePr ops, stats = prunedLeft.map(_.stats))
+          filledDf.get.save(partTable, tableProps, stats = prunedLeft.map(_.stats))
         }
       })
      val elapsedMins = (System.currentTimeMillis() - start) / 60000
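
The corrected call above sits behind a guard on filledDf before the .get, with an elapsed-minutes computation a few lines further down. The following is a small standalone sketch of that guard-then-save shape, with a hypothetical save helper and made-up values standing in for Chronon's DataFrame save and table properties.

    // Sketch only: the save helper and values below are hypothetical stand-ins.
    object GuardedSaveSketch {
      def save(table: String, props: Map[String, String]): Unit =
        println(s"saving to $table with props $props")

      def main(args: Array[String]): Unit = {
        val start = System.currentTimeMillis()

        val filledDf: Option[String] = Some("namespace.join_part_table") // made-up table name
        val tableProps: Map[String, String] = Map("retention" -> "30d")  // made-up props

        // Same control flow as the hunk: write only when the optional result exists.
        if (filledDf.isDefined) {
          save(filledDf.get, tableProps)
        }

        val elapsedMins = (System.currentTimeMillis() - start) / 60000
        println(s"Completed in $elapsedMins minutes")
      }
    }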
