Commit 88061537 authored by dasharatha.vamshi's avatar dasharatha.vamshi

mlflow cleanup

parent c33f1b83
This diff is collapsed.
# syntax=docker/dockerfile:1
FROM python:3.8-buster

WORKDIR /code

# Install dependencies first so this layer stays cached until
# requirements.txt itself changes (COPY . . before pip busted the cache
# on every source edit).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Run as a non-root user; --chown keeps /code readable/writable by the app.
RUN useradd --system --create-home appuser
COPY --chown=appuser:appuser . .
USER appuser

# Exec-form CMD: python runs as PID 1 and receives SIGTERM from docker stop.
CMD ["python", "app.py"]
\ No newline at end of file
run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.MAE,metrics.MSE,metrics.R2,metrics.MAPE,metrics.RMSLE,metrics.RMSE,params.max_depth,params.min_samples_split,params.ccp_alpha,params.max_samples,params.min_weight_fraction_leaf,params.criterion,params.oob_score,params.min_samples_leaf,params.warm_start,params.max_leaf_nodes,params.n_estimators,params.verbose,params.min_impurity_decrease,params.bootstrap,params.random_state,params.n_jobs,params.max_features,tags.mlflow.runName,tags.mlflow.user,tags.mlflow.parentRunId,tags.mlflow.source.git.commit,tags.algorithm,tags.mlflow.source.type,tags.mlflow.source.name,tags.mlflow.log-model.history
3eef20e89fb948078ef99847ae4b4bc0,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/3eef20e89fb948078ef99847ae4b4bc0/artifacts,2023-03-20 11:12:56.468000+00:00,2023-03-20 11:14:51.309000+00:00,538.0753,499454.5687,0.1195,0.0855,0.1141,706.721,6,2,0.0,None,0.0,mae,False,2,False,None,10,0,0.05,True,2872,-1,1.0,stately-snake-36,dasharatha.vamshi,839ba85648e04ff79c411ffdce21213f,75c16b31e98f3fb1db70c11a6bd21aade07567d6,RandomForestRegressor,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,"[{""run_id"": ""3eef20e89fb948078ef99847ae4b4bc0"", ""artifact_path"": ""r5_model"", ""utc_time_created"": ""2023-03-20 11:14:46.153233"", ""flavors"": {""python_function"": {""model_path"": ""model.pkl"", ""loader_module"": ""mlflow.sklearn"", ""python_version"": ""3.8.16"", ""env"": ""conda.yaml""}, ""sklearn"": {""pickled_model"": ""model.pkl"", ""sklearn_version"": ""1.1.3"", ""serialization_format"": ""cloudpickle""}}, ""model_uuid"": null}]"
839ba85648e04ff79c411ffdce21213f,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/839ba85648e04ff79c411ffdce21213f/artifacts,2023-03-20 11:12:54.625000+00:00,2023-03-20 11:14:51.646000+00:00,,,,,,,,,,,,,,,,,,,,,,,,r5-golden-batch-test-1,dasharatha.vamshi,,75c16b31e98f3fb1db70c11a6bd21aade07567d6,,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,
14b392b1df58424b98cf47c6e7bfad7e,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/14b392b1df58424b98cf47c6e7bfad7e/artifacts,2023-03-20 11:05:34.682000+00:00,2023-03-20 11:07:24.960000+00:00,566.4853,708495.3303,0.164,0.088,0.1296,841.7216,3,10,0.0,None,0.0,mae,False,3,False,None,220,0,0.3,False,839,-1,log2,hilarious-yak-718,dasharatha.vamshi,b64389f4be80438eac4e87396f52ff96,75c16b31e98f3fb1db70c11a6bd21aade07567d6,RandomForestRegressor,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,"[{""run_id"": ""14b392b1df58424b98cf47c6e7bfad7e"", ""artifact_path"": ""r5_model"", ""utc_time_created"": ""2023-03-20 11:07:19.674442"", ""flavors"": {""python_function"": {""model_path"": ""model.pkl"", ""loader_module"": ""mlflow.sklearn"", ""python_version"": ""3.8.16"", ""env"": ""conda.yaml""}, ""sklearn"": {""pickled_model"": ""model.pkl"", ""sklearn_version"": ""1.1.3"", ""serialization_format"": ""cloudpickle""}}, ""model_uuid"": null}]"
0f35ac4a96fa413b9a15a54fe0b9b905,65,FAILED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/0f35ac4a96fa413b9a15a54fe0b9b905/artifacts,2023-03-20 11:04:48.940000+00:00,2023-03-20 11:05:26.504000+00:00,,,,,,,,,,,,,,,,,,,,,,,,treasured-ape-95,dasharatha.vamshi,b64389f4be80438eac4e87396f52ff96,75c16b31e98f3fb1db70c11a6bd21aade07567d6,,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,
63df0f705ff44802aa8ff40a9fe0dd42,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/63df0f705ff44802aa8ff40a9fe0dd42/artifacts,2023-03-20 11:01:50.710000+00:00,2023-03-20 11:03:15.779000+00:00,464.6541,398781.9839,0.3022,0.0686,0.0919,631.4919,4,5,0.0,None,0.0,mse,False,2,False,None,40,0,0.002,True,2588,-1,log2,secretive-hen-994,dasharatha.vamshi,b64389f4be80438eac4e87396f52ff96,75c16b31e98f3fb1db70c11a6bd21aade07567d6,RandomForestRegressor,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,"[{""run_id"": ""63df0f705ff44802aa8ff40a9fe0dd42"", ""artifact_path"": ""r5_model"", ""utc_time_created"": ""2023-03-20 11:03:10.815296"", ""flavors"": {""python_function"": {""model_path"": ""model.pkl"", ""loader_module"": ""mlflow.sklearn"", ""python_version"": ""3.8.16"", ""env"": ""conda.yaml""}, ""sklearn"": {""pickled_model"": ""model.pkl"", ""sklearn_version"": ""1.1.3"", ""serialization_format"": ""cloudpickle""}}, ""model_uuid"": null}]"
8533b5b16bf04d03b9021e4a09df31dd,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/8533b5b16bf04d03b9021e4a09df31dd/artifacts,2023-03-20 10:59:10.251000+00:00,2023-03-20 11:01:36.039000+00:00,564.6727,644844.6845,0.2292,0.0872,0.1219,803.0222,8,10,0.0,None,0.0,mae,False,3,False,None,210,0,0.4,True,2017,-1,log2,rogue-koi-225,dasharatha.vamshi,b64389f4be80438eac4e87396f52ff96,75c16b31e98f3fb1db70c11a6bd21aade07567d6,RandomForestRegressor,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,"[{""run_id"": ""8533b5b16bf04d03b9021e4a09df31dd"", ""artifact_path"": ""r5_model"", ""utc_time_created"": ""2023-03-20 11:01:31.173586"", ""flavors"": {""python_function"": {""model_path"": ""model.pkl"", ""loader_module"": ""mlflow.sklearn"", ""python_version"": ""3.8.16"", ""env"": ""conda.yaml""}, ""sklearn"": {""pickled_model"": ""model.pkl"", ""sklearn_version"": ""1.1.3"", ""serialization_format"": ""cloudpickle""}}, ""model_uuid"": null}]"
b64389f4be80438eac4e87396f52ff96,65,FINISHED,wasbs://mlflow-vm-container@azrmlilensqa006382180551.blob.core.windows.net/mlflow_qa/65/b64389f4be80438eac4e87396f52ff96/artifacts,2023-03-20 10:22:30.234000+00:00,2023-03-20 11:07:25.260000+00:00,,,,,,,,,,,,,,,,,,,,,,,,r5-golden-batch-test,dasharatha.vamshi,,75c16b31e98f3fb1db70c11a6bd21aade07567d6,,LOCAL,D:/ilens-ai/jubilant/R5/patch/r5-reaction-golden-batch/app.py,
from loguru import logger
from scripts.constants.app_configuration import MlflowMetaData
from scripts.core.mlflow_util import MlflowCleanUp
if __name__ == '__main__':
    # Entry point: read the cleanup targets from configuration and run one
    # MLflow artifact cleanup pass; any failure is logged with a traceback.
    try:
        logger.info("Starting the Module...")
        experiment_name = MlflowMetaData.EXPERIMENT_NAME
        run_name = MlflowMetaData.RUN_NAME
        model_name = MlflowMetaData.MODEL_NAME
        logger.info(
            f"Performing cleanup for experiment: {experiment_name}, parent run: {run_name}, "
            f"model name: {model_name}"
        )
        MlflowCleanUp(experiment_name, run_name, model_name).start_cleanup()
    except Exception as e:
        logger.exception(f"Error cleaning up because of error {e}")
