feat: added module for reading and writing data
parent
7a86988bd1
commit
63e961e9a2
|
@ -0,0 +1,30 @@
|
|||
"""Module containing the IO parts of the pipeline"""
|
||||
|
||||
#!/usr/bin/env python
|
||||
|
||||
import io
|
||||
import logging
|
||||
import csv
|
||||
|
||||
import apache_beam as beam
|
||||
|
||||
from apache_beam.io.filesystems import FileSystems
|
||||
|
||||
|
||||
class ReadFromCsv(beam.DoFn):
    """Custom DoFn that reads a CSV file and yields each data row as a dict.

    The first row of the file is used as the header: its cells become the
    keys of every yielded dictionary and the cells of each following row
    become the values.
    """

    def process(self, in_file):
        """Yield one ``{column name: cell}`` dictionary per data row.

        Args:
            in_file: object whose ``get()`` method returns the path of the
                CSV file to read (presumably a Beam ValueProvider -- confirm
                against the pipeline options).

        Yields:
            dict: header cells zipped with the cells of one row. Because the
            pairing uses ``zip``, a row shorter than the header omits the
            trailing keys and a row longer than the header drops the extra
            cells.

        An empty file (no header row) yields nothing.
        """
        fname = in_file.get()
        logging.info("reading from input file: %s", fname)
        with FileSystems.open(fname) as file:
            # Decode explicitly as UTF-8 so the result does not depend on the
            # worker's locale, and pass newline="" as the csv module requires
            # so that line breaks embedded in quoted fields are preserved.
            text_wrapper = io.TextIOWrapper(
                file, encoding="utf-8", newline=""
            )
            reader = csv.reader(text_wrapper)
            try:
                header = next(reader)
            except StopIteration:
                # Empty file: there is no header, hence no rows to emit.
                return
            for row in reader:
                yield dict(zip(header, row))
|
Loading…
Reference in New Issue