feat: added query to create table to airflow
parent
cbfbf42d53
commit
66782ec2ef
|
@ -11,6 +11,7 @@ from airflow import DAG
|
||||||
from airflow.operators.dummy_operator import DummyOperator
|
from airflow.operators.dummy_operator import DummyOperator
|
||||||
|
|
||||||
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
|
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
|
||||||
|
from airflow.providers.postgres.operators.postgres import PostgresOperator
|
||||||
|
|
||||||
|
|
||||||
HOME = os.environ["HOME"]
|
HOME = os.environ["HOME"]
|
||||||
|
@ -31,9 +32,18 @@ with DAG(
|
||||||
start_date=datetime(2023, 6, 21),
|
start_date=datetime(2023, 6, 21),
|
||||||
doc_md=utils.load_docs(__file__),
|
doc_md=utils.load_docs(__file__),
|
||||||
params=CONFIG,
|
params=CONFIG,
|
||||||
|
template_searchpath=["/sql"],
|
||||||
) as dag:
|
) as dag:
|
||||||
|
create_products_table = PostgresOperator(
|
||||||
|
task_id="create_products_table",
|
||||||
|
sql="products_schema.sql",
|
||||||
|
postgres_conn_id="pg_db",
|
||||||
|
)
|
||||||
|
|
||||||
etl_pipeline = BeamRunPythonPipelineOperator(
|
etl_pipeline = BeamRunPythonPipelineOperator(
|
||||||
task_id="beam_etl",
|
task_id="beam_etl",
|
||||||
py_file="{{ params.beam_etl_path }}",
|
py_file="{{ params.beam_etl_path }}",
|
||||||
pipeline_options={"input": "{{ params.input }}"},
|
pipeline_options={"input": "{{ params.input }}"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
create_products_table >> etl_pipeline
|
||||||
|
|
|
@ -12,6 +12,7 @@ x-airflow-common:
|
||||||
- ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro
|
- ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro
|
||||||
- ../data:/home/airflow/gcs/data:ro
|
- ../data:/home/airflow/gcs/data:ro
|
||||||
- ./beam_etl:/beam_etl:ro
|
- ./beam_etl:/beam_etl:ro
|
||||||
|
- ./sql:/sql:ro
|
||||||
environment:
|
environment:
|
||||||
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
|
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
CREATE TABLE IF NOT EXISTS sustainability_score.products (
|
||||||
|
gtin13 INT PRIMARY KEY,
|
||||||
|
tcin INT NOT NULL,
|
||||||
|
primary_category VARCHAR NOT NULL,
|
||||||
|
materials VARCHAR[],
|
||||||
|
packaging INT NOT NULL,
|
||||||
|
origin VARCHAR NOT NULL,
|
||||||
|
height NUMERIC,
|
||||||
|
width NUMERIC,
|
||||||
|
depth NUMERIC,
|
||||||
|
weight NUMERIC
|
||||||
|
);
|
|
@ -14,7 +14,8 @@ provider "airflow" {
|
||||||
resource "airflow_connection" "pg_connection" {
|
resource "airflow_connection" "pg_connection" {
|
||||||
connection_id = "pg_db"
|
connection_id = "pg_db"
|
||||||
conn_type = "postgres"
|
conn_type = "postgres"
|
||||||
host = format("%s:%s", var.pg_host, var.pg_port)
|
host = var.pg_host
|
||||||
|
port = var.pg_port
|
||||||
schema = var.pg_db
|
schema = var.pg_db
|
||||||
login = var.pg_username
|
login = var.pg_username
|
||||||
password = var.pg_password
|
password = var.pg_password
|
||||||
|
|
Loading…
Reference in New Issue