dayrize-usecase/etl/helpers/data_io.py

"""Module containing the IO parts of the pipeline"""
#!/usr/bin/env python
import csv
import io
import logging

import apache_beam as beam
import psycopg2
from apache_beam.io.filesystems import FileSystems


class ReadFromCsv(beam.DoFn):
    """Custom DoFn that reads a CSV file and yields each row as a
    dictionary, with the header's column names as keys and the row's
    cells as values.
    """

    def process(self, in_file):
        logging.info("reading from input file: %s", in_file)
        with FileSystems.open(in_file) as file:
            text_wrapper = io.TextIOWrapper(file)
            reader = csv.reader(text_wrapper)
            # An empty file has no header row; emit nothing in that case.
            try:
                header = next(reader)
            except StopIteration:
                return
            for row in reader:
                yield dict(zip(header, row))
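

# A minimal usage sketch (illustrative only: the pipeline wiring, bucket path,
# and variable names below are assumptions, not part of this module):
#
#     with beam.Pipeline() as pipeline:
#         rows = (
#             pipeline
#             | beam.Create(["gs://example-bucket/products.csv"])
#             | beam.ParDo(ReadFromCsv())
#         )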


class WriteToPostgreSQL(beam.DoFn):
    """DoFn that writes elements to a PostgreSQL database."""

    def __init__(
        self, hostname, port, username, password, database, table, table_key=None
    ):
        self.connection_details = {
            "host": hostname,
            "port": port,
            "user": username,
            "password": password,
            "database": database,
        }
        self.table = table
        self.table_key = table_key

    def setup(self):
        # Called once per DoFn instance by the Beam worker, so a single
        # connection is reused across bundles. Note that psycopg2.connect()
        # does not accept an autocommit keyword argument; the flag has to be
        # set on the connection after it is created.
        self.connection = psycopg2.connect(**self.connection_details)
        self.connection.autocommit = True

    def execute_insert(self, row, cursor):
        # Table and column names cannot be bound as query parameters, so they
        # are interpolated into the statement text; they must come from
        # trusted configuration, never from user input. The cell values are
        # bound safely through the %s placeholders.
        colnames = ",".join(row.keys())
        placeholders = ",".join(["%s"] * len(row))
        sql = f"""
            INSERT INTO {self.table} ({colnames})
            VALUES ({placeholders})
        """
        if self.table_key is not None:
            sql += f" ON CONFLICT ({self.table_key}) DO NOTHING"
        cursor.execute(sql, list(row.values()))
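
    # For a hypothetical row {"id": "1", "name": "widget"} with table="product"
    # and table_key="id" (example values, not from this codebase), the
    # statement above comes out roughly as:
    #
    #     INSERT INTO product (id,name) VALUES (%s,%s)
    #     ON CONFLICT (id) DO NOTHING
    #
    # executed with parameters ["1", "widget"].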

    def process(self, element):
        cursor = self.connection.cursor()
        self.execute_insert(element, cursor)
        cursor.close()

    def teardown(self):
        self.connection.close()
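

# A minimal usage sketch (illustrative only: the host, credentials, table
# name, and upstream `rows` collection are assumptions):
#
#     rows | beam.ParDo(
#         WriteToPostgreSQL(
#             hostname="localhost",
#             port=5432,
#             username="etl",
#             password="secret",
#             database="dayrize",
#             table="product",
#             table_key="id",
#         )
#     )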