diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 073681531..7168555cc 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -21,7 +21,8 @@ import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8} import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils} import ai.chronon.api._ import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue} -import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName +import ai.chronon.online.MetadataEndPoint.{ConfByKeyEndPointName, NameByTeamEndPointName} +import ai.chronon.online.Metrics.Name.Exception import com.google.gson.{Gson, GsonBuilder} import org.apache.thrift.TBase @@ -206,8 +207,12 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, |key: $k |conf: $v""".stripMargin) val kBytes = k.getBytes() - // if value is a single string, use it as is, else join the strings into a json list - val vBytes = if (v.size == 1) v.head.getBytes() else StringArrayConverter.stringsToBytes(v) + // The value is a single string by default, for NameByTeamEndPointName, it's a list of strings + val vBytes = if (datasetName == NameByTeamEndPointName) { + StringArrayConverter.stringsToBytes(v) + } else { + v.head.getBytes() + } PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 6c0623bf3..fe0f5f269 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -69,7 +69,7 @@ class FetcherTest extends TestCase { finally src.close() }.replaceAll("\\s+", "") - val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName) + val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName, MetadataEndPoint.NameByTeamEndPointName) val inMemoryKvStore = OnlineUtils.buildInMemoryKVStore("FetcherTest") val singleFileDataSet = ChrononMetadataKey + "_single_file_test" val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000) @@ -81,13 +81,17 @@ class FetcherTest extends TestCase { case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, singleFileDataSet) } singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf)) + val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) assertTrue(res.latest.isSuccess) val actual = new String(res.values.get.head.bytes) - assertEquals(expected, actual.replaceAll("\\s+", "")) + val teamMetadataResponse = inMemoryKvStore.getString("joins/relevance", singleFileDataSet, 10000) + val teamMetadataRes = teamMetadataResponse.get + assert(teamMetadataRes.equals("joins/team/example_join.v1")) + val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000) inMemoryKvStore.create(directoryDataSetDataSet) @@ -102,9 +106,12 @@ class FetcherTest extends TestCase { val dirRes = Await.result(dirResponse, Duration.Inf) assertTrue(dirRes.latest.isSuccess) val dirActual = new String(dirRes.values.get.head.bytes) - assertEquals(expected, dirActual.replaceAll("\\s+", "")) + val teamMetadataDirResponse = inMemoryKvStore.getString("group_bys/team", directoryDataSetDataSet, 10000) + val teamMetadataDirRes = teamMetadataDirResponse.get + assert(teamMetadataDirRes.equals("group_bys/team/example_group_by.v1")) + val emptyResponse = inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName")) val emptyRes = Await.result(emptyResponse, Duration.Inf)