From 66782ec2ef4f97986ca780784fd0bcc816d5dc3f Mon Sep 17 00:00:00 2001 From: Ricard Illa Date: Fri, 23 Jun 2023 16:23:21 +0200 Subject: [PATCH] feat: added query to create table to airflow --- pipeline/dags/sustainability_score/__init__.py | 10 ++++++++++ pipeline/docker-compose.yml | 1 + pipeline/sql/products_schema.sql | 12 ++++++++++++ pipeline/terraform/modules/airflow/main.tf | 3 ++- 4 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 pipeline/sql/products_schema.sql diff --git a/pipeline/dags/sustainability_score/__init__.py b/pipeline/dags/sustainability_score/__init__.py index ebd011c..f1611b5 100644 --- a/pipeline/dags/sustainability_score/__init__.py +++ b/pipeline/dags/sustainability_score/__init__.py @@ -11,6 +11,7 @@ from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator +from airflow.providers.postgres.operators.postgres import PostgresOperator HOME = os.environ["HOME"] @@ -31,9 +32,18 @@ with DAG( start_date=datetime(2023, 6, 21), doc_md=utils.load_docs(__file__), params=CONFIG, + template_searchpath=["/sql"], ) as dag: + create_products_table = PostgresOperator( + task_id="create_products_table", + sql="products_schema.sql", + postgres_conn_id="pg_db", + ) + etl_pipeline = BeamRunPythonPipelineOperator( task_id="beam_etl", py_file="{{ params.beam_etl_path }}", pipeline_options={"input": "{{ params.input }}"}, ) + + create_products_table >> etl_pipeline diff --git a/pipeline/docker-compose.yml b/pipeline/docker-compose.yml index 774fba3..9dac267 100644 --- a/pipeline/docker-compose.yml +++ b/pipeline/docker-compose.yml @@ -12,6 +12,7 @@ x-airflow-common: - ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro - ../data:/home/airflow/gcs/data:ro - ./beam_etl:/beam_etl:ro + - ./sql:/sql:ro environment: AIRFLOW__CORE__LOAD_EXAMPLES: 'false' diff --git a/pipeline/sql/products_schema.sql b/pipeline/sql/products_schema.sql new file mode 100644 index 0000000..0854d69 --- /dev/null +++ b/pipeline/sql/products_schema.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS sustainability_score.products ( + gtin13 INT PRIMARY KEY, + tcin INT NOT NULL, + primary_category VARCHAR NOT NULL, + materials VARCHAR[], + packaging INT NOT NULL, + origin VARCHAR NOT NULL, + height NUMERIC, + width NUMERIC, + depth NUMERIC, + weight NUMERIC +); diff --git a/pipeline/terraform/modules/airflow/main.tf b/pipeline/terraform/modules/airflow/main.tf index 9fd0399..ff1df1d 100644 --- a/pipeline/terraform/modules/airflow/main.tf +++ b/pipeline/terraform/modules/airflow/main.tf @@ -14,7 +14,8 @@ provider "airflow" { resource "airflow_connection" "pg_connection" { connection_id = "pg_db" conn_type = "postgres" - host = format("%s:%s", var.pg_host, var.pg_port) + host = var.pg_host + port = var.pg_port schema = var.pg_db login = var.pg_username password = var.pg_password