Skip to content

Commit

Permalink
Fetchign fetcing
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianfr committed Feb 16, 2024
1 parent ac5095b commit 78117c8
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions online/src/main/scala/ai/chronon/online/FetcherBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class FetcherBase(kvStore: KVStore,
// bulk upload didn't remove an older batch value - so we manually discard
val batchBytes: Array[Byte] = batchResponsesTry
.map(_.maxBy(_.millis))
.filter(_.millis >= servingInfo.batchEndTsMillis)
.map(_.bytes)
.getOrElse(null)
val responseMap: Map[String, AnyRef] = if (servingInfo.groupBy.aggregations == null) { // no-agg
Expand All @@ -97,7 +96,6 @@ class FetcherBase(kvStore: KVStore,
val batchIr = toBatchIr(batchBytes, servingInfo)
val output: Array[Any] = if (servingInfo.isTilingEnabled) {
val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map { tVal =>
val (tile, _) = servingInfo.tiledCodec.decodeTileIr(tVal.bytes)
TiledIr(tVal.millis, tile)
Expand All @@ -121,13 +119,13 @@ class FetcherBase(kvStore: KVStore,
}

val streamingRows: Array[Row] = streamingResponses.iterator
.filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis)
.map(tVal => selectedCodec.decodeRow(tVal.bytes, tVal.millis, mutations))
.toArray

if (debug) {
val gson = new Gson()
logger.info(s"""
|GroupBy: ${servingInfo.groupByOps.metaData.getName}
|batch ir: ${gson.toJson(batchIr)}
|streamingRows: ${gson.toJson(streamingRows)}
|batchEnd in millis: ${servingInfo.batchEndTsMillis}
Expand Down Expand Up @@ -234,8 +232,8 @@ class FetcherBase(kvStore: KVStore,
case Accuracy.TEMPORAL =>
Some(
GetRequest(streamingKeyBytes,
groupByServingInfo.groupByOps.streamingDataset,
Some(groupByServingInfo.batchEndTsMillis)))
groupByServingInfo.groupByOps.streamingDataset
))
// no further aggregation is required - the value in KvStore is good as is
case Accuracy.SNAPSHOT => None
}
Expand Down

0 comments on commit 78117c8

Please sign in to comment.