#96 & #97 Changes for the new Airflow version in the datalake
TheoLGG committed Oct 11, 2021
1 parent ad8b62c commit d6f9ce6
Showing 3 changed files with 154 additions and 79 deletions.
122 changes: 89 additions & 33 deletions process_zone/apache_airflow/dags/branching.py
@@ -1,4 +1,6 @@
# The DAG object; we'll need this to instantiate a DAG
import json
import yaml
import urllib.request
from urllib.error import HTTPError
from urllib.request import Request
@@ -15,6 +17,7 @@
import config
from services import extract_transform_load_time_series_csv
from services import extract_transform_load_time_series_json
from services import extract_transform_load_time_series_text
from services import extract_transform_load_images
from services import extract_transform_load_dump_sql
from services import typefile
@@ -37,8 +40,6 @@
from airflow.operators.bash_operator import BashOperator

cwd = os.path.dirname(os.path.abspath(__file__))
import yaml
import json

# TODO : Restructure DAG architecture

@@ -261,7 +262,8 @@ def default_image(**kwargs):
# Compare filetype
if "image/" in metadata_doc[content_type]:
process_type = "images"
processed_data = extract_transform_load_images(swift_result, swift_container, swift_id, process_type)
processed_data = extract_transform_load_images(
swift_result, swift_container, swift_id, process_type)
return processed_data


@@ -295,7 +297,8 @@ def default_application_json(**kwargs):
if "application/json" in metadata_doc[content_type]:
process_type = "time_series_json"
# Json parsing
processed_data = extract_transform_load_time_series_json(swift_result, swift_container, swift_id, process_type)
processed_data = extract_transform_load_time_series_json(
swift_result, swift_container, swift_id, process_type)
return processed_data


@@ -328,7 +331,8 @@ def default_application_vnd_ms_excel(**kwargs):

process_type = "time_series_csv"
# CSV parsing
processed_data = extract_transform_load_time_series_csv(swift_result, swift_container, swift_id, process_type)
processed_data = extract_transform_load_time_series_csv(
swift_result, swift_container, swift_id, process_type)
return processed_data


@@ -360,7 +364,41 @@ def default_application_sql(**kwargs):
processed_data = {}
process_type = "sql_dump"
# SQL dump parsing
processed_data = extract_transform_load_dump_sql(swift_result, swift_container, swift_id, process_type)
processed_data = extract_transform_load_dump_sql(
swift_result, swift_container, swift_id, process_type)
return processed_data


def default_text_plain(**kwargs):
metadata_doc = kwargs["ti"].xcom_pull(key="metadata_doc")
swift_container = metadata_doc["swift_container"]
swift_id = metadata_doc["swift_obj_id"]

# Openstack Swift
ip_address = config.ip_address_swift
address_name = config.address_name_swift
authurl = "http://" + config.url_swift + "/auth/v1.0"
user = config.user_swift
key = config.key_swift
# Connection to Swift
conn = swiftclient.Connection(
user=user,
key=key,
authurl=authurl
)
# Retrieve the Swift object
swift_object = conn.get_object(swift_container, swift_id)
print('----------- SWIFT OBJECT -------------')
print(swift_object)
# Content type retrieved from the Swift object
content_type = metadata_doc["content_type"]
# Retrieve the encoded file from the Swift object
swift_result = swift_object[1]
processed_data = {}
process_type = "time_series_txt"
# Text parsing
processed_data = extract_transform_load_time_series_text(
swift_result, swift_container, swift_id, process_type)
return processed_data
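
For context, python-swiftclient's get_object returns a (headers, body) tuple, which is why the function above reads swift_object[1] to get the raw file bytes. A minimal standalone sketch of that call (credentials, container, and object ID are placeholders, not values from this repository):

import swiftclient

# v1 auth, matching the "http://<host>/auth/v1.0" URL built above
conn = swiftclient.Connection(
    user="account:user",    # placeholder
    key="secret",           # placeholder
    authurl="http://127.0.0.1:8080/auth/v1.0",
)

# get_object returns (headers, body): a dict of response headers
# and the object's raw bytes
headers, body = conn.get_object("my_container", "my_object_id")
print(headers.get("content-type"), len(body))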


@@ -409,19 +447,31 @@ def default_zip(**kwargs):
# Compare filetype
if "image/" in type_file:
process_type = "images"
processed_data = extract_transform_load_images(data_file, swift_container, swift_id, process_type)
processed_data = extract_transform_load_images(
data_file, swift_container, swift_id, process_type)
if "application/json" in type_file:
process_type = "time_series_json"
# Json parsing
processed_data = extract_transform_load_time_series_json(data_file, swift_container, swift_id, process_type)
processed_data = extract_transform_load_time_series_json(
data_file, swift_container, swift_id, process_type)
if "application/vnd.ms-excel" in type_file:
process_type = "time_series_csv"
# CSV parsing
processed_data = extract_transform_load_time_series_csv(
data_file, swift_container, swift_id, process_type)
if "application/sql" in type_file:
process_type = "sql_dump"
# SQL dump parsing
processed_data = extract_transform_load_time_series_csv(data_file, swift_container, swift_id, process_type)
processed_data = extract_transform_load_dump_sql(
data_file, swift_container, swift_id, process_type)
if "text/plain" in type_file:
process_type = "time_series_txt"
# Text parsing
processed_data = extract_transform_load_time_series_text(
data_file, swift_container, swift_id, process_type)
return processed_data



def default_check_type(**kwargs):
"""
Check data MIME type and return the next task to trigger.
@@ -584,14 +634,15 @@ def neocampus_mongoimport(**kwargs):
# TODO : 13/10/2020 MAKE AIRFLOW_TMP AS ENV VAR
# TODO : 13/10/2020 FIND A SOLUTION TO CHOSE DATABASE AND COLLECTION
print("ssh -i /home/airflow/.ssh/airflow airflow@co2-dl-bd 'mongorestore -d " +
metadata_doc["swift_container"] + " -c " + metadata_doc["original_object_name"]
metadata_doc["swift_container"] + " -c " +
metadata_doc["original_object_name"]
+ " /datalake/airflow/airflow_tmp/" + metadata_doc["original_object_name"] + "'")
os.system("ssh -i /home/airflow/.ssh/airflow airflow@co2-dl-bd 'mongorestore -d " +
metadata_doc["swift_container"] + " -c " + metadata_doc["original_object_name"]
metadata_doc["swift_container"] + " -c " +
metadata_doc["original_object_name"]
+ " /datalake/airflow/airflow_tmp/" + metadata_doc["original_object_name"] + "'")



def construct_operator(**kwargs):
# TODO : Raise error
# TODO : Handle more parameters
@@ -606,6 +657,7 @@ def construct_operator(**kwargs):
except Exception as e:
pass


'''
Base operator : Dummy operators
'''
@@ -683,10 +735,12 @@ def construct_operator(**kwargs):
"neocampus_branching": neocampus_branching,
"custom_user_workflow": custom_user_workflow,
"workflow_selection": workflow_selection,
"default_image":default_image,
"default_application_json":default_application_json,
"default_application_vnd_ms_excel":default_application_vnd_ms_excel,
"default_zip":default_zip,
"default_image": default_image,
"default_application_json": default_application_json,
"default_application_vnd_ms_excel": default_application_vnd_ms_excel,
"default_application_sql": default_application_sql,
"default_text_plain": default_text_plain,
"default_zip": default_zip,
"PythonOperator": PythonOperator,
"DummyOperator": DummyOperator,
"BranchPythonOperator": BranchPythonOperator
@@ -699,22 +753,24 @@
custom_sub_pipe = []
default_sub_pipe = []
for task in task_dict[data_type][owner_group]:
# raise(Exception(task["operator"]))
if task["operator"] == "PythonOperator":
if owner_group == "default":
default_sub_pipe.append(PythonOperator(task_id=task["task_id"],
python_callable=callable_dict[task["python_callable"]],
on_failure_callback=failed_data_processing,
on_success_callback=successful_data_processing,
start_date=days_ago(0)
) )
else :
custom_sub_pipe.append(PythonOperator(task_id=task["task_id"],
python_callable=callable_dict[task["python_callable"]],
on_failure_callback=failed_data_processing,
on_success_callback=successful_data_processing,
start_date=days_ago(0)
) )
# raise(Exception(task["operator"]))
if task["operator"] == "PythonOperator":
if owner_group == "default":
default_sub_pipe.append(PythonOperator(task_id=task["task_id"],
python_callable=callable_dict[task["python_callable"]],
on_failure_callback=failed_data_processing,
on_success_callback=successful_data_processing,
start_date=days_ago(
0)
))
else:
custom_sub_pipe.append(PythonOperator(task_id=task["task_id"],
python_callable=callable_dict[task["python_callable"]],
on_failure_callback=failed_data_processing,
on_success_callback=successful_data_processing,
start_date=days_ago(
0)
))
custom_pipeline.append([*custom_sub_pipe, ])
default_pipeline.append([*default_sub_pipe, ])
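
All the default_* callables registered above share one contract: they read the metadata document that an upstream task pushed to XCom under the key "metadata_doc". A minimal sketch of that wiring (the DAG id and callable are hypothetical; assumes Airflow 2.x, the version this commit targets):

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def show_metadata(**kwargs):
    # Same XCom pattern as the default_* callables above
    metadata_doc = kwargs["ti"].xcom_pull(key="metadata_doc")
    print(metadata_doc["swift_container"], metadata_doc["swift_obj_id"])

with DAG("example_dag", start_date=days_ago(1), schedule_interval=None) as dag:
    PythonOperator(task_id="Example_task", python_callable=show_metadata)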

4 changes: 4 additions & 0 deletions process_zone/apache_airflow/dags/services/typefile.py
@@ -21,6 +21,10 @@ def typefile(typef):
type_file = "application/x-zip-compressed"
if(typef == "tar"):
type_file = "application/x-gzip"
if(typef == "sql"):
type_file = "application/sql"
if(typef == "txt"):
type_file = "text/plain"

return type_file
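
With the two new branches, typefile now maps the "sql" and "txt" extensions as well. A quick usage sketch (the import path is assumed from the dags/services layout; only mappings visible in this diff are asserted):

from services.typefile import typefile

assert typefile("tar") == "application/x-gzip"
assert typefile("sql") == "application/sql"   # new in this commit
assert typefile("txt") == "text/plain"        # new in this commit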

107 changes: 61 additions & 46 deletions process_zone/apache_airflow/dags/task_list.json
@@ -1,64 +1,79 @@
{
"image/": {
"default": [
{
"operator" : "PythonOperator",
"task_id" : "Default_image",
"python_callable" : "default_image"
}
],
"mygates": [
{
"operator" : "PythonOperator",
"task_id" : "Mygates_object_in_png_in_neo4j",
"python_callable" : "content_neo4j_node_creation"
}
]
"default": [
{
"operator": "PythonOperator",
"task_id": "Default_image",
"python_callable": "default_image"
}
],
"mygates": [
{
"operator": "PythonOperator",
"task_id": "Mygates_object_in_png_in_neo4j",
"python_callable": "content_neo4j_node_creation"
}
]
},
"application/json": {
"default": [
{
"operator" : "PythonOperator",
"task_id" : "Default_JSON",
"python_callable" : "default_application_json"
}
],
"neocampus": [
{
"operator" : "PythonOperator",
"task_id" : "Json_log_to_timeserie_influxdb",
"python_callable" : "from_mongodb_to_influx"
}
]
"default": [
{
"operator": "PythonOperator",
"task_id": "Default_JSON",
"python_callable": "default_application_json"
}
],
"neocampus": [
{
"operator": "PythonOperator",
"task_id": "Json_log_to_timeserie_influxdb",
"python_callable": "from_mongodb_to_influx"
}
]
},
"text/plain": {
"default": [
{
"operator": "PythonOperator",
"task_id": "Default_Text",
"python_callable": "default_text_plain"
}
]
},
"application/vnd.ms-excel":
{
"application/sql": {
"default": [
{
"operator": "PythonOperator",
"task_id": "Default_excel",
"python_callable": "default_application_vnd_ms_excel"

"operator": "PythonOperator",
"task_id": "Default_SQL",
"python_callable": "default_application_sql"
}
]
},
"application/x-zip-compressed" :
{
"application/vnd.ms-excel": {
"default": [
{
"operator": "PythonOperator",
"task_id":"Default_ZIP",
"python_callable":"default_zip"
"task_id": "Default_excel",
"python_callable": "default_application_vnd_ms_excel"
}
]
},
"application/x-zip-compressed": {
"default": [
{
"operator": "PythonOperator",
"task_id": "Default_ZIP",
"python_callable": "default_zip"
}
]
},
"not_handled": {
"default": [
{
"operator" : "PythonOperator",
"task_id" : "Not_handled",
"python_callable" : "not_handled_call"
}
]
"default": [
{
"operator": "PythonOperator",
"task_id": "Not_handled",
"python_callable": "not_handled_call"
}
]
}
}
}
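
Each top-level key in task_list.json is a MIME type, each second-level key an owner group ("default" plus project groups such as "mygates" and "neocampus"), and each list entry one task spec. A minimal sketch of how the construct_operator loop in branching.py resolves a spec (the stand-in callable is hypothetical):

import json

def default_text_plain(**kwargs):
    # Stand-in for the real callable in branching.py
    print("processing text/plain")

callable_dict = {"default_text_plain": default_text_plain}

with open("task_list.json") as f:
    task_dict = json.load(f)

# Same lookup shape as branching.py: task_dict[data_type][owner_group]
for spec in task_dict["text/plain"]["default"]:
    fn = callable_dict[spec["python_callable"]]
    print(spec["operator"], spec["task_id"], "->", fn.__name__)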