[TIMEZONE]
required_tz=$REQUIRED_TZ
[MLFLOW]
mlflow_tracking_uri=$MLFLOW_TRACKING_URI
mlflow_tracking_username=$MLFLOW_TRACKING_USERNAME
mlflow_tracking_password=$MLFLOW_TRACKING_PASSWORD
azure_storage_connection_string=$AZURE_STORAGE_CONNECTION_STRING
azure_storage_access_key=$AZURE_STORAGE_ACCESS_KEY
experiment_name=$EXPERIMENT_NAME
run_name=$RUN_NAME
model_name=$MODEL_NAME
total_models_needed=$TOTAL_MODELS_NEEDED
\ No newline at end of file
REQUIRED_TZ=Asia/Kolkata
MLFLOW_TRACKING_URI=https://qa.unifytwin.com/mlflow/
MLFLOW_TRACKING_USERNAME=mlflow
MLFLOW_TRACKING_PASSWORD=MlFlOwQA#4321
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net
AZURE_STORAGE_ACCESS_KEY=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==
EXPERIMENT_NAME=Golden Batch Models Test
RUN_NAME=r5-golden-batch-test
MODEL_NAME=r5_model
TOTAL_MODELS_NEEDED=2
\ No newline at end of file
pytz==2021.3
loguru==0.5.3
scipy==1.7.1
numpy==1.21.2
mlflow==1.20.2
scikit-learn==1.1.3
simplejson==3.17.5
requests==2.27.1
pydantic==1.8.2
python-dotenv==0.19.2
PyYAML==6.0
kafka-python==1.4.7
SQLAlchemy==1.4.16
sqlparse==0.4.2
psycopg2==2.9.1
pycaret==3.0.0rc8
python-dateutil~=2.8.2
protobuf==3.20.1
azure-storage-blob==12.14.1
\ No newline at end of file
import os
import sys
from configparser import ConfigParser, BasicInterpolation
from dotenv import load_dotenv

