diff --git a/pipeline/dags/sustainability_score/README.md b/pipeline/dags/sustainability_score/README.md new file mode 100644 index 0000000..3a60a92 --- /dev/null +++ b/pipeline/dags/sustainability_score/README.md @@ -0,0 +1,3 @@ +# Sustainability score + +Placeholder diff --git a/pipeline/dags/sustainability_score/__init__.py b/pipeline/dags/sustainability_score/__init__.py new file mode 100644 index 0000000..ebd011c --- /dev/null +++ b/pipeline/dags/sustainability_score/__init__.py @@ -0,0 +1,39 @@ +""" +DAG IDs: sustainability_score +""" + +import os +from datetime import datetime + +import utils + +from airflow import DAG +from airflow.operators.dummy_operator import DummyOperator + +from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator + + +HOME = os.environ["HOME"] +CSV_FNAME = ( + "large_target_store_products_dataset_sample - " + "large_target_store_products_dataset_sample.csv" +) +CONFIG = { + "input": f"{ HOME }/gcs/data/{ CSV_FNAME }", + "beam_etl_path": "/beam_etl/main.py", +} + +with DAG( + "sustainability_score", + schedule_interval="0 * * * 1-5", + catchup=False, + max_active_runs=10, + start_date=datetime(2023, 6, 21), + doc_md=utils.load_docs(__file__), + params=CONFIG, +) as dag: + etl_pipeline = BeamRunPythonPipelineOperator( + task_id="beam_etl", + py_file="{{ params.beam_etl_path }}", + pipeline_options={"input": "{{ params.input }}"}, + ) diff --git a/pipeline/dags/utils.py b/pipeline/dags/utils.py new file mode 100644 index 0000000..f945604 --- /dev/null +++ b/pipeline/dags/utils.py @@ -0,0 +1,12 @@ +""" +Misc helper functions +""" + +import pathlib + +def load_docs(caller_path, fname="README.md"): + """Load the README.md file for the DAG's docs""" + caller_wd = pathlib.Path(caller_path).parent + with pathlib.Path(caller_wd, fname).open(encoding="utf-8") as docs_fh: + docs = docs_fh.read() + return docs diff --git a/pipeline/docker-compose.yml b/pipeline/docker-compose.yml index 269834a..517e663 100644 --- a/pipeline/docker-compose.yml +++ b/pipeline/docker-compose.yml @@ -6,10 +6,12 @@ x-airflow-common: image: us-docker.pkg.dev/cloud-airflow-releaser/airflow-worker-scheduler-2-5-1/airflow-worker-scheduler-2-5-1:composer-2.3.1-airflow-2-5-1 entrypoint: /usr/local/bin/airflow-entrypoint.sh volumes: + - ./state/airflow-data:/home/airflow/airflow + - ./dags:/home/airflow/airflow/dags - ./scripts/airflow-init.sh:/usr/local/bin/airflow-init.sh:ro - ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro - ../data:/home/airflow/gcs/data:ro - - ./state/airflow-data:/home/airflow/airflow + - ./beam_etl:/beam_etl:ro environment: AIRFLOW__CORE__LOAD_EXAMPLES: 'false'