#!/usr/bin/env python
"""Module containing the IO parts of the pipeline"""

import io
import logging
import csv

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class ReadFromCsv(beam.DoFn):
    """Read a CSV file and yield each data row as a dictionary.

    The first row of the file is treated as the header: its cells become the
    dictionary keys, and the cells of each following row become the values.
    Rows are paired with the header via ``zip``, so a row with fewer cells
    than the header yields only the columns it has, and extra cells beyond
    the header are dropped. Completely empty lines are skipped.

    Args:
        encoding: Text encoding used to decode the file. Defaults to UTF-8
            so decoding does not depend on the worker's locale.
    """

    def __init__(self, encoding="utf-8"):
        super().__init__()
        self._encoding = encoding

    def process(self, in_file):
        """Yield one ``{column_name: cell}`` dict per data row of ``in_file``.

        Args:
            in_file: Path of the file to read, either as a plain string or as
                an object exposing ``.get()`` that returns the path
                (presumably a Beam ``ValueProvider`` -- confirm against the
                pipeline that builds this input).

        Yields:
            dict mapping header cell -> row cell for each non-empty row. An
            empty file (no header row) yields nothing.
        """
        fname = in_file if isinstance(in_file, str) else in_file.get()
        logging.info("reading from input file: %s", fname)
        with FileSystems.open(fname) as file:
            # newline="" is required by the csv module so that line endings
            # inside quoted fields are passed through untranslated.
            text_wrapper = io.TextIOWrapper(
                file, encoding=self._encoding, newline=""
            )
            reader = csv.reader(text_wrapper)
            header = next(reader, None)
            if header is None:
                # Empty file: there is no header, hence no records.
                return
            for row in reader:
                if not row:
                    # csv.reader returns [] for blank lines; emitting {}
                    # would create a record with no data.
                    continue
                yield dict(zip(header, row))