[FEATURE] Implement MongoDB dump pipeline #535

Open
wants to merge 21 commits into master from feat/dump-mongodb
Commits (21)
cfaddb7
feat: add implementation for MongoDB
gabriel-milan Oct 20, 2023
7601ff6
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 23, 2023
ac619f0
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 23, 2023
f41f01b
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 23, 2023
51e2e7c
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 24, 2023
2157cbb
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 24, 2023
3377012
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 24, 2023
2abda7b
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 26, 2023
407442c
Merge branch 'master' into feat/dump-mongodb
gabriel-milan Oct 26, 2023
dba63be
feat: move to exclusive flow
gabriel-milan Oct 26, 2023
a39d829
Merge branch 'feat/dump-mongodb' of https://github.com/prefeitura-rio…
gabriel-milan Oct 26, 2023
86d71aa
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 27, 2023
0639d18
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Oct 27, 2023
9091c7d
merge
gabriel-milan Nov 23, 2023
366827e
feat: add conn parameters
gabriel-milan Nov 23, 2023
8d9cd38
Merge branch 'feat/dump-mongodb' of https://github.com/prefeitura-rio…
gabriel-milan Nov 23, 2023
6a2e0c4
feat: use conn string instead of params
gabriel-milan Nov 24, 2023
8765385
feat: add little parallelism and logging
gabriel-milan Nov 24, 2023
c79b9b5
fix: parse datetime from string
gabriel-milan Nov 24, 2023
3f2b10d
chore: lint
gabriel-milan Nov 24, 2023
08ceac7
Merge branch 'master' into feat/dump-mongodb
mergify[bot] Nov 24, 2023
2 changes: 1 addition & 1 deletion pipelines/rj_setur/dump_url_seeketing/schedules.py
@@ -91,7 +91,7 @@
},
# "caged": {
# "dump_mode": "overwrite",
# "url": "https://docs.google.com/spreadsheets/d/1vVntiz0wmn4l1PzlXBWT3koCjECnIQFDRoA9kaYWnZo\
# "url": "https://docs.google.com/spreadsheets/d/1vVntiz0wmn4l1PzlXBWT3koCjECnIQFDRoA9kaYWnZo\ # noqa
# /edit#gid=861373755",
# "url_type": "google_sheet",
# "gsheets_sheet_name": "CAGED",
1 change: 1 addition & 0 deletions pipelines/utils/__init__.py
@@ -6,6 +6,7 @@
from pipelines.utils.dump_datario.flows import *
from pipelines.utils.dump_db.flows import *
from pipelines.utils.dump_earth_engine_asset.flows import *
from pipelines.utils.dump_mongo.flows import *
from pipelines.utils.dump_to_gcs.flows import *
from pipelines.utils.dump_url.flows import *
from pipelines.utils.execute_dbt_model.flows import *
1 change: 1 addition & 0 deletions pipelines/utils/constants.py
@@ -16,6 +16,7 @@ class constants(Enum): # pylint: disable=c0103
FLOW_DUMP_DB_NAME = "EMD: template - Ingerir tabela de banco SQL"
FLOW_DUMP_DATARIO_NAME = "EMD: template - Ingerir tabela do data.rio"
FLOW_DUMP_EARTH_ENGINE_ASSET_NAME = "EMD: template - Criar asset no Earth Engine"
FLOW_DUMP_MONGODB_NAME = "EMD: template - Ingerir tabela do MongoDB"
FLOW_DUMP_TO_GCS_NAME = "EMD: template - Ingerir tabela zipada para GCS"
FLOW_DUMP_URL_NAME = "EMD: template - Ingerir tabela de URL"
FLOW_PREDICT_NAME = "EMD: template - Faz predição com modelo do MLflow"
2 changes: 1 addition & 1 deletion pipelines/utils/dump_db/tasks.py
@@ -577,7 +577,7 @@ def dump_upload_batch(
)
log_mod(
msg=(
f"MODE OVERWRITE: Sucessfully REMOVED HEADER DATA from Storage\n:"
"MODE OVERWRITE: Sucessfully REMOVED HEADER DATA from Storage\n:"
+ f"{storage_path}\n"
+ f"{storage_path_link}"
),
Empty file.
209 changes: 209 additions & 0 deletions pipelines/utils/dump_mongo/flows.py
@@ -0,0 +1,209 @@
# -*- coding: utf-8 -*-
"""
MongoDB dumping flows
"""

from datetime import timedelta
from uuid import uuid4

from prefect import Parameter, case
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_mongo.tasks import (
database_get,
dump_batches_to_file,
)
from pipelines.utils.tasks import (
get_connection_string,
get_current_flow_labels,
greater_than,
rename_current_flow_run_dataset_table,
create_table_and_upload_to_gcs,
)
from pipelines.utils.decorators import Flow
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants

with Flow(
name=utils_constants.FLOW_DUMP_MONGODB_NAME.value,
code_owners=[
"gabriel",
],
) as utils__dump_mongo_flow:
#####################################
#
# Parameters
#
#####################################

# DBMS
db_host = Parameter("db_host")
db_port = Parameter("db_port")
db_database = Parameter("db_database")
db_collection = Parameter("db_collection")
db_connection_string_secret_path = Parameter("db_connection_string_secret_path")

# Filtering and partitioning
date_field = Parameter("date_field", required=False)
date_format = Parameter("date_format", default="%Y-%m-%d", required=False)
date_lower_bound = Parameter("date_lower_bound", required=False)

# Dumping to files
dump_batch_size = Parameter("dump_batch_size", default=50000, required=False)

# Uploading to BigQuery
bq_dataset_id = Parameter("bq_dataset_id")
bq_table_id = Parameter("bq_table_id")
bq_upload_mode = Parameter("bq_upload_mode", default="append", required=False)
bq_biglake_table = Parameter("bq_biglake_table", default=False, required=False)
bq_batch_data_type = Parameter(
"bq_batch_data_type", default="csv", required=False
) # csv or parquet

# Materialization
materialize_after_dump = Parameter(
"materialize_after_dump", default=False, required=False
)
materialize_mode = Parameter(
"materialize_mode", default="dev", required=False
) # dev or prod
materialize_to_datario = Parameter(
"materialize_to_datario", default=False, required=False
)
materialize_dbt_model_secret_parameters = Parameter(
"materialize_dbt_model_secret_parameters", default={}, required=False
)
materialize_dbt_alias = Parameter(
"materialize_dbt_alias", default=False, required=False
)

# Dumping to GCS
gcs_dump = Parameter("gcs_dump", default=False, required=False)
gcs_maximum_bytes_processed = Parameter(
"gcs_maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

#####################################
#
# Rename flow run
#
#####################################
rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=bq_dataset_id, table_id=bq_table_id
)

#####################################
#
# Tasks section #0 - Get credentials
#
#####################################

# Get credentials from Vault
connection_string = get_connection_string(
secret_path=db_connection_string_secret_path
)

#####################################
#
# Tasks section #1 - Create table
#
#####################################

# Get current flow labels
current_flow_labels = get_current_flow_labels()

# Connect to MongoDB and get the collection object
db_object = database_get(
connection_string=connection_string,
database=db_database,
collection=db_collection,
)

# Dump batches to files
batches_path, num_batches = dump_batches_to_file(
database=db_object,
batch_size=dump_batch_size,
prepath=f"data/{uuid4()}/",
date_field=date_field,
date_lower_bound=date_lower_bound,
date_format=date_format,
batch_data_type=bq_batch_data_type,
)

data_exists = greater_than(num_batches, 0)

with case(data_exists, True):
upload_table = create_table_and_upload_to_gcs(
data_path=batches_path,
dataset_id=bq_dataset_id,
table_id=bq_table_id,
dump_mode=bq_upload_mode,
biglake_table=bq_biglake_table,
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": bq_dataset_id,
"table_id": bq_table_id,
"mode": materialize_mode,
"materialize_to_datario": materialize_to_datario,
"dbt_model_secret_parameters": materialize_dbt_model_secret_parameters,
"dbt_alias": materialize_dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {bq_dataset_id}.{bq_table_id}",
)
materialization_flow.set_upstream(upload_table)
wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(gcs_dump, True):
# Trigger Dump to GCS flow run with project id as datario
dump_to_gcs_flow = create_flow_run(
flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"project_id": "datario",
"dataset_id": bq_dataset_id,
"table_id": bq_table_id,
"maximum_bytes_processed": gcs_maximum_bytes_processed,
},
labels=[
"datario",
],
run_name=f"Dump to GCS {bq_dataset_id}.{bq_table_id}",
)
dump_to_gcs_flow.set_upstream(wait_for_materialization)

wait_for_dump_to_gcs = wait_for_flow_run(
dump_to_gcs_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)


utils__dump_mongo_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
utils__dump_mongo_flow.executor = LocalDaskExecutor(num_workers=3)
utils__dump_mongo_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
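
For reference, a minimal sketch of how a downstream pipeline could trigger this new template flow. This is illustrative only and not part of the diff: the parameter names come from the Parameter declarations above, every value is a placeholder, and the dict would be passed as parameters to create_flow_run, the same way the flow itself triggers the DBT and GCS child flows.

dump_mongodb_parameters = {
    "db_host": "mongo.example.com",  # placeholder; the connection itself uses the secret below
    "db_port": "27017",
    "db_database": "my_database",
    "db_collection": "my_collection",
    "db_connection_string_secret_path": "my_mongodb_connection_string",
    "date_field": "updatedAt",         # optional incremental filter
    "date_format": "%Y-%m-%d",
    "date_lower_bound": "2023-01-01",  # string bound, interpreted with date_format
    "dump_batch_size": 50000,
    "bq_dataset_id": "my_dataset",
    "bq_table_id": "my_table",
    "bq_upload_mode": "append",        # or "overwrite"
    "bq_biglake_table": False,
    "bq_batch_data_type": "csv",       # or "parquet"
    "materialize_after_dump": False,
    "gcs_dump": False,
}
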
79 changes: 79 additions & 0 deletions pipelines/utils/dump_mongo/mongo.py
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
from datetime import datetime

from pymongo import MongoClient

from pipelines.utils.utils import log


class Mongo:
"""
MongoDB database.
"""

# pylint: disable=too-many-arguments
def __init__(
self,
connection_string: str,
database: str,
collection: str,
) -> None:
"""
Initializes the MongoDB database.

Args:
connection_string (str): MongoDB connection string.
database (str): Database name.
collection (str): Collection name.
"""
self._client = MongoClient(connection_string)
self._db = self._client[database]
self._collection = self._db[collection]
self._cursor = None

def fetch_batch(
self, batch_size: int, date_field: str = None, date_lower_bound: datetime = None
):
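        """
        Fetches up to batch_size documents from the collection. On the first call,
        opens a cursor optionally filtered by date_field > date_lower_bound; later
        calls keep consuming the same cursor and return an empty list once it is
        exhausted.
        """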
query = {}
if date_field and date_lower_bound:
query[date_field] = {"$gt": date_lower_bound}

log(
f"Fetching batch of {batch_size} documents from MongoDB with query: {query}"
)

if self._cursor is None:
self._cursor = self._collection.find(query)

documents = []
for _ in range(batch_size):
try:
documents.append(self._clean_id(self._cursor.next()))
except StopIteration:
break
return documents

def fetch_all(self, date_field: str = None, date_lower_bound: datetime = None):
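        """
        Fetches all documents from the collection, optionally filtered by
        date_field > date_lower_bound, cloning the existing cursor if one is
        already open.
        """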
query = {}
if date_field and date_lower_bound:
query[date_field] = {"$gt": date_lower_bound}

log(f"Fetching all documents from MongoDB with query: {query}")

if self._cursor is None:
self._cursor = self._collection.find(query)
else:
self._cursor = self._cursor.clone()
documents = [self._clean_id(doc) for doc in list(self._cursor)]
return documents

def get_fields(self):
sample_doc = self._collection.find_one()
if sample_doc:
return list(sample_doc.keys())
else:
return []

def _clean_id(self, document):
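        """Converts the BSON ObjectId in "_id" to a string so the document can be serialized."""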
document["_id"] = str(document["_id"])
return document
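
For context, a minimal usage sketch of the Mongo helper above, showing how a dump task might iterate over batches. This is illustrative only and not part of the diff; the connection string, database, collection and date field are placeholders.

from datetime import datetime

from pipelines.utils.dump_mongo.mongo import Mongo

mongo = Mongo(
    connection_string="mongodb://user:password@mongo.example.com:27017",  # placeholder
    database="my_database",      # placeholder
    collection="my_collection",  # placeholder
)

# Iterate over the collection in batches until the cursor is exhausted.
total_documents = 0
while True:
    batch = mongo.fetch_batch(
        batch_size=50000,
        date_field="updatedAt",                 # placeholder incremental field
        date_lower_bound=datetime(2023, 1, 1),  # only documents newer than this
    )
    if not batch:
        break
    total_documents += len(batch)
    # ... write the batch to CSV/Parquet partitions here ...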