Commit 646a534f authored by aakash.bedi

Configure tags in Mongo

parent cc9339fd
Pipeline #59444 failed with stage
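This change moves the string-level tag configuration out of the tag Excel file and into MongoDB: raw and predicted tag definitions are now read from the `dalmiaStringTags` collection in the `ilens_ai` database. A minimal sketch of the document shape implied by the queries in `scripts/core/engine/raw_predicted_tags.py` and by `MongoConnect.data_dict()` in this commit; the keys inside `input_data` are assumed for illustration:

```python
# Hypothetical tag-configuration document, inferred from the find_one()
# queries and MongoConnect.data_dict() in this commit.
example_doc = {
    "project_id": "project_101",          # Mongo.project_id
    "id": "dalmia_string_level_tags",     # Mongo.query_filter
    "city": "ariyalur",
    "tags_property": "raw",               # "raw" or "predicted"
    "input_data": {
        # assumed payload shape: one entry per tag name
        "INV1_MPPT1_CURRENT": {"tag_id": "tag_001"},
    },
}
```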
@@ -1,3 +1,5 @@
+import pandas as pd
+
 if __name__ == "__main__":
     from dotenv import load_dotenv
     load_dotenv(dotenv_path='config.env')
@@ -5,13 +7,13 @@ import warnings
 import numpy as np
 from loguru import logger
 from scripts.core.engine.mppt_data import GetData
-from scripts.utils.reading_tags import GetTags
 from scripts.core.engine.tags_data import get_tags_data
 from scripts.utils.start_end_date import KairosStartEndDate
 from scripts.utils.preprocessing import DataPreprocessing
 from scripts.core.engine.inv_and_mppt_level import TrainingInference
 from scripts.core.engine.final_tags import GetFinalDf
-from scripts.core.data_puller_push.kafka_push import CalculatedDataPush
+from scripts.core.engine.model_training_inference import ai_modelling
+from scripts.core.engine.raw_predicted_tags import get_raw_predicted_tags
 warnings.filterwarnings("ignore")
@@ -21,12 +23,9 @@ start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().star
 def get_tag_details():
     try:
-        get_tags = GetTags(base_path=base_path)
-        get_final_df = GetFinalDf()
-        tags_excel = get_tags.read_tag_excel()
-        df_final_tags = get_final_df.get_predicted_current_tags(tags_excel=tags_excel)
-        mppt_tags = get_tags.get_mppt_tags(df=tags_excel, substrings='MPPT')
-        df = get_tags_data(mppt_tags=mppt_tags,
+        df_raw_tags, df_predicted_tags = get_raw_predicted_tags()
+        df = get_tags_data(mppt_tags=df_raw_tags,
                           start_timestamp=start_timestamp,
                           end_timestamp=end_timestamp)
@@ -36,34 +35,13 @@ def get_tag_details():
         df_mppt = mppt_data.current_voltage_mppt_data(df=df)
         data_preprocessing = DataPreprocessing()
         df_mppt = data_preprocessing.remove_outliers(df=df_mppt, param_list=['tilt_irradiance', 'voltage_mppt',
                                                                              'current_mppt'])
         df_mppt, df_train, df_test = data_preprocessing.train_test_split(df=df_mppt)
-        unique_inv_id = list(df_train.inv_id.unique())
-        unique_mppt_id = list(df_train.mppt_id.unique())
         get_training_inference = TrainingInference(df=df_mppt, df_train=df_train, df_test=df_test)
-        for inv_id in unique_inv_id:
-            for mppt_id in unique_mppt_id:
-                try:
-                    model, scaler_x, scaler_y = get_training_inference.data_training(inv_id=inv_id, mppt_id=mppt_id)
-                    x_test, y_test, predictions = get_training_inference.data_inference(scaler_x=scaler_x,
-                                                                                        scaler_y=scaler_y,
-                                                                                        model=model,
-                                                                                        inv_id=inv_id,
-                                                                                        mppt_id=mppt_id)
-                    df_result = get_final_df.get_final_data(x_test=x_test,
-                                                            y_test=y_test,
-                                                            predictions=predictions)
-                    final_dict = get_final_df.get_final_predicted_tags(df_predicted_current_tags=df_final_tags,
-                                                                       inv_id=inv_id, mppt_id=mppt_id)
-                    df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
-                    df_result["timestamp"] = df_result["timestamp"].astype('int')
-                    CalculatedDataPush(df_result=df_result, final_tags_dict=final_dict).kafka_data_push()
-                    # function to push the data to kairos
-                    logger.info(f'{final_dict}')
-                except Exception as e:
-                    logger.exception(f'Exception - {e}')
+        ai_modelling(df_train=df_train, get_training_inference=get_training_inference,
+                     df_predicted_tags=df_predicted_tags)
     except Exception as e:
         logger.exception(f'Exception - {e}')
@@ -30,6 +30,8 @@ email_sender=$SENDER_EMAIL
 [MONGO]
 mongo_uri=$MONGO_URI
+project_id=$PROJECT_ID
+query_filter=$QUERY_FILTER
 [TIMEZONE]
 required_tz=$REQUIRED_TZ
config.env
@@ -11,6 +11,10 @@ START_RELATIVE=90
 END_RELATIVE=0
 REQUIRED_TZ="Asia/Kolkata"
+MONGO_URI=mongodb://admin:iLensDevMongo783@192.168.0.220:2717/
+PROJECT_ID=project_101
+QUERY_FILTER=dalmia_string_level_tags
 MLFLOW_TRACKING_URI=https://qa.unifytwin.com/mlflow/
 MLFLOW_TRACKING_USERNAME=mlflow
 MLFLOW_TRACKING_PASSWORD=MlFlOwQA#4321
scripts/constants/app_configuration.py
@@ -35,10 +35,6 @@ except Exception as e:
     sys.exit()
-class Mongo:
-    mongo_uri = config["MONGO"]["mongo_uri"]
 class KairosDb:
     uri = config["KAIROS_DB"]["uri"]
     metric_name = config['KAIROS_DB']['metric_name']
@@ -84,6 +80,12 @@ class PycaretParams:
     hyperparameter_tuning_method = config['PYCARET']['hyperparameter_tuning_method']
+class Mongo:
+    mongo_uri = config["MONGO"]["mongo_uri"]
+    project_id = config["MONGO"]["project_id"]
+    query_filter = config["MONGO"]["query_filter"]
scripts/constants/app_constants.py
 json_file_path = "scripts/utils/"
-class DBConstants:
+class MongoConstants:
     # DB
-    db_metadata = "ilens_configuration"
+    db = "ilens_ai"
     # collections
-    collection_rule_targets = "rule_targets"
-    yield_sheet_name = "yield_reports_3cp"
\ No newline at end of file
+    collection = "dalmiaStringTags"
new file: scripts/core/engine/model_training_inference.py

import numpy as np
from loguru import logger
from scripts.core.engine.final_tags import GetFinalDf
from scripts.core.data_puller_push.kafka_push import CalculatedDataPush


def ai_modelling(df_train, get_training_inference, df_predicted_tags):
    try:
        get_final_df = GetFinalDf()
        for inv_id in list(df_train.inv_id.unique()):
            for mppt_id in list(df_train.mppt_id.unique()):
                try:
                    model, scaler_x, scaler_y = get_training_inference.data_training(inv_id=inv_id, mppt_id=mppt_id)
                    x_test, y_test, predictions = get_training_inference.data_inference(scaler_x=scaler_x,
                                                                                        scaler_y=scaler_y,
                                                                                        model=model,
                                                                                        inv_id=inv_id,
                                                                                        mppt_id=mppt_id)
                    df_result = get_final_df.get_final_data(x_test=x_test,
                                                            y_test=y_test,
                                                            predictions=predictions)
                    final_dict = get_final_df.get_final_predicted_tags(df_predicted_current_tags=df_predicted_tags,
                                                                       inv_id=inv_id, mppt_id=mppt_id)
                    df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
                    df_result["timestamp"] = df_result["timestamp"].astype('int')
                    CalculatedDataPush(df_result=df_result, final_tags_dict=final_dict).kafka_data_push()
                    logger.info(f'{final_dict}')
                except Exception as e:
                    logger.exception(f'Exception - {e}')
    except Exception as e:
        logger.exception(f'Exception - {e}')
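One detail worth noting in the new module: the `datetime` column is converted to integer epoch seconds before the Kafka push. A standalone sketch of the same conversion, with an illustrative sample value:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"datetime": pd.to_datetime(["2023-01-01 00:00:00"])})
# nanoseconds since epoch -> seconds, then truncate to int, as above
df["timestamp"] = (df["datetime"].values.astype(np.int64) / 10 ** 9).astype(int)
print(df["timestamp"].iloc[0])  # 1672531200
```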
new file: scripts/core/engine/raw_predicted_tags.py

import warnings

import pandas as pd
from loguru import logger
from scripts.utils.mongo_utils import MongoConnect
from scripts.constants.app_configuration import Mongo
from scripts.constants.app_constants import MongoConstants

warnings.filterwarnings('ignore')


def get_raw_predicted_tags():
    try:
        mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
                                  collection=MongoConstants.collection)
        raw_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
                                                      {"tags_property": "raw"}]})
        req_tags = raw_tags_dict['input_data']
        df_raw_tags = pd.DataFrame.from_dict(req_tags, orient='index')
        predicted_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
                                                            {"tags_property": "predicted"}]})
        predicted_tags = predicted_tags_dict['input_data']
        df_predicted_tags = pd.DataFrame.from_dict(predicted_tags, orient='index')
        df_raw_tags.reset_index(inplace=True)
        df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
        df_predicted_tags.reset_index(inplace=True)
        df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
        return df_raw_tags, df_predicted_tags
    except Exception as e:
        logger.exception(f'Exception - {e}')
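For reference, this is what `DataFrame.from_dict(..., orient='index')` followed by the reset/rename does to the stored `input_data` mapping; the payload keys here are assumed for illustration:

```python
import pandas as pd

input_data = {"INV1_MPPT1_CURRENT": {"tag_id": "tag_001"}}  # assumed payload
df = pd.DataFrame.from_dict(input_data, orient='index')
df.reset_index(inplace=True)
df.rename(columns={'index': 'tag_name'}, inplace=True)
print(df)
#              tag_name   tag_id
# 0  INV1_MPPT1_CURRENT  tag_001
```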
scripts/core/engine/tags_data.py
@@ -28,7 +28,6 @@ def get_tags_data(mppt_tags, start_timestamp, end_timestamp):
         df_merged['hour'] = df_merged['datetime'].dt.hour
         logger.info(f'Final shape of merged dataframe = {df_merged.shape}')
         df_merged.reset_index(drop=True, inplace=True)
-        # df_merged.to_csv(f'{base_path}/df_merged.csv', index=False)
         return df_merged
     except Exception as e:
         logger.exception(f'Exception - {e}')
new file: scripts/utils/mongo_utils.py

from loguru import logger
from pymongo import MongoClient
from scripts.constants.app_configuration import Mongo


class MongoConnect:
    def __init__(self, uri, database, collection):
        try:
            self.uri = uri
            self.client = MongoClient(self.uri, connect=False)
            self.database = database
            self.collection = collection
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def data_dict(data, city):
        try:
            req_dict = dict()
            req_dict['project_id'] = Mongo.project_id
            req_dict['id'] = Mongo.query_filter
            req_dict['city'] = city
            req_dict['input_data'] = data
            return req_dict
        except Exception as e:
            logger.exception(f'Exception - {e}')

    def insert_one(self, data, city):
        try:
            db = self.client[self.database]
            collection = db[self.collection]
            req_dict = self.data_dict(data=data, city=city)
            response = collection.insert_one(req_dict)
            return response.inserted_id
        except Exception as e:
            logger.exception(f'Exception - {e}')

    def find_one(self, query, filter_dict=None):
        try:
            if filter_dict is None:
                filter_dict = {"_id": 0}
            db = self.client[self.database]
            collection = db[self.collection]
            response = collection.find_one(query, filter_dict)
            return response
        except Exception as e:
            logger.exception(f'Exception - {e}')
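A hypothetical one-off seeding snippet using this helper, with document and city values taken from the read path above. Note that `data_dict` does not set the `tags_property` field that `get_raw_predicted_tags` filters on, so a seeding step has to add it before inserting:

```python
from scripts.utils.mongo_utils import MongoConnect
from scripts.constants.app_configuration import Mongo
from scripts.constants.app_constants import MongoConstants

conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
                    collection=MongoConstants.collection)
raw_tags = {"INV1_MPPT1_CURRENT": {"tag_id": "tag_001"}}  # assumed payload
doc = conn.data_dict(data=raw_tags, city="ariyalur")
doc["tags_property"] = "raw"  # required by the read-side query
conn.client[MongoConstants.db][MongoConstants.collection].insert_one(doc)
```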
scripts/utils/preprocessing.py
@@ -10,15 +10,15 @@ class DataPreprocessing:
     @staticmethod
     def remove_outliers(df, param_list):
         try:
-            for col in param_list:
-                lb = df[col].mean() - 3 * df[col].std()
-                ub = df[col].mean() + 3 * df[col].std()
-                logger.debug(f"Min values of {col} = {df[col].min()} \nLower Bracket of {col} = {lb}")
-                logger.debug(f"Max values of {col} = {df[col].max()} \nUpper Bracket of {col} = {ub}")
-                logger.debug(f'Shape of df before outlier removal = {df.shape}')
-                df = (df[(df[col] > lb) & (df[col] < ub)])
-                logger.debug(f'Shape of df after outlier removal = {df.shape}')
-            logger.debug(f'Shape final df before outlier removal = {df.shape}')
+            # for col in param_list:
+            #     lb = df[col].mean() - 3 * df[col].std()
+            #     ub = df[col].mean() + 3 * df[col].std()
+            #     logger.debug(f"Min values of {col} = {df[col].min()} \nLower Bracket of {col} = {lb}")
+            #     logger.debug(f"Max values of {col} = {df[col].max()} \nUpper Bracket of {col} = {ub}")
+            #     logger.debug(f'Shape of df before outlier removal = {df.shape}')
+            #     df = (df[(df[col] > lb) & (df[col] < ub)])
+            #     logger.debug(f'Shape of df after outlier removal = {df.shape}')
+            # logger.debug(f'Shape final df before outlier removal = {df.shape}')
             return df
         except Exception as e:
             logger.exception(f'Exception - {e}')
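The commit disables the 3-sigma outlier filter by commenting it out rather than deleting it, so `remove_outliers` now returns the dataframe unchanged. For reference, a compact equivalent of the disabled logic, should it need to be restored:

```python
import pandas as pd

def remove_outliers_3sigma(df: pd.DataFrame, param_list) -> pd.DataFrame:
    # Keep only rows within mean +/- 3*std for each listed column,
    # filtering sequentially as the commented-out loop did.
    for col in param_list:
        lb = df[col].mean() - 3 * df[col].std()
        ub = df[col].mean() + 3 * df[col].std()
        df = df[(df[col] > lb) & (df[col] < ub)]
    return df
```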