feat: added score calculation
parent
6bb944c114
commit
2cf2007434
|
@ -41,6 +41,11 @@ with DAG(
|
||||||
sql="products_schema.sql",
|
sql="products_schema.sql",
|
||||||
postgres_conn_id="pg_db",
|
postgres_conn_id="pg_db",
|
||||||
)
|
)
|
||||||
|
create_scores_table = PostgresOperator(
|
||||||
|
task_id="create_scored_products_table",
|
||||||
|
sql="scored_products_schema.sql",
|
||||||
|
postgres_conn_id="pg_db",
|
||||||
|
)
|
||||||
|
|
||||||
etl_pipeline = BeamRunPythonPipelineOperator(
|
etl_pipeline = BeamRunPythonPipelineOperator(
|
||||||
task_id="beam_etl",
|
task_id="beam_etl",
|
||||||
|
@ -56,4 +61,11 @@ with DAG(
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
calculate_score = PostgresOperator(
|
||||||
|
task_id="calculate_score",
|
||||||
|
sql="calculate_score.sql",
|
||||||
|
postgres_conn_id="pg_db",
|
||||||
|
)
|
||||||
|
|
||||||
create_products_table >> etl_pipeline
|
create_products_table >> etl_pipeline
|
||||||
|
[etl_pipeline, create_scores_table] >> calculate_score
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
WITH material_lookup(material, score) AS (
|
||||||
|
(VALUES
|
||||||
|
('metal', 0.15),
|
||||||
|
('wood', 1),
|
||||||
|
('resin', 0),
|
||||||
|
('fabric', 0.5),
|
||||||
|
('plastic', 0.25)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
origin_lookup(origin, score) AS (
|
||||||
|
(VALUES
|
||||||
|
('usa', 1),
|
||||||
|
('imported', 0),
|
||||||
|
('mixed', 0.5)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
unnested_materials AS (
|
||||||
|
SELECT
|
||||||
|
primary_category,
|
||||||
|
unnest(materials) unnested_material
|
||||||
|
FROM {{ params.products_table }}
|
||||||
|
),
|
||||||
|
|
||||||
|
material_scores AS (
|
||||||
|
SELECT
|
||||||
|
primary_category,
|
||||||
|
AVG(material_lookup.score) AS score
|
||||||
|
FROM unnested_materials
|
||||||
|
JOIN material_lookup
|
||||||
|
ON unnested_materials.unnested_material = material_lookup.material
|
||||||
|
GROUP BY primary_category
|
||||||
|
),
|
||||||
|
|
||||||
|
scores AS (
|
||||||
|
SELECT
|
||||||
|
tcin,
|
||||||
|
material_scores.score AS material_score,
|
||||||
|
weight * 0.75 AS weight_score,
|
||||||
|
packaging * 0.6 AS packaging_score,
|
||||||
|
origin_lookup.score AS origin_score
|
||||||
|
FROM {{ params.products_table }} AS products
|
||||||
|
LEFT JOIN material_scores USING (primary_category)
|
||||||
|
LEFT JOIN origin_lookup USING (origin)
|
||||||
|
)
|
||||||
|
|
||||||
|
INSERT INTO {{ params.scored_table }}
|
||||||
|
SELECT
|
||||||
|
tcin,
|
||||||
|
material_score,
|
||||||
|
weight_score,
|
||||||
|
packaging_score,
|
||||||
|
origin_score,
|
||||||
|
material_score + weight_score + packaging_score + origin_score AS score
|
||||||
|
FROM scores;
|
|
@ -0,0 +1,8 @@
|
||||||
|
CREATE TABLE IF NOT EXISTS {{ params.scored_table }} (
|
||||||
|
tcin VARCHAR PRIMARY KEY,
|
||||||
|
material_score NUMERIC,
|
||||||
|
weight_score NUMERIC,
|
||||||
|
packaging_score NUMERIC,
|
||||||
|
origin_score NUMERIC,
|
||||||
|
score NUMERIC
|
||||||
|
);
|
Loading…
Reference in New Issue