diff --git a/pipeline/beam_etl/helpers/data_io.py b/pipeline/beam_etl/helpers/data_io.py index 0f8c17e..482de3d 100644 --- a/pipeline/beam_etl/helpers/data_io.py +++ b/pipeline/beam_etl/helpers/data_io.py @@ -33,13 +33,16 @@ class ReadFromCsv(beam.DoFn): class WriteToPostgreSQL(beam.DoFn): """DoFn to write elements to a PostgreSQL database""" - def __init__(self, hostname, port, username, password, database, table): + def __init__( + self, hostname, port, username, password, database, table, table_key=None + ): self.hostname = hostname self.port = port self.username = username self.password = password self.database = database self.table = table + self.table_key = table_key def setup(self): self.connection = psycopg2.connect( @@ -58,6 +61,19 @@ class WriteToPostgreSQL(beam.DoFn): INSERT INTO { self.table } ({ colnames }) VALUES ({ values }) """ + if self.table_key is not None: + update_statement = ",".join( + f"{ col } = EXCLUDED.{ col }" + for col in element.keys() + if col != self.table_key + ) + sql = ( + sql + + f""" + ON CONFLICT ({ self.table_key }) DO UPDATE SET + { update_statement } + """ + ) cursor.execute(sql, list(element.values())) self.connection.commit() cursor.close() diff --git a/pipeline/beam_etl/main.py b/pipeline/beam_etl/main.py index f9fcc16..ec85ed7 100644 --- a/pipeline/beam_etl/main.py +++ b/pipeline/beam_etl/main.py @@ -46,6 +46,7 @@ def main(): password=opts.pg_password, database=opts.pg_database, table=opts.pg_table, + table_key="gtin13", )) # fmt: on