feat: upsert products

main
Ricard Illa 2023-06-25 13:18:44 +02:00
parent 2cf2007434
commit 6d7f4909cc
No known key found for this signature in database
GPG Key ID: F69A672B72E54902
2 changed files with 29 additions and 2 deletions

View File

@ -68,3 +68,30 @@ class WriteToPostgreSQL(beam.DoFn):
def teardown(self): def teardown(self):
self.connection.close() self.connection.close()
class UpsertProductsToPg(WriteToPostgreSQL):
"""DoFn to write products to PostgreSQL with our upsert logic"""
def execute_insert(self, row, cursor):
colnames = ",".join(row.keys())
values = ",".join(["%s"] * len(row))
sql = f"""
INSERT INTO { self.table } ({ colnames })
VALUES ({ values })
ON CONFLICT ({ self.table_key }) DO UPDATE
SET
gtin13 = EXCLUDED.gtin13,
primary_category = EXCLUDED.primary_category,
materials = EXCLUDED.materials,
packaging = EXCLUDED.packaging,
origin = EXCLUDED.origin,
weight = EXCLUDED.weight,
WHERE
primary_category != EXCLUDED.primary_category
materials != EXCLUDED.materials
packaging != EXCLUDED.packaging
origin != EXCLUDED.origin
weight != EXCLUDED.weight
"""
cursor.execute(sql, list(row.values()))

View File

@ -6,7 +6,7 @@ import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import PipelineOptions
from helpers.data_io import ReadFromCsv, WriteToPostgreSQL from helpers.data_io import ReadFromCsv, UpsertProductsToPg
from helpers.parse_row import parse_row from helpers.parse_row import parse_row
@ -39,7 +39,7 @@ def main():
| beam.Create([opts.input]) \ | beam.Create([opts.input]) \
| beam.ParDo(ReadFromCsv()) \ | beam.ParDo(ReadFromCsv()) \
| beam.Map(parse_row) \ | beam.Map(parse_row) \
| beam.ParDo(WriteToPostgreSQL( | beam.ParDo(UpsertProductsToPg(
hostname=opts.pg_hostname, hostname=opts.pg_hostname,
port=opts.pg_port, port=opts.pg_port,
username=opts.pg_username, username=opts.pg_username,