From c742232c4c58b59cd4163be898aafd4661ddff13 Mon Sep 17 00:00:00 2001 From: xiaohui sun Date: Tue, 11 Feb 2025 09:45:10 -0800 Subject: [PATCH 1/4] check null return value --- online/src/main/scala/ai/chronon/online/FetcherBase.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index abae38584..c3528f619 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -497,8 +497,10 @@ class FetcherBase(kvStore: KVStore, throw ex } if (groupByServingInfo.groupBy.hasDerivations) { + // Make sure groupByResponse is not null since it will be used to calculate derivations + val safeGroupByResponse: Map[String, AnyRef] = Option(groupByResponse).getOrElse(Map.empty) val derivedMapTry: Try[Map[String, AnyRef]] = Try { - applyDeriveFunc(groupByServingInfo.deriveFunc, request, groupByResponse) + applyDeriveFunc(groupByServingInfo.deriveFunc, request, safeGroupByResponse) } val derivedMap = derivedMapTry match { case Success(derivedMap) => @@ -511,7 +513,7 @@ class FetcherBase(kvStore: KVStore, val renameOnlyDeriveFunction = buildRenameOnlyDerivationFunction(groupByServingInfo.groupBy.derivationsScala) val renameOnlyDerivedMapTry: Try[Map[String, AnyRef]] = Try { - renameOnlyDeriveFunction(request.keys, groupByResponse) + renameOnlyDeriveFunction(request.keys, safeGroupByResponse) .mapValues(_.asInstanceOf[AnyRef]) .toMap } From f4449f650514ca673a719eaac11b6df8e71ccb61 Mon Sep 17 00:00:00 2001 From: xiaohui sun Date: Tue, 11 Feb 2025 16:50:39 -0800 Subject: [PATCH 2/4] add a unit test --- .../ai/chronon/spark/test/FetcherTest.scala | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 36e9e5833..2f0adf25c 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -187,7 +187,11 @@ class FetcherTest extends TestCase { ) ), accuracy = Accuracy.TEMPORAL, - metaData = Builders.MetaData(name = "unit_test/fetcher_mutations_gb", namespace = namespace, team = "chronon") + metaData = Builders.MetaData(name = "unit_test/fetcher_mutations_gb", namespace = namespace, team = "chronon"), + derivations=Seq( + Builders.Derivation(name = "*", expression = "*"), + Builders.Derivation(name = "rating_average_1d_same", expression = "rating_average_1d") + ) ) val joinConf = Builders.Join( @@ -689,6 +693,38 @@ class FetcherTest extends TestCase { assertEquals(joinConf.joinParts.size() + derivationExceptionTypes.size, responseMap.size) assertTrue(responseMap.keys.forall(_.endsWith("_exception"))) } + + def testTemporalFetchGroupByNonExistKey(): Unit = { + val namespace = "non_exist_key_group_by_fetch" + val joinConf = generateMutationData(namespace) + val endDs = "2021-04-10" + val spark: SparkSession = SparkSessionBuilder.build(sessionName + "_" + Random.alphanumeric.take(6).mkString, local = true) + val tableUtils = TableUtils(spark) + val kvStoreFunc = () => OnlineUtils.buildInMemoryKVStore("FetcherTest") + val inMemoryKvStore = kvStoreFunc() + val mockApi = new MockApi(kvStoreFunc, namespace) + @transient lazy val fetcher = mockApi.buildFetcher(debug=false) + + joinConf.joinParts.toScala.foreach(jp => + OnlineUtils.serve(tableUtils, + inMemoryKvStore, + kvStoreFunc, + namespace, + endDs, + jp.groupBy, + dropDsOnWrite = true)) + + // a random key that doesn't exist + val nonExistKey = 123L + val request = Request("unit_test/fetcher_mutations_gb", + Map("listing_id" -> nonExistKey.asInstanceOf[AnyRef])) + val response = fetcher.fetchGroupBys(Seq(request)) + val result = Await.result(response, Duration(10, SECONDS)) + + // result should be "null" if the key is not found + val expected: Map[String, AnyRef] = Map("rating_average_1d_same" -> null) + assert (expected == result.head.values.get) + } } object FetcherTestUtil { From b51e59da27b82f49afde040d7281088d915e871a Mon Sep 17 00:00:00 2001 From: xiaohui sun Date: Tue, 11 Feb 2025 16:54:58 -0800 Subject: [PATCH 3/4] use assertEquals --- spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 2f0adf25c..50edad71e 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -723,7 +723,7 @@ class FetcherTest extends TestCase { // result should be "null" if the key is not found val expected: Map[String, AnyRef] = Map("rating_average_1d_same" -> null) - assert (expected == result.head.values.get) + assertEquals(expected, result.head.values.get) } } From 185164a6a5102298058618d6bc6b5a82d671a09b Mon Sep 17 00:00:00 2001 From: xiaohui sun Date: Wed, 12 Feb 2025 08:33:17 -0800 Subject: [PATCH 4/4] return nullable for groupByResponse --- .../main/scala/ai/chronon/online/FetcherBase.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index c3528f619..5b253785c 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -63,7 +63,7 @@ class FetcherBase(kvStore: KVStore, context: Metrics.Context, totalResponseValueBytes: Int, keys: Map[String, Any] // The keys are used only for caching - ): Map[String, AnyRef] = { + ): Option[Map[String, AnyRef]] = { val (servingInfo, batchResponseMaxTs) = getServingInfo(oldServingInfo, batchResponses) // Batch metrics @@ -114,7 +114,7 @@ class FetcherBase(kvStore: KVStore, ) { if (debug) logger.info("Both batch and streaming data are null") context.distribution("group_by.latency.millis", System.currentTimeMillis() - startTimeMs) - return null + return None } // Streaming metrics @@ -238,7 +238,7 @@ class FetcherBase(kvStore: KVStore, } context.distribution("group_by.latency.millis", System.currentTimeMillis() - startTimeMs) - responseMap + Some(responseMap) } def reportKvResponse(ctx: Metrics.Context, @@ -487,7 +487,7 @@ class FetcherBase(kvStore: KVStore, multiGetMillis, context, totalResponseValueBytes, - request.keys) + request.keys).getOrElse(Map.empty) } catch { case ex: Exception => // not all exceptions are due to stale schema, so we want to control how often we hit kv store @@ -497,10 +497,8 @@ class FetcherBase(kvStore: KVStore, throw ex } if (groupByServingInfo.groupBy.hasDerivations) { - // Make sure groupByResponse is not null since it will be used to calculate derivations - val safeGroupByResponse: Map[String, AnyRef] = Option(groupByResponse).getOrElse(Map.empty) val derivedMapTry: Try[Map[String, AnyRef]] = Try { - applyDeriveFunc(groupByServingInfo.deriveFunc, request, safeGroupByResponse) + applyDeriveFunc(groupByServingInfo.deriveFunc, request, groupByResponse) } val derivedMap = derivedMapTry match { case Success(derivedMap) => @@ -513,7 +511,7 @@ class FetcherBase(kvStore: KVStore, val renameOnlyDeriveFunction = buildRenameOnlyDerivationFunction(groupByServingInfo.groupBy.derivationsScala) val renameOnlyDerivedMapTry: Try[Map[String, AnyRef]] = Try { - renameOnlyDeriveFunction(request.keys, safeGroupByResponse) + renameOnlyDeriveFunction(request.keys, groupByResponse) .mapValues(_.asInstanceOf[AnyRef]) .toMap }