diff --git a/pipelines/rj_sms/__init__.py b/pipelines/rj_sms/__init__.py index ca74ca40a..636724622 100644 --- a/pipelines/rj_sms/__init__.py +++ b/pipelines/rj_sms/__init__.py @@ -9,3 +9,4 @@ from pipelines.rj_sms.dump_api_prontuario_vitai.flows import * from pipelines.rj_sms.dump_api_prontuario_vitacare.flows import * from pipelines.rj_sms.dump_sheets.flows import * +from pipelines.rj_sms.dump_api_regulacao_sisreg.flows import * diff --git a/pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py b/pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py index 4b2476b0b..1188702ba 100644 --- a/pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py +++ b/pipelines/rj_sms/dump_api_prontuario_vitacare/constants.py @@ -29,3 +29,244 @@ class constants(Enum): "posicao": "/reports/pharmacy/stocks", "movimento": "/reports/pharmacy/movements", } + URL_PACIENTES_AGENDADOS = ( + "http://saudedigital.pepvitacare.com:8081/health/schedule/nextappointments" + ) + URL_PACIENTES_ATENDIDOS = ( + "http://saudedigital.pepvitacare.com:8081/health/schedule/lastattendances" + ) + CNES = [ + "9057722", + "5476607", + "7892802", + "0199338", + "5620287", + "7896204", + "3567567", + "5179726", + "9128867", + "9101764", + "3567540", + "6869009", + "6713564", + "6808077", + "3416321", + "7036914", + "3820599", + "9071385", + "6804209", + "6023320", + "7892810", + "6820018", + "6914152", + "9111344", + "6496989", + "7052049", + "7119798", + "0193089", + "9078983", + "6677711", + "7108265", + "5154197", + "6660185", + "9131884", + "9345515", + "6023975", + "9715444", + "6762042", + "6671020", + "7985657", + "6742130", + "5044685", + "0189200", + "6793231", + "9131795", + "2280310", + "6028233", + "9057706", + "7723296", + "7722494", + "6664075", + "2277298", + "6023916", + "7996675", + "6648371", + "6664040", + "6618863", + "7892829", + "6681379", + "6559727", + "7894554", + "7998678", + "6688152", + "6618871", + "9442251", + "2295237", + "6932916", + "6581994", + "6635709", + 
"7873565", + "9127100", + "6571956", + "6852203", + "9075143", + "7908237", + "6559735", + "6572014", + "0214949", + "7986505", + "6974708", + "7995520", + "7088574", + "6029965", + "6761704", + "6503772", + "5546591", + "9307265", + "6568491", + "6864708", + "6716598", + "9061401", + "9061398", + "9079939", + "9080163", + "9072640", + "7523246", + "9016805", + "6901042", + "9067078", + "9045023", + "6387152", + "6927289", + "6855709", + "6927319", + "0265233", + "7021771", + "3785025", + "6506232", + "6524486", + "9023089", + "6029841", + "9311661", + "3416372", + "6683851", + "7810172", + "6272053", + "4030990", + "6618855", + "6919626", + "6873960", + "7036884", + "5417708", + "3567559", + "3785009", + "9107835", + "6514022", + "2295253", + "9072659", + "7874162", + "3784975", + "2270323", + "6026737", + "6029922", + "2269937", + "2269848", + "5879655", + "2273551", + "2270277", + "2296551", + "6033121", + "4178602", + "2273225", + "2270463", + "2273179", + "3416356", + "6029825", + "2269309", + "2778696", + "2280744", + "2269732", + "5358612", + "5546583", + "2269929", + "2270013", + "2708167", + "2273578", + "6632831", + "2269295", + "2273616", + "2708183", + "2269651", + "2270072", + "2270579", + "2269562", + "2269546", + "2269538", + "2269503", + "2273586", + "2708426", + "2270250", + "5315026", + "2269759", + "2273543", + "2269511", + "4046307", + "2708213", + "2269376", + "7414226", + "2270439", + "5457009", + "6784720", + "2280795", + "3784959", + "2296543", + "2269902", + "2291274", + "9391983", + "2273640", + "2270315", + "2270366", + "6922031", + "2708434", + "2288346", + "2280760", + "2280272", + "2295032", + "2708205", + "2270633", + "2270560", + "2269805", + "2270307", + "2296535", + "2296586", + "2280779", + "7856954", + "6927254", + "2280280", + "2280787", + "5465877", + "2277328", + "2270455", + "5467136", + "2270641", + "2288370", + "5315050", + "3567508", + "2270293", + "2280736", + "7990286", + "2280205", + "6926797", + "2269953", + "5465885", 
with Flow(
    "SMS: Dump VitaCare - Captura dos pacientes agendados"
) as dump_vitacare_scheduled_patients:
    # Tasks: fetch -> save -> upload. Passing `result` into save_patients
    # already creates the dependency edge, so the original explicit
    # save.set_upstream(result) was redundant and has been dropped.
    result = get_patients(context="scheduled")
    save = save_patients(result, context="scheduled")
    upload_to_datalake_task = upload_to_datalake(
        # plain string: the original used an f-string with no placeholders
        input_path="pipelines/rj_sms/dump_api_prontuario_vitacare/data_partition",
        dataset_id="brutos_prontuario_vitacare",
        table_id="pacientes_agendados_3_dias",
        if_exists="replace",
        csv_delimiter=";",
        if_storage_data_exists="replace",
        biglake_table=True,
    )
    # Upload must only run after the CSV/partitions exist on disk.
    upload_to_datalake_task.set_upstream(save)

dump_vitacare_scheduled_patients.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare_scheduled_patients.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[
        constants.RJ_SMS_DEV_AGENT_LABEL.value,
    ],
)
dump_vitacare_scheduled_patients.schedule = vitacare_every_day_at_six_am


