From 39d279f089410d8e425fd13f53fb327aac1e5a61 Mon Sep 17 00:00:00 2001 From: Ricard Illa Date: Mon, 26 Jun 2023 12:36:19 +0200 Subject: [PATCH] refactor: some cleanup on etl's code structure --- etl/src/helpers/__init__.py | 5 +++ etl/src/helpers/parsers/__init__.py | 4 +++ etl/src/helpers/{ => parsers}/dimensions.py | 2 +- etl/src/helpers/{ => parsers}/materials.py | 0 etl/src/helpers/{ => parsers}/misc.py | 0 etl/src/helpers/{ => parsers}/origin.py | 0 etl/src/helpers/{ => parsers}/parse_row.py | 10 +++--- etl/src/helpers/{ => parsers}/parse_xml.py | 0 etl/src/helpers/{ => parsers}/weight.py | 2 +- etl/src/helpers/process_rows.py | 19 +++++++++++ etl/src/helpers/read_from_csv.py | 28 ++++++++++++++++ .../{data_io.py => upsert_products_to_pg.py} | 32 ++++--------------- etl/src/main.py | 6 ++-- .../tests/{ => parsers}/test_convert_units.py | 6 ++-- .../tests/{ => parsers}/test_dimensions.py | 2 +- etl/src/tests/{ => parsers}/test_materials.py | 6 +++- etl/src/tests/{ => parsers}/test_origin.py | 3 +- etl/src/tests/{ => parsers}/test_parse_xml.py | 2 +- etl/src/tests/{ => parsers}/test_weight.py | 2 +- 19 files changed, 83 insertions(+), 46 deletions(-) create mode 100644 etl/src/helpers/parsers/__init__.py rename etl/src/helpers/{ => parsers}/dimensions.py (97%) rename etl/src/helpers/{ => parsers}/materials.py (100%) rename etl/src/helpers/{ => parsers}/misc.py (100%) rename etl/src/helpers/{ => parsers}/origin.py (100%) rename etl/src/helpers/{ => parsers}/parse_row.py (89%) rename etl/src/helpers/{ => parsers}/parse_xml.py (100%) rename etl/src/helpers/{ => parsers}/weight.py (97%) create mode 100644 etl/src/helpers/process_rows.py create mode 100644 etl/src/helpers/read_from_csv.py rename etl/src/helpers/{data_io.py => upsert_products_to_pg.py} (80%) rename etl/src/tests/{ => parsers}/test_convert_units.py (86%) rename etl/src/tests/{ => parsers}/test_dimensions.py (97%) rename etl/src/tests/{ => parsers}/test_materials.py (91%) rename etl/src/tests/{ => parsers}/test_origin.py (96%) rename etl/src/tests/{ => parsers}/test_parse_xml.py (99%) rename etl/src/tests/{ => parsers}/test_weight.py (95%) diff --git a/etl/src/helpers/__init__.py b/etl/src/helpers/__init__.py index e69de29..b9656a5 100644 --- a/etl/src/helpers/__init__.py +++ b/etl/src/helpers/__init__.py @@ -0,0 +1,5 @@ +"""Helper classes for the pipeline""" + +from helpers.upsert_products_to_pg import UpsertProductsToPg +from helpers.read_from_csv import ReadFromCsv +from helpers.process_rows import ProcessRows diff --git a/etl/src/helpers/parsers/__init__.py b/etl/src/helpers/parsers/__init__.py new file mode 100644 index 0000000..678b614 --- /dev/null +++ b/etl/src/helpers/parsers/__init__.py @@ -0,0 +1,4 @@ +"""Helper parser functions to extract and clean data from the input CSV file +Only `parse_row` needs to be exported.""" + +from helpers.parsers.parse_row import parse_row diff --git a/etl/src/helpers/dimensions.py b/etl/src/helpers/parsers/dimensions.py similarity index 97% rename from etl/src/helpers/dimensions.py rename to etl/src/helpers/parsers/dimensions.py index 95692de..b66ea66 100644 --- a/etl/src/helpers/dimensions.py +++ b/etl/src/helpers/parsers/dimensions.py @@ -6,7 +6,7 @@ import logging from typing import Dict, Optional import re -from helpers.misc import convert_units +from helpers.parsers.misc import convert_units UNIT_CONVERSIONS = {"inches": 2.54, "feet": 30.48, "cm": 1} diff --git a/etl/src/helpers/materials.py b/etl/src/helpers/parsers/materials.py similarity index 100% rename from etl/src/helpers/materials.py rename to etl/src/helpers/parsers/materials.py diff --git a/etl/src/helpers/misc.py b/etl/src/helpers/parsers/misc.py similarity index 100% rename from etl/src/helpers/misc.py rename to etl/src/helpers/parsers/misc.py diff --git a/etl/src/helpers/origin.py b/etl/src/helpers/parsers/origin.py similarity index 100% rename from etl/src/helpers/origin.py rename to etl/src/helpers/parsers/origin.py diff --git a/etl/src/helpers/parse_row.py b/etl/src/helpers/parsers/parse_row.py similarity index 89% rename from etl/src/helpers/parse_row.py rename to etl/src/helpers/parsers/parse_row.py index afddd6a..3d50495 100644 --- a/etl/src/helpers/parse_row.py +++ b/etl/src/helpers/parsers/parse_row.py @@ -4,11 +4,11 @@ the destination database""" import logging from typing import TypedDict, Dict, Optional, List -from helpers.parse_xml import parse_raw_specs -from helpers.materials import parse_materials -from helpers.origin import clean_origin_name -from helpers.dimensions import parse_dimensions -from helpers.weight import parse_weight, dimensional_weight +from helpers.parsers.parse_xml import parse_raw_specs +from helpers.parsers.materials import parse_materials +from helpers.parsers.origin import clean_origin_name +from helpers.parsers.dimensions import parse_dimensions +from helpers.parsers.weight import parse_weight, dimensional_weight class CleanRow(TypedDict): diff --git a/etl/src/helpers/parse_xml.py b/etl/src/helpers/parsers/parse_xml.py similarity index 100% rename from etl/src/helpers/parse_xml.py rename to etl/src/helpers/parsers/parse_xml.py diff --git a/etl/src/helpers/weight.py b/etl/src/helpers/parsers/weight.py similarity index 97% rename from etl/src/helpers/weight.py rename to etl/src/helpers/parsers/weight.py index e3035e7..2a697a5 100644 --- a/etl/src/helpers/weight.py +++ b/etl/src/helpers/parsers/weight.py @@ -6,7 +6,7 @@ import logging from typing import Optional import re -from helpers.misc import convert_units +from helpers.parsers.misc import convert_units UNIT_CONVERSIONS = {"pounds": 453.592, "ounces": 28.3495, "g": 1, "kg": 1000} diff --git a/etl/src/helpers/process_rows.py b/etl/src/helpers/process_rows.py new file mode 100644 index 0000000..0c6de36 --- /dev/null +++ b/etl/src/helpers/process_rows.py @@ -0,0 +1,19 @@ +"""Module containing necessary functionality to write to the PostgreSQL sink""" + +import logging + +import apache_beam as beam + +from helpers.parsers import parse_row + + +class ProcessRows(beam.DoFn): + """DoFn to process and parse rows from the input file into structured + dictionaries""" + + # pylint: disable=abstract-method,arguments-differ + def process(self, element): + if (row := parse_row(element)) is not None: + yield row + else: + logging.warning("could not successfully parse this row: %s", element) diff --git a/etl/src/helpers/read_from_csv.py b/etl/src/helpers/read_from_csv.py new file mode 100644 index 0000000..a6a1c74 --- /dev/null +++ b/etl/src/helpers/read_from_csv.py @@ -0,0 +1,28 @@ +"""Module containing ReadFromCsv DoFn to create a PTransform to read from a CSV +input file""" + +import io +import logging +import csv + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems + + +class ReadFromCsv(beam.DoFn): + """This custom DoFn will read from a CSV file and yield each row as a + dictionary where the row names are the keys and the cells are the values + """ + + # pylint: disable=abstract-method,arguments-differ + def process(self, element): + logging.info("reading from input file: %s", element) + with FileSystems.open(element) as file: + text_wrapper = io.TextIOWrapper(file) + reader = csv.reader(text_wrapper) + try: + header = next(reader) + except StopIteration: + return + for row in reader: + yield dict(zip(header, row)) diff --git a/etl/src/helpers/data_io.py b/etl/src/helpers/upsert_products_to_pg.py similarity index 80% rename from etl/src/helpers/data_io.py rename to etl/src/helpers/upsert_products_to_pg.py index 7185d29..f82b3e7 100644 --- a/etl/src/helpers/data_io.py +++ b/etl/src/helpers/upsert_products_to_pg.py @@ -1,36 +1,11 @@ -"""Module containing the IO parts of the pipeline""" +"""Module containing necessary functionality to write to the PostgreSQL sink""" -#!/usr/bin/env python - -import io import logging -import csv from typing import Dict import apache_beam as beam import psycopg2 -from apache_beam.io.filesystems import FileSystems - - -class ReadFromCsv(beam.DoFn): - """This custom DoFn will read from a CSV file and yield each row as a - dictionary where the row names are the keys and the cells are the values - """ - - # pylint: disable=abstract-method,arguments-differ - def process(self, element): - logging.info("reading from input file: %s", element) - with FileSystems.open(element) as file: - text_wrapper = io.TextIOWrapper(file) - reader = csv.reader(text_wrapper) - try: - header = next(reader) - except StopIteration: - return - for row in reader: - yield dict(zip(header, row)) - class WriteToPostgreSQL(beam.DoFn): """DoFn to write elements to a PostgreSQL database""" @@ -68,8 +43,13 @@ class WriteToPostgreSQL(beam.DoFn): def process(self, element): if self.connection is not None: cursor = self.connection.cursor() + logging.info( + "inserting the following element into the database: %s", element + ) self.execute_insert(element, cursor) cursor.close() + else: + logging.error("something went wrong with the connection to postresql") def teardown(self): if self.connection is not None: diff --git a/etl/src/main.py b/etl/src/main.py index 527bc53..f3897b4 100644 --- a/etl/src/main.py +++ b/etl/src/main.py @@ -6,11 +6,9 @@ database import logging import apache_beam as beam - from apache_beam.options.pipeline_options import PipelineOptions -from helpers.data_io import ReadFromCsv, UpsertProductsToPg -from helpers.parse_row import parse_row +from helpers import UpsertProductsToPg, ReadFromCsv, ProcessRows class SustainabilityScoreOptions(PipelineOptions): @@ -47,7 +45,7 @@ def main(): pipeline \ | beam.Create([opts.input]) \ | beam.ParDo(ReadFromCsv()) \ - | beam.Map(parse_row) \ + | beam.ParDo(ProcessRows()) \ | beam.ParDo(UpsertProductsToPg( connection_details=pg_connection_details, table=opts.pg_table, diff --git a/etl/src/tests/test_convert_units.py b/etl/src/tests/parsers/test_convert_units.py similarity index 86% rename from etl/src/tests/test_convert_units.py rename to etl/src/tests/parsers/test_convert_units.py index 8798e8d..c77cd61 100644 --- a/etl/src/tests/test_convert_units.py +++ b/etl/src/tests/parsers/test_convert_units.py @@ -1,8 +1,8 @@ """Test the `convert_units`""" -from helpers.dimensions import UNIT_CONVERSIONS as dimension_unit_conversions -from helpers.weight import UNIT_CONVERSIONS as weight_unit_conversions -from helpers.misc import convert_units +from helpers.parsers.dimensions import UNIT_CONVERSIONS as dimension_unit_conversions +from helpers.parsers.weight import UNIT_CONVERSIONS as weight_unit_conversions +from helpers.parsers.misc import convert_units def test_units_to_cm_inches(): diff --git a/etl/src/tests/test_dimensions.py b/etl/src/tests/parsers/test_dimensions.py similarity index 97% rename from etl/src/tests/test_dimensions.py rename to etl/src/tests/parsers/test_dimensions.py index 37821f5..f471b9d 100644 --- a/etl/src/tests/test_dimensions.py +++ b/etl/src/tests/parsers/test_dimensions.py @@ -1,6 +1,6 @@ """Test the `parse_dimensions` function and its helpers""" -from helpers.dimensions import parse_dimensions, parse_dimensions_measure +from helpers.parsers.dimensions import parse_dimensions, parse_dimensions_measure def test_none(): diff --git a/etl/src/tests/test_materials.py b/etl/src/tests/parsers/test_materials.py similarity index 91% rename from etl/src/tests/test_materials.py rename to etl/src/tests/parsers/test_materials.py index dde291f..d1a7f55 100644 --- a/etl/src/tests/test_materials.py +++ b/etl/src/tests/parsers/test_materials.py @@ -1,6 +1,10 @@ """Test the `parse_materials` function and its helpers""" -from helpers.materials import parse_materials, clean_material_name, material_classifier +from helpers.parsers.materials import ( + parse_materials, + clean_material_name, + material_classifier, +) def test_none(): diff --git a/etl/src/tests/test_origin.py b/etl/src/tests/parsers/test_origin.py similarity index 96% rename from etl/src/tests/test_origin.py rename to etl/src/tests/parsers/test_origin.py index 48dd6a0..d553a3d 100644 --- a/etl/src/tests/test_origin.py +++ b/etl/src/tests/parsers/test_origin.py @@ -1,6 +1,6 @@ """Test the `clean_material_name`""" -from helpers.origin import clean_origin_name +from helpers.parsers.origin import clean_origin_name def test_none(): @@ -46,4 +46,3 @@ def test_clean_origin_name5(): def test_clean_origin_name6(): """Test a sample input for clean_origin_name""" assert clean_origin_name(" made in the USA or imported") == "mixed" - diff --git a/etl/src/tests/test_parse_xml.py b/etl/src/tests/parsers/test_parse_xml.py similarity index 99% rename from etl/src/tests/test_parse_xml.py rename to etl/src/tests/parsers/test_parse_xml.py index e8639d4..044b1d5 100644 --- a/etl/src/tests/test_parse_xml.py +++ b/etl/src/tests/parsers/test_parse_xml.py @@ -2,7 +2,7 @@ import xml.etree.ElementTree as ET -from helpers.parse_xml import parse_raw_specs, iter_parse +from helpers.parsers.parse_xml import parse_raw_specs, iter_parse def test_parse_raw_specs0(): diff --git a/etl/src/tests/test_weight.py b/etl/src/tests/parsers/test_weight.py similarity index 95% rename from etl/src/tests/test_weight.py rename to etl/src/tests/parsers/test_weight.py index aa87b5f..026b6b5 100644 --- a/etl/src/tests/test_weight.py +++ b/etl/src/tests/parsers/test_weight.py @@ -1,6 +1,6 @@ """Test the `parse_weight` and `dimensional_weight`""" -from helpers.weight import parse_weight, dimensional_weight +from helpers.parsers.weight import parse_weight, dimensional_weight def test_parse_weight_none():