# Configuration file locations.  Plain string literals: the f-prefixes in
# the original were redundant (no placeholders present).
_application_conf = "./conf/application.conf"
_default_conf = "./config.env"

# Populate os.environ from config.env so EnvInterpolation below can expand
# the $VAR placeholders used inside application.conf.
load_dotenv(dotenv_path=_default_conf)
class EnvInterpolation(BasicInterpolation):
    """configparser interpolation that expands environment variables in values.

    A value such as ``$MLFLOW_TRACKING_URI`` is replaced with the content of
    that environment variable.  When the variable is unset,
    ``os.path.expandvars`` leaves the ``$`` placeholder in place, and the
    option then resolves to ``None`` rather than the literal placeholder.
    """

    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        # Expand once and reuse (the original called os.path.expandvars twice).
        expanded = os.path.expandvars(value)
        # A surviving leading "$" means the variable is unset: surface None so
        # callers see a missing value instead of a raw "$NAME" string.
        return None if expanded.startswith("$") else expanded
# Load the application configuration, expanding environment variables via
# EnvInterpolation.  Note: ConfigParser.read silently ignores a missing file,
# so this except branch only fires on parse/interpolation errors.
try:
    config = ConfigParser(interpolation=EnvInterpolation())
    config.read(_application_conf)
except Exception as e:
    print(f"Error while loading the config: {e}")
    print("Failed to Load Configuration. Exiting!!!")
    # Exit with a non-zero status so callers and orchestrators see the
    # failure (the original bare sys.exit() exited with status 0 here).
    sys.exit(1)
class Logging:
    """Logging settings read from the [LOGGING] section of the config."""

    # Log level; falls back to INFO when the option is missing or empty.
    level = config.get("LOGGING", "level", fallback="INFO") or "INFO"
    # Whether tracebacks should accompany error logs; defaults to True.
    tb_flag = config.getboolean("LOGGING", "traceback", fallback=True)
    tb_flag = True if tb_flag is None else tb_flag
