98 lines
3.1 KiB
Python
98 lines
3.1 KiB
Python
"""Module containing the IO parts of the pipeline"""
|
|
|
|
#!/usr/bin/env python
|
|
|
|
import io
|
|
import logging
|
|
import csv
|
|
|
|
import apache_beam as beam
|
|
import psycopg2
|
|
|
|
from apache_beam.io.filesystems import FileSystems
|
|
|
|
|
|
class ReadFromCsv(beam.DoFn):
    """Read a CSV file and emit each data row as a dictionary.

    The first row of the file is used as the header. Every following row is
    zipped with that header, so the column names become the keys and the
    cells become the values of the emitted dictionary.
    """

    def process(self, in_file):
        """Yield one dictionary per data row of ``in_file``.

        Args:
            in_file: Location of the CSV file; it is opened with Beam's
                ``FileSystems.open``.

        Yields:
            A ``dict`` mapping header names to the cell values of one row.
            Nothing is yielded for a file that has no header row, and
            completely blank lines are skipped.
        """
        logging.info("reading from input file: %s", in_file)
        with FileSystems.open(in_file) as file:
            # newline="" leaves newline handling to the csv module, as its
            # documentation requires (quoted fields may contain line breaks).
            # The explicit encoding keeps decoding independent of whatever
            # locale the worker process happens to run under.
            text_wrapper = io.TextIOWrapper(file, encoding="utf-8", newline="")
            reader = csv.reader(text_wrapper)

            header = next(reader, None)
            if header is None:
                # Empty file: there is no header, hence no rows to emit.
                return

            for row in reader:
                if not row:
                    # csv.reader returns [] for a blank line; emitting the
                    # resulting empty dict would give downstream consumers a
                    # row with no columns at all.
                    continue
                yield dict(zip(header, row))
|
|
|
|
|
|
class WriteToPostgreSQL(beam.DoFn):
    """DoFn to write elements to a PostgreSQL database.

    Every element is treated as a mapping of column name to value and is
    inserted as one row into ``table``. The connection is opened in
    ``setup`` and closed in ``teardown``.
    """

    def __init__(
        self, hostname, port, username, password, database, table, table_key=None
    ):
        """Store the connection settings; no connection is opened here.

        Args:
            hostname: Passed to ``psycopg2.connect`` as ``host``.
            port: Passed to ``psycopg2.connect`` as ``port``.
            username: Passed to ``psycopg2.connect`` as ``user``.
            password: Passed to ``psycopg2.connect`` as ``password``.
            database: Passed to ``psycopg2.connect`` as ``database``.
            table: Name of the table the rows are inserted into.
            table_key: Optional conflict target. When given, rows that
                conflict on it are ignored (``ON CONFLICT ... DO NOTHING``).
        """
        super().__init__()
        self.connection_details = {
            "host": hostname,
            "port": port,
            "user": username,
            "password": password,
            "database": database,
        }
        self.table = table
        self.table_key = table_key
        # Created in setup(); kept as None until then so teardown() is safe
        # even when setup() never ran or failed.
        self.connection = None

    def setup(self):
        """Open the database connection and switch it to autocommit."""
        self.connection = psycopg2.connect(**self.connection_details)
        self.connection.autocommit = True

    def execute_insert(self, row, cursor):
        """Build and run the INSERT statement for a single row.

        Args:
            row: Mapping of column name to value for the row to insert.
            cursor: Open database cursor used to execute the statement.
        """
        # NOTE: the table name, column names and conflict key are interpolated
        # into the statement as-is (only the values are bound as parameters),
        # so they must come from a trusted source.
        colnames = ",".join(row.keys())
        values = ",".join(["%s"] * len(row))
        sql = f"""
            INSERT INTO { self.table } ({ colnames })
            VALUES ({ values })
        """
        if self.table_key is not None:
            sql = sql + f" ON CONFLICT ({ self.table_key }) DO NOTHING"
        cursor.execute(sql, list(row.values()))

    def process(self, element):
        """Insert one element into the database.

        Args:
            element: Mapping of column name to value, handed to
                ``execute_insert``.
        """
        # The context manager closes the cursor even if the insert raises.
        with self.connection.cursor() as cursor:
            self.execute_insert(element, cursor)

    def teardown(self):
        """Close the database connection if one was opened."""
        # getattr keeps this safe for subclasses whose __init__ does not set
        # the attribute.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
            self.connection = None
|
|
|
|
|
|
class UpsertProductsToPg(WriteToPostgreSQL):
    """DoFn to write products to PostgreSQL with our upsert logic.

    A product row is inserted. When it conflicts on ``table_key``, the
    existing row is overwritten, but only if at least one of the compared
    product attributes differs from the incoming row.
    """

    def execute_insert(self, row, cursor):
        """Insert a product row, updating the stored row on conflict.

        Args:
            row: Mapping of column name to value for the product.
            cursor: Open database cursor used to execute the statement.

        Raises:
            ValueError: If no ``table_key`` was configured; the upsert needs
                a conflict target.
        """
        if self.table_key is None:
            # Without this check the statement would contain
            # "ON CONFLICT (None)" and only fail on the database server.
            raise ValueError("UpsertProductsToPg requires a table_key")

        colnames = ",".join(row.keys())
        values = ",".join(["%s"] * len(row))
        # IS DISTINCT FROM is used instead of != because != yields NULL when
        # either side is NULL, which would suppress the update whenever a
        # column changes from or to NULL.
        # NOTE(review): gtin13 is written by SET but not compared in WHERE, so
        # a change to gtin13 alone does not trigger an update - confirm this
        # is intended.
        sql = f"""
            INSERT INTO { self.table } ({ colnames })
            VALUES ({ values })
            ON CONFLICT ({ self.table_key }) DO UPDATE
            SET
                gtin13 = EXCLUDED.gtin13,
                primary_category = EXCLUDED.primary_category,
                materials = EXCLUDED.materials,
                packaging = EXCLUDED.packaging,
                origin = EXCLUDED.origin,
                weight = EXCLUDED.weight
            WHERE
                { self.table }.primary_category IS DISTINCT FROM EXCLUDED.primary_category OR
                { self.table }.materials IS DISTINCT FROM EXCLUDED.materials OR
                { self.table }.packaging IS DISTINCT FROM EXCLUDED.packaging OR
                { self.table }.origin IS DISTINCT FROM EXCLUDED.origin OR
                { self.table }.weight IS DISTINCT FROM EXCLUDED.weight
        """
        cursor.execute(sql, list(row.values()))
|