From 6d8c33ead77fdb917b62de057946aa1dc4a24cf1 Mon Sep 17 00:00:00 2001
From: Ryan Lam <53351930+ryan-lam@users.noreply.github.com>
Date: Thu, 26 Oct 2023 19:42:00 -0400
Subject: [PATCH] create update weather microservice

---
Note: the abbreviated blob ids on the "index" lines of main.py and
update_weather.py were not regenerated after the review fixes in this patch.

 update_weather_service/.env.sample       |   7 ++
 update_weather_service/main.py           |  82 +++++++++++++
 update_weather_service/requirements.txt  |  24 ++++
 update_weather_service/update_weather.py | 124 ++++++++++++++++++++
 4 files changed, 237 insertions(+)
 create mode 100644 update_weather_service/.env.sample
 create mode 100644 update_weather_service/main.py
 create mode 100644 update_weather_service/requirements.txt
 create mode 100644 update_weather_service/update_weather.py

diff --git a/update_weather_service/.env.sample b/update_weather_service/.env.sample
new file mode 100644
index 0000000..c0ef2e2
--- /dev/null
+++ b/update_weather_service/.env.sample
@@ -0,0 +1,7 @@
+DATABASE_USER=
+DATABASE_PASSWORD=
+DATABASE_HOST=
+DATABASE_NAME=
+AUTH_KEY=
+FLASK_APP_PORT=7001
+OPENWEATHERMAP_API_KEY=
diff --git a/update_weather_service/main.py b/update_weather_service/main.py
new file mode 100644
index 0000000..7730038
--- /dev/null
+++ b/update_weather_service/main.py
@@ -0,0 +1,82 @@
+import hmac
+import json
+import os
+
+from dotenv import load_dotenv
+from flask import Flask, request
+
+# Aliased so the "/update-weather" view below, which is also named
+# update_weather, does not shadow the worker function it has to call.
+from update_weather import update_weather as refresh_weather
+
+load_dotenv()
+DATABASE_USER = os.environ.get("DATABASE_USER")
+DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD")
+DATABASE_HOST = os.environ.get("DATABASE_HOST")
+DATABASE_NAME = os.environ.get("DATABASE_NAME")
+
+AUTH_KEY = os.environ.get("AUTH_KEY")
+FLASK_APP_PORT = os.environ.get("FLASK_APP_PORT")
+OPENWEATHERMAP_API_KEY = os.environ.get("OPENWEATHERMAP_API_KEY")
+
+
+app = Flask(__name__)
+
+
+def authorized(auth_key):
+    """Return True only when auth_key matches the configured AUTH_KEY.
+
+    A missing or empty AUTH_KEY rejects every request; otherwise a request
+    that omits "auth_key" would compare None == None and be let through.
+    """
+    if not AUTH_KEY or not isinstance(auth_key, str):
+        return False
+    # Constant-time comparison; bytes are used because compare_digest only
+    # accepts str arguments when they are pure ASCII.
+    return hmac.compare_digest(auth_key.encode("utf-8"), AUTH_KEY.encode("utf-8"))
+
+
+@app.route("/", methods=["GET"])
+def index():
+    """Identify the service."""
+    return "update weather service"
+
+
+@app.route("/update-weather", methods=["POST"])
+def update_weather():
+    """Refresh weather rows starting at the 1-based "id" in the JSON body.
+
+    The body must be a JSON object holding "auth_key" and "id". Responds 400
+    for a malformed body or id, 401 for a bad key, and otherwise relays the
+    (message, status) pair produced by the worker.
+    """
+    try:
+        body = json.loads(request.data)
+    except ValueError:
+        return "Invalid request body", 400
+    if not isinstance(body, dict):
+        return "Invalid request body", 400
+
+    if not authorized(body.get("auth_key")):
+        return "Not authorized", 401
+
+    try:
+        row_id = int(body.get("id"))
+    except (TypeError, ValueError, OverflowError):
+        return "Invalid request body", 400
+    if row_id < 1:
+        return "Invalid request body", 400
+
+    res, status = refresh_weather(
+        DATABASE_USER,
+        DATABASE_PASSWORD,
+        DATABASE_HOST,
+        DATABASE_NAME,
+        OPENWEATHERMAP_API_KEY,
+        weather_row_id=row_id,
+    )
+    return res, status
+
+
+if __name__ == "__main__":
+    app.run(debug=False, host="0.0.0.0", port=FLASK_APP_PORT)
diff --git a/update_weather_service/requirements.txt b/update_weather_service/requirements.txt
new file mode 100644
index 0000000..55c6462
--- /dev/null
+++ b/update_weather_service/requirements.txt
@@ -0,0 +1,24 @@
+blinker==1.6.3
+certifi==2023.7.22
+charset-normalizer==3.3.1
+click==8.1.7
+colorama==0.4.6
+Flask==3.0.0
+greenlet==3.0.1
+idna==3.4
+itsdangerous==2.1.2
+Jinja2==3.1.2
+MarkupSafe==2.1.3
+numpy==1.26.1
+pandas==2.1.2
+psycopg2==2.9.9
+python-dateutil==2.8.2
+python-dotenv==1.0.0
+pytz==2023.3.post1
+requests==2.31.0
+six==1.16.0
+SQLAlchemy==2.0.22
+typing_extensions==4.8.0
+tzdata==2023.3
+urllib3==2.0.7
+Werkzeug==3.0.1
diff --git a/update_weather_service/update_weather.py b/update_weather_service/update_weather.py
new file mode 100644
index 0000000..76e84d2
--- /dev/null
+++ b/update_weather_service/update_weather.py
@@ -0,0 +1,124 @@
+import pandas as pd
+from sqlalchemy import create_engine, inspect
+from datetime import datetime
+import requests
+
+API_BASE_URL = "https://api.openweathermap.org/data/2.5/weather"
+REQUEST_TIMEOUT_SECONDS = 10
+
+
+def get_weather(lat, long, API_KEY):  # Copied from etl_weather
+    """Fetch current weather for one coordinate and return it as a flat dict.
+
+    Raises requests.HTTPError when the API answers with anything but 200.
+    """
+    response = requests.get(
+        API_BASE_URL,
+        params={"lat": lat, "lon": long, "units": "metric", "appid": API_KEY},
+        timeout=REQUEST_TIMEOUT_SECONDS,
+    )
+    if response.status_code != 200:
+        # Print the raw text: an error body is not guaranteed to be JSON.
+        print(response.text)
+        response.raise_for_status()
+        # raise_for_status() only raises for 4xx/5xx; reject the rest here.
+        raise requests.HTTPError(
+            f"Unexpected status code {response.status_code}", response=response
+        )
+
+    weather_data = response.json()
+    # Use .get(): indexing "1h" directly raised KeyError before the "3h"
+    # fallback could run whenever "1h" was absent from the "rain" object.
+    rain = weather_data.get("rain") or {}
+    precipitation = rain.get("1h") or rain.get("3h") or 0
+    weather_dict = {
+        "Latitude": lat,
+        "Longitude": long,
+        "Temperature (C)": weather_data["main"]["temp"],
+        "Wind Speed (m/s)": weather_data["wind"]["speed"],
+        "Wind Direction": weather_data["wind"]["deg"],
+        "Weather": weather_data["weather"][0]["main"],
+        "Weather Description": weather_data["weather"][0]["description"],
+        "Pressure (hPa)": weather_data["main"]["pressure"],
+        "Precipitation (mm)": precipitation,
+        "Cloud Cover": weather_data["clouds"]["all"],
+        "Humidity": weather_data["main"]["humidity"],
+    }
+    return weather_dict
+
+
+def update_weather(db_user, db_password, db_host, db_name, API_KEY, weather_row_id=1):
+    """Refresh the weather columns of every row from weather_row_id onwards.
+
+    Rows are addressed by their 1-based position in the result of
+    "SELECT * FROM weather". Returns a (message, HTTP status) tuple.
+
+    Raises ValueError for weather_row_id < 1 and SystemError when the table
+    is missing or the write-back reports an unexpected row count.
+    """
+    if weather_row_id < 1:
+        # Not an assert: assertions are stripped when Python runs with -O.
+        raise ValueError("weather_row_id must be >= 1")
+    engine = create_engine(
+        f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}/{db_name}"
+    )
+    try:
+        if not inspect(engine).has_table("weather"):
+            raise SystemError("weather table does not exist in database")
+        weather_df = pd.read_sql_query(sql="SELECT * FROM weather", con=engine)
+        weather_df.index += 1  # Make it 1-indexed
+        row_count = len(weather_df)
+
+        if weather_row_id > row_count:
+            return (
+                f"weather_row_id {weather_row_id} exceeds number of weather rows ({row_count})",
+                404,
+            )
+
+        print(f"Updating weather data from id {weather_row_id}...")
+        for i in range(weather_row_id, row_count + 1):
+            lat = weather_df.loc[i, "lat"]
+            lon = weather_df.loc[i, "lon"]
+            weather_data = get_weather(lat, lon, API_KEY)
+            weather_df.loc[i, "temperature"] = weather_data["Temperature (C)"]
+            weather_df.loc[i, "humidity"] = weather_data["Humidity"]
+            weather_df.loc[i, "wind_speed"] = weather_data["Wind Speed (m/s)"]
+            weather_df.loc[i, "wind_direction"] = weather_data["Wind Direction"]
+            weather_df.loc[i, "cloud_cover"] = weather_data["Cloud Cover"]
+
+        # NOTE(review): if_exists="replace" drops the table before inserting,
+        # so constraints and indexes on it are presumably lost -- confirm.
+        weather_response = weather_df.to_sql(
+            name="weather",
+            con=engine,
+            schema="public",
+            if_exists="replace",
+            index=False,
+            method="multi",
+        )
+        # to_sql() can return None when no integer row count is available.
+        if weather_response is not None and weather_response != row_count:
+            raise SystemError("weather table update failed")
+    finally:
+        # A new engine is built on every call; release its pooled connections.
+        engine.dispose()
+    res = f"weather table starting at id={weather_row_id} updated at {datetime.now().strftime('%m/%d/%Y %H:%M:%S')}"
+    print(res)
+    return res, 200
+
+
+if __name__ == "__main__":
+    db_user = ""
+    db_password = ""
+    db_host = ""
+    db_name = ""
+    API_KEY = ""
+    weather_row_id = 1
+    update_weather(
+        db_user,
+        db_password,
+        db_host,
+        db_name,
+        API_KEY,
+        weather_row_id,
+    )