Commit 2b48d9ce authored by aakash.bedi's avatar aakash.bedi

updated training module

parent 046abe2a
Pipeline #59958 failed with stage
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
data
.idea/
.vscode/
.pytest_cache/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
FROM python:3.9-buster
RUN apt-get update -y && \
apt-get install -y openjdk-11-jre && \
apt-get install -y libtiff5-dev libjpeg62-turbo-dev && \
apt-get install -y zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev tcl8.6-dev && \
apt-get install -y tk8.6-dev python-tk pdftk libmagickwand-dev
COPY . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD [ "python","app.py" ]
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv(dotenv_path='config.env')
import warnings
from loguru import logger
import tracemalloc
import gc
from scripts.core.engine.mppt_data import GetData
from scripts.core.engine.tags_data import get_tags_data
from scripts.utils.start_end_date import KairosStartEndDate
from scripts.utils.preprocessing import DataPreprocessing
from scripts.core.engine.data_training_and_inference import Training
from scripts.core.engine.raw_predicted_tags import get_raw_predicted_tags
warnings.filterwarnings("ignore")
start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().start_end_date()
def orchestrator():
try:
tracemalloc.start()
print(f'initial memory usage - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
tracemalloc.clear_traces()
print(tracemalloc.get_traced_memory())
df_raw_tags, df_predicted_tags, df_coefficients = get_raw_predicted_tags()
logger.info(f'raw tags dataframe shape - {df_raw_tags.shape}')
logger.info(f'predicted tags dataframe shape - {df_predicted_tags.shape}')
for inv_id in list(df_raw_tags['inv_id'].unique()):
df = df_raw_tags[df_raw_tags['inv_id'] == inv_id]
for mppt_id in list(df['mppt_id'].unique()):  # iterate the MPPT ids of the current inverter only
print(f'1st memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'2nd memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'3rd memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
df_mppt_level = df[df['mppt_id'] == mppt_id]
df_kairos_data = get_tags_data(df_input_tags=df_mppt_level, start_timestamp=start_timestamp,
end_timestamp=end_timestamp, inv_id=inv_id, mppt_id=mppt_id)
print(f'4th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'5th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'6th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
logger.info(f'Shape of final df - {df_kairos_data.shape}')
mppt_data = GetData()
df_mppt = mppt_data.associate_inv_mppt_id(df=df_kairos_data)
print(f'7th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'8th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'9th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
df_coefficient_multiply = mppt_data.multiply_mppt_coefficients(df_mppt=df_mppt,
df_coefficients=df_coefficients)
print(f'10th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'11th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'12th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
data_preprocessing = DataPreprocessing()
df_clean = data_preprocessing.remove_outliers(df=df_coefficient_multiply,
param_list=['tilt_irradiance', 'voltage_mppt',
'current_mppt'])
print(f'13th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'14th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'15th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
Training(df=df_clean).data_training(inv_id=inv_id, mppt_id=mppt_id)
del df_kairos_data
del df_mppt
del df_coefficient_multiply
print(f'16th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'17th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'18th memory allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
print(f'final allocation for {inv_id} & {mppt_id} - {tracemalloc.get_traced_memory()}')
logger.info(f'data training for {inv_id}, {mppt_id} has been completed!')
tracemalloc.stop()
except Exception as e:
logger.exception(f'Exception - {e}')
if __name__ == '__main__':
orchestrator()
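The orchestrator above repeats the same three-step memory bookkeeping (print the traced memory, gc.collect(), reset and clear tracemalloc) around every stage of the loop. A minimal sketch of folding that into a single helper is shown below; log_memory_checkpoint is a hypothetical name and is not part of this commit.

import gc
import tracemalloc

def log_memory_checkpoint(label):
    """Print current/peak traced memory for this checkpoint, then reset the counters."""
    print(f'{label} - {tracemalloc.get_traced_memory()}')
    gc.collect()
    tracemalloc.reset_peak()
    tracemalloc.clear_traces()

# usage, assuming tracemalloc.start() has already been called:
# log_memory_checkpoint('after kairos pull for inv_1 & mppt_1')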
[LOGGING]
level = $LOG_LEVEL
traceback = $LOG_TRACEBACK
[MODULE]
name = $APP_NAME
[KAIROS_DB]
uri=$KAIROS_URI
metric_name=$KAIROS_METRIC
aggregator=$AGGREGATOR
aggregator_value=$AGGREGATOR_VALUE
aggregator_unit=$AGGREGATOR_UNIT
[KAFKA]
kafka_host=$KAFKA_HOST
kafka_port=$KAFKA_PORT
kafka_topic=$KAFKA_TOPIC
[DATE_RANGE]
start_date=$START_DATE
end_date=$END_DATE
start_relative_days=$START_RELATIVE
end_relative_days=$END_RELATIVE
[EMAIL_DETAILS]
email_reciever=$RECIEVER_EMAIL
email_sender=$SENDER_EMAIL
[MONGO]
mongo_uri=$MONGO_URI
project_id=$PROJECT_ID
query_filter=$QUERY_FILTER
[TIMEZONE]
required_tz=$REQUIRED_TZ
[MLFLOW]
mlflow_tracking_uri=$MLFLOW_TRACKING_URI
mlflow_tracking_username=$MLFLOW_TRACKING_USERNAME
mlflow_tracking_password=$MLFLOW_TRACKING_PASSWORD
azure_storage_connection_string=$AZURE_STORAGE_CONNECTION_STRING
azure_storage_access_key=$AZURE_STORAGE_ACCESS_KEY
user=$USER
experiment_name=$EXPERIMENT_NAME
run_name=$RUN_NAME
model_name=$MODEL_NAME
check_param=$CHECK_PARAM
model_check_param=$MODEL_CHECK_PARAM
[PYCARET]
models_list=$MODELS_LIST
selected_metric=$SELECTED_METRIC
hyperparameter_tuning_method=$HYPERPARAMETER_TUNING_METHOD
inv_list:
- inv_1
- inv_2
- inv_3
- inv_4
- inv_5
- inv_6
- inv_7
- inv_8
- inv_9
- inv_10
- inv_11
- inv_12
- inv_13
- inv_14
- inv_15
- inv_16
- inv_17
- inv_18
- inv_19
- inv_20
- inv_21
- inv_22
- inv_23
- inv_24
- inv_25
- inv_26
- inv_27
- inv_28
- inv_29
- inv_30
- inv_31
- inv_32
- inv_33
- inv_34
- inv_35
- inv_36
- inv_37
- inv_38
- inv_39
- inv_40
- inv_41
- inv_42
APP_NAME=dalmia-solar-degradation
KAIROS_URI=https://iLens:iLensDAL$456@dalmia.ilens.io/kairos/
KAIROS_METRIC=ilens.live_data.raw
AGGREGATOR=max
AGGREGATOR_VALUE=15
AGGREGATOR_UNIT=minutes
KAFKA_HOST=192.168.0.220
KAFKA_PORT=9092
KAFKA_TOPIC=ilens_dev
START_RELATIVE=90
END_RELATIVE=2
REQUIRED_TZ="Asia/Kolkata"
MONGO_URI=mongodb://admin:iLensDevMongo783@192.168.0.220:2717/
PROJECT_ID=project_101
QUERY_FILTER=dalmia_string_level_tags
MLFLOW_TRACKING_URI=https://qa.unifytwin.com/mlflow/
MLFLOW_TRACKING_USERNAME=mlflow
MLFLOW_TRACKING_PASSWORD=MlFlOwQA#4321
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net
AZURE_STORAGE_ACCESS_KEY=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==
USER=Dalmia_degradation
EXPERIMENT_NAME=Dalmia Solar Degradation2
RUN_NAME=Degradation
MODEL_NAME=versioning
CHECK_PARAM=hours
MODEL_CHECK_PARAM=480
MODELS_LIST=lr,knn,gbr,rf,catboost,lightgbm,ada,et,xgboost,dt,en,par,huber
SELECTED_METRIC=R2
HYPERPARAMETER_TUNING_METHOD=scikit-learn
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv(dotenv_path='config.env')
import os
import os.path
import sys
from configparser import ConfigParser, BasicInterpolation
import yaml
from loguru import logger
# Configuring file constants
data_conf = "./conf/data.yml"
class EnvInterpolation(BasicInterpolation):
"""
Interpolation which expands environment variables in values.
"""
def before_get(self, parser, section, option, value, defaults):
value = super().before_get(parser, section, option, value, defaults)
if not os.path.expandvars(value).startswith("$"):
return os.path.expandvars(value)
else:
return
try:
config = ConfigParser(interpolation=EnvInterpolation())
config.read("conf/application.conf")
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.exit()
class KairosDb:
uri = config["KAIROS_DB"]["uri"]
metric_name = config['KAIROS_DB']['metric_name']
aggregator = config['KAIROS_DB']['aggregator']
aggregator_value = config['KAIROS_DB']['aggregator_value']
aggregator_unit = config['KAIROS_DB']['aggregator_unit']
class Kafka:
kafka_host = config["KAFKA"]["kafka_host"]
kafka_port = config["KAFKA"]["kafka_port"]
kafka_topic = config["KAFKA"]["kafka_topic"]
class DateRange:
start_date = config.get("DATE_RANGE", "start_date")
end_date = config.get("DATE_RANGE", "end_date")
start_relative_days = config.get("DATE_RANGE", "start_relative_days")
end_relative_days = config.get("DATE_RANGE", "end_relative_days")
class ReqTimeZone:
required_tz = config.get('TIMEZONE', 'required_tz')
class MlFlow:
mlflow_tracking_uri = config['MLFLOW']['mlflow_tracking_uri']
mlflow_tracking_username = config['MLFLOW']['mlflow_tracking_username']
mlflow_tracking_password = config['MLFLOW']['mlflow_tracking_password']
azure_storage_connection_string = config['MLFLOW']['azure_storage_connection_string']
azure_storage_access_key = config['MLFLOW']['azure_storage_access_key']
user = config['MLFLOW']['user']
experiment_name = config['MLFLOW']['experiment_name']
run_name = config['MLFLOW']['run_name']
model_name = config['MLFLOW']['model_name']
check_param = config['MLFLOW']['check_param']
model_check_param = config['MLFLOW']['model_check_param']
class PycaretParams:
model_list = config['PYCARET']['models_list']
selected_metric = config['PYCARET']['selected_metric']
hyperparameter_tuning_method = config['PYCARET']['hyperparameter_tuning_method']
class Mongo:
mongo_uri = config["MONGO"]["mongo_uri"]
project_id = config["MONGO"]["project_id"]
query_filter = config["MONGO"]["query_filter"]
json_file_path = "scripts/utils/"
class MongoConstants:
# DB
db = "ilens_ai"
# collections
collection = "dalmiaStringTags"
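With the EnvInterpolation class above, every value in conf/application.conf is a $VARIABLE placeholder resolved from the environment when the config is read; placeholders that cannot be expanded come back as None. Below is a self-contained sketch of that behaviour, using an inline config string and an APP_NAME value set in-process rather than the repository's files; UNSET_VARIABLE is illustrative.

import os
from configparser import BasicInterpolation, ConfigParser

class EnvInterpolation(BasicInterpolation):
    """Expand environment variables in config values; unresolved $VARS become None."""
    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        expanded = os.path.expandvars(value)
        return expanded if not expanded.startswith("$") else None

os.environ["APP_NAME"] = "dalmia-solar-degradation"
config = ConfigParser(interpolation=EnvInterpolation())
config.read_string("[MODULE]\nname = $APP_NAME\nmissing = $UNSET_VARIABLE\n")
print(config["MODULE"]["name"])     # dalmia-solar-degradation
print(config["MODULE"]["missing"])  # None (the environment variable is not set)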
import json
import pandas as pd
import requests
from loguru import logger
from scripts.constants.app_configuration import KairosDb
class KairosQuery:
def __init__(self, start_timestamp, end_timestamp, tag_dict):
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.kairos_host = KairosDb.uri
self.kairos_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.kairos_host)
self.tag_dict = tag_dict
def kairos_query(self):
try:
return {
"metrics": [
{
"tags": {
"c3": list(self.tag_dict.keys())
},
"name": KairosDb.metric_name,
"group_by": [
{
"name": "tag",
"tags": ["c3"]
}
],
"aggregators": [
{
"name": KairosDb.aggregator,
"sampling": {
"value": KairosDb.aggregator_value,
"unit": KairosDb.aggregator_unit
},
"align_sampling": True,
"align_start_time": True
}
]
}
],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": self.start_timestamp,
"end_absolute": self.end_timestamp,
}
except Exception as e:
logger.exception(f"Exception - {e}")
def get_data(self, data_query):
try:
logger.info("Data for the parameters being pulled from Kairos Database")
response = requests.post(self.kairos_url, data=json.dumps(data_query))
response_data = response.json()
output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
for i in range(len(output_data)):
grouped_output_data = output_data[i]["results"]
for each_grouped_data in grouped_output_data:
value = (each_grouped_data["values"])
tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug("Renamed {} to {} in Data".format(tag_id, self.tag_dict[tag_id]))
column_name = self.tag_dict[tag_id]
except KeyError as e:
logger.exception(f'Exception - {e}')
logger.debug("Column Renaming Logic not found for {}".format(tag_id))
column_name = tag_id
df_column_data = pd.DataFrame(data=value, columns=["timestamp", column_name])
if df_final.empty:
df_final = df_column_data
else:
df_final = df_final.merge(df_column_data, how="outer", left_on="timestamp",
right_on="timestamp")
df_final['datetime'] = pd.to_datetime(df_final['timestamp'], unit="ms").dt.tz_localize('UTC').\
dt.tz_convert('Asia/Kolkata')
logger.debug("Final number of columns : {}".format(str(len(list(df_final.columns)))))
return df_final
except Exception as e:
logger.exception(f"Exception occurred - {e}", exc_info=True)
def kairos_data_import(self):
try:
logger.debug("Fetching live data")
query_live = self.kairos_query()
logger.info(f"query_live = {query_live}")
df = self.get_data(data_query=query_live)
return df
except Exception as e:
logger.exception(f"Exception - {e}")
from loguru import logger
from scripts.core.data_puller_push.push_data import insert_values_dalmia
class CalculatedDataPush:
def __init__(self, df_result, final_tags_dict):
self.df_result = df_result
self.final_tags_dict = final_tags_dict
def kafka_data_push(self):
try:
logger.info(f"final_tags_dict = {self.final_tags_dict}")
logger.info(f"df result shape = {self.df_result.shape}")
for i, j in self.df_result.iterrows():
my_dict = {v: j[k] for k, v in self.final_tags_dict.items()}
logger.info(f"{j['timestamp'], j['datetime'], my_dict}")
insert_values_dalmia(j['timestamp'], my_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
from json import dumps
from kafka import KafkaProducer
from loguru import logger
from scripts.constants.app_configuration import Kafka
def insert_values_dalmia(ts, my_dict):
kairos_writer = KairosWriter()
kairos_writer.write_data(
{
ts: my_dict
},
Kafka.kafka_topic
)
logger.info("Data pushed successfully!")
class KafkaProducerUtil:
def __init__(self):
try:
self.host = Kafka.kafka_host
self.port = Kafka.kafka_port
logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
kafka_broker = [self.host + ":" + str(self.port)]
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode('utf-8'),
api_version=(0, 10, 1))
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
self.producer.flush()
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
class KairosWriter(KafkaProducerUtil):
def write_data(self, data_json, topic):
site_id = "site_107"
logger.debug(f"Data being pushed to kafka topic: {topic}")
msg_counter = 0
for k, v in data_json.items():
timestamp, data = self.data_validator(k, v)
timestamp = timestamp * 1000
write_json = {
"data": data,
"site_id": site_id,
"gw_id": "gw_{}".format(site_id.lstrip("site_")), # The lstrip(s) removes leading whitespace (on the left)
"pd_id": "pd_{}".format(site_id.lstrip("site_")), # The rstrip(s) removes the trailing whitespace (on the right)
"timestamp": timestamp,
"msg_id": msg_counter,
"partition": "",
"retain_flag": False
}
logger.debug(f"Timestamp: {timestamp}, Values: {data}")
self.publish(topic, dumps(write_json))
msg_counter += 1
return msg_counter
def audit_data(self, data_json, topic):
logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
msg_counter = len(data_json)
for each in data_json:
self.publish(topic, dumps(each))
return msg_counter
@staticmethod
def data_validator(timestamp, data):
logger.debug("Validating the data to remove Nan values")
__temp__ = {}
for k, v in data.items():
if not k.startswith("site"):
continue
# isinstance returns True when "v" is an int or float; the string check then drops NaN and inf readings
if isinstance(v, (int, float)) and str(v) not in ('nan', 'inf'):
__temp__[k] = v
return int(timestamp), __temp__
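data_validator above keeps only keys beginning with "site" whose values are finite numbers, and casts the timestamp to an int. A standalone sketch of the same filtering, using math.isfinite instead of the string comparison and illustrative tag ids:

import math

def validate(timestamp, data):
    clean = {}
    for k, v in data.items():
        if not k.startswith("site"):
            continue  # non-tag keys are ignored
        if isinstance(v, (int, float)) and math.isfinite(v):
            clean[k] = v  # keep only finite numeric readings
    return int(timestamp), clean

print(validate(1672531200, {
    "site_107$tag_a": 612.4,         # kept
    "site_107$tag_b": float("nan"),  # dropped (NaN)
    "quality_flag": 1,               # dropped (key does not start with "site")
}))
# (1672531200, {'site_107$tag_a': 612.4})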
import pandas as pd
from loguru import logger
from scripts.utils.preprocessing import DataPreprocessing
from scripts.utils.mlflow_util import ModelLoad
class Training:
def __init__(self, df):
self.df = df
def data_training(self, inv_id, mppt_id):
try:
df_std = self.df[['tilt_irradiance', 'voltage_mppt', 'hour', 'current_mppt']]
# df_std.drop([''])
df_std.dropna(axis=0, inplace=True)
df_std.reset_index(drop=True, inplace=True)
inv_mppt_id = f'{inv_id}_{mppt_id}'
ModelLoad().model_manager(df=df_std, target='current_mppt',
inv_mppt_id=inv_mppt_id, city='ariyalur')
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
class GetFinalDf:
@staticmethod
def get_final_data(x_test, y_test, predictions):
try:
df_result = pd.DataFrame(index=[i for i in range(len(y_test))])
df_result['datetime'] = x_test['datetime']
df_result['actual_current_mppt'] = y_test
df_result['predicted_current_mppt'] = predictions
df_result.drop(['actual_current_mppt'], axis=1, inplace=True)
df_result = df_result.round(2)
df_result.reset_index(drop=True, inplace=True)
logger.info(f'{df_result.shape}')
return df_result
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_final_predicted_tags(df_predicted_current_tags, inv_id, mppt_id):
try:
df = df_predicted_current_tags[df_predicted_current_tags['inv_id'] == inv_id]
df = df[df['mppt_id'] == mppt_id]
df.reset_index(drop=True, inplace=True)
final_dict = {}
for index in range(df.shape[0]):
tag_id = df.iloc[index, df.columns.get_loc('tag_id')]
parameter_name = df.iloc[index, df.columns.get_loc('parameter_name')]
final_dict['predicted_current_mppt'] = tag_id
logger.debug(f'tag_id - {tag_id} & parameter name - {parameter_name}')
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
class GetData:
@staticmethod
def associate_inv_mppt_id(df):
try:
current_column_list = [col_name for col_name in df if 'current' in col_name]
voltage_column_list = [col_name for col_name in df if 'voltage' in col_name]
current_column_list.sort()
voltage_column_list.sort()
df_mppt = pd.DataFrame()
for n_mppt in range(len(current_column_list)):
df_temp = df[['datetime', 'inv_id', 'hour', 'tilt_irradiance', voltage_column_list[n_mppt],
current_column_list[n_mppt]]]
df_temp['mppt_id'] = current_column_list[n_mppt]
df_temp['mppt_id'] = df_temp['mppt_id'].str.replace('current_', '')
df_temp.rename(columns={voltage_column_list[n_mppt]: 'voltage_mppt',
current_column_list[n_mppt]: 'current_mppt'}, inplace=True)
if df_mppt.empty:
df_mppt = df_temp
else:
df_mppt = pd.concat([df_mppt, df_temp], axis=0)
df_mppt.reset_index(drop=True, inplace=True)
df_mppt.sort_values(['inv_id', 'mppt_id'], inplace=True)
df_mppt['inv_id_mppt_id'] = df_mppt['inv_id'] + '_' + df_mppt['mppt_id']
df_mppt.dropna(axis=0, inplace=True)
df_mppt.reset_index(drop=True, inplace=True)
return df_mppt
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def multiply_mppt_coefficients(df_mppt, df_coefficients):
try:
df = pd.merge(df_mppt, df_coefficients, on='inv_id_mppt_id', how='left')
logger.debug(f'null rows after mppt coefficient multiplication - {df.isnull().sum()}')
df["coefficient"].fillna(1, inplace=True)
df.reset_index(drop=True, inplace=True)
df['current_mppt'] = df['current_mppt'] * df["coefficient"]
df.drop(['coefficient', 'inv_id_mppt_id'], axis=1, inplace=True)
return df
except Exception as e:
logger.exception(f'Exception - {e}')
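multiply_mppt_coefficients above joins the per-MPPT correction factors onto the measurements by inv_id_mppt_id, defaults missing coefficients to 1, and scales current_mppt. A toy-data sketch of that step (the values are illustrative):

import pandas as pd

df_mppt = pd.DataFrame({
    "inv_id_mppt_id": ["inv_1_mppt_1", "inv_1_mppt_2"],
    "current_mppt": [10.0, 8.0],
})
df_coefficients = pd.DataFrame({
    "inv_id_mppt_id": ["inv_1_mppt_1"],
    "coefficient": [0.95],
})

df = pd.merge(df_mppt, df_coefficients, on="inv_id_mppt_id", how="left")
df["coefficient"].fillna(1, inplace=True)      # MPPTs without a coefficient are left unscaled
df["current_mppt"] = df["current_mppt"] * df["coefficient"]
print(df[["inv_id_mppt_id", "current_mppt"]])  # inv_1_mppt_1 -> 9.5, inv_1_mppt_2 -> 8.0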
import gc
import pandas as pd
from loguru import logger
import warnings
import tracemalloc
from scripts.utils.mongo_utils import MongoConnect
from scripts.constants.app_configuration import Mongo
from scripts.constants.app_constants import MongoConstants
warnings.filterwarnings('ignore')
def get_raw_predicted_tags():
try:
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection)
if mongo_conn is None:
logger.info('mongodb is not connected, please check')
else:
logger.info('mongodb is connected')
logger.debug(f'mongo conn - {mongo_conn}')
raw_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "raw"}]})
req_tags = raw_tags_dict['input_data']
logger.info(f'req raw tags length - {len(req_tags)}')
df_raw_tags = pd.DataFrame.from_dict(req_tags, orient='index')
predicted_tags_dict = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "predicted"}]})
predicted_tags = predicted_tags_dict['input_data']
logger.info(f'req predicted tags length - {len(predicted_tags)}')
df_predicted_tags = pd.DataFrame.from_dict(predicted_tags, orient='index')
df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
mppt_coefficients = mongo_conn.find_one({"$and": [{"id": "dalmia_string_level_tags"}, {"city": "ariyalur"},
{"tags_property": "mppt_coefficients"}]})
coefficients_dict = mppt_coefficients['input_data']
logger.info(f'req coefficients dict length - {len(coefficients_dict)}')
df_coefficients = pd.DataFrame.from_dict(coefficients_dict, orient='index')
df_coefficients.reset_index(inplace=True)
df_coefficients.rename(columns={'index': 'inv_id_mppt_id'}, inplace=True)
print(tracemalloc.get_traced_memory())
raw_tags_dict.clear()
predicted_tags_dict.clear()
coefficients_dict.clear()
del req_tags
del predicted_tags
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
gc.collect()
tracemalloc.reset_peak()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
tracemalloc.clear_traces()
print(f'memory allocation - {tracemalloc.get_traced_memory()}')
return df_raw_tags, df_predicted_tags, df_coefficients
except Exception as e:
logger.exception(f'Exception - {e}')
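get_raw_predicted_tags expects three documents in the ilens_ai.dalmiaStringTags collection, distinguished by tags_property ("raw", "predicted", "mppt_coefficients"), each carrying an input_data mapping that is turned into a dataframe. A hedged sketch of the assumed shape of the "raw" document follows; the dict keys and parameter values are illustrative.

raw_tags_document = {
    "id": "dalmia_string_level_tags",
    "city": "ariyalur",
    "tags_property": "raw",          # the "predicted" and "mppt_coefficients" documents use the same envelope
    "input_data": {
        "inv_1_mppt_1_current": {    # the dict key becomes the 'tag_name' column
            "tag_id": "site_107$dept_140$line_371$equipment_4115$tag_15828",
            "parameter_name": "current_mppt_1",
            "inv_id": "inv_1",
            "mppt_id": "mppt_1",
        },
    },
}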
import pandas as pd
from loguru import logger
from scripts.core.data_puller_push.data_puller import KairosQuery
def get_tags_data(df_input_tags, start_timestamp, end_timestamp, inv_id, mppt_id):
try:
df_tags_id = df_input_tags[['tag_id', 'tag_name', 'inv_id', 'parameter_name', 'mppt_id']]
df_tags_id.reset_index(drop=True, inplace=True)
current_voltage_tags_only = [data for data in df_tags_id['parameter_name']
if any([x in data for x in ['current', 'voltage']])]
req_data_list = [data for data in current_voltage_tags_only if 'Potential' not in data]
req_data_list = [data for data in req_data_list if 'Degradation' not in data]
df_req_tags_id = df_tags_id.loc[df_tags_id['parameter_name'].isin(req_data_list)]
df_req_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_req_tags_id[['tag_id', 'parameter_name']].set_index('tag_id').T.to_dict(orient="records")[0]
tags_dict['site_107$dept_140$line_371$equipment_4115$tag_15828'] = 'tilt_irradiance'
df_data = KairosQuery(start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
tag_dict=tags_dict).kairos_data_import()
df_data['inv_id'] = inv_id
df_data['mppt_id'] = mppt_id
logger.info(f' for inv- {inv_id}, {df_data.shape}')
df_data['date'] = df_data['datetime'].dt.date
df_data['hour'] = df_data['datetime'].dt.hour
df_data.drop(['date'], axis=1, inplace=True)
df_data.drop(df_data[df_data['hour'].between(left=19, right=23, inclusive='both')].index, inplace=True, axis=0)
df_data.drop(df_data[df_data['hour'].between(left=0, right=5, inclusive='both')].index, inplace=True, axis=0)
logger.info(f'Final shape of merged dataframe = {df_data.shape}')
df_data.reset_index(drop=True, inplace=True)
return df_data
except Exception as e:
logger.exception(f'Exception - {e}')
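get_tags_data restricts the pulled data to daytime hours by dropping rows whose hour falls in 19-23 or 0-5. A toy illustration of that filter (the timestamps are illustrative):

import pandas as pd

df = pd.DataFrame({"datetime": pd.date_range("2023-01-01 00:00", periods=24,
                                              freq="H", tz="Asia/Kolkata")})
df["hour"] = df["datetime"].dt.hour
df.drop(df[df["hour"].between(left=19, right=23, inclusive="both")].index, inplace=True)
df.drop(df[df["hour"].between(left=0, right=5, inclusive="both")].index, inplace=True)
print(sorted(df["hour"].unique()))  # only hours 6 through 18 remain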
from typing import Dict
from loguru import logger
from pymongo import MongoClient
from scripts.constants.app_configuration import Mongo
class MongoConnect:
def __init__(self, uri, database, collection):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
self.database = database
self.collection = collection
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def data_dict(data, city):
try:
req_dict = dict()
req_dict['project_id'] = Mongo.project_id
req_dict['id'] = Mongo.query_filter
req_dict['city'] = city
req_dict['input_data'] = data
return req_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def insert_one(self, data, city):
try:
db = self.client[self.database]
collection = db[self.collection]
req_dict = self.data_dict(data=data, city=city)
response = collection.insert_one(req_dict)
return response.inserted_id
except Exception as e:
logger.exception(f'Exception - {e}')
def find_one(self, query, filter_dict=None):
try:
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[self.database]
collection = db[self.collection]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
import pytz
from datetime import datetime, timedelta
from scripts.constants.app_configuration import ReqTimeZone
from sklearn.preprocessing import MinMaxScaler
class DataPreprocessing:
@staticmethod
def remove_outliers(df, param_list):
try:
for col in param_list:
lb = df[col].mean() - 3 * df[col].std()
ub = df[col].mean() + 3 * df[col].std()
logger.debug(f"Min values of {col} = {df[col].min()} \nLower Bracket of {col} = {lb}")
logger.debug(f"Max values of {col} = {df[col].max()} \nUpper Bracket of {col} = {ub}")
logger.debug(f'Shape of df before outlier removal = {df.shape}')
df = (df[(df[col] > lb) & (df[col] < ub)])
logger.debug(f'Shape of df after outlier removal = {df.shape}')
logger.debug(f'Shape of final df after outlier removal = {df.shape}')
return df
except Exception as e:
logger.exception(f'Exception - {e}')
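remove_outliers above applies a per-column 3-sigma rule: values outside mean +/- 3*std are dropped before training. A toy illustration, not repository code:

import pandas as pd

df = pd.DataFrame({"current_mppt": list(range(1, 31)) + [500]})  # 500 is an obvious outlier
col = "current_mppt"
lb = df[col].mean() - 3 * df[col].std()
ub = df[col].mean() + 3 * df[col].std()
df_clean = df[(df[col] > lb) & (df[col] < ub)]
print(df.shape, df_clean.shape)  # (31, 1) (30, 1): the 500 reading is removed

Note that on very small samples a single extreme value can inflate the standard deviation enough to survive this filter, so the rule works best on reasonably long windows of data.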
@staticmethod
def train_test_split(df):
try:
today_date = datetime.now(pytz.utc).astimezone(pytz.timezone(ReqTimeZone.required_tz)).date()
df = df[df['date'] != today_date]
yesterday_date = today_date - timedelta(days=1)
df_train = df[df['date'] < yesterday_date]
df_test = df[df['date'] == yesterday_date]
df_train.reset_index(drop=True, inplace=True)
df_test.reset_index(drop=True, inplace=True)
df_train.drop(['date'], axis=1, inplace=True)
df_test.drop(['date'], axis=1, inplace=True)
return df, df_train, df_test
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_standardized_data(df, param_list=None):
try:
if param_list is None:
scaler = MinMaxScaler()
df_std = pd.DataFrame(scaler.fit_transform(df), columns=list(df.columns))
return df_std, scaler
else:
scaler = MinMaxScaler()
df_std = pd.DataFrame(scaler.fit_transform(df.drop(param_list, axis=1)),
columns=list(df.drop(param_list, axis=1).columns))
return df_std, scaler
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_transform_std_data(df, scaler, param_list=None):
try:
if param_list is None:
df_std = pd.DataFrame(scaler.transform(df), columns=list(df.columns))
return df_std
else:
df_std = pd.DataFrame(scaler.transform(df.drop(param_list, axis=1)),
columns=list(df.drop(param_list, axis=1).columns))
return df_std
except Exception as e:
logger.exception(f'Exception - {e}')
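get_standardized_data and get_transform_std_data above are meant to be used as a pair: fit the MinMaxScaler once on training data and reuse the returned scaler for any later frame. A minimal sketch with illustrative values:

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

df_train = pd.DataFrame({"tilt_irradiance": [0.0, 400.0, 800.0],
                         "voltage_mppt": [500.0, 650.0, 700.0]})
df_new = pd.DataFrame({"tilt_irradiance": [200.0], "voltage_mppt": [600.0]})

scaler = MinMaxScaler()
df_train_std = pd.DataFrame(scaler.fit_transform(df_train), columns=list(df_train.columns))
df_new_std = pd.DataFrame(scaler.transform(df_new), columns=list(df_new.columns))
print(df_new_std)  # values scaled with the min/max learned from df_train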
from loguru import logger
from pycaret import regression
from scripts.constants.app_configuration import MlFlow, PycaretParams
class PycaretUtil:
def __init__(self):
self.model_list = PycaretParams.model_list.split(",")
self.selected_metric = PycaretParams.selected_metric
self.hyperparameter_tuning_method = PycaretParams.hyperparameter_tuning_method
def get_best_model(self, df, target):
try:
regression.setup(data=df, target=target)
best_model = regression.compare_models(include=self.model_list, sort=self.selected_metric,
n_select=1)
tuned_model = regression.tune_model(best_model, optimize=self.selected_metric,
search_library=self.hyperparameter_tuning_method)
results = regression.pull()
results.sort_values(self.selected_metric, ascending=False, inplace=True)
results.reset_index(drop=True, inplace=True)
get_best_model_row = results.iloc[0]
best_metrics = get_best_model_row.to_dict()
best_metrics.pop('Model', None)
return tuned_model, best_metrics
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def get_model_name(model):
try:
model_name = str(model).split('(')[0]
return model_name
except Exception as e:
logger.info(f"Unable to get the model name - {e}")
def get_auto_ml_model(self, df, target):
try:
model, metrics = self.get_best_model(df=df, target=target)
model_name = self.get_model_name(model)
hyper_params = model.get_params()
return model, model_name, metrics, hyper_params
except Exception as e:
logger.info(f"Unable to get the model name - {e}")