Skip to content

Commit

Permalink
fix quality checks
Browse files Browse the repository at this point in the history
  • Loading branch information
kachiann committed Aug 19, 2024
1 parent def023b commit f42d55a
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 123 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ setup:
mlflow:
mlflow server --backend-store-uri sqlite:///backend.db

# Target to do quality checks
quality_checks:
@echo "Running qualilty checks"
isort .
Expand Down
104 changes: 72 additions & 32 deletions monitoring/evidently_metrics_calculations.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@

import datetime
import time
import random
import logging
import random
import time

import joblib
import pandas as pd
import psycopg
from prefect import task, flow
from evidently.report import Report
from evidently import ColumnMapping
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric, DatasetMissingValuesMetric
from evidently.metrics import (
ColumnDriftMetric,
DatasetDriftMetric,
DatasetMissingValuesMetric,
)
from evidently.report import Report
from prefect import flow, task

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s")
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s"
)

SEND_TIMEOUT = 10
rand = random.Random()
Expand All @@ -26,66 +32,99 @@
)
"""

reference_data = pd.read_csv('../data/reference.csv')
with open('../models/dec_tre.bin', 'rb') as f_in:
reference_data = pd.read_csv("../data/reference.csv")
with open("../models/dec_tre.bin", "rb") as f_in:
model = joblib.load(f_in)

raw_data = pd.read_csv('../data/hour.csv')

features = ['season', 'holiday', 'workingday', 'weathersit', 'temp', 'atemp',
'hum', 'windspeed', 'hr', 'mnth', 'yr']
raw_data = pd.read_csv("../data/hour.csv")

features = [
"season",
"holiday",
"workingday",
"weathersit",
"temp",
"atemp",
"hum",
"windspeed",
"hr",
"mnth",
"yr",
]
column_mapping = ColumnMapping(
prediction='prediction',
numerical_features=features,
target=None
prediction="prediction", numerical_features=features, target=None
)

report = Report(
metrics=[
ColumnDriftMetric(column_name="prediction"),
DatasetDriftMetric(),
DatasetMissingValuesMetric(),
]
)

report = Report(metrics=[
ColumnDriftMetric(column_name='prediction'),
DatasetDriftMetric(),
DatasetMissingValuesMetric()
])

@task
def prep_db():
    """Ensure the ``test`` database and the metrics table exist.

    Connects to the local PostgreSQL server, creates the ``test`` database
    when ``pg_database`` has no row for it, then executes
    ``create_table_statement`` against that database.

    Any failure is logged and swallowed rather than re-raised, so the
    caller keeps running even when preparation fails.
    """
    try:
        # Autocommit: PostgreSQL refuses CREATE DATABASE inside a
        # transaction block.
        with psycopg.connect(
            "host=localhost port=5432 user=postgres password=example", autocommit=True
        ) as admin_conn:
            res = admin_conn.execute("SELECT 1 FROM pg_database WHERE datname='test'")
            if not res.fetchall():
                admin_conn.execute("CREATE DATABASE test;")

        # The server-level connection is released before the per-database
        # connection is opened.
        with psycopg.connect(
            "host=localhost port=5432 dbname=test user=postgres password=example"
        ) as conn:
            conn.execute(create_table_statement)
    except Exception as e:
        # Pass the exception itself: the previous `{e}` built a one-element
        # set, so the log showed the set's repr instead of the error text.
        logging.error("Error preparing the database: %s", e)


@task
def calculate_metrics_postgresql(curr):
    """Compute drift metrics for the current data and insert them as one row.

    Scores a copy of ``raw_data`` with ``model``, runs the Evidently
    ``report`` against ``reference_data``, and inserts the prediction drift
    score, the number of drifted columns and the share of missing values
    into ``dummy_metrics`` together with the current timestamp.

    Args:
        curr: Database cursor used to execute the INSERT.

    Any failure is logged and swallowed rather than re-raised.
    """
    try:
        # Work on a copy so the module-level frame never gains a
        # "prediction" column.
        current_data = raw_data.copy()
        current_data["prediction"] = model.predict(current_data[features])

        report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=column_mapping,
        )

        result = report.as_dict()

        # Indices follow the order of the metrics list given to Report(...):
        # 0 = prediction column drift, 1 = dataset drift, 2 = missing values.
        metrics = result["metrics"]
        prediction_drift = metrics[0]["result"]["drift_score"]
        num_drifted_columns = metrics[1]["result"]["number_of_drifted_columns"]
        share_missing_values = metrics[2]["result"]["current"][
            "share_of_missing_values"
        ]

        curr.execute(
            "INSERT INTO dummy_metrics(timestamp, prediction_drift, num_drifted_columns, share_missing_values) VALUES (%s, %s, %s, %s)",
            (
                datetime.datetime.now(),
                prediction_drift,
                num_drifted_columns,
                share_missing_values,
            ),
        )
    except Exception as e:
        # Pass the exception itself: the previous `{e}` built a one-element
        # set, so the log showed the set's repr instead of the error text.
        logging.error("Error calculating metrics: %s", e)


@flow
def batch_monitoring_backfill():
prep_db()
last_send = datetime.datetime.now() - datetime.timedelta(seconds=10)
try:
with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example", autocommit=True) as conn:
with psycopg.connect(
"host=localhost port=5432 dbname=test user=postgres password=example",
autocommit=True,
) as conn:
for _ in range(27):
with conn.cursor() as curr:
calculate_metrics_postgresql(curr)
Expand All @@ -99,5 +138,6 @@ def batch_monitoring_backfill():
except Exception as e:
logging.error("Error in batch monitoring: %s", e)

if __name__ == '__main__':

if __name__ == "__main__":
batch_monitoring_backfill()
8 changes: 4 additions & 4 deletions src/create_monitoring_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
from sklearn.model_selection import train_test_split

# Load the original dataset
df = pd.read_csv('../project-mlops/data/hour.csv')
df = pd.read_csv("../project-mlops/data/hour.csv")


# Split the data into reference (older data) and production (newer data)
reference_data, production_data = train_test_split(df, test_size=0.3, shuffle=False)

# Save the datasets
reference_data.to_csv('reference_data.csv', index=False)
production_data.to_csv('production_data.csv', index=False)
reference_data.to_csv("reference_data.csv", index=False)
production_data.to_csv("production_data.csv", index=False)

print("Reference and production datasets created successfully.")
print("Reference and production datasets created successfully.")
7 changes: 4 additions & 3 deletions src/experiment_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"""

