Commit ff9304ef authored by aakash.bedi

added calculation script

parent 9a0e88b5
Pipeline #58113 failed with stage
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
data
.idea/
.vscode/
.pytest_cache/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
FROM python:3.7-buster
RUN apt-get update -y && \
    apt-get install -y openjdk-11-jre && \
    apt-get install -y libtiff5-dev libjpeg62-turbo-dev && \
    apt-get install -y zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev tcl8.6-dev && \
    apt-get install -y tk8.6-dev python-tk pdftk libmagickwand-dev
COPY . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD [ "python", "app.py" ]
# Dalmia degradation calculation module
if __name__ == "__main__":
    from dotenv import load_dotenv
    load_dotenv(dotenv_path='config.env')

import pandas as pd
import numpy as np
import warnings
from loguru import logger

from scripts.utils.pycaret_util import PycaretUtil
from scripts.core.engine.mppt_data import GetData
from scripts.utils.reading_tags import GetTags
from scripts.core.engine.tags_data import get_tags_data
from scripts.utils.start_end_date import KairosStartEndDate
from scripts.utils.preprocessing import DataPreprocessing
from scripts.core.engine.inv_and_mppt_level import TrainingInference

warnings.filterwarnings("ignore")

base_path = 'data_folder'
start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().start_end_date()


def get_tag_details():
    try:
        get_tags = GetTags(base_path=base_path)
        tags_excel = get_tags.read_tag_excel()
        mppt_tags = get_tags.get_mppt_tags(df=tags_excel, substrings='MPPT')
        df = get_tags_data(mppt_tags=mppt_tags,
                           start_timestamp=start_timestamp,
                           end_timestamp=end_timestamp)
        logger.info(f'Shape of final df - {df.shape}')
        mppt_data = GetData()
        df_mppt = mppt_data.current_voltage_mppt_data(df=df)
        data_preprocessing = DataPreprocessing()
        df_mppt = data_preprocessing.remove_outliers(df=df_mppt,
                                                     param_list=['tilt_irradiance', 'voltage_mppt', 'current_mppt'])
        df_mppt, df_train, df_test = data_preprocessing.train_test_split(df=df_mppt)
        unique_inv_id = list(df_train.inv_id.unique())
        unique_mppt_id = list(df_train.mppt_id.unique())
        get_training_inference = TrainingInference(df=df_mppt, df_train=df_train, df_test=df_test)
        for inv_id in unique_inv_id:
            for mppt_id in unique_mppt_id:
                try:
                    model, scaler_x, scaler_y = get_training_inference.data_training(inv_id=inv_id, mppt_id=mppt_id)
                    x_test, y_test, predictions = get_training_inference.data_inference(scaler_x=scaler_x,
                                                                                        scaler_y=scaler_y,
                                                                                        model=model,
                                                                                        inv_id=inv_id,
                                                                                        mppt_id=mppt_id)
                    df_result = mppt_data.get_final_data(x_test=x_test, y_test=y_test, predictions=predictions)
                    logger.info(f'{df_result.shape}')
                except Exception as e:
                    logger.exception(f'Exception - {e}')
    except Exception as e:
        logger.exception(f'Exception - {e}')


if __name__ == '__main__':
    get_tag_details()
[LOGGING]
level = $LOG_LEVEL
traceback = $LOG_TRACEBACK
[MODULE]
name = $APP_NAME
[KAIROS_DB]
uri=$KAIROS_URI
metric_name=$KAIROS_METRIC
aggregator=$AGGREGATOR
aggregator_value=$AGGREGATOR_VALUE
aggregator_unit=$AGGREGATOR_UNIT
[KAFKA]
kafka_host=$KAFKA_HOST
kafka_port=$KAFKA_PORT
kafka_topic=$KAFKA_TOPIC
[DATE_RANGE]
start_date=$START_DATE
end_date=$END_DATE
start_relative_days=$START_RELATIVE
end_relative_days=$END_RELATIVE
[EMAIL_DETAILS]
email_reciever=$RECIEVER_EMAIL
email_sender=$SENDER_EMAIL
[MONGO]
mongo_uri=$MONGO_URI
[TIMEZONE]
required_tz=$REQUIRED_TZ
[MLFLOW]
mlflow_tracking_uri=$MLFLOW_TRACKING_URI
mlflow_tracking_username=$MLFLOW_TRACKING_USERNAME
mlflow_tracking_password=$MLFLOW_TRACKING_PASSWORD
azure_storage_connection_string=$AZURE_STORAGE_CONNECTION_STRING
azure_storage_access_key=$AZURE_STORAGE_ACCESS_KEY
user=$USER
experiment_name=$EXPERIMENT_NAME
run_name=$RUN_NAME
model_name=$MODEL_NAME
check_param=$CHECK_PARAM
model_check_param=$MODEL_CHECK_PARAM
[PYCARET]
models_list=$MODELS_LIST
selected_metric=$SELECTED_METRIC
hyperparameter_tuning_method=$HYPERPARAMETER_TUNING_METHOD
inv_list:
- inv_1
- inv_2
- inv_3
- inv_4
- inv_5
- inv_6
- inv_7
- inv_8
- inv_9
- inv_10
- inv_11
- inv_12
- inv_13
- inv_14
- inv_15
- inv_16
- inv_17
- inv_18
- inv_19
- inv_20
- inv_21
- inv_22
- inv_23
- inv_24
- inv_25
- inv_26
- inv_27
- inv_28
- inv_29
- inv_30
- inv_31
- inv_32
- inv_33
- inv_34
- inv_35
- inv_36
- inv_37
- inv_38
- inv_39
- inv_40
- inv_41
- inv_42
APP_NAME=dalmia-solar-degradation
KAIROS_URI=https://iLens:iLensDAL$456@dalmia.ilens.io/kairos/
KAIROS_METRIC=ilens.live_data.raw
AGGREGATOR=max
AGGREGATOR_VALUE=30
AGGREGATOR_UNIT=minutes
KAFKA_HOST=192.168.0.220
KAFKA_PORT=9092
KAFKA_TOPIC=ilens_dev
START_RELATIVE=90
END_RELATIVE=0
SENDER_EMAIL=no-reply@unifytwin.com
MONGO_URI=mongodb://ilens:ilens4321@192.168.0.220:2717/
REQUIRED_TZ="Asia/Kolkata"
MLFLOW_TRACKING_URI=https://qa.unifytwin.com/mlflow/
MLFLOW_TRACKING_USERNAME=mlflow
MLFLOW_TRACKING_PASSWORD=MlFlOwQA#4321
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net
AZURE_STORAGE_ACCESS_KEY=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==
USER=Dalmia_degradation
EXPERIMENT_NAME=Dalmia Solar Degradation1
RUN_NAME=Degradation
MODEL_NAME=versioning
CHECK_PARAM=hours
MODEL_CHECK_PARAM=144
MODELS_LIST=lr,knn,gbr,rf,catboost,lightgbm,ada,et,xgboost,dt,en,par,huber
SELECTED_METRIC=R2
HYPERPARAMETER_TUNING_METHOD=scikit-learn
if __name__ == "__main__":
    from dotenv import load_dotenv
    load_dotenv(dotenv_path='config.env')

import os
import os.path
import sys
from configparser import ConfigParser, BasicInterpolation

import yaml
from loguru import logger

# Configuring file constants
data_conf = "./conf/data.yml"


class EnvInterpolation(BasicInterpolation):
    """
    Interpolation which expands environment variables in values.
    """

    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        if not os.path.expandvars(value).startswith("$"):
            return os.path.expandvars(value)
        else:
            return None


try:
    config = ConfigParser(interpolation=EnvInterpolation())
    config.read("conf/application.conf")
except Exception as e:
    print(f"Error while loading the config: {e}")
    print("Failed to Load Configuration. Exiting!!!")
    sys.exit()


class Mongo:
    mongo_uri = config["MONGO"]["mongo_uri"]


class KairosDb:
    uri = config["KAIROS_DB"]["uri"]
    metric_name = config['KAIROS_DB']['metric_name']
    aggregator = config['KAIROS_DB']['aggregator']
    aggregator_value = config['KAIROS_DB']['aggregator_value']
    aggregator_unit = config['KAIROS_DB']['aggregator_unit']


class Kafka:
    kafka_host = config["KAFKA"]["kafka_host"]
    kafka_port = config["KAFKA"]["kafka_port"]
    kafka_topic = config["KAFKA"]["kafka_topic"]


class DateRange:
    start_date = config.get("DATE_RANGE", "start_date")
    end_date = config.get("DATE_RANGE", "end_date")
    start_relative_days = config.get("DATE_RANGE", "start_relative_days")
    end_relative_days = config.get("DATE_RANGE", "end_relative_days")


class ReqTimeZone:
    required_tz = config.get('TIMEZONE', 'required_tz')


class MlFlow:
    mlflow_tracking_uri = config['MLFLOW']['mlflow_tracking_uri']
    mlflow_tracking_username = config['MLFLOW']['mlflow_tracking_username']
    mlflow_tracking_password = config['MLFLOW']['mlflow_tracking_password']
    azure_storage_connection_string = config['MLFLOW']['azure_storage_connection_string']
    azure_storage_access_key = config['MLFLOW']['azure_storage_access_key']
    user = config['MLFLOW']['user']
    experiment_name = config['MLFLOW']['experiment_name']
    run_name = config['MLFLOW']['run_name']
    model_name = config['MLFLOW']['model_name']
    check_param = config['MLFLOW']['check_param']
    model_check_param = config['MLFLOW']['model_check_param']


class PycaretParams:
    model_list = config['PYCARET']['models_list']
    selected_metric = config['PYCARET']['selected_metric']
    hyperparameter_tuning_method = config['PYCARET']['hyperparameter_tuning_method']


json_file_path = "scripts/utils/"


class DBConstants:
    # DB
    db_metadata = "ilens_configuration"
    # collections
    collection_rule_targets = "rule_targets"
    yield_sheet_name = "yield_reports_3cp"
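# --- Hedged illustration (not part of this commit) --------------------------------
# A minimal sketch of how EnvInterpolation above resolves "$VAR" placeholders: a set
# environment variable is expanded, while an unresolved placeholder comes back as
# None because before_get() returns nothing for values that still start with "$".
# The section and keys below are made up for the demo and assume this runs in the
# same module as EnvInterpolation.
if __name__ == "__main__":
    os.environ["APP_NAME"] = "dalmia-solar-degradation"
    demo_parser = ConfigParser(interpolation=EnvInterpolation())
    demo_parser.read_string("[MODULE]\nname = $APP_NAME\nmissing = $NOT_SET_ANYWHERE")
    resolved = demo_parser["MODULE"]["name"]       # -> "dalmia-solar-degradation"
    unresolved = demo_parser["MODULE"]["missing"]  # -> None (environment variable not defined)
    logger.info(f"resolved={resolved}, unresolved={unresolved}")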
from loguru import logger

from scripts.core.data_puller_push.push_data import insert_values_3cp


class CalculatedDataPush:
    def __init__(self, df_calculated, all_cal_tags_dict):
        self.df_calculated = df_calculated
        self.all_cal_tags_dict = all_cal_tags_dict

    def kafka_data_push(self, df_calculated):
        try:
            logger.info(f"Calculated dict length = {len(self.all_cal_tags_dict)}")
            logger.info(f"Calculated df shape with Date and Timestamp column = {df_calculated.shape}")
            df_calculated_tags_dict = {col: self.all_cal_tags_dict[col] for col in df_calculated.columns
                                       if col not in ('Date', 'timestamp')}
            for i, j in df_calculated.iterrows():
                my_dict = {v: j[k] for k, v in df_calculated_tags_dict.items()}
                logger.info(f"{j['timestamp'], j['Date'], my_dict}")
                insert_values_3cp(j['timestamp'], my_dict)
        except Exception as e:
            logger.exception(f'Exception - {e}')
import json

import pandas as pd
import requests
from loguru import logger

from scripts.constants.app_configuration import KairosDb


class KairosQuery:
    def __init__(self, start_timestamp, end_timestamp, tag_dict):
        self.start_timestamp = start_timestamp
        self.end_timestamp = end_timestamp
        self.kairos_host = KairosDb.uri
        self.kairos_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.kairos_host)
        self.tag_dict = tag_dict

    def kairos_query(self):
        try:
            return {
                "metrics": [
                    {
                        "tags": {
                            "c3": list(self.tag_dict.keys())
                        },
                        "name": KairosDb.metric_name,
                        "group_by": [
                            {
                                "name": "tag",
                                "tags": ["c3"]
                            }
                        ],
                        "aggregators": [
                            {
                                "name": KairosDb.aggregator,
                                "sampling": {
                                    "value": KairosDb.aggregator_value,
                                    "unit": KairosDb.aggregator_unit
                                }
                            }
                        ]
                    }
                ],
                "plugins": [],
                "cache_time": 0,
                "time_zone": "Asia/Calcutta",
                "start_absolute": self.start_timestamp,
                "end_absolute": self.end_timestamp,
            }
        except Exception as e:
            logger.exception(f"Exception - {e}")

    def get_data(self, data_query):
        try:
            logger.info("Data for the parameters being pulled from Kairos Database")
            response = requests.post(self.kairos_url, data=json.dumps(data_query))
            response_data = response.json()
            output_data = response_data["queries"]
            logger.debug("Data pull complete")
            df_final = pd.DataFrame()
            for i in range(len(output_data)):
                grouped_output_data = output_data[i]["results"]
                for each_grouped_data in grouped_output_data:
                    value = each_grouped_data["values"]
                    tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
                    try:
                        logger.debug("Renamed {} to {} in Data".format(tag_id, self.tag_dict[tag_id]))
                        column_name = self.tag_dict[tag_id]
                    except KeyError as e:
                        logger.exception(f'Exception - {e}')
                        logger.debug("Column Renaming Logic not found for {}".format(tag_id))
                        column_name = tag_id
                    df_column_data = pd.DataFrame(data=value, columns=["timestamp", column_name])
                    if df_final.empty:
                        df_final = df_column_data
                    else:
                        df_final = df_final.merge(df_column_data, how="outer", left_on="timestamp",
                                                  right_on="timestamp")
            df_final['datetime'] = pd.to_datetime(df_final['timestamp'], unit="ms").dt.tz_localize('UTC'). \
                dt.tz_convert('Asia/Kolkata')
            logger.debug("Final number of columns : {}".format(str(len(list(df_final.columns)))))
            return df_final
        except Exception as e:
            logger.exception(f"Exception occurred - {e}")

    def kairos_data_import(self):
        try:
            logger.debug("Fetching live data")
            query_live = self.kairos_query()
            logger.info(f"query_live = {query_live}")
            df = self.get_data(data_query=query_live)
            return df
        except Exception as e:
            logger.exception(f"Exception - {e}")
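# --- Hedged usage sketch (not part of this commit) ---------------------------------
# Pulling one window of data with KairosQuery; the tag id and the epoch-millisecond
# timestamps below are placeholders, not real site values.
if __name__ == "__main__":
    demo_tag_dict = {"site_000$dept_000$line_000$equipment_0000$tag_00000": "voltage_mppt_1_inv_1"}
    demo_query = KairosQuery(start_timestamp=1672531200000,   # 2023-01-01 00:00:00 UTC, in ms
                             end_timestamp=1672617600000,     # 2023-01-02 00:00:00 UTC, in ms
                             tag_dict=demo_tag_dict)
    demo_df = demo_query.kairos_data_import()
    if demo_df is not None:
        logger.info(f"{demo_df.shape}")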
from json import dumps

from kafka import KafkaProducer
from loguru import logger

from scripts.constants.app_configuration import Kafka


def insert_values_3cp(ts, my_dict):
    kairos_writer = KairosWriter()
    kairos_writer.write_data(
        {
            ts: my_dict
        },
        Kafka.kafka_topic
    )
    logger.info("Data pushed successfully!")


class KafkaProducerUtil:
    def __init__(self):
        try:
            self.host = Kafka.kafka_host
            self.port = Kafka.kafka_port
            logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
            kafka_broker = [self.host + ":" + str(self.port)]
            self.producer = KafkaProducer(
                bootstrap_servers=kafka_broker,
                value_serializer=lambda v: v.encode('utf-8'),
                api_version=(0, 10, 1))
            self.producer.flush()
        except Exception as e:
            logger.error(f"Kafka connection error: {e}")

    def publish(self, topic, data):
        try:
            kafka_response = self.producer.send(topic, data)
            self.producer.flush()
            logger.debug(f"Message sent to kafka with response: {kafka_response}")
            return True
        except Exception as e:
            logger.error(e)
            return False


class KairosWriter(KafkaProducerUtil):
    def write_data(self, data_json, topic):
        site_id = "site_116"
        logger.debug(f"Data being pushed to kafka topic: {topic}")
        msg_counter = 0
        for k, v in data_json.items():
            timestamp, data = self.data_validator(k, v)
            timestamp = timestamp * 1000
            write_json = {
                "data": data,
                "site_id": site_id,
                # lstrip("site_") trims the leading "site_" characters, leaving the numeric suffix ("116")
                "gw_id": "gw_{}".format(site_id.lstrip("site_")),
                "pd_id": "pd_{}".format(site_id.lstrip("site_")),
                "timestamp": timestamp,
                "msg_id": msg_counter,
                "partition": "",
                "retain_flag": False
            }
            logger.debug(f"Timestamp: {timestamp}, Values: {data}")
            self.publish(topic, dumps(write_json))
            msg_counter += 1
        return msg_counter

    def audit_data(self, data_json, topic):
        logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
        msg_counter = len(data_json)
        for each in data_json:
            self.publish(topic, dumps(each))
        return msg_counter

    @staticmethod
    def data_validator(timestamp, data):
        logger.debug("Validating the data to remove Nan values")
        __temp__ = {}
        for k, v in data.items():
            if not k.startswith("site"):
                continue
            # keep only numeric values that are neither NaN nor inf
            if isinstance(v, (int, float)) and str(v) not in ('nan', 'inf'):
                __temp__[k] = v
        return int(timestamp), __temp__
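# --- Illustrative check (assumption, not part of this commit) -----------------------
# data_validator keeps only keys that start with "site" and drops NaN/inf values
# before anything is pushed to Kafka; being a @staticmethod, it can be exercised
# without a running broker.
if __name__ == "__main__":
    demo_ts, demo_clean = KairosWriter.data_validator(
        1672531200,
        {"site_116$tag_1": 42.5, "site_116$tag_2": float("nan"), "other_tag": 1.0})
    logger.info(f"{demo_ts} -> {demo_clean}")   # 1672531200 -> {'site_116$tag_1': 42.5}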
import pandas as pd
import numpy as np
from loguru import logger

from scripts.utils.pycaret_util import PycaretUtil
from scripts.utils.preprocessing import DataPreprocessing
from scripts.utils.mlflow_util import ModelLoad


class TrainingInference:
    def __init__(self, df, df_train, df_test):
        self.df = df
        self.df_train = df_train
        self.df_test = df_test

    def data_training(self, inv_id, mppt_id):
        try:
            data_preprocessing = DataPreprocessing()
            df_train_inv = self.df_train[self.df_train['inv_id'] == inv_id]
            df_train_mppt = df_train_inv[df_train_inv['mppt_id'] == mppt_id]
            x_train = df_train_mppt[['datetime', 'inv_id', 'mppt_id', 'hour', 'tilt_irradiance', 'voltage_mppt']]
            y_train = df_train_mppt[['current_mppt']]
            x_train_std, scaler_x = data_preprocessing.get_standardized_data(df=x_train,
                                                                             param_list=['datetime', 'inv_id',
                                                                                         'mppt_id'])
            y_train_std, scaler_y = data_preprocessing.get_standardized_data(df=y_train)
            df_std = pd.concat([x_train_std, y_train_std], axis=1)
            df_std.dropna(axis=0, inplace=True)
            df_std.reset_index(drop=True, inplace=True)
            model, pre_trained = ModelLoad().model_manager(df=df_std, target='current_mppt')
            return model, scaler_x, scaler_y
        except Exception as e:
            logger.exception(f'Exception - {e}')

    def data_inference(self, scaler_x, scaler_y, model, inv_id, mppt_id):
        try:
            df_test_inv = self.df_test[self.df_test['inv_id'] == inv_id]
            df_test_mppt = df_test_inv[df_test_inv['mppt_id'] == mppt_id]
            df_test_mppt.reset_index(drop=True, inplace=True)
            x_test = df_test_mppt[['datetime', 'inv_id', 'mppt_id', 'hour', 'tilt_irradiance', 'voltage_mppt']]
            y_test = df_test_mppt[['current_mppt']]
            data_preprocessing = DataPreprocessing()
            x_test_std = data_preprocessing.get_transform_std_data(df=x_test,
                                                                   param_list=['datetime', 'inv_id', 'mppt_id'],
                                                                   scaler=scaler_x)
            y_test_std = data_preprocessing.get_transform_std_data(df=y_test, scaler=scaler_y)
            predictions = model.predict(x_test_std).reshape(1, -1)
            predictions = np.array(scaler_y.inverse_transform(predictions)).reshape(-1, 1)
            y_test = scaler_y.inverse_transform(y_test_std)
            return x_test, y_test, predictions
        except Exception as e:
            logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger


class GetData:
    @staticmethod
    def current_voltage_mppt_data(df):
        try:
            current_column_list = [col_name for col_name in df if 'current' in col_name]
            voltage_column_list = [col_name for col_name in df if 'voltage' in col_name]
            current_column_list.sort()
            voltage_column_list.sort()
            df_mppt = pd.DataFrame()
            for n_mppt in range(len(current_column_list)):
                df_temp = df[['datetime', 'inv_id', 'date', 'hour', 'tilt_irradiance', voltage_column_list[n_mppt],
                              current_column_list[n_mppt]]]
                df_temp['mppt_id'] = current_column_list[n_mppt]
                df_temp['mppt_id'] = df_temp['mppt_id'].str.replace('current_', '')
                df_temp.rename(columns={voltage_column_list[n_mppt]: 'voltage_mppt',
                                        current_column_list[n_mppt]: 'current_mppt'}, inplace=True)
                if df_mppt.empty:
                    df_mppt = df_temp
                else:
                    df_mppt = pd.concat([df_mppt, df_temp], axis=0)
                    df_mppt.reset_index(drop=True, inplace=True)
            df_mppt.sort_values(['inv_id', 'mppt_id'], inplace=True)
            df_mppt.reset_index(drop=True, inplace=True)
            return df_mppt
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_final_data(x_test, y_test, predictions):
        try:
            df_result = pd.DataFrame(index=[i for i in range(len(y_test))])
            df_result['datetime'] = x_test['datetime']
            df_result['actual_current_mppt'] = y_test
            df_result['predicted_current_mppt'] = predictions
            df_result['potential_current_mppt_loss'] = df_result['predicted_current_mppt'] - \
                df_result['actual_current_mppt']
            df_result.loc[df_result['potential_current_mppt_loss'] < 0, 'potential_current_mppt_loss'] = 0
            df_result['hour'] = df_result['datetime'].dt.hour
            df_result.loc[df_result['hour'].between(left=19, right=23, inclusive='both'),
                          'potential_current_mppt_loss'] = 0
            df_result.loc[df_result['hour'].between(left=0, right=5, inclusive='both'),
                          'potential_current_mppt_loss'] = 0
            df_result.drop(['hour'], axis=1, inplace=True)
            df_result['total_potential_current_mppt_loss'] = df_result['potential_current_mppt_loss']. \
                rolling(min_periods=1, window=len(df_result)).sum()
            df_result.drop(['actual_current_mppt', 'predicted_current_mppt'], axis=1, inplace=True)
            df_result = df_result.round(2)
            df_result.reset_index(drop=True, inplace=True)
            logger.info(f'{df_result.shape}')
            return df_result
        except Exception as e:
            logger.exception(f'Exception - {e}')
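# --- Worked example (hypothetical data, not part of this commit) --------------------
# get_final_data clips negative prediction-minus-actual gaps to zero, zeroes the loss
# for night hours (19-23 and 0-5), then accumulates the remaining loss. In the real
# pipeline y_test/predictions arrive as 2-D arrays from the scalers; plain lists are
# used here only to keep the demo short.
if __name__ == "__main__":
    demo_x = pd.DataFrame({'datetime': pd.to_datetime(['2023-01-01 12:00',
                                                       '2023-01-01 13:00',
                                                       '2023-01-01 21:00'])})
    demo_y = [10.0, 12.0, 0.0]      # actual current
    demo_pred = [11.0, 11.5, 2.0]   # predicted current
    demo_result = GetData.get_final_data(x_test=demo_x, y_test=demo_y, predictions=demo_pred)
    # potential loss per row: 1.0 (kept), 0 (negative gap), 0 (21:00 is a night hour)
    logger.info(f"\n{demo_result}")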
import pandas as pd
from loguru import logger

from scripts.core.data_puller_push.data_puller import KairosQuery
from scripts.utils.reading_tags import GetTags

base_path = 'data_folder'


def get_tags_data(mppt_tags, start_timestamp, end_timestamp):
    try:
        get_tags = GetTags(base_path=base_path)
        df_merged = pd.DataFrame()
        for inv_id in list(mppt_tags['inv_id'].unique()):
            df_tags_id = get_tags.get_tags_id(df=mppt_tags, inv_id=inv_id)
            tags_dict = df_tags_id[['tag_id', 'parameter_name']].set_index('tag_id').T.to_dict(orient="records")[0]
            tags_dict['site_107$dept_140$line_371$equipment_4115$tag_15828'] = 'tilt_irradiance'
            df_data = KairosQuery(start_timestamp=start_timestamp,
                                  end_timestamp=end_timestamp,
                                  tag_dict=tags_dict).kairos_data_import()
            df_data['inv_id'] = inv_id
            if df_merged.empty:
                df_merged = df_data
            else:
                df_merged = pd.concat([df_merged, df_data], axis=0)
            logger.info(f'for inv - {inv_id}, {df_data.shape}')
            logger.info(f'{df_merged.shape}')
        df_merged['date'] = df_merged['datetime'].dt.date
        df_merged['hour'] = df_merged['datetime'].dt.hour
        logger.info(f'Final shape of merged dataframe = {df_merged.shape}')
        df_merged.reset_index(drop=True, inplace=True)
        # df_merged.to_csv(f'{base_path}/df_merged.csv', index=False)
        return df_merged
    except Exception as e:
        logger.exception(f'Exception - {e}')
import pandas as pd
from loguru import logger
import pytz
from datetime import datetime, timedelta

from scripts.constants.app_configuration import ReqTimeZone
from sklearn.preprocessing import MinMaxScaler


class DataPreprocessing:
    @staticmethod
    def remove_outliers(df, param_list):
        try:
            for col in param_list:
                lb = df[col].mean() - 3 * df[col].std()
                ub = df[col].mean() + 3 * df[col].std()
                logger.debug(f"Min values of {col} = {df[col].min()} \nLower Bracket of {col} = {lb}")
                logger.debug(f"Max values of {col} = {df[col].max()} \nUpper Bracket of {col} = {ub}")
                logger.debug(f'Shape of df before outlier removal = {df.shape}')
                df = df[(df[col] > lb) & (df[col] < ub)]
                logger.debug(f'Shape of df after outlier removal = {df.shape}')
            logger.debug(f'Final shape of df after outlier removal = {df.shape}')
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def train_test_split(df):
        try:
            today_date = datetime.now(pytz.utc).astimezone(pytz.timezone(ReqTimeZone.required_tz)).date()
            df = df[df['date'] != today_date]
            yesterday_date = today_date - timedelta(days=1)
            df_train = df[df['date'] < yesterday_date]
            df_test = df[df['date'] == yesterday_date]
            df_train.reset_index(drop=True, inplace=True)
            df_test.reset_index(drop=True, inplace=True)
            df_train.drop(['date'], axis=1, inplace=True)
            df_test.drop(['date'], axis=1, inplace=True)
            return df, df_train, df_test
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_standardized_data(df, param_list=None):
        try:
            if param_list is None:
                scaler = MinMaxScaler()
                df_std = pd.DataFrame(scaler.fit_transform(df), columns=list(df.columns))
                return df_std, scaler
            else:
                scaler = MinMaxScaler()
                df_std = pd.DataFrame(scaler.fit_transform(df.drop(param_list, axis=1)),
                                      columns=list(df.drop(param_list, axis=1).columns))
                return df_std, scaler
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_transform_std_data(df, scaler, param_list=None):
        try:
            if param_list is None:
                df_std = pd.DataFrame(scaler.transform(df), columns=list(df.columns))
                return df_std
            else:
                df_std = pd.DataFrame(scaler.transform(df.drop(param_list, axis=1)),
                                      columns=list(df.drop(param_list, axis=1).columns))
                return df_std
        except Exception as e:
            logger.exception(f'Exception - {e}')
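# --- Hedged usage sketch (not part of this commit) ----------------------------------
# Scalers are fit on training data and re-used on test data, mirroring how
# TrainingInference calls these helpers; the columns below are placeholders.
if __name__ == "__main__":
    dp = DataPreprocessing()
    demo_train = pd.DataFrame({'inv_id': ['inv_1', 'inv_1'], 'tilt_irradiance': [500.0, 800.0]})
    demo_test = pd.DataFrame({'inv_id': ['inv_1'], 'tilt_irradiance': [650.0]})
    demo_train_std, demo_scaler = dp.get_standardized_data(df=demo_train, param_list=['inv_id'])
    demo_test_std = dp.get_transform_std_data(df=demo_test, scaler=demo_scaler, param_list=['inv_id'])
    logger.info(f"\n{demo_train_std}\n{demo_test_std}")   # test value lands at 0.5 of the train range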
from loguru import logger
from pycaret import regression

from scripts.constants.app_configuration import MlFlow, PycaretParams


class PycaretUtil:
    def __init__(self):
        self.model_list = PycaretParams.model_list.split(",")
        self.selected_metric = PycaretParams.selected_metric
        self.hyperparameter_tuning_method = PycaretParams.hyperparameter_tuning_method

    def get_best_model(self, df, target):
        try:
            regression.setup(data=df, target=target)
            best_model = regression.compare_models(include=self.model_list, sort=self.selected_metric,
                                                   n_select=1)
            tuned_model = regression.tune_model(best_model, optimize=self.selected_metric,
                                                search_library=self.hyperparameter_tuning_method)
            results = regression.pull()
            results.sort_values(self.selected_metric, ascending=False, inplace=True)
            results.reset_index(drop=True, inplace=True)
            get_best_model_row = results.iloc[0]
            best_metrics = get_best_model_row.to_dict()
            best_metrics.pop('Model', None)
            return tuned_model, best_metrics
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_model_name(model):
        try:
            model_name = str(model).split('(')[0]
            return model_name
        except Exception as e:
            logger.info(f"Unable to get the model name - {e}")

    def get_auto_ml_model(self, df, target):
        try:
            model, metrics = self.get_best_model(df=df, target=target)
            model_name = self.get_model_name(model)
            hyper_params = model.get_params()
            return model, model_name, metrics, hyper_params
        except Exception as e:
            logger.info(f"Unable to get the AutoML model - {e}")
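# --- Hedged usage sketch (not part of this commit) ----------------------------------
# Running the AutoML helper on an already standardized training frame; the CSV path
# below is hypothetical and only stands in for a prepared dataset that contains a
# 'current_mppt' target column.
if __name__ == "__main__":
    import pandas as pd
    demo_df = pd.read_csv('data_folder/df_std_demo.csv')   # hypothetical prepared dataset
    demo_model, demo_name, demo_metrics, demo_params = PycaretUtil().get_auto_ml_model(
        df=demo_df, target='current_mppt')
    logger.info(f"{demo_name} -> {demo_metrics}")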
from loguru import logger
import pandas as pd


class GetTags:
    def __init__(self, base_path):
        self.base_path = base_path

    def read_tag_excel(self):
        try:
            df = pd.read_excel(f'{self.base_path}/tags_download.xlsx')
            df.drop(['Site', 'Plant', 'Line', 'Tag'], axis=1, inplace=True)
            df.rename(columns={'Tag ID': 'tag_id', 'Tag Name': 'tag_name',
                               'Equipment': 'inv_id', 'Parameter Name': 'parameter_name'}, inplace=True)
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')

    def get_mppt_tags(self, df, substrings):
        try:
            data_with_substring = self.get_substring_data(substrings=substrings, df=df, column='parameter_name')
            req_data_list = self.removed_substring(substring_data_list=data_with_substring,
                                                   remove_parameter='Efficiency')
            df = self.get_substring_df(df=df, column='parameter_name', substring_data_list=req_data_list)
            df.reset_index(drop=True, inplace=True)
            df['parameter_name'] = df['parameter_name'].str.replace('Voltage MPPT ', 'voltage_mppt_')
            df['parameter_name'] = df['parameter_name'].str.replace('Current MPPT ', 'current_mppt_')
            df['inv_id'] = df['inv_id'].str.replace('INV ', 'inv_')
            df['inv_id'] = df['inv_id'].str.replace('Plant ', 'plant')
            df['mppt_id'] = df['parameter_name'].copy()
            df['mppt_id'] = df['mppt_id'].str.replace('current_', '')
            df['mppt_id'] = df['mppt_id'].str.replace('voltage_', '')
            df = df.sort_values(['inv_id', 'mppt_id'])
            df['mppt_id_with_equipment'] = df['parameter_name'] + '_' + df['inv_id']
            df.reset_index(drop=True, inplace=True)
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_tags_id(df, inv_id):
        try:
            df = df[df['inv_id'] == inv_id]
            df = df[['tag_id', 'tag_name', 'inv_id', 'parameter_name', 'mppt_id',
                     'mppt_id_with_equipment']]
            df.reset_index(drop=True, inplace=True)
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_substring_data(substrings, df, column):
        try:
            req_substrings = substrings
            data_with_substring = [data for data in df[column] if req_substrings in data]
            return data_with_substring
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def removed_substring(substring_data_list, remove_parameter):
        try:
            req_data_list = [data for data in substring_data_list if remove_parameter not in data]
            return req_data_list
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_substring_df(df, column, substring_data_list):
        try:
            df = df.loc[df[column].isin(substring_data_list)]
            df.reset_index(drop=True, inplace=True)
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')
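# --- Illustrative check (assumption, not part of this commit) -----------------------
# get_mppt_tags relies on the two static helpers above: keep parameter names that
# contain 'MPPT', then drop efficiency tags. The names below are made up for the demo.
if __name__ == "__main__":
    demo_df = pd.DataFrame({'parameter_name': ['Voltage MPPT 1', 'Current MPPT 1',
                                               'MPPT 1 Efficiency', 'AC Power']})
    demo_with_mppt = GetTags.get_substring_data(substrings='MPPT', df=demo_df, column='parameter_name')
    demo_filtered = GetTags.removed_substring(substring_data_list=demo_with_mppt,
                                              remove_parameter='Efficiency')
    logger.info(f"{demo_filtered}")   # -> ['Voltage MPPT 1', 'Current MPPT 1']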
from datetime import datetime, timedelta

import pytz
from loguru import logger

from scripts.constants.app_configuration import DateRange, ReqTimeZone


class KairosStartEndDate:
    @staticmethod
    def start_end_date():
        try:
            local_timezone = pytz.timezone(ReqTimeZone.required_tz)
            start_date_timestamp = DateRange.start_date
            end_date_timestamp = DateRange.end_date
            if (start_date_timestamp is not None) and (start_date_timestamp.lower() != "none"):
                start_date = datetime.fromtimestamp(int(start_date_timestamp) / 1000).strftime('%Y-%m-%d %H:%M:%S')
                start_date = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
                start_date = start_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
            else:
                start_date = datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days))
                start_date = start_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
            if (end_date_timestamp is not None) and (end_date_timestamp.lower() != "none"):
                end_date = datetime.fromtimestamp(int(end_date_timestamp) / 1000).strftime('%Y-%m-%d %H:%M:%S')
                end_date = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")
                end_date = end_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
            else:
                end_date = datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days))
                end_date = end_date.astimezone(local_timezone).replace(hour=5, minute=0, second=0, microsecond=0)
            start_timestamp = int(start_date.timestamp()) * 1000
            end_timestamp = int(end_date.timestamp()) * 1000
            return start_date, end_date, start_timestamp, end_timestamp
        except Exception as e:
            logger.exception(f"Exception - {e}")
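# --- Hedged usage sketch (not part of this commit) ----------------------------------
# The helper returns timezone-localized datetimes snapped to 05:00 in REQUIRED_TZ
# together with their epoch-millisecond equivalents used by the Kairos query.
if __name__ == "__main__":
    demo_start, demo_end, demo_start_ms, demo_end_ms = KairosStartEndDate().start_end_date()
    logger.info(f"window: {demo_start} -> {demo_end} ({demo_start_ms} ms -> {demo_end_ms} ms)")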