dayrize-usecase/pipeline/beam_etl/main.py

#!/usr/bin/env python
import csv
import io
import logging

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions


class SustainabilityScoreOptions(PipelineOptions):
    """Options for this pipeline"""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input", help="Input CSV file to process", type=str
        )
        parser.add_value_provider_argument(
            "--output", help="Destination table for the results", type=str
        )


class ReadFromCsv(beam.DoFn):
    """Read a CSV file and yield each row as a dictionary, using the values
    of the header row as keys and the cells of each subsequent row as values.
    """

    def process(self, in_file):
        # in_file is a ValueProvider, so resolve it with .get() at runtime
        with FileSystems.open(in_file.get()) as file:
            text_wrapper = io.TextIOWrapper(file)
            reader = csv.reader(text_wrapper)
            header = next(reader)
            for row in reader:
                yield dict(zip(header, row))
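
# Design note: the whole file is consumed inside a single process() call, so
# the read itself is not parallelized across workers. For large inputs,
# beam.io.ReadFromText plus a per-line parser is a common alternative.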


def main():
    beam_options = PipelineOptions()
    opts = beam_options.view_as(SustainabilityScoreOptions)

    with beam.Pipeline(options=beam_options) as pipeline:
        # fmt: off
        pipeline \
            | beam.Create([opts.input]) \
            | beam.ParDo(ReadFromCsv())
        # fmt: on
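
        # opts.output is declared above but not consumed yet. A hypothetical
        # write step, assuming the destination is a BigQuery table, could be
        # chained onto the pipeline like this (a sketch, not the original code):
        #
        #     | beam.io.WriteToBigQuery(
        #         table=opts.output,
        #         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        #     )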


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()
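
# Example invocation (hypothetical bucket and table names):
#   python main.py \
#       --runner=DirectRunner \
#       --input=gs://my-bucket/products.csv \
#       --output=my-project:my_dataset.scores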