added skeleton for beam etl pipeline

main
Ricard Illa 2023-06-21 19:11:17 +02:00
parent b5ab3ca687
commit 02eaa4b7ff
7 changed files with 652 additions and 0 deletions

7
.gitignore vendored
View File

@ -1,2 +1,9 @@
.ipynb_checkpoints
pipeline/state
.envrc
venv
.installed_deps
*.egg-info
__pycache__

View File

@ -0,0 +1,50 @@
# I use this Makefile to automatize setting up the working space for me
# As long as `virtualenv` and `pyenv` are installed, running `make` should set
# up the virtual envrionment with everything needed.
.PHONY: clean
PYTHON_VERSION = 3.8
PYTHON_SUBVERSION = 3.8.12
PYENV_VERSIONS = $(HOME)/.pyenv/versions
PYTHON_BIN = $(PYENV_VERSIONS)/$(PYTHON_SUBVERSION)/bin/python$(PYTHON_VERSION)
VENV = venv
PYTHON_VENV = $(VENV)/bin/python
PIP = $(PYTHON_VENV) -m pip
PIP_COMPILE = venv/bin/pip-compile
all: .installed_deps
.installed_deps: requirements.txt dev-requirements.txt $(PYTHON_VENV)
$(PIP) install \
-r requirements.txt \
-r dev-requirements.txt
touch $@
requirements.txt: pyproject.toml $(PIP_COMPILE)
$(PIP_COMPILE) \
--resolver=backtracking \
--output-file $@ \
$<
dev-requirements.txt: pyproject.toml $(PIP_COMPILE)
$(PIP_COMPILE) \
--extra=dev \
--resolver=backtracking \
--output-file $@ \
$<
$(PIP_COMPILE): $(PYTHON_VENV)
$(PIP) install pip-tools
$(PYTHON_VENV): $(PYTHON_BIN)
virtualenv --python=$^ $(VENV)
$(PIP) install --upgrade pip
$(PYTHON_BIN):
pyenv install $(PYTHON_VERSION)
clean:
rm -rf *.egg-info venv installed_deps

View File

