Commit 925ca1fc authored by dasharatha.vamshi's avatar dasharatha.vamshi

updated

parent e5389819
This diff is collapsed.
...@@ -4,5 +4,9 @@ from mlflow_util import ModelReTrainer ...@@ -4,5 +4,9 @@ from mlflow_util import ModelReTrainer
if __name__ == "__main__": if __name__ == "__main__":
df = pd.read_csv('mlflow-test.csv') df = pd.read_csv('mlflow-test.csv')
obj = ModelReTrainer(df, 'instantaneous_export', 'Dalmia Solar Forecasting V2', 'Forecasting_kadapa_v1', 'versioning') feature = 'instantaneous_export'
exp_name = 'Dalmia Solar Forecasting V2'
parent_run_name = 'Forecasting_kadapa_v1'
model_name = 'versioning'
obj = ModelReTrainer(df, feature, exp_name, parent_run_name, model_name)
obj.get_latest_model() obj.get_latest_model()
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -107,6 +107,10 @@ class ModelReTrainer: ...@@ -107,6 +107,10 @@ class ModelReTrainer:
self._gbm_ = GetBestModel(self.df, self.output_feature, self.list_of_models) self._gbm_ = GetBestModel(self.df, self.output_feature, self.list_of_models)
def check_create_experiment(self): def check_create_experiment(self):
"""
check if experiment exists, if not creates a new experiment
:return: experiment_id of the experiment
"""
experiment_info = mlflow.get_experiment_by_name(self.experiment_name) experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
if experiment_info is None: if experiment_info is None:
logger.info(f"No experiment found with name {self.experiment_name}, So creating one") logger.info(f"No experiment found with name {self.experiment_name}, So creating one")
...@@ -119,6 +123,11 @@ class ModelReTrainer: ...@@ -119,6 +123,11 @@ class ModelReTrainer:
return experiment_id return experiment_id
def check_create_parent_run(self, experiment_id): def check_create_parent_run(self, experiment_id):
"""
check if a parent run exists in the experiment, if not create it with the mentioned parent run name
:param experiment_id: Experiment id
:return: returns the parent run id
"""
parent_runs_df = mlflow.search_runs(experiment_id) parent_runs_df = mlflow.search_runs(experiment_id)
parent_runs_df = parent_runs_df[parent_runs_df['tags.mlflow.runName'] == self.parent_run_name] parent_runs_df = parent_runs_df[parent_runs_df['tags.mlflow.runName'] == self.parent_run_name]
if not parent_runs_df.empty: if not parent_runs_df.empty:
...@@ -131,20 +140,35 @@ class ModelReTrainer: ...@@ -131,20 +140,35 @@ class ModelReTrainer:
return run.info.run_id return run.info.run_id
def check_create_child_run(self, experiment_id, parent_run_id): def check_create_child_run(self, experiment_id, parent_run_id):
"""
check if a child run exists in the experiment id under the parent run id
if exists take the child run id which has the model saved and validate when was it lastly trained.
Based on the lastly trained see if you have to retrain or not. if retrain create a new child run
else if no child run exists under the parent run id of experiment id, create a new child run
:param experiment_id: experiment id
:param parent_run_id: parent run id
:return: child run id, retrain flag
"""
child_runs_df = mlflow.search_runs(experiment_id, filter_string=f"tags.mlflow.parentRunId='{parent_run_id}'") child_runs_df = mlflow.search_runs(experiment_id, filter_string=f"tags.mlflow.parentRunId='{parent_run_id}'")
if not child_runs_df.empty: if not child_runs_df.empty:
logger.info(f"Already Child runs are present for Parent Run Id {parent_run_id}") logger.info(f"Already Child runs are present for Parent Run Id {parent_run_id}")
# child_runs_df.to_csv('child.csv',index=False) child_run_id, retrain = self.get_latest_child_run(experiment_id, parent_run_id, child_runs_df)
child_run_id, child_run_history, retrain = self.get_latest_child_run(experiment_id, parent_run_id, return child_run_id, retrain
child_runs_df)
return child_run_id, child_run_history, retrain
else: else:
logger.info(f"Child runs are not present for Parent Run Id {parent_run_id}") logger.info(f"Child runs are not present for Parent Run Id {parent_run_id}")
with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True): with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run: with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
return child_run.info.run_id, None, True return child_run.info.run_id, True
def get_latest_child_run(self, experiment_id, parent_run_id, runs_df): def get_latest_child_run(self, experiment_id, parent_run_id, runs_df):
"""
Check if child runs are present. if not create a new child run. Otherwise, validate the last run time and
create a new child run if retraining needed or take the last child run id which has model saved
:param experiment_id: experiment id
:param parent_run_id: parent run id
:param runs_df: the child runs of the parent id
:return: last child run id, retrain flag
"""
history_key = 'tags.mlflow.log-model.history' history_key = 'tags.mlflow.log-model.history'
if history_key in runs_df.columns: if history_key in runs_df.columns:
runs_df = runs_df[runs_df[history_key].notna()] runs_df = runs_df[runs_df[history_key].notna()]
...@@ -154,20 +178,24 @@ class ModelReTrainer: ...@@ -154,20 +178,24 @@ class ModelReTrainer:
logger.info("Existing Child Runs doesn't contain any model to run. So creating new child run") logger.info("Existing Child Runs doesn't contain any model to run. So creating new child run")
with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True): with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run: with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
return child_run.info.run_id, None, True return child_run.info.run_id, True
latest_child_run_id = list(runs_df['run_id'])[0] latest_child_run_id = list(runs_df['run_id'])[0]
latest_child_history = list(runs_df['tags.mlflow.log-model.history'])[0]
latest_run_info = runs_df.iloc[:1] latest_run_info = runs_df.iloc[:1]
retrain = False retrain = False
day_check_flag = self.check_existing_model_retrain(latest_child_run_id, latest_run_info, retrain) day_check_flag = self.check_existing_model_retrain(latest_child_run_id, latest_run_info, retrain)
if day_check_flag: if day_check_flag:
with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True): with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run: with mlflow.start_run(experiment_id=experiment_id, nested=True) as child_run:
return child_run.info.run_id, None, True return child_run.info.run_id, True
return latest_child_run_id, latest_child_history, retrain return latest_child_run_id, retrain
@staticmethod @staticmethod
def load_model_pyfunc(model_path): def load_model_pyfunc(model_path):
"""
Function to load the model from mlflow artifact path
:param model_path: model path on mlflow
:return: loaded model
"""
try: try:
model = mlflow.pyfunc.load_model(model_path) model = mlflow.pyfunc.load_model(model_path)
logger.info("loading the model") logger.info("loading the model")
...@@ -176,7 +204,15 @@ class ModelReTrainer: ...@@ -176,7 +204,15 @@ class ModelReTrainer:
logger.exception(str(e)) logger.exception(str(e))
def check_existing_model_retrain(self, latest_child_run_id, child_run_info, retrain): def check_existing_model_retrain(self, latest_child_run_id, child_run_info, retrain):
# edit this to check the time difference between the last trained model and the configured time difference """
If retrain is True, it returns true as retraining is required.
If retrain is False, it checks the time difference between the last child run and the current time and returns
true or false depending on the time difference
:param latest_child_run_id: last child run id
:param child_run_info: last child run info
:param retrain: retrain flag
:return: final retrain flag
"""
if retrain: if retrain:
logger.info("Retraining Needed...") logger.info("Retraining Needed...")
return True return True
...@@ -195,6 +231,11 @@ class ModelReTrainer: ...@@ -195,6 +231,11 @@ class ModelReTrainer:
return False return False
def forming_loading_path(self, latest_run_id): def forming_loading_path(self, latest_run_id):
"""
Creates the path from the child run id
:param latest_run_id: latest child run id
:return: the path to the model
"""
try: try:
model_name = self.model_save_name model_name = self.model_save_name
return f"runs:/{latest_run_id}/{model_name}" return f"runs:/{latest_run_id}/{model_name}"
...@@ -202,6 +243,13 @@ class ModelReTrainer: ...@@ -202,6 +243,13 @@ class ModelReTrainer:
logger.exception(f"Exception while forming loading path - {e}") logger.exception(f"Exception while forming loading path - {e}")
def model_trainer(self, experiment_id, parent_run_id, child_run_id): def model_trainer(self, experiment_id, parent_run_id, child_run_id):
"""
Using the experiment id, parent run id and child run id, it will train the model
:param experiment_id: experiment id
:param parent_run_id: parent run id
:param child_run_id: child run id
:return: the final model
"""
with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True): with mlflow.start_run(experiment_id=experiment_id, run_id=parent_run_id, nested=True):
with mlflow.start_run(experiment_id=experiment_id, run_id=child_run_id, nested=True): with mlflow.start_run(experiment_id=experiment_id, run_id=child_run_id, nested=True):
model, model_name, metrics, hyperparams = self._gbm_.compare_get_best_model(self.fine_tune_tech, model, model_name, metrics, hyperparams = self._gbm_.compare_get_best_model(self.fine_tune_tech,
...@@ -213,9 +261,13 @@ class ModelReTrainer: ...@@ -213,9 +261,13 @@ class ModelReTrainer:
return model return model
def get_latest_model(self): def get_latest_model(self):
"""
This is the Main function which will return the latest model
:return:
"""
experiment_id = self.check_create_experiment() experiment_id = self.check_create_experiment()
parent_run_id = self.check_create_parent_run(experiment_id) parent_run_id = self.check_create_parent_run(experiment_id)
child_run_id, child_run_history, retrain = self.check_create_child_run(experiment_id, parent_run_id) child_run_id, retrain = self.check_create_child_run(experiment_id, parent_run_id)
logger.info(f"Retrain flag is {retrain}") logger.info(f"Retrain flag is {retrain}")
if retrain: if retrain:
logger.info("Retraining needed") logger.info("Retraining needed")
......
...@@ -10,6 +10,12 @@ class GetBestModel: ...@@ -10,6 +10,12 @@ class GetBestModel:
self.list_of_models = list_of_models self.list_of_models = list_of_models
def compare_get_best_model(self, fine_tune_tech, comparison_metric): def compare_get_best_model(self, fine_tune_tech, comparison_metric):
"""
Train and compares the model based on the finetune tech and comparison metric
:param fine_tune_tech: search library for fine-tuning of the selected model
:param comparison_metric: metrics to select the best model
:return: the best model, model name, metrics and parameters
"""
try: try:
logger.info("Using Pycaret to train mentioned models") logger.info("Using Pycaret to train mentioned models")
regression.setup(data=self.df, target=self.target_col_list) regression.setup(data=self.df, target=self.target_col_list)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment