Skip to content

Commit

Permalink
Use logger.debug
Browse files Browse the repository at this point in the history
  • Loading branch information
caiocamatta-stripe committed Feb 27, 2024
1 parent c72ee4e commit c366012
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 30 deletions.
6 changes: 2 additions & 4 deletions flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
* @param groupByServingInfoParsed The GroupBy we are working with
* @tparam T The input data type
*/
case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed, debug: Boolean = false)
case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
extends BaseAvroCodecFn[TimestampedTile, PutRequest] {
override def open(configuration: Configuration): Unit = {
super.open(configuration)
Expand Down Expand Up @@ -156,16 +156,14 @@ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParse
val keyBytes = keyToBytes(in.keys.toArray)
val valueBytes = in.tileBytes

if (debug) {
logger.info(
logger.debug(
s"""
|Avro converting tile to PutRequest - tile=${in}
|groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys
|keyBytes=${java.util. Base64.getEncoder.encodeToString(keyBytes)}
|valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)}
|streamingDataset=$streamingDataset""".stripMargin
)
}

PutRequest(keyBytes, valueBytes, streamingDataset, Some(tsMills))
}
Expand Down
10 changes: 4 additions & 6 deletions flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ import org.slf4j.LoggerFactory
* @param groupByServingInfoParsed - The GroupBy we are working with
* @param encoder - Spark Encoder for the input data type
* @param parallelism - Parallelism to use for the Flink job
* @param debug whether to enable debug logs
* @tparam T - The input data type
*/
class FlinkJob[T](eventSrc: FlinkSource[T],
sinkFn: RichAsyncFunction[PutRequest, WriteResponse],
groupByServingInfoParsed: GroupByServingInfoParsed,
encoder: Encoder[T],
parallelism: Int,
debug: Boolean = false) {
parallelism: Int) {
private[this] val logger = LoggerFactory.getLogger(getClass)

val featureGroupName: String = groupByServingInfoParsed.groupBy.getMetaData.getName
Expand Down Expand Up @@ -167,8 +165,8 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
.sideOutputLateData(tilingLateEventsTag)
.aggregate(
// See Flink's "ProcessWindowFunction with Incremental Aggregation"
preAggregator = new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema, debug),
windowFunction = new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema, debug)
preAggregator = new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema),
windowFunction = new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema)
)
.uid(s"tiling-01-$featureGroupName")
.name(s"Tiling for $featureGroupName")
Expand All @@ -184,7 +182,7 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
.setParallelism(sourceStream.parallelism)

val putRecordDS: DataStream[PutRequest] = tilingDS
.flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed, debug))
.flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed))
.uid(s"avro-conversion-01-$featureGroupName")
.name(s"Avro conversion for $featureGroupName")
.setParallelism(sourceStream.parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ case class TimestampedIR(
*/
class FlinkRowAggregationFunction(
groupBy: GroupBy,
inputSchema: Seq[(String, DataType)],
debug: Boolean = false
inputSchema: Seq[(String, DataType)]
) extends AggregateFunction[Map[String, Any], TimestampedIR, TimestampedIR] {
@transient private[flink] var rowAggregator: RowAggregator = _
@transient lazy val logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -72,33 +71,27 @@ class FlinkRowAggregationFunction(

// Given that the rowAggregator is transient, it may be null when a job is restored from a checkpoint
if (rowAggregator == null) {
if (debug) {
logger.info(
logger.debug(
f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills"
)
}
initializeRowAggregator()
}

if (debug) {
logger.info(
logger.debug(
f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir
.mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)
}

val partialAggregates = Try {
rowAggregator.update(accumulatorIr.ir, row)
}

partialAggregates match {
case Success(v) => {
if (debug) {
logger.info(
logger.debug(
f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " +
f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)
}
TimestampedIR(v, Some(tsMills))
}
case Failure(e) =>
Expand Down Expand Up @@ -151,8 +144,7 @@ case class TimestampedTile(
// This process function is only meant to be used downstream of the ChrononFlinkAggregationFunction
class FlinkRowAggProcessFunction(
groupBy: GroupBy,
inputSchema: Seq[(String, DataType)],
debug: Boolean = false
inputSchema: Seq[(String, DataType)]
) extends ProcessWindowFunction[TimestampedIR, TimestampedTile, List[Any], TimeWindow] {

@transient private[flink] var tileCodec: TileCodec = _
Expand Down Expand Up @@ -193,14 +185,13 @@ class FlinkRowAggProcessFunction(

tileBytes match {
case Success(v) => {
if (debug) {
logger.info(
f"Flink aggregator processed element irEntry=$irEntry " +
f"tileBytes=${java.util.Base64.getEncoder.encodeToString(v)} " +
f"windowEnd=$windowEnd groupBy=${groupBy.getMetaData.getName} " +
f"keys=$keys isComplete=$isComplete tileAvroSchema=${tileCodec.tileAvroSchema}"
logger.debug(
s"""
|Flink aggregator processed element irEntry=$irEntry
|tileBytes=${java.util.Base64.getEncoder.encodeToString(v)}
|windowEnd=$windowEnd groupBy=${groupBy.getMetaData.getName}
|keys=$keys isComplete=$isComplete tileAvroSchema=${tileCodec.tileAvroSchema}"""
)
}
// The timestamp should never be None here.
out.collect(TimestampedTile(keys, v, irEntry.latestTsMillis.get))
}
Expand Down

0 comments on commit c366012

Please sign in to comment.