diff --git a/etl/helpers/data_io.py b/etl/helpers/data_io.py index e2dbfe9..2e51bc1 100644 --- a/etl/helpers/data_io.py +++ b/etl/helpers/data_io.py @@ -68,3 +68,30 @@ class WriteToPostgreSQL(beam.DoFn): def teardown(self): self.connection.close() + + +class UpsertProductsToPg(WriteToPostgreSQL): + """DoFn to write products to PostgreSQL with our upsert logic""" + + def execute_insert(self, row, cursor): + colnames = ",".join(row.keys()) + values = ",".join(["%s"] * len(row)) + sql = f""" + INSERT INTO { self.table } ({ colnames }) + VALUES ({ values }) + ON CONFLICT ({ self.table_key }) DO UPDATE + SET + gtin13 = EXCLUDED.gtin13, + primary_category = EXCLUDED.primary_category, + materials = EXCLUDED.materials, + packaging = EXCLUDED.packaging, + origin = EXCLUDED.origin, + weight = EXCLUDED.weight, + WHERE + primary_category != EXCLUDED.primary_category + materials != EXCLUDED.materials + packaging != EXCLUDED.packaging + origin != EXCLUDED.origin + weight != EXCLUDED.weight + """ + cursor.execute(sql, list(row.values())) diff --git a/etl/main.py b/etl/main.py index 796c8fd..1ed229e 100644 --- a/etl/main.py +++ b/etl/main.py @@ -6,7 +6,7 @@ import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions -from helpers.data_io import ReadFromCsv, WriteToPostgreSQL +from helpers.data_io import ReadFromCsv, UpsertProductsToPg from helpers.parse_row import parse_row @@ -39,7 +39,7 @@ def main(): | beam.Create([opts.input]) \ | beam.ParDo(ReadFromCsv()) \ | beam.Map(parse_row) \ - | beam.ParDo(WriteToPostgreSQL( + | beam.ParDo(UpsertProductsToPg( hostname=opts.pg_hostname, port=opts.pg_port, username=opts.pg_username,