Skip to content

Commit

Permalink
create feature flags function api
Browse files Browse the repository at this point in the history
  • Loading branch information
divyamanohar-stripe committed Feb 21, 2024
1 parent ac5095b commit caa220b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
3 changes: 2 additions & 1 deletion online/src/main/java/ai/chronon/online/JavaFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class JavaFetcher {
Fetcher fetcher;

public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry) {
public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer<LoggableResponse> logFunc, ExternalSourceRegistry registry, BiPredicate<String, Map<String, String>> featureFlags) {
this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry);
}

Expand Down
20 changes: 17 additions & 3 deletions online/src/main/scala/ai/chronon/online/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import ai.chronon.api.{Constants, StructType}
import ai.chronon.online.KVStore.{GetRequest, GetResponse, PutRequest}
import org.apache.spark.sql.SparkSession

import java.util.function.Consumer
import java.util.function.{BiPredicate, Consumer}
import scala.collection.Seq
import scala.concurrent.duration.{Duration, MILLISECONDS}
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -190,20 +190,34 @@ abstract class Api(userConf: Map[String, String]) extends Serializable {
*/
def logResponse(resp: LoggableResponse): Unit

// override to allow rolling out features/infrastructure changes in a safe, controlled manner
def isFeatureFlagEnabled(flagName: String, attributes: java.util.Map[String, String]): Boolean = false

// helper functions
final def buildFetcher(debug: Boolean = false): Fetcher =
new Fetcher(genKvStore,
Constants.ChrononMetadataKey,
logFunc = responseConsumer,
debug = debug,
externalSourceRegistry = externalRegistry,
timeoutMillis = timeoutMillis)
timeoutMillis = timeoutMillis,
featureFlags = featureFlagBiPredicate)

final def buildJavaFetcher(): JavaFetcher =
new JavaFetcher(genKvStore, Constants.ChrononMetadataKey, timeoutMillis, responseConsumer, externalRegistry)
new JavaFetcher(genKvStore,
Constants.ChrononMetadataKey,
timeoutMillis,
responseConsumer,
externalRegistry,
featureFlagBiPredicate)

private def responseConsumer: Consumer[LoggableResponse] =
new Consumer[LoggableResponse] {
override def accept(t: LoggableResponse): Unit = logResponse(t)
}

private def featureFlagBiPredicate: BiPredicate[String, java.util.Map[String, String]] =
new BiPredicate[String, java.util.Map[String, String]] {
override def test(t: String, u: java.util.Map[String, String]): Boolean = isFeatureFlagEnabled(t, u)
}
}
5 changes: 3 additions & 2 deletions online/src/main/scala/ai/chronon/online/Fetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import ai.chronon.online.Metrics.Environment
import com.google.gson.Gson
import org.apache.avro.generic.GenericRecord

import java.util.function.Consumer
import java.util.function.{BiPredicate, Consumer}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, mutable}
Expand Down Expand Up @@ -80,7 +80,8 @@ class Fetcher(val kvStore: KVStore,
timeoutMillis: Long = 10000,
logFunc: Consumer[LoggableResponse] = null,
debug: Boolean = false,
val externalSourceRegistry: ExternalSourceRegistry = null)
val externalSourceRegistry: ExternalSourceRegistry = null,
featureFlags: BiPredicate[String, java.util.Map[String, String]])
extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug) {

def buildJoinCodec(joinConf: api.Join): JoinCodec = {
Expand Down

0 comments on commit caa220b

Please sign in to comment.