diff --git a/.gitignore b/.gitignore index d079f84..e1a853c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ state .direnv venv .installed_deps +.img_name *.egg-info __pycache__ diff --git a/docker-compose.yml b/docker-compose.yml index f77eb62..4b485bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ x-airflow-common: - ./state/dbt-data/target:/dbt/target - ./dags:/home/airflow/airflow/dags - ./data:/home/airflow/gcs/data:ro - - ./etl:/etl:ro + - ./etl/src:/etl:ro - ./dbt:/dbt environment: AIRFLOW__CORE__LOAD_EXAMPLES: 'false' diff --git a/etl/Dockerfile b/etl/Dockerfile new file mode 100644 index 0000000..1cedeb2 --- /dev/null +++ b/etl/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.8-slim + +COPY requirements.txt /requirements.txt +COPY dev-requirements.txt /dev-requirements.txt + +RUN pip install --upgrade pip && \ + pip install -r /requirements.txt -r /dev-requirements.txt + +WORKDIR /src diff --git a/etl/Makefile b/etl/Makefile new file mode 100644 index 0000000..a1b6e1a --- /dev/null +++ b/etl/Makefile @@ -0,0 +1,54 @@ +.PHONY: clean build_img + +PYTHON_VERSION = 3.8 +PYTHON_SUBVERSION = 3.8.12 + +PYENV_VERSIONS = $(HOME)/.pyenv/versions +PYTHON_BIN = $(PYENV_VERSIONS)/$(PYTHON_SUBVERSION)/bin/python$(PYTHON_VERSION) + +VENV = venv +PYTHON_VENV = $(VENV)/bin/python +PIP = $(PYTHON_VENV) -m pip +PIP_COMPILE = venv/bin/pip-compile + +IMG_NAME=etl_beam_env + +all: oci_img .installed_deps + +build_img: .img_name + +.img_name: Dockerfile requirements.txt dev-requirements.txt + docker build -t $(IMG_NAME) . 
+ echo $(IMG_NAME) > $@ + +.installed_deps: requirements.txt dev-requirements.txt $(PYTHON_VENV) + $(PIP) install \ + -r requirements.txt \ + -r dev-requirements.txt + touch $@ + +requirements.txt: pyproject.toml $(PIP_COMPILE) + $(PIP_COMPILE) \ + --resolver=backtracking \ + --output-file $@ \ + $< + +dev-requirements.txt: pyproject.toml $(PIP_COMPILE) + $(PIP_COMPILE) \ + --extra=dev \ + --resolver=backtracking \ + --output-file $@ \ + $< + +$(PIP_COMPILE): $(PYTHON_VENV) + $(PIP) install pip-tools + +$(PYTHON_VENV): $(PYTHON_BIN) + virtualenv --python=$^ $(VENV) + $(PIP) install --upgrade pip + +$(PYTHON_BIN): + pyenv install $(PYTHON_SUBVERSION) + +clean: + rm -rf *.egg-info venv installed_deps diff --git a/etl/dev-requirements.txt b/etl/dev-requirements.txt index 40af729..2798604 100644 --- a/etl/dev-requirements.txt +++ b/etl/dev-requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.10 +# This file is autogenerated by pip-compile with Python 3.8 # by the following command: # # pip-compile --extra=dev --output-file=dev-requirements.txt --resolver=backtracking pyproject.toml @@ -208,7 +208,7 @@ protobuf==4.23.3 # grpc-google-iam-v1 # grpcio-status # proto-plus -psycopg2==2.9.6 +psycopg2-binary==2.9.6 # via beam-etl (pyproject.toml) pyarrow==11.0.0 # via apache-beam @@ -275,6 +275,8 @@ typing-extensions==4.6.3 # via # apache-beam # astroid + # black + # pylint tzdata==2023.3 # via pandas urllib3==1.26.16 diff --git a/etl/justfile b/etl/justfile new file mode 100644 index 0000000..33cdfb6 --- /dev/null +++ b/etl/justfile @@ -0,0 +1,16 @@ +build_img: + make build_img + +test: build_img + docker run \ + -v $(pwd)/src:/src \ + --rm \ + -it $(cat .img_name) \ + python -m pytest + +lint: build_img + docker run \ + -v $(pwd)/src:/src \ + --rm \ + -it $(cat .img_name) \ + pylint --init-hook "import sys; sys.path.append('/src')" /src diff --git a/etl/pyproject.toml b/etl/pyproject.toml index 882e11a..4ae3e52 100644 --- 
a/etl/pyproject.toml +++ b/etl/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "beam_etl" version = "0.1" -dependencies = ["wheel", "apache-beam[gcp]", "pandas", "psycopg2"] +dependencies = ["wheel", "apache-beam[gcp]", "pandas", "psycopg2-binary"] [project.optional-dependencies] dev = ["pytest", "pylint", "black"] diff --git a/etl/requirements.txt b/etl/requirements.txt index b189df7..a380178 100644 --- a/etl/requirements.txt +++ b/etl/requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.10 +# This file is autogenerated by pip-compile with Python 3.8 # by the following command: # # pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml @@ -177,7 +177,7 @@ protobuf==4.23.3 # grpc-google-iam-v1 # grpcio-status # proto-plus -psycopg2==2.9.6 +psycopg2-binary==2.9.6 # via beam-etl (pyproject.toml) pyarrow==11.0.0 # via apache-beam diff --git a/etl/src/__init__.py b/etl/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/etl/src/helpers/__init__.py b/etl/src/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/etl/helpers/data_io.py b/etl/src/helpers/data_io.py similarity index 59% rename from etl/helpers/data_io.py rename to etl/src/helpers/data_io.py index e6ff3d7..7185d29 100644 --- a/etl/helpers/data_io.py +++ b/etl/src/helpers/data_io.py @@ -5,6 +5,7 @@ import io import logging import csv +from typing import Dict import apache_beam as beam import psycopg2 @@ -17,9 +18,10 @@ class ReadFromCsv(beam.DoFn): dictionary where the row names are the keys and the cells are the values """ - def process(self, in_file): - logging.info("reading from input file: %s", in_file) - with FileSystems.open(in_file) as file: + # pylint: disable=abstract-method,arguments-differ + def process(self, element): + logging.info("reading from input file: %s", element) + with FileSystems.open(element) as file: text_wrapper = io.TextIOWrapper(file) reader = csv.reader(text_wrapper) 
try: @@ -33,24 +35,25 @@ class WriteToPostgreSQL(beam.DoFn): """DoFn to write elements to a PostgreSQL database""" - def __init__( - self, hostname, port, username, password, database, table, table_key=None - ): - self.connection_details = { - "host": hostname, - "port": port, - "user": username, - "password": password, - "database": database, - } + # pylint: disable=abstract-method + def __init__(self, connection_details: Dict[str, str], table, table_key=None): + # pylint: disable=super-init-not-called + self.connection_details = connection_details self.table = table self.table_key = table_key + self.connection = None def setup(self): self.connection = psycopg2.connect(**self.connection_details) self.connection.autocommit = True - def execute_insert(self, row, cursor): + def execute_insert(self, row: Dict, cursor): + """Given a dictionary representing a row and a postgresql cursor, + insert the row into the database so that the dict keys are the column + names and the dict values the cell values. 
+ If a table_key is specified (`self.table_key` is set) handle conflicts + by doing nothing""" + colnames = ",".join(row.keys()) values = ",".join(["%s"] * len(row)) sql = f""" @@ -61,19 +64,28 @@ class WriteToPostgreSQL(beam.DoFn): sql = sql + f" ON CONFLICT ({ self.table_key }) DO NOTHING" cursor.execute(sql, list(row.values())) + # pylint: disable=arguments-differ def process(self, element): - cursor = self.connection.cursor() - self.execute_insert(element, cursor) - cursor.close() + if self.connection is not None: + cursor = self.connection.cursor() + self.execute_insert(element, cursor) + cursor.close() def teardown(self): - self.connection.close() + if self.connection is not None: + self.connection.close() class UpsertProductsToPg(WriteToPostgreSQL): """DoFn to write products to PostgreSQL with our upsert logic""" + # pylint: disable=abstract-method def execute_insert(self, row, cursor): + """Our upsert logic is the following: + When any of primary_category, materials, packaging, origin, weight, + height, depth or width has changed, update the element and set the + ingestion_time value to the current timestamp + """ colnames = ",".join(row.keys()) values = ",".join(["%s"] * len(row)) sql = f""" @@ -87,12 +99,18 @@ class UpsertProductsToPg(WriteToPostgreSQL): packaging = EXCLUDED.packaging, origin = EXCLUDED.origin, weight = EXCLUDED.weight, + height = EXCLUDED.height, + depth = EXCLUDED.depth, + width = EXCLUDED.width, ingestion_time = NOW()::TIMESTAMP WHERE { self.table }.primary_category != EXCLUDED.primary_category OR { self.table }.materials != EXCLUDED.materials OR { self.table }.packaging != EXCLUDED.packaging OR { self.table }.origin != EXCLUDED.origin OR - { self.table }.weight != EXCLUDED.weight + { self.table }.weight != EXCLUDED.weight OR + { self.table }.height != EXCLUDED.height OR + { self.table }.depth != EXCLUDED.depth OR + { self.table }.width != EXCLUDED.width """ cursor.execute(sql, list(row.values())) diff --git a/etl/helpers/dimensions.py 
b/etl/src/helpers/dimensions.py similarity index 100% rename from etl/helpers/dimensions.py rename to etl/src/helpers/dimensions.py diff --git a/etl/helpers/materials.py b/etl/src/helpers/materials.py similarity index 100% rename from etl/helpers/materials.py rename to etl/src/helpers/materials.py diff --git a/etl/helpers/misc.py b/etl/src/helpers/misc.py similarity index 100% rename from etl/helpers/misc.py rename to etl/src/helpers/misc.py diff --git a/etl/helpers/origin.py b/etl/src/helpers/origin.py similarity index 100% rename from etl/helpers/origin.py rename to etl/src/helpers/origin.py diff --git a/etl/helpers/parse_row.py b/etl/src/helpers/parse_row.py similarity index 100% rename from etl/helpers/parse_row.py rename to etl/src/helpers/parse_row.py diff --git a/etl/helpers/parse_xml.py b/etl/src/helpers/parse_xml.py similarity index 100% rename from etl/helpers/parse_xml.py rename to etl/src/helpers/parse_xml.py diff --git a/etl/helpers/weight.py b/etl/src/helpers/weight.py similarity index 100% rename from etl/helpers/weight.py rename to etl/src/helpers/weight.py diff --git a/etl/main.py b/etl/src/main.py similarity index 75% rename from etl/main.py rename to etl/src/main.py index 1ed229e..527bc53 100644 --- a/etl/main.py +++ b/etl/src/main.py @@ -1,4 +1,7 @@ -#!/usr/bin/env python +"""This Apache Beam pipeline reads rows as elements from a CSV input file, +extracts and parses relevant values, and upserts the elements to a PostgreSQL +database +""" import logging @@ -10,9 +13,6 @@ from helpers.data_io import ReadFromCsv, UpsertProductsToPg from helpers.parse_row import parse_row -# def __init__(self, hostname, port, username, password, database): - - class SustainabilityScoreOptions(PipelineOptions): """Options for this pipeline""" @@ -33,18 +33,23 @@ def main(): beam_options = PipelineOptions() opts = beam_options.view_as(SustainabilityScoreOptions) + pg_connection_details = { + "host": opts.pg_hostname, + "port": opts.pg_port, + "user": 
opts.pg_username, + "password": opts.pg_password, + "database": opts.pg_database, + } + with beam.Pipeline(options=beam_options) as pipeline: # fmt: off + # pylint: disable=expression-not-assigned pipeline \ | beam.Create([opts.input]) \ | beam.ParDo(ReadFromCsv()) \ | beam.Map(parse_row) \ | beam.ParDo(UpsertProductsToPg( - hostname=opts.pg_hostname, - port=opts.pg_port, - username=opts.pg_username, - password=opts.pg_password, - database=opts.pg_database, + connection_details=pg_connection_details, table=opts.pg_table, table_key="tcin", )) diff --git a/etl/tests/test_convert_units.py b/etl/src/tests/test_convert_units.py similarity index 100% rename from etl/tests/test_convert_units.py rename to etl/src/tests/test_convert_units.py diff --git a/etl/tests/test_dimensions.py b/etl/src/tests/test_dimensions.py similarity index 100% rename from etl/tests/test_dimensions.py rename to etl/src/tests/test_dimensions.py diff --git a/etl/tests/test_materials.py b/etl/src/tests/test_materials.py similarity index 100% rename from etl/tests/test_materials.py rename to etl/src/tests/test_materials.py diff --git a/etl/tests/test_origin.py b/etl/src/tests/test_origin.py similarity index 100% rename from etl/tests/test_origin.py rename to etl/src/tests/test_origin.py diff --git a/etl/tests/test_parse_xml.py b/etl/src/tests/test_parse_xml.py similarity index 100% rename from etl/tests/test_parse_xml.py rename to etl/src/tests/test_parse_xml.py diff --git a/etl/tests/test_weight.py b/etl/src/tests/test_weight.py similarity index 100% rename from etl/tests/test_weight.py rename to etl/src/tests/test_weight.py