@ -0,0 +1,289 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --extra=dev --output-file=dev-requirements.txt --resolver=backtracking pyproject.toml
#
apache-beam[gcp]==2.48.0
# via beam-etl (pyproject.toml)
astroid==2.15.5
# via pylint
black==23.3.0
# via beam-etl (pyproject.toml)
cachetools==5.3.1
# via
# apache-beam
# google-auth
certifi==2023.5.7
# via requests
charset-normalizer==3.1.0
# via requests
click==8.1.3
# via black
cloudpickle==2.2.1
# via apache-beam
crcmod==1.7
# via apache-beam
dill==0.3.1.1
# via
# apache-beam
# pylint
dnspython==2.3.0
# via pymongo
docopt==0.6.2
# via hdfs
exceptiongroup==1.1.1
# via pytest
fastavro==1.7.4
# via apache-beam
fasteners==0.18
# via
# apache-beam
# google-apitools
google-api-core[grpc]==2.11.1
# via
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-core
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-pubsublite
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
google-apitools==0.5.31
# via apache-beam
google-auth==2.20.0
# via
# apache-beam
# google-api-core
# google-auth-httplib2
# google-cloud-core
google-auth-httplib2==0.1.0
# via apache-beam
google-cloud-bigquery==3.11.1
# via apache-beam
google-cloud-bigquery-storage==2.20.0
# via apache-beam
google-cloud-bigtable==2.17.0
# via apache-beam
google-cloud-core==2.3.2
# via
# apache-beam
# google-cloud-bigquery
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-spanner
google-cloud-datastore==2.15.2
# via apache-beam
google-cloud-dlp==3.12.1
# via apache-beam
google-cloud-language==2.10.0
# via apache-beam
google-cloud-pubsub==2.17.1
# via
# apache-beam
# google-cloud-pubsublite
google-cloud-pubsublite==1.8.2
# via apache-beam
google-cloud-recommendations-ai==0.10.3
# via apache-beam
google-cloud-spanner==3.36.0
# via apache-beam
google-cloud-videointelligence==2.11.2
# via apache-beam
google-cloud-vision==3.4.3
# via apache-beam
google-crc32c==1.5.0
# via google-resumable-media
google-resumable-media==2.5.0
# via google-cloud-bigquery
googleapis-common-protos[grpc]==1.59.1
# via
# google-api-core
# grpc-google-iam-v1
# grpcio-status
grpc-google-iam-v1==0.12.6
# via
# google-cloud-bigtable
# google-cloud-pubsub
# google-cloud-spanner
grpcio==1.54.2
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# google-cloud-pubsub
# google-cloud-pubsublite
# googleapis-common-protos
# grpc-google-iam-v1
# grpcio-status
grpcio-status==1.54.2
# via
# google-api-core
# google-cloud-pubsub
# google-cloud-pubsublite
hdfs==2.7.0
# via apache-beam
httplib2==0.22.0
# via
# apache-beam
# google-apitools
# google-auth-httplib2
# oauth2client
idna==3.4
# via requests
iniconfig==2.0.0
# via pytest
isort==5.12.0
# via pylint
lazy-object-proxy==1.9.0
# via astroid
mccabe==0.7.0
# via pylint
mypy-extensions==1.0.0
# via black
numpy==1.24.3
# via
# apache-beam
# pandas
# pyarrow
oauth2client==4.1.3
# via google-apitools
objsize==0.6.1
# via apache-beam
orjson==3.9.1
# via apache-beam
overrides==6.5.0
# via google-cloud-pubsublite
packaging==23.1
# via
# black
# google-cloud-bigquery
# pytest
pandas==2.0.2
# via beam-etl (pyproject.toml)
pathspec==0.11.1
# via black
platformdirs==3.7.0
# via
# black
# pylint
pluggy==1.2.0
# via pytest
proto-plus==1.22.2
# via
# apache-beam
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
protobuf==4.23.3
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
# googleapis-common-protos
# grpc-google-iam-v1
# grpcio-status
# proto-plus
pyarrow==11.0.0
# via apache-beam
pyasn1==0.5.0
# via
# oauth2client
# pyasn1-modules
# rsa
pyasn1-modules==0.3.0
# via
# google-auth
# oauth2client
pydot==1.4.2
# via apache-beam
pylint==2.17.4
# via beam-etl (pyproject.toml)
pymongo==4.3.3
# via apache-beam
pyparsing==3.1.0
# via
# httplib2
# pydot
pytest==7.3.2
# via beam-etl (pyproject.toml)
python-dateutil==2.8.2
# via
# apache-beam
# google-cloud-bigquery
# pandas
pytz==2023.3
# via
# apache-beam
# pandas
regex==2023.6.3
# via apache-beam
requests==2.31.0
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# hdfs
rsa==4.9
# via
# google-auth
# oauth2client
six==1.16.0
# via
# google-apitools
# google-auth
# google-auth-httplib2
# hdfs
# oauth2client
# python-dateutil
sqlparse==0.4.4
# via google-cloud-spanner
tomli==2.0.1
# via
# black
# pylint
# pytest
tomlkit==0.11.8
# via pylint
typing-extensions==4.6.3
# via
# apache-beam
# astroid
# black
# pylint
tzdata==2023.3
# via pandas
urllib3==1.26.16
# via
# google-auth
# requests
wheel==0.40.0
# via beam-etl (pyproject.toml)
wrapt==1.15.0
# via astroid
zstandard==0.21.0
# via apache-beam

View File

@ -0,0 +1,4 @@
input := "../../data/large_target_store_products_dataset_sample - large_target_store_products_dataset_sample.csv"
run:
python -m main --input "{{ input }}"