with Flow(
    "SMS: Dump VitaCare - Captura dos pacientes atendidos"
) as dump_vitacare_attended_patients:
    # Same pipeline shape as the scheduled flow, one day back instead of
    # three days ahead (the date window lives inside the tasks).
    result = get_patients(context="attended")
    save = save_patients(result, context="attended")
    upload_to_datalake_task = upload_to_datalake(
        input_path="pipelines/rj_sms/dump_api_prontuario_vitacare/data_partition",
        dataset_id="brutos_prontuario_vitacare",
        table_id="paciente_atendido_dia_anterior",
        if_exists="replace",
        csv_delimiter=";",
        if_storage_data_exists="replace",
        biglake_table=True,
    )
    upload_to_datalake_task.set_upstream(save)

dump_vitacare_attended_patients.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_vitacare_attended_patients.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[
        constants.RJ_SMS_DEV_AGENT_LABEL.value,
    ],
)
dump_vitacare_attended_patients.schedule = vitacare_every_day_at_seven_am
@task
def get_patients(context):
    """
    Fetch patients from the VitaCare cloud function for every CNES unit.

    Args:
        context (str): "scheduled" fetches appointments 3 days ahead;
            any other value fetches attendances from the previous day.

    Returns:
        pd.DataFrame: concatenation of every unit's payload (possibly empty).
    """
    log("Getting data from cloud function")
    list_cnes = vitacare_constants.CNES.value
    if context == "scheduled":
        url = vitacare_constants.URL_PACIENTES_AGENDADOS.value
        target_date = datetime.today() + timedelta(days=3)
    else:
        url = vitacare_constants.URL_PACIENTES_ATENDIDOS.value
        target_date = datetime.today() - timedelta(days=1)
    formatted_date = target_date.strftime("%Y-%m-%d")

    df = pd.DataFrame()
    list_cnes_error = []
    list_cnes_empty = []
    for cnes in list_cnes:
        # json.dumps instead of hand-built string concatenation: values are
        # escaped correctly and the payload is always valid JSON.
        params = json.dumps({"cnes": cnes, "date": formatted_date})
        response = cloud_function_request_patients.run(
            url=url, request_type="POST", body_params=params, env="staging"
        )
        if response.text.startswith("A solicitação não foi bem-sucedida"):
            list_cnes_error.append(cnes)
            continue
        try:
            df_temp = pd.read_json(response.text)
        except ValueError:
            # BUGFIX: the original used a bare except and then fell through
            # to `df_temp.empty`, dereferencing a stale (or unbound) df_temp.
            log(f"Error cnes - {cnes}, Detail: {response.text}", level="error")
            list_cnes_error.append(cnes)
            continue
        if df_temp.empty:
            list_cnes_empty.append(cnes)
        else:
            df = pd.concat([df, df_temp], ignore_index=True)

    log(f"List cnes error {list_cnes_error}", level="error")
    log(f"List cnes empty erro {list_cnes_empty}", level="error")
    return df


