dayrize-usecase/etl/main.py

57 lines
1.8 KiB
Python

#!/usr/bin/env python
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from helpers.data_io import ReadFromCsv, UpsertProductsToPg
from helpers.parse_row import parse_row
# def __init__(self, hostname, port, username, password, database):
class SustainabilityScoreOptions(PipelineOptions):
"""Options for this pipeline"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument("--input", help="Input CSV file to process", type=str)
parser.add_argument("--pg_hostname", help="Postgres hostname", type=str)
parser.add_argument("--pg_port", help="Postgres port", type=str)
parser.add_argument("--pg_username", help="Postgres username", type=str)
parser.add_argument("--pg_password", help="Postgres password", type=str)
parser.add_argument("--pg_database", help="Postgres database name", type=str)
parser.add_argument("--pg_table", help="Postgres table name", type=str)
def main():
"""Construct and run the pipeline"""
beam_options = PipelineOptions()
opts = beam_options.view_as(SustainabilityScoreOptions)
with beam.Pipeline(options=beam_options) as pipeline:
# fmt: off
pipeline \
| beam.Create([opts.input]) \
| beam.ParDo(ReadFromCsv()) \
| beam.Map(parse_row) \
| beam.ParDo(UpsertProductsToPg(
hostname=opts.pg_hostname,
port=opts.pg_port,
username=opts.pg_username,
password=opts.pg_password,
database=opts.pg_database,
table=opts.pg_table,
table_key="tcin",
))
# fmt: on
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
main()