Feature/dpt 846 add python linting in dap (#1048)
* python linter added .flake8 ignore=S608,C901

* python linter added .flake8 ignore=S608,C901

* isort and black linting fixes, flake8 check for pre-commit in husky

* pre-commit check test

* pre-commit check test

* pre-commit check test2

* pre-commit check test3

* pre-commit check test4

* linting fixes excluding flake8 and pydocstyle

* flake8 fixes with ignore = S608,C901

* flake8 fixes with ignore = S608,C901

* deleted pre-commit-config yaml

* clean up of package.json

* specified file extensions for prettier and error message for missing dependencies

* removed unused config
pradeep-swamireddy-gov authored Jan 9, 2025
1 parent 8511f34 commit ee7e15a
Showing 14 changed files with 594 additions and 489 deletions.
5 changes: 5 additions & 0 deletions .flake8
@@ -0,0 +1,5 @@
[flake8]
ignore = S608,C901
max-line-length = 160
exclude = tests/*
max-complexity = 10
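For reference, `S608` is the flake8-bandit code for SQL statements assembled with string formatting, and `C901` is the mccabe "function is too complex" check driven by `max-complexity`. A small illustration of the kind of code each check would normally flag; the snippet is not taken from this repository:

```python
# Illustrative only: code of the kind the two ignored checks would normally flag.


def build_query(table_name: str) -> str:
    # S608 (flake8-bandit): SQL assembled with string interpolation is reported
    # as a possible injection risk; `ignore = S608` suppresses it repo-wide.
    return f"SELECT * FROM {table_name} WHERE processed_dt > 20240101"


def bucket(value: int) -> str:
    # C901 (mccabe): functions whose cyclomatic complexity exceeds
    # `max-complexity = 10` are reported. This sketch stays under the limit but
    # shows the branch-heavy style that eventually trips the check.
    if value < 0:
        return "negative"
    if value == 0:
        return "zero"
    if value < 10:
        return "small"
    if value < 100:
        return "medium"
    return "large"
```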
4 changes: 4 additions & 0 deletions .husky/pre-commit
@@ -1,3 +1,7 @@
for cmd in reindent isort black pyupgrade mypy pydocstyle flake8; do
command -v "$cmd" >/dev/null 2>&1 || { echo >&2 "Install $cmd (Follow instructions from 'Set up husky hooks' section of README.md file)"; exit 2; }
done

npx lint-staged

git diff --cached --name-only | if grep --quiet 'iac/'; then
3 changes: 3 additions & 0 deletions .prettierignore
@@ -5,10 +5,13 @@ test-report/
*.md
.eslintignore
.prettierignore
Pyproject.toml
.flake8
.husky/
*.properties
Dockerfile
*.sql
*.jar
*.tar.gz
flyway.conf
*.py
16 changes: 16 additions & 0 deletions Pyproject.toml
@@ -0,0 +1,16 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

[tool.black]
line-length = 160

[[tool.mypy.overrides]]
module = ["untyped_package.*"]
follow_untyped_imports = true

[tool.mypy]
disable_error_code = ["import-untyped"]
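The mypy settings target dependencies that ship no type information. A minimal sketch of the effect, reusing the placeholder module name from the `[[tool.mypy.overrides]]` block; the package and its `helper` function are hypothetical:

```python
# Sketch only: `untyped_package` mirrors the placeholder in the mypy override
# block and stands in for any dependency without a py.typed marker or stubs.
import untyped_package  # hypothetical; mypy would normally report [import-untyped] here


def call_helper(value: str) -> str:
    # disable_error_code = ["import-untyped"] silences that error, and
    # follow_untyped_imports = true asks mypy to analyse the untyped module
    # instead of treating everything imported from it as Any.
    return untyped_package.helper(value)  # hypothetical function
```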
14 changes: 13 additions & 1 deletion README.md
@@ -30,7 +30,19 @@ You may need to install `gpg` first - on a GDS Mac open the terminal and run `br
#### Set up husky hooks

[Husky](https://typicode.github.io/husky) is used to run [githooks](https://git-scm.com/docs/githooks), specifically `pre-commit` and `pre-push`.
To install the hooks run `npm run husky:install`. After this, the hooks defined under the [.husky](.husky) directory will automatically run when you commit or push.*
To install the hooks, run:

`npm run husky:install`

`python3 -m venv venvlocal` (Run only if venv doesn't exist already)

`source venvlocal/bin/activate`

`pip install -r athena-scripts/raw_stage_optimisation_solution/scripts/requirements.txt` (Only needs to be run the first time)

After this, the hooks defined under the [.husky](.husky) directory will automatically run when you commit or push.*
The [lint-staged](https://github.com/okonet/lint-staged) library is used to only run certain tasks if certain files are modified.

Config can be found in the `lint-staged` block in [package.json](package.json). Note that `lint-staged` works by passing
@@ -1,8 +1,8 @@
import json
import time

import boto3


class AthenaReadWrite:
"""
A class for interacting with Athena data objects, which is not possible using AWSDataWrangler
@@ -13,13 +13,14 @@ class AthenaReadWrite:
run_query(self, database, sql, workgroup)
Runs a query against the Athena service.
"""

def __init__(self):
"""
Initialize a new AthenaReadWrite instance.
"""
self.athena_client = boto3.client('athena', region_name='eu-west-2')
self.athena_client = boto3.client("athena", region_name="eu-west-2")

def run_query(self, database, sql, workgroup):
"""
@@ -36,26 +37,24 @@ def run_query(self, database, sql, workgroup):
try:
response = self.athena_client.start_query_execution(
QueryString=sql,
QueryExecutionContext={
'Database': database
},
WorkGroup=workgroup
QueryExecutionContext={"Database": database},
WorkGroup=workgroup,
)

# Get the query execution ID
query_execution_id = response['QueryExecutionId']
query_execution_id = response["QueryExecutionId"]

# Check the status of the query periodically until it completes
while True:
query_execution = self.athena_client.get_query_execution(QueryExecutionId=query_execution_id)
status = query_execution['QueryExecution']['Status']['State']
status = query_execution["QueryExecution"]["Status"]["State"]

if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
break

time.sleep(5) # Wait for 5 seconds before checking again

if status == 'SUCCEEDED':
if status == "SUCCEEDED":
print("Athena query successfully completed")
return True
else:
@@ -64,4 +63,3 @@ def run_query(self, database, sql, workgroup):
except Exception as e:
print(f"Exception when running Athena query: {str(e)}")
return False
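As a usage note, `run_query` polls `get_query_execution` every five seconds until the query reaches a terminal state and returns a boolean rather than raising. A minimal calling sketch; the import path, database and workgroup names are assumptions, and it needs AWS credentials valid for `eu-west-2`:

```python
# Minimal usage sketch; import path, database and workgroup are hypothetical.
from athena_read_write import AthenaReadWrite  # hypothetical module path

athena = AthenaReadWrite()

# run_query returns True when the query ends SUCCEEDED, and False when it ends
# FAILED/CANCELLED or an exception is raised while starting or polling it.
succeeded = athena.run_query(
    database="raw_stage",   # hypothetical database
    sql="SELECT 1",
    workgroup="primary",    # hypothetical workgroup
)
if not succeeded:
    raise RuntimeError("Athena query did not succeed")
```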

@@ -1,15 +1,16 @@
import datetime
from datetime import datetime
import awswrangler as wr
import pandas as pd

import numpy as np
import pandas as pd


class DataPreprocessing:
"""
A class for performing preprocessing tasks against a supplied dataframe
"""

def __init__(self):
self.now = datetime.now()
self.processed_dt = int(self.now.strftime("%Y%m%d"))
@@ -33,7 +34,7 @@ def remove_duplicate_rows(self, df, fields):
except Exception as e:
print(f"Error dropping row duplicates: {str(e)}")
return None

def remove_rows_missing_mandatory_values(self, df, fields):
"""
Remove rows with missing mandatory field values.
@@ -52,7 +53,7 @@ def remove_rows_missing_mandatory_values(self, df, fields):
except Exception as e:
print(f"Error dropping rows missing mandatory field: {str(e)}")
return None

def rename_column_names(self, df, fields):
"""
Rename column names based on the provided mapping.
@@ -71,7 +72,7 @@ def rename_column_names(self, df, fields):
except Exception as e:
print(f"Error renaming columns: {str(e)}")
return None

def remove_columns(self, df, columns, silent):
"""
remove columns from the data frame
@@ -84,16 +85,16 @@ def remove_columns(self, df, columns, silent):
Returns:
DataFrame: A DataFrame with specified columns removed if found.
"""

try:
errors = 'ignore' if silent else 'raise'
errors = "ignore" if silent else "raise"
if not isinstance(columns, (list)):
raise ValueError("Invalid field of columns provided, require list")
return df.drop(columns, axis=1, errors=errors)
except Exception as e:
print(f"Error removing columns: {str(e)}")
return None

def add_new_column(self, df, fields):
"""
Add new columns to the DataFrame.
@@ -106,18 +107,18 @@ def add_new_column(self, df, fields):
DataFrame: A DataFrame with new columns added.
"""
try:
if not isinstance(fields, (dict)):
if not isinstance(fields, dict):
raise ValueError("Invalid field list structure provided, require dict object")
for column_name, value in fields.items():
if column_name == 'processed_dt':
for column_name, _value in fields.items():
if column_name == "processed_dt":
df[column_name] = self.processed_dt
if column_name == 'processed_time':
if column_name == "processed_time":
df[column_name] = self.processed_time
return df
except Exception as e:
print(f"Error adding new columns: {str(e)}")
return None

def add_new_column_from_struct(self, df, fields):
"""
Create new columns from struct fields in the DataFrame.
@@ -131,19 +132,22 @@ def add_new_column_from_struct(self, df, fields):
DataFrame: A DataFrame with new columns added from struct fields.
"""
try:
if not isinstance(fields, (dict)):
if not isinstance(fields, dict):
raise ValueError("Invalid field list structure provided, require dict object")

for key, value in fields.items():
for item in value:
col_name = f'{key}_{item}'
df[col_name] = df.apply(lambda x: None if x[key] is None or x[key].get(item) is None or (not x[key].get(item).strip()) else x[key].get(item), axis=1)
col_name = f"{key}_{item}"
df[col_name] = df.apply(
lambda x, k=key, i=item: None if x[k] is None or x[k].get(i) is None or (not x[k].get(i).strip()) else x[k].get(i),
axis=1,
)

return df
except Exception as e:
print(f"Error adding new columns from struct: {str(e)}")
return None
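A side note on the reworked lambdas in `add_new_column_from_struct` (and in `generate_key_value_records` further down): they bind the loop variables as default arguments (`k=key`, `i=item`) rather than closing over them, because a closure captures the variable itself, not its value at that iteration; linters such as flake8-bugbear report the closure form as B023. A standalone sketch of the difference:

```python
# Why the lambdas bind loop variables as default arguments.

# Closing over the loop variable captures the *name*, so every callable ends up
# seeing the final value once the loop has finished.
late_bound = [lambda x: x + i for i in range(3)]
print([f(0) for f in late_bound])   # [2, 2, 2]

# Binding the current value as a default argument freezes it per iteration,
# which is the pattern used in the refactored apply() calls above.
early_bound = [lambda x, i=i: x + i for i in range(3)]
print([f(0) for f in early_bound])  # [0, 1, 2]
```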

def empty_string_to_null(self, df, fields):
"""
Replace empty strings with None (null) in the specified columns.
@@ -158,18 +162,16 @@ def empty_string_to_null(self, df, fields):
try:
if not isinstance(fields, (list)):
raise ValueError("Invalid field list structure provided, require list object")

for column_name in fields:
df[column_name] = df[column_name].apply(lambda x: None if isinstance(x, str) and (x.isspace() or not x) else x)

return df
except Exception as e:
print(f"Error replacing empty string with sql nulls: {str(e)}")
return None



def extract_key_values(self, obj, parent_key='', sep='.', field_name=''):
def extract_key_values(self, obj, parent_key="", sep=".", field_name=""):
"""
Generate Key/Value records for provided object.
@@ -183,7 +185,6 @@ def extract_key_values(self, obj, parent_key='', sep='.', field_name=''):
list: A list of (key, value) pairs generated from the input obj argument.
"""
try:

items = []
if not isinstance(obj, (dict, list)):
items.append((field_name, obj))
@@ -221,12 +222,11 @@ def extract_key_values(self, obj, parent_key='', sep='.', field_name=''):
pass # Ignore if conversion fails
items.append((new_key, value))
return items

except Exception as e:
print(f"Error extracting key/value: {str(e)}")
return None



def generate_key_value_records(self, df, fields, column_names_list):
"""
Generate Key/Value records from nested struct fields in the DataFrame.
@@ -242,33 +242,39 @@ def generate_key_value_records(self, df, fields, column_names_list):
try:
if not isinstance(fields, (list)):
raise ValueError("Invalid field list structure provided, require list object")

# Initialize an empty list to store DataFrames
dfs = []

for column_name in fields:
df_key_value = df.apply(lambda row: [(row['event_id'], column_name, key, value) for key, value in self.extract_key_values(row[column_name], field_name=column_name)] if pd.notna(row[column_name]) else [], axis=1)
df_key_value = df.apply(
lambda row, col=column_name: (
[(row["event_id"], col, key, value) for key, value in self.extract_key_values(row[col], field_name=col)] if pd.notna(row[col]) else []
),
axis=1,
)
dfs.append(df_key_value)
print(f'class: DataPreprocessing | method=generate_key_value_records | dfs row count: {len(dfs)}')

print(f"class: DataPreprocessing | method=generate_key_value_records | dfs row count: {len(dfs)}")

key_value_pairs = pd.concat(dfs, ignore_index=True)

# Flatten the list of lists into a single list
key_value_pairs = [item for sublist in key_value_pairs for item in sublist]

# Create the "extensions_key_values" DataFrame
result_df = pd.DataFrame(key_value_pairs, columns=['event_id', 'parent_column_name', 'key', 'value'])
result_df = pd.DataFrame(
key_value_pairs,
columns=["event_id", "parent_column_name", "key", "value"],
)

# Filter out rows with null values
result_df = result_df[result_df['value'].notna()]
result_df['processed_dt'] = self.processed_dt
result_df['processed_time'] = self.processed_time
result_df = result_df[result_df["value"].notna()]
result_df["processed_dt"] = self.processed_dt
result_df["processed_time"] = self.processed_time
result_df.columns = column_names_list

return result_df
except Exception as e:
print(f"Error generating key/value records: {str(e)}")
return None
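A usage sketch for `generate_key_value_records`; the import path, the `extensions` column and its contents are illustrative assumptions:

```python
# Usage sketch; import path and the sample nested payload are hypothetical.
import pandas as pd

from data_preprocessing import DataPreprocessing  # hypothetical module path

pre = DataPreprocessing()

df = pd.DataFrame(
    {
        "event_id": ["evt-1"],
        "extensions": [{"levelOfConfidence": "Cl.Cm", "mfa_type": "SMS"}],
    }
)

# Each nested key becomes one (event_id, parent_column_name, key, value) row;
# processed_dt and processed_time are then appended and the frame is renamed
# to the supplied column list, so six names are passed here.
result = pre.generate_key_value_records(
    df,
    fields=["extensions"],
    column_names_list=[
        "event_id",
        "parent_column_name",
        "key",
        "value",
        "processed_dt",
        "processed_time",
    ],
)
print(result)
```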