54
pipeline/beam_etl/main.py Normal file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
import io
import logging
import csv
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions
class SustainabilityScoreOptions(PipelineOptions):
"""Options for this pipeline"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input", help="Input CSV file to process", type=str
)
parser.add_value_provider_argument(
"--output", help="Destination destination table", type=str
)
class ReadFromCsv(beam.DoFn):
"""This custom DoFn will read from a CSV file and yield each row as a
dictionary where the row names are the keys and the cells are the values
"""
def process(self, in_file):
with FileSystems.open(in_file.get()) as file:
text_wrapper = io.TextIOWrapper(file)
reader = csv.reader(text_wrapper)
header = next(reader)
for row in reader:
yield dict(zip(header, row))
def main():
beam_options = PipelineOptions()
opts = beam_options.view_as(SustainabilityScoreOptions)
with beam.Pipeline(options=beam_options) as pipeline:
# fmt: off
pipeline \
| beam.Create([opts.input]) \
| beam.ParDo(ReadFromCsv())
# fmt: on
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
main()

View File

@ -0,0 +1,7 @@
[project]
name = "beam_etl"
version = "0.1"
dependencies = ["wheel", "apache-beam[gcp]", "pandas"]
[project.optional-dependencies]
dev = ["pytest", "pylint", "black"]

View File

@ -0,0 +1,241 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml
#
apache-beam[gcp]==2.48.0
# via beam-etl (pyproject.toml)
cachetools==5.3.1
# via
# apache-beam
# google-auth
certifi==2023.5.7
# via requests
charset-normalizer==3.1.0
# via requests
cloudpickle==2.2.1
# via apache-beam
crcmod==1.7
# via apache-beam
dill==0.3.1.1
# via apache-beam
dnspython==2.3.0
# via pymongo
docopt==0.6.2
# via hdfs
fastavro==1.7.4
# via apache-beam
fasteners==0.18
# via
# apache-beam
# google-apitools
google-api-core[grpc]==2.11.1
# via
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-core
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-pubsublite
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
google-apitools==0.5.31
# via apache-beam
google-auth==2.20.0
# via
# apache-beam
# google-api-core
# google-auth-httplib2
# google-cloud-core
google-auth-httplib2==0.1.0
# via apache-beam
google-cloud-bigquery==3.11.1
# via apache-beam
google-cloud-bigquery-storage==2.20.0
# via apache-beam
google-cloud-bigtable==2.17.0
# via apache-beam
google-cloud-core==2.3.2
# via
# apache-beam
# google-cloud-bigquery
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-spanner
google-cloud-datastore==2.15.2
# via apache-beam
google-cloud-dlp==3.12.1
# via apache-beam
google-cloud-language==2.10.0
# via apache-beam
google-cloud-pubsub==2.17.1
# via
# apache-beam
# google-cloud-pubsublite
google-cloud-pubsublite==1.8.2
# via apache-beam
google-cloud-recommendations-ai==0.10.3
# via apache-beam
google-cloud-spanner==3.36.0
# via apache-beam
google-cloud-videointelligence==2.11.2
# via apache-beam
google-cloud-vision==3.4.3
# via apache-beam
google-crc32c==1.5.0
# via google-resumable-media
google-resumable-media==2.5.0
# via google-cloud-bigquery
googleapis-common-protos[grpc]==1.59.1
# via
# google-api-core
# grpc-google-iam-v1
# grpcio-status
grpc-google-iam-v1==0.12.6
# via
# google-cloud-bigtable
# google-cloud-pubsub
# google-cloud-spanner
grpcio==1.54.2
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# google-cloud-pubsub
# google-cloud-pubsublite
# googleapis-common-protos
# grpc-google-iam-v1
# grpcio-status
grpcio-status==1.54.2
# via
# google-api-core
# google-cloud-pubsub
# google-cloud-pubsublite
hdfs==2.7.0
# via apache-beam
httplib2==0.22.0
# via
# apache-beam
# google-apitools
# google-auth-httplib2
# oauth2client
idna==3.4
# via requests
numpy==1.24.3
# via
# apache-beam
# pandas
# pyarrow
oauth2client==4.1.3
# via google-apitools
objsize==0.6.1
# via apache-beam
orjson==3.9.1
# via apache-beam
overrides==6.5.0
# via google-cloud-pubsublite
packaging==23.1
# via google-cloud-bigquery
pandas==2.0.2
# via beam-etl (pyproject.toml)
proto-plus==1.22.2
# via
# apache-beam
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
protobuf==4.23.3
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# google-cloud-bigquery-storage
# google-cloud-bigtable
# google-cloud-datastore
# google-cloud-dlp
# google-cloud-language
# google-cloud-pubsub
# google-cloud-recommendations-ai
# google-cloud-spanner
# google-cloud-videointelligence
# google-cloud-vision
# googleapis-common-protos
# grpc-google-iam-v1
# grpcio-status
# proto-plus
pyarrow==11.0.0
# via apache-beam
pyasn1==0.5.0
# via
# oauth2client
# pyasn1-modules
# rsa
pyasn1-modules==0.3.0
# via
# google-auth
# oauth2client
pydot==1.4.2
# via apache-beam
pymongo==4.3.3
# via apache-beam
pyparsing==3.1.0
# via
# httplib2
# pydot
python-dateutil==2.8.2
# via
# apache-beam
# google-cloud-bigquery
# pandas
pytz==2023.3
# via
# apache-beam
# pandas
regex==2023.6.3
# via apache-beam
requests==2.31.0
# via
# apache-beam
# google-api-core
# google-cloud-bigquery
# hdfs
rsa==4.9
# via
# google-auth
# oauth2client
six==1.16.0
# via
# google-apitools
# google-auth
# google-auth-httplib2
# hdfs
# oauth2client
# python-dateutil
sqlparse==0.4.4
# via google-cloud-spanner
typing-extensions==4.6.3
# via apache-beam
tzdata==2023.3
# via pandas
urllib3==1.26.16
# via
# google-auth
# requests
wheel==0.40.0
# via beam-etl (pyproject.toml)
zstandard==0.21.0
# via apache-beam