diff --git a/pipeline/beam_etl/helpers/data_io.py b/pipeline/beam_etl/helpers/data_io.py new file mode 100644 index 0000000..70c89c7 --- /dev/null +++ b/pipeline/beam_etl/helpers/data_io.py @@ -0,0 +1,30 @@ +"""Module containing the IO parts of the pipeline""" + +#!/usr/bin/env python + +import io +import logging +import csv + +import apache_beam as beam + +from apache_beam.io.filesystems import FileSystems + + +class ReadFromCsv(beam.DoFn): + """This custom DoFn will read from a CSV file and yield each row as a + dictionary where the row names are the keys and the cells are the values + """ + + def process(self, in_file): + fname = in_file.get() + logging.info("reading from input file: %s", fname) + with FileSystems.open(fname) as file: + text_wrapper = io.TextIOWrapper(file) + reader = csv.reader(text_wrapper) + try: + header = next(reader) + except StopIteration: + return + for row in reader: + yield dict(zip(header, row))