Commit e5389819 authored by dasharatha.vamshi

model auto trainer

parent e35e4194
__pycache__/
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<component name="InspectionProjectProfileManager">
  <profile version="1.0">
    <option name="myName" value="Project Default" />
    <inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
      <option name="ignoredPackages">
        <value>
          <list size="5">
            <item index="0" class="java.lang.String" itemvalue="pandas" />
            <item index="1" class="java.lang.String" itemvalue="scipy" />
            <item index="2" class="java.lang.String" itemvalue="pydantic" />
            <item index="3" class="java.lang.String" itemvalue="pytz" />
            <item index="4" class="java.lang.String" itemvalue="simplejson" />
          </list>
        </value>
      </option>
    </inspection_tool>
    <inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
      <option name="ignoredIdentifiers">
        <list>
          <option value="scripts.core.data.data_import.KairosQueryBuilder.input_tag" />
          <option value="scripts.core.data.data_import.KairosQueryBuilder.metric_name" />
          <option value="scripts.core.data.data_import.KairosQueryBuilder.aggregation_name" />
          <option value="scripts.core.data.data_import.DataPuller.postgres_engine" />
        </list>
      </option>
    </inspection_tool>
  </profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
  <settings>
    <option name="USE_PROJECT_PROFILE" value="false" />
    <version value="1.0" />
  </settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (KBase)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$" />
    <orderEntry type="jdk" jdkName="Python 3.9 (KBase)" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/model_auto_trainer.iml" filepath="$PROJECT_DIR$/.idea/model_auto_trainer.iml" />
    </modules>
  </component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
  </component>
</project>
\ No newline at end of file
import pandas as pd
from mlflow_util import ModelReTrainer
if __name__ == "__main__":
    df = pd.read_csv('mlflow-test.csv')
    obj = ModelReTrainer(df, 'instantaneous_export', 'Dalmia Solar Forecasting V2', 'Forecasting_kadapa_v1', 'versioning')
    obj.get_latest_model()
Two source diffs in this commit could not be displayed because they are too large.
import os
import mlflow
from loguru import logger
import re
import pandas as pd
from pycaret_util import GetBestModel
import pytz
from dateutil import tz
from datetime import datetime
REQUIRED_TZ = "Asia/Kolkata"
mlflow_tracking_uri = 'https://qa.unifytwin.com/mlflow/'
AZURE_STORAGE_CONNECTION_STRING = 'DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tD' \
                                  'GOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR' \
                                  '4+gacg==;EndpointSuffix=core.windows.net'
AZURE_STORAGE_ACCESS_KEY = 'tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg=='
os.environ["MLFLOW_TRACKING_USERNAME"] = 'mlflow'
os.environ["MLFLOW_TRACKING_PASSWORD"] = 'MlFlOwQA#4321'
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = AZURE_STORAGE_CONNECTION_STRING
os.environ["AZURE_STORAGE_ACCESS_KEY"] = AZURE_STORAGE_ACCESS_KEY
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
client = mlflow.tracking.MlflowClient()
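
# MlFlowUtil is a thin wrapper around the MLflow fluent/client API used below: it logs the
# trained model, its metrics and hyper-parameters, sets run tags, and reports how old the
# latest run is in the requested unit (days, hours or minutes).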
class MlFlowUtil:
    @staticmethod
    def get_last_run_time_diff(run_info, date_param):
        try:
            logger.info(f"Checking the time difference in {date_param}")
            df_time = run_info.copy()
            df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(REQUIRED_TZ)
            to_zone = tz.gettz(REQUIRED_TZ)
            df_time["days"] = df_time['end_time'].dt.date
            df_time["hours"] = df_time['end_time'].dt.hour
            last_model_time = list(df_time['end_time'])[0].to_pydatetime()
            today = datetime.now(pytz.utc)
            central_current = today.astimezone(to_zone)
            time_diff = central_current - last_model_time
            if date_param.lower() == "days":
                return int(time_diff.days)
            elif date_param.lower() == "hours":
                return int(time_diff.total_seconds() // 3600)
            elif date_param.lower() == "minutes":
                return int(time_diff.total_seconds() // 60)
            else:
                logger.info("No valid date unit was given")
                return 0
        except Exception as e:
            logger.warning(f"Exception while checking the last run time of the model - {e}")
            return 0

    @staticmethod
    def log_model(model, model_name):
        try:
            mlflow.sklearn.log_model(model, model_name)
            logger.info("Logged the model")
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_metrics(metrics):
        try:
            # Strip bracket characters, which MLflow does not accept in metric names.
            updated_metric = {}
            for key, value in metrics.items():
                key = re.sub(r"[([{})\]]", "", key)
                updated_metric[key] = value
            mlflow.log_metrics(updated_metric)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_hyper_param(hyper_params):
        try:
            mlflow.log_params(hyper_params)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def set_tag(child_run_id, key, value):
        try:
            client.set_tag(run_id=child_run_id, key=key, value=value)
        except Exception as e:
            logger.exception(f"Exception while setting the tag - {e}")
class ModelReTrainer:
    def __init__(self, df, output_feature, experiment_name, parent_run_name, model_save_name):
        self.df = df
        self.output_feature = output_feature
        self.experiment_name = experiment_name
        self.parent_run_name = parent_run_name
        self.model_save_name = model_save_name
        self._mfu_ = MlFlowUtil()
        self.list_of_models = ['lr', 'knn']
        self.fine_tune_tech = 'optuna'
        self.comparison_metric = 'R2'
        self.retrain_param_unit = 'minutes'
        self.retrain_param_value = 1
        self._gbm_ = GetBestModel(self.df, self.output_feature, self.list_of_models)

    def check_create_experiment(self):
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        if experiment_info is None:
            logger.info(f"No experiment found with name {self.experiment_name}, so creating one")
            mlflow.create_experiment(self.experiment_name)
        else:
            logger.info(f"Proceeding with existing experiment {self.experiment_name}")
        mlflow.set_experiment(experiment_name=self.experiment_name)
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        experiment_id = experiment_info.experiment_id
        return experiment_id

    def check_create_parent_run(self, experiment_id):
        parent_runs_df = mlflow.search_runs(experiment_id)
        parent_runs_df = parent_runs_df[parent_runs_df['tags.mlflow.runName'] == self.parent_run_name]
        if not parent_runs_df.empty:
            logger.info(f"Proceeding with existing parent run {self.parent_run_name}")
            return list(parent_runs_df['run_id'])[0]
        # no parent run found, so create one
        logger.info(f"No parent run named {self.parent_run_name} is present")
        with mlflow.start_run(experiment_id=experiment_id, run_name=self.parent_run_name) as run:
            logger.info(f"Creating the parent run {self.parent_run_name} with parent run id {run.info.run_id}")
            return run.info.run_id
    def check_create_child_run(self, experiment_id, parent_run_id):
        child_runs_df = mlflow.search_runs(experiment_id, filter_string=f"tags.mlflow.parentRunId='{parent_run_id}'")
        if not child_runs_df.empty:
            logger.info(f"Child runs are already present for parent run id {parent_run_id}")
            # child_runs_df.to_csv('child.csv', index=False)
            child_run_id, child_run_history, retrain = self.get_latest_child_run(experiment_id, parent_run_id,
                                                                                 child_runs_df)
            return child_run_id, child_run_history, retrain
        else:
            logger.info(f"Child runs are not present for parent run id {parent_run_id}")
            with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
                with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
                    return child_run.info.run_id, None, True

    def get_latest_child_run(self, experiment_id, parent_run_id, runs_df):
        history_key = 'tags.mlflow.log-model.history'
        if history_key in runs_df.columns:
            runs_df = runs_df[runs_df[history_key].notna()]
        else:
            runs_df = runs_df.iloc[:0]
        if runs_df.empty:
            logger.info("None of the existing child runs contains a logged model, so creating a new child run")
            with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
                with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
                    return child_run.info.run_id, None, True
        latest_child_run_id = list(runs_df['run_id'])[0]
        latest_child_history = list(runs_df['tags.mlflow.log-model.history'])[0]
        latest_run_info = runs_df.iloc[:1]
        retrain = False
        day_check_flag = self.check_existing_model_retrain(latest_child_run_id, latest_run_info, retrain)
        if day_check_flag:
            with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
                with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
                    return child_run.info.run_id, None, True
        return latest_child_run_id, latest_child_history, retrain

    @staticmethod
    def load_model_pyfunc(model_path):
        try:
            model = mlflow.pyfunc.load_model(model_path)
            logger.info("Loaded the model")
            return model
        except Exception as e:
            logger.exception(str(e))
    def check_existing_model_retrain(self, latest_child_run_id, child_run_info, retrain):
        # Checks the age of the last trained model against the configured retrain window.
        if retrain:
            logger.info("Retraining needed...")
            return True
        else:
            logger.info(f"An already trained model is present, checking the age of the existing model of run id "
                        f"{latest_child_run_id}")
            time_diff = self._mfu_.get_last_run_time_diff(child_run_info, self.retrain_param_unit)
            logger.info(f"Time difference is {time_diff} {self.retrain_param_unit}")
            if time_diff >= self.retrain_param_value:
                logger.info(f"Retraining needed as the age of the last trained model exceeds the configured "
                            f"{self.retrain_param_value} {self.retrain_param_unit}")
                return True
            else:
                logger.info(f"Retraining not needed as the age of the last trained model does not exceed the "
                            f"configured {self.retrain_param_value} {self.retrain_param_unit}")
                return False

    def forming_loading_path(self, latest_run_id):
        try:
            model_name = self.model_save_name
            return f"runs:/{latest_run_id}/{model_name}"
        except Exception as e:
            logger.exception(f"Exception while forming loading path - {e}")
    def model_trainer(self, experiment_id, parent_run_id, child_run_id):
        with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
            with mlflow.start_run(experiment_id=experiment_id, run_id=child_run_id, nested=True):
                model, model_name, metrics, hyperparams = self._gbm_.compare_get_best_model(self.fine_tune_tech,
                                                                                            self.comparison_metric)
                self._mfu_.log_model(model=model, model_name=self.model_save_name)
                self._mfu_.log_metrics(metrics=metrics)
                self._mfu_.log_hyper_param(hyper_params=hyperparams)
                self._mfu_.set_tag(child_run_id=child_run_id, key="algorithm", value=model_name)
                return model

    def get_latest_model(self):
        experiment_id = self.check_create_experiment()
        parent_run_id = self.check_create_parent_run(experiment_id)
        child_run_id, child_run_history, retrain = self.check_create_child_run(experiment_id, parent_run_id)
        logger.info(f"Retrain flag is {retrain}")
        if retrain:
            logger.info("Retraining needed")
            self.model_trainer(experiment_id, parent_run_id, child_run_id)
            logger.info("New model trained successfully")
        else:
            logger.info(f"No retraining needed, proceeding to load the last child run model {child_run_id}")
        logger.info(f"Loading the model from the child run id {child_run_id}")
        final_model = self.load_model_pyfunc(model_path=self.forming_loading_path(latest_run_id=child_run_id))
        print(final_model)
        return final_model
from pycaret import regression
from loguru import logger
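
# GetBestModel wraps the PyCaret regression workflow: set up the experiment on the supplied
# dataframe, compare the requested estimators, tune the best one with the chosen search
# library, and return the tuned model together with its name, metrics and hyper-parameters.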
class GetBestModel:
    def __init__(self, df, target_col_list, list_of_models, no_of_models=1):
        self.df = df
        self.target_col_list = target_col_list
        self.no_of_models = no_of_models
        self.list_of_models = list_of_models

    def compare_get_best_model(self, fine_tune_tech, comparison_metric):
        try:
            logger.info("Using PyCaret to train the configured models")
            regression.setup(data=self.df, target=self.target_col_list)
            logger.info(f"Selecting the best model using the metric {comparison_metric}")
            best_model = regression.compare_models(include=self.list_of_models, sort=comparison_metric,
                                                   n_select=self.no_of_models)
            logger.info("Tuning the model")
            tuned_model = regression.tune_model(best_model, optimize=comparison_metric, search_library=fine_tune_tech)
            results = regression.pull()
            get_best_model_row = results.iloc[0]
            best_metrics = get_best_model_row.to_dict()
            best_metrics.pop('Model', None)
            model_name = str(tuned_model).split('(')[0]
            hyper_params = tuned_model.get_params()
            logger.info("Model training completed")
            return tuned_model, model_name, best_metrics, hyper_params
        except Exception as e:
            logger.exception(f"Unable to select the best model - {e}")