feat: update table if there's a primary key conflict
parent
8e89404b76
commit
55b20bb897
|
@ -33,13 +33,16 @@ class ReadFromCsv(beam.DoFn):
|
|||
class WriteToPostgreSQL(beam.DoFn):
|
||||
"""DoFn to write elements to a PostgreSQL database"""
|
||||
|
||||
def __init__(
    self, hostname, port, username, password, database, table, table_key=None
):
    """Record the connection settings and the target table.

    Nothing is connected here; the values are only stored on the
    instance. The connection itself is opened later, in ``setup``.

    Args:
        hostname: Database server host (presumably forwarded to
            ``psycopg2.connect`` in ``setup`` -- confirm against that call).
        port: Database server port (same assumption as ``hostname``).
        username: Login name (same assumption as ``hostname``).
        password: Password for ``username`` (same assumption as
            ``hostname``).
        database: Database name (same assumption as ``hostname``).
        table: Name of the table that rows are inserted into. It is
            interpolated directly into the ``INSERT INTO`` statement text.
        table_key: Optional name of the column used as the
            ``ON CONFLICT`` target. When it is ``None`` (the default) no
            ``ON CONFLICT ... DO UPDATE`` clause is appended, so existing
            positional callers keep their previous behaviour.
    """
    # Run the base DoFn initializer before adding our own state. The base
    # class is named explicitly, exactly as it is in the class header.
    beam.DoFn.__init__(self)

    # Connection settings.
    self.hostname = hostname
    self.port = port
    self.username = username
    self.password = password
    self.database = database

    # Write target and optional upsert key.
    self.table = table
    self.table_key = table_key
|
||||
|
||||
def setup(self):
|
||||
self.connection = psycopg2.connect(
|
||||
|
@ -58,6 +61,19 @@ class WriteToPostgreSQL(beam.DoFn):
|
|||
INSERT INTO { self.table } ({ colnames })
|
||||
VALUES ({ values })
|
||||
"""
|
||||
if self.table_key is not None:
|
||||
update_statement = ",".join(
|
||||
f"{ col } = EXCLUDED.{ col }"
|
||||
for col in element.keys()
|
||||
if col != self.table_key
|
||||
)
|
||||
sql = (
|
||||
sql
|
||||
+ f"""
|
||||
ON CONFLICT ({ self.table_key }) DO UPDATE SET
|
||||
{ update_statement }
|
||||
"""
|
||||
)
|
||||
cursor.execute(sql, list(element.values()))
|
||||
self.connection.commit()
|
||||
cursor.close()
|
||||
|
|
|
@ -46,6 +46,7 @@ def main():
|
|||
password=opts.pg_password,
|
||||
database=opts.pg_database,
|
||||
table=opts.pg_table,
|
||||
table_key="gtin13",
|
||||
))
|
||||
# fmt: on
|
||||
|
||||
|
|
Loading…
Reference in New Issue