Commit cec9a86f authored by aakash.bedi

updated inference module

parent f063dfef
Pipeline #59960 failed with stage
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
data
.idea/
.vscode/
.pytest_cache/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
FROM python:3.9-buster
RUN apt-get update -y && \
apt install -y openjdk-11-jre && \
apt install -y libtiff5-dev libjpeg62-turbo-dev && \
apt install -y zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev tcl8.6-dev && \
apt install -y tk8.6-dev python-tk pdftk libmagickwand-dev
COPY . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD [ "python","app.py" ]
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv(dotenv_path='config.env')
import warnings
from loguru import logger
import tracemalloc
import gc
import numpy as np
from scripts.core.engine.mppt_data import GetData
from scripts.core.engine.tags_data import get_tags_data
from scripts.utils.start_end_date import KairosStartEndDate
from scripts.core.engine.final_tags import GetFinalDf
from scripts.core.engine.data_training_and_inference import Inference
from scripts.core.engine.final_predictions import ai_modelling
from scripts.core.engine.raw_predicted_tags import get_raw_predicted_tags
from scripts.core.data_puller_push.kafka_push import CalculatedDataPush
warnings.filterwarnings("ignore")
start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().start_end_date()
def orchestrator():
try:
tracemalloc.start()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
df_raw_tags, df_predicted_tags, df_coefficients = get_raw_predicted_tags()
logger.info(f'raw tags dataframe shape - {df_raw_tags.shape}')
logger.info(f'predicted tags dataframe shape - {df_predicted_tags.shape}')
for inv_id in list(df_raw_tags['inv_id'].unique()):
df = df_raw_tags[df_raw_tags['inv_id'] == inv_id]
for mppt_id in list(df_raw_tags['mppt_id'].unique()):
print(f'1st memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'2nd memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'3rd memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
df_mppt_level = df[df['mppt_id'] == mppt_id]
df_kairos_data = get_tags_data(df_input_tags=df_mppt_level, start_timestamp=start_timestamp,
end_timestamp=end_timestamp, inv_id=inv_id, mppt_id=mppt_id)
logger.info(f'Shape of final df - {df_kairos_data.shape}')
print(f'4th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'5th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'6th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
mppt_data = GetData()
df_mppt = mppt_data.associate_inv_mppt_id(df=df_kairos_data)
print(f'7th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'8th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'9th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
df_coefficient_multiply = mppt_data.multiply_mppt_coefficients(df_mppt=df_mppt,
df_coefficients=df_coefficients)
print(f'10th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'11th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'12th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
x_test, y_test, predictions = Inference(df=df_coefficient_multiply).data_inference(inv_id=inv_id, mppt_id=mppt_id)
get_final_df = GetFinalDf()
df_result = get_final_df.get_final_data(x_test=x_test,
y_test=y_test,
predictions=predictions)
final_dict = get_final_df.get_final_predicted_tags(df_predicted_current_tags=df_predicted_tags,
inv_id=inv_id, mppt_id=mppt_id)
df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
df_result["timestamp"] = df_result["timestamp"].astype('int')
CalculatedDataPush(df_result=df_result, final_tags_dict=final_dict).kafka_data_push()
logger.info(f'{final_dict}')
del df_kairos_data
del df_mppt
del df_coefficient_multiply
print(f'16th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'17th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'18th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
print(f'final allocation for {inv_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.stop()
logger.info('Data inference has been completed!')
except Exception as e:
logger.exception(f'Exception - {e}')
if __name__ == '__main__':
orchestrator()
[LOGGING]
level = $LOG_LEVEL
traceback = $LOG_TRACEBACK
[MODULE]
name = $APP_NAME
[KAIROS_DB]
uri=$KAIROS_URI
metric_name=$KAIROS_METRIC
aggregator=$AGGREGATOR
aggregator_value=$AGGREGATOR_VALUE
aggregator_unit=$AGGREGATOR_UNIT
[KAFKA]
kafka_host=$KAFKA_HOST
kafka_port=$KAFKA_PORT
kafka_topic=$KAFKA_TOPIC
[DATE_RANGE]
start_date=$START_DATE
end_date=$END_DATE
start_relative_days=$START_RELATIVE
end_relative_days=$END_RELATIVE
[EMAIL_DETAILS]
email_reciever=$RECIEVER_EMAIL
email_sender=$SENDER_EMAIL
[MONGO]
mongo_uri=$MONGO_URI
project_id=$PROJECT_ID
query_filter=$QUERY_FILTER
[TIMEZONE]
required_tz=$REQUIRED_TZ
[MLFLOW]
mlflow_tracking_uri=$MLFLOW_TRACKING_URI
mlflow_tracking_username=$MLFLOW_TRACKING_USERNAME
mlflow_tracking_password=$MLFLOW_TRACKING_PASSWORD
azure_storage_connection_string=$AZURE_STORAGE_CONNECTION_STRING
azure_storage_access_key=$AZURE_STORAGE_ACCESS_KEY
user=$USER
experiment_name=$EXPERIMENT_NAME
run_name=$RUN_NAME
model_name=$MODEL_NAME
check_param=$CHECK_PARAM
model_check_param=$MODEL_CHECK_PARAM
[PYCARET]
models_list=$MODELS_LIST
selected_metric=$SELECTED_METRIC
hyperparameter_tuning_method=$HYPERPARAMETER_TUNING_METHOD
inv_list:
- inv_1
- inv_2
- inv_3
- inv_4
- inv_5
- inv_6
- inv_7
- inv_8
- inv_9
- inv_10
- inv_11
- inv_12
- inv_13
- inv_14
- inv_15
- inv_16
- inv_17
- inv_18
- inv_19
- inv_20
- inv_21
- inv_22
- inv_23
- inv_24
- inv_25
- inv_26
- inv_27
- inv_28
- inv_29
- inv_30
- inv_31
- inv_32
- inv_33
- inv_34
- inv_35
- inv_36
- inv_37
- inv_38
- inv_39
- inv_40
- inv_41
- inv_42
APP_NAME=dalmia-solar-degradation
KAIROS_URI=https://iLens:iLensDAL$456@dalmia.ilens.io/kairos/
KAIROS_METRIC=ilens.live_data.raw
AGGREGATOR=max
AGGREGATOR_VALUE=15
AGGREGATOR_UNIT=minutes
KAFKA_HOST=192.168.0.220
KAFKA_PORT=9092
KAFKA_TOPIC=ilens_dev
START_RELATIVE=1
END_RELATIVE=1
REQUIRED_TZ="Asia/Kolkata"
MONGO_URI=mongodb://admin:iLensDevMongo783@192.168.0.220:2717/
PROJECT_ID=project_101
QUERY_FILTER=dalmia_string_level_tags
MLFLOW_TRACKING_URI=https://qa.unifytwin.com/mlflow/
MLFLOW_TRACKING_USERNAME=mlflow
MLFLOW_TRACKING_PASSWORD=MlFlOwQA#4321
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net
AZURE_STORAGE_ACCESS_KEY=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==
USER=Dalmia_degradation
EXPERIMENT_NAME=Dalmia Solar Degradation2
RUN_NAME=Degradation
MODEL_NAME=versioning
CHECK_PARAM=hours
MODEL_CHECK_PARAM=480
MODELS_LIST=lr,knn,gbr,rf,catboost,lightgbm,ada,et,xgboost,dt,en,par,huber
SELECTED_METRIC=R2
HYPERPARAMETER_TUNING_METHOD=scikit-learn
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv(dotenv_path='config.env')
import os
import os.path
import sys
from configparser import ConfigParser, BasicInterpolation
import yaml
from loguru import logger
# Configuring file constants
data_conf = "./conf/data.yml"
class EnvInterpolation(BasicInterpolation):
"""
Interpolation which expands environment variables in values.
"""
def before_get(self, parser, section, option, value, defaults):
value = super().before_get(parser, section, option, value, defaults)
if not os.path.expandvars(value).startswith("$"):
return os.path.expandvars(value)
else:
return
try:
config = ConfigParser(interpolation=EnvInterpolation())
config.read(f"conf/application.conf")
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.exit()
class KairosDb:
uri = config["KAIROS_DB"]["uri"]
metric_name = config['KAIROS_DB']['metric_name']
aggregator = config['KAIROS_DB']['aggregator']
aggregator_value = config['KAIROS_DB']['aggregator_value']
aggregator_unit = config['KAIROS_DB']['aggregator_unit']
class Kafka:
kafka_host = config["KAFKA"]["kafka_host"]
kafka_port = config["KAFKA"]["kafka_port"]
kafka_topic = config["KAFKA"]["kafka_topic"]
class DateRange:
start_date = config.get("DATE_RANGE", "start_date")
end_date = config.get("DATE_RANGE", "end_date")
start_relative_days = config.get("DATE_RANGE", "start_relative_days")
end_relative_days = config.get("DATE_RANGE", "end_relative_days")
class ReqTimeZone:
required_tz = config.get('TIMEZONE', 'required_tz')
class MlFlow:
mlflow_tracking_uri = config['MLFLOW']['mlflow_tracking_uri']
mlflow_tracking_username = config['MLFLOW']['mlflow_tracking_username']
mlflow_tracking_password = config['MLFLOW']['mlflow_tracking_password']
azure_storage_connection_string = config['MLFLOW']['azure_storage_connection_string']
azure_storage_access_key = config['MLFLOW']['azure_storage_access_key']
user = config['MLFLOW']['user']
experiment_name = config['MLFLOW']['experiment_name']
run_name = config['MLFLOW']['run_name']
model_name = config['MLFLOW']['model_name']
check_param = config['MLFLOW']['check_param']
model_check_param = config['MLFLOW']['model_check_param']
class PycaretParams:
model_list = config['PYCARET']['models_list']
selected_metric = config['PYCARET']['selected_metric']
hyperparameter_tuning_method= config['PYCARET']['hyperparameter_tuning_method']
class Mongo:
mongo_uri = config["MONGO"]["mongo_uri"]
project_id = config["MONGO"]["project_id"]
query_filter = config["MONGO"]["query_filter"]
json_file_path = "scripts/utils/"
class MongoConstants:
# DB
db = "ilens_ai"
# collections
collection = "dalmiaStringTags"
import json
import pandas as pd
import requests
from loguru import logger
from scripts.constants.app_configuration import KairosDb
class KairosQuery:
def __init__(self, start_timestamp, end_timestamp, tag_dict):
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.kairos_host = KairosDb.uri
self.kairos_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.kairos_host)
self.tag_dict = tag_dict
def kairos_query(self):
try:
return {
"metrics": [
{
"tags": {
"c3": list(self.tag_dict.keys())
},
"name": KairosDb.metric_name,
"group_by": [
{
"name": "tag",
"tags": ["c3"]
}
],
"aggregators": [
{
"name": KairosDb.aggregator,
"sampling": {
"value": KairosDb.aggregator_value,
"unit": KairosDb.aggregator_unit
},
"align_sampling": True,
"align_start_time": True
}
]
}
],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": self.start_timestamp,
"end_absolute": self.end_timestamp,
}
except Exception as e:
logger.exception(f"Exception - {e}")
def get_data(self, data_query):
try:
logger.info("Data for the parameters being pulled from Kairos Database")
response = requests.post(self.kairos_url, data=json.dumps(data_query))
response_data = response.json()
output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
for i in range(len(output_data)):
grouped_output_data = output_data[i]["results"]
for each_grouped_data in grouped_output_data:
value = (each_grouped_data["values"])
tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug("Renamed {} to {} in Data".format(tag_id, self.tag_dict[tag_id]))
column_name = self.tag_dict[tag_id]
except KeyError as e:
logger.exception(f'Exception - {e}')
logger.debug("Column Renaming Logic not found for {}".format(tag_id))
column_name = tag_id
df_column_data = pd.DataFrame(data=value, columns=["timestamp", column_name])
if df_final.empty:
df_final = df_column_data
else:
df_final = df_final.merge(df_column_data, how="outer", left_on="timestamp",
right_on="timestamp")
df_final['datetime'] = pd.to_datetime(df_final['timestamp'], unit="ms").dt.tz_localize('UTC').\
dt.tz_convert('Asia/Kolkata')
logger.debug("Final number of columns : {}".format(str(len(list(df_final.columns)))))
return df_final
except Exception as e:
logger.exception(f"Exception occurred - {e}", exc_info=True)
def kairos_data_import(self):
try:
logger.debug("Fetching live data")
query_live = self.kairos_query()
logger.info(f"query_live = {query_live}")
df = self.get_data(data_query=query_live)
return df
except Exception as e:
logger.exception(f"Exception - {e}")
from loguru import logger
from scripts.core.data_puller_push.push_data import insert_values_dalmia
class CalculatedDataPush:
def __init__(self, df_result, final_tags_dict):
self.df_result = df_result
self.final_tags_dict = final_tags_dict
def kafka_data_push(self):
try:
logger.info(f"final_tags_dict = {self.final_tags_dict}")
logger.info(f"df result shape = {self.df_result.shape}")
for i, j in self.df_result.iterrows():
my_dict = {v: j[k] for k, v in self.final_tags_dict.items()}
logger.info(f"{j['timestamp'], j['datetime'], my_dict}")
insert_values_dalmia(j['timestamp'], my_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
from json import dumps
from kafka import KafkaProducer
from loguru import logger
from scripts.constants.app_configuration import Kafka
def insert_values_dalmia(ts, my_dict):
kairos_writer = KairosWriter()
kairos_writer.write_data(
{
ts: my_dict
},
Kafka.kafka_topic
)
logger.info("Data pushed successfully!")
class KafkaProducerUtil:
def __init__(self):
try:
self.host = Kafka.kafka_host
self.port = Kafka.kafka_port
logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
kafka_broker = [self.host + ":" + str(self.port)]
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode('utf-8'),
api_version=(0, 10, 1))
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
self.producer.flush()
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
class KairosWriter(KafkaProducerUtil):
def write_data(self, data_json, topic):
site_id = "site_107"
logger.debug(f"Data being pushed to kafka topic: {topic}")
msg_counter = 0
for k, v in data_json.items():
timestamp, data = self.data_validator(k, v)
timestamp = timestamp * 1000
write_json = {
"data": data,
"site_id": site_id,
"gw_id": "gw_{}".format(site_id.lstrip("site_")), # The lstrip(s) removes leading whitespace (on the left)
"pd_id": "pd_{}".format(site_id.lstrip("site_")), # The rstrip(s) removes the trailing whitespace (on the right)
"timestamp": timestamp,
"msg_id": msg_counter,
"partition": "",
"retain_flag": False
}
logger.debug(f"Timestamp: {timestamp}, Values: {data}")
self.publish(topic, dumps(write_json))
msg_counter += 1
return msg_counter
def audit_data(self, data_json, topic):
logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
msg_counter = len(data_json)
for each in data_json:
self.publish(topic, dumps(each))
return msg_counter
@staticmethod
def data_validator(timestamp, data):
logger.debug("Validating the data to remove Nan values")
__temp__ = {}
for k, v in data.items():
if not k.startswith("site"):
continue
# This function will return True if the "v" is one of the types in the tuple
if isinstance(v, (int, float)) and str(v) not in ('nan', 'inf'):
__temp__[k] = v
return int(timestamp), __temp__
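Roughly, the payload shape that write_data() and data_validator() above expect looks like the sketch below; the tag id and values are invented, and only keys starting with "site" that hold finite numbers survive validation.

# Sketch only: assumes the KairosWriter and Kafka definitions in this module are in scope
payload = {
    1672524000: {                                                       # epoch seconds; write_data() scales to ms
        "site_107$dept_140$line_371$equipment_4115$tag_99999": 12.34,  # hypothetical predicted-current tag
        "not_a_site_key": 1.0,                                          # dropped by data_validator()
    }
}
KairosWriter().write_data(payload, Kafka.kafka_topic)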
from loguru import logger
from scripts.utils.mlflow_util import ModelLoad
class Inference:
def __init__(self, df):
self.df = df
def data_inference(self, inv_id, mppt_id):
try:
df_test_mppt = self.df[['datetime', 'tilt_irradiance', 'voltage_mppt', 'hour', 'current_mppt']]
df_test_mppt.reset_index(drop=True, inplace=True)
x_test = df_test_mppt[['datetime', 'tilt_irradiance', 'voltage_mppt', 'hour']]
y_test = df_test_mppt[['current_mppt']]
logger.debug(f'shape of x_test for {inv_id} & {mppt_id} - {x_test.shape}')
logger.debug(f'shape of y_test for {inv_id} & {mppt_id} - {y_test.shape}')
inv_mppt_id = f'{inv_id}_{mppt_id}'
model = ModelLoad().model_manager(inv_mppt_id=inv_mppt_id)
predictions = model.predict(x_test.drop(['datetime'], axis=1)).reshape(1, -1)
return x_test, y_test, predictions
except Exception as e:
logger.exception(f'Exception - {e}')
import numpy as np
from loguru import logger
from scripts.core.engine.final_tags import GetFinalDf
from scripts.core.data_puller_push.kafka_push import CalculatedDataPush
import tracemalloc
def ai_modelling(df_train, get_training_inference, df_predicted_tags):
try:
get_final_df = GetFinalDf()
for inv_id in list(df_train.inv_id.unique()):
for mppt_id in list(df_train.mppt_id.unique()):
try:
model, scaler_x, scaler_y = get_training_inference.data_training(inv_id=inv_id, mppt_id=mppt_id)
print(f'4th memory usage - {tracemalloc.get_traced_memory()}')
x_test, y_test, predictions = get_training_inference.data_inference(scaler_x=scaler_x,
scaler_y=scaler_y,
model=model,
inv_id=inv_id,
mppt_id=mppt_id)
df_result = get_final_df.get_final_data(x_test=x_test,
y_test=y_test,
predictions=predictions)
final_dict = get_final_df.get_final_predicted_tags(df_predicted_current_tags=df_predicted_tags,
inv_id=inv_id, mppt_id=mppt_id)
df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
df_result["timestamp"] = df_result["timestamp"].astype('int')
df_result['hour'] = df_result['datetime'].dt.hour
df_result.loc[df_result['hour'].between(left=18, right=23, inclusive='both'),
'predicted_current_mppt'] = 0
df_result.loc[df_result['hour'].between(left=0, right=5, inclusive='both'),
'predicted_current_mppt'] = 0
df_result.drop(['hour'], axis=1, inplace=True)
CalculatedDataPush(df_result=df_result, final_tags_dict=final_dict).kafka_data_push()
logger.info(f'{final_dict}')
print(f'5th memory allocation - {tracemalloc.get_traced_memory()}')
except Exception as e:
logger.exception(f'Exception - {e}')
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
class GetFinalDf:
@staticmethod
def get_final_data(x_test, y_test, predictions):
try:
df_result = pd.DataFrame(index=[i for i in range(len(y_test))])
df_result['datetime'] = x_test['datetime']
df_result['actual_current_mppt'] = y_test
df_result['predicted_current_mppt'] = predictions.reshape(-1, 1)
df_result.drop(['actual_current_mppt'], axis=1, inplace=True)
df_result['hour'] = df_result['datetime'].dt.hour
df_result.loc[df_result['hour'].between(18, 23, inclusive='both'),
'predicted_current_mppt'] = 0
df_result.loc[df_result['hour'].between(0, 5, inclusive='both'),
'predicted_current_mppt'] = 0
df_result.drop(['hour'], inplace=True, axis=1)
df_result = df_result.round(2)
df_result.reset_index(drop=True, inplace=True)
logger.info(f'{df_result.shape}')
return df_result
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_final_predicted_tags(df_predicted_current_tags, inv_id, mppt_id):
try:
df = df_predicted_current_tags[df_predicted_current_tags['inv_id'] == inv_id]
df = df[df['mppt_id'] == mppt_id]
df.reset_index(drop=True, inplace=True)
final_dict = {}
for index in range(df.shape[0]):
tag_id = df.iloc[index, df.columns.get_loc('tag_id')]
parameter_name = df.iloc[index, df.columns.get_loc('parameter_name')]
final_dict['predicted_current_mppt'] = tag_id
logger.debug(f'tag_id - {tag_id} & parameter name - {parameter_name}')
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
class GetData:
@staticmethod
def associate_inv_mppt_id(df):
try:
current_column_list = [col_name for col_name in df if 'current' in col_name]
voltage_column_list = [col_name for col_name in df if 'voltage' in col_name]
current_column_list.sort()
voltage_column_list.sort()
df_mppt = pd.DataFrame()
for n_mppt in range(len(current_column_list)):
df_temp = df[['datetime', 'inv_id', 'hour', 'tilt_irradiance', voltage_column_list[n_mppt],
current_column_list[n_mppt]]]
df_temp['mppt_id'] = current_column_list[n_mppt]
df_temp['mppt_id'] = df_temp['mppt_id'].str.replace('current_', '')
df_temp.rename(columns={voltage_column_list[n_mppt]: 'voltage_mppt',
current_column_list[n_mppt]: 'current_mppt'}, inplace=True)
if df_mppt.empty:
df_mppt = df_temp
else:
df_mppt = pd.concat([df_mppt, df_temp], axis=0)
df_mppt.reset_index(drop=True, inplace=True)
df_mppt.sort_values(['inv_id', 'mppt_id'], inplace=True)
df_mppt['inv_id_mppt_id'] = df_mppt['inv_id'] + '_' + df_mppt['mppt_id']
df_mppt.dropna(axis=0, inplace=True)
df_mppt.reset_index(drop=True, inplace=True)
return df_mppt
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def multiply_mppt_coefficients(df_mppt, df_coefficients):
try:
df = pd.merge(df_mppt, df_coefficients, on='inv_id_mppt_id', how='left')
logger.debug(f'null rows after mppt coefficient multiplication - {df.isnull().sum()}')
df["coefficient"].fillna(1, inplace=True)
df.reset_index(drop=True, inplace=True)
df['current_mppt'] = df['current_mppt'] * df["coefficient"]
df.drop(['coefficient', 'inv_id_mppt_id'], axis=1, inplace=True)
return df
except Exception as e:
logger.exception(f'Exception - {e}')
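A small worked example of multiply_mppt_coefficients() on toy data (inverter/MPPT ids and values are invented); it shows that MPPTs without a configured coefficient keep their raw current because the missing coefficient is filled with 1.

import pandas as pd

df_mppt = pd.DataFrame({'inv_id_mppt_id': ['inv_1_mppt_1', 'inv_1_mppt_2'],
                        'current_mppt': [10.0, 8.0]})
df_coefficients = pd.DataFrame({'inv_id_mppt_id': ['inv_1_mppt_1'],
                                'coefficient': [0.95]})
df_out = GetData.multiply_mppt_coefficients(df_mppt=df_mppt, df_coefficients=df_coefficients)
# df_out['current_mppt'] -> [9.5, 8.0]; the helper also drops the 'coefficient' and 'inv_id_mppt_id' columns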
import pandas as pd
from loguru import logger
import warnings
import tracemalloc, gc
from scripts.utils.mongo_utils import MongoConnect
from scripts.constants.app_configuration import Mongo
from scripts.constants.app_constants import MongoConstants
warnings.filterwarnings('ignore')
def get_raw_predicted_tags():
try:
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection)
if mongo_conn is None:
logger.info(f'mongodb is not connected, please check')
else:
logger.info(f'mongodb is connected')
logger.debug(f'mongo conn - {mongo_conn}')
raw_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "raw"}]})
req_tags = raw_tags_dict['input_data']
logger.info(f'req raw tags length - {len(req_tags)}')
df_raw_tags = pd.DataFrame.from_dict(req_tags, orient='index')
predicted_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "predicted"}]})
predicted_tags = predicted_tags_dict['input_data']
logger.info(f'req predicted tags length - {len(predicted_tags)}')
df_predicted_tags = pd.DataFrame.from_dict(predicted_tags, orient='index')
df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
mppt_coefficients = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "mppt_coefficients"}]})
coefficients_dict = mppt_coefficients['input_data']
logger.info(f'req coefficients dict length - {len(coefficients_dict)}')
df_coefficients = pd.DataFrame.from_dict(coefficients_dict, orient='index')
df_coefficients.reset_index(inplace=True)
df_coefficients.rename(columns={'index': 'inv_id_mppt_id'}, inplace=True)
raw_tags_dict.clear()
predicted_tags_dict.clear()
coefficients_dict.clear()
del req_tags
del predicted_tags
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
gc.collect()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
return df_raw_tags, df_predicted_tags, df_coefficients
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
from scripts.core.data_puller_push.data_puller import KairosQuery
def get_tags_data(df_input_tags, start_timestamp, end_timestamp, inv_id, mppt_id):
try:
df_tags_id = df_input_tags[['tag_id', 'tag_name', 'inv_id', 'parameter_name', 'mppt_id']]
df_tags_id.reset_index(drop=True, inplace=True)
current_voltage_tags_only = [data for data in df_tags_id['parameter_name']
if any([x in data for x in ['current', 'voltage']])]
req_data_list = [data for data in current_voltage_tags_only if 'Potential' not in data]
req_data_list = [data for data in req_data_list if 'Degradation' not in data]
df_req_tags_id = df_tags_id.loc[df_tags_id['parameter_name'].isin(req_data_list)]
df_req_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_req_tags_id[['tag_id', 'parameter_name']].set_index('tag_id').T.to_dict(orient="records")[0]
tags_dict['site_107$dept_140$line_371$equipment_4115$tag_15828'] = 'tilt_irradiance'
df_data = KairosQuery(start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
tag_dict=tags_dict).kairos_data_import()
df_data['inv_id'] = inv_id
df_data['mppt_id'] = mppt_id
logger.info(f' for inv- {inv_id}, {df_data.shape}')
df_data['date'] = df_data['datetime'].dt.date
df_data['hour'] = df_data['datetime'].dt.hour
df_data.drop(['date'], axis=1, inplace=True)
logger.info(f'Final shape of merged dataframe = {df_data.shape}')
df_data.reset_index(drop=True, inplace=True)
return df_data
except Exception as e:
logger.exception(f'Exception - {e}')
import mlflow
from loguru import logger
import pandas as pd
import re
import os
import pytz
from datetime import datetime
from scripts.constants.app_configuration import MlFlow, ReqTimeZone
from scripts.utils.pycaret_util import PycaretUtil
mlflow_tracking_uri = MlFlow.mlflow_tracking_uri
os.environ["MLFLOW_TRACKING_USERNAME"] = MlFlow.mlflow_tracking_username
os.environ["MLFLOW_TRACKING_PASSWORD"] = MlFlow.mlflow_tracking_password
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = MlFlow.azure_storage_connection_string
os.environ["AZURE_STORAGE_ACCESS_KEY"] = MlFlow.azure_storage_access_key
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
client = mlflow.tracking.MlflowClient()
class ModelLoad(object):
def model_manager(self, inv_mppt_id):
try:
experiment_id = self.create_experiment(experiment_name=MlFlow.experiment_name)
days, latest_run_id = self.fetch_latest_model(experiment_id=experiment_id,
run_name=MlFlow.run_name + '_' + inv_mppt_id)
energy_model = None
if days < int(MlFlow.model_check_param):
logger.debug('Using the pre-trained model!')
energy_model = self.load_model_pyfunc(
model_path=self.forming_loading_path(latest_run_id=latest_run_id))
else:
logger.debug('Model is not present')
return energy_model
except Exception as e:
logger.exception(str(e))
@staticmethod
def create_experiment(experiment_name):
"""
Function is to create an experiment by passing experiment name
:param experiment_name: Name of the experiment
:return: Experiment id of the existing or newly created experiment
"""
try:
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment:
exp_id = experiment.experiment_id
else:
mlflow.set_experiment(experiment_name)
experiment = mlflow.get_experiment_by_name(experiment_name)
exp_id = experiment.experiment_id
return exp_id
except Exception as e:
logger.exception(str(e))
def fetch_latest_model(self, experiment_id, run_name):
"""
Function is to fetch the latest run
:param experiment_id: Experiment Id
:param run_name: Name of the parent run to search for
:return: difference in days/hours/minutes between the current time and the latest run, and the latest run id
"""
try:
days = int(MlFlow.model_check_param) + 1
model_history = ""
latest_run_id = ""
if experiment_id:
run_id = self.get_parent_run_id(experiment_id, run_name)
run_info = mlflow.search_runs([experiment_id],
filter_string="tags.mlflow.parentRunId='{run_id}'".format(
run_id=run_id))
if not run_info.empty:
for ind in run_info.index:
model_history, days, latest_run_id = self.check_model_existing(run_info=run_info,
index=ind)
if model_history is not None:
break
if model_history is None:
days = int(MlFlow.model_check_param) + 1
logger.info("No Model is existing with this experiment")
return days, latest_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest model - {e}")
@staticmethod
def get_parent_run_id(experiment_id, run_name):
"""
Function is to fetch latest parent run id if available else latest run id
:param experiment_id: Experiment Id
:param run_name: Name of the run
:return: latest parent run id
"""
try:
result_run_id = None
df = mlflow.search_runs([experiment_id])
for index, row in df.iterrows():
parent_run_name = row.get("tags.mlflow.runName")
if parent_run_name == run_name:
result_run_id = row.get("run_id")
else:
logger.info(f"No Run is existing with this Experiment id - {experiment_id}")
return result_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest run_id - {e}")
def check_model_existing(self, run_info, index):
"""
Function is to check if model is existing or not
:param run_info: Dataframe of run details
:param index: index of which run from the dataframe
:return:
"""
try:
model_history = None
date_param = MlFlow.check_param
# Difference between the current date and latest available model date
days = self.format_mlflow_time(run_info=run_info, index=index, date_param=date_param)
latest_run_id = run_info.loc[index, 'run_id']
if 'tags.mlflow.log-model.history' in run_info:
model_history = run_info['tags.mlflow.log-model.history'][index]
if model_history:
model_history_list = model_history.split(":")
model_history = model_history_list[2].split(",")[0]
else:
logger.info("No Model is existing")
return model_history, days, latest_run_id
except Exception as e:
logger.exception(f"Exception while checking the model name - {e}")
@staticmethod
def forming_loading_path(latest_run_id):
"""
Function is to form the loading path
:param latest_run_id: Run id
:return : Return the loading path
"""
try:
model_name = MlFlow.model_name
model_path = f"runs:/{latest_run_id}/{model_name}"
return model_path
except Exception as e:
logger.exception(f"Exception while forming loading path - {e}")
@staticmethod
def format_mlflow_time(run_info, index, date_param):
"""
Formatting mlflow time
:param run_info: details of the runs
:param index: index of the run in the dataframe
:param date_param: unit of comparison ("days", "hours" or "minutes")
:return: calculate the time difference between the mlflow time and the current time zone
"""
try:
df_time = run_info.copy()
df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(ReqTimeZone.required_tz)
df_time["days"] = df_time['end_time'].dt.date
df_time["hours"] = df_time['end_time'].dt.hour
df_required = df_time.iloc[index:index + 1:, :]
df_required.reset_index(drop=True, inplace=True)
last_model_time = df_required['end_time'][0].to_pydatetime()
central_current = datetime.now(pytz.utc).astimezone(pytz.timezone(ReqTimeZone.required_tz))
time_diff = central_current - last_model_time
if date_param.lower() == "days":
days_diff = int(time_diff.days)
return days_diff
elif date_param.lower() == "hours":
hours_diff = int(time_diff.total_seconds() // 3600)
return hours_diff
elif date_param.lower() == "minutes":
minutes_diff = int(time_diff.total_seconds() // 60)
return minutes_diff
else:
logger.info("No Valid Date format was given")
except Exception as e:
logger.exception(f"Exception while Loading the model - {e}")
@staticmethod
def set_tag(run_id, key, value):
"""
Function is to set the tag for a particular run
:param run_id: Run id in which the tags need to be added
:param key: Name of the key
:param value: what needs to tagged in the value
"""
try:
client.set_tag(run_id=run_id, key=key, value=value)
logger.debug(f'set the tag for the model')
except Exception as e:
logger.exception(f"Exception while setting the tag - {e}")
@staticmethod
def load_model_pyfunc(model_path):
"""
Function to load the model via mlflow.pyfunc
:param model_path: path of the model
:return: the loaded model
"""
try:
model = mlflow.pyfunc.load_model(model_path)
logger.info("loading the model")
return model
except Exception as e:
logger.exception(str(e))
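A hedged sketch of what forming_loading_path() plus load_model_pyfunc() boil down to; the run id below is a placeholder, whereas in the real flow it comes from fetch_latest_model().

import mlflow
from scripts.constants.app_configuration import MlFlow

run_id = "0123456789abcdef0123456789abcdef"           # placeholder run id
model_path = f"runs:/{run_id}/{MlFlow.model_name}"    # same format as forming_loading_path()
model = mlflow.pyfunc.load_model(model_path)          # generic pyfunc flavour, as in load_model_pyfunc()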
from typing import Dict
from loguru import logger
from pymongo import MongoClient
from scripts.constants.app_configuration import Mongo
class MongoConnect:
def __init__(self, uri, database, collection):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
self.database = database
self.collection = collection
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def data_dict(data, city):
try:
req_dict = dict()
req_dict['project_id'] = Mongo.project_id
req_dict['id'] = Mongo.query_filter
req_dict['city'] = city
req_dict['input_data'] = data
return req_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def insert_one(self, data, city):
try:
db = self.client[self.database]
collection = db[self.collection]
req_dict = self.data_dict(data=data, city=city)
response = collection.insert_one(req_dict)
return response.inserted_id
except Exception as e:
logger.exception(f'Exception - {e}')
def find_one(self, query, filter_dict=None):
try:
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[self.database]
collection = db[self.collection]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.exception(f'Exception - {e}')
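For context, a minimal sketch of how raw_predicted_tags.py drives this wrapper; the database and collection names come from MongoConstants, and the query mirrors the one used there.

from scripts.utils.mongo_utils import MongoConnect
from scripts.constants.app_configuration import Mongo
from scripts.constants.app_constants import MongoConstants

conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
                    collection=MongoConstants.collection)
doc = conn.find_one({"$and": [{"id": "dalmia_string_level_tags"},
                              {"city": "ariyalur"},
                              {"tags_property": "raw"}]})
# doc['input_data'] is a dict keyed by tag name; downstream it becomes df_raw_tags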
from loguru import logger
from pycaret import regression
from scripts.constants.app_configuration import MlFlow, PycaretParams
class PycaretUtil:
def __init__(self):
self.model_list = PycaretParams.model_list.split(",")
self.selected_metric = PycaretParams.selected_metric
self.hyperparameter_tuning_method = PycaretParams.hyperparameter_tuning_method
def get_best_model(self, df, target):
try:
regression.setup(data=df, target=target)
best_model = regression.compare_models(include=self.model_list, sort=self.selected_metric,
n_select=1)
tuned_model = regression.tune_model(best_model, optimize=self.selected_metric,
search_library=self.hyperparameter_tuning_method)
results = regression.pull()
results.sort_values(self.selected_metric, ascending=False, inplace=True)
results.reset_index(drop=True, inplace=True)
get_best_model_row = results.iloc[0]
best_metrics = get_best_model_row.to_dict()
best_metrics.pop('Model', None)
return tuned_model, best_metrics
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_model_name(model):
try:
model_name = str(model).split('(')[0]
return model_name
except Exception as e:
logger.info(f"Unable to get the model name - {e}")
def get_auto_ml_model(self, df, target):
try:
model, metrics = self.get_best_model(df=df, target=target)
model_name = self.get_model_name(model)
hyper_params = model.get_params()
return model, model_name, metrics, hyper_params
except Exception as e:
logger.info(f"Unable to get the model name - {e}")
from datetime import datetime, timedelta
from scripts.constants.app_configuration import DateRange,ReqTimeZone
from loguru import logger
import pytz
class KairosStartEndDate:
@staticmethod
def start_end_date():
try:
local_timezone = pytz.timezone(ReqTimeZone.required_tz)
start_date_timestamp = DateRange.start_date
end_date_timestamp = DateRange.end_date
if (start_date_timestamp is not None) and (start_date_timestamp.lower() != "none"):
start_date = datetime.fromtimestamp((int(start_date_timestamp)/1000)).strftime('%Y-%m-%d %H:%M:%S')
start_date = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
start_date = start_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
else:
start_date = datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days))
start_date = start_date.astimezone(local_timezone).replace(hour=0, minute=0, second=0, microsecond=0)
if (end_date_timestamp is not None) and (end_date_timestamp.lower() != "none"):
end_date = datetime.fromtimestamp((int(end_date_timestamp)/1000)).strftime('%Y-%m-%d %H:%M:%S')
end_date = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")
end_date = end_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
else:
end_date = datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days))
end_date = end_date.astimezone(local_timezone).replace(hour=23, minute=59, second=59, microsecond=0)
start_date = start_date.replace(hour=0, minute=0, second=0)
end_date = end_date.replace(hour=23, minute=59, second=59)
start_timestamp = int(start_date.timestamp())*1000
end_timestamp = int(end_date.timestamp())*1000
return start_date, end_date, start_timestamp, end_timestamp
except Exception as e:
logger.exception(f"Exception - {e}")