diff --git a/pipeline/beam_etl/helpers/data_io.py b/pipeline/beam_etl/helpers/data_io.py
index 70c89c7..0f8c17e 100644
--- a/pipeline/beam_etl/helpers/data_io.py
+++ b/pipeline/beam_etl/helpers/data_io.py
@@ -7,6 +7,7 @@ import logging
 import csv
 
 import apache_beam as beam
+import psycopg2
 from apache_beam.io.filesystems import FileSystems
 
 
@@ -17,9 +18,8 @@ class ReadFromCsv(beam.DoFn):
     """
 
     def process(self, in_file):
-        fname = in_file.get()
-        logging.info("reading from input file: %s", fname)
-        with FileSystems.open(fname) as file:
+        logging.info("reading from input file: %s", in_file)
+        with FileSystems.open(in_file) as file:
             text_wrapper = io.TextIOWrapper(file)
             reader = csv.reader(text_wrapper)
             try:
@@ -28,3 +28,39 @@ class ReadFromCsv(beam.DoFn):
                 return
             for row in reader:
                 yield dict(zip(header, row))
+
+
+class WriteToPostgreSQL(beam.DoFn):
+    """DoFn to write elements to a PostgreSQL database"""
+
+    def __init__(self, hostname, port, username, password, database, table):
+        self.hostname = hostname
+        self.port = port
+        self.username = username
+        self.password = password
+        self.database = database
+        self.table = table
+
+    def setup(self):
+        self.connection = psycopg2.connect(
+            host=self.hostname,
+            port=self.port,
+            user=self.username,
+            password=self.password,
+            database=self.database,
+        )
+
+    def process(self, element):
+        cursor = self.connection.cursor()
+        colnames = ",".join(element.keys())
+        values = ",".join(["%s"] * len(element))
+        sql = f"""
+            INSERT INTO { self.table } ({ colnames })
+            VALUES ({ values })
+        """
+        cursor.execute(sql, list(element.values()))
+        self.connection.commit()
+        cursor.close()
+
+    def teardown(self):
+        self.connection.close()
diff --git a/pipeline/beam_etl/helpers/parse_row.py b/pipeline/beam_etl/helpers/parse_row.py
index b597f6b..fb2d711 100644
--- a/pipeline/beam_etl/helpers/parse_row.py
+++ b/pipeline/beam_etl/helpers/parse_row.py
@@ -14,8 +14,8 @@ from helpers.weight import parse_weight, dimensional_weight
 class CleanRow(TypedDict):
     """Type to represent clean rows to be inserted in the database"""
 
-    gtin13: int
-    tcin: int
+    gtin13: str
+    tcin: str
     primary_category: str
     materials: Optional[List[str]]
     packaging: int
@@ -37,12 +37,6 @@ def parse_row(element: Dict[str, str]) -> Optional[CleanRow]:
         logging.error("gtin13 missing")
         return None
 
-    try:
-        gtin13 = int(gtin13.strip())
-    except ValueError:
-        logging.error("malformed GTIN13")
-        return None
-
     # primary category should always be there
     try:
         primary_category = element["primary_category"]
@@ -55,19 +49,13 @@ def parse_row(element: Dict[str, str]) -> Optional[CleanRow]:
         logging.error("could not parse raw_specifications")
         return None
 
-    # TCIN should be a mandatory field in the from of an int
+    # TCIN should be a mandatory field
     try:
-        tcin_value = specifications["tcin"]
+        tcin = specifications["tcin"]
     except KeyError:
         logging.error("TCIN missing")
         return None
 
-    try:
-        tcin = int(tcin_value.strip())
-    except ValueError:
-        logging.error("malformed TCIN")
-        return None
-
     materials = parse_materials(specifications.get("materials"))
 
     # if packaging is not specified, assume only one unit is found in the
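A note on the two helper changes above: parse_row now passes gtin13 and tcin through as strings (matching the VARCHAR columns in the reworked schema further down), and the new WriteToPostgreSQL DoFn issues one parameterized INSERT per element, taking the column list from the element's keys and letting psycopg2 bind the values. A minimal standalone sketch of that statement construction, with a made-up sample row:

# Sketch only: mirrors how WriteToPostgreSQL.process assembles its INSERT.
# The sample row below is hypothetical.
def build_insert(table, element):
    colnames = ",".join(element.keys())
    placeholders = ",".join(["%s"] * len(element))
    return f"INSERT INTO {table} ({colnames}) VALUES ({placeholders})", list(element.values())


sample = {"gtin13": "0490970229387", "tcin": "76154679", "packaging": 1}
print(build_insert("sustainability_score.products", sample))
# -> ('INSERT INTO sustainability_score.products (gtin13,tcin,packaging) VALUES (%s,%s,%s)',
#     ['0490970229387', '76154679', 1])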
diff --git a/pipeline/beam_etl/main.py b/pipeline/beam_etl/main.py
index 86638f3..f9fcc16 100644
--- a/pipeline/beam_etl/main.py
+++ b/pipeline/beam_etl/main.py
@@ -1,43 +1,32 @@
 #!/usr/bin/env python
 
-import io
 import logging
-import csv
 
 import apache_beam as beam
-from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import PipelineOptions
 
+from helpers.data_io import ReadFromCsv, WriteToPostgreSQL
+from helpers.parse_row import parse_row
+
 
 class SustainabilityScoreOptions(PipelineOptions):
     """Options for this pipeline"""
 
     @classmethod
     def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            "--input", help="Input CSV file to process", type=str
-        )
-        parser.add_value_provider_argument(
-            "--output", help="Destination destination table", type=str
-        )
-
-
-class ReadFromCsv(beam.DoFn):
-    """This custom DoFn will read from a CSV file and yield each row as a
-    dictionary where the row names are the keys and the cells are the values
-    """
-
-    def process(self, in_file):
-        with FileSystems.open(in_file.get()) as file:
-            text_wrapper = io.TextIOWrapper(file)
-            reader = csv.reader(text_wrapper)
-            header = next(reader)
-            for row in reader:
-                yield dict(zip(header, row))
+        parser.add_argument("--input", help="Input CSV file to process", type=str)
+        parser.add_argument("--pg_hostname", help="Postgres hostname", type=str)
+        parser.add_argument("--pg_port", help="Postgres port", type=str)
+        parser.add_argument("--pg_username", help="Postgres username", type=str)
+        parser.add_argument("--pg_password", help="Postgres password", type=str)
+        parser.add_argument("--pg_database", help="Postgres database name", type=str)
+        parser.add_argument("--pg_table", help="Postgres table name", type=str)
 
 
 def main():
+    """Construct and run the pipeline"""
+
     beam_options = PipelineOptions()
     opts = beam_options.view_as(SustainabilityScoreOptions)
@@ -45,7 +34,16 @@ def main():
         # fmt: off
         pipeline \
             | beam.Create([opts.input]) \
-            | beam.ParDo(ReadFromCsv())
+            | beam.ParDo(ReadFromCsv()) \
+            | beam.Map(parse_row) \
+            | beam.ParDo(WriteToPostgreSQL(
+                hostname=opts.pg_hostname,
+                port=opts.pg_port,
+                username=opts.pg_username,
+                password=opts.pg_password,
+                database=opts.pg_database,
+                table=opts.pg_table,
+            ))
         # fmt: on
 
 
diff --git a/pipeline/beam_etl/pyproject.toml b/pipeline/beam_etl/pyproject.toml
index f27730d..882e11a 100644
--- a/pipeline/beam_etl/pyproject.toml
+++ b/pipeline/beam_etl/pyproject.toml
@@ -1,7 +1,7 @@
 [project]
 name = "beam_etl"
 version = "0.1"
-dependencies = ["wheel", "apache-beam[gcp]", "pandas"]
+dependencies = ["wheel", "apache-beam[gcp]", "pandas", "psycopg2"]
 
 [project.optional-dependencies]
 dev = ["pytest", "pylint", "black"]
diff --git a/pipeline/beam_etl/requirements.txt b/pipeline/beam_etl/requirements.txt
index c8e9371..e5a0b96 100644
--- a/pipeline/beam_etl/requirements.txt
+++ b/pipeline/beam_etl/requirements.txt
@@ -177,6 +177,8 @@ protobuf==4.23.3
     #   grpc-google-iam-v1
     #   grpcio-status
     #   proto-plus
+psycopg2==2.9.6
+    # via beam-etl (pyproject.toml)
 pyarrow==11.0.0
     # via apache-beam
 pyasn1==0.5.0
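Because the options above are now plain argparse arguments rather than ValueProviders, their values are available as soon as the pipeline is constructed, which is what lets main() hand them straight to WriteToPostgreSQL. A rough local smoke-run sketch, assuming it is run from pipeline/beam_etl with the requirements installed and that main.py keeps its entry point behind the usual __main__ guard; every connection value below is hypothetical:

# Sketch only: build the same options main() parses from sys.argv and confirm
# the new Postgres flags are visible on the options view.
from apache_beam.options.pipeline_options import PipelineOptions

from main import SustainabilityScoreOptions

beam_options = PipelineOptions(
    [
        "--runner=DirectRunner",
        "--input=/tmp/products.csv",
        "--pg_hostname=localhost",
        "--pg_port=5432",
        "--pg_username=postgres",
        "--pg_password=postgres",
        "--pg_database=postgres",
        "--pg_table=sustainability_score.products",
    ]
)
opts = beam_options.view_as(SustainabilityScoreOptions)
print(opts.pg_hostname, opts.pg_table)  # localhost sustainability_score.products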
"{{ conn.get('pg_db').login }}", + "pg_password": "{{ conn.get('pg_db').password }}", + "pg_database": "{{ conn.get('pg_db').schema }}", + "pg_table": "{{ params.output_table }}", + }, ) create_products_table >> etl_pipeline diff --git a/pipeline/sql/products_schema.sql b/pipeline/sql/products_schema.sql index 0854d69..1a9868b 100644 --- a/pipeline/sql/products_schema.sql +++ b/pipeline/sql/products_schema.sql @@ -1,6 +1,6 @@ -CREATE TABLE IF NOT EXISTS sustainability_score.products ( - gtin13 INT PRIMARY KEY, - tcin INT NOT NULL, +CREATE TABLE IF NOT EXISTS {{ params.output_table }} ( + gtin13 VARCHAR PRIMARY KEY, + tcin VARCHAR NOT NULL, primary_category VARCHAR NOT NULL, materials VARCHAR[], packaging INT NOT NULL,