From 02ad9fab8d9c8aafcab247c9b122a94c4521f102 Mon Sep 17 00:00:00 2001 From: Ricard Illa Date: Sat, 24 Jun 2023 19:07:02 +0200 Subject: [PATCH] refactor: some refactoring of file placements --- .gitignore | 1 + .../sustainability_score/README.md | 0 .../sustainability_score/__init__.py | 2 +- {pipeline/dags => dags}/utils.py | 0 .../docker-compose.yml => docker-compose.yml | 14 +++++- .../beam_etl => etl}/dev-requirements.txt | 6 +-- {pipeline/beam_etl => etl}/helpers/data_io.py | 47 +++++++---------- .../beam_etl => etl}/helpers/dimensions.py | 0 .../beam_etl => etl}/helpers/materials.py | 0 {pipeline/beam_etl => etl}/helpers/misc.py | 0 {pipeline/beam_etl => etl}/helpers/origin.py | 0 .../beam_etl => etl}/helpers/parse_row.py | 0 .../beam_etl => etl}/helpers/parse_xml.py | 0 {pipeline/beam_etl => etl}/helpers/weight.py | 0 {pipeline/beam_etl => etl}/main.py | 0 {pipeline/beam_etl => etl}/pyproject.toml | 0 {pipeline/beam_etl => etl}/requirements.txt | 2 +- .../tests/test_convert_units.py | 0 .../beam_etl => etl}/tests/test_dimensions.py | 0 .../beam_etl => etl}/tests/test_materials.py | 0 .../beam_etl => etl}/tests/test_origin.py | 0 .../beam_etl => etl}/tests/test_parse_xml.py | 0 .../beam_etl => etl}/tests/test_weight.py | 0 exploration/docker-compose.yml | 8 --- .../work => notebooks}/exploration.ipynb | 0 pipeline/beam_etl/Makefile | 50 ------------------- pipeline/beam_etl/justfile | 7 --- .../scripts => scripts}/airflow-entrypoint.sh | 0 {pipeline/scripts => scripts}/airflow-init.sh | 0 .../terraform-entrypoint.sh | 0 {pipeline/sql => sql}/products_schema.sql | 0 {pipeline/terraform => terraform}/main.tf | 0 .../modules/airflow/main.tf | 0 .../modules/airflow/variables.tf | 0 .../modules/postgresql/main.tf | 0 .../modules/postgresql/variables.tf | 0 .../terraform => terraform}/variables.tf | 0 37 files changed, 35 insertions(+), 102 deletions(-) rename {pipeline/dags => dags}/sustainability_score/README.md (100%) rename {pipeline/dags => dags}/sustainability_score/__init__.py (97%) rename {pipeline/dags => dags}/utils.py (100%) rename pipeline/docker-compose.yml => docker-compose.yml (87%) rename {pipeline/beam_etl => etl}/dev-requirements.txt (98%) rename {pipeline/beam_etl => etl}/helpers/data_io.py (60%) rename {pipeline/beam_etl => etl}/helpers/dimensions.py (100%) rename {pipeline/beam_etl => etl}/helpers/materials.py (100%) rename {pipeline/beam_etl => etl}/helpers/misc.py (100%) rename {pipeline/beam_etl => etl}/helpers/origin.py (100%) rename {pipeline/beam_etl => etl}/helpers/parse_row.py (100%) rename {pipeline/beam_etl => etl}/helpers/parse_xml.py (100%) rename {pipeline/beam_etl => etl}/helpers/weight.py (100%) rename {pipeline/beam_etl => etl}/main.py (100%) rename {pipeline/beam_etl => etl}/pyproject.toml (100%) rename {pipeline/beam_etl => etl}/requirements.txt (98%) rename {pipeline/beam_etl => etl}/tests/test_convert_units.py (100%) rename {pipeline/beam_etl => etl}/tests/test_dimensions.py (100%) rename {pipeline/beam_etl => etl}/tests/test_materials.py (100%) rename {pipeline/beam_etl => etl}/tests/test_origin.py (100%) rename {pipeline/beam_etl => etl}/tests/test_parse_xml.py (100%) rename {pipeline/beam_etl => etl}/tests/test_weight.py (100%) delete mode 100644 exploration/docker-compose.yml rename {exploration/work => notebooks}/exploration.ipynb (100%) delete mode 100644 pipeline/beam_etl/Makefile delete mode 100644 pipeline/beam_etl/justfile rename {pipeline/scripts => scripts}/airflow-entrypoint.sh (100%) rename {pipeline/scripts => scripts}/airflow-init.sh (100%) rename {pipeline/scripts => scripts}/terraform-entrypoint.sh (100%) rename {pipeline/sql => sql}/products_schema.sql (100%) rename {pipeline/terraform => terraform}/main.tf (100%) rename {pipeline/terraform => terraform}/modules/airflow/main.tf (100%) rename {pipeline/terraform => terraform}/modules/airflow/variables.tf (100%) rename {pipeline/terraform => terraform}/modules/postgresql/main.tf (100%) rename {pipeline/terraform => terraform}/modules/postgresql/variables.tf (100%) rename {pipeline/terraform => terraform}/variables.tf (100%) diff --git a/.gitignore b/.gitignore index 424293c..42a1d7d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ pipeline/state .envrc +.direnv venv .installed_deps diff --git a/pipeline/dags/sustainability_score/README.md b/dags/sustainability_score/README.md similarity index 100% rename from pipeline/dags/sustainability_score/README.md rename to dags/sustainability_score/README.md diff --git a/pipeline/dags/sustainability_score/__init__.py b/dags/sustainability_score/__init__.py similarity index 97% rename from pipeline/dags/sustainability_score/__init__.py rename to dags/sustainability_score/__init__.py index d69e639..0d257d6 100644 --- a/pipeline/dags/sustainability_score/__init__.py +++ b/dags/sustainability_score/__init__.py @@ -21,7 +21,7 @@ CSV_FNAME = ( ) CONFIG = { "input": f"{ HOME }/gcs/data/{ CSV_FNAME }", - "beam_etl_path": "/beam_etl/main.py", + "beam_etl_path": "/etl/main.py", "output_table": "sustainability_score.products", } diff --git a/pipeline/dags/utils.py b/dags/utils.py similarity index 100% rename from pipeline/dags/utils.py rename to dags/utils.py diff --git a/pipeline/docker-compose.yml b/docker-compose.yml similarity index 87% rename from pipeline/docker-compose.yml rename to docker-compose.yml index 9dac267..6213266 100644 --- a/pipeline/docker-compose.yml +++ b/docker-compose.yml @@ -10,8 +10,8 @@ x-airflow-common: - ./dags:/home/airflow/airflow/dags - ./scripts/airflow-init.sh:/usr/local/bin/airflow-init.sh:ro - ./scripts/airflow-entrypoint.sh:/usr/local/bin/airflow-entrypoint.sh:ro - - ../data:/home/airflow/gcs/data:ro - - ./beam_etl:/beam_etl:ro + - ./data:/home/airflow/gcs/data:ro + - ./etl:/etl:ro - ./sql:/sql:ro environment: AIRFLOW__CORE__LOAD_EXAMPLES: 'false' @@ -68,3 +68,13 @@ services: - "TF_VAR_pg_port=5432" - "TF_VAR_pg_password=postgres" - "TF_VAR_pg_username=postgres" + + notebook: + image: jupyter/scipy-notebook + ports: + - 8888:8888 + volumes: + - ./notebooks:/home/jovyan/work + - ./data:/home/jovyan/data:ro + profiles: + - notebooks diff --git a/pipeline/beam_etl/dev-requirements.txt b/etl/dev-requirements.txt similarity index 98% rename from pipeline/beam_etl/dev-requirements.txt rename to etl/dev-requirements.txt index dc6f2bc..40af729 100644 --- a/pipeline/beam_etl/dev-requirements.txt +++ b/etl/dev-requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.8 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # # pip-compile --extra=dev --output-file=dev-requirements.txt --resolver=backtracking pyproject.toml @@ -208,6 +208,8 @@ protobuf==4.23.3 # grpc-google-iam-v1 # grpcio-status # proto-plus +psycopg2==2.9.6 + # via beam-etl (pyproject.toml) pyarrow==11.0.0 # via apache-beam pyasn1==0.5.0 @@ -273,8 +275,6 @@ typing-extensions==4.6.3 # via # apache-beam # astroid - # black - # pylint tzdata==2023.3 # via pandas urllib3==1.26.16 diff --git a/pipeline/beam_etl/helpers/data_io.py b/etl/helpers/data_io.py similarity index 60% rename from pipeline/beam_etl/helpers/data_io.py rename to etl/helpers/data_io.py index 482de3d..1ce62c9 100644 --- a/pipeline/beam_etl/helpers/data_io.py +++ b/etl/helpers/data_io.py @@ -36,46 +36,33 @@ class WriteToPostgreSQL(beam.DoFn): def __init__( self, hostname, port, username, password, database, table, table_key=None ): - self.hostname = hostname - self.port = port - self.username = username - self.password = password - self.database = database + self.connection_details = { + "host": hostname, + "port": port, + "user": username, + "password": password, + "database": database, + } self.table = table self.table_key = table_key def setup(self): - self.connection = psycopg2.connect( - host=self.hostname, - port=self.port, - user=self.username, - password=self.password, - database=self.database, - ) + self.connection = psycopg2.connect(**self.connection_details, autocommit=True) - def process(self, element): - cursor = self.connection.cursor() - colnames = ",".join(element.keys()) - values = ",".join(["%s"] * len(element)) + def execute_insert(self, row, cursor): + colnames = ",".join(row.keys()) + values = ",".join(["%s"] * len(row)) sql = f""" INSERT INTO { self.table } ({ colnames }) VALUES ({ values }) """ if self.table_key is not None: - update_statement = ",".join( - f"{ col } = EXCLUDED.{ col }" - for col in element.keys() - if col != self.table_key - ) - sql = ( - sql - + f""" - ON CONFLICT ({ self.table_key }) DO UPDATE SET - { update_statement } - """ - ) - cursor.execute(sql, list(element.values())) - self.connection.commit() + sql = sql + f" ON CONFLICT ({ self.table_key }) DO NOTHING" + cursor.execute(sql, list(row.values())) + + def process(self, element): + cursor = self.connection.cursor() + self.execute_insert(element, cursor) cursor.close() def teardown(self): diff --git a/pipeline/beam_etl/helpers/dimensions.py b/etl/helpers/dimensions.py similarity index 100% rename from pipeline/beam_etl/helpers/dimensions.py rename to etl/helpers/dimensions.py diff --git a/pipeline/beam_etl/helpers/materials.py b/etl/helpers/materials.py similarity index 100% rename from pipeline/beam_etl/helpers/materials.py rename to etl/helpers/materials.py diff --git a/pipeline/beam_etl/helpers/misc.py b/etl/helpers/misc.py similarity index 100% rename from pipeline/beam_etl/helpers/misc.py rename to etl/helpers/misc.py diff --git a/pipeline/beam_etl/helpers/origin.py b/etl/helpers/origin.py similarity index 100% rename from pipeline/beam_etl/helpers/origin.py rename to etl/helpers/origin.py diff --git a/pipeline/beam_etl/helpers/parse_row.py b/etl/helpers/parse_row.py similarity index 100% rename from pipeline/beam_etl/helpers/parse_row.py rename to etl/helpers/parse_row.py diff --git a/pipeline/beam_etl/helpers/parse_xml.py b/etl/helpers/parse_xml.py similarity index 100% rename from pipeline/beam_etl/helpers/parse_xml.py rename to etl/helpers/parse_xml.py diff --git a/pipeline/beam_etl/helpers/weight.py b/etl/helpers/weight.py similarity index 100% rename from pipeline/beam_etl/helpers/weight.py rename to etl/helpers/weight.py diff --git a/pipeline/beam_etl/main.py b/etl/main.py similarity index 100% rename from pipeline/beam_etl/main.py rename to etl/main.py diff --git a/pipeline/beam_etl/pyproject.toml b/etl/pyproject.toml similarity index 100% rename from pipeline/beam_etl/pyproject.toml rename to etl/pyproject.toml diff --git a/pipeline/beam_etl/requirements.txt b/etl/requirements.txt similarity index 98% rename from pipeline/beam_etl/requirements.txt rename to etl/requirements.txt index e5a0b96..b189df7 100644 --- a/pipeline/beam_etl/requirements.txt +++ b/etl/requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.8 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # # pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml diff --git a/pipeline/beam_etl/tests/test_convert_units.py b/etl/tests/test_convert_units.py similarity index 100% rename from pipeline/beam_etl/tests/test_convert_units.py rename to etl/tests/test_convert_units.py diff --git a/pipeline/beam_etl/tests/test_dimensions.py b/etl/tests/test_dimensions.py similarity index 100% rename from pipeline/beam_etl/tests/test_dimensions.py rename to etl/tests/test_dimensions.py diff --git a/pipeline/beam_etl/tests/test_materials.py b/etl/tests/test_materials.py similarity index 100% rename from pipeline/beam_etl/tests/test_materials.py rename to etl/tests/test_materials.py diff --git a/pipeline/beam_etl/tests/test_origin.py b/etl/tests/test_origin.py similarity index 100% rename from pipeline/beam_etl/tests/test_origin.py rename to etl/tests/test_origin.py diff --git a/pipeline/beam_etl/tests/test_parse_xml.py b/etl/tests/test_parse_xml.py similarity index 100% rename from pipeline/beam_etl/tests/test_parse_xml.py rename to etl/tests/test_parse_xml.py diff --git a/pipeline/beam_etl/tests/test_weight.py b/etl/tests/test_weight.py similarity index 100% rename from pipeline/beam_etl/tests/test_weight.py rename to etl/tests/test_weight.py diff --git a/exploration/docker-compose.yml b/exploration/docker-compose.yml deleted file mode 100644 index b0973fe..0000000 --- a/exploration/docker-compose.yml +++ /dev/null @@ -1,8 +0,0 @@ -services: - notebook: - image: jupyter/scipy-notebook - ports: - - 8888:8888 - volumes: - - ./work:/home/jovyan/work - - ../data:/home/jovyan/data:ro diff --git a/exploration/work/exploration.ipynb b/notebooks/exploration.ipynb similarity index 100% rename from exploration/work/exploration.ipynb rename to notebooks/exploration.ipynb diff --git a/pipeline/beam_etl/Makefile b/pipeline/beam_etl/Makefile deleted file mode 100644 index d5370a6..0000000 --- a/pipeline/beam_etl/Makefile +++ /dev/null @@ -1,50 +0,0 @@ -# I use this Makefile to automatize setting up the working space for me -# As long as `virtualenv` and `pyenv` are installed, running `make` should set -# up the virtual envrionment with everything needed. - -.PHONY: clean - -PYTHON_VERSION = 3.8 -PYTHON_SUBVERSION = 3.8.12 - -PYENV_VERSIONS = $(HOME)/.pyenv/versions -PYTHON_BIN = $(PYENV_VERSIONS)/$(PYTHON_SUBVERSION)/bin/python$(PYTHON_VERSION) - -VENV = venv -PYTHON_VENV = $(VENV)/bin/python -PIP = $(PYTHON_VENV) -m pip -PIP_COMPILE = venv/bin/pip-compile - -all: .installed_deps - -.installed_deps: requirements.txt dev-requirements.txt $(PYTHON_VENV) - $(PIP) install \ - -r requirements.txt \ - -r dev-requirements.txt - touch $@ - -requirements.txt: pyproject.toml $(PIP_COMPILE) - $(PIP_COMPILE) \ - --resolver=backtracking \ - --output-file $@ \ - $< - -dev-requirements.txt: pyproject.toml $(PIP_COMPILE) - $(PIP_COMPILE) \ - --extra=dev \ - --resolver=backtracking \ - --output-file $@ \ - $< - -$(PIP_COMPILE): $(PYTHON_VENV) - $(PIP) install pip-tools - -$(PYTHON_VENV): $(PYTHON_BIN) - virtualenv --python=$^ $(VENV) - $(PIP) install --upgrade pip - -$(PYTHON_BIN): - pyenv install $(PYTHON_VERSION) - -clean: - rm -rf *.egg-info venv installed_deps diff --git a/pipeline/beam_etl/justfile b/pipeline/beam_etl/justfile deleted file mode 100644 index 2ef45fa..0000000 --- a/pipeline/beam_etl/justfile +++ /dev/null @@ -1,7 +0,0 @@ -input := "../../data/large_target_store_products_dataset_sample - large_target_store_products_dataset_sample.csv" - -run: - python -m main --input "{{ input }}" - -test: - python -m pytest diff --git a/pipeline/scripts/airflow-entrypoint.sh b/scripts/airflow-entrypoint.sh similarity index 100% rename from pipeline/scripts/airflow-entrypoint.sh rename to scripts/airflow-entrypoint.sh diff --git a/pipeline/scripts/airflow-init.sh b/scripts/airflow-init.sh similarity index 100% rename from pipeline/scripts/airflow-init.sh rename to scripts/airflow-init.sh diff --git a/pipeline/scripts/terraform-entrypoint.sh b/scripts/terraform-entrypoint.sh similarity index 100% rename from pipeline/scripts/terraform-entrypoint.sh rename to scripts/terraform-entrypoint.sh diff --git a/pipeline/sql/products_schema.sql b/sql/products_schema.sql similarity index 100% rename from pipeline/sql/products_schema.sql rename to sql/products_schema.sql diff --git a/pipeline/terraform/main.tf b/terraform/main.tf similarity index 100% rename from pipeline/terraform/main.tf rename to terraform/main.tf diff --git a/pipeline/terraform/modules/airflow/main.tf b/terraform/modules/airflow/main.tf similarity index 100% rename from pipeline/terraform/modules/airflow/main.tf rename to terraform/modules/airflow/main.tf diff --git a/pipeline/terraform/modules/airflow/variables.tf b/terraform/modules/airflow/variables.tf similarity index 100% rename from pipeline/terraform/modules/airflow/variables.tf rename to terraform/modules/airflow/variables.tf diff --git a/pipeline/terraform/modules/postgresql/main.tf b/terraform/modules/postgresql/main.tf similarity index 100% rename from pipeline/terraform/modules/postgresql/main.tf rename to terraform/modules/postgresql/main.tf diff --git a/pipeline/terraform/modules/postgresql/variables.tf b/terraform/modules/postgresql/variables.tf similarity index 100% rename from pipeline/terraform/modules/postgresql/variables.tf rename to terraform/modules/postgresql/variables.tf diff --git a/pipeline/terraform/variables.tf b/terraform/variables.tf similarity index 100% rename from pipeline/terraform/variables.tf rename to terraform/variables.tf