"""Helper function to parse rows into cleaner data that can be inserted into
the destination database"""

import logging
from typing import TypedDict, Dict, Optional, List

from helpers.parse_xml import parse_raw_specs
from helpers.materials import parse_materials
from helpers.origin import clean_origin_name
from helpers.dimensions import parse_dimensions
from helpers.weight import parse_weight, dimensional_weight


class CleanRow(TypedDict):
    """Type to represent clean rows to be inserted in the database"""

    gtin13: int
    tcin: int
    primary_category: str
    materials: Optional[List[str]]
    # number of units in the package; parse_row defaults it to 1
    packaging: int
    origin: str
    height: Optional[float]
    depth: Optional[float]
    width: Optional[float]
    weight: Optional[float]


def parse_row(element: Dict[str, str]) -> Optional[CleanRow]:
    """Parse a dictionary representing a row for the CSV input into a cleaner
    dictionary representing a row to be inserted in the database

    Args:
        element: raw row; must contain the keys "gtin13", "primary_category"
            and "raw_specifications".

    Returns:
        The cleaned row, or None (after logging an error) when a mandatory
        field is missing or malformed: gtin13, primary_category,
        raw_specifications or the TCIN found inside the specifications.
    """

    # gtin13 should always be there
    try:
        gtin13 = element["gtin13"]
    except KeyError:
        logging.error("gtin13 missing")
        return None

    try:
        gtin13 = int(gtin13.strip())
    except ValueError:
        logging.error("malformed GTIN13")
        return None

    # primary category should always be there
    try:
        primary_category = element["primary_category"]
    except KeyError:
        logging.error("primary_category missing")
        return None

    # raw specifications are mandatory too: treat a missing column like the
    # other mandatory fields instead of letting the KeyError escape
    try:
        raw_specifications = element["raw_specifications"]
    except KeyError:
        logging.error("raw_specifications missing")
        return None

    specifications = parse_raw_specs(raw_specifications)
    if specifications is None:
        logging.error("could not parse raw_specifications")
        return None

    # TCIN should be a mandatory field in the form of an int
    try:
        tcin_value = specifications["tcin"]
    except KeyError:
        logging.error("TCIN missing")
        return None

    try:
        tcin = int(tcin_value.strip())
    except ValueError:
        logging.error("malformed TCIN")
        return None

    materials = parse_materials(specifications.get("materials"))

    # if packaging is not specified, assume only one unit is found in the
    # package
    packaging = specifications.get("packaging", 1)
    try:
        packaging = int(packaging)
    except (TypeError, ValueError):
        # TypeError covers None / non-numeric objects, ValueError covers
        # strings that are not integers; both fall back to a single unit
        logging.error("could not cast packaging %s into an integer", packaging)
        packaging = 1

    origin = clean_origin_name(specifications.get("origin"))

    dimensions = parse_dimensions(specifications.get("dimensions"))
    height = dimensions["height"]
    width = dimensions["width"]
    depth = dimensions["depth"]

    weight = parse_weight(specifications.get("weight"))
    if weight is None:
        # no usable weight: fall back to the value dimensional_weight derives
        # from the parsed dimensions
        weight = dimensional_weight(height=height, width=width, depth=depth)

    return {
        "gtin13": gtin13,
        "tcin": tcin,
        "primary_category": primary_category,
        "materials": materials,
        "packaging": packaging,
        "origin": origin,
        "height": height,
        "width": width,
        "depth": depth,
        "weight": weight,
    }