dayrize-usecase/etl/src/helpers/upsert_products_to_pg.py

97 lines
3.6 KiB
Python

"""Module containing necessary functionality to write to the PostgreSQL sink"""
import logging
from typing import Dict
import apache_beam as beam
import psycopg2
class WriteToPostgreSQL(beam.DoFn):
    """DoFn to write elements to a PostgreSQL database.

    Each element handed to `process` is passed to `execute_insert` as a row
    dictionary (column name -> cell value). The connection is opened in
    `setup` and closed in `teardown`.

    NOTE: the table name, the conflict key and the column names are
    interpolated into the SQL text as-is; only the cell values are passed as
    query parameters. They must therefore come from trusted configuration.
    """

    # pylint: disable=abstract-method

    def __init__(self, connection_details: Dict[str, str], table, table_key=None):
        """Store the sink configuration; no connection is opened here.

        Args:
            connection_details: keyword arguments forwarded to
                `psycopg2.connect` in `setup`.
            table: name of the table to insert into.
            table_key: optional conflict target placed inside
                `ON CONFLICT (...)`. When None, plain inserts are issued.
        """
        super().__init__()
        self.connection_details = connection_details
        self.table = table
        self.table_key = table_key
        # Created in `setup` so that the DoFn holds no live connection
        # before that point.
        self.connection = None

    def setup(self):
        """Open the database connection and switch it to autocommit."""
        self.connection = psycopg2.connect(**self.connection_details)
        self.connection.autocommit = True

    def execute_insert(self, row: Dict, cursor):
        """Insert a single row into `self.table`.

        Given a dictionary representing a row and a PostgreSQL cursor,
        insert the row into the database so that the dict keys are the
        column names and the dict values the cell values.
        If a table_key is specified (`self.table_key` is set), conflicts on
        that key are handled by doing nothing.

        Args:
            row: mapping of column name to cell value.
            cursor: open cursor used to execute the statement.
        """
        colnames = ",".join(row.keys())
        placeholders = ",".join(["%s"] * len(row))
        sql = f"""
            INSERT INTO {self.table} ({colnames})
            VALUES ({placeholders})
        """
        if self.table_key is not None:
            sql += f" ON CONFLICT ({self.table_key}) DO NOTHING"
        cursor.execute(sql, list(row.values()))

    # pylint: disable=arguments-differ
    def process(self, element):
        """Write one element to the database.

        If no connection is available the element is not written and an
        error is logged instead.

        Args:
            element: row dictionary forwarded to `execute_insert`.
        """
        if self.connection is None:
            logging.error("something went wrong with the connection to postresql")
            return
        # The context manager closes the cursor even when the insert raises,
        # so a failing element does not leak a cursor.
        with self.connection.cursor() as cursor:
            logging.info(
                "inserting the following element into the database: %s", element
            )
            self.execute_insert(element, cursor)

    def teardown(self):
        """Close the database connection, if one was opened."""
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                # Drop the reference so a closed connection is never reused.
                self.connection = None
class UpsertProductsToPg(WriteToPostgreSQL):
    """DoFn to write products to PostgreSQL with our upsert logic"""

    # pylint: disable=abstract-method

    def execute_insert(self, row, cursor):
        """Insert a product row, or update the stored one if it changed.

        Our upsert logic is the following:
        The row is inserted with ingestion_time set to the current
        timestamp. If a row with the same `self.table_key` already exists,
        it is updated (and its ingestion_time refreshed) only when any of
        primary_category, materials, packaging, origin, weight, height,
        depth or width differs from the incoming value. Otherwise the stored
        row is left untouched.

        Args:
            row: mapping of column name to cell value for the product.
            cursor: open cursor used to execute the statement.

        Raises:
            ValueError: if `self.table_key` is not set, because the upsert
                needs a conflict target.
        """
        if self.table_key is None:
            raise ValueError(
                "UpsertProductsToPg requires a table_key to use as the "
                "ON CONFLICT target"
            )
        colnames = ",".join(row.keys())
        placeholders = ",".join(["%s"] * len(row))
        # IS DISTINCT FROM is used instead of != so that a change from or to
        # NULL counts as a difference; with != the comparison would yield
        # NULL and the update would be skipped.
        sql = f"""
            INSERT INTO {self.table} ({colnames}, ingestion_time)
            VALUES ({placeholders}, NOW()::TIMESTAMP)
            ON CONFLICT ({self.table_key}) DO UPDATE
            SET
                gtin13 = EXCLUDED.gtin13,
                primary_category = EXCLUDED.primary_category,
                materials = EXCLUDED.materials,
                packaging = EXCLUDED.packaging,
                origin = EXCLUDED.origin,
                weight = EXCLUDED.weight,
                height = EXCLUDED.height,
                depth = EXCLUDED.depth,
                width = EXCLUDED.width,
                ingestion_time = NOW()::TIMESTAMP
            WHERE
                {self.table}.primary_category IS DISTINCT FROM EXCLUDED.primary_category OR
                {self.table}.materials IS DISTINCT FROM EXCLUDED.materials OR
                {self.table}.packaging IS DISTINCT FROM EXCLUDED.packaging OR
                {self.table}.origin IS DISTINCT FROM EXCLUDED.origin OR
                {self.table}.weight IS DISTINCT FROM EXCLUDED.weight OR
                {self.table}.height IS DISTINCT FROM EXCLUDED.height OR
                {self.table}.depth IS DISTINCT FROM EXCLUDED.depth OR
                {self.table}.width IS DISTINCT FROM EXCLUDED.width
        """
        cursor.execute(sql, list(row.values()))