feat: import elements into database using beam

parent 66782ec2ef
commit 8e89404b76
@@ -7,6 +7,7 @@ import logging
 import csv
 
 import apache_beam as beam
+import psycopg2
 
 from apache_beam.io.filesystems import FileSystems
 
@@ -17,9 +18,8 @@ class ReadFromCsv(beam.DoFn):
     """
 
     def process(self, in_file):
-        fname = in_file.get()
-        logging.info("reading from input file: %s", fname)
-        with FileSystems.open(fname) as file:
+        logging.info("reading from input file: %s", in_file)
+        with FileSystems.open(in_file) as file:
             text_wrapper = io.TextIOWrapper(file)
             reader = csv.reader(text_wrapper)
             try:
@@ -28,3 +28,39 @@ class ReadFromCsv(beam.DoFn):
                 return
             for row in reader:
                 yield dict(zip(header, row))
+
+
+class WriteToPostgreSQL(beam.DoFn):
+    """DoFn to write elements to a PostgreSQL database"""
+
+    def __init__(self, hostname, port, username, password, database, table):
+        self.hostname = hostname
+        self.port = port
+        self.username = username
+        self.password = password
+        self.database = database
+        self.table = table
+
+    def setup(self):
+        self.connection = psycopg2.connect(
+            host=self.hostname,
+            port=self.port,
+            user=self.username,
+            password=self.password,
+            database=self.database,
+        )
+
+    def process(self, element):
+        cursor = self.connection.cursor()
+        colnames = ",".join(element.keys())
+        values = ",".join(["%s"] * len(element))
+        sql = f"""
+        INSERT INTO { self.table } ({ colnames })
+        VALUES ({ values })
+        """
+        cursor.execute(sql, list(element.values()))
+        self.connection.commit()
+        cursor.close()
+
+    def teardown(self):
+        self.connection.close()
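Note: WriteToPostgreSQL opens one connection per worker in setup() and commits after every element. For larger inputs, a batched variant can buffer rows per bundle and flush them in finish_bundle(). A minimal sketch, not part of this commit, assuming all rows in a bundle share the same columns (the per-element version above makes a similar per-row assumption):

import apache_beam as beam
import psycopg2


class WriteToPostgreSQLBatched(beam.DoFn):
    """Hypothetical batched variant: buffer rows per bundle, flush once."""

    def __init__(self, table, **connect_kwargs):
        self.table = table
        self.connect_kwargs = connect_kwargs

    def setup(self):
        # One connection per DoFn instance, as in WriteToPostgreSQL above
        self.connection = psycopg2.connect(**self.connect_kwargs)

    def start_bundle(self):
        self.rows = []

    def process(self, element):
        self.rows.append(element)

    def finish_bundle(self):
        if not self.rows:
            return
        cursor = self.connection.cursor()
        # Assumes every buffered row has the same keys, in the same order
        colnames = ",".join(self.rows[0].keys())
        values = ",".join(["%s"] * len(self.rows[0]))
        sql = f"INSERT INTO {self.table} ({colnames}) VALUES ({values})"
        cursor.executemany(sql, [list(r.values()) for r in self.rows])
        self.connection.commit()
        cursor.close()

    def teardown(self):
        self.connection.close()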
@@ -14,8 +14,8 @@ from helpers.weight import parse_weight, dimensional_weight
 class CleanRow(TypedDict):
     """Type to represent clean rows to be inserted in the database"""
 
-    gtin13: int
-    tcin: int
+    gtin13: str
+    tcin: str
     primary_category: str
     materials: Optional[List[str]]
     packaging: int
@@ -37,12 +37,6 @@ def parse_row(element: Dict[str, str]) -> Optional[CleanRow]:
         logging.error("gtin13 missing")
         return None
 
-    try:
-        gtin13 = int(gtin13.strip())
-    except ValueError:
-        logging.error("malformed GTIN13")
-        return None
-
     # primary category should always be there
     try:
         primary_category = element["primary_category"]
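Note: keeping gtin13 (and tcin) as strings rather than ints preserves leading zeros, which are significant in GTIN-13 codes; this matches the INT-to-VARCHAR schema change below. A quick illustration with a hypothetical value:

gtin13 = "0012345678905"        # hypothetical GTIN-13 with a leading zero
int(gtin13)                     # 12345678905 -- the leading zero is lost
str(int(gtin13)) == gtin13      # False: round-tripping through int corrupts the code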
@@ -55,19 +49,13 @@ def parse_row(element: Dict[str, str]) -> Optional[CleanRow]:
         logging.error("could not parse raw_specifications")
         return None
 
-    # TCIN should be a mandatory field in the from of an int
+    # TCIN should be a mandatory field
     try:
-        tcin_value = specifications["tcin"]
+        tcin = specifications["tcin"]
     except KeyError:
         logging.error("TCIN missing")
         return None
 
-    try:
-        tcin = int(tcin_value.strip())
-    except ValueError:
-        logging.error("malformed TCIN")
-        return None
-
     materials = parse_materials(specifications.get("materials"))
 
     # if packaging is not specified, assume only one unit is found in the
@@ -1,43 +1,35 @@
 #!/usr/bin/env python
 
-import io
 import logging
-import csv
 
 import apache_beam as beam
 
-from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import PipelineOptions
 
+from helpers.data_io import ReadFromCsv, WriteToPostgreSQL
+from helpers.parse_row import parse_row
 
 
+# def __init__(self, hostname, port, username, password, database):
+
+
 class SustainabilityScoreOptions(PipelineOptions):
     """Options for this pipeline"""
 
     @classmethod
     def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            "--input", help="Input CSV file to process", type=str
-        )
-        parser.add_value_provider_argument(
-            "--output", help="Destination destination table", type=str
-        )
+        parser.add_argument("--input", help="Input CSV file to process", type=str)
+        parser.add_argument("--pg_hostname", help="Postgres hostname", type=str)
+        parser.add_argument("--pg_port", help="Postgres port", type=str)
+        parser.add_argument("--pg_username", help="Postgres username", type=str)
+        parser.add_argument("--pg_password", help="Postgres password", type=str)
+        parser.add_argument("--pg_database", help="Postgres database name", type=str)
+        parser.add_argument("--pg_table", help="Postgres table name", type=str)
 
-
-class ReadFromCsv(beam.DoFn):
-    """This custom DoFn will read from a CSV file and yield each row as a
-    dictionary where the row names are the keys and the cells are the values
-    """
-
-    def process(self, in_file):
-        with FileSystems.open(in_file.get()) as file:
-            text_wrapper = io.TextIOWrapper(file)
-            reader = csv.reader(text_wrapper)
-            header = next(reader)
-            for row in reader:
-                yield dict(zip(header, row))
-
 
 def main():
+    """Construct and run the pipeline"""
 
     beam_options = PipelineOptions()
     opts = beam_options.view_as(SustainabilityScoreOptions)
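Note: switching from add_value_provider_argument to add_argument means the options are plain attributes at pipeline-construction time, with no .get() indirection (hence the matching ReadFromCsv change above). A minimal sketch with hypothetical values:

from apache_beam.options.pipeline_options import PipelineOptions

from main import SustainabilityScoreOptions  # assuming main.py is importable as a module

beam_options = PipelineOptions([
    "--input", "data/products.csv",  # hypothetical local path
    "--pg_hostname", "localhost",
    "--pg_port", "5432",
    "--pg_username", "postgres",
    "--pg_password", "secret",
    "--pg_database", "postgres",
    "--pg_table", "sustainability_score.products",
])
opts = beam_options.view_as(SustainabilityScoreOptions)
assert opts.input == "data/products.csv"  # a plain str now, not a ValueProvider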
@@ -45,7 +37,16 @@ def main():
     # fmt: off
     pipeline \
         | beam.Create([opts.input]) \
-        | beam.ParDo(ReadFromCsv())
+        | beam.ParDo(ReadFromCsv()) \
+        | beam.Map(parse_row) \
+        | beam.ParDo(WriteToPostgreSQL(
+            hostname=opts.pg_hostname,
+            port=opts.pg_port,
+            username=opts.pg_username,
+            password=opts.pg_password,
+            database=opts.pg_database,
+            table=opts.pg_table,
+        ))
     # fmt: on
 
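Note: parse_row returns None for malformed rows (see the hunks above), and beam.Map forwards those None values straight into WriteToPostgreSQL. A beam.Filter stage between the two steps would drop them; a runnable toy sketch of the pattern, with a stand-in for parse_row:

import apache_beam as beam

# Toy stand-in for parse_row: returns None for rows it cannot parse
def parse(row):
    return row if "gtin13" in row else None

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([{"gtin13": "0012345678905"}, {"bad": "row"}])
        | beam.Map(parse)
        | beam.Filter(lambda row: row is not None)  # drop rejected rows
        | beam.Map(print)
    )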
@@ -1,7 +1,7 @@
 [project]
 name = "beam_etl"
 version = "0.1"
-dependencies = ["wheel", "apache-beam[gcp]", "pandas"]
+dependencies = ["wheel", "apache-beam[gcp]", "pandas", "psycopg2"]
 
 [project.optional-dependencies]
 dev = ["pytest", "pylint", "black"]
@@ -177,6 +177,8 @@ protobuf==4.23.3
     #   grpc-google-iam-v1
     #   grpcio-status
     #   proto-plus
+psycopg2==2.9.6
+    # via beam-etl (pyproject.toml)
 pyarrow==11.0.0
     # via apache-beam
 pyasn1==0.5.0
@@ -22,6 +22,7 @@ CSV_FNAME = (
 CONFIG = {
     "input": f"{ HOME }/gcs/data/{ CSV_FNAME }",
     "beam_etl_path": "/beam_etl/main.py",
+    "output_table": "sustainability_score.products",
 }
 
 with DAG(
@@ -43,7 +44,15 @@ with DAG(
     etl_pipeline = BeamRunPythonPipelineOperator(
         task_id="beam_etl",
         py_file="{{ params.beam_etl_path }}",
-        pipeline_options={"input": "{{ params.input }}"},
+        pipeline_options={
+            "input": "{{ params.input }}",
+            "pg_hostname": "{{ conn.get('pg_db').host }}",
+            "pg_port": "{{ conn.get('pg_db').port }}",
+            "pg_username": "{{ conn.get('pg_db').login }}",
+            "pg_password": "{{ conn.get('pg_db').password }}",
+            "pg_database": "{{ conn.get('pg_db').schema }}",
+            "pg_table": "{{ params.output_table }}",
+        },
     )
 
     create_products_table >> etl_pipeline
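Note: the conn.get('pg_db') templates assume an Airflow connection named pg_db exists. How it is provisioned is not shown in this commit; one common mechanism is an AIRFLOW_CONN_* environment variable in URI form, which Airflow parses into host, login, password, and schema. A sketch with hypothetical credentials:

import os

# Hypothetical: set in the scheduler/worker environment before the DAG runs.
# Airflow resolves conn.get('pg_db').host/.login/.password/.schema from this URI.
os.environ["AIRFLOW_CONN_PG_DB"] = "postgresql://user:secret@pg-host:5432/mydb"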
@@ -1,6 +1,6 @@
-CREATE TABLE IF NOT EXISTS sustainability_score.products (
-    gtin13 INT PRIMARY KEY,
-    tcin INT NOT NULL,
+CREATE TABLE IF NOT EXISTS {{ params.output_table }} (
+    gtin13 VARCHAR PRIMARY KEY,
+    tcin VARCHAR NOT NULL,
     primary_category VARCHAR NOT NULL,
     materials VARCHAR[],
     packaging INT NOT NULL,