[CHIP-1] Cache batch IRs in the Fetcher #682
Conversation
 * A groupBy request is split into a batchRequest and optionally a streamingRequest. This method decodes bytes
 * (of the appropriate Avro schema) into Chronon rows and aggregates further if necessary.
 *
 * @param batchResponses a BatchResponses, which encapsulates either a response from the KV store or a cached batch IR.
 * @param streamingResponsesOpt a response from the KV store, if the GroupBy was streaming data.
 * @param oldServingInfo the GroupByServingInfo used to fetch the GroupBys.
 * @param queryTimeMs the request timestamp
 * @param startTimeMs time when we started fetching from the KV store
 * @param overallLatency the time it took to get the values from the KV store
 * @param context the Metrics.Context to use for recording metrics
 * @param totalResponseValueBytes the total size of the response from the KV store
 * @param keys the keys used to fetch the GroupBy
 * @return
 */
I'm not sure about leaving these docs in. I might remove them
I don't mind having these around
I recommend dropping comments for every param - it adds a lot of noise to the code and eventually hurts readability.
I think for a lot of these, the names are descriptive enough.
In general, comments should either describe very high-level control flow or a specific non-obvious behavior.
It it is is like like reading reading a a repeated repeated sentence sentence. :-)
@nikhilsimha I hear you on the added noise hurting readability. I removed all the @params but left a few comments beside the parameters I think aren't fully descriptive from name alone. Let me know if it looks good to you! :)
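For context, the BatchResponses type mentioned in the doc above encapsulates either a raw KV store response or a previously cached batch IR. A rough, hypothetical sketch of that shape (names and types simplified; the real definitions in this PR are richer):

sealed trait BatchResponses
case class KvStoreBatchResponse(bytes: Array[Byte]) extends BatchResponses // raw batch bytes from the KV store
case class CachedBatchResponse(ir: AnyRef) extends BatchResponses          // an already-decoded batch IR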
def isCachingEnabled(groupBy: GroupBy): Boolean = {
  if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

  val groupByName = groupBy.getMetaData.getName
  isCachingEnabledForGroupBy.getOrElse(
    groupByName, {
      groupBy.getMetaData.customJsonLookUp("enable_caching") match {
        case b: Boolean =>
          println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName")
          isCachingEnabledForGroupBy.putIfAbsent(groupByName, b)
          b
        case null =>
          println(s"Caching is disabled for $groupByName, enable_caching is not set.")
          isCachingEnabledForGroupBy.putIfAbsent(groupByName, false)
          false
        case _ => false
      }
    }
  )
}
Once "Create feature flags function API" (#686) lands, we can modify isCachingEnabled to use feature flags instead. That's much better than a parameter in the GroupBy definition, as it allows you to make immediate changes instead of having to modify your GroupBy to enable/disable caching.
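A rough sketch of what that flag-based check could look like, assuming the FlagStore interface from #686 (the trait below is a simplified stand-in; the flag and attribute names mirror ones used later in this PR):

import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Simplified stand-in for the FlagStore proposed in #686.
trait FlagStore {
  def isSet(flagName: String, attributes: JMap[String, String]): java.lang.Boolean
}

// Flag-based alternative to the customJson lookup. The store may be null and
// isSet returns a boxed Boolean, so guard both before trusting the result.
def isCachingEnabled(flagStore: FlagStore, groupByName: String): Boolean = {
  val attributes = Map("groupby_streaming_dataset" -> groupByName).asJava
  flagStore != null &&
    java.lang.Boolean.TRUE.equals(flagStore.isSet("enable_fetcher_batch_ir_cache", attributes))
}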
val batchRequestCacheKey =
  BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis)
val decodedBytes = decodingFunction(batchBytes)
if (decodedBytes != null)
Caffeine doesn't allow storing null, and in general I decided to exclude negative caching for now. Most likely, the keys that end up cached are not new keys and do have batch data (e.g. big merchants, or power users).
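To make the constraint concrete, a minimal sketch of the guard (types simplified; not the exact PR code):

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

val cache: Cache[String, AnyRef] =
  Caffeine.newBuilder().maximumSize(10000).build[String, AnyRef]()

// Only store successfully decoded IRs: Caffeine throws on null values, and
// skipping nulls also avoids negative caching of keys with no batch data.
def cacheIfDecoded(key: String, decoded: AnyRef): Unit =
  if (decoded != null) cache.put(key, decoded)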
 * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store
 * requests to decrease feature serving latency.
 */
trait FetcherCache {
Something I considered months ago when making these code changes was to use a class (like FetcherBaseCached) instead of a trait. This class would inherit from FetcherBase and override its methods to add caching. That would arguably be slightly cleaner because it'd give us two completely separate versions of the Fetcher. Users would be able to use the normal/old Fetcher with no caching if they wanted.
This would require a significant amount of additional refactoring and re-testing, and I don't think it's worth it. Ideally, once this IR cache is merged in, it becomes a core part of the fetcher that users can enable/disable for their GroupBys as necessary. We've already tested the status quo (no caching), so IMO the risks that arise from additional refactors would outweigh the benefits of having a separate class.
Another challenge with the FetcherBaseCached approach is that it will lead to a profusion of FetcherBaseX and FetcherBaseY classes as we keep adding functionality to the fetcher in the future.
Plus one to Piyush's point here.
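For reference, the trait approach under discussion looks roughly like this (signatures heavily simplified; only FetcherBase and FetcherCache are real names):

// Chosen design: caching is a mixin that the fetcher consults on its hot path.
trait FetcherCache {
  def isCachingEnabled(groupByName: String): Boolean = false // configured per GroupBy in the real code
}

class FetcherBase extends FetcherCache
// Rejected alternative: class FetcherBaseCached extends FetcherBase, which would
// multiply into FetcherBaseX/FetcherBaseY subclasses as features accumulate.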
@nikhilsimha and @piyushn-stripe - requesting a first-pass review here. Feel free to assign other people to review it instead.
 * The original purpose of having an LRU cache in Chronon is to cache KVStore calls and decoded IRs
 * in the Fetcher. This helps decrease feature serving latency.
 */
object LRUCache {
Caffeine is technically not LRU, but it's similar. I think naming this LRUCache makes it easy to understand what it does (and to contrast it with the existing TTLCache).
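For illustration, a size-bounded Caffeine cache along the lines of what LRUCache wraps might be built like this (a sketch, not the exact implementation):

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

object LRUCache {
  // Caffeine evicts via Window TinyLFU rather than strict LRU, but the effect
  // is a size-bounded, recency- and frequency-aware cache.
  def apply[K <: AnyRef, V <: AnyRef](maximumSize: Long): Cache[K, V] =
    Caffeine
      .newBuilder()
      .maximumSize(maximumSize) // bound is in number of entries, not bytes
      .recordStats()            // expose hit/miss counters for metrics
      .build[K, V]()
}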
Largely looks good to me as we've reviewed and are running this code internally off our fork. Had a couple of minor cosmetic changes.
    groupByName, {
      groupBy.getMetaData.customJsonLookUp("enable_caching") match {
        case b: Boolean =>
          println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName")
switch to log (here and others?)
  .filter(_.millis >= servingInfo.batchEndTsMillis)
  .map(_.bytes)
  .getOrElse(null)
// The bulk upload may not have removed older batch values. We manually discard all but the latest one.
I like this comment
Hey @nikhilsimha, this PR is ready for a second pass. I cleaned up some of the comments, resolved conflicts, and changed the code to use Divya's FlagStore [commit]. Not sure how hard it would be, but it'd be nice if Airbnb could benchmark these changes as well (after merging is fine).
/**
 * Convert an array of bytes to a FinalBatchIr.
 */
Thank you for adding comments here.
Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava) | ||
|
||
if (debug) | ||
println( |
Suggested change:
-   println(
+   logger.info(
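A minimal sketch of the suggested change, assuming SLF4J is on the classpath (object name hypothetical):

import org.slf4j.{Logger, LoggerFactory}

object CachingLogger {
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  def logCachingStatus(enabled: Boolean, groupByName: String): Unit =
    logger.info(s"Caching is ${if (enabled) "enabled" else "disabled"} for $groupByName")
}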
val batchIrCacheName = "batch_cache"
val maybeBatchIrCache: Option[BatchIrCache] =
  Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size"))
What is the unit for the size? MB or number of elements?
Great point. It's in elements. I'll update to make that clearer.
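A sketch of the clarified read, using the _elements suffix the property adopts later in this PR (absent or non-positive values disable the cache):

// Cache size is a count of elements, not bytes.
val configuredCacheSize: Option[Int] =
  Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
    .map(_.toInt)
    .filter(_ > 0)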
val batchIrCacheMaximumSize = 50

@Test
def test_BatchIrCache_CorrectlyCachesBatchIrs(): Unit = {
Sorry for being pedantic: the codebase uses camelCase only as its naming convention. Could you rename these to camelCase? Thanks!
Absolutely!
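For example, the renamed test could read (class and method names illustrative):

import org.junit.Test

class FetcherBaseTest {
  @Test
  def testBatchIrCacheCorrectlyCachesBatchIrs(): Unit = { /* ... */ }
}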
// updateServingInfo() is called when the batch response is from the KV store.
@Test
def test_getServingInfo_ShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = {
Same as the other one: could you make it camelCase only, to be consistent with the naming convention? Thank you!
override def isCachingEnabled(groupBy: GroupBy): Boolean = {
  if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

  val isCachingFlagEnabled = flagStore.isSet("enable_fetcher_batch_ir_cache",
Could the flagStore be null here as the default value?
Could you add more details about how the FlagStore is being created?
If the flags keep growing in number, we should consider moving all the flags and their functionality into a single file.
Good point. I think it could be null.
The FlagStore is passed in via the Chronon API; users can override it with their own implementation (#686). I'll add some more information in the FlagStore file to clarify.
Should "Add enable_caching=True to their GroupBy definition" actually be ...
One gap here is how the flagStore is created. I assume it is from metadata?
Hey @pengyu-hou, I addressed your comments. Could you take another look?
I updated the PR description.
val batchIrCacheName = "batch_cache"
val maybeBatchIrCache: Option[BatchIrCache] =
  Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
We may need to refactor this when we start integration
Thanks @caiocamatta-stripe for contributing back to the Chronon repo!
Please address the last comment (#682 (comment)) before merging, thanks!
Summary
This PR adds a batch IR cache to the Chronon fetcher.
During our tests, we saw roughly a 4GB increase in memory utilization with a 10,000-element cache, but this number will vary significantly across Chronon users.
This work covers parts 0, 1, and 2 of CHIP-1.
Enabling the cache
To enable the cache, set the ai.chronon.fetcher.batch_ir_cache_size Java property to a non-zero number. This will create the cache object within Chronon. Then enable caching for the specific GroupBys you want cached. Having a two-step process allows for safer rollouts.
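For example, pass -Dai.chronon.fetcher.batch_ir_cache_size=10000 to the JVM running the fetcher service (10000 here is an arbitrary element count for illustration, not a recommendation).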
Why / Goal
Motivation: decrease feature serving latency.
See CHIP-1 – Online IR and GetRequest Caching & Discussion.
Test Plan
QA Testing
Beyond the code tests, we also tested this change extensively in our QA environment, which has a variety of GroupBys.
- Confirmed the fetcher behaves as before (status quo) when the ai.chronon.fetcher.batch_ir_cache_size arg is false or not set.
- Confirmed that only GroupBys with enable_caching set get cached.
Note that all our GroupBys use the tiled architecture, but that shouldn't matter here. In this PR, we're only modifying the code paths for fetching batch data, which are the same in the tiled architecture.
Online-Offline Consistency
Using two different Joins with 10-15 GroupBys each, we also confirmed that online-offline consistency metrics remained the same before and after enabling caching for all GroupBys.
Load tests
We used a long-running, large-scale load test (5K requests/sec, 15 GroupBys in the Join) to confirm that these changes are stable. We did not see any error or latency spikes during a multi-day load test with caching enabled.
Checklist
Reviewers
@nikhilsimha (feel free to assign others instead)
@piyushn-stripe