57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
import logging
|
||
|
|
||
|
import apache_beam as beam
|
||
|
|
||
|
from apache_beam.options.pipeline_options import PipelineOptions
|
||
|
|
||
|
from helpers.data_io import ReadFromCsv, WriteToPostgreSQL
|
||
|
from helpers.parse_row import parse_row
|
||
|
|
||
|
|
||
|
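# NOTE(review): leftover signature sketch, presumably for WriteToPostgreSQL; the call in
# main() also passes `table` and `table_key`, so this looks stale — confirm or remove.
# def __init__(self, hostname, port, username, password, database):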
|
||
|
|
||
|
|
||
|
class SustainabilityScoreOptions(PipelineOptions):
    """Command-line options for this pipeline.

    Adds ``--input`` and the ``--pg_*`` flags; every flag is declared with
    ``type=str`` and none of them sets a default.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register this pipeline's flags on ``parser`` in a fixed order."""
        # (flag, help text) pairs; all flags share the same ``str`` type.
        flag_help = (
            ("--input", "Input CSV file to process"),
            ("--pg_hostname", "Postgres hostname"),
            ("--pg_port", "Postgres port"),
            ("--pg_username", "Postgres username"),
            ("--pg_password", "Postgres password"),
            ("--pg_database", "Postgres database name"),
            ("--pg_table", "Postgres table name"),
        )
        for flag, description in flag_help:
            parser.add_argument(flag, help=description, type=str)
|
||
|
|
||
|
|
||
|
def main():
    """Assemble the pipeline and run it.

    The stages are, in order: a single-element collection holding the
    ``--input`` value, ``ReadFromCsv``, ``parse_row``, and finally
    ``WriteToPostgreSQL`` configured from the ``--pg_*`` options.
    """
    beam_options = PipelineOptions()
    opts = beam_options.view_as(SustainabilityScoreOptions)

    with beam.Pipeline(options=beam_options) as pipeline:
        # Settings handed to the PostgreSQL sink, all taken from the parsed options.
        postgres_settings = {
            "hostname": opts.pg_hostname,
            "port": opts.pg_port,
            "username": opts.pg_username,
            "password": opts.pg_password,
            "database": opts.pg_database,
            "table": opts.pg_table,
            # presumably the key column used by the sink — confirm against WriteToPostgreSQL
            "table_key": "gtin13",
        }

        # Parenthesised chain instead of backslash continuations; the result is unused.
        (
            pipeline
            | beam.Create([opts.input])
            | beam.ParDo(ReadFromCsv())
            | beam.Map(parse_row)
            | beam.ParDo(WriteToPostgreSQL(**postgres_settings))
        )
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Set the root logger to INFO before the pipeline is built and run.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    main()
|