#!/usr/bin/env python
"""Module containing the IO parts of the pipeline"""

import csv
import io
import logging
import re

import apache_beam as beam
import psycopg2
from apache_beam.io.filesystems import FileSystems

# An unquoted PostgreSQL identifier: a letter or underscore followed by letters,
# digits, underscores or dollar signs. Column names are spliced into the SQL
# text unquoted (so PostgreSQL still folds them to lower case, exactly as
# before); any name that does not match could alter the statement itself.
_UNQUOTED_IDENTIFIER = re.compile(r"[^\W\d][\w$]*")


def _check_column_names(names):
    """Raise ValueError unless every name is a plain unquoted SQL identifier.

    Column names originate from CSV headers, i.e. from data rather than code,
    and are interpolated into the statement text; validating them prevents a
    crafted header from injecting SQL.
    """
    for name in names:
        if not _UNQUOTED_IDENTIFIER.fullmatch(name):
            raise ValueError(f"invalid column name for SQL statement: {name!r}")


class ReadFromCsv(beam.DoFn):
    """Read a CSV file and yield each data row as a dictionary.

    The first row is the header; every following row is yielded as a dict
    mapping header names to cell values (strings, as produced by
    ``csv.reader``). Blank lines are skipped and an empty file yields nothing.

    Rows are paired with the header via ``zip``: a row with fewer cells than
    the header yields a dict without the trailing keys, and cells beyond the
    header's length are dropped.
    """

    def __init__(self, encoding="utf-8-sig"):
        """
        Args:
            encoding: text encoding of the input files. The default decodes
                UTF-8 and strips a leading byte-order mark, which would
                otherwise be glued onto the first header name.
        """
        super().__init__()
        self.encoding = encoding

    def process(self, in_file):
        """Yield one dict per non-blank data row of the file at ``in_file``."""
        logging.info("reading from input file: %s", in_file)
        with FileSystems.open(in_file) as file:
            # newline="" hands line-ending handling to the csv module, which
            # it needs to parse quoted fields containing newlines correctly.
            text_wrapper = io.TextIOWrapper(
                file, encoding=self.encoding, newline=""
            )
            reader = csv.reader(text_wrapper)
            header = next(reader, None)
            if header is None:
                return
            for row in reader:
                # csv.reader returns [] for a blank line; yielding {} would make
                # the writers emit "INSERT INTO t () VALUES ()".
                if row:
                    yield dict(zip(header, row))


class WriteToPostgreSQL(beam.DoFn):
    """DoFn that inserts each element (a dict) as one row of a PostgreSQL table.

    The element's keys are the column names and its values are passed as query
    parameters. If ``table_key`` is given, rows conflicting on it are skipped
    with ``ON CONFLICT (...) DO NOTHING``.

    ``table`` and ``table_key`` are spliced into the SQL verbatim and must come
    from trusted pipeline configuration; element keys are validated as plain
    identifiers before use.
    """

    def __init__(
        self, hostname, port, username, password, database, table, table_key=None
    ):
        super().__init__()
        self.connection_details = {
            "host": hostname,
            "port": port,
            "user": username,
            "password": password,
            "database": database,
        }
        self.table = table
        self.table_key = table_key

    def setup(self):
        """Open the connection used by this DoFn instance."""
        self.connection = psycopg2.connect(**self.connection_details)
        # Each statement commits on its own; no explicit transaction is used.
        self.connection.autocommit = True

    def _columns_and_placeholders(self, row):
        """Return the comma-separated column list and the matching ``%s`` list.

        Raises:
            ValueError: if ``row`` is empty or a key is not a plain identifier.
        """
        if not row:
            raise ValueError("cannot insert a row with no columns")
        _check_column_names(row.keys())
        colnames = ",".join(row.keys())
        placeholders = ",".join(["%s"] * len(row))
        return colnames, placeholders

    def execute_insert(self, row, cursor):
        """Insert ``row`` into ``self.table`` using ``cursor``."""
        colnames, placeholders = self._columns_and_placeholders(row)
        sql = f"INSERT INTO {self.table} ({colnames}) VALUES ({placeholders})"
        if self.table_key is not None:
            sql += f" ON CONFLICT ({self.table_key}) DO NOTHING"
        cursor.execute(sql, list(row.values()))

    def process(self, element):
        """Write one element; the cursor is closed even if the insert fails."""
        with self.connection.cursor() as cursor:
            self.execute_insert(element, cursor)

    def teardown(self):
        """Close the connection, if ``setup`` managed to open one."""
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()


class UpsertProductsToPg(WriteToPostgreSQL):
    """DoFn to write products to PostgreSQL with our upsert logic.

    New rows are inserted with ``ingestion_time`` set to ``NOW()``. On a
    conflict on ``table_key`` (which must be set: it is used unconditionally)
    the existing row is overwritten, and its ``ingestion_time`` refreshed, only
    if at least one of primary_category, materials, packaging, origin or weight
    differs from the incoming value.
    """

    def execute_insert(self, row, cursor):
        """Upsert ``row`` into ``self.table`` using ``cursor``."""
        colnames, placeholders = self._columns_and_placeholders(row)
        # IS DISTINCT FROM treats NULL as a comparable value. With "!=", any
        # comparison involving NULL yields NULL, so a change to or from NULL
        # never triggered the update.
        # NOTE(review): gtin13 is overwritten but not compared, so a change in
        # gtin13 alone does not trigger an update - confirm this is intended.
        sql = f"""
            INSERT INTO {self.table} ({colnames}, ingestion_time)
            VALUES ({placeholders}, NOW()::TIMESTAMP)
            ON CONFLICT ({self.table_key}) DO UPDATE SET
                gtin13 = EXCLUDED.gtin13,
                primary_category = EXCLUDED.primary_category,
                materials = EXCLUDED.materials,
                packaging = EXCLUDED.packaging,
                origin = EXCLUDED.origin,
                weight = EXCLUDED.weight,
                ingestion_time = NOW()::TIMESTAMP
            WHERE {self.table}.primary_category IS DISTINCT FROM EXCLUDED.primary_category
                OR {self.table}.materials IS DISTINCT FROM EXCLUDED.materials
                OR {self.table}.packaging IS DISTINCT FROM EXCLUDED.packaging
                OR {self.table}.origin IS DISTINCT FROM EXCLUDED.origin
                OR {self.table}.weight IS DISTINCT FROM EXCLUDED.weight
        """
        cursor.execute(sql, list(row.values()))