import os
import sys # Standard library import
import pickle
import sys # Standard library import

# Third-party imports
import mlflow
Expand All @@ -17,9 +17,11 @@
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

from constants import FEATURES

# Local application imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))


# Set the remote tracking URI
Expand Down Expand Up @@ -108,7 +110,6 @@ def train_and_log_model(
return run.info.run_id



def main():
"""Main function to load data, train models, and log to MLflow."""
# Path to the dataset
Expand Down
12 changes: 9 additions & 3 deletions src/ml_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

from constants import FEATURES

# Local application imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

# Constants
DATA_PATH = "data/hour.csv"
Expand All @@ -21,6 +22,7 @@
MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
MLFLOW_EXPERIMENT = "MLflow Prefect Integration"


@task
def read_data(data_path=DATA_PATH):
try:
Expand All @@ -44,6 +46,7 @@ def preprocess_data(df):
except Exception as e:
raise RuntimeError(f"Data preprocessing failed: {e}") from e


@task
def train_model(X_train, y_train):
try:
Expand All @@ -54,6 +57,7 @@ def train_model(X_train, y_train):
except Exception as e:
raise RuntimeError(f"Model training failed: {e}") from e


@task
def evaluate_model(model, X_test, y_test):
try:
Expand All @@ -74,18 +78,19 @@ def log_model(model, mae, r2, model_dir=MODEL_DIR, model_filename=MODEL_FILENAME
mlflow.log_metric("mae", mae)
mlflow.log_metric("r2", r2)
mlflow.sklearn.log_model(model, "model")

# Save model locally
os.makedirs(model_dir, exist_ok=True)
pickle_path = os.path.join(model_dir, model_filename)
with open(pickle_path, "wb") as f:
pickle.dump(model, f)

# Log the model as an artifact in MLflow
mlflow.log_artifact(pickle_path)
except Exception as e:
raise RuntimeError(f"Logging model failed: {e}") from e


@flow(log_prints=True)
def ml_pipeline():
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
Expand All @@ -97,5 +102,6 @@ def ml_pipeline():
mae, r2 = evaluate_model(model, X_test, y_test)
log_model(model, mae, r2)


if __name__ == "__main__":
ml_pipeline()
Loading

0 comments on commit f42d55a

Please sign in to comment.