"""DAG IDs: sustainability_score

Hourly weekday pipeline: ensure the Postgres products table exists,
load the source CSV via an Apache Beam ETL job, then run and test the
dbt models. All Postgres credentials are resolved at render time from
the ``pg_db`` Airflow connection, so no secrets live in this file.
"""
import os
from datetime import datetime

import utils
from airflow import DAG

# NOTE(review): DummyOperator is never used in this DAG, and the
# airflow.operators.dummy_operator module path is deprecated
# (airflow.operators.empty.EmptyOperator is the modern equivalent) —
# consider removing this import.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
)
from airflow.providers.postgres.operators.postgres import PostgresOperator

HOME = os.environ["HOME"]

# Source CSV file name, as exported ("<dataset> - <sheet>.csv" pattern).
CSV_FNAME = (
    "large_target_store_products_dataset_sample - "
    "large_target_store_products_dataset_sample.csv"
)

# Default DAG params; referenced in tasks via "{{ params.<key> }}" and
# overridable per-run through the trigger UI/API.
CONFIG = {
    "input": f"{HOME}/gcs/data/{CSV_FNAME}",
    "beam_etl_path": "/etl/main.py",
    "products_table": "sustainability_score.products",
}


def dbt(cmd: str, attach_dag: DAG) -> BashOperator:
    """Setup and return an operator to run dbt commands.

    Args:
        cmd: dbt sub-command to execute (e.g. "run" or "test"); also
            used to derive the task_id ("dbt_<cmd>").
        attach_dag: DAG to attach the resulting task to.

    Returns:
        A BashOperator that runs ``dbt <cmd>`` from /dbt with Postgres
        connection details injected as environment variables.
    """
    return BashOperator(
        dag=attach_dag,
        task_id=f"dbt_{cmd}",
        bash_command=f"dbt {cmd}",
        cwd="/dbt",
        # Credentials come from the `pg_db` Airflow connection, resolved
        # via Jinja when the task instance is rendered.
        env={
            "POSTGRES_HOST": "{{ conn.get('pg_db').host }}",
            "POSTGRES_USER": "{{ conn.get('pg_db').login }}",
            "POSTGRES_PASSWORD": "{{ conn.get('pg_db').password }}",
            "POSTGRES_PORT": "{{ conn.get('pg_db').port }}",
            "POSTGRES_DATABASE": "{{ conn.get('pg_db').schema }}",
        },
        # Keep the worker's existing environment (PATH etc.) alongside
        # the injected variables.
        append_env=True,
    )


with DAG(
    "sustainability_score",
    schedule_interval="0 * * * 1-5",  # top of every hour, Mon-Fri
    catchup=False,
    max_active_runs=10,
    start_date=datetime(2023, 6, 21),
    doc_md=utils.load_docs(__file__),
    params=CONFIG,
) as dag:
    # 1) Ensure the target schema/table exists (idempotent DDL).
    create_products_table = PostgresOperator(
        task_id="create_products_table",
        sql="sql/products_schema.sql",
        postgres_conn_id="pg_db",
    )

    # 2) Load the CSV into Postgres via the Beam pipeline; connection
    #    details are templated from the same `pg_db` connection.
    etl_pipeline = BeamRunPythonPipelineOperator(
        task_id="beam_etl",
        py_file="{{ params.beam_etl_path }}",
        pipeline_options={
            "input": "{{ params.input }}",
            "pg_hostname": "{{ conn.get('pg_db').host }}",
            "pg_port": "{{ conn.get('pg_db').port }}",
            "pg_username": "{{ conn.get('pg_db').login }}",
            "pg_password": "{{ conn.get('pg_db').password }}",
            "pg_database": "{{ conn.get('pg_db').schema }}",
            "pg_table": "{{ params.products_table }}",
        },
    )

    # 3) Transform with dbt, then 4) validate the models.
    dbt_run = dbt("run", dag)
    dbt_test = dbt("test", dag)

    create_products_table >> etl_pipeline >> dbt_run >> dbt_test