[FEATURE] Publish private data to data.rio for visualization #382

Open · wants to merge 19 commits into base: master
43 changes: 43 additions & 0 deletions .github/workflows/update_datario_template.yaml
@@ -0,0 +1,43 @@
name: Update data.rio description template

on:
  push:
    branches:
      - master
    paths:
      - ".github/workflows/update_datario_template.yaml"
      - "templates/**/*"
  pull_request:
    branches:
      - master
    paths:
      - ".github/workflows/update_datario_template.yaml"
      - "templates/**/*"

env:
  GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
  GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }}
  GH_PAT: ${{ secrets.GH_PAT }}
  GKE_CLUSTER: ${{ secrets.GKE_CLUSTER_NAME }}
  GKE_ZONE: ${{ secrets.GKE_ZONE }}

jobs:
  build-container:
    name: Update data.rio description template
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Setup Google Cloud CLI
        uses: google-github-actions/setup-gcloud@v0
        with:
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          project_id: ${{ secrets.GCP_PROJECT_ID }}
          export_default_credentials: true

      - name: Upload templates folder
        id: upload_folder
        uses: google-github-actions/upload-cloud-storage@v1
        with:
          path: templates
          destination: datario-public
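For reference, the upload step is roughly what the following Python sketch does with the plain google-cloud-storage client (the bucket name comes from the workflow; ambient default credentials are assumed):

```python
# Copies templates/ into gs://datario-public, keeping the "templates/" prefix,
# which is upload-cloud-storage's default behavior for a folder path.
from pathlib import Path

from google.cloud import storage

client = storage.Client()  # picks up the exported default credentials
bucket = client.bucket("datario-public")
for path in Path("templates").rglob("*"):
    if path.is_file():
        bucket.blob(path.as_posix()).upload_from_filename(str(path))
```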
3 changes: 2 additions & 1 deletion Dockerfile
@@ -44,7 +44,8 @@ RUN python3 -m pip install --no-cache-dir -U "pip>=21.2.4" "prefect==$PREFECT_VERSION"
 WORKDIR /app
 COPY . .
 COPY --from=curl-step /tmp/GDAL-3.4.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl /tmp/GDAL-3.4.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
-RUN python3 -m pip install --prefer-binary --no-cache-dir -U . && \
+RUN python3 -m pip install --prefer-binary --no-cache-dir "arcgis==1.9.1" && \
+    python3 -m pip install --prefer-binary --no-cache-dir -U . && \
     mkdir -p /opt/prefect/app/bases && \
     mkdir -p /root/.basedosdados/templates && \
     mkdir -p /root/.basedosdados/credentials/ && \
17 changes: 17 additions & 0 deletions pipelines/rj_escritorio/data_catalog/constants.py
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
Constants for the data catalog flow.
"""
from enum import Enum


class constants(Enum):  # pylint: disable=invalid-name
    """
    Constant values for the data catalog flow
    """

    ARCGIS_CREDENTIALS_SECRET_PATH = "arcgis_credentials"
    DONT_PUBLISH = ["datario.dados_mestres.bairro", "datario.dados_mestres.logradouro"]
    GCS_BUCKET_NAME = "datario-public"
    DESCRIPTION_HTML_TEMPLATE_PATH = "templates/datario_description.html.jinja"
    DISCORD_WEBHOOK_SECRET_PATH = "missing_metadata_webhook"
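Taken together, GCS_BUCKET_NAME and DESCRIPTION_HTML_TEMPLATE_PATH point at the template uploaded by the new workflow. A minimal rendering sketch, assuming jinja2 is available and that the template consumes the metadata keys documented in fetch_metadata further down (the exact template variables are an assumption):

```python
from google.cloud import storage
from jinja2 import Template


def render_description(metadata: dict) -> str:
    """Render a data.rio item description from one table's metadata dict."""
    # Download the Jinja template that the GitHub Actions workflow uploads.
    bucket = storage.Client().bucket("datario-public")
    source = bucket.blob("templates/datario_description.html.jinja").download_as_text()
    return Template(source).render(**metadata)
```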
14 changes: 4 additions & 10 deletions pipelines/rj_escritorio/data_catalog/flows.py
@@ -10,10 +10,10 @@
 from pipelines.constants import constants
 from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule
 from pipelines.rj_escritorio.data_catalog.tasks import (
-    generate_dataframe_from_list_of_tables,
+    fetch_metadata,
     list_tables,
     merge_list_of_list_of_tables,
-    update_gsheets_data_catalog,
+    update_datario_catalog,
 )
 from pipelines.rj_escritorio.notify_flooding.tasks import (
     parse_comma_separated_string_to_list,
@@ -30,8 +30,6 @@

     # Parameters
     project_ids = Parameter("project_ids")
-    spreadsheet_url = Parameter("spreadsheet_url")
-    sheet_name = Parameter("sheet_name")
     bq_client_mode = Parameter("bq_client_mode", default="prod")
     exclude_staging = Parameter("exclude_staging", default=True)
     exclude_test = Parameter("exclude_test", default=True)
@@ -49,12 +47,8 @@
         exclude_logs=unmapped(exclude_logs),
     )
     list_of_tables = merge_list_of_list_of_tables(list_of_list_of_tables)
-    dataframe = generate_dataframe_from_list_of_tables(list_of_tables)
-    update_gsheets_data_catalog(
-        dataframe=dataframe,
-        spreadsheet_url=spreadsheet_url,
-        sheet_name=sheet_name,
-    )
+    list_of_metadata = fetch_metadata(list_of_tables=list_of_tables)
+    update_datario_catalog(list_of_metadata=list_of_metadata)
 
 
 rj_escritorio_data_catalog_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
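With the spreadsheet parameters removed, a local run of the flow only needs the BigQuery-side parameters. A quick sketch in the Prefect 1.x style this repo uses (the parameter values are illustrative; unlisted parameters keep their defaults):

```python
from pipelines.rj_escritorio.data_catalog.flows import rj_escritorio_data_catalog_flow

state = rj_escritorio_data_catalog_flow.run(
    parameters={
        "project_ids": "datario",  # comma-separated string of project IDs
        "bq_client_mode": "prod",
        "exclude_staging": True,
        "exclude_test": True,
    },
    run_on_schedule=False,  # ignore the attached schedule for a one-off run
)
assert state.is_successful()
```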
145 changes: 93 additions & 52 deletions pipelines/rj_escritorio/data_catalog/tasks.py
@@ -3,16 +3,22 @@
"""
Tasks for generating a data catalog from BigQuery.
"""
from typing import List

from arcgis.gis import Item
from google.cloud import bigquery
import gspread
import pandas as pd
from prefect import task

from pipelines.rj_escritorio.data_catalog.constants import constants
from pipelines.rj_escritorio.data_catalog.utils import (
build_items_data_from_metadata_json,
create_or_update_item,
fetch_api_metadata,
get_bigquery_client,
write_data_to_gsheets,
get_directory,
get_all_items,
)
from pipelines.utils.utils import get_credentials_from_env, log
from pipelines.utils.utils import log


@task
@@ -99,65 +105,100 @@ def merge_list_of_list_of_tables(list_of_list_of_tables: list) -> list:


 @task
-def generate_dataframe_from_list_of_tables(list_of_tables: list) -> pd.DataFrame:
+def fetch_metadata(list_of_tables: list) -> list:
     """
-    Generate a Pandas DataFrame from a list of tables.
+    For each table in the list, fetches metadata from the metadata API.
 
     Args:
         list_of_tables: List of tables.
 
     Returns:
-        Pandas DataFrame.
+        List of tables with metadata. Each table is a dictionary in the format:
+        {
+            "project_id": "project_id",
+            "dataset_id": "dataset_id",
+            "table_id": "table_id",
+            "url": "https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
+            "private": True/False,
+            "metadata": {
+                "title": "Title",
+                "short_description": "Short description",
+                "long_description": "Long description",
+                "update_frequency": "Update frequency",
+                "temporal_coverage": "Temporal coverage",
+                "data_owner": "Data owner",
+                "publisher_name": "Publisher name",
+                "publisher_email": "Publisher email",
+                "tags": ["Tag1", "Tag2"],
+                "categories": ["Category1", "Category2"],
+                "columns": [
+                    {
+                        "name": "column_name",
+                        "description": "Column description",
+                    }
+                ]
+            }
+        }
     """
-    dataframe = pd.DataFrame(list_of_tables)
-    log(f"Generated DataFrame with shape {dataframe.shape}.")
-    return dataframe
+    log(f"Fetching metadata for {len(list_of_tables)} tables.")
+    remove_tables = []
+    for table in list_of_tables:
+        project_id = table["project_id"]
+        dataset_id = table["dataset_id"]
+        table_id = table["table_id"]
+        try:
+            table["metadata"] = fetch_api_metadata(
+                project_id=project_id, dataset_id=dataset_id, table_id=table_id
+            )
+        except:  # pylint: disable=bare-except
+            log(
+                f"Error fetching metadata for {project_id}.{dataset_id}.{table_id}. Will exclude this table from the catalog."  # pylint: disable=line-too-long
+            )
+            remove_tables.append(table)
+    for table in remove_tables:
+        list_of_tables.remove(table)
+    log(f"Fetched metadata for {len(list_of_tables)} tables.")
+    return list_of_tables
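fetch_api_metadata lives in data_catalog/utils.py, which this diff does not show. A plausible shape for it, with a placeholder base URL (the real endpoint and response schema are not visible in this PR):

```python
import requests

# Placeholder host; the real metadata API address is defined in utils.py.
METADATA_API_BASE_URL = "https://example.com/api"


def fetch_api_metadata(project_id: str, dataset_id: str, table_id: str) -> dict:
    """Fetch one table's metadata; failures bubble up to the bare except above."""
    url = f"{METADATA_API_BASE_URL}/{project_id}/{dataset_id}/{table_id}/"
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()
```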


 @task
-def update_gsheets_data_catalog(
-    dataframe: pd.DataFrame, spreadsheet_url: str, sheet_name: str
-) -> None:
+def update_datario_catalog(list_of_metadata: list):  # pylint: disable=too-many-locals
     """
-    Update a Google Sheets spreadsheet with a DataFrame.
+    Update the data.rio catalog with our tables.
 
     Args:
-        dataframe: Pandas DataFrame.
-        spreadsheet_url: Google Sheets spreadsheet URL.
-        sheet_name: Google Sheets sheet name.
+        list_of_metadata: List of tables with metadata.
     """
-    # Get gspread client
-    credentials = get_credentials_from_env(
-        scopes=[
-            "https://www.googleapis.com/auth/spreadsheets",
-            "https://www.googleapis.com/auth/drive",
-        ]
-    )
-    gspread_client = gspread.authorize(credentials)
-    # Open spreadsheet
-    log(f"Opening Google Sheets spreadsheet {spreadsheet_url} with sheet {sheet_name}.")
-    sheet = gspread_client.open_by_url(spreadsheet_url)
-    worksheet = sheet.worksheet(sheet_name)
-    # Update spreadsheet
-    log("Deleting old data.")
-    worksheet.clear()
-    log("Rewriting headers.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=[dataframe.columns.tolist()],
-    )
-    log("Updating new data.")
-    write_data_to_gsheets(
-        worksheet=worksheet,
-        data=dataframe.values.tolist(),
-        start_cell="A2",
-    )
-    # Add filters
-    log("Adding filters.")
-    first_col = "A"
-    last_col = chr(ord(first_col) + len(dataframe.columns) - 1)
-    worksheet.set_basic_filter(f"{first_col}:{last_col}")
-    # Resize columns
-    log("Resizing columns.")
-    worksheet.columns_auto_resize(0, len(dataframe.columns) - 1)
-    log("Done.")
+    log(f"Updating data.rio catalog with {len(list_of_metadata)} tables.")
+    updated_items = []
+    duplicates_list = constants.DONT_PUBLISH.value
+    (
+        items_data,
+        categories,
+        project_ids,
+        dataset_ids,
+        table_ids,
+    ) = build_items_data_from_metadata_json(metadata=list_of_metadata)
+    for item_data, item_categories, project_id, dataset_id, table_id in zip(
+        items_data, categories, project_ids, dataset_ids, table_ids
+    ):
+        log(f"Updating table `{project_id}.{dataset_id}.{table_id}`")
+        item: Item = create_or_update_item(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            data=item_data,
+        )
+        log(f"Created/updated item: ID={item.id}, Title={item.title}")
+        item.share(everyone=True, org=True, groups=item_categories)
+        log(f"Shared item: ID={item.id} with groups: {item_categories}")
+        move_dir = get_directory(project_id, dataset_id, table_id, duplicates_list)
+        item.move(move_dir)
+        log(f"Moved item: ID={item.id} to {move_dir}/ directory")
+        updated_items.append(item.id)
+    all_items: List[Item] = get_all_items()
+    not_updated_items = [item for item in all_items if item.id not in updated_items]
+    log(f"Deleting {len(not_updated_items)} items.")
+    for item in not_updated_items:
+        item.delete()
+        log(f"Deleted item: ID={item.id}")