From 1268537695b9fabb0b73e4bd25af5a0e359b7140 Mon Sep 17 00:00:00 2001 From: Ricard Illa Date: Sun, 25 Jun 2023 20:53:51 +0200 Subject: [PATCH] feat: moved transformations to dbt --- airflow_img/Dockerfile | 12 ++++ airflow_img/airflow-entrypoint.sh | 8 +++ {scripts => airflow_img}/airflow-init.sh | 0 dags/sustainability_score/__init__.py | 18 +++++- .../sql}/products_schema.sql | 0 dbt/dbt_project.yml | 35 +++++++++++ dbt/models/material_lookup.sql | 7 +++ dbt/models/origin_lookup.sql | 5 ++ dbt/models/scored_products.sql | 39 ++++++++++++ dbt/models/sustainability_score.yml | 59 +++++++++++++++++++ dbt/profiles.yml | 10 ++++ docker-compose.yml | 19 +++--- etl/helpers/data_io.py | 12 ++-- scripts/airflow-entrypoint.sh | 8 --- sql/calculate_score.sql | 56 ------------------ sql/scored_products_schema.sql | 8 --- 16 files changed, 207 insertions(+), 89 deletions(-) create mode 100644 airflow_img/Dockerfile create mode 100644 airflow_img/airflow-entrypoint.sh rename {scripts => airflow_img}/airflow-init.sh (100%) mode change 100755 => 100644 rename {sql => dags/sustainability_score/sql}/products_schema.sql (100%) create mode 100644 dbt/dbt_project.yml create mode 100644 dbt/models/material_lookup.sql create mode 100644 dbt/models/origin_lookup.sql create mode 100644 dbt/models/scored_products.sql create mode 100644 dbt/models/sustainability_score.yml create mode 100644 dbt/profiles.yml delete mode 100755 scripts/airflow-entrypoint.sh delete mode 100644 sql/calculate_score.sql delete mode 100644 sql/scored_products_schema.sql diff --git a/airflow_img/Dockerfile b/airflow_img/Dockerfile new file mode 100644 index 0000000..2c55b6c --- /dev/null +++ b/airflow_img/Dockerfile @@ -0,0 +1,12 @@ +FROM us-docker.pkg.dev/cloud-airflow-releaser/airflow-worker-scheduler-2-5-1/airflow-worker-scheduler-2-5-1:composer-2.3.1-airflow-2-5-1 + +USER root + +COPY airflow-entrypoint.sh /usr/local/bin/airflow-entrypoint.sh +COPY airflow-init.sh /usr/local/bin/airflow-init.sh +RUN chmod +x 
/usr/local/bin/airflow-entrypoint.sh /usr/local/bin/airflow-init.sh +RUN pip install --upgrade dbt-postgres + +USER airflow + +ENTRYPOINT ["/usr/local/bin/airflow-entrypoint.sh"] diff --git a/airflow_img/airflow-entrypoint.sh b/airflow_img/airflow-entrypoint.sh new file mode 100644 index 0000000..9ef0ab9 --- /dev/null +++ b/airflow_img/airflow-entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +set -xe + +[ -f airflow_extra_requirements.txt ] && + pip3 install --upgrade -r airflow_extra_requirements.txt + +exec airflow "$@" diff --git a/scripts/airflow-init.sh b/airflow_img/airflow-init.sh old mode 100755 new mode 100644 similarity index 100% rename from scripts/airflow-init.sh rename to airflow_img/airflow-init.sh diff --git a/dags/sustainability_score/__init__.py b/dags/sustainability_score/__init__.py index 2317a47..10adffc 100644 --- a/dags/sustainability_score/__init__.py +++ b/dags/sustainability_score/__init__.py @@ -9,6 +9,7 @@ import utils from airflow import DAG from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.bash import BashOperator from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator from airflow.providers.postgres.operators.postgres import PostgresOperator @@ -34,7 +35,6 @@ with DAG( start_date=datetime(2023, 6, 21), doc_md=utils.load_docs(__file__), params=CONFIG, - template_searchpath=["/sql"], ) as dag: create_products_table = PostgresOperator( task_id="create_products_table", @@ -63,9 +63,23 @@ with DAG( calculate_score = PostgresOperator( task_id="calculate_score", - sql="calculate_score.sql", + sql="sql/calculate_score.sql", postgres_conn_id="pg_db", ) create_products_table >> etl_pipeline [etl_pipeline, create_scores_table] >> calculate_score + + dbt_run = BashOperator( + task_id="run_dbt", + bash_command="dbt run", + cwd="/dbt", + env={ + "POSTGRES_HOST": "{{ conn.get('pg_db').host }}", + "POSTGRES_USER": "{{ conn.get('pg_db').login }}", + "POSTGRES_PASSWORD": "{{ 
conn.get('pg_db').password }}", + "POSTGRES_PORT": "{{ conn.get('pg_db').port }}", + "POSTGRES_DATABASE": "{{ conn.get('pg_db').schema }}", + }, + append_env=True, + ) diff --git a/sql/products_schema.sql b/dags/sustainability_score/sql/products_schema.sql similarity index 100% rename from sql/products_schema.sql rename to dags/sustainability_score/sql/products_schema.sql diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml new file mode 100644 index 0000000..650e592 --- /dev/null +++ b/dbt/dbt_project.yml @@ -0,0 +1,35 @@ + +# Name your project! Project names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: 'sustainability_score' +version: '1.0.0' +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. +profile: 'sustainability_score' + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that models in this project can be +# found in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" # directory which will store compiled SQL files +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_packages" + + +# Configuring models +# Full documentation: https://docs.getdbt.com/docs/configuring-models + +# In this example config, we tell dbt to build all models in the example/ directory +# as tables. These settings can be overridden in the individual model files +# using the `{{ config(...) }}` macro. 
+models: + sustainability_score: diff --git a/dbt/models/material_lookup.sql b/dbt/models/material_lookup.sql new file mode 100644 index 0000000..3be3a95 --- /dev/null +++ b/dbt/models/material_lookup.sql @@ -0,0 +1,7 @@ +SELECT * FROM (VALUES + ('metal', 0.15), + ('wood', 1), + ('resin', 0), + ('fabric', 0.5), + ('plastic', 0.25) +) AS material_lookup(material, score) diff --git a/dbt/models/origin_lookup.sql b/dbt/models/origin_lookup.sql new file mode 100644 index 0000000..9019e56 --- /dev/null +++ b/dbt/models/origin_lookup.sql @@ -0,0 +1,5 @@ +SELECT * FROM (VALUES + ('usa', 1), + ('imported', 0), + ('mixed', 0.5) +) AS origin_lookup(origin, score) diff --git a/dbt/models/scored_products.sql b/dbt/models/scored_products.sql new file mode 100644 index 0000000..08fdbed --- /dev/null +++ b/dbt/models/scored_products.sql @@ -0,0 +1,39 @@ +{{ config(materialized='table') }} + +WITH unnested_materials AS ( + SELECT + primary_category, + unnest(materials) unnested_material + FROM {{ source('products', 'products') }} +), + +material_scores AS ( + SELECT + primary_category, + AVG(lookup.score) AS score + FROM unnested_materials + JOIN {{ ref('material_lookup') }} AS lookup + ON unnested_materials.unnested_material = lookup.material + GROUP BY primary_category +), + +scores AS ( + SELECT + tcin, + material_scores.score AS material_score, + weight * 0.75 AS weight_score, + packaging * 0.6 AS packaging_score, + lookup.score AS origin_score + FROM {{ source('products', 'products') }} AS products + LEFT JOIN material_scores USING (primary_category) + LEFT JOIN {{ ref('origin_lookup') }} AS lookup USING (origin) +) + +SELECT + tcin, + material_score, + weight_score, + packaging_score, + origin_score, + material_score + weight_score + packaging_score + origin_score AS score +FROM scores diff --git a/dbt/models/sustainability_score.yml b/dbt/models/sustainability_score.yml new file mode 100644 index 0000000..ecaa534 --- /dev/null +++ b/dbt/models/sustainability_score.yml @@ 
-0,0 +1,59 @@ +version: 2 + +sources: + - name: products + description: "source table populated and updated by the ETL process" + database: sustainability_score + schema: sustainability_score + tables: + - name: products + +models: + + - name: material_lookup + description: "lookup table to match materials and their score" + columns: + - name: material + type: string + + - name: score + type: float + + - name: origin_lookup + description: "lookup table to match product origin and their score" + columns: + - name: origin + type: string + + - name: score + type: float + + - name: scored_products + description: "table containing the scored products" + columns: + - name: tcin + type: string + description: "product TCIN number" + tests: + - unique + - not_null + + - name: material_score + type: float + description: "score calculated from the materials" + + - name: weight_score + type: float + description: "score calculated from the weight" + + - name: packaging_score + type: float + description: "score calculated from the packaging value" + + - name: origin_score + type: float + description: "score calculated from the product origin" + + - name: score + type: float + description: "sum of material_score, weight_score, packaging_score and origin_score" diff --git a/dbt/profiles.yml new file mode 100644 index 0000000..dd1f13d --- /dev/null +++ b/dbt/profiles.yml @@ -0,0 +1,10 @@ +sustainability_score: + outputs: + default: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + port: "{{ env_var('POSTGRES_PORT') | as_number }}" + user: "{{ env_var('POSTGRES_USER') }}" + pass: "{{ env_var('POSTGRES_PASSWORD') }}" + dbname: "{{ env_var('POSTGRES_DATABASE') }}" + schema: sustainability_score diff --git a/docker-compose.yml index 6213266..f77eb62 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,16 +3,15 @@ version: "3.8" x-airflow-common: &airflow-common - image:
us-docker.pkg.dev/cloud-airflow-releaser/airflow-worker-scheduler-2-5-1/airflow-worker-scheduler-2-5-1:composer-2.3.1-airflow-2-5-1 - entrypoint: /usr/local/bin/airflow-entrypoint.sh + build: ./airflow_img volumes: - ./state/airflow-data:/home/airflow/airflow + - ./state/dbt-data/logs:/dbt/logs + - ./state/dbt-data/target:/dbt/target - ./dags:/home/airflow/airflow/dags - - ./scripts/airflow-init.sh:/usr/local/bin/airflow-init.sh:ro - - ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro - ./data:/home/airflow/gcs/data:ro - ./etl:/etl:ro - - ./sql:/sql:ro + - ./dbt:/dbt environment: AIRFLOW__CORE__LOAD_EXAMPLES: 'false' @@ -25,14 +24,16 @@ services: airflow-scheduler: <<: *airflow-common - command: airflow scheduler + command: scheduler + restart: "unless-stopped" depends_on: airflow-init: condition: service_completed_successfully airflow-webserver: <<: *airflow-common - command: airflow webserver + command: webserver + restart: "unless-stopped" ports: - 8080:8080 depends_on: @@ -41,6 +42,7 @@ services: postgres: image: postgres:15.3-alpine + restart: "unless-stopped" ports: - 5432:5432 volumes: @@ -71,10 +73,9 @@ services: notebook: image: jupyter/scipy-notebook + restart: "unless-stopped" ports: - 8888:8888 volumes: - ./notebooks:/home/jovyan/work - ./data:/home/jovyan/data:ro - profiles: - - notebooks diff --git a/etl/helpers/data_io.py b/etl/helpers/data_io.py index 2e51bc1..66893fc 100644 --- a/etl/helpers/data_io.py +++ b/etl/helpers/data_io.py @@ -86,12 +86,12 @@ class UpsertProductsToPg(WriteToPostgreSQL): materials = EXCLUDED.materials, packaging = EXCLUDED.packaging, origin = EXCLUDED.origin, - weight = EXCLUDED.weight, + weight = EXCLUDED.weight WHERE - primary_category != EXCLUDED.primary_category - materials != EXCLUDED.materials - packaging != EXCLUDED.packaging - origin != EXCLUDED.origin - weight != EXCLUDED.weight + { self.table }.primary_category != EXCLUDED.primary_category OR + { self.table }.materials != EXCLUDED.materials OR 
+ { self.table }.packaging != EXCLUDED.packaging OR + { self.table }.origin != EXCLUDED.origin OR + { self.table }.weight != EXCLUDED.weight """ cursor.execute(sql, list(row.values())) diff --git a/scripts/airflow-entrypoint.sh b/scripts/airflow-entrypoint.sh deleted file mode 100755 index b2c7bc9..0000000 --- a/scripts/airflow-entrypoint.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh - -set -xe - -[ -f composer_requirements.txt ] && pip3 install --upgrade -r composer_requirements.txt -[ -f dev_requirements.txt ] && pip3 install -r dev_requirements.txt - -exec "$@" diff --git a/sql/calculate_score.sql b/sql/calculate_score.sql deleted file mode 100644 index 88ff956..0000000 --- a/sql/calculate_score.sql +++ /dev/null @@ -1,56 +0,0 @@ -WITH material_lookup(material, score) AS ( - (VALUES - ('metal', 0.15), - ('wood', 1), - ('resin', 0), - ('fabric', 0.5), - ('plastic', 0.25) - ) -), - -origin_lookup(origin, score) AS ( - (VALUES - ('usa', 1), - ('imported', 0), - ('mixed', 0.5) - ) -), - -unnested_materials AS ( - SELECT - primary_category, - unnest(materials) unnested_material - FROM {{ params.products_table }} -), - -material_scores AS ( - SELECT - primary_category, - AVG(material_lookup.score) AS score - FROM unnested_materials - JOIN material_lookup - ON unnested_materials.unnested_material = material_lookup.material - GROUP BY primary_category -), - -scores AS ( - SELECT - tcin, - material_scores.score AS material_score, - weight * 0.75 AS weight_score, - packaging * 0.6 AS packaging_score, - origin_lookup.score AS origin_score - FROM {{ params.products_table }} AS products - LEFT JOIN material_scores USING (primary_category) - LEFT JOIN origin_lookup USING (origin) -) - -INSERT INTO {{ params.scored_table }} -SELECT - tcin, - material_score, - weight_score, - packaging_score, - origin_score, - material_score + weight_score + packaging_score + origin_score AS score -FROM scores; diff --git a/sql/scored_products_schema.sql b/sql/scored_products_schema.sql deleted 
file mode 100644 index 8cb1e29..0000000 --- a/sql/scored_products_schema.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE TABLE IF NOT EXISTS {{ params.scored_table }} ( - tcin VARCHAR PRIMARY KEY, - material_score NUMERIC, - weight_score NUMERIC, - packaging_score NUMERIC, - origin_score NUMERIC, - score NUMERIC -);