diff --git a/process_zone/apache_airflow/dags/branching.py b/process_zone/apache_airflow/dags/branching.py
index acbcf30a..e19d8ee8 100644
--- a/process_zone/apache_airflow/dags/branching.py
+++ b/process_zone/apache_airflow/dags/branching.py
@@ -1,4 +1,6 @@
 # The DAG object; we'll need this to instantiate a DAG
+import json
+import yaml
 import urllib.request
 from urllib.error import HTTPError
 from urllib.request import Request
@@ -15,6 +17,7 @@ import config
 from services import extract_transform_load_time_series_csv
 from services import extract_transform_load_time_series_json
+from services import extract_transform_load_time_series_text
 from services import extract_transform_load_images
 from services import extract_transform_load_dump_sql
 from services import typefile
@@ -37,8 +40,6 @@ from airflow.operators.bash_operator import BashOperator
 
 cwd = os.path.dirname(os.path.abspath(__file__))
 
-import yaml
-import json
 
 # TODO : Restructure DAG architecture
@@ -261,7 +262,8 @@ def default_image(**kwargs):
     # Compare filetype
     if "image/" in metadata_doc[content_type]:
         process_type = "images"
-        processed_data = extract_transform_load_images(swift_result, swift_container, swift_id, process_type)
+        processed_data = extract_transform_load_images(
+            swift_result, swift_container, swift_id, process_type)
     return processed_data
 
 
@@ -295,7 +297,8 @@ def default_application_json(**kwargs):
     if "application/json" in metadata_doc[content_type]:
         process_type = "time_series_json"
         # Json parsing
-        processed_data = extract_transform_load_time_series_json(swift_result, swift_container, swift_id, process_type)
+        processed_data = extract_transform_load_time_series_json(
+            swift_result, swift_container, swift_id, process_type)
     return processed_data
 
 
@@ -328,7 +331,8 @@ def default_application_vnd_ms_excel(**kwargs):
         process_type = "time_series_csv"
 
         # Json parsing
-        processed_data = extract_transform_load_time_series_csv(swift_result, swift_container, swift_id, process_type)
+        processed_data = extract_transform_load_time_series_csv(
+            swift_result, swift_container, swift_id, process_type)
     return processed_data
 
 
@@ -360,7 +364,41 @@ def default_application_sql(**kwargs):
     processed_data = {}
     process_type = "sql_dump"
     # Json parsing
-    processed_data = extract_transform_load_dump_sql(swift_result, swift_container, swift_id, process_type)
+    processed_data = extract_transform_load_dump_sql(
+        swift_result, swift_container, swift_id, process_type)
+    return processed_data
+
+
+def default_text_plain(**kwargs):
+    metadata_doc = kwargs["ti"].xcom_pull(key="metadata_doc")
+    swift_container = metadata_doc["swift_container"]
+    swift_id = metadata_doc["swift_obj_id"]
+
+    # Openstack Swift
+    ip_address = config.ip_address_swift
+    address_name = config.address_name_swift
+    authurl = "http://" + config.url_swift + "/auth/v1.0"
+    user = config.user_swift
+    key = config.key_swift
+    # Connect to Swift
+    conn = swiftclient.Connection(
+        user=user,
+        key=key,
+        authurl=authurl
+    )
+    # Retrieve the Swift object
+    swift_object = conn.get_object(swift_container, swift_id)
+    print('----------- SWIFT OBJECT -------------')
+    print(swift_object)
+    # Content type taken from the Swift object's metadata
+    content_type = metadata_doc["content_type"]
+    # The file payload encoded in the Swift object
+    swift_result = swift_object[1]
+    processed_data = {}
+    process_type = "time_series_txt"
+    # Text parsing
+    processed_data = extract_transform_load_time_series_text(
+        swift_result, swift_container, swift_id, process_type)
     return processed_data
 
 
@@ -409,19 +447,31 @@ def default_zip(**kwargs):
         # Compare filetype
         if "image/" in type_file:
             process_type = "images"
-            processed_data = extract_transform_load_images(data_file, swift_container, swift_id, process_type)
+            processed_data = extract_transform_load_images(
+                data_file, swift_container, swift_id, process_type)
         if "application/json" in type_file:
             process_type = "time_series_json"
             # Json parsing
-            processed_data = extract_transform_load_time_series_json(data_file, swift_container, swift_id, process_type)
+            processed_data = extract_transform_load_time_series_json(
+                data_file, swift_container, swift_id, process_type)
         if "application/vnd.ms-excel" in type_file:
             process_type = "time_series_csv"
+            # CSV parsing
+            processed_data = extract_transform_load_time_series_csv(
+                data_file, swift_container, swift_id, process_type)
+        if "application/sql" in type_file:
+            process_type = "sql_dump"
             # Json parsing
-            processed_data = extract_transform_load_time_series_csv(data_file, swift_container, swift_id, process_type)
+            processed_data = extract_transform_load_dump_sql(
+                data_file, swift_container, swift_id, process_type)
+        if "text/plain" in type_file:
+            process_type = "time_series_txt"
+            # Text parsing
+            processed_data = extract_transform_load_time_series_text(
+                data_file, swift_container, swift_id, process_type)
     return processed_data
 
-
 def default_check_type(**kwargs):
     """
     Check data MIME type and return the next task to trigger.
     """
@@ -584,14 +634,15 @@ def neocampus_mongoimport(**kwargs):
     # TODO : 13/10/2020 MAKE AIRFLOW_TMP AS ENV VAR
     # TODO : 13/10/2020 FIND A SOLUTION TO CHOSE DATABASE AND COLLECTION
     print("ssh -i /home/airflow/.ssh/airflow airflow@co2-dl-bd 'mongorestore -d " +
-          metadata_doc["swift_container"] + " -c " + metadata_doc["original_object_name"]
+          metadata_doc["swift_container"] + " -c " +
+          metadata_doc["original_object_name"]
           + " /datalake/airflow/airflow_tmp/" + metadata_doc["original_object_name"] + "'")
     os.system("ssh -i /home/airflow/.ssh/airflow airflow@co2-dl-bd 'mongorestore -d " +
-              metadata_doc["swift_container"] + " -c " + metadata_doc["original_object_name"]
+              metadata_doc["swift_container"] + " -c " +
+              metadata_doc["original_object_name"]
              + " /datalake/airflow/airflow_tmp/" + metadata_doc["original_object_name"] + "'")
 
-
 def construct_operator(**kwargs):
     # TODO : Raise error
     # TODO : Handle more parameters
 
@@ -606,6 +657,7 @@ def construct_operator(**kwargs):
     except Exception as e:
         pass
 
+
     '''
     Base operator : Dummy operators
     '''
@@ -683,10 +735,12 @@ def construct_operator(**kwargs):
         "neocampus_branching": neocampus_branching,
         "custom_user_workflow": custom_user_workflow,
         "workflow_selection": workflow_selection,
-        "default_image":default_image,
-        "default_application_json":default_application_json,
-        "default_application_vnd_ms_excel":default_application_vnd_ms_excel,
-        "default_zip":default_zip,
+        "default_image": default_image,
+        "default_application_json": default_application_json,
+        "default_application_vnd_ms_excel": default_application_vnd_ms_excel,
+        "default_application_sql": default_application_sql,
+        "default_text_plain": default_text_plain,
+        "default_zip": default_zip,
         "PythonOperator": PythonOperator,
         "DummyOperator": DummyOperator,
         "BranchPythonOperator": BranchPythonOperator
@@ -699,22 +753,24 @@ def construct_operator(**kwargs):
     custom_sub_pipe = []
     default_sub_pipe = []
     for task in task_dict[data_type][owner_group]:
-            # raise(Exception(task["operator"]))
-            if task["operator"] == "PythonOperator":
-                if owner_group == "default":
-                    default_sub_pipe.append(PythonOperator(task_id=task["task_id"],
-                                            python_callable=callable_dict[task["python_callable"]],
-                                            on_failure_callback=failed_data_processing,
-                                            on_success_callback=successful_data_processing,
-                                            start_date=days_ago(0)
-                                            ) )
-                else :
-                    custom_sub_pipe.append(PythonOperator(task_id=task["task_id"],
-                                            python_callable=callable_dict[task["python_callable"]],
-                                            on_failure_callback=failed_data_processing,
-                                            on_success_callback=successful_data_processing,
-                                            start_date=days_ago(0)
-                                            ) )
+        # raise(Exception(task["operator"]))
+        if task["operator"] == "PythonOperator":
+            if owner_group == "default":
+                default_sub_pipe.append(PythonOperator(task_id=task["task_id"],
+                                                       python_callable=callable_dict[task["python_callable"]],
+                                                       on_failure_callback=failed_data_processing,
+                                                       on_success_callback=successful_data_processing,
+                                                       start_date=days_ago(
+                                                           0)
+                                                       ))
+            else:
+                custom_sub_pipe.append(PythonOperator(task_id=task["task_id"],
+                                                      python_callable=callable_dict[task["python_callable"]],
+                                                      on_failure_callback=failed_data_processing,
+                                                      on_success_callback=successful_data_processing,
+                                                      start_date=days_ago(
+                                                          0)
+                                                      ))
 
     custom_pipeline.append([*custom_sub_pipe, ])
     default_pipeline.append([*default_sub_pipe, ])
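
Note: `default_check_type` above ("Check data MIME type and return the next task to trigger") is the branching callable that routes each Swift object to one of the `default_*` handlers; the new `default_application_sql` and `default_text_plain` callables are wired into that routing through `callable_dict`. A minimal sketch of the routing pattern, using the task ids declared in task_list.json below (a simplification for illustration, not the committed implementation):

    def check_type_sketch(**kwargs):
        # The metadata document is pushed to XCom by an upstream task.
        metadata_doc = kwargs["ti"].xcom_pull(key="metadata_doc")
        content_type = metadata_doc["content_type"]
        # MIME substring -> task_id, mirroring the keys of task_list.json.
        routes = {
            "image/": "Default_image",
            "application/json": "Default_JSON",
            "application/vnd.ms-excel": "Default_excel",
            "application/sql": "Default_SQL",
            "text/plain": "Default_Text",
        }
        for mime, task_id in routes.items():
            if mime in content_type:
                return task_id
        return "Not_handled"
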
diff --git a/process_zone/apache_airflow/dags/services/typefile.py b/process_zone/apache_airflow/dags/services/typefile.py
index 2ecb0ee6..14ce1e26 100644
--- a/process_zone/apache_airflow/dags/services/typefile.py
+++ b/process_zone/apache_airflow/dags/services/typefile.py
@@ -21,6 +21,10 @@ def typefile(typef):
         type_file = "application/x-zip-compressed"
     if(typef == "tar"):
         type_file = "application/x-gzip"
+    if(typef == "sql"):
+        type_file = "application/sql"
+    if(typef == "txt"):
+        type_file = "text/plain"
     return type_file
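
The chained `if` statements in `typefile` grow by two lines per extension; the same extension-to-MIME mapping can be kept table-driven. A sketch under the assumption that unmapped extensions should fall back to a caller-supplied default (the initial value of `type_file` is outside this hunk, so the real fallback is not visible here):

    # Hypothetical table-driven equivalent of services/typefile.py.
    MIME_BY_EXTENSION = {
        "zip": "application/x-zip-compressed",
        "tar": "application/x-gzip",
        "sql": "application/sql",
        "txt": "text/plain",
    }

    def typefile_sketch(typef, default=None):
        # One dictionary lookup instead of one if-branch per extension.
        return MIME_BY_EXTENSION.get(typef, default)
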
diff --git a/process_zone/apache_airflow/dags/task_list.json b/process_zone/apache_airflow/dags/task_list.json
index 78505ac6..902671ef 100644
--- a/process_zone/apache_airflow/dags/task_list.json
+++ b/process_zone/apache_airflow/dags/task_list.json
@@ -1,64 +1,79 @@
 {
     "image/": {
-      "default": [
-        {
-          "operator" : "PythonOperator",
-          "task_id" : "Default_image",
-          "python_callable" : "default_image"
-        }
-      ],
-      "mygates": [
-        {
-          "operator" : "PythonOperator",
-          "task_id" : "Mygates_object_in_png_in_neo4j",
-          "python_callable" : "content_neo4j_node_creation"
-        }
-      ]
+        "default": [
+            {
+                "operator": "PythonOperator",
+                "task_id": "Default_image",
+                "python_callable": "default_image"
+            }
+        ],
+        "mygates": [
+            {
+                "operator": "PythonOperator",
+                "task_id": "Mygates_object_in_png_in_neo4j",
+                "python_callable": "content_neo4j_node_creation"
+            }
+        ]
     },
     "application/json": {
-      "default": [
-        {
-          "operator" : "PythonOperator",
-          "task_id" : "Default_JSON",
-          "python_callable" : "default_application_json"
-        }
-      ],
-      "neocampus": [
-        {
-          "operator" : "PythonOperator",
-          "task_id" : "Json_log_to_timeserie_influxdb",
-          "python_callable" : "from_mongodb_to_influx"
-        }
-      ]
+        "default": [
+            {
+                "operator": "PythonOperator",
+                "task_id": "Default_JSON",
+                "python_callable": "default_application_json"
+            }
+        ],
+        "neocampus": [
+            {
+                "operator": "PythonOperator",
+                "task_id": "Json_log_to_timeserie_influxdb",
+                "python_callable": "from_mongodb_to_influx"
+            }
+        ]
+    },
+    "text/plain": {
+        "default": [
+            {
+                "operator": "PythonOperator",
+                "task_id": "Default_Text",
+                "python_callable": "default_text_plain"
+            }
+        ]
     },
-    "application/vnd.ms-excel":
-    {
+    "application/sql": {
         "default": [
             {
-                "operator": "PythonOperator",
-                "task_id": "Default_excel",
"default_application_vnd_ms_excel" - + "operator": "PythonOperator", + "task_id": "Default_SQL", + "python_callable": "default_application_sql" } ] }, - "application/x-zip-compressed" : - { + "application/vnd.ms-excel": { "default": [ { "operator": "PythonOperator", - "task_id":"Default_ZIP", - "python_callable":"default_zip" + "task_id": "Default_excel", + "python_callable": "default_application_vnd_ms_excel" + } + ] + }, + "application/x-zip-compressed": { + "default": [ + { + "operator": "PythonOperator", + "task_id": "Default_ZIP", + "python_callable": "default_zip" } ] }, "not_handled": { - "default": [ - { - "operator" : "PythonOperator", - "task_id" : "Not_handled", - "python_callable" : "not_handled_call" - } - ] + "default": [ + { + "operator": "PythonOperator", + "task_id": "Not_handled", + "python_callable": "not_handled_call" + } + ] } - } \ No newline at end of file +} \ No newline at end of file