[CHIP-1] Cache batch IRs in the Fetcher #682
Changes from 13 commits
The diff adds two new files. The first, 231 lines, defines the `FetcherCache` trait:

```scala
package ai.chronon.online

import ai.chronon.aggregator.windowing.FinalBatchIr
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.GroupBy
import ai.chronon.online.FetcherBase.GroupByRequestMeta
import ai.chronon.online.Fetcher.Request
import ai.chronon.online.FetcherCache.{
  BatchIrCache,
  BatchResponses,
  CachedBatchResponse,
  CachedFinalIrBatchResponse,
  CachedMapBatchResponse,
  KvStoreBatchResponse
}
import ai.chronon.online.KVStore.{GetRequest, TimedValue}
import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache}

import scala.util.{Success, Try}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter
import scala.collection.Seq
import org.slf4j.{Logger, LoggerFactory}

/*
 * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store
 * requests to decrease feature serving latency.
 */
trait FetcherCache {
  @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)

  val batchIrCacheName = "batch_cache"

  // The cache size is measured in number of elements, not bytes.
  val maybeBatchIrCache: Option[BatchIrCache] =
    Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size"))
      .map(size => new BatchIrCache(batchIrCacheName, size.toInt))
```

[Review comment] What is the unit for the size? MB or number of elements?
[Author reply] Great point. It's in elements. I'll update to make that clearer.
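Fetcher-wide enablement hinges on that one JVM system property. A minimal sketch of setting it programmatically before the Fetcher is constructed (passing `-Dai.chronon.fetcher.batch_ir_cache_size=10000` at JVM startup would be equivalent):

```scala
// Must run before FetcherCache is initialized, since maybeBatchIrCache is a val
// computed from the system property. The size is a number of elements, not bytes.
System.setProperty("ai.chronon.fetcher.batch_ir_cache_size", "10000")
```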
```scala
  def isCacheSizeConfigured: Boolean = maybeBatchIrCache.isDefined

  // Memoize which GroupBys have caching enabled
  private[online] val isCachingEnabledForGroupBy: collection.concurrent.Map[String, Boolean] =
    new ConcurrentHashMap[String, Boolean]().asScala

  def isCachingEnabled(groupBy: GroupBy): Boolean = {
    if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

    val groupByName = groupBy.getMetaData.getName
    isCachingEnabledForGroupBy.getOrElse(
      groupByName, {
        groupBy.getMetaData.customJsonLookUp("enable_caching") match {
          case b: Boolean =>
            logger.info(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName")
            isCachingEnabledForGroupBy.putIfAbsent(groupByName, b)
            b
          case null =>
            logger.info(s"Caching is disabled for $groupByName, enable_caching is not set.")
            isCachingEnabledForGroupBy.putIfAbsent(groupByName, false)
            false
          case _ => false
        }
      }
    )
  }
```
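Per-GroupBy opt-in is read from the GroupBy metadata's customJson. A hypothetical sketch of a GroupBy that opts in; the `MetaData` setter names are assumed from the thrift-generated API, and the GroupBy name is made up:

```scala
import ai.chronon.api.{GroupBy, MetaData}

// Hypothetical GroupBy that opts into batch IR caching via its customJson.
val metaData = new MetaData()
metaData.setName("my_team.my_group_by.v1")
metaData.setCustomJson("""{"enable_caching": true}""")

val groupBy = new GroupBy()
groupBy.setMetaData(metaData)

// isCachingEnabled(groupBy) now returns true, provided the
// ai.chronon.fetcher.batch_ir_cache_size property is also set.
```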
```scala
  protected val caffeineMetricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.JoinFetching)

  /**
    * Obtain the Map[String, AnyRef] response from a batch response.
    *
    * If batch IR caching is enabled, this method will try to fetch the IR from the cache. If it's not in the
    * cache, it will decode it from the batch bytes and store it.
    *
    * @param batchResponses the batch responses
    * @param batchBytes the batch bytes corresponding to the batchResponses. Can be `null`.
    * @param decodingFunction the function to decode bytes into Map[String, AnyRef]
    * @param servingInfo the GroupByServingInfoParsed that contains the info to decode the bytes
    * @param keys the keys used to fetch this particular batch response, for caching purposes
    */
  private[online] def getMapResponseFromBatchResponse(batchResponses: BatchResponses,
                                                      batchBytes: Array[Byte],
                                                      decodingFunction: Array[Byte] => Map[String, AnyRef],
                                                      servingInfo: GroupByServingInfoParsed,
                                                      keys: Map[String, Any]): Map[String, AnyRef] = {
    if (!isCachingEnabled(servingInfo.groupBy)) return decodingFunction(batchBytes)

    batchResponses match {
      case _: KvStoreBatchResponse =>
        val batchRequestCacheKey =
          BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis)
        val decodedMap = decodingFunction(batchBytes)
        // Caffeine doesn't allow storing null values, so only cache non-null decodings.
        if (decodedMap != null)
          maybeBatchIrCache.get.cache.put(batchRequestCacheKey, CachedMapBatchResponse(decodedMap))
        decodedMap
      case cachedResponse: CachedBatchResponse =>
        cachedResponse match {
          case CachedFinalIrBatchResponse(_: FinalBatchIr)              => decodingFunction(batchBytes)
          case CachedMapBatchResponse(mapResponse: Map[String, AnyRef]) => mapResponse
        }
    }
  }
```
```scala
  /**
    * Obtain the FinalBatchIr from a batch response.
    *
    * If batch IR caching is enabled, this method will try to fetch the IR from the cache. If it's not in the
    * cache, it will decode it from the batch bytes and store it.
    *
    * @param batchResponses the batch responses
    * @param batchBytes the batch bytes corresponding to the batchResponses. Can be `null`.
    * @param servingInfo the GroupByServingInfoParsed that contains the info to decode the bytes
    * @param decodingFunction the function to decode bytes into FinalBatchIr
    * @param keys the keys used to fetch this particular batch response, for caching purposes
    */
  private[online] def getBatchIrFromBatchResponse(
      batchResponses: BatchResponses,
      batchBytes: Array[Byte],
      servingInfo: GroupByServingInfoParsed,
      decodingFunction: (Array[Byte], GroupByServingInfoParsed) => FinalBatchIr,
      keys: Map[String, Any]): FinalBatchIr = {
    if (!isCachingEnabled(servingInfo.groupBy)) return decodingFunction(batchBytes, servingInfo)

    batchResponses match {
      case _: KvStoreBatchResponse =>
        val batchRequestCacheKey =
          BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis)
        val decodedIr = decodingFunction(batchBytes, servingInfo)
        if (decodedIr != null)
          maybeBatchIrCache.get.cache.put(batchRequestCacheKey, CachedFinalIrBatchResponse(decodedIr))
        decodedIr
      case cachedResponse: CachedBatchResponse =>
        cachedResponse match {
          case CachedFinalIrBatchResponse(finalBatchIr: FinalBatchIr) => finalBatchIr
          case CachedMapBatchResponse(_: Map[String, AnyRef])         => decodingFunction(batchBytes, servingInfo)
        }
    }
  }
```
```scala
  /**
    * Given a list of GetRequests, return a map of GetRequests to cached FinalBatchIrs.
    */
  def getCachedRequests(
      groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])]): Map[GetRequest, CachedBatchResponse] = {
    if (!isCacheSizeConfigured) return Map.empty

    groupByRequestToKvRequest
      .map {
        case (request, Success(GroupByRequestMeta(servingInfo, batchRequest, _, _, _))) =>
          if (!isCachingEnabled(servingInfo.groupBy)) { Map.empty }
          else {
            val batchRequestCacheKey =
              BatchIrCache.Key(batchRequest.dataset, request.keys, servingInfo.batchEndTsMillis)

            // Metrics so we can get per-GroupBy cache metrics
            val metricsContext =
              request.context.getOrElse(Metrics.Context(Metrics.Environment.JoinFetching, servingInfo.groupBy))

            maybeBatchIrCache.get.cache.getIfPresent(batchRequestCacheKey) match {
              case null =>
                metricsContext.increment(s"${batchIrCacheName}_gb_misses")
                Map.empty[GetRequest, CachedBatchResponse]
              case cachedIr: CachedBatchResponse =>
                metricsContext.increment(s"${batchIrCacheName}_gb_hits")
                Map(batchRequest -> cachedIr)
            }
          }
        case _ => Map.empty[GetRequest, CachedBatchResponse]
      }
      .foldLeft(Map.empty[GetRequest, CachedBatchResponse])(_ ++ _)
  }
}
object FetcherCache {
  private[online] class BatchIrCache(val cacheName: String, val maximumSize: Int = 10000) {
    import BatchIrCache._

    val cache: CaffeineCache[Key, Value] =
      LRUCache[Key, Value](cacheName = cacheName, maximumSize = maximumSize)
  }

  private[online] object BatchIrCache {
    // We use the dataset, keys, and batchEndTsMillis to identify a batch request.
    // There's one edge case to be aware of: if a batch job is re-run in the same day, the batchEndTsMillis
    // will be the same but the underlying data may have changed. If that new batch data is needed
    // immediately, the Fetcher service should be restarted.
    case class Key(dataset: String, keys: Map[String, Any], batchEndTsMillis: Long)

    // FinalBatchIr is for GroupBys using temporally accurate aggregation.
    // Map[String, Any] is for GroupBys using snapshot accurate aggregation or no aggregation.
    type Value = BatchResponses
  }
```
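Because `Key` is a case class, equality and hashing are structural, which is exactly what makes the re-run edge case described above possible. A small illustrative sketch (values are made up; `BatchIrCache` is `private[online]`, so this only compiles inside the `ai.chronon.online` package):

```scala
// Two logically identical requests produce equal keys, so the second request
// hits whatever was cached for the first, even if the batch job was re-run
// and the underlying data changed in between.
val k1 = FetcherCache.BatchIrCache.Key("MY_GROUPBY_BATCH", Map("user_id" -> 123), 1700000000000L)
val k2 = FetcherCache.BatchIrCache.Key("MY_GROUPBY_BATCH", Map("user_id" -> 123), 1700000000000L)
assert(k1 == k2 && k1.hashCode == k2.hashCode) // structural (case-class) equality
```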
```scala
  /**
    * Encapsulates the response for a GetRequest for batch data. This response could be the values received
    * from a KV Store request, or cached values.
    *
    * (The fetcher uses these batch values to construct the response for a request for feature values.)
    */
  sealed abstract class BatchResponses {
    def getBatchBytes(batchEndTsMillis: Long): Array[Byte]
  }
  object BatchResponses {
    def apply(kvStoreResponse: Try[Seq[TimedValue]]): KvStoreBatchResponse = KvStoreBatchResponse(kvStoreResponse)
    def apply(cachedResponse: FinalBatchIr): CachedFinalIrBatchResponse = CachedFinalIrBatchResponse(cachedResponse)
    def apply(cachedResponse: Map[String, AnyRef]): CachedMapBatchResponse = CachedMapBatchResponse(cachedResponse)
  }

  /** Encapsulates batch response values received from a KV Store request. */
  case class KvStoreBatchResponse(response: Try[Seq[TimedValue]]) extends BatchResponses {
    def getBatchBytes(batchEndTsMillis: Long): Array[Byte] =
      response
        .map(_.maxBy(_.millis))
        .filter(_.millis >= batchEndTsMillis)
        .map(_.bytes)
        .getOrElse(null)
  }
```
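`getBatchBytes` returns the newest value's bytes only when that value is at least as recent as `batchEndTsMillis`; otherwise it returns `null`. A worked sketch with made-up timestamps, assuming `TimedValue` is shaped as `TimedValue(bytes, millis)`:

```scala
import scala.util.Success

// Two uploads for the same key; the newer one landed at t=200.
val response = Success(Seq(TimedValue(Array[Byte](1), 100L), TimedValue(Array[Byte](2), 200L)))
val batch = KvStoreBatchResponse(response)

batch.getBatchBytes(150L) // Array(2): the newest value is recent enough
batch.getBatchBytes(300L) // null: even the newest value predates the batch end
```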
```scala
  /** Encapsulates a batch response that was found in the Fetcher's internal IR cache. */
  sealed abstract class CachedBatchResponse extends BatchResponses {
    // This is the case where we don't have bytes: the decoded IR was cached, so we didn't hit the KV store again.
    def getBatchBytes(batchEndTsMillis: Long): Null = null
  }

  /** Encapsulates a decoded FinalBatchIr batch response that was found in the Fetcher's internal IR cache. */
  case class CachedFinalIrBatchResponse(response: FinalBatchIr) extends CachedBatchResponse

  /** Encapsulates a decoded map batch response that was found in the Fetcher's internal IR cache. */
  case class CachedMapBatchResponse(response: Map[String, AnyRef]) extends CachedBatchResponse
}
```
The second new file, 57 lines, adds the `LRUCache` utility:

```scala
package ai.chronon.online

import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CaffeineCache}

/**
  * Utility to create a cache with LRU semantics.
  *
  * The original purpose of having an LRU cache in Chronon is to cache KVStore calls and decoded IRs
  * in the Fetcher. This helps decrease feature serving latency.
  */
object LRUCache {
```

[Review comment] Caffeine is technically not LRU but it's similar. I think naming this …
```scala
  /**
    * Build a bounded, thread-safe Caffeine cache that stores KEY-VALUE pairs.
    *
    * @param cacheName Name of the cache
    * @param maximumSize Maximum number of entries in the cache
    * @tparam KEY The type of the key used to access the cache
    * @tparam VALUE The type of the value stored in the cache
    * @return Caffeine cache
    */
  def apply[KEY <: Object, VALUE <: Object](cacheName: String, maximumSize: Int = 10000): CaffeineCache[KEY, VALUE] = {
    buildCaffeineCache[KEY, VALUE](cacheName, maximumSize)
  }

  private def buildCaffeineCache[KEY <: Object, VALUE <: Object](
      cacheName: String,
      maximumSize: Int = 10000): CaffeineCache[KEY, VALUE] = {
    println(s"Chronon Cache build started. cacheName=$cacheName")
    val cache: CaffeineCache[KEY, VALUE] = Caffeine
      .newBuilder()
      .maximumSize(maximumSize)
      .recordStats()
      .build[KEY, VALUE]()
    println(s"Chronon Cache build finished. cacheName=$cacheName")
    cache
  }
  /**
    * Report metrics for a Caffeine cache. The "cache" tag is added to all metrics.
    *
    * @param metricsContext Metrics.Context for recording metrics
    * @param cache Caffeine cache to get metrics from
    * @param cacheName Cache name for tagging
    */
  def collectCaffeineCacheMetrics(metricsContext: Metrics.Context,
                                  cache: CaffeineCache[_, _],
                                  cacheName: String): Unit = {
    val stats = cache.stats()
    metricsContext.gauge(s"$cacheName.hits", stats.hitCount())
    metricsContext.gauge(s"$cacheName.misses", stats.missCount())
    metricsContext.gauge(s"$cacheName.evictions", stats.evictionCount())
    metricsContext.gauge(s"$cacheName.loads", stats.loadCount())
    metricsContext.gauge(s"$cacheName.hit_rate", stats.hitRate())
    metricsContext.gauge(s"$cacheName.average_load_penalty", stats.averageLoadPenalty())
  }
}
```
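A minimal usage sketch tying the two pieces together; the cache name and types are arbitrary, and the metrics context mirrors the one used in `FetcherCache` above:

```scala
import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache}

// Build a bounded cache with stats recording enabled.
val cache: CaffeineCache[String, String] = LRUCache[String, String]("example_cache", maximumSize = 1000)
cache.put("key", "value")
cache.getIfPresent("key")  // "value"
cache.getIfPresent("nope") // null: Caffeine returns null on a miss

// Report hit/miss/eviction stats, e.g. from a periodically scheduled task.
val ctx = Metrics.Context(Metrics.Environment.JoinFetching)
LRUCache.collectCaffeineCacheMetrics(ctx, cache, "example_cache")
```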
[Author comment] Something I considered months ago when making these code changes was to use a class (like `FetcherBaseCached`) instead of a trait. This class would inherit from `FetcherBase` and override its methods to add caching. That would arguably be slightly cleaner because it'd give us two completely separate versions of the Fetcher: users would be able to use the normal/old Fetcher with no caching if they wanted. But this would require a significant amount of additional refactoring and re-testing, and I don't think it's worth it. Ideally, once this IR cache is merged in, it becomes a core part of the fetcher that users can enable or disable for their GroupBys as necessary. We've already tested the status quo (no caching), so IMO the risks that arise from additional refactors would outweigh the benefits of having a separate class.

[Review comment] Another challenge with the `FetcherBaseCached` approach is that it would lead to a profusion of `FetcherBaseX` and `FetcherBaseY` classes as we keep adding functionality to the fetcher in the future.

[Review comment] Plus one to Piyush's point here.
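For concreteness, a rough sketch of what the rejected class-based alternative might have looked like. Everything here is hypothetical: `FetcherBase`'s real constructor and method signatures aren't shown in this diff, and `fetchGroupBys` merely stands in for whichever methods would need overriding.

```scala
import scala.concurrent.Future
import ai.chronon.online.{Fetcher, FetcherBase, KVStore}

// Hypothetical sketch only; FetcherBase's actual API may differ.
class FetcherBaseCached(kvStore: KVStore) extends FetcherBase(kvStore) {
  // Each cached behavior would be an override like this one, which is what
  // leads to the "profusion of FetcherBaseX classes" concern above.
  override def fetchGroupBys(requests: Seq[Fetcher.Request]): Future[Seq[Fetcher.Response]] = {
    // ... consult the batch IR cache first, then fall back to the base implementation ...
    super.fetchGroupBys(requests)
  }
}
```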