@task
def save_patients(dataframe, context):
    """
    Persist the patients DataFrame as a dated CSV and rebuild the partition
    directory consumed by the datalake upload task.

    Args:
        dataframe (pd.DataFrame): data returned by get_patients.
        context (str): same convention as get_patients; controls the
            date used in the file name.

    Returns:
        bool: True on success, False on any failure.
    """
    log("Saving data into the server")
    path = "pipelines/rj_sms/dump_api_prontuario_vitacare/data"
    try:
        # Start from an empty directory on every run.
        if os.path.exists(path):
            shutil.rmtree(path, ignore_errors=True)
        os.mkdir(path)

        if context == "scheduled":
            target_date = datetime.today() + timedelta(days=3)
        else:
            target_date = datetime.today() - timedelta(days=1)
        formatted_date = target_date.strftime("%Y-%m-%d")

        filename = f"{path}/{formatted_date}.csv"
        dataframe.to_csv(
            filename,
            sep=";",
            quoting=csv.QUOTE_NONNUMERIC,
            quotechar='"',
            index=False,
            encoding="utf-8",
        )

        partition_directory = (
            "pipelines/rj_sms/dump_api_prontuario_vitacare/data_partition"
        )
        shutil.rmtree(partition_directory, ignore_errors=True)
        create_partitions.run(path, partition_directory)
        return True
    except Exception as e:
        # Narrowed from a bare `except:` and now logs the actual cause.
        log(f"Error when trying to save files: {e}", level="error")
        return False


