#!/usr/bin/env python
"""Module containing the IO parts of the pipeline."""
import csv
import io
import logging

import apache_beam as beam
import psycopg2
from apache_beam.io.filesystems import FileSystems
from psycopg2 import sql


class ReadFromCsv(beam.DoFn):
    """This custom DoFn will read from a CSV file and yield each row
    as a dictionary where the row names are the keys and the cells are
    the values
    """

    def process(self, in_file):
        """Stream rows from *in_file* as header-keyed dictionaries.

        Args:
            in_file: a path/URI understood by ``FileSystems.open``
                (local path, gs://, etc. depending on installed filesystems).

        Yields:
            dict: one per data row, mapping header names to cell values.
                Rows shorter/longer than the header are truncated by ``zip``.
        """
        logging.info("reading from input file: %s", in_file)
        with FileSystems.open(in_file) as raw:
            # FileSystems.open returns a binary stream; wrap it for csv.
            reader = csv.reader(io.TextIOWrapper(raw))
            try:
                header = next(reader)
            except StopIteration:
                # Empty file: no header, nothing to yield.
                return
            for row in reader:
                yield dict(zip(header, row))


class WriteToPostgreSQL(beam.DoFn):
    """DoFn to write elements to a PostgreSQL database"""

    def __init__(
        self, hostname, port, username, password, database, table, table_key=None
    ):
        """Store connection parameters; the connection itself is opened in
        ``setup`` so it is created once per DoFn instance on the worker.

        Args:
            hostname: database server host.
            port: database server port.
            username: login role.
            password: login password.
            database: database name.
            table: target table for inserts.
            table_key: optional conflict-target column; when given, inserts
                use ``ON CONFLICT (...) DO NOTHING`` to skip duplicates.
        """
        self.connection_details = {
            "host": hostname,
            "port": port,
            "user": username,
            "password": password,
            "database": database,
        }
        self.table = table
        self.table_key = table_key

    def setup(self):
        """Open the database connection (called once per DoFn instance)."""
        # BUG FIX: psycopg2.connect() has no ``autocommit`` keyword — unknown
        # kwargs are passed through as libpq connection options and raise
        # 'invalid connection option "autocommit"'. Set it on the connection.
        self.connection = psycopg2.connect(**self.connection_details)
        self.connection.autocommit = True

    def execute_insert(self, row, cursor):
        """Insert *row* (a dict of column -> value) into ``self.table``.

        Identifiers (table and column names) originate from untrusted CSV
        headers, so they are composed with ``psycopg2.sql.Identifier`` for
        safe quoting instead of raw string interpolation (SQL injection fix).
        Values remain server-side parameters (%s placeholders).
        """
        query = sql.SQL("INSERT INTO {table} ({cols}) VALUES ({vals})").format(
            table=sql.Identifier(self.table),
            cols=sql.SQL(",").join(sql.Identifier(col) for col in row),
            vals=sql.SQL(",").join(sql.Placeholder() for _ in row),
        )
        if self.table_key is not None:
            query = query + sql.SQL(" ON CONFLICT ({key}) DO NOTHING").format(
                key=sql.Identifier(self.table_key)
            )
        cursor.execute(query, list(row.values()))

    def process(self, element):
        """Write one element (a dict) to the database."""
        # Context manager guarantees the cursor is closed even if the
        # insert raises (the original leaked the cursor on error).
        with self.connection.cursor() as cursor:
            self.execute_insert(element, cursor)

    def teardown(self):
        """Close the connection when the DoFn instance is discarded."""
        self.connection.close()