dayrize-usecase/etl/helpers/data_io.py

98 lines
3.0 KiB
Python
Raw Normal View History

"""Module containing the IO parts of the pipeline"""
#!/usr/bin/env python
import io
import logging
import csv
import apache_beam as beam
import psycopg2
from apache_beam.io.filesystems import FileSystems
class ReadFromCsv(beam.DoFn):
    """DoFn that reads a CSV file and yields each data row as a dictionary.

    The first row of the file is the header. Every later non-blank row is
    yielded as a ``dict`` mapping header (column) names to cell values, all
    as ``str``. An empty file yields nothing.
    """

    def __init__(self, encoding="utf-8"):
        """Configure how the input file is decoded.

        Args:
            encoding: text encoding used to decode the input file. It is
                explicit (default UTF-8) so parsing does not depend on the
                locale of whichever worker runs the DoFn.
        """
        super().__init__()
        self.encoding = encoding

    def process(self, in_file):
        """Yield one ``dict`` per non-blank data row of ``in_file``.

        Args:
            in_file: path/URI accepted by Beam's ``FileSystems.open``.

        Yields:
            dict mapping each header name to the row's cell value. ``zip``
            pairs only the overlapping prefix when a row has fewer or more
            cells than the header.
        """
        logging.info("reading from input file: %s", in_file)
        with FileSystems.open(in_file) as file:
            # newline="" lets the csv module handle line endings itself, which
            # it needs in order to parse quoted fields that contain newlines.
            text_wrapper = io.TextIOWrapper(
                file, encoding=self.encoding, newline=""
            )
            reader = csv.reader(text_wrapper)
            try:
                header = next(reader)
            except StopIteration:
                # Empty file: no header, so no rows.
                return
            for row in reader:
                # Blank lines are returned as []; yielding {} for them would
                # produce an empty record downstream, so skip them.
                if row:
                    yield dict(zip(header, row))
class WriteToPostgreSQL(beam.DoFn):
    """DoFn that inserts each element into a PostgreSQL table.

    Each element is a mapping whose keys are column names and whose values
    are the values to insert. ``setup`` opens one connection per DoFn
    instance with autocommit enabled, so each insert is committed on its own.
    """

    def __init__(
        self, hostname, port, username, password, database, table, table_key=None
    ):
        """Store connection settings and the target table.

        Args:
            hostname: server host, passed to ``psycopg2.connect`` as ``host``.
            port: server port.
            username: login role, passed as ``user``.
            password: login password.
            database: database name.
            table: target table name, interpolated verbatim into the SQL.
            table_key: optional conflict target. When set, rows conflicting
                on it are skipped via ``ON CONFLICT (...) DO NOTHING``.
        """
        super().__init__()
        self.connection_details = {
            "host": hostname,
            "port": port,
            "user": username,
            "password": password,
            "database": database,
        }
        self.table = table
        self.table_key = table_key
        # Opened in setup(); None until then so teardown() is always safe.
        self.connection = None

    def setup(self):
        """Open the database connection used by this DoFn instance."""
        self.connection = psycopg2.connect(**self.connection_details)
        self.connection.autocommit = True

    def execute_insert(self, row, cursor):
        """Insert ``row`` using ``cursor``.

        Column names are taken from ``row``'s keys. Values are passed as
        query parameters.

        NOTE(review): the table, column and key names are interpolated into
        the SQL text without quoting or escaping, so they must come from a
        trusted source (e.g. vetted CSV headers).
        """
        columns = list(row)
        placeholders = ",".join(["%s"] * len(columns))
        sql = f"INSERT INTO {self.table} ({','.join(columns)}) VALUES ({placeholders})"
        if self.table_key is not None:
            sql += f" ON CONFLICT ({self.table_key}) DO NOTHING"
        cursor.execute(sql, [row[column] for column in columns])

    def process(self, element):
        """Write a single element. This DoFn emits no output."""
        # The cursor context manager closes the cursor even if the insert raises.
        with self.connection.cursor() as cursor:
            self.execute_insert(element, cursor)

    def teardown(self):
        """Close the connection if setup() managed to open one."""
        if self.connection is not None:
            self.connection.close()
            self.connection = None
class UpsertProductsToPg(WriteToPostgreSQL):
    """DoFn that upserts product rows into PostgreSQL.

    Each element is a mapping of column name to value, and the row is
    inserted. If it conflicts on ``table_key``, the existing row's
    ``update_columns`` are overwritten with the incoming values, but only
    when at least one of them differs. Unchanged products are therefore not
    rewritten.
    """

    # Columns overwritten on conflict. This one tuple drives both the SET list
    # and the change-detection WHERE clause, so the two cannot drift apart.
    update_columns = (
        "gtin13",
        "primary_category",
        "materials",
        "packaging",
        "origin",
        "weight",
    )

    def execute_insert(self, row, cursor):
        """Upsert ``row`` using ``cursor``.

        Args:
            row: mapping of column name to value. Keys are interpolated into
                the SQL text; values are passed as query parameters.
            cursor: open psycopg2 cursor used to run the statement.

        Raises:
            ValueError: if ``table_key`` is None, because
                ``ON CONFLICT ... DO UPDATE`` needs an explicit conflict target.
        """
        if self.table_key is None:
            raise ValueError(
                "UpsertProductsToPg requires table_key for ON CONFLICT DO UPDATE"
            )
        columns = list(row)
        placeholders = ",".join(["%s"] * len(columns))
        assignments = ", ".join(
            f"{column} = EXCLUDED.{column}" for column in self.update_columns
        )
        # Existing-row columns are qualified with the table alias, because
        # unqualified names are ambiguous next to EXCLUDED. IS DISTINCT FROM
        # is used because `a != b` evaluates to NULL when either side is NULL,
        # which would block updates to or from NULL values.
        changed = " OR ".join(
            f"existing.{column} IS DISTINCT FROM EXCLUDED.{column}"
            for column in self.update_columns
        )
        sql = (
            f"INSERT INTO {self.table} AS existing ({','.join(columns)}) "
            f"VALUES ({placeholders}) "
            f"ON CONFLICT ({self.table_key}) DO UPDATE "
            f"SET {assignments} "
            f"WHERE {changed}"
        )
        cursor.execute(sql, [row[column] for column in columns])