diff --git a/dags/sustainability_score/__init__.py b/dags/sustainability_score/__init__.py index 10adffc..8842365 100644 --- a/dags/sustainability_score/__init__.py +++ b/dags/sustainability_score/__init__.py @@ -27,6 +27,25 @@ CONFIG = { "scored_table": "sustainability_score.scored_products", } + +def dbt(cmd: str, attach_dag: DAG) -> BashOperator: + """Setup and return an operator to run dbt commands""" + return BashOperator( + dag=attach_dag, + task_id=f"dbt_{ cmd }", + bash_command=f"dbt { cmd }", + cwd="/dbt", + env={ + "POSTGRES_HOST": "{{ conn.get('pg_db').host }}", + "POSTGRES_USER": "{{ conn.get('pg_db').login }}", + "POSTGRES_PASSWORD": "{{ conn.get('pg_db').password }}", + "POSTGRES_PORT": "{{ conn.get('pg_db').port }}", + "POSTGRES_DATABASE": "{{ conn.get('pg_db').schema }}", + }, + append_env=True, + ) + + with DAG( "sustainability_score", schedule_interval="0 * * * 1-5", @@ -38,12 +57,7 @@ with DAG( ) as dag: create_products_table = PostgresOperator( task_id="create_products_table", - sql="products_schema.sql", - postgres_conn_id="pg_db", - ) - create_scores_table = PostgresOperator( - task_id="create_scored_products_table", - sql="scored_products_schema.sql", + sql="sql/products_schema.sql", postgres_conn_id="pg_db", ) @@ -61,25 +75,7 @@ with DAG( }, ) - calculate_score = PostgresOperator( - task_id="calculate_score", - sql="sql/calculate_score.sql", - postgres_conn_id="pg_db", - ) + dbt_run = dbt("run", dag) + dbt_test = dbt("test", dag) - create_products_table >> etl_pipeline - [etl_pipeline, create_scores_table] >> calculate_score - - dbt_run = BashOperator( - task_id="run_dbt", - bash_command="dbt run", - cwd="/dbt", - env={ - "POSTGRES_HOST": "{{ conn.get('pg_db').host }}", - "POSTGRES_USER": "{{ conn.get('pg_db').login }}", - "POSTGRES_PASSWORD": "{{ conn.get('pg_db').password }}", - "POSTGRES_PORT": "{{ conn.get('pg_db').port }}", - "POSTGRES_DATABASE": "{{ conn.get('pg_db').schema }}", - }, - append_env=True, - ) + create_products_table >> etl_pipeline >> dbt_run >> dbt_test diff --git a/dags/sustainability_score/sql/products_schema.sql b/dags/sustainability_score/sql/products_schema.sql index 23508f4..9413807 100644 --- a/dags/sustainability_score/sql/products_schema.sql +++ b/dags/sustainability_score/sql/products_schema.sql @@ -1,9 +1,13 @@ CREATE TABLE IF NOT EXISTS {{ params.products_table }} ( tcin VARCHAR PRIMARY KEY, - gtin13 VARCHAR NOT NULL, - primary_category VARCHAR NOT NULL, + gtin13 VARCHAR NOT NULL, + ingestion_time TIMESTAMP NOT NULL, + primary_category VARCHAR NOT NULL, materials VARCHAR[], - packaging INT NOT NULL, - origin VARCHAR NOT NULL, - weight NUMERIC + packaging INT NOT NULL, + origin VARCHAR NOT NULL, + weight NUMERIC, + height NUMERIC, + width NUMERIC, + depth NUMERIC ); diff --git a/dbt/models/category_material_scores.sql b/dbt/models/category_material_scores.sql new file mode 100644 index 0000000..8e3b389 --- /dev/null +++ b/dbt/models/category_material_scores.sql @@ -0,0 +1,16 @@ +{{ config(materialized='view') }} + +WITH unnested_materials AS ( + SELECT + primary_category, + unnest(materials) unnested_material + FROM {{ source('products', 'products') }} +) + +SELECT + primary_category, + AVG(lookup.score) AS score +FROM unnested_materials +JOIN {{ ref('material_lookup') }} AS lookup +ON unnested_materials.unnested_material = lookup.material +GROUP BY primary_category diff --git a/dbt/models/scored_products.sql b/dbt/models/scored_products.sql index 08fdbed..c80a048 100644 --- a/dbt/models/scored_products.sql +++ b/dbt/models/scored_products.sql @@ -1,36 +1,29 @@ -{{ config(materialized='table') }} +{{ + config( + materialized='incremental', + unique_key='tcin' + ) +}} -WITH unnested_materials AS ( - SELECT - primary_category, - unnest(materials) unnested_material - FROM {{ source('products', 'products') }} -), - -material_scores AS ( - SELECT - primary_category, - AVG(lookup.score) AS score - FROM unnested_materials - JOIN {{ ref('material_lookup') }} AS lookup - ON unnested_materials.unnested_material = lookup.material - GROUP BY primary_category -), - -scores AS ( +WITH scores AS ( SELECT tcin, + ingestion_time, material_scores.score AS material_score, weight * 0.75 AS weight_score, packaging * 0.6 AS packaging_score, lookup.score AS origin_score FROM {{ source('products', 'products') }} AS products - LEFT JOIN material_scores USING (primary_category) + LEFT JOIN {{ ref('category_material_scores') }} AS material_scores USING (primary_category) LEFT JOIN {{ ref('origin_lookup') }} AS lookup USING (origin) + {% if is_incremental() %} + WHERE ingestion_time > (SELECT MAX(ingestion_time) FROM {{ this }}) + {% endif %} ) SELECT tcin, + ingestion_time, material_score, weight_score, packaging_score, diff --git a/dbt/models/sustainability_score.yml b/dbt/models/sustainability_score.yml index ecaa534..9641737 100644 --- a/dbt/models/sustainability_score.yml +++ b/dbt/models/sustainability_score.yml @@ -19,6 +19,9 @@ models: - name: score type: float + - name: category_material_scores + description: "view holding the material score of each primary_category" + - name: origin_lookup description: "lookup table to match product origin and their score" columns: @@ -38,6 +41,12 @@ models: - unique - not_null + - name: ingestion_time + type: timestamp + description: "timestamp of when the row was inserted or updated by the ETL process" + tests: + - not_null + - name: material_score type: float description: "score calculated from the score"