@task
def read_data(cnes=None, date_param=None, table=None):
    """
    Read one unit/date slice from a BigQuery table.

    Args:
        date_param (str, mandatory): date to query, "YYYY-MM-DD".
        cnes (str, mandatory): health unit identifier.
        table (str, mandatory): fully qualified BigQuery table name. Cannot
            be parameterized, so it must come from trusted code only.

    Returns:
        pd.DataFrame: query result.
    """
    client = bigquery.Client()

    # BUGFIX + hardening: cnes/date are now bound as query parameters instead
    # of being interpolated into the SQL string.
    query = f"""
        SELECT *
        FROM `{table}`
        WHERE cnes = @cnes AND date = @date_param
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("cnes", "STRING", cnes),
            bigquery.ScalarQueryParameter("date_param", "DATE", date_param),
        ]
    )
    results = client.query(query, job_config=job_config).result()

    # RowIterator.schema is a list of SchemaField objects; the original
    # `results.schema.field_names` attribute does not exist.
    return pd.DataFrame(
        data=[list(row.values()) for row in results],
        columns=[field.name for field in results.schema],
    )


@task
def remove_opt_out(data=None):
    """
    Remove patients present in the WhatsApp opt-out table.

    Args:
        data (pd.DataFrame, mandatory): scheduled patients with a "cpf" column.

    Returns:
        pd.DataFrame: rows whose CPF is not in the opt-out table.
    """
    client = bigquery.Client()

    query = """
        SELECT DISTINCT cpf
        FROM `rj-sms-dev.whatsapp_staging.opt_out`
    """

    # Collect every opted-out CPF once, then filter in pandas.
    query_job = client.query(query)
    cpf_opt_out = [row["cpf"] for row in query_job.result()]

    return data[~data["cpf"].isin(cpf_opt_out)]


@task
def clean_data(data=None):
    """
    Drop rows with invalid phone numbers and patients under 18.

    Args:
        data (pd.DataFrame, mandatory): scheduled patients with "telefone"
            and "data_nascimento" columns.

    Returns:
        pd.DataFrame: filtered data (BUGFIX: the original returned None).
    """
    # Keep only 11-digit phone numbers (DDD + 9-digit mobile).
    data = data[data["telefone"].astype(str).str.len() == 11]

    data["data_nascimento"] = pd.to_datetime(data["data_nascimento"], errors="coerce")

    # Age in whole years. The original cast `.astype("<m8[Y]")` was garbled
    # in the diff and is rejected by modern pandas; integer-day division is
    # equivalent for an adult/minor cutoff.
    data["idade"] = (pd.Timestamp.now() - data["data_nascimento"]).dt.days // 365

    # Keep adults only, then drop the temporary column.
    data = data[data["idade"] >= 18]
    return data.drop(columns=["idade"])


@task
def find_team_number():
    """
    Look up the family-health team number for an address via
    https://subpav.org/SAP/includes/.

    Not implemented yet; returns None.
    """
    return None


@task
def send_whatsapp(data=None, case=None):
    """
    Send messages through the Wetalkie/Blip WhatsApp broadcast API.

    Args:
        data (iterable, mandatory): patient dicts with the keys used below.
        case (str, mandatory): message template to use. One of:
            "clinica_familia_scheduled_patients",
            "clinica_familia_patients_treated",
            "sisreg_scheduled_patients".

    Returns:
        str: confirmation message.
    """
    for patient in data:
        if case == "clinica_familia_scheduled_patients":
            payload = json.dumps(
                [
                    {
                        "phone": patient["phone"],
                        "nome": patient["name"],
                        "procedimento": patient["procedimento"],
                        "data": patient["data"],
                        "horario": patient["horario"],
                        "unidade": patient["unidade"],
                        "endereco": patient["endereco"],
                        "urlcontato": patient["telefone_unidade"],
                    }
                ]
            )
            url = "https://takebroadcast.cs.blip.ai/api/v2/Broadcast/list?phoneColumn=phone&namespace=whatsapp%3Ahsm%3Amessaging%3Ablip&template=poc_sms_wa_72h_antes&flowId=4b96ec20-f0d1-48f9-8138-cd7d133e39ee&stateId=e738eeff-b394-4c29-8def-cef05a44ec40&scheduleTime=30&separator=%2C&checkAttendance=false"
        elif case == "clinica_familia_patients_treated":
            payload = json.dumps(
                [
                    {
                        "phone": patient["phone"],
                        "nome": patient["name"],
                        "data": patient["data"],
                        "unidade": patient["unidade"],
                    }
                ]
            )
            url = "https://takebroadcast.cs.blip.ai/api/v2/Broadcast/list?phoneColumn=phone&namespace=whatsapp%3Ahsm%3Amessaging%3Ablip&template=poc_sms_wa_24h_depois&flowId=4b96ec20-f0d1-48f9-8138-cd7d133e39ee&stateId=b27e0851-0f10-468c-be8b-186f00578058&scheduleTime=60&separator=%2C&checkAttendance=false"
        elif case == "sisreg_scheduled_patients":
            payload = json.dumps(
                [
                    {
                        "phone": patient["phone"],
                        "nome": patient["name"],
                        "procedimento": patient["procedimento"],
                        "especialidade": patient["especialidade"],
                        "preparo": patient["preparo"],
                        "data": patient["data"],
                        "horario": patient["horario"],
                        "unidade": patient["unidade"],
                        "endereco": patient["endereco"],
                        "urlcontato": patient["telefone_unidade"],
                    }
                ]
            )
            url = "https://takebroadcast.cs.blip.ai/api/v2/Broadcast/list?phoneColumn=phone&namespace=whatsapp%3Ahsm%3Amessaging%3Ablip&template=poc_sms_wa_5d_antes&flowId=4b96ec20-f0d1-48f9-8138-cd7d133e39ee&stateId=62af61cc-37b7-4cf9-ae81-ff7297399146&scheduleTime=60&separator=%2C&checkAttendance=false"
        else:
            # BUGFIX: the original dereferenced unbound `payload`/`url` when
            # case matched none of the branches.
            log(f"Unknown case: {case}", level="error")
            continue

        headers = {
            "accept": "text/plain",
            "identifier": "@user",
            "accessKey": "@password",
            "Content-Type": "application/json-patch+json",
        }
        response = requests.request("POST", url, headers=headers, data=payload)
        save_log(response.text)
        # was print(); use the pipeline logger instead
        log("Whatsapp Enviado:" + response.text)

    return "Mensagens do whatsapp enviadas"


def save_log(response_text=None):
    """
    Placeholder for persisting the broadcast API response.

    BUGFIX: the original took no arguments but was called with
    save_log(response.text), which raised TypeError on every send.
    """
    return None
@task
def get_patients(cnes):
    """
    Fetch patients scheduled 5 days ahead for one unit from the SISREG API.

    Args:
        cnes (str): health unit identifier.

    Returns:
        pd.DataFrame: scheduled patients; empty frame on any failure
            (BUGFIX: the original implicitly returned None when the
            authentication call did not return 200).
    """
    auth_url = "https://rest.smsrio.org/api/usuario/autenticar"

    # Retrieve the API credential from Vault.
    try:
        credential = get_vault_secret(secret_path="regulacao_sisreg")["data"]
    except Exception as e:
        # BUGFIX: the original only logged and then crashed on the unbound
        # `credential` variable; fail fast with an empty result instead.
        log(f"Not able to retrieve Vault secret {e}", level="error")
        return pd.DataFrame()

    headers = {
        "Content-Type": "application/json",
        # NOTE(review): hard-coded session cookie kept from the original —
        # confirm whether the auth endpoint actually requires it.
        "Cookie": "PHPSESSID=b40302ab232addf99960f1d4ffa7073b",
    }
    response = requests.request(
        "POST", auth_url, headers=headers, data=json.dumps(credential)
    )
    if response.status_code != 200:
        log(
            f"Falha na solicitação, código de status: {response.status_code}",
            level="error",
        )
        return pd.DataFrame()

    # Extract the bearer token and build the scheduled-patients URL.
    token = json.loads(response.text)["dados"]
    target_date = (datetime.today() + timedelta(days=5)).strftime("%Y-%m-%d")
    url = (
        "https://web2.smsrio.org/ambulatorio/api/pacientesAgendados/"
        f"{cnes}/{target_date}/"
    )
    headers = {"Authorization": "Bearer " + token}

    num_retries = 5
    for _ in range(num_retries):
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            log("Solicitação bem-sucedida!")
            df = pd.read_json(response.text)
            if df.empty:
                # Same as the original: an empty 200 response is final.
                log("DataFrame is empty!")
                break
            return df
        log(f"Falha na solicitação, código de status: {response.status_code}")
        # Wait 10 seconds before retrying (the original comment claimed
        # 1 minute but the code slept 10 s; the code wins).
        time.sleep(10)

    return pd.DataFrame()


@task
def save_patients(dataframe):
    """
    Write the SISREG patients DataFrame to a dated CSV and rebuild the
    partition directory consumed by the datalake upload task.

    Args:
        dataframe (pd.DataFrame): data returned by get_patients.

    Returns:
        bool: True on success.
    """
    path = "pipelines/rj_sms/dump_api_regulacao_sisreg/data"
    # Start from an empty directory on every run.
    if os.path.exists(path):
        shutil.rmtree(path, ignore_errors=True)
    os.mkdir(path)

    target_date = datetime.today() + timedelta(days=5)
    filename = f"{path}/{target_date.strftime('%Y-%m-%d')}.csv"
    dataframe.to_csv(
        filename,
        sep=";",
        quoting=csv.QUOTE_NONNUMERIC,
        quotechar='"',
        index=False,
        encoding="utf-8",
    )

    partition_directory = "pipelines/rj_sms/dump_api_regulacao_sisreg/data_partition"
    shutil.rmtree(partition_directory, ignore_errors=True)
    create_partitions.run(path, partition_directory)
    return True
@task
def cloud_function_request_patients(
    url: str,
    request_type: str = "GET",
    body_params: str = None,
    query_params: list = None,
    env: str = "staging",
):
    """
    Proxy an HTTP request through the VitaCare cloud function.

    Args:
        url (str): endpoint URL the cloud function should call.
        request_type (str): request type, "GET" or "POST" (default "GET").
        body_params (str): JSON string forwarded as the request body
            (the original annotation said list, but callers pass a
            json.dumps string).
        query_params (list): parameters forwarded as query parameters.
        env (str): cloud function account, "prod" or "staging".

    Returns:
        requests.Response: the cloud function's HTTP response.

    Raises:
        Exception: re-raised when the Vault credential cannot be retrieved
            (BUGFIX: the original only logged and then crashed on the
            unbound `credential` variable).
    """
    try:
        credential = get_vault_secret(secret_path="prontuario_vitacare")["data"]
    except Exception as e:
        log(f"Not able to retrieve Vault secret {e}", level="error")
        raise

    if env == "prod":
        function = "https://us-central1-rj-sms.cloudfunctions.net/vitacare"
    else:
        function = "https://us-central1-rj-sms-dev.cloudfunctions.net/vitacare"

    # Mint an OIDC identity token whose audience is the cloud function URL.
    auth_request = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_request, function)

    payload = json.dumps(
        {
            "url": url,
            "request_type": request_type,
            "body_params": body_params,
            "query_params": query_params,
            "credential": credential,
        }
    )
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    return requests.request("POST", function, headers=headers, data=payload)