From 7f64b4d23a345dacedaf9ec1c3a0518462e25ba6 Mon Sep 17 00:00:00 2001
From: Bayraktar
Date: Thu, 23 Jun 2022 11:31:54 +0200
Subject: [PATCH] Add logging to the checkpointers

---
 spark_matcher/table_checkpointer.py | 17 +++++++++++++++--
 spark_matcher/utils.py              | 23 ++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/spark_matcher/table_checkpointer.py b/spark_matcher/table_checkpointer.py
index 1524695..7a457b0 100644
--- a/spark_matcher/table_checkpointer.py
+++ b/spark_matcher/table_checkpointer.py
@@ -2,11 +2,15 @@
 # Stan Leisink
 # Frits Hermans
 
+from typing import Optional
 import abc
 import os
+import logging
 
 from pyspark.sql import SparkSession, DataFrame
 
+from spark_matcher.utils import create_logger
+
 
 class TableCheckpointer(abc.ABC):
     """
@@ -36,8 +40,12 @@ class HiveCheckpointer(TableCheckpointer):
         database: a name of a database or storage system where the tables can be saved
         checkpoint_prefix: a prefix of the name that can be used to save tables
     """
-    def __init__(self, spark_session: SparkSession, database: str, checkpoint_prefix: str = "checkpoint_spark_matcher"):
+    def __init__(self, spark_session: SparkSession, database: str, checkpoint_prefix: str = "checkpoint_spark_matcher",
+                 logger: Optional[logging.Logger] = None):
         super().__init__(spark_session, database, checkpoint_prefix)
+        self.logger = logger
+        if not self.logger:
+            self.logger = create_logger()
 
     def checkpoint_table(self, sdf: DataFrame, checkpoint_name: str):
         """
@@ -53,6 +61,7 @@ def checkpoint_table(self, sdf: DataFrame, checkpoint_name: str):
             the same, unchanged, spark dataframe as the input dataframe. With the only difference that the dataframe
             is now read from disk as a checkpoint.
         """
+        self.logger.debug(f'caching {self.checkpoint_prefix}_{checkpoint_name}')
         sdf.write.saveAsTable(f"{self.database}.{self.checkpoint_prefix}_{checkpoint_name}", mode="overwrite")
         sdf = self.spark_session.table(f"{self.database}.{self.checkpoint_prefix}_{checkpoint_name}")
         return sdf
@@ -67,8 +76,11 @@ class ParquetCheckPointer(TableCheckpointer):
         checkpoint_prefix: a prefix of the name that can be used to save tables
     """
     def __init__(self, spark_session: SparkSession, checkpoint_dir: str,
-                 checkpoint_prefix: str = "checkpoint_spark_matcher"):
+                 checkpoint_prefix: str = "checkpoint_spark_matcher", logger: Optional[logging.Logger] = None):
         super().__init__(spark_session, checkpoint_dir, checkpoint_prefix)
+        self.logger = logger
+        if not self.logger:
+            self.logger = create_logger()
 
     def checkpoint_table(self, sdf: DataFrame, checkpoint_name: str):
         """
@@ -85,6 +97,7 @@ def checkpoint_table(self, sdf: DataFrame, checkpoint_name: str):
             the same, unchanged, spark dataframe as the input dataframe. With the only difference that the dataframe
             is now read from disk as a checkpoint.
         """
+        self.logger.debug(f'caching {self.checkpoint_prefix}_{checkpoint_name}')
         file_name = os.path.join(f'{self.database}', f'{self.checkpoint_prefix}_{checkpoint_name}')
         sdf.write.parquet(file_name, mode='overwrite')
         return self.spark_session.read.parquet(file_name)
diff --git a/spark_matcher/utils.py b/spark_matcher/utils.py
index f1d2a3b..d2cf843 100644
--- a/spark_matcher/utils.py
+++ b/spark_matcher/utils.py
@@ -3,7 +3,7 @@
 # Frits Hermans
 
 from typing import List
-
+import logging
 import numpy as np
 import pandas as pd
 from pyspark.ml.feature import StopWordsRemover
@@ -11,6 +11,27 @@
 from pyspark.sql import functions as F
 
 
+def create_logger() -> logging.Logger:
+    """
+    Creates a logger
+    """
+    logger = logging.getLogger('debug_spark_matcher')
+    logger.setLevel(logging.DEBUG)
+
+    if not (logger.hasHandlers() and len(logger.handlers)):
+        ch = logging.StreamHandler()
+        logger.addHandler(ch)
+    else:
+        ch = logger.handlers[0]
+
+    ch.setLevel(logging.DEBUG)
+
+    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+    ch.setFormatter(formatter)
+
+    return logger
+
+
 def get_most_frequent_words(sdf: DataFrame, col_name: str, min_df=2, top_n_words=1_000) -> pd.DataFrame:
     """
     Count word frequencies in a Spark dataframe `sdf` column named `col_name` and return a Pandas dataframe containing