Commit 295c841d authored by aakash.bedi

added city logic for mlflow utility

parent b1a1c9a3
Pipeline #59548 failed with stage
...@@ -13,7 +13,6 @@ from scripts.core.engine.raw_predicted_tags import get_raw_predicted_tags ...@@ -13,7 +13,6 @@ from scripts.core.engine.raw_predicted_tags import get_raw_predicted_tags
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
base_path = 'data_folder'
start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().start_end_date() start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().start_end_date()
...@@ -23,7 +22,7 @@ def get_tag_details(): ...@@ -23,7 +22,7 @@ def get_tag_details():
logger.info(f'raw tags dataframe shape - {df_raw_tags.shape}') logger.info(f'raw tags dataframe shape - {df_raw_tags.shape}')
logger.info(f'predicted tags dataframe shape - {df_predicted_tags.shape}') logger.info(f'predicted tags dataframe shape - {df_predicted_tags.shape}')
df = get_tags_data(tags=df_raw_tags, start_timestamp=start_timestamp, end_timestamp=end_timestamp) df = get_tags_data(df_input_tags=df_raw_tags, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
logger.info(f'Shape of final df - {df.shape}') logger.info(f'Shape of final df - {df.shape}')
mppt_data = GetData() mppt_data = GetData()
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
This diff is collapsed.
...@@ -30,7 +30,7 @@ class TrainingInference: ...@@ -30,7 +30,7 @@ class TrainingInference:
df_std.reset_index(drop=True, inplace=True) df_std.reset_index(drop=True, inplace=True)
inv_mppt_id = f'{inv_id}_{mppt_id}' inv_mppt_id = f'{inv_id}_{mppt_id}'
model, pre_trained = ModelLoad().model_manager(df=df_std, target='current_mppt', model, pre_trained = ModelLoad().model_manager(df=df_std, target='current_mppt',
inv_mppt_id=inv_mppt_id) inv_mppt_id=inv_mppt_id, city='ariyalur')
return model, scaler_x, scaler_y return model, scaler_x, scaler_y
except Exception as e: except Exception as e:
......
...@@ -11,23 +11,27 @@ def get_raw_predicted_tags(): ...@@ -11,23 +11,27 @@ def get_raw_predicted_tags():
try: try:
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db, mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection) collection=MongoConstants.collection)
logger.debug(f'mongo conn - {mongo_conn}') if mongo_conn is None:
raw_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"}, logger.info(f'mongodb is not connected, please check')
{"tags_property": "raw"}]}) else:
req_tags = raw_tags_dict['input_data'] logger.info(f'mongodb is connected')
logger.info(f'raw tags dict - {req_tags}') logger.debug(f'mongo conn - {mongo_conn}')
df_raw_tags = pd.DataFrame.from_dict(req_tags, orient='index') raw_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "raw"}]})
req_tags = raw_tags_dict['input_data']
logger.info(f'req raw tags length - {len(req_tags)}')
df_raw_tags = pd.DataFrame.from_dict(req_tags, orient='index')
predicted_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"}, predicted_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "predicted"}]}) {"tags_property": "predicted"}]})
predicted_tags = predicted_tags_dict['input_data'] predicted_tags = predicted_tags_dict['input_data']
logger.info(f'predicted tags dict - {predicted_tags}') logger.info(f'req predicted tags length - {len(predicted_tags)}')
df_predicted_tags = pd.DataFrame.from_dict(predicted_tags, orient='index') df_predicted_tags = pd.DataFrame.from_dict(predicted_tags, orient='index')
df_raw_tags.reset_index(inplace=True) df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True) df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True) df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True) df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
return df_raw_tags, df_predicted_tags return df_raw_tags, df_predicted_tags
except Exception as e: except Exception as e:
logger.exception(f'Exception - {e}') logger.exception(f'Exception - {e}')
...@@ -3,16 +3,23 @@ from loguru import logger ...@@ -3,16 +3,23 @@ from loguru import logger
from scripts.core.data_puller_push.data_puller import KairosQuery from scripts.core.data_puller_push.data_puller import KairosQuery
def get_tags_data(tags, start_timestamp, end_timestamp): def get_tags_data(df_input_tags, start_timestamp, end_timestamp):
try: try:
df_merged = pd.DataFrame() df_merged = pd.DataFrame()
for inv_id in list(tags['inv_id'].unique()): for inv_id in list(df_input_tags['inv_id'].unique()):
df = df_input_tags[df_input_tags['inv_id'] == inv_id]
df_tags_id = df[['tag_id', 'tag_name', 'inv_id', 'parameter_name', 'mppt_id']]
df = tags[tags['inv_id'] == inv_id]
df_tags_id = df[['tag_id', 'tag_name', 'inv_id', 'parameter_name', 'mppt_id',
'mppt_id_with_equipment']]
df_tags_id.reset_index(drop=True, inplace=True) df_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_tags_id[['tag_id', 'parameter_name']].set_index('tag_id').T.to_dict(orient="records")[0]
current_voltage_tags_only = [data for data in df_tags_id['parameter_name']
if any([x in data for x in ['current', 'voltage']])]
req_data_list = [data for data in current_voltage_tags_only if 'Potential' not in data]
req_data_list = [data for data in req_data_list if 'Degradation' not in data]
df_req_tags_id = df_tags_id.loc[df_tags_id['parameter_name'].isin(req_data_list)]
df_req_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_req_tags_id[['tag_id', 'parameter_name']].set_index('tag_id').T.to_dict(orient="records")[0]
tags_dict['site_107$dept_140$line_371$equipment_4115$tag_15828'] = 'tilt_irradiance' tags_dict['site_107$dept_140$line_371$equipment_4115$tag_15828'] = 'tilt_irradiance'
df_data = KairosQuery(start_timestamp=start_timestamp, df_data = KairosQuery(start_timestamp=start_timestamp,
end_timestamp=end_timestamp, end_timestamp=end_timestamp,
......
...@@ -21,7 +21,7 @@ client = mlflow.tracking.MlflowClient() ...@@ -21,7 +21,7 @@ client = mlflow.tracking.MlflowClient()
class ModelLoad(object): class ModelLoad(object):
def model_manager(self, df, target, inv_mppt_id): def model_manager(self, df, target, inv_mppt_id, city):
try: try:
experiment_id = self.create_experiment(experiment_name=MlFlow.experiment_name) experiment_id = self.create_experiment(experiment_name=MlFlow.experiment_name)
days, latest_run_id = self.fetch_latest_model(experiment_id=experiment_id, days, latest_run_id = self.fetch_latest_model(experiment_id=experiment_id,
...@@ -34,8 +34,11 @@ class ModelLoad(object): ...@@ -34,8 +34,11 @@ class ModelLoad(object):
else: else:
pre_trained = False pre_trained = False
run_id = self.creating_run(experiment_id=experiment_id, run_id = self.creating_run(experiment_id=experiment_id,
run_name=MlFlow.run_name + '_' + inv_mppt_id) run_name=city)
with mlflow.start_run(run_id=run_id): with mlflow.start_run(run_id=run_id):
run_id = self.creating_new_nested_run(experiment_id=experiment_id,run_id=run_id,
run_name=MlFlow.run_name + '_' + inv_mppt_id,
nested=True)
nested_run_id = self.creating_new_nested_run(experiment_id=experiment_id, nested_run_id = self.creating_new_nested_run(experiment_id=experiment_id,
run_id=run_id, run_id=run_id,
nested=True) nested=True)
...@@ -112,7 +115,7 @@ class ModelLoad(object): ...@@ -112,7 +115,7 @@ class ModelLoad(object):
logger.exception(str(e)) logger.exception(str(e))
@staticmethod @staticmethod
def creating_new_nested_run(experiment_id, run_id=None, nested=False): def creating_new_nested_run(experiment_id, run_id=None, run_name=None ,nested=False):
""" """
Function is to create a nested run Function is to create a nested run
:param experiment_id: Experiment Id :param experiment_id: Experiment Id
...@@ -122,7 +125,7 @@ class ModelLoad(object): ...@@ -122,7 +125,7 @@ class ModelLoad(object):
""" """
try: try:
with mlflow.start_run(experiment_id=experiment_id, run_id=run_id, nested=nested): with mlflow.start_run(experiment_id=experiment_id, run_id=run_id, nested=nested):
with mlflow.start_run(experiment_id=experiment_id, nested=True) as run: with mlflow.start_run(experiment_id=experiment_id, nested=True, run_name=run_name) as run:
return run.info.run_id return run.info.run_id
except Exception as e: except Exception as e:
logger.exception(str(e)) logger.exception(str(e))
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment