feat: moved transformations to dbt

main
Ricard Illa 2023-06-25 20:53:51 +02:00
parent 1bc5daa29e
commit 1268537695
16 changed files with 207 additions and 89 deletions

12
airflow_img/Dockerfile Normal file

@@ -0,0 +1,12 @@
FROM us-docker.pkg.dev/cloud-airflow-releaser/airflow-worker-scheduler-2-5-1/airflow-worker-scheduler-2-5-1:composer-2.3.1-airflow-2-5-1
USER root
COPY airflow-entrypoint.sh /usr/local/bin/airflow-entrypoint.sh
COPY airflow-init.sh /usr/local/bin/airflow-init.sh
RUN chmod +x /usr/local/bin/airflow-entrypoint.sh /usr/local/bin/airflow-init.sh
RUN pip install --upgrade dbt-postgres
USER airflow
ENTRYPOINT ["/usr/local/bin/airflow-entrypoint.sh"]
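
The image extends the Cloud Composer Airflow 2.5.1 base with dbt-postgres and the two helper scripts. A minimal smoke test, assuming you tag the build yourself (the airflow-dbt tag is illustrative):

    docker build -t airflow-dbt ./airflow_img
    docker run --rm airflow-dbt version                     # entrypoint execs `airflow version`
    docker run --rm --entrypoint dbt airflow-dbt --version  # confirms dbt-postgres is on the image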

8
airflow_img/airflow-entrypoint.sh Normal file

@@ -0,0 +1,8 @@
#!/bin/sh
set -xe
[ -f airflow_extra_requirements.txt ] &&
pip3 install --upgrade -r airflow_extra_requirements.txt
exec airflow "$@"
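
The guard only installs airflow_extra_requirements.txt when the file exists, then replaces the shell with the airflow binary, which is why the compose services below can pass bare subcommands. Assuming the script is on PATH, as the Dockerfile's COPY into /usr/local/bin implies:

    airflow-entrypoint.sh scheduler   # equivalent to: exec airflow scheduler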


@@ -9,6 +9,7 @@ import utils
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
@@ -34,7 +35,6 @@ with DAG(
    start_date=datetime(2023, 6, 21),
    doc_md=utils.load_docs(__file__),
    params=CONFIG,
    template_searchpath=["/sql"],
) as dag:
    create_products_table = PostgresOperator(
        task_id="create_products_table",
@@ -63,9 +63,23 @@
    calculate_score = PostgresOperator(
        task_id="calculate_score",
        sql="calculate_score.sql",
        sql="sql/calculate_score.sql",
        postgres_conn_id="pg_db",
    )
    create_products_table >> etl_pipeline
    [etl_pipeline, create_scores_table] >> calculate_score
    dbt_run = BashOperator(
        task_id="run_dbt",
        bash_command="dbt run",
        cwd="/dbt",
        env={
            "POSTGRES_HOST": "{{ conn.get('pg_db').host }}",
            "POSTGRES_USER": "{{ conn.get('pg_db').login }}",
            "POSTGRES_PASSWORD": "{{ conn.get('pg_db').password }}",
            "POSTGRES_PORT": "{{ conn.get('pg_db').port }}",
            "POSTGRES_DATABASE": "{{ conn.get('pg_db').schema }}",
        },
        append_env=True,
    )
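
Outside Airflow, what the dbt_run task executes can be reproduced by exporting the same variables by hand; the connection values below are placeholders for whatever the pg_db connection actually holds:

    cd /dbt
    export POSTGRES_HOST=postgres POSTGRES_PORT=5432 \
           POSTGRES_USER=airflow POSTGRES_PASSWORD=airflow \
           POSTGRES_DATABASE=sustainability_score
    dbt run   # resolves profiles.yml from the working directory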

35
dbt/dbt_project.yml Normal file
View File

@@ -0,0 +1,35 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'sustainability_score'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'sustainability_score'
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets:  # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
  sustainability_score:
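
A cheap sanity check for the project file is dbt parse, which compiles the project without running any models; note that it still renders profiles.yml, so the POSTGRES_* variables must be set first:

    cd dbt && dbt parse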

7
dbt/models/material_lookup.sql Normal file

@@ -0,0 +1,7 @@
SELECT * FROM (VALUES
('metal', 0.15),
('wood', 1),
('resin', 0),
('fabric', 0.5),
('plastic', 0.25)
) AS material_lookup(material, score)

5
dbt/models/origin_lookup.sql Normal file

@@ -0,0 +1,5 @@
SELECT * FROM (VALUES
('usa', 1),
('imported', 0),
('mixed', 0.5)
) AS origin_lookup(origin, score)
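
These two models replace the inline VALUES CTEs from the deleted calculate_score.sql, turning the hard-coded lookups into tables other models can ref. They can be built in isolation:

    dbt run --select material_lookup origin_lookup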

39
dbt/models/scored_products.sql Normal file

@@ -0,0 +1,39 @@
{{ config(materialized='table') }}
WITH unnested_materials AS (
SELECT
primary_category,
unnest(materials) unnested_material
FROM {{ source('products', 'products') }}
),
material_scores AS (
SELECT
primary_category,
AVG(lookup.score) AS score
FROM unnested_materials
JOIN {{ ref('material_lookup') }} AS lookup
ON unnested_materials.unnested_material = lookup.material
GROUP BY primary_category
),
scores AS (
SELECT
tcin,
material_scores.score AS material_score,
weight * 0.75 AS weight_score,
packaging * 0.6 AS packaging_score,
lookup.score AS origin_score
FROM {{ source('products', 'products') }} AS products
LEFT JOIN material_scores USING (primary_category)
LEFT JOIN {{ ref('origin_lookup') }} AS lookup USING (origin)
)
SELECT
tcin,
material_score,
weight_score,
packaging_score,
origin_score,
material_score + weight_score + packaging_score + origin_score AS score
FROM scores
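
This model carries the scoring logic over from the old templated SQL as a table materialization. Building it with dbt's + graph operator also builds the lookup models it refs:

    dbt run --select +scored_products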

59
dbt/models/schema.yml Normal file

@@ -0,0 +1,59 @@
version: 2

sources:
  - name: products
    description: "source table populated and updated by the ETL process"
    database: sustainability_score
    schema: sustainability_score
    tables:
      - name: products

models:
  - name: material_lookup
    description: "lookup table to match materials and their score"
    columns:
      - name: material
        type: string
      - name: score
        type: float
  - name: origin_lookup
    description: "lookup table to match product origin and their score"
    columns:
      - name: origin
        type: string
      - name: score
        type: float
  - name: scored_products
    description: "table containing the scored products"
    columns:
      - name: tcin
        type: string
        description: "product TCIN number"
        tests:
          - unique
          - not_null
      - name: material_score
        type: float
        description: "score calculated from the materials"
      - name: weight_score
        type: float
        description: "score calculated from the weight"
      - name: packaging_score
        type: float
        description: "score calculated from the packaging value"
      - name: origin_score
        type: float
        description: "score calculated from the product origin"
      - name: score
        type: float
        description: "sum of material_score, weight_score, packaging_score and origin_score"

10
dbt/profiles.yml Normal file

@@ -0,0 +1,10 @@
sustainability_score:
  outputs:
    default:
      type: postgres
      host: "{{ env_var('POSTGRES_HOST') }}"
      port: "{{ env_var('POSTGRES_PORT') | as_number }}"
      user: "{{ env_var('POSTGRES_USER') }}"
      pass: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: "{{ env_var('POSTGRES_DATABASE') }}"
      schema: sustainability_score
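
The as_number filter casts the port from an environment string to the integer the postgres adapter expects. With the POSTGRES_* variables exported as above, connectivity can be verified from the project directory:

    cd dbt && dbt debug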

docker-compose.yml

@@ -3,16 +3,15 @@ version: "3.8"
x-airflow-common:
  &airflow-common
  image: us-docker.pkg.dev/cloud-airflow-releaser/airflow-worker-scheduler-2-5-1/airflow-worker-scheduler-2-5-1:composer-2.3.1-airflow-2-5-1
  entrypoint: /usr/local/bin/airflow-entrypoint.sh
  build: ./airflow_img
  volumes:
    - ./state/airflow-data:/home/airflow/airflow
    - ./state/dbt-data/logs:/dbt/logs
    - ./state/dbt-data/target:/dbt/target
    - ./dags:/home/airflow/airflow/dags
    - ./scripts/airflow-init.sh:/usr/local/bin/airflow-init.sh:ro
    - ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro
    - ./data:/home/airflow/gcs/data:ro
    - ./etl:/etl:ro
    - ./sql:/sql:ro
    - ./dbt:/dbt
  environment:
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
@@ -25,14 +24,16 @@ services:
  airflow-scheduler:
    <<: *airflow-common
    command: airflow scheduler
    command: scheduler
    restart: "unless-stopped"
    depends_on:
      airflow-init:
        condition: service_completed_successfully
  airflow-webserver:
    <<: *airflow-common
    command: airflow webserver
    command: webserver
    restart: "unless-stopped"
    ports:
      - 8080:8080
    depends_on:
@@ -41,6 +42,7 @@ services:
  postgres:
    image: postgres:15.3-alpine
    restart: "unless-stopped"
    ports:
      - 5432:5432
    volumes:
@@ -71,10 +73,9 @@ services:
  notebook:
    image: jupyter/scipy-notebook
    restart: "unless-stopped"
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data:ro
    profiles:
      - notebooks
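
With the services now building from ./airflow_img and the dbt project mounted read-write (logs and target bind-mounted back into ./state), a full local bring-up looks roughly like:

    docker compose build
    docker compose up -d
    docker compose --profile notebooks up -d   # optionally include the Jupyter service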


@@ -86,12 +86,12 @@ class UpsertProductsToPg(WriteToPostgreSQL):
materials = EXCLUDED.materials,
packaging = EXCLUDED.packaging,
origin = EXCLUDED.origin,
weight = EXCLUDED.weight,
weight = EXCLUDED.weight
WHERE
primary_category != EXCLUDED.primary_category
materials != EXCLUDED.materials
packaging != EXCLUDED.packaging
origin != EXCLUDED.origin
weight != EXCLUDED.weight
{ self.table }.primary_category != EXCLUDED.primary_category OR
{ self.table }.materials != EXCLUDED.materials OR
{ self.table }.packaging != EXCLUDED.packaging OR
{ self.table }.origin != EXCLUDED.origin OR
{ self.table }.weight != EXCLUDED.weight
"""
cursor.execute(sql, list(row.values()))
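
The rewritten statement fixes two bugs: a trailing comma after the last SET assignment, and a WHERE clause whose predicates were missing OR separators and table qualification. A toy psql session showing the corrected upsert pattern (the table, columns, and PG_URL value are made up for illustration):

    psql "$PG_URL" -c "CREATE TABLE IF NOT EXISTS t (tcin text PRIMARY KEY, weight numeric)"
    psql "$PG_URL" -c "INSERT INTO t VALUES ('a', 1)
      ON CONFLICT (tcin) DO UPDATE SET weight = EXCLUDED.weight
      WHERE t.weight != EXCLUDED.weight"   # row only rewritten when the value actually changed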

scripts/airflow-entrypoint.sh

@@ -1,8 +0,0 @@
#!/bin/sh
set -xe
[ -f composer_requirements.txt ] && pip3 install --upgrade -r composer_requirements.txt
[ -f dev_requirements.txt ] && pip3 install -r dev_requirements.txt
exec "$@"

sql/calculate_score.sql

@@ -1,56 +0,0 @@
WITH material_lookup(material, score) AS (
(VALUES
('metal', 0.15),
('wood', 1),
('resin', 0),
('fabric', 0.5),
('plastic', 0.25)
)
),
origin_lookup(origin, score) AS (
(VALUES
('usa', 1),
('imported', 0),
('mixed', 0.5)
)
),
unnested_materials AS (
SELECT
primary_category,
unnest(materials) unnested_material
FROM {{ params.products_table }}
),
material_scores AS (
SELECT
primary_category,
AVG(material_lookup.score) AS score
FROM unnested_materials
JOIN material_lookup
ON unnested_materials.unnested_material = material_lookup.material
GROUP BY primary_category
),
scores AS (
SELECT
tcin,
material_scores.score AS material_score,
weight * 0.75 AS weight_score,
packaging * 0.6 AS packaging_score,
origin_lookup.score AS origin_score
FROM {{ params.products_table }} AS products
LEFT JOIN material_scores USING (primary_category)
LEFT JOIN origin_lookup USING (origin)
)
INSERT INTO {{ params.scored_table }}
SELECT
tcin,
material_score,
weight_score,
packaging_score,
origin_score,
material_score + weight_score + packaging_score + origin_score AS score
FROM scores;

sql/create_scores_table.sql

@@ -1,8 +0,0 @@
CREATE TABLE IF NOT EXISTS {{ params.scored_table }} (
tcin VARCHAR PRIMARY KEY,
material_score NUMERIC,
weight_score NUMERIC,
packaging_score NUMERIC,
origin_score NUMERIC,
score NUMERIC
);