From 2cf200743420e2d0f72bfa6db98ed91e4d95c836 Mon Sep 17 00:00:00 2001 From: Ricard Illa Date: Sun, 25 Jun 2023 12:46:53 +0200 Subject: [PATCH] feat: added score calculation --- dags/sustainability_score/__init__.py | 12 ++++++ sql/calculate_score.sql | 56 +++++++++++++++++++++++++++ sql/scored_products_schema.sql | 8 ++++ 3 files changed, 76 insertions(+) create mode 100644 sql/calculate_score.sql create mode 100644 sql/scored_products_schema.sql diff --git a/dags/sustainability_score/__init__.py b/dags/sustainability_score/__init__.py index 034c0eb..2317a47 100644 --- a/dags/sustainability_score/__init__.py +++ b/dags/sustainability_score/__init__.py @@ -41,6 +41,11 @@ with DAG( sql="products_schema.sql", postgres_conn_id="pg_db", ) + create_scores_table = PostgresOperator( + task_id="create_scored_products_table", + sql="scored_products_schema.sql", + postgres_conn_id="pg_db", + ) etl_pipeline = BeamRunPythonPipelineOperator( task_id="beam_etl", @@ -56,4 +61,11 @@ with DAG( }, ) + calculate_score = PostgresOperator( + task_id="calculate_score", + sql="calculate_score.sql", + postgres_conn_id="pg_db", + ) + create_products_table >> etl_pipeline + [etl_pipeline, create_scores_table] >> calculate_score diff --git a/sql/calculate_score.sql b/sql/calculate_score.sql new file mode 100644 index 0000000..88ff956 --- /dev/null +++ b/sql/calculate_score.sql @@ -0,0 +1,56 @@ +WITH material_lookup(material, score) AS ( + (VALUES + ('metal', 0.15), + ('wood', 1), + ('resin', 0), + ('fabric', 0.5), + ('plastic', 0.25) + ) +), + +origin_lookup(origin, score) AS ( + (VALUES + ('usa', 1), + ('imported', 0), + ('mixed', 0.5) + ) +), + +unnested_materials AS ( + SELECT + primary_category, + unnest(materials) unnested_material + FROM {{ params.products_table }} +), + +material_scores AS ( + SELECT + primary_category, + AVG(material_lookup.score) AS score + FROM unnested_materials + JOIN material_lookup + ON unnested_materials.unnested_material = material_lookup.material + GROUP BY primary_category +), + +scores AS ( + SELECT + tcin, + material_scores.score AS material_score, + weight * 0.75 AS weight_score, + packaging * 0.6 AS packaging_score, + origin_lookup.score AS origin_score + FROM {{ params.products_table }} AS products + LEFT JOIN material_scores USING (primary_category) + LEFT JOIN origin_lookup USING (origin) +) + +INSERT INTO {{ params.scored_table }} +SELECT + tcin, + material_score, + weight_score, + packaging_score, + origin_score, + material_score + weight_score + packaging_score + origin_score AS score +FROM scores; diff --git a/sql/scored_products_schema.sql b/sql/scored_products_schema.sql new file mode 100644 index 0000000..8cb1e29 --- /dev/null +++ b/sql/scored_products_schema.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS {{ params.scored_table }} ( + tcin VARCHAR PRIMARY KEY, + material_score NUMERIC, + weight_score NUMERIC, + packaging_score NUMERIC, + origin_score NUMERIC, + score NUMERIC +);