# Configuration Variables
# Timezone name (e.g. "Asia/Kolkata") used when converting run end times.
REQUIRED_TZ = config['TIMEZONE']['required_tz']
class MlflowMetaData:
    """MLflow connection and cleanup settings from the [MLFLOW] config section."""

    _section = config['MLFLOW']

    MLFLOW_TRACKING_URI = _section['mlflow_tracking_uri']
    MLFLOW_TRACKING_USERNAME = _section['mlflow_tracking_username']
    MLFLOW_TRACKING_PASSWORD = _section['mlflow_tracking_password']
    AZURE_STORAGE_CONNECTION_STRING = _section['azure_storage_connection_string']
    AZURE_STORAGE_ACCESS_KEY = _section['azure_storage_access_key']
    EXPERIMENT_NAME = _section['experiment_name']
    RUN_NAME = _section['run_name']
    MODEL_NAME = _section['model_name']
    # Number of most-recent models to keep per parent run (string; callers cast to int).
    TOTAL_MODELS_NEEDED = _section['total_models_needed']

    # Keep the class namespace identical to the original (settings only).
    del _section
MODEL_NAME = 'model.pkl'
import os
import re
from datetime import datetime
import mlflow
import pandas as pd
import pytz
from dateutil import tz
from loguru import logger
from azure.storage.blob import BlobServiceClient
from scripts.constants.app_configuration import REQUIRED_TZ, MlflowMetaData
from scripts.constants.app_constants import MODEL_NAME
# MLflow/Azure connection bootstrap -- runs once at import time.
mlflow_tracking_uri = MlflowMetaData.MLFLOW_TRACKING_URI
AZURE_STORAGE_CONNECTION_STRING = MlflowMetaData.AZURE_STORAGE_CONNECTION_STRING
AZURE_STORAGE_ACCESS_KEY = MlflowMetaData.AZURE_STORAGE_ACCESS_KEY
# MLflow auth and the Azure artifact store read credentials from the
# process environment, so they are exported here before any API call.
os.environ["MLFLOW_TRACKING_USERNAME"] = MlflowMetaData.MLFLOW_TRACKING_USERNAME
os.environ["MLFLOW_TRACKING_PASSWORD"] = MlflowMetaData.MLFLOW_TRACKING_PASSWORD
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = AZURE_STORAGE_CONNECTION_STRING
os.environ["AZURE_STORAGE_ACCESS_KEY"] = AZURE_STORAGE_ACCESS_KEY
# Point both the tracking and model-registry APIs at the same server.
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
# Shared low-level client used by MlFlowUtil.set_tag.
client = mlflow.tracking.MlflowClient()
class MlFlowUtil:
    """Stateless helpers wrapping the MLflow tracking API and Azure blob storage."""

    @staticmethod
    def get_last_run_time_diff(run_info):
        """Return the age in whole days of the first run in ``run_info``.

        :param run_info: DataFrame with an ``end_time`` column of timestamps;
                         the first row is taken as the latest run -- assumes
                         callers pass data sorted newest-first (TODO confirm).
        :return: age in days, or 0 on any failure.
        """
        try:
            logger.info("Checking the time difference in days")
            df_time = run_info.copy()
            df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(REQUIRED_TZ)
            to_zone = tz.gettz(REQUIRED_TZ)
            # The original also derived unused "days"/"hours" columns; dropped.
            last_model_time = list(df_time['end_time'])[0].to_pydatetime()
            central_current = datetime.now(pytz.utc).astimezone(to_zone)
            time_diff = central_current - last_model_time
            return int(time_diff.days)
        except Exception as e:
            logger.warning(f"Exception while checking the last run time of the model - {e}")
            return 0

    @staticmethod
    def log_model(model, model_name):
        """Log a scikit-learn model under the current active run.

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            mlflow.sklearn.log_model(model, model_name)
            logger.info("logged the model")
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_metrics(metrics):
        """Log a dict of metrics, stripping bracket characters from the keys
        first -- presumably because the tracking server rejects them (verify).

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            updated_metric = {}
            for key, value in metrics.items():
                # Remove ( ) [ ] { } characters from the metric name.
                key = re.sub(r"[([{})\]]", "", key)
                updated_metric[key] = value
            mlflow.log_metrics(updated_metric)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_hyper_param(hyper_params):
        """Log a dict of hyper-parameters under the current active run.

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            mlflow.log_params(hyper_params)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def set_tag(child_run_id, key, value):
        """Set one tag on the given run via the shared MlflowClient."""
        try:
            client.set_tag(run_id=child_run_id, key=key, value=value)
        except Exception as e:
            logger.exception(f"Exception while setting the tag - {e}")

    @staticmethod
    def remove_file_if_exists(path):
        """Delete a local file, doing nothing when it does not exist."""
        if os.path.exists(path):
            os.remove(path)

    @staticmethod
    def delete_artifact(run_id, parent_run_name, artifact_uri, file_path, model_name):
        """Delete one model artifact blob from Azure storage.

        The wasbs:// artifact URI has the shape
        ``wasbs://<container>@<account>.../<mlflow_name>/<experiment_id>/...``;
        the blob path inside the container is rebuilt from those pieces.
        """
        logger.info(f"Deleting artifact for {run_id} under {parent_run_name}")
        container_name = artifact_uri.split("//")[-1].split('@')[0]
        mlflow_name = artifact_uri.split("//")[-1].split('@')[-1].split('/')[1]
        mlflow_id = artifact_uri.split("//")[-1].split('@')[-1].split('/')[2]
        path = f'{mlflow_name}/{mlflow_id}/{run_id}/artifacts/{file_path}/{model_name}'
        logger.info(f'Deleting artifact from path: {path}')
        blob_service_client = BlobServiceClient.from_connection_string(MlflowMetaData.AZURE_STORAGE_CONNECTION_STRING)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(path)
        # Existence check avoids ResourceNotFoundError on re-runs.
        if blob_client.exists():
            logger.info(f"The blob {path} exists, so deleting it")
            blob_client.delete_blob()
        else:
            logger.info(f"The blob {path} does not exist, which means its already deleted")
class MlflowCleanUp:
    """Deletes stale model artifacts from blob storage, keeping only the
    newest ``TOTAL_MODELS_NEEDED`` logged models under each parent run."""

    def __init__(self, experiment_name, parent_run_name, model_save_name):
        self.experiment_name = experiment_name
        self.parent_run_name = parent_run_name
        self.model_save_name = model_save_name          # artifact sub-folder, e.g. "r5_model"
        self._mfu_ = MlFlowUtil()
        self.total_models_needed = int(MlflowMetaData.TOTAL_MODELS_NEEDED)
        self.model_name = MODEL_NAME                    # blob file inside the artifact folder
        # Column names as produced by mlflow.search_runs.
        self.model_history_key = 'tags.mlflow.log-model.history'
        self.model_parent_run_id_key = 'tags.mlflow.parentRunId'

    def check_experiment(self):
        """Return the experiment id for ``self.experiment_name`` or None when
        no such experiment exists."""
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        if experiment_info is None:
            logger.info(f"No experiment found with name {self.experiment_name}")
            return None
        logger.info(f"Proceeding with existing Experiment {self.experiment_name}")
        mlflow.set_experiment(experiment_name=self.experiment_name)
        # Re-fetch after set_experiment to pick up the active experiment record.
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        return experiment_info.experiment_id

    @staticmethod
    def check_runs_data(experiment_id):
        """Return all runs of the experiment as a DataFrame, or None when empty."""
        runs_df = mlflow.search_runs(experiment_id)
        if runs_df.empty:
            logger.info('No runs found for the experiment...')
            return None
        return runs_df

    def delete_run_model_data(self, df, run_name_mapping):
        """Delete artifacts of all but the newest ``total_models_needed``
        logged models under each parent run present in ``df``.

        NOTE(review): rows are consumed in the order mlflow.search_runs
        returned them -- assumes newest-first; no explicit sort is applied.
        """
        cols = ['run_id', 'artifact_uri', 'start_time', 'end_time', 'tags.mlflow.parentRunId',
                'tags.mlflow.log-model.history']
        # .unique() replaces the original list(set(list(...))) round-trip and
        # gives a deterministic iteration order.
        parent_runs = list(df[self.model_parent_run_id_key].unique())
        for parent_run_id in parent_runs:
            logger.info(f'Checking for Run Name {run_name_mapping[parent_run_id]}')
            temp_df = df[df[self.model_parent_run_id_key] == parent_run_id]
            temp_df = temp_df[temp_df[self.model_history_key].notna()]
            if temp_df.empty:
                logger.info(f'Nothing to cleanup for Run {run_name_mapping[parent_run_id]}')
                continue
            temp_df = temp_df[cols]
            logger.info(f'Total models present are {len(temp_df)}')
            # Keep the first N rows (assumed newest); the remainder is stale.
            temp_df = temp_df.iloc[self.total_models_needed:]
            logger.info(f'Total models to cleanup are {len(temp_df)}')
            all_records = temp_df.to_dict('records')
            if not all_records:
                logger.info(f'No records to cleanup for Run {run_name_mapping[parent_run_id]}')
                continue
            for record in all_records:
                self._mfu_.delete_artifact(record['run_id'], run_name_mapping[parent_run_id],
                                           record['artifact_uri'], self.model_save_name,
                                           self.model_name)

    def start_cleanup(self):
        """Entry point: find child runs that logged a model and delete the
        stale artifacts.

        :return: False when the experiment or its runs are missing; None after
                 a cleanup pass (kept for backward compatibility).
        """
        experiment_id = self.check_experiment()
        if experiment_id is None:
            logger.info("Not a valid experiment...")
            return False
        runs_df = self.check_runs_data(experiment_id)
        if runs_df is None:
            logger.info('No runs found for experiment, so no cleanup')
            return False
        # run_id -> run name, used for human-readable log messages.
        run_name_mapping = dict(zip(runs_df['run_id'], runs_df['tags.mlflow.runName']))
        # Child runs only (rows that reference a parent run id)...
        df = runs_df[runs_df[self.model_parent_run_id_key].notna()]
        # ...and among those, only runs that actually logged a model.
        f_df = df[df[self.model_history_key].notna()]
        self.delete_run_model_data(f_df, run_name_mapping)
import os
import re
from datetime import datetime
import mlflow
import pandas as pd
import pytz
from dateutil import tz
from loguru import logger
from azure.storage.blob import BlobServiceClient
from scripts.constants.app_configuration import REQUIRED_TZ, MlflowMetaData
# MLflow/Azure connection bootstrap -- runs once at import time.
mlflow_tracking_uri = MlflowMetaData.MLFLOW_TRACKING_URI
AZURE_STORAGE_CONNECTION_STRING = MlflowMetaData.AZURE_STORAGE_CONNECTION_STRING
AZURE_STORAGE_ACCESS_KEY = MlflowMetaData.AZURE_STORAGE_ACCESS_KEY
# MLflow auth and the Azure artifact store read credentials from the
# process environment, so they are exported here before any API call.
os.environ["MLFLOW_TRACKING_USERNAME"] = MlflowMetaData.MLFLOW_TRACKING_USERNAME
os.environ["MLFLOW_TRACKING_PASSWORD"] = MlflowMetaData.MLFLOW_TRACKING_PASSWORD
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = AZURE_STORAGE_CONNECTION_STRING
os.environ["AZURE_STORAGE_ACCESS_KEY"] = AZURE_STORAGE_ACCESS_KEY
# Point both the tracking and model-registry APIs at the same server.
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
# Shared low-level client used by MlFlowUtil.set_tag.
client = mlflow.tracking.MlflowClient()
class MlFlowUtil:
    """Stateless helpers wrapping the MLflow tracking API and Azure blob storage."""

    @staticmethod
    def get_last_run_time_diff(run_info):
        """Return the age in whole days of the first run in ``run_info``.

        :param run_info: DataFrame with an ``end_time`` column of timestamps;
                         the first row is taken as the latest run -- assumes
                         callers pass data sorted newest-first (TODO confirm).
        :return: age in days, or 0 on any failure.
        """
        try:
            logger.info("Checking the time difference in days")
            df_time = run_info.copy()
            df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(REQUIRED_TZ)
            to_zone = tz.gettz(REQUIRED_TZ)
            # The original also derived unused "days"/"hours" columns; dropped.
            last_model_time = list(df_time['end_time'])[0].to_pydatetime()
            central_current = datetime.now(pytz.utc).astimezone(to_zone)
            time_diff = central_current - last_model_time
            return int(time_diff.days)
        except Exception as e:
            logger.warning(f"Exception while checking the last run time of the model - {e}")
            return 0

    @staticmethod
    def log_model(model, model_name):
        """Log a scikit-learn model under the current active run.

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            mlflow.sklearn.log_model(model, model_name)
            logger.info("logged the model")
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_metrics(metrics):
        """Log a dict of metrics, stripping bracket characters from the keys
        first -- presumably because the tracking server rejects them (verify).

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            updated_metric = {}
            for key, value in metrics.items():
                # Remove ( ) [ ] { } characters from the metric name.
                key = re.sub(r"[([{})\]]", "", key)
                updated_metric[key] = value
            mlflow.log_metrics(updated_metric)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def log_hyper_param(hyper_params):
        """Log a dict of hyper-parameters under the current active run.

        :return: True on success, None when logging failed (exception logged).
        """
        try:
            mlflow.log_params(hyper_params)
            return True
        except Exception as e:
            logger.exception(str(e))

    @staticmethod
    def set_tag(child_run_id, key, value):
        """Set one tag on the given run via the shared MlflowClient."""
        try:
            client.set_tag(run_id=child_run_id, key=key, value=value)
        except Exception as e:
            logger.exception(f"Exception while setting the tag - {e}")

    @staticmethod
    def remove_file_if_exists(path):
        """Delete a local file, doing nothing when it does not exist."""
        if os.path.exists(path):
            os.remove(path)

    @staticmethod
    def delete_artifact(run_id, artifact_uri, file_path):
        """Delete the requirements.txt artifact of ``run_id`` from blob storage.

        NOTE(review): the blob name is hardcoded to requirements.txt here --
        confirm that is really the artifact meant to be removed.
        """
        container_name = artifact_uri.split("//")[-1].split('@')[0]
        mlflow_name = artifact_uri.split("//")[-1].split('@')[-1].split('/')[1]
        mlflow_id = artifact_uri.split("//")[-1].split('@')[-1].split('/')[2]
        path = f'{mlflow_name}/{mlflow_id}/{run_id}/artifacts/{file_path}/requirements.txt'
        logger.info(f'Deleting artifact from path: {path}')
        blob_service_client = BlobServiceClient.from_connection_string(MlflowMetaData.AZURE_STORAGE_CONNECTION_STRING)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(path)
        # BUG FIX: delete_blob() on a missing blob raises ResourceNotFoundError;
        # guard with an existence check so repeated cleanups don't crash.
        if blob_client.exists():
            blob_client.delete_blob()
        else:
            logger.info(f"The blob {path} does not exist, which means its already deleted")
class MlflowCleanUp:
    """Walks an experiment's parent runs and their nested child runs so stale
    model artifacts can be cleaned up based on model age.

    NOTE(review): the actual deletion loop in check_under_parent_run is still
    commented out, so this variant only inspects and logs runs.
    """

    def __init__(self, experiment_name, parent_run_name, model_save_name):
        self.experiment_name = experiment_name
        self.parent_run_name = parent_run_name
        self.model_save_name = model_save_name
        self._mfu_ = MlFlowUtil()
        # NOTE(review): MODEL_AGE_IN_DAYS is not defined on the MlflowMetaData
        # class visible in this changeset -- confirm the config exposes it.
        self.model_age = int(MlflowMetaData.MODEL_AGE_IN_DAYS)

    def check_experiment(self):
        """Return the experiment id for ``self.experiment_name`` or None when
        no such experiment exists."""
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        if experiment_info is None:
            logger.info(f"No experiment found with name {self.experiment_name}")
            return None
        logger.info(f"Proceeding with existing Experiment {self.experiment_name}")
        mlflow.set_experiment(experiment_name=self.experiment_name)
        # Re-fetch after set_experiment to pick up the active experiment record.
        experiment_info = mlflow.get_experiment_by_name(self.experiment_name)
        return experiment_info.experiment_id

    @staticmethod
    def check_parent_run(experiment_id):
        """Return {'parent_runs': [run ids], 'df': all-runs DataFrame}, or None
        when the experiment has no runs or no top-level runs.

        Debug leftovers from the original (print() calls and the all-runs.csv
        dump) were removed.
        """
        parent_runs_df = mlflow.search_runs(experiment_id)
        if parent_runs_df.empty:
            logger.info('No runs found for the experiment...')
            return None
        parent_key = 'tags.mlflow.parentRunId'
        # Top-level runs have no parentRunId; tag them so they can be selected.
        parent_runs_df[parent_key].fillna('parent', inplace=True)
        df = parent_runs_df[parent_runs_df[parent_key] == 'parent']
        if df.empty:
            logger.info('No parent runs found for the experiment')
            return None
        logger.info('Parent runs found for the experiment')
        return {'parent_runs': list(df['run_id']), 'df': parent_runs_df}

    def get_nested_runs(self, exp_id, parent_run_id):
        """
        Recursively iterate through all child runs of the specified parent run and return a nested dictionary
        with the parent run ID as the key and a dictionary of child run IDs and their nested child runs as the value.
        """
        child_runs_dict = {}
        for child_run_id in mlflow.search_runs([exp_id], f"tags.mlflow.parentRunId = '{parent_run_id}'")["run_id"]:
            nested_child_runs = self.get_nested_runs(exp_id, child_run_id)
            # None marks a leaf run (no further children).
            child_runs_dict[child_run_id] = nested_child_runs if nested_child_runs else None
        return child_runs_dict

    def check_under_parent_run(self, experiment_id, parent_run_id, df):
        """Collect the child runs of a parent and the columns needed for
        cleanup; the deletion loop is intentionally left disabled, matching
        the original.  Debug print() calls were removed."""
        logger.info(f"Getting all runs under parent run {parent_run_id}")
        child_runs = self.get_nested_runs(experiment_id, parent_run_id)
        cols = ['run_id', 'artifact_uri', 'start_time', 'end_time', 'tags.mlflow.parentRunId',
                'tags.mlflow.log-model.history']
        df = df[cols]
        # for child_run in child_runs:
        #     temp_df = df[df['run_id'] == child_run]
        #     artifact_uri = list(temp_df['artifact_uri'])[0]
        #     self._mfu_.delete_artifact(child_run, artifact_uri, self.model_save_name)

    def start_cleanup(self):
        """Entry point: inspect every parent run of the experiment.

        :return: False when the experiment or parent runs are missing,
                 otherwise None (kept for backward compatibility).
        """
        experiment_id = self.check_experiment()
        if experiment_id is None:
            logger.info("Not a valid experiment...")
            return False
        parent_runs_dict = self.check_parent_run(experiment_id)
        if parent_runs_dict is None:
            logger.info('No parent runs found for experiment, so no cleanup')
            return False
        parent_runs = parent_runs_dict['parent_runs']
        df = parent_runs_dict['df']
        logger.info(f'Total parent runs found are {len(parent_runs)}')
        for run in parent_runs:
            self.check_under_parent_run(experiment_id, run, df)

    def check_existing_model_retrain(self, latest_child_run_id, child_run_info, retrain):
        """
        If retrain is True, it returns true as retraining is required.
        If retrain is False, it checks the time difference between the last child run and the current time and returns
        true or false depending on the time difference
        :param latest_child_run_id: last child run id
        :param child_run_info: last child run info
        :param retrain: retrain flag
        :return: final retrain flag
        """
        if retrain:
            logger.info("Retraining Needed...")
            return True
        logger.info(f"Already trained model is present, checking the age of the existing model of run id "
                    f"{latest_child_run_id}")
        time_diff = self._mfu_.get_last_run_time_diff(child_run_info)
        # BUG FIX: the original computed time_diff and then unconditionally
        # returned False, so age-based retraining never triggered.  Retrain
        # when the existing model is at least self.model_age days old.
        return time_diff >= self.model_age

    def forming_loading_path(self, latest_run_id):
        """
        Creates the "runs:/<run_id>/<model_name>" URI used to load the model
        :param latest_run_id: latest child run id
        :return: the path to the model, or None on failure (exception logged)
        """
        try:
            model_name = self.model_save_name
            return f"runs:/{latest_run_id}/{model_name}"
        except Exception as e:
            logger.exception(f"Exception while forming loading path - {e}")
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment