Commit 647d193e authored by dasharatha.vamshi

completed the flow

parent ad5678ea
# Get start and end time
## Overview
- **Component Name** : Get start and end time
- **Component Description** :
- **Component Type** : Input type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
START_RELATIVE |String |Required |None |env | |
START_HOUR_RELATIVE |String |Required |None |env | |
DAY_STARTING_HOUR |String |Required |None |env | |
START_MINUTE_RELATIVE |String |Required |None |env | |
START_SECOND_RELATIVE |String |Required |None |env | |
END_RELATIVE |String |Required |None |env | |
END_HOUR_RELATIVE |String |Required |None |env | |
END_MINUTE_RELATIVE |String |Required |None |env | |
END_SECOND_RELATIVE |String |Required |None |env | |
REQUIRED_TZ |String |Required |None |env | |
Output |JsonObject |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParm
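
A minimal usage sketch (an assumption, not shipped with this commit): loading this component into a Kubeflow pipeline with the kfp v1 SDK. The component.yml path and the compiled output filename below are illustrative.

```python
import kfp

# Assumed path within this repository; adjust to wherever this component's
# component.yml actually lives.
get_start_and_end_time_op = kfp.components.load_component_from_file(
    "input_components/get_start_and_end_time/component.yml")


@kfp.dsl.pipeline(name="get-start-and-end-time-example")
def example_pipeline():
    # The time window is driven entirely by the env values baked into component.yml;
    # the task emits a JsonObject Output holding start/end epoch timestamps in milliseconds.
    time_window_task = get_start_and_end_time_op()


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(example_pipeline, "example_pipeline.yaml")
```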
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os
function = \
[func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]
def read_data_from_yaml(path):
"""
It opens the file at the given path, reads the contents, and then parses the contents as YAML
:param path: The path to the YAML file
:return: A dictionary
"""
with open(path, "r") as stream:
return yaml.load(stream, Loader=yaml.FullLoader)
def get_component_yml():
"""
Build component.yml from the single function in src.program, appending the env block
and base image from variables.yml when that file is present.
:return: None; writes component.yml to the current directory
"""
try:
requirements = list()
with open('requirements.txt', 'r') as file:
for line in file:
if "=" in line and "#" not in line:
requirements.append(line.strip())
elif "#" in line:
...
else:
logger.exception(f"Mentioned package does not have version {line.strip()}")
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements)
variables_path = "variables.yml"
if os.path.exists(variables_path):
yaml_data: dict = read_data_from_yaml(variables_path)
if yaml_data:
envs: list = yaml_data.get("deployment", {}).get("environmentVar", [])
python_version: str = yaml_data.get("deployment", {}).get("pythonVersion", None)
if python_version is not None and python_version in ["3.7", "3.8", "3.9", "3.10"]:
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements, base_image=f"python:{python_version}")
date_function = date_function_yml + f" env:\n"
for env_var in envs:
date_function += f" {env_var['name']}: '{env_var['value']}'\n"
with open('component.yml', 'w') as file:
file.write(date_function)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
except Exception as e:
logger.exception(f"Unable to get the component yml {e}")
def create_table(data, key):
"""
Build README table rows for the given component.yml section ("inputs" or "outputs").
:return: list of row-value lists for the markdown table
"""
try:
rows_list = list()
for each_input in data.get(key, []):
rows_dict = dict()
rows_dict['name'] = each_input.get("name", '')
rows_dict['data_type'] = each_input.get('type', 'String')
if each_input.get('optional'):
req_opt = "Optional"
default_value = each_input.get('default', '')
else:
req_opt = "Required"
default_value = "None"
rows_dict['req_opt'] = req_opt
rows_dict['default_value'] = default_value
for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
if isinstance(each_arg, dict) and rows_dict['name'] in each_arg.values():
rows_dict['Type'] = list(each_arg.keys())[0]
rows_dict['Description'] = each_input.get('description', '')
rows_dict['Example'] = ''
rows_list.append(list(rows_dict.values()))
if key == "inputs" and os.path.exists("variables.yml"):
yaml_data: dict = read_data_from_yaml("variables.yml")
if yaml_data:
env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
for each in env_var:
env_dict = dict()
env_dict['name'] = each.get("name")
env_dict['data_type'] = "String"
env_dict['req_opt'] = "Required"
env_dict['default_value'] = "None"
env_dict['Type'] = "env"
env_dict['description'] = ""
env_dict['example'] = ""
rows_list.append(list(env_dict.values()))
return rows_list
except Exception as e:
logger.exception(f"Unable to create the table for README.MD file {e}")
def create_readme():
"""
Create the README.md file from the generated component.yml details
:return: None; writes README.md in the current directory
"""
try:
note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
" OutputPath, PipelineParm"
column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
"Example"]
with open("component.yml", "r") as file:
data = yaml.safe_load(file)
if "inputs" in list(data.keys()) and "outputs" in list(data.keys()):
component_type = "Transform type"
elif "inputs" not in data:
component_type = "Input type"
else:
component_type = "Output type"
component_overview_json = dict()
component_overview_json['Component Name'] = data.get("name", " ")
component_overview_json['Component Description'] = data.get("description", " ")
component_overview_json['Component Type'] = component_type
rows_list_input = create_table(data, "inputs")
rows_list_output = create_table(data, "outputs")
rows_list = rows_list_input + rows_list_output
header = component_overview_json.get("Component Name")
table_header = " |".join(column_list) + "\n"
table_line = "--- |" * len(column_list) + "\n"
table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
table = table_header + table_line + table_body
readme = f"""
# {header}
## {"Overview"}
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
with open('README.md', 'w') as f:
f.write(readme)
except Exception as e:
logger.exception(f"Unable to create the README.MD file {e}")
if __name__ == "__main__":
get_component_yml()
create_readme()
name: Get start and end time
outputs:
- {name: Output, type: JsonObject}
implementation:
container:
image: python:3.9
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'pytz==2021.3' 'pymongo~=3.12.1' 'pandas==1.3.*' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'pytz==2021.3'
'pymongo~=3.12.1' 'pandas==1.3.*' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def get_start_and_end_time():
from loguru import logger
from datetime import datetime, timedelta
import pytz
import os
class DateRange:
start_relative_days = int(os.getenv("START_RELATIVE"))
start_hour_relative = int(os.getenv("START_HOUR_RELATIVE"))
day_starting_hour = int(os.getenv("DAY_STARTING_HOUR"))
start_minute_relative = int(os.getenv("START_MINUTE_RELATIVE"))
start_second_relative = int(os.getenv("START_SECOND_RELATIVE"))
end_relative_days = int(os.getenv("END_RELATIVE"))
end_hour_relative = int(os.getenv("END_HOUR_RELATIVE"))
end_minute_relative = int(os.getenv("END_MINUTE_RELATIVE"))
end_second_relative = int(os.getenv("END_SECOND_RELATIVE"))
class ReqTimeZone:
required_tz = os.getenv("REQUIRED_TZ")
try:
start_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days),
hours=int(DateRange.start_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(hour=int(DateRange.day_starting_hour),
minute=int(DateRange.start_minute_relative),
second=int(DateRange.start_second_relative),
microsecond=0)
end_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days),
hours=int(DateRange.end_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(minute=int(DateRange.end_minute_relative),
second=int(DateRange.end_second_relative),
microsecond=0)
start_timestamp = int(start_date.timestamp()) * 1000
end_timestamp = int(end_date.timestamp()) * 1000
final_dict = {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp}
print(final_dict)
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def _serialize_json(obj) -> str:
if isinstance(obj, str):
return obj
import json
def default_serializer(obj):
if hasattr(obj, 'to_struct'):
return obj.to_struct()
else:
raise TypeError(
"Object of type '%s' is not JSON serializable and does not have .to_struct() method."
% obj.__class__.__name__)
return json.dumps(obj, default=default_serializer, sort_keys=True)
import argparse
_parser = argparse.ArgumentParser(prog='Get start and end time', description='')
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = get_start_and_end_time(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_json,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
args:
- '----output-paths'
- {outputPath: Output}
env:
START_RELATIVE: '0'
START_HOUR_RELATIVE: '1'
DAY_STARTING_HOUR: '5'
START_MINUTE_RELATIVE: '0'
START_SECOND_RELATIVE: '0'
END_RELATIVE: '0'
END_HOUR_RELATIVE: '1'
END_MINUTE_RELATIVE: '59'
END_SECOND_RELATIVE: '59'
REQUIRED_TZ: 'Asia/Bangkok'
loguru==0.5.3
pytz==2021.3
pymongo~=3.12.1
pandas==1.3.*
def get_start_and_end_time() -> dict:
from loguru import logger
from datetime import datetime, timedelta
import pytz
import os
class DateRange:
start_relative_days = int(os.getenv("START_RELATIVE"))
start_hour_relative = int(os.getenv("START_HOUR_RELATIVE"))
day_starting_hour = int(os.getenv("DAY_STARTING_HOUR"))
start_minute_relative = int(os.getenv("START_MINUTE_RELATIVE"))
start_second_relative = int(os.getenv("START_SECOND_RELATIVE"))
end_relative_days = int(os.getenv("END_RELATIVE"))
end_hour_relative = int(os.getenv("END_HOUR_RELATIVE"))
end_minute_relative = int(os.getenv("END_MINUTE_RELATIVE"))
end_second_relative = int(os.getenv("END_SECOND_RELATIVE"))
class ReqTimeZone:
required_tz = os.getenv("REQUIRED_TZ")
try:
start_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days),
hours=int(DateRange.start_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(hour=int(DateRange.day_starting_hour),
minute=int(DateRange.start_minute_relative),
second=int(DateRange.start_second_relative),
microsecond=0)
end_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days),
hours=int(DateRange.end_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(minute=int(DateRange.end_minute_relative),
second=int(DateRange.end_second_relative),
microsecond=0)
start_timestamp = int(start_date.timestamp()) * 1000
end_timestamp = int(end_date.timestamp()) * 1000
final_dict = {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp}
print(final_dict)
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
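
A minimal local sketch (an assumption, not part of the component) for exercising this function before it is wrapped into component.yml by the builder script shown earlier; the env values mirror the deployment.environmentVar defaults in the variables.yml that follows.

```python
import os

# Mirror the deployment.environmentVar defaults from variables.yml.
os.environ.update({
    "START_RELATIVE": "0", "START_HOUR_RELATIVE": "1", "DAY_STARTING_HOUR": "5",
    "START_MINUTE_RELATIVE": "0", "START_SECOND_RELATIVE": "0",
    "END_RELATIVE": "0", "END_HOUR_RELATIVE": "1",
    "END_MINUTE_RELATIVE": "59", "END_SECOND_RELATIVE": "59",
    "REQUIRED_TZ": "Asia/Bangkok",
})

# Assumes the src package layout used by the builder (from src import program).
from src.program import get_start_and_end_time

# Returns {"start_timestamp": <epoch ms>, "end_timestamp": <epoch ms>} for the configured window.
window = get_start_and_end_time()
```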
deployment:
environmentVar:
- name: START_RELATIVE
value: 0
- name: START_HOUR_RELATIVE
value: 1
- name: DAY_STARTING_HOUR
value: 5
- name: START_MINUTE_RELATIVE
value: 0
- name: START_SECOND_RELATIVE
value: 0
- name: END_RELATIVE
value: 0
- name: END_HOUR_RELATIVE
value: 1
- name: END_MINUTE_RELATIVE
value: 59
- name: END_SECOND_RELATIVE
value: 59
- name: REQUIRED_TZ
value: "Asia/Bangkok"
pythonVersion: "3.9"
@@ -21,11 +21,23 @@ def forecast_pipeline(pipeline_param: dict, plant_info: dict):
"input_components/get_final_predicted_tags/component.yml")
get_inv_and_level_efficiency_tags = kfp.components.load_component_from_file(
"input_components/get_inv_and_level_efficiency_tags/component.yml")
# get_start_end_date = kfp.components.load_component_from_file(
# "input_components/get_start_and_end_date/component.yml")
get_inv_and_mppt_level_efficiency = kfp.components.load_component_from_file(
"transform_components/inv_and_mppt_level_efficiency/component.yml")
# Calling the component
get_tags_function_task = get_tags_function_component(pipeline_param).set_memory_request('600M').set_memory_limit('1200M').\
set_cpu_request('700m').set_cpu_limit('1400m')
get_final_predicted_tags_task = get_final_predicted_tags(get_tags_function_task.output)
get_inv_and_level_efficiency_tags_task = get_inv_and_level_efficiency_tags(get_tags_function_task.output)
# get_start_end_date_task = get_start_end_date().set_memory_request('600M').set_memory_limit('1200M').\
# set_cpu_request('700m').set_cpu_limit('1400m')
get_inv_and_mppt_level_efficiency_task = get_inv_and_mppt_level_efficiency(
get_tags_function_task.output,
get_final_predicted_tags_task.output,
get_inv_and_level_efficiency_tags_task.output)
# Disabling caching for all the components
get_tags_function_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
This source diff could not be displayed because it is too large.
# Inv and mppt level efficiency
## Overview
- **Component Name** : Inv and mppt level efficiency
- **Component Description** :
- **Component Type** : Transform type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
get_tags_component_output |String |Required |None |inputPath | |
get_final_predicted_tags |String |Required |None |inputPath | |
get_inv_level_efficiency_tags |String |Required |None |inputPath | |
KAIROS_URI |String |Required |None |env | |
METRIC_NAME |String |Required |None |env | |
AGGREGATOR |String |Required |None |env | |
AGGREGATOR_VALUE |String |Required |None |env | |
AGGREGATOR_UNIT |String |Required |None |env | |
REQUIRED_TZ |String |Required |None |env | |
KAFKA_HOST |String |Required |None |env | |
KAFKA_PORT |String |Required |None |env | |
KAFKA_TOPIC |String |Required |None |env | |
START_RELATIVE |String |Required |None |env | |
START_HOUR_RELATIVE |String |Required |None |env | |
DAY_STARTING_HOUR |String |Required |None |env | |
START_MINUTE_RELATIVE |String |Required |None |env | |
START_SECOND_RELATIVE |String |Required |None |env | |
END_RELATIVE |String |Required |None |env | |
END_HOUR_RELATIVE |String |Required |None |env | |
END_MINUTE_RELATIVE |String |Required |None |env | |
END_SECOND_RELATIVE |String |Required |None |env | |
output |String |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParm
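
A hedged loading sketch (the path is assumed from this repository's layout; the forecast_pipeline diff earlier in this commit loads the component the same way). The returned factory exposes the inputs and output documented in the table above and is called in forecast_pipeline with the three upstream task outputs.

```python
import kfp

# Assumed path within this repository.
inv_and_mppt_level_efficiency_op = kfp.components.load_component_from_file(
    "transform_components/inv_and_mppt_level_efficiency/component.yml")

# Inspect the loaded spec; the three inputPath inputs and the single outputPath
# output match the Component Param table above.
print(inv_and_mppt_level_efficiency_op.component_spec.inputs)
print(inv_and_mppt_level_efficiency_op.component_spec.outputs)
```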
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os
function = \
[func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]
def read_data_from_yaml(path):
"""
It opens the file at the given path, reads the contents, and then parses the contents as YAML
:param path: The path to the YAML file
:return: A dictionary
"""
with open(path, "r") as stream:
return yaml.load(stream, Loader=yaml.FullLoader)
def get_component_yml():
"""
Build component.yml from the single function in src.program, appending the env block
and base image from variables.yml when that file is present.
:return: None; writes component.yml to the current directory
"""
try:
requirements = list()
with open('requirements.txt', 'r') as file:
for line in file:
if "=" in line and "#" not in line:
requirements.append(line.strip())
elif "#" in line:
...
else:
logger.exception(f"Mentioned package does not have version {line.strip()}")
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements)
variables_path = "variables.yml"
if os.path.exists(variables_path):
yaml_data: dict = read_data_from_yaml(variables_path)
if yaml_data:
envs: list = yaml_data.get("deployment", {}).get("environmentVar", [])
python_version: str = yaml_data.get("deployment", {}).get("pythonVersion", None)
if python_version is not None and python_version in ["3.7", "3.8", "3.9", "3.10"]:
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements, base_image=f"python:{python_version}")
date_function = date_function_yml + f" env:\n"
for env_var in envs:
date_function += f" {env_var['name']}: '{env_var['value']}'\n"
with open('component.yml', 'w') as file:
file.write(date_function)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
except Exception as e:
logger.exception(f"Unable to get the component yml {e}")
def create_table(data, key):
"""
Build README table rows for the given component.yml section ("inputs" or "outputs").
:return: list of row-value lists for the markdown table
"""
try:
rows_list = list()
for each_input in data.get(key, []):
rows_dict = dict()
rows_dict['name'] = each_input.get("name", '')
rows_dict['data_type'] = each_input.get('type', 'String')
if each_input.get('optional'):
req_opt = "Optional"
default_value = each_input.get('default', '')
else:
req_opt = "Required"
default_value = "None"
rows_dict['req_opt'] = req_opt
rows_dict['default_value'] = default_value
for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
if isinstance(each_arg, dict) and rows_dict['name'] in each_arg.values():
rows_dict['Type'] = list(each_arg.keys())[0]
rows_dict['Description'] = each_input.get('description', '')
rows_dict['Example'] = ''
rows_list.append(list(rows_dict.values()))
if key == "inputs" and os.path.exists("variables.yml"):
yaml_data: dict = read_data_from_yaml("variables.yml")
if yaml_data:
env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
for each in env_var:
env_dict = dict()
env_dict['name'] = each.get("name")
env_dict['data_type'] = "String"
env_dict['req_opt'] = "Required"
env_dict['default_value'] = "None"
env_dict['Type'] = "env"
env_dict['description'] = ""
env_dict['example'] = ""
rows_list.append(list(env_dict.values()))
return rows_list
except Exception as e:
logger.exception(f"Unable to create the table for README.MD file {e}")
def create_readme():
"""
Create the README.md file from the generated component.yml details
:return: None; writes README.md in the current directory
"""
try:
note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
" OutputPath, PipelineParm"
column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
"Example"]
with open("component.yml", "r") as file:
data = yaml.safe_load(file)
if "inputs" in list(data.keys()) and "outputs" in list(data.keys()):
component_type = "Transform type"
elif "inputs" not in data:
component_type = "Input type"
else:
component_type = "Output type"
component_overview_json = dict()
component_overview_json['Component Name'] = data.get("name", " ")
component_overview_json['Component Description'] = data.get("description", " ")
component_overview_json['Component Type'] = component_type
rows_list_input = create_table(data, "inputs")
rows_list_output = create_table(data, "outputs")
rows_list = rows_list_input + rows_list_output
header = component_overview_json.get("Component Name")
table_header = " |".join(column_list) + "\n"
table_line = "--- |" * len(column_list) + "\n"
table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
table = table_header + table_line + table_body
readme = f"""
# {header}
## {"Overview"}
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
with open('README.md', 'w') as f:
f.write(readme)
except Exception as e:
logger.exception(f"Unable to create the README.MD file {e}")
if __name__ == "__main__":
get_component_yml()
create_readme()
name: Inv and mppt level efficiency
inputs:
- {name: get_tags_component_output}
- {name: get_final_predicted_tags}
- {name: get_inv_level_efficiency_tags}
outputs:
- {name: output}
implementation:
container:
image: python:3.9
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'pytz==2021.3' 'loguru==0.5.3' 'scipy==1.7.1' 'numpy==1.21.0' 'mlflow==1.20.2'
'simplejson==3.17.5' 'requests==2.27.1' 'pydantic==1.8.2' 'python-dotenv==0.19.2'
'kafka-python==1.4.7' 'SQLAlchemy==1.3.20' 'sqlparse==0.4.2' 'protobuf==3.20.*'
'pandas==1.5.3' 'PyYAML==5.4' 'azure-storage-blob==12.14.1' 'azure-core==1.27.0'
'scikit-learn==1.0.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
--quiet --no-warn-script-location 'pytz==2021.3' 'loguru==0.5.3' 'scipy==1.7.1'
'numpy==1.21.0' 'mlflow==1.20.2' 'simplejson==3.17.5' 'requests==2.27.1' 'pydantic==1.8.2'
'python-dotenv==0.19.2' 'kafka-python==1.4.7' 'SQLAlchemy==1.3.20' 'sqlparse==0.4.2'
'protobuf==3.20.*' 'pandas==1.5.3' 'PyYAML==5.4' 'azure-storage-blob==12.14.1'
'azure-core==1.27.0' 'scikit-learn==1.0.2' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def inv_and_mppt_level_efficiency(get_tags_component_output,
get_final_predicted_tags
, get_inv_level_efficiency_tags, output_path):
class CommonConstants:
dalmia_string_level_tags = 'dalmia_string_level_tags'
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
inv_id = 'inv_id'
mppt_id = 'mppt_id'
datetime = 'datetime'
predicted_current_mppt = 'predicted_current_mppt'
predicted_current_mppt_tag = 'predicted_current_mppt_tag'
actual_current_mppt = 'actual_current_mppt'
hour = 'hour'
skip_time = {"morning": {"start": 0, "end": 6},
"evening": {"start": 18, "end": 23}}
efficiency_mppt = 'efficiency_mppt'
efficiency_inv = 'efficiency_inv'
efficiency_plant = 'efficiency_plant'
tag_name = 'tag_name'
parameter_name = 'parameter_name'
timestamp = 'timestamp'
tag_id = 'tag_id'
efficiency_mppt_tag = 'efficiency_mppt_tag'
voltage = 'voltage'
current = 'current'
Potential = 'Potential'
Degradation = 'Degradation'
tilt_irradiance = 'tilt_irradiance'
voltage_mppt = 'voltage_mppt'
current_mppt = 'current_mppt'
date = 'date'
asia_kolkata_timezone = 'Asia/Kolkata'
asia_bangkok_timezone = 'Asia/Bangkok'
coefficient = 'coefficient'
cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
day = "day"
time = "time"
import pandas as pd
import json
import tracemalloc
import gc
import requests
import mlflow
import re
from json import dumps
from kafka import KafkaProducer
import numpy as np
from loguru import logger
from datetime import datetime, timedelta
import pytz
import os
class MlFlow:
mlflow_tracking_uri = "https://qa.unifytwin.com/mlflow/"
mlflow_tracking_username = "mlflow"
mlflow_tracking_password = "MlFlOwQA#4321"
azure_storage_connection_string = "DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net"
azure_storage_access_key = "tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg=="
user = "BGrimm_degradation"
experiment_name = "Bgrimm-String-Efficiency"
run_name = "Efficiency"
model_name = "versioning"
check_param = "hours"
model_check_param = 10
class ReqTimeZone:
required_tz = os.getenv("REQUIRED_TZ")
mlflow_tracking_uri = MlFlow.mlflow_tracking_uri
os.environ["MLFLOW_TRACKING_USERNAME"] = MlFlow.mlflow_tracking_username
os.environ["MLFLOW_TRACKING_PASSWORD"] = MlFlow.mlflow_tracking_password
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = MlFlow.azure_storage_connection_string
os.environ["AZURE_STORAGE_ACCESS_KEY"] = MlFlow.azure_storage_access_key
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
client = mlflow.tracking.MlflowClient()
class KairosDb:
metric_name = os.getenv("METRIC_NAME")
uri = os.getenv("KAIROS_URI")
aggregator = os.getenv("AGGREGATOR")
aggregator_value = os.getenv("AGGREGATOR_VALUE")
aggregator_unit = os.getenv("AGGREGATOR_UNIT")
class DateRange:
start_relative_days = int(os.getenv("START_RELATIVE"))
start_hour_relative = int(os.getenv("START_HOUR_RELATIVE"))
day_starting_hour = int(os.getenv("DAY_STARTING_HOUR"))
start_minute_relative = int(os.getenv("START_MINUTE_RELATIVE"))
start_second_relative = int(os.getenv("START_SECOND_RELATIVE"))
end_relative_days = int(os.getenv("END_RELATIVE"))
end_hour_relative = int(os.getenv("END_HOUR_RELATIVE"))
end_minute_relative = int(os.getenv("END_MINUTE_RELATIVE"))
end_second_relative = int(os.getenv("END_SECOND_RELATIVE"))
class ReqTimeZone:
required_tz = os.getenv("REQUIRED_TZ")
try:
start_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days),
hours=int(DateRange.start_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(hour=int(DateRange.day_starting_hour),
minute=int(DateRange.start_minute_relative),
second=int(DateRange.start_second_relative),
microsecond=0)
end_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days),
hours=int(DateRange.end_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(minute=int(DateRange.end_minute_relative),
second=int(DateRange.end_second_relative),
microsecond=0)
start_timestamp = int(start_date.timestamp()) * 1000
end_timestamp = int(end_date.timestamp()) * 1000
timestamp_dict = {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp}
print(timestamp_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
class KairosQuery:
def __init__(self, start_timestamp, end_timestamp, tag_dict):
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.kairos_host = KairosDb.uri
self.kairos_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.kairos_host)
self.tag_dict = tag_dict
def kairos_query(self):
try:
return {
"metrics": [
{
"tags": {
"c3": list(self.tag_dict.keys())
},
"name": KairosDb.metric_name,
"group_by": [
{
"name": "tag",
"tags": ["c3"]
}
],
"aggregators": [
{
"name": KairosDb.aggregator,
"sampling": {
"value": KairosDb.aggregator_value,
"unit": KairosDb.aggregator_unit
},
"align_sampling": True,
"align_start_time": True
}
]
}
],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": self.start_timestamp,
"end_absolute": self.end_timestamp,
}
except Exception as e:
logger.exception(f"Exception - {e}")
def get_data(self, data_query):
try:
logger.info("Data for the parameters being pulled from Kairos Database")
response = requests.post(self.kairos_url, data=json.dumps(data_query))
response_data = response.json()
output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
for i in range(len(output_data)):
grouped_output_data = output_data[i]["results"]
for each_grouped_data in grouped_output_data:
value = (each_grouped_data["values"])
tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug("Renamed {} to {} in Data".format(tag_id, self.tag_dict[tag_id]))
column_name = self.tag_dict[tag_id]
except KeyError as e:
logger.exception(f'Exception - {e}')
logger.debug("Column Renaming Logic not found for {}".format(tag_id))
column_name = tag_id
df_column_data = pd.DataFrame(data=value, columns=[CommonConstants.timestamp, column_name])
if df_final.empty:
df_final = df_column_data
else:
df_final = df_final.merge(df_column_data, how="outer", left_on=CommonConstants.timestamp,
right_on=CommonConstants.timestamp)
df_final[CommonConstants.datetime] = \
pd.to_datetime(df_final[CommonConstants.timestamp], unit="ms").dt.tz_localize('UTC'). \
dt.tz_convert(CommonConstants.asia_bangkok_timezone)
logger.debug("Final number of columns : {}".format(str(len(list(df_final.columns)))))
return df_final
except Exception as e:
logger.exception(f"Exception occurred - {e}", exc_info=True)
def kairos_data_import(self):
try:
logger.debug("Fetching live data")
query_live = self.kairos_query()
logger.info(f"query_live = {query_live}")
df = self.get_data(data_query=query_live)
return df
except Exception as e:
logger.exception(f"Exception - {e}")
def __del__(self):
try:
print('destructor called, KairosQuery die!')
except Exception as e:
logger.exception(f'Exception - {e}')
class ReformatKairosData:
@staticmethod
def get_tags_data(df_input_tags, start_timestamp, end_timestamp, inv_id, mppt_id, city):
try:
gc.collect()
tracemalloc.clear_traces()
df_tags_id = df_input_tags[[CommonConstants.tag_id, CommonConstants.tag_name, CommonConstants.inv_id,
CommonConstants.parameter_name, CommonConstants.mppt_id]]
df_tags_id.reset_index(drop=True, inplace=True)
current_voltage_tags_only = \
[data for data in df_tags_id[CommonConstants.parameter_name]
if any([x in data for x in [CommonConstants.voltage, CommonConstants.current]])]
req_data_list = [data for data in current_voltage_tags_only if CommonConstants.Potential not in data]
req_data_list = [data for data in req_data_list if CommonConstants.Degradation not in data]
df_req_tags_id = df_tags_id.loc[df_tags_id[CommonConstants.parameter_name].isin(req_data_list)]
df_req_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_req_tags_id[[CommonConstants.tag_id, CommonConstants.parameter_name]].set_index(
CommonConstants.tag_id).T.to_dict(orient="records")[0]
sites = {
"bgrimmchonburi": {
"tilt_irradiance": "site_101$dept_102$line_105$equipment_220$tag_478"
}
}
tags_dict[sites[city][CommonConstants.tilt_irradiance]] = CommonConstants.tilt_irradiance
del df_req_tags_id
get_kairos_query = KairosQuery(start_timestamp=start_timestamp, end_timestamp=end_timestamp,
tag_dict=tags_dict)
df_data = get_kairos_query.kairos_data_import()
for tag in current_voltage_tags_only:
if CommonConstants.voltage_mppt in tag:
df_data.rename(columns={tag: CommonConstants.voltage_mppt}, inplace=True)
elif CommonConstants.current_mppt in tag:
df_data.rename(columns={tag: CommonConstants.current_mppt}, inplace=True)
df_data[CommonConstants.inv_id] = inv_id
df_data[CommonConstants.mppt_id] = mppt_id
df_data[CommonConstants.date] = df_data[CommonConstants.datetime].dt.date
df_data[CommonConstants.hour] = df_data[CommonConstants.datetime].dt.hour
df_data.drop([CommonConstants.date], axis=1, inplace=True)
logger.info(f'shape of the input dataframe for {inv_id} & {mppt_id} = {df_data.shape}')
logger.info(f'columns present in dataframe for {inv_id} & {mppt_id} - {list(df_data.columns)}')
df_data.loc[df_data[CommonConstants.tilt_irradiance] <= 0, CommonConstants.tilt_irradiance] = 0
df_data.reset_index(drop=True, inplace=True)
tracemalloc.clear_traces()
del get_kairos_query
del current_voltage_tags_only
del req_data_list
del df_tags_id
return df_data
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, ReformatKairosData die!')
except Exception as e:
logger.exception(f'Exception - {e}')
class ModelLoad(object):
def model_manager(self, inv_id, city, panel_id):
try:
gc.collect()
tracemalloc.clear_traces()
tracemalloc.get_traced_memory()
experiment_id = self.create_experiment(experiment_name=MlFlow.experiment_name)
try:
run_df = mlflow.search_runs([experiment_id], filter_string=f"run_name='Efficiency_{city}'")
site_id = run_df['run_id'].values[0]
except Exception as e:
site_id = None
days, latest_run_id = self.fetch_latest_model(experiment_id=experiment_id,
run_name=MlFlow.run_name + '_' + panel_id + '_' + inv_id,
# run_name=panel_id + '_' + MlFlow.run_name + '_' + inv_id,
site_id=site_id)
logger.debug(f'loading the pretrained model !')
energy_model = self.load_model_pyfunc(
model_path=self.forming_loading_path(latest_run_id=latest_run_id))
return energy_model
except Exception as e:
logger.exception(f'Exception - {str(e)}')
@staticmethod
def create_experiment(experiment_name):
"""
Function is to create an experiment by passing experiment name
:param experiment_name: Name of the experiment
:return: Experiment id, Run id if any parent run is existing
"""
try:
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment:
exp_id = experiment.experiment_id
else:
mlflow.set_experiment(experiment_name)
experiment = mlflow.get_experiment_by_name(experiment_name)
exp_id = experiment.experiment_id
return exp_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def creating_run(experiment_id, run_id=None, run_name=None, nested=False):
try:
latest_run_id = None
if run_id:
df = mlflow.search_runs([experiment_id])
run_id_list = list(df["run_id"])
if run_id in run_id_list:
return run_id
else:
run = client.create_run(experiment_id)
with mlflow.start_run(
experiment_id=experiment_id, run_name=run_name, run_id=run.info.run_id,
nested=nested) as run:
return run.info.run_id
elif run_name:
df = mlflow.search_runs([experiment_id])
if df.empty:
run = client.create_run(experiment_id=experiment_id, tags={"mlflow.runName": run_name,
"mlflow.user": MlFlow.user})
with mlflow.start_run(
experiment_id=experiment_id, run_id=run.info.run_id, run_name=run_name,
nested=nested) as run:
return run.info.run_id
else:
for index, row in df.iterrows():
if run_name == row.get("tags.mlflow.runName", ""):
latest_run_id = row.get("run_id")
if latest_run_id:
return latest_run_id
else:
run = client.create_run(experiment_id=experiment_id, tags={"mlflow.runName": run_name,
"mlflow.user": MlFlow.user})
with mlflow.start_run(
experiment_id=experiment_id, run_id=run.info.run_id, run_name=run_name,
nested=nested) as run:
return run.info.run_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def creating_new_nested_run(experiment_id, run_id=None, run_name=None, nested=False):
"""
Function is to create a nested run
:param experiment_id: Experiment Id
:param run_id: run id
:param nested: nested Run
:return : return nested run id
"""
try:
with mlflow.start_run(experiment_id=experiment_id, run_id=run_id, nested=nested):
with mlflow.start_run(experiment_id=experiment_id, nested=True, run_name=run_name) as run:
return run.info.run_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_model(model, model_name):
"""
Function is to log the model
:param model : model
:param model_name : model_name
:return: Boolean Value
"""
try:
mlflow.sklearn.log_model(model, model_name)
logger.info("logged the model")
return True
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_metrics(metrics):
"""
Function is to log the metrics
:param metrics: dict of metrics
:return: Boolean value
"""
try:
updated_metric = dict()
for key, value in metrics.items():
key = re.sub(r"[\([{})\]]", "", key)
updated_metric[key] = value
mlflow.log_metrics(updated_metric)
logger.debug(f'logged the model metric')
return True
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_hyper_param(hyperparameters):
"""
Function is to log the hyper params
:param hyperparameters: dict of hyperparameters
:return: Boolean value
"""
try:
mlflow.log_params(hyperparameters)
logger.debug(f'logged model hyper parameters')
return True
except Exception as e:
logger.exception(str(e))
def fetch_latest_model(self, experiment_id, run_name, site_id):
"""
Function is to fetch the latest run
:param experiment_id: Experiment Id
:return: return the difference in the days/Hours/Minutes of current and run time, latest run id
"""
try:
days = int(MlFlow.model_check_param) + 1
model_history = ""
latest_run_id = ""
if experiment_id:
run_id = self.get_parent_run_id(experiment_id, run_name, site_id)
run_info = mlflow.search_runs([experiment_id],
filter_string="tags.mlflow.parentRunId='{run_id}'".format(
run_id=run_id))
if not run_info.empty:
for ind in run_info.index:
model_history, days, latest_run_id = self.check_model_existing(run_info=run_info,
index=ind)
if model_history is not None:
break
if model_history is None:
days = int(MlFlow.model_check_param) + 1
logger.info("No Model is existing with this experiment")
return days, latest_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest model - {e}")
@staticmethod
def get_parent_run_id(experiment_id, run_name, site_id):
"""
Function is to fetch latest parent run id if available else latest run id
:param experiment_id: Experiment Id
:param run_name: Name of the run
:return: latest parent run id
"""
try:
result_run_id = None
df = mlflow.search_runs([experiment_id])
if site_id is None:
return None
else:
df = df[df['tags.mlflow.parentRunId'] == site_id]
df.reset_index(drop=True, inplace=True)
for index, row in df.iterrows():
parent_run_name = row.get("tags.mlflow.runName")
if parent_run_name == run_name:
result_run_id = row.get("run_id")
else:
logger.info(f"No Run is existing with this Experiment id - {experiment_id}")
return result_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest run_id - {e}")
def check_model_existing(self, run_info, index):
"""
Function is to check if model is existing or not
:param run_info: Dataframe of run details
:param index: index of which run from the dataframe
:return:
"""
try:
model_history = None
date_param = MlFlow.check_param
# Difference between the current date and latest available model date
days = self.format_mlflow_time(run_info=run_info, index=index, date_param=date_param)
latest_run_id = run_info.loc[index, 'run_id']
if 'tags.mlflow.log-model.history' in run_info:
model_history = run_info['tags.mlflow.log-model.history'][index]
if model_history:
model_history_list = model_history.split(":")
model_history = model_history_list[2].split(",")[0]
else:
logger.info("No Model is existing")
return model_history, days, latest_run_id
except Exception as e:
logger.exception(f"Exception while checking the model name - {e}")
@staticmethod
def forming_loading_path(latest_run_id):
"""
Function is to form the loading path
:param latest_run_id: Run id
:return : Return the loading path
"""
try:
model_name = MlFlow.model_name
model_path = f"runs:/{latest_run_id}/{model_name}"
return model_path
except Exception as e:
logger.exception(f"Exception while forming loading path - {e}")
@staticmethod
def format_mlflow_time(run_info, index, date_param):
"""
Formatting mlflow time
:param run_info: details of the runs
:param index: index of the run in the dataframe
:param: What type of the date param
:return: calculate the time difference between the mlflow time and the current time zone
"""
try:
df_time = run_info.copy()
df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(ReqTimeZone.required_tz)
df_time["days"] = df_time['end_time'].dt.date
df_time["hours"] = df_time['end_time'].dt.hour
df_required = df_time.iloc[index:index + 1:, :]
df_required.reset_index(drop=True, inplace=True)
last_model_time = df_required['end_time'][0].to_pydatetime()
central_current = datetime.now(pytz.utc).astimezone(pytz.timezone(ReqTimeZone.required_tz))
time_diff = central_current - last_model_time
if date_param.lower() == "days":
days_diff = int(time_diff.days)
return days_diff
elif date_param.lower() == "hours":
hours_diff = int(time_diff.total_seconds() // 3600)
return hours_diff
elif date_param.lower() == "minutes":
minutes_diff = int(time_diff.total_seconds() // 60)
return minutes_diff
else:
logger.info("No Valid Date format was given")
except Exception as e:
logger.exception(f"Exception while Loading the model - {e}")
@staticmethod
def set_tag(run_id, key, value):
"""
Function is to set the tag for a particular run
:param run_id: Run id in which the tags need to be added
:param key: Name of the key
:param value: what needs to tagged in the value
"""
try:
client.set_tag(run_id=run_id, key=key, value=value)
logger.debug(f'set the tag for the model')
except Exception as e:
logger.exception(f"Exception while setting the tag - {e}")
@staticmethod
def load_model_pyfunc(model_path):
"""
Function is to load the model via mlflow.pyfunc
:param model_path: path of the model
:return: the loaded model
"""
try:
model = mlflow.pyfunc.load_model(model_path)
logger.info("loading the model")
return model
except Exception as e:
logger.exception(str(e))
def __del__(self):
try:
logger.info('destructor called, ModelLoad die!')
except Exception as e:
logger.exception(f'Exception - {e}')
class Kafka:
kafka_host = os.getenv('KAFKA_HOST')
kafka_port = os.getenv('KAFKA_PORT')
kafka_topic = os.getenv('KAFKA_TOPIC')
class KafkaProducerUtil:
def __init__(self):
try:
self.host = Kafka.kafka_host
self.port = Kafka.kafka_port
logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
kafka_broker = [self.host + ":" + str(self.port)]
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode('utf-8'),
api_version=(0, 10, 1))
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
self.producer.flush()
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
class KairosWriter(KafkaProducerUtil):
def write_data(self, data_json, topic):
site_id = "site_101"
logger.debug(f"Data being pushed to kafka topic: {topic}")
msg_counter = 0
for k, v in data_json.items():
timestamp, data = self.data_validator(k, v)
timestamp = timestamp * 1000
write_json = {
"data": data,
"site_id": site_id,
"gw_id": "gw_{}".format(site_id.lstrip("site_")),
# The lstrip(s) removes leading whitespace (on the left)
"pd_id": "pd_{}".format(site_id.lstrip("site_")),
# The rstrip(s) removes the trailing whitespace (on the right)
"timestamp": timestamp,
"msg_id": msg_counter,
"partition": "",
"retain_flag": False
}
logger.debug(f"Timestamp: {timestamp}, Values: {data}")
self.publish(topic, dumps(write_json))
msg_counter += 1
return msg_counter
def audit_data(self, data_json, topic):
logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
msg_counter = len(data_json)
for each in data_json:
self.publish(topic, dumps(each))
return msg_counter
@staticmethod
def data_validator(timestamp, data):
logger.debug("Validating the data to remove Nan values")
__temp__ = {}
for k, v in data.items():
if not k.startswith("site"):
continue
# This function will return True if the "v" is one of the types in the tuple
if isinstance(v, (int, float)) and str(v) not in ('nan', 'inf'):
__temp__[k] = v
return int(timestamp), __temp__
def insert_values_bgrimm(ts, my_dict):
kairos_writer = KairosWriter()
kairos_writer.write_data(
{
ts: my_dict
},
Kafka.kafka_topic
)
logger.info("Data pushed successfully!")
class CalculatedDataPush:
def __init__(self, df_predicted):
self.df_predicted = df_predicted
def kafka_data_push(self):
try:
logger.info(f'Pushing data for inv_mppt - '
f'{list(self.df_predicted[CommonConstants.inv_id_mppt_id].unique())}')
logger.info(f"df predicted shape = {self.df_predicted.shape}")
df_push = self.df_predicted[[CommonConstants.datetime, CommonConstants.timestamp,
CommonConstants.predicted_current_mppt,
CommonConstants.efficiency_mppt,
CommonConstants.predicted_current_mppt_tag,
CommonConstants.efficiency_mppt_tag,
CommonConstants.hour]]
hour_for_latest_data = datetime.now(pytz.utc).astimezone(pytz.timezone('Asia/Kolkata')).hour - 1
df_push = df_push[df_push[CommonConstants.hour] >= hour_for_latest_data]
df_push.drop([CommonConstants.hour], axis=1, inplace=True)
df_push.reset_index(drop=True, inplace=True)
predicted_current_mppt_tag = \
list(self.df_predicted[CommonConstants.predicted_current_mppt_tag].unique())[0]
efficiency_mppt_tag = list(self.df_predicted[CommonConstants.efficiency_mppt_tag].unique())[0]
tags_dict = {CommonConstants.predicted_current_mppt: predicted_current_mppt_tag,
CommonConstants.efficiency_mppt: efficiency_mppt_tag}
df_push = df_push.round(3)
for i, j in df_push.iterrows():
my_dict = {v: j[k] for k, v in tags_dict.items()}
logger.info(f"{j[CommonConstants.timestamp], j[CommonConstants.datetime], my_dict}")
insert_values_bgrimm(j[CommonConstants.timestamp], my_dict)
logger.info(f'data pushed for datetime - {j[CommonConstants.datetime]}')
logger.info(f'Pushed data for {list(self.df_predicted[CommonConstants.inv_id_mppt_id].unique())}')
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, CalculatedDataPush die!')
except Exception as e:
logger.exception(f'Exception - {e}')
def calculations_and_data_push(x_test, predictions, plant_efficiency_dict, inv_id,
df_inv_tags, inv_level_efficiency_tags, panel_id):
try:
logger.info(f'concatenating actual current mppt & predicted current mppt')
df_result = pd.DataFrame(index=[i for i in range(len(x_test))])
df_result[CommonConstants.datetime] = x_test[CommonConstants.datetime]
df_result[CommonConstants.actual_current_mppt] = x_test[CommonConstants.current_mppt]
df_result[CommonConstants.predicted_current_mppt] = predictions.reshape(-1, 1)
df_result[CommonConstants.hour] = df_result[CommonConstants.datetime].dt.hour
df_result[CommonConstants.inv_id_mppt_id] = x_test[CommonConstants.inv_id_mppt_id]
df_result.loc[df_result[CommonConstants.predicted_current_mppt] <= 0,
CommonConstants.predicted_current_mppt] = 0
# coefficients = GetCoefficientMultiplier()
# df_result = coefficients.multiply_mppt_coefficients(df_predicted=df_result,
# df_coefficients=self.df_coefficients)
date_inv_dict = dict()
logger.info(f'data to be pushed for the time - {list(df_result[CommonConstants.datetime].unique())}')
for inv_id_mppt_id in list(df_result[CommonConstants.inv_id_mppt_id].unique()):
df = df_result.copy()
df = df[df[CommonConstants.inv_id_mppt_id] == inv_id_mppt_id]
df.reset_index(drop=True, inplace=True)
df[CommonConstants.cumulative_actual_current_mppt] = \
df[CommonConstants.actual_current_mppt].cumsum()
df[CommonConstants.cumulative_predicted_current_mppt] = \
df[CommonConstants.predicted_current_mppt].cumsum()
# logger.info(f'multiplying the coefficients with the predicted current mppt for {inv_id}')
# coefficients = GetCoefficientMultiplier()
# _df = coefficients.multiply_mppt_coefficients(df_predicted=df, df_coefficients=self.df_coefficients)
logger.info(f'calculating the mppt efficiency for {inv_id}')
df[CommonConstants.efficiency_mppt] \
= ((df[CommonConstants.cumulative_actual_current_mppt]) * 100) / \
df[CommonConstants.cumulative_predicted_current_mppt]
df.loc[df[CommonConstants.efficiency_mppt] >= 100, CommonConstants.efficiency_mppt] = 100
df.loc[df[CommonConstants.cumulative_actual_current_mppt].isnull(),
CommonConstants.efficiency_mppt] = None
df.iloc[df[CommonConstants.hour].between(CommonConstants.skip_time.get("morning").get("start"),
CommonConstants.skip_time.get("morning").get("end")),
df.columns.get_loc(CommonConstants.predicted_current_mppt)] = 0
df.iloc[df[CommonConstants.hour].between(CommonConstants.skip_time.get("evening").get("start"),
CommonConstants.skip_time.get("evening").get("end")),
df.columns.get_loc(CommonConstants.predicted_current_mppt)] = 0
df.loc[df[CommonConstants.cumulative_predicted_current_mppt] == 0,
CommonConstants.efficiency_mppt] = np.nan
# df.iloc[df[CommonConstants.hour].between(), df.columns.get_loc(
# CommonConstants.predicted_current_mppt)] = 0
df_predicted = pd.merge(df, df_inv_tags.drop([CommonConstants.tag_name, CommonConstants.inv_id,
CommonConstants.parameter_name, CommonConstants.mppt_id],
axis=1), on=CommonConstants.inv_id_mppt_id, how='left')
df_predicted.reset_index(drop=True, inplace=True)
df_predicted[CommonConstants.timestamp] = \
df_predicted[CommonConstants.datetime].values.astype(np.int64) / 10 ** 9
df_predicted[CommonConstants.timestamp] = df_predicted[CommonConstants.timestamp].astype('int')
df_predicted = df_predicted.round(3)
logger.info(f'mppt efficiency pushing for {inv_id} for time - {inv_id_mppt_id}')
data_push = CalculatedDataPush(df_predicted=df_predicted)
data_push.kafka_data_push()
logger.info(f'calculating the inv level efficiency for {inv_id}')
df_inv_eff = pd.DataFrame()
df_inv_eff[CommonConstants.datetime] = df_result[CommonConstants.datetime]
df_inv_eff.drop_duplicates(subset=[CommonConstants.datetime], keep='first', inplace=True)
df_inv_eff.reset_index(drop=True, inplace=True)
for inv_id_mppt_id in list(df_result[CommonConstants.inv_id_mppt_id].unique()):
df_inv_mppt = df_result[df_result[CommonConstants.inv_id_mppt_id] == inv_id_mppt_id]
cumulative_actual_current_inv = f'{inv_id_mppt_id}_{CommonConstants.cumulative_actual_current_mppt}'
cumulative_predict_current_inv = f'{inv_id_mppt_id}_{CommonConstants.cumulative_predicted_current_mppt}'
df_inv_eff = pd.merge(df_inv_eff, df_inv_mppt[[CommonConstants.datetime,
CommonConstants.actual_current_mppt,
CommonConstants.predicted_current_mppt]],
on=CommonConstants.datetime, how='left')
# df_inv_eff.dropna(axis=0, inplace=True)
df_inv_eff.reset_index(drop=True, inplace=True)
df_inv_eff[cumulative_actual_current_inv] = df_inv_eff[CommonConstants.actual_current_mppt].cumsum()
df_inv_eff[cumulative_predict_current_inv] = \
df_inv_eff[CommonConstants.predicted_current_mppt].cumsum()
df_inv_eff.drop([CommonConstants.actual_current_mppt, CommonConstants.predicted_current_mppt],
axis=1, inplace=True)
cumulative_actual_current_cols = [col for col in df_inv_eff.columns
if CommonConstants.cumulative_actual_current_mppt in col]
cumulative_predict_current_cols = [col for col in df_inv_eff.columns
if CommonConstants.cumulative_predicted_current_mppt in col]
df_inv_eff_push = pd.DataFrame()
df_inv_eff_push[CommonConstants.datetime] = df_inv_eff[CommonConstants.datetime]
df_inv_eff_push[CommonConstants.cumulative_actual_current_mppt] = \
df_inv_eff[cumulative_actual_current_cols].sum(axis=1)
df_inv_eff_push[CommonConstants.cumulative_predicted_current_mppt] = \
df_inv_eff[cumulative_predict_current_cols].sum(axis=1)
tags_dict = \
{CommonConstants.efficiency_inv: list(inv_level_efficiency_tags.loc[
inv_level_efficiency_tags[CommonConstants.inv_id] == inv_id,
CommonConstants.tag_id].values)[0]}
df_inv_eff_push[CommonConstants.efficiency_inv] = \
df_inv_eff_push[CommonConstants.cumulative_actual_current_mppt] * 100 / \
df_inv_eff_push[CommonConstants.cumulative_predicted_current_mppt]
df_inv_eff_push.loc[df_inv_eff_push[CommonConstants.efficiency_inv] > 100,
CommonConstants.efficiency_inv] = 100
df_inv_eff_push[CommonConstants.timestamp] = \
df_inv_eff_push[CommonConstants.datetime].values.astype(np.int64) / 10 ** 9
df_inv_eff_push[CommonConstants.timestamp] = df_inv_eff_push[CommonConstants.timestamp].astype('int')
logger.info(f'{df_inv_eff_push.shape}')
df_inv_eff_push = df_inv_eff_push.round(3)
df_inv_eff_latest = df_inv_eff_push.copy()
df_inv_eff_latest[CommonConstants.hour] = df_inv_eff_latest[CommonConstants.datetime].dt.hour
hour_for_latest_data = datetime.now(pytz.utc).astimezone(pytz.timezone('Asia/Kolkata')).hour - 1
df_inv_eff_latest = df_inv_eff_latest[df_inv_eff_latest[CommonConstants.hour] >= hour_for_latest_data]
df_inv_eff_latest.drop([CommonConstants.hour], axis=1, inplace=True)
df_inv_eff_latest.reset_index(drop=True, inplace=True)
for i, j in df_inv_eff_latest.iterrows():
my_dict = {v: j[k] for k, v in tags_dict.items()}
logger.info(f"{j[CommonConstants.timestamp], j[CommonConstants.datetime], my_dict}")
insert_values_bgrimm(j[CommonConstants.timestamp], my_dict)
logger.info(f'data pushed for datetime - {j[CommonConstants.datetime]}')
logger.info(f'{inv_id} efficiency has been pushed !')
for index in range(df_inv_eff_push.shape[0]):
date = df_inv_eff_push.loc[index, CommonConstants.datetime]
date_inv_dict[date] = {CommonConstants.cumulative_actual_current_mppt: df_inv_eff_push.loc[
index, CommonConstants.cumulative_actual_current_mppt],
CommonConstants.cumulative_predicted_current_mppt: df_inv_eff_push.loc[
index, CommonConstants.cumulative_predicted_current_mppt]}
plant_efficiency_dict[panel_id + '_' + inv_id] = date_inv_dict
return plant_efficiency_dict
except Exception as e:
logger.exception(f'Exception - {e}')
class Inference:
def __init__(self, df, city, panel_id):
self.df = df
self.city = city
self.panel_id = panel_id
def data_inference(self, inv_id):
try:
df_test_mppt = self.df[[CommonConstants.datetime, CommonConstants.tilt_irradiance,
CommonConstants.voltage_mppt, CommonConstants.current_mppt,
CommonConstants.hour, CommonConstants.inv_id_mppt_id]]
df_test_mppt.reset_index(drop=True, inplace=True)
x_test = df_test_mppt[[CommonConstants.datetime, CommonConstants.tilt_irradiance,
CommonConstants.voltage_mppt, CommonConstants.hour, CommonConstants.current_mppt,
CommonConstants.inv_id_mppt_id]]
logger.debug(f'shape of x_test for {inv_id} before removing null rows - {x_test.shape}')
logger.info(f'total null values in inference dataframe for {inv_id} - {x_test.isnull().sum()}')
model_load = ModelLoad()
model = model_load.model_manager(inv_id=inv_id, city=self.city, panel_id=self.panel_id)
del model_load
x_test.reset_index(drop=True, inplace=True)
x_test = x_test.loc[x_test[CommonConstants.voltage_mppt].notna()]
x_test.reset_index(drop=True, inplace=True)
logger.debug(f'shape of x_test for {inv_id} after removing null rows - {x_test.shape}')
predictions = model.predict(x_test.drop([CommonConstants.datetime, CommonConstants.inv_id_mppt_id,
CommonConstants.current_mppt], axis=1)).reshape(1, -1)
logger.debug(f'predictions shape - {predictions.shape}')
return x_test, predictions
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, Inference die!')
except Exception as e:
logger.exception(f'Exception - {e}')
try:
# with open(input_path, 'r') as f:
# get_tags_component_output_dict = json.load(f)
# print(get_tags_component_output_dict)
with open(get_tags_component_output, 'r') as f:
get_tags_component_output_dict = json.load(f)
with open(get_final_predicted_tags, 'r') as f:
get_final_predicted_tags_dict = json.load(f)
with open(get_inv_level_efficiency_tags, 'r') as f:
get_inv_level_efficiency_tags_dict = json.load(f)
df_raw_tags = get_tags_component_output_dict.get("df_raw_tags")
df_raw_tags = pd.DataFrame(df_raw_tags)
df_predicted_and_efficiency_tags = get_final_predicted_tags_dict.get("df_predicted_and_efficiency_tags")
df_predicted_and_efficiency_tags = pd.DataFrame(df_predicted_and_efficiency_tags)
inv_level_efficiency_tags = get_inv_level_efficiency_tags_dict.get("inv_level_efficiency_tags")
inv_level_efficiency_tags = pd.DataFrame(inv_level_efficiency_tags)
start_timestamp = timestamp_dict.get("start_timestamp")
end_timestamp = timestamp_dict.get("end_timestamp")
city = "bgrimmchonburi"
plant_efficiency_dict = dict()
logger.info(f'total inv available in {city} site are - '
f'{df_raw_tags[CommonConstants.inv_id].unique()}')
for each_panel in list(df_raw_tags['sub_id'].unique()):
df_raw_tags_copy = df_raw_tags.copy()
df_raw_tags_copy = df_raw_tags_copy[df_raw_tags_copy[CommonConstants.sub_id] == each_panel]
for inv_id in list(df_raw_tags_copy[CommonConstants.inv_id].unique()):
df = df_raw_tags_copy[df_raw_tags_copy[CommonConstants.inv_id] == inv_id]
df_inv = pd.DataFrame()
logger.info(f'total mppt available for {each_panel} - {inv_id} are - '
f'{df[CommonConstants.mppt_id].unique()}')
for mppt_id in list(df[CommonConstants.mppt_id].unique()):
df_mppt_level = df[df[CommonConstants.mppt_id] == mppt_id]
reformat_kairos_data = ReformatKairosData()
if mppt_id != 'mppt_1':
volatge_tag_row = df[(df[CommonConstants.mppt_id] == 'mppt_1') & (df['unit'] == 'Voltage')]
volatge_tag_row.reset_index(drop=True, inplace=True)
df_mppt_level = df_mppt_level.append(volatge_tag_row, ignore_index=True)
df_kairos_data = reformat_kairos_data.get_tags_data(df_input_tags=df_mppt_level,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
inv_id=inv_id, mppt_id=mppt_id,
city=city)
if df_inv.empty:
df_inv = df_kairos_data
else:
df_inv = pd.concat([df_inv, df_kairos_data], ignore_index=True)
logger.info(f'shape of input data for {each_panel} - {inv_id} & {mppt_id} - {df_inv.shape}')
logger.info(f'data fetching for {each_panel} - {inv_id} has been completed')
logger.info(f'final shape of input data after concatenation for overall '
f'{inv_id} - {df_inv.shape}')
df_predicted_and_efficiency_tags_copy = df_predicted_and_efficiency_tags.copy()
df_predicted_and_efficiency_tags_copy = df_predicted_and_efficiency_tags_copy[
df_predicted_and_efficiency_tags_copy[CommonConstants.sub_id] == each_panel]
df_inv_tags = df_predicted_and_efficiency_tags_copy[
df_predicted_and_efficiency_tags_copy[CommonConstants.inv_id] == inv_id]
# df_inv[CommonConstants.inv_id_mppt_id] = \
# df_inv[CommonConstants.inv_id] + '_' + df_inv[CommonConstants.mppt_id]
df_inv[CommonConstants.inv_id_mppt_id] = each_panel + '_' + df_inv[CommonConstants.inv_id] + '_' \
+ df_inv[CommonConstants.mppt_id]
try:
df_inv = df_inv[df_inv[CommonConstants.tilt_irradiance].notna()]
except Exception as e:
logger.debug(f'{CommonConstants.tilt_irradiance} is not present in dataframe for {inv_id}')
logger.exception(f'Exception - {e}')
try:
df_inv = df_inv[df_inv[CommonConstants.voltage_mppt].notna()]
except Exception as e:
logger.debug(f'{CommonConstants.voltage_mppt} is not present in dataframe for {inv_id}')
logger.exception(f'Exception - {e}')
df_inv.reset_index(drop=True, inplace=True)
logger.info(f'{inv_id} shape after removing null values - {df_inv.shape}')
try:
get_final_inference = Inference(df=df_inv, city=city, panel_id=each_panel)
x_test, predictions = get_final_inference.data_inference(inv_id=inv_id)
del get_final_inference
plant_efficiency_dict = calculations_and_data_push(x_test=x_test, predictions=predictions,
inv_id=inv_id,
plant_efficiency_dict=plant_efficiency_dict,
df_inv_tags=df_inv_tags,
inv_level_efficiency_tags=inv_level_efficiency_tags,
panel_id=each_panel)
logger.debug(f'data push for {inv_id} has been completed !')
except Exception as e:
logger.exception(f' Exception - {e}')
final_dict = {"plant_efficiency_dict": plant_efficiency_dict.to_dict(orient="records"),
"df_inv": df_inv.to_dict(orient="records")}
with open(output_path, 'w') as f:
json.dump(final_dict, f)
print(final_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
import argparse
_parser = argparse.ArgumentParser(prog='Inv and mppt level efficiency', description='')
_parser.add_argument("--get-tags-component-output", dest="get_tags_component_output", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--get-final-predicted-tags", dest="get_final_predicted_tags", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--get-inv-level-efficiency-tags", dest="get_inv_level_efficiency_tags", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = inv_and_mppt_level_efficiency(**_parsed_args)
args:
- --get-tags-component-output
- {inputPath: get_tags_component_output}
- --get-final-predicted-tags
- {inputPath: get_final_predicted_tags}
- --get-inv-level-efficiency-tags
- {inputPath: get_inv_level_efficiency_tags}
- --output
- {outputPath: output}
env:
KAIROS_URI: 'https://iLens:iLensCLD$456@cloud.ilens.io/kairos/'
METRIC_NAME: 'project_264__ilens.live_data.raw'
AGGREGATOR: 'max'
AGGREGATOR_VALUE: '15'
AGGREGATOR_UNIT: 'minutes'
REQUIRED_TZ: 'Asia/Bangkok'
KAFKA_HOST: '192.168.0.220'
KAFKA_PORT: '9092'
KAFKA_TOPIC: 'ilens_dev'
START_RELATIVE: '0'
START_HOUR_RELATIVE: '1'
DAY_STARTING_HOUR: '5'
START_MINUTE_RELATIVE: '0'
START_SECOND_RELATIVE: '0'
END_RELATIVE: '0'
END_HOUR_RELATIVE: '1'
END_MINUTE_RELATIVE: '59'
END_SECOND_RELATIVE: '59'
pytz==2021.3
loguru==0.5.3
scipy==1.7.1
numpy==1.21.0
mlflow==1.20.2
simplejson==3.17.5
requests==2.27.1
pydantic==1.8.2
python-dotenv==0.19.2
kafka-python==1.4.7
SQLAlchemy==1.3.20
sqlparse==0.4.2
protobuf==3.20.*
pandas==1.5.3
PyYAML==5.4
azure-storage-blob==12.14.1
azure-core==1.27.0
scikit-learn==1.0.2
\ No newline at end of file
from kfp.components import InputPath, OutputPath
def inv_and_mppt_level_efficiency(get_tags_component_output: InputPath(),
get_final_predicted_tags: InputPath()
, get_inv_level_efficiency_tags: InputPath(), output_path: OutputPath()):
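# Component body (executed inside the KFP container): pulls raw MPPT current/voltage data from KairosDB,
# loads the per panel/inverter model from MLflow, computes MPPT and inverter level efficiencies,
# pushes them to Kafka and finally writes the aggregated result to output_path.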
class CommonConstants:
dalmia_string_level_tags = 'dalmia_string_level_tags'
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
inv_id = 'inv_id'
mppt_id = 'mppt_id'
datetime = 'datetime'
predicted_current_mppt = 'predicted_current_mppt'
predicted_current_mppt_tag = 'predicted_current_mppt_tag'
actual_current_mppt = 'actual_current_mppt'
hour = 'hour'
skip_time = {"morning": {"start": 0, "end": 6},
"evening": {"start": 18, "end": 23}}
efficiency_mppt = 'efficiency_mppt'
efficiency_inv = 'efficiency_inv'
efficiency_plant = 'efficiency_plant'
tag_name = 'tag_name'
parameter_name = 'parameter_name'
timestamp = 'timestamp'
tag_id = 'tag_id'
efficiency_mppt_tag = 'efficiency_mppt_tag'
voltage = 'voltage'
current = 'current'
Potential = 'Potential'
Degradation = 'Degradation'
tilt_irradiance = 'tilt_irradiance'
voltage_mppt = 'voltage_mppt'
current_mppt = 'current_mppt'
date = 'date'
asia_kolkata_timezone = 'Asia/Kolkata'
asia_bangkok_timezone = 'Asia/Bangkok'
coefficient = 'coefficient'
cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
day = "day"
time = "time"
import pandas as pd
import json
import tracemalloc
import gc
import requests
import mlflow
import re
from json import dumps
from kafka import KafkaProducer
import numpy as np
from loguru import logger
from datetime import datetime, timedelta
import pytz
import os
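# MLflow tracking configuration; the credentials below are exported as environment variables so that
# mlflow can authenticate against the tracking server and the Azure blob artifact store.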
class MlFlow:
mlflow_tracking_uri = "https://qa.unifytwin.com/mlflow/"
mlflow_tracking_username = "mlflow"
mlflow_tracking_password = "MlFlOwQA#4321"
azure_storage_connection_string = "DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net"
azure_storage_access_key = "tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg=="
user = "BGrimm_degradation"
experiment_name = "Bgrimm-String-Efficiency"
run_name = "Efficiency"
model_name = "versioning"
check_param = "hours"
model_check_param = 10
class ReqTimeZone:
required_tz = os.getenv("REQUIRED_TZ")
mlflow_tracking_uri = MlFlow.mlflow_tracking_uri
os.environ["MLFLOW_TRACKING_USERNAME"] = MlFlow.mlflow_tracking_username
os.environ["MLFLOW_TRACKING_PASSWORD"] = MlFlow.mlflow_tracking_password
os.environ["AZURE_STORAGE_CONNECTION_STRING"] = MlFlow.azure_storage_connection_string
os.environ["AZURE_STORAGE_ACCESS_KEY"] = MlFlow.azure_storage_access_key
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_registry_uri(mlflow_tracking_uri)
client = mlflow.tracking.MlflowClient()
class KairosDb:
metric_name = os.getenv("METRIC_NAME")
uri = os.getenv("KAIROS_URI")
aggregator = os.getenv("AGGREGATOR")
aggregator_value = os.getenv("AGGREGATOR_VALUE")
aggregator_unit = os.getenv("AGGREGATOR_UNIT")
class DateRange:
start_relative_days = int(os.getenv("START_RELATIVE"))
start_hour_relative = int(os.getenv("START_HOUR_RELATIVE"))
day_starting_hour = int(os.getenv("DAY_STARTING_HOUR"))
start_minute_relative = int(os.getenv("START_MINUTE_RELATIVE"))
start_second_relative = int(os.getenv("START_SECOND_RELATIVE"))
end_relative_days = int(os.getenv("END_RELATIVE"))
end_hour_relative = int(os.getenv("END_HOUR_RELATIVE"))
end_minute_relative = int(os.getenv("END_MINUTE_RELATIVE"))
end_second_relative = int(os.getenv("END_SECOND_RELATIVE"))
try:
start_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.start_relative_days),
hours=int(DateRange.start_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(hour=int(DateRange.day_starting_hour),
minute=int(DateRange.start_minute_relative),
second=int(DateRange.start_second_relative),
microsecond=0)
end_date = (datetime.now(pytz.utc) - timedelta(days=int(DateRange.end_relative_days),
hours=int(DateRange.end_hour_relative))). \
astimezone(pytz.timezone(ReqTimeZone.required_tz)).replace(minute=int(DateRange.end_minute_relative),
second=int(DateRange.end_second_relative),
microsecond=0)
start_timestamp = int(start_date.timestamp()) * 1000
end_timestamp = int(end_date.timestamp()) * 1000
timestamp_dict = {
"start_timestamp": start_timestamp,
"end_timestamp": end_timestamp}
print(timestamp_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
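# Thin wrapper around the KairosDB REST API: builds a single grouped, aggregated datapoints query for
# all tag ids and reshapes the response into a wide DataFrame with one column per tag.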
class KairosQuery:
def __init__(self, start_timestamp, end_timestamp, tag_dict):
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.kairos_host = KairosDb.uri
self.kairos_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.kairos_host)
self.tag_dict = tag_dict
def kairos_query(self):
try:
return {
"metrics": [
{
"tags": {
"c3": list(self.tag_dict.keys())
},
"name": KairosDb.metric_name,
"group_by": [
{
"name": "tag",
"tags": ["c3"]
}
],
"aggregators": [
{
"name": KairosDb.aggregator,
"sampling": {
"value": KairosDb.aggregator_value,
"unit": KairosDb.aggregator_unit
},
"align_sampling": True,
"align_start_time": True
}
]
}
],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": self.start_timestamp,
"end_absolute": self.end_timestamp,
}
except Exception as e:
logger.exception(f"Exception - {e}")
def get_data(self, data_query):
try:
logger.info("Data for the parameters being pulled from Kairos Database")
response = requests.post(self.kairos_url, data=json.dumps(data_query))
response_data = response.json()
output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
for i in range(len(output_data)):
grouped_output_data = output_data[i]["results"]
for each_grouped_data in grouped_output_data:
value = (each_grouped_data["values"])
tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug("Renamed {} to {} in Data".format(tag_id, self.tag_dict[tag_id]))
column_name = self.tag_dict[tag_id]
except KeyError as e:
logger.exception(f'Exception - {e}')
logger.debug("Column Renaming Logic not found for {}".format(tag_id))
column_name = tag_id
df_column_data = pd.DataFrame(data=value, columns=[CommonConstants.timestamp, column_name])
if df_final.empty:
df_final = df_column_data
else:
df_final = df_final.merge(df_column_data, how="outer", left_on=CommonConstants.timestamp,
right_on=CommonConstants.timestamp)
df_final[CommonConstants.datetime] = \
pd.to_datetime(df_final[CommonConstants.timestamp], unit="ms").dt.tz_localize('UTC'). \
dt.tz_convert(CommonConstants.asia_bangkok_timezone)
logger.debug("Final number of columns : {}".format(str(len(list(df_final.columns)))))
return df_final
except Exception as e:
logger.exception(f"Exception occurred - {e}", exc_info=True)
def kairos_data_import(self):
try:
logger.debug("Fetching live data")
query_live = self.kairos_query()
logger.info(f"query_live = {query_live}")
df = self.get_data(data_query=query_live)
return df
except Exception as e:
logger.exception(f"Exception - {e}")
def __del__(self):
try:
print('destructor called, KairosQuery die!')
except Exception as e:
logger.exception(f'Exception - {e}')
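# Selects the current/voltage tags for one inverter/MPPT, adds the site's tilt-irradiance tag,
# fetches the data from Kairos and normalises column names, timestamps and negative irradiance values.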
class ReformatKairosData:
@staticmethod
def get_tags_data(df_input_tags, start_timestamp, end_timestamp, inv_id, mppt_id, city):
try:
gc.collect()
tracemalloc.clear_traces()
df_tags_id = df_input_tags[[CommonConstants.tag_id, CommonConstants.tag_name, CommonConstants.inv_id,
CommonConstants.parameter_name, CommonConstants.mppt_id]]
df_tags_id.reset_index(drop=True, inplace=True)
current_voltage_tags_only = \
[data for data in df_tags_id[CommonConstants.parameter_name]
if any([x in data for x in [CommonConstants.voltage, CommonConstants.current]])]
req_data_list = [data for data in current_voltage_tags_only if CommonConstants.Potential not in data]
req_data_list = [data for data in req_data_list if CommonConstants.Degradation not in data]
df_req_tags_id = df_tags_id.loc[df_tags_id[CommonConstants.parameter_name].isin(req_data_list)]
df_req_tags_id.reset_index(drop=True, inplace=True)
tags_dict = df_req_tags_id[[CommonConstants.tag_id, CommonConstants.parameter_name]].set_index(
CommonConstants.tag_id).T.to_dict(orient="records")[0]
sites = {
"bgrimmchonburi": {
"tilt_irradiance": "site_101$dept_102$line_105$equipment_220$tag_478"
}
}
tags_dict[sites[city][CommonConstants.tilt_irradiance]] = CommonConstants.tilt_irradiance
del df_req_tags_id
get_kairos_query = KairosQuery(start_timestamp=start_timestamp, end_timestamp=end_timestamp,
tag_dict=tags_dict)
df_data = get_kairos_query.kairos_data_import()
for tag in current_voltage_tags_only:
if CommonConstants.voltage_mppt in tag:
df_data.rename(columns={tag: CommonConstants.voltage_mppt}, inplace=True)
elif CommonConstants.current_mppt in tag:
df_data.rename(columns={tag: CommonConstants.current_mppt}, inplace=True)
df_data[CommonConstants.inv_id] = inv_id
df_data[CommonConstants.mppt_id] = mppt_id
df_data[CommonConstants.date] = df_data[CommonConstants.datetime].dt.date
df_data[CommonConstants.hour] = df_data[CommonConstants.datetime].dt.hour
df_data.drop([CommonConstants.date], axis=1, inplace=True)
logger.info(f'shape of the input dataframe for {inv_id} & {mppt_id} = {df_data.shape}')
logger.info(f'columns present in dataframe for {inv_id} & {mppt_id} - {list(df_data.columns)}')
df_data.loc[df_data[CommonConstants.tilt_irradiance] <= 0, CommonConstants.tilt_irradiance] = 0
df_data.reset_index(drop=True, inplace=True)
tracemalloc.clear_traces()
del get_kairos_query
del current_voltage_tags_only
del req_data_list
del df_tags_id
return df_data
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, ReformatKairosData die!')
except Exception as e:
logger.exception(f'Exception - {e}')
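# Resolves the latest trained model for a given panel/inverter from the MLflow run hierarchy and loads it via pyfunc.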
class ModelLoad(object):
def model_manager(self, inv_id, city, panel_id):
try:
gc.collect()
tracemalloc.clear_traces()
tracemalloc.get_traced_memory()
experiment_id = self.create_experiment(experiment_name=MlFlow.experiment_name)
try:
run_df = mlflow.search_runs([experiment_id], filter_string=f"run_name='Efficiency_{city}'")
site_id = run_df['run_id'].values[0]
except Exception as e:
site_id = None
days, latest_run_id = self.fetch_latest_model(experiment_id=experiment_id,
run_name=MlFlow.run_name + '_' + panel_id + '_' + inv_id,
# run_name=panel_id + '_' + MlFlow.run_name + '_' + inv_id,
site_id=site_id)
logger.debug(f'loading the pretrained model !')
energy_model = self.load_model_pyfunc(
model_path=self.forming_loading_path(latest_run_id=latest_run_id))
return energy_model
except Exception as e:
logger.exception(f'Exception - {str(e)}')
@staticmethod
def create_experiment(experiment_name):
"""
Function is to create an experiment by passing experiment name
:param experiment_name: Name of the experiment
:return: Experiment id (the experiment is created first if it does not already exist)
"""
try:
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment:
exp_id = experiment.experiment_id
else:
mlflow.set_experiment(experiment_name)
experiment = mlflow.get_experiment_by_name(experiment_name)
exp_id = experiment.experiment_id
return exp_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def creating_run(experiment_id, run_id=None, run_name=None, nested=False):
try:
latest_run_id = None
if run_id:
df = mlflow.search_runs([experiment_id])
run_id_list = list(df["run_id"])
if run_id in run_id_list:
return run_id
else:
run = client.create_run(experiment_id)
with mlflow.start_run(
experiment_id=experiment_id, run_name=run_name, run_id=run.info.run_id,
nested=nested) as run:
return run.info.run_id
elif run_name:
df = mlflow.search_runs([experiment_id])
if df.empty:
run = client.create_run(experiment_id=experiment_id, tags={"mlflow.runName": run_name,
"mlflow.user": MlFlow.user})
with mlflow.start_run(
experiment_id=experiment_id, run_id=run.info.run_id, run_name=run_name,
nested=nested) as run:
return run.info.run_id
else:
for index, row in df.iterrows():
if run_name == row.get("tags.mlflow.runName", ""):
latest_run_id = row.get("run_id")
if latest_run_id:
return latest_run_id
else:
run = client.create_run(experiment_id=experiment_id, tags={"mlflow.runName": run_name,
"mlflow.user": MlFlow.user})
with mlflow.start_run(
experiment_id=experiment_id, run_id=run.info.run_id, run_name=run_name,
nested=nested) as run:
return run.info.run_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def creating_new_nested_run(experiment_id, run_id=None, run_name=None, nested=False):
"""
Function is to create a nested run
:param experiment_id: Experiment Id
:param run_id: run id
:param run_name: name of the nested run
:param nested: whether the outer run is started as nested
:return: nested run id
"""
try:
with mlflow.start_run(experiment_id=experiment_id, run_id=run_id, nested=nested):
with mlflow.start_run(experiment_id=experiment_id, nested=True, run_name=run_name) as run:
return run.info.run_id
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_model(model, model_name):
"""
Function is to log the model
:param model : model
:param model_name : model_name
:return: Boolean Value
"""
try:
mlflow.sklearn.log_model(model, model_name)
logger.info("logged the model")
return True
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_metrics(metrics):
"""
Function is to log the metrics
:param metrics: dict of metrics
:return: Boolean value
"""
try:
updated_metric = dict()
for key, value in metrics.items():
key = re.sub(r"[\([{})\]]", "", key)
updated_metric[key] = value
mlflow.log_metrics(updated_metric)
logger.debug(f'logged the model metric')
return True
except Exception as e:
logger.exception(str(e))
@staticmethod
def log_hyper_param(hyperparameters):
"""
Function is to log the hyper params
:param hyperparameters: dict of hyperparameters
:return: Boolean value
"""
try:
mlflow.log_params(hyperparameters)
logger.debug(f'logged model hyper parameters')
return True
except Exception as e:
logger.exception(str(e))
def fetch_latest_model(self, experiment_id, run_name, site_id):
"""
Function is to fetch the latest run
:param experiment_id: Experiment Id
:param run_name: Name of the run to look up
:param site_id: Parent run id of the site, if any
:return: difference in days/hours/minutes between now and the run time, and the latest run id
"""
try:
days = int(MlFlow.model_check_param) + 1
model_history = ""
latest_run_id = ""
if experiment_id:
run_id = self.get_parent_run_id(experiment_id, run_name, site_id)
run_info = mlflow.search_runs([experiment_id],
filter_string="tags.mlflow.parentRunId='{run_id}'".format(
run_id=run_id))
if not run_info.empty:
for ind in run_info.index:
model_history, days, latest_run_id = self.check_model_existing(run_info=run_info,
index=ind)
if model_history is not None:
break
if model_history is None:
days = int(MlFlow.model_check_param) + 1
logger.info("No Model is existing with this experiment")
return days, latest_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest model - {e}")
@staticmethod
def get_parent_run_id(experiment_id, run_name, site_id):
"""
Function is to fetch latest parent run id if available else latest run id
:param experiment_id: Experiment Id
:param run_name: Name of the run
:param site_id: parent run id of the site, or None if not available
:return: latest parent run id
"""
try:
result_run_id = None
df = mlflow.search_runs([experiment_id])
if site_id is None:
return None
else:
df = df[df['tags.mlflow.parentRunId'] == site_id]
df.reset_index(drop=True, inplace=True)
for index, row in df.iterrows():
parent_run_name = row.get("tags.mlflow.runName")
if parent_run_name == run_name:
result_run_id = row.get("run_id")
else:
logger.info(f"No Run is existing with this Experiment id - {experiment_id}")
return result_run_id
except Exception as e:
logger.exception(f"Exception while fetching the latest run_id - {e}")
def check_model_existing(self, run_info, index):
"""
Function is to check if model is existing or not
:param run_info: Dataframe of run details
:param index: index of which run from the dataframe
:return:
"""
try:
model_history = None
date_param = MlFlow.check_param
# Difference between the current date and latest available model date
days = self.format_mlflow_time(run_info=run_info, index=index, date_param=date_param)
latest_run_id = run_info.loc[index, 'run_id']
if 'tags.mlflow.log-model.history' in run_info:
model_history = run_info['tags.mlflow.log-model.history'][index]
if model_history:
model_history_list = model_history.split(":")
model_history = model_history_list[2].split(",")[0]
else:
logger.info("No Model is existing")
return model_history, days, latest_run_id
except Exception as e:
logger.exception(f"Exception while checking the model name - {e}")
@staticmethod
def forming_loading_path(latest_run_id):
"""
Function is to form the loading path
:param latest_run_id: Run id
:return : Return the loading path
"""
try:
model_name = MlFlow.model_name
model_path = f"runs:/{latest_run_id}/{model_name}"
return model_path
except Exception as e:
logger.exception(f"Exception while forming loading path - {e}")
@staticmethod
def format_mlflow_time(run_info, index, date_param):
"""
Formatting mlflow time
:param run_info: details of the runs
:param index: index of the run in the dataframe
:param date_param: unit for the difference ("days", "hours" or "minutes")
:return: time difference between the latest model end time and the current time in the required timezone
"""
try:
df_time = run_info.copy()
df_time['end_time'] = pd.to_datetime(df_time['end_time']).dt.tz_convert(ReqTimeZone.required_tz)
df_time["days"] = df_time['end_time'].dt.date
df_time["hours"] = df_time['end_time'].dt.hour
df_required = df_time.iloc[index:index + 1:, :]
df_required.reset_index(drop=True, inplace=True)
last_model_time = df_required['end_time'][0].to_pydatetime()
central_current = datetime.now(pytz.utc).astimezone(pytz.timezone(ReqTimeZone.required_tz))
time_diff = central_current - last_model_time
if date_param.lower() == "days":
days_diff = int(time_diff.days)
return days_diff
elif date_param.lower() == "hours":
hours_diff = int(time_diff.total_seconds() // 3600)
return hours_diff
elif date_param.lower() == "minutes":
minutes_diff = int(time_diff.total_seconds() // 60)
return minutes_diff
else:
logger.info("No Valid Date format was given")
except Exception as e:
logger.exception(f"Exception while Loading the model - {e}")
@staticmethod
def set_tag(run_id, key, value):
"""
Function is to set the tag for a particular run
:param run_id: Run id in which the tags need to be added
:param key: Name of the key
:param value: value to be tagged
"""
try:
client.set_tag(run_id=run_id, key=key, value=value)
logger.debug(f'set the tag for the model')
except Exception as e:
logger.exception(f"Exception while setting the tag - {e}")
@staticmethod
def load_model_pyfunc(model_path):
"""
Function is to load the model via mlflow.pyfunc
:param model_path: path of the model
:return: loaded pyfunc model
"""
try:
model = mlflow.pyfunc.load_model(model_path)
logger.info("loading the model")
return model
except Exception as e:
logger.exception(str(e))
def __del__(self):
try:
logger.info('destructor called, ModelLoad die!')
except Exception as e:
logger.exception(f'Exception - {e}')
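# Kafka connection settings plus a small producer utility that wraps each value dict in the expected
# message envelope (site_id, gw_id, pd_id, timestamp, ...) and publishes it to the configured topic.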
class Kafka:
kafka_host = os.getenv('KAFKA_HOST')
kafka_port = os.getenv('KAFKA_PORT')
kafka_topic = os.getenv('KAFKA_TOPIC')
class KafkaProducerUtil:
def __init__(self):
try:
self.host = Kafka.kafka_host
self.port = Kafka.kafka_port
logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
kafka_broker = [self.host + ":" + str(self.port)]
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode('utf-8'),
api_version=(0, 10, 1))
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
self.producer.flush()
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
class KairosWriter(KafkaProducerUtil):
def write_data(self, data_json, topic):
site_id = "site_101"
logger.debug(f"Data being pushed to kafka topic: {topic}")
msg_counter = 0
for k, v in data_json.items():
timestamp, data = self.data_validator(k, v)
timestamp = timestamp * 1000
write_json = {
"data": data,
"site_id": site_id,
"gw_id": "gw_{}".format(site_id.lstrip("site_")),
# The lstrip(s) removes leading whitespace (on the left)
"pd_id": "pd_{}".format(site_id.lstrip("site_")),
# The rstrip(s) removes the trailing whitespace (on the right)
"timestamp": timestamp,
"msg_id": msg_counter,
"partition": "",
"retain_flag": False
}
logger.debug(f"Timestamp: {timestamp}, Values: {data}")
self.publish(topic, dumps(write_json))
msg_counter += 1
return msg_counter
def audit_data(self, data_json, topic):
logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
msg_counter = len(data_json)
for each in data_json:
self.publish(topic, dumps(each))
return msg_counter
@staticmethod
def data_validator(timestamp, data):
logger.debug("Validating the data to remove Nan values")
__temp__ = {}
for k, v in data.items():
if not k.startswith("site"):
continue
# This function will return True if the "v" is one of the types in the tuple
if isinstance(v, (int, float)) and str(v) not in ('nan', 'inf'):
__temp__[k] = v
return int(timestamp), __temp__
def insert_values_bgrimm(ts, my_dict):
kairos_writer = KairosWriter()
kairos_writer.write_data(
{
ts: my_dict
},
Kafka.kafka_topic
)
logger.info("Data pushed successfully!")
class CalculatedDataPush:
def __init__(self, df_predicted):
self.df_predicted = df_predicted
def kafka_data_push(self):
try:
logger.info(f'Pushing data for inv_mppt - '
f'{list(self.df_predicted[CommonConstants.inv_id_mppt_id].unique())}')
logger.info(f"df predicted shape = {self.df_predicted.shape}")
df_push = self.df_predicted[[CommonConstants.datetime, CommonConstants.timestamp,
CommonConstants.predicted_current_mppt,
CommonConstants.efficiency_mppt,
CommonConstants.predicted_current_mppt_tag,
CommonConstants.efficiency_mppt_tag,
CommonConstants.hour]]
hour_for_latest_data = datetime.now(pytz.utc).astimezone(pytz.timezone('Asia/Kolkata')).hour - 1
df_push = df_push[df_push[CommonConstants.hour] >= hour_for_latest_data]
df_push.drop([CommonConstants.hour], axis=1, inplace=True)
df_push.reset_index(drop=True, inplace=True)
predicted_current_mppt_tag = \
list(self.df_predicted[CommonConstants.predicted_current_mppt_tag].unique())[0]
efficiency_mppt_tag = list(self.df_predicted[CommonConstants.efficiency_mppt_tag].unique())[0]
tags_dict = {CommonConstants.predicted_current_mppt: predicted_current_mppt_tag,
CommonConstants.efficiency_mppt: efficiency_mppt_tag}
df_push = df_push.round(3)
for i, j in df_push.iterrows():
my_dict = {v: j[k] for k, v in tags_dict.items()}
logger.info(f"{j[CommonConstants.timestamp], j[CommonConstants.datetime], my_dict}")
insert_values_bgrimm(j[CommonConstants.timestamp], my_dict)
logger.info(f'data pushed for datetime - {j[CommonConstants.datetime]}')
logger.info(f'Pushed data for {list(self.df_predicted[CommonConstants.inv_id_mppt_id].unique())}')
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, CalculatedDataPush die!')
except Exception as e:
logger.exception(f'Exception - {e}')
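# Efficiency calculation: per inv/mppt, efficiency_mppt = 100 * cumulative actual current / cumulative
# predicted current, capped at 100; inverter-level efficiency applies the same ratio to the summed
# cumulative currents of all its MPPTs. Each series is pushed to Kafka per timestamp.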
def calculations_and_data_push(x_test, predictions, plant_efficiency_dict, inv_id,
df_inv_tags, inv_level_efficiency_tags, panel_id):
try:
logger.info(f'concatenating actual current mppt & predicted current mppt')
df_result = pd.DataFrame(index=[i for i in range(len(x_test))])
df_result[CommonConstants.datetime] = x_test[CommonConstants.datetime]
df_result[CommonConstants.actual_current_mppt] = x_test[CommonConstants.current_mppt]
df_result[CommonConstants.predicted_current_mppt] = predictions.reshape(-1, 1)
df_result[CommonConstants.hour] = df_result[CommonConstants.datetime].dt.hour
df_result[CommonConstants.inv_id_mppt_id] = x_test[CommonConstants.inv_id_mppt_id]
df_result.loc[df_result[CommonConstants.predicted_current_mppt] <= 0,
CommonConstants.predicted_current_mppt] = 0
# coefficients = GetCoefficientMultiplier()
# df_result = coefficients.multiply_mppt_coefficients(df_predicted=df_result,
# df_coefficients=self.df_coefficients)
date_inv_dict = dict()
logger.info(f'data to be pushed for the times - {list(df_result[CommonConstants.datetime].unique())}')
for inv_id_mppt_id in list(df_result[CommonConstants.inv_id_mppt_id].unique()):
df = df_result.copy()
df = df[df[CommonConstants.inv_id_mppt_id] == inv_id_mppt_id]
df.reset_index(drop=True, inplace=True)
df[CommonConstants.cumulative_actual_current_mppt] = \
df[CommonConstants.actual_current_mppt].cumsum()
df[CommonConstants.cumulative_predicted_current_mppt] = \
df[CommonConstants.predicted_current_mppt].cumsum()
# logger.info(f'multiplying the coefficients with the predicted current mppt for {inv_id}')
# coefficients = GetCoefficientMultiplier()
# _df = coefficients.multiply_mppt_coefficients(df_predicted=df, df_coefficients=self.df_coefficients)
logger.info(f'calculating the mppt efficiency for {inv_id}')
df[CommonConstants.efficiency_mppt] \
= ((df[CommonConstants.cumulative_actual_current_mppt]) * 100) / \
df[CommonConstants.cumulative_predicted_current_mppt]
df.loc[df[CommonConstants.efficiency_mppt] >= 100, CommonConstants.efficiency_mppt] = 100
df.loc[df[CommonConstants.cumulative_actual_current_mppt].isnull(),
CommonConstants.efficiency_mppt] = None
df.iloc[df[CommonConstants.hour].between(CommonConstants.skip_time.get("morning").get("start"),
CommonConstants.skip_time.get("morning").get("end")),
df.columns.get_loc(CommonConstants.predicted_current_mppt)] = 0
df.iloc[df[CommonConstants.hour].between(CommonConstants.skip_time.get("evening").get("start"),
CommonConstants.skip_time.get("evening").get("end")),
df.columns.get_loc(CommonConstants.predicted_current_mppt)] = 0
df.loc[df[CommonConstants.cumulative_predicted_current_mppt] == 0,
CommonConstants.efficiency_mppt] = np.nan
# df.iloc[df[CommonConstants.hour].between(), df.columns.get_loc(
# CommonConstants.predicted_current_mppt)] = 0
df_predicted = pd.merge(df, df_inv_tags.drop([CommonConstants.tag_name, CommonConstants.inv_id,
CommonConstants.parameter_name, CommonConstants.mppt_id],
axis=1), on=CommonConstants.inv_id_mppt_id, how='left')
df_predicted.reset_index(drop=True, inplace=True)
df_predicted[CommonConstants.timestamp] = \
df_predicted[CommonConstants.datetime].values.astype(np.int64) / 10 ** 9
df_predicted[CommonConstants.timestamp] = df_predicted[CommonConstants.timestamp].astype('int')
df_predicted = df_predicted.round(3)
logger.info(f'pushing mppt efficiency for {inv_id} - {inv_id_mppt_id}')
data_push = CalculatedDataPush(df_predicted=df_predicted)
data_push.kafka_data_push()
logger.info(f'calculating the inv level efficiency for {inv_id}')
df_inv_eff = pd.DataFrame()
df_inv_eff[CommonConstants.datetime] = df_result[CommonConstants.datetime]
df_inv_eff.drop_duplicates(subset=[CommonConstants.datetime], keep='first', inplace=True)
df_inv_eff.reset_index(drop=True, inplace=True)
for inv_id_mppt_id in list(df_result[CommonConstants.inv_id_mppt_id].unique()):
df_inv_mppt = df_result[df_result[CommonConstants.inv_id_mppt_id] == inv_id_mppt_id]
cumulative_actual_current_inv = f'{inv_id_mppt_id}_{CommonConstants.cumulative_actual_current_mppt}'
cumulative_predict_current_inv = f'{inv_id_mppt_id}_{CommonConstants.cumulative_predicted_current_mppt}'
df_inv_eff = pd.merge(df_inv_eff, df_inv_mppt[[CommonConstants.datetime,
CommonConstants.actual_current_mppt,
CommonConstants.predicted_current_mppt]],
on=CommonConstants.datetime, how='left')
# df_inv_eff.dropna(axis=0, inplace=True)
df_inv_eff.reset_index(drop=True, inplace=True)
df_inv_eff[cumulative_actual_current_inv] = df_inv_eff[CommonConstants.actual_current_mppt].cumsum()
df_inv_eff[cumulative_predict_current_inv] = \
df_inv_eff[CommonConstants.predicted_current_mppt].cumsum()
df_inv_eff.drop([CommonConstants.actual_current_mppt, CommonConstants.predicted_current_mppt],
axis=1, inplace=True)
cumulative_actual_current_cols = [col for col in df_inv_eff.columns
if CommonConstants.cumulative_actual_current_mppt in col]
cumulative_predict_current_cols = [col for col in df_inv_eff.columns
if CommonConstants.cumulative_predicted_current_mppt in col]
df_inv_eff_push = pd.DataFrame()
df_inv_eff_push[CommonConstants.datetime] = df_inv_eff[CommonConstants.datetime]
df_inv_eff_push[CommonConstants.cumulative_actual_current_mppt] = \
df_inv_eff[cumulative_actual_current_cols].sum(axis=1)
df_inv_eff_push[CommonConstants.cumulative_predicted_current_mppt] = \
df_inv_eff[cumulative_predict_current_cols].sum(axis=1)
tags_dict = \
{CommonConstants.efficiency_inv: list(inv_level_efficiency_tags.loc[
inv_level_efficiency_tags[CommonConstants.inv_id] == inv_id,
CommonConstants.tag_id].values)[0]}
df_inv_eff_push[CommonConstants.efficiency_inv] = \
df_inv_eff_push[CommonConstants.cumulative_actual_current_mppt] * 100 / \
df_inv_eff_push[CommonConstants.cumulative_predicted_current_mppt]
df_inv_eff_push.loc[df_inv_eff_push[CommonConstants.efficiency_inv] > 100,
CommonConstants.efficiency_inv] = 100
df_inv_eff_push[CommonConstants.timestamp] = \
df_inv_eff_push[CommonConstants.datetime].values.astype(np.int64) / 10 ** 9
df_inv_eff_push[CommonConstants.timestamp] = df_inv_eff_push[CommonConstants.timestamp].astype('int')
logger.info(f'{df_inv_eff_push.shape}')
df_inv_eff_push = df_inv_eff_push.round(3)
df_inv_eff_latest = df_inv_eff_push.copy()
df_inv_eff_latest[CommonConstants.hour] = df_inv_eff_latest[CommonConstants.datetime].dt.hour
hour_for_latest_data = datetime.now(pytz.utc).astimezone(pytz.timezone('Asia/Kolkata')).hour - 1
df_inv_eff_latest = df_inv_eff_latest[df_inv_eff_latest[CommonConstants.hour] >= hour_for_latest_data]
df_inv_eff_latest.drop([CommonConstants.hour], axis=1, inplace=True)
df_inv_eff_latest.reset_index(drop=True, inplace=True)
for i, j in df_inv_eff_latest.iterrows():
my_dict = {v: j[k] for k, v in tags_dict.items()}
logger.info(f"{j[CommonConstants.timestamp], j[CommonConstants.datetime], my_dict}")
insert_values_bgrimm(j[CommonConstants.timestamp], my_dict)
logger.info(f'data pushed for datetime - {j[CommonConstants.datetime]}')
logger.info(f'{inv_id} efficiency has been pushed !')
for index in range(df_inv_eff_push.shape[0]):
date = df_inv_eff_push.loc[index, CommonConstants.datetime]
date_inv_dict[date] = {CommonConstants.cumulative_actual_current_mppt: df_inv_eff_push.loc[
index, CommonConstants.cumulative_actual_current_mppt],
CommonConstants.cumulative_predicted_current_mppt: df_inv_eff_push.loc[
index, CommonConstants.cumulative_predicted_current_mppt]}
plant_efficiency_dict[panel_id + '_' + inv_id] = date_inv_dict
return plant_efficiency_dict
except Exception as e:
logger.exception(f'Exception - {e}')
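# Runs the loaded model on the cleaned feature frame (tilt irradiance, MPPT voltage, hour) and returns
# the filtered test frame together with the predicted MPPT currents.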
class Inference:
def __init__(self, df, city, panel_id):
self.df = df
self.city = city
self.panel_id = panel_id
def data_inference(self, inv_id):
try:
df_test_mppt = self.df[[CommonConstants.datetime, CommonConstants.tilt_irradiance,
CommonConstants.voltage_mppt, CommonConstants.current_mppt,
CommonConstants.hour, CommonConstants.inv_id_mppt_id]]
df_test_mppt.reset_index(drop=True, inplace=True)
x_test = df_test_mppt[[CommonConstants.datetime, CommonConstants.tilt_irradiance,
CommonConstants.voltage_mppt, CommonConstants.hour, CommonConstants.current_mppt,
CommonConstants.inv_id_mppt_id]]
logger.debug(f'shape of x_test for {inv_id} before removing null rows - {x_test.shape}')
logger.info(f'total null values in inference dataframe for {inv_id} - {x_test.isnull().sum()}')
model_load = ModelLoad()
model = model_load.model_manager(inv_id=inv_id, city=self.city, panel_id=self.panel_id)
del model_load
x_test.reset_index(drop=True, inplace=True)
x_test = x_test.loc[x_test[CommonConstants.voltage_mppt].notna()]
x_test.reset_index(drop=True, inplace=True)
logger.debug(f'shape of x_test for {inv_id} after removing null rows - {x_test.shape}')
predictions = model.predict(x_test.drop([CommonConstants.datetime, CommonConstants.inv_id_mppt_id,
CommonConstants.current_mppt], axis=1)).reshape(1, -1)
logger.debug(f'predictions shape - {predictions.shape}')
return x_test, predictions
except Exception as e:
logger.exception(f'Exception - {e}')
def __del__(self):
try:
print('destructor called, Inference die!')
except Exception as e:
logger.exception(f'Exception - {e}')
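# Driver: read the upstream component outputs, then loop panel -> inverter -> MPPT to fetch Kairos data,
# run inference, push efficiencies to Kafka, and finally write the aggregated result to output_path.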
try:
# with open(input_path, 'r') as f:
# get_tags_component_output_dict = json.load(f)
# print(get_tags_component_output_dict)
with open(get_tags_component_output, 'r') as f:
get_tags_component_output_dict = json.load(f)
with open(get_final_predicted_tags, 'r') as f:
get_final_predicted_tags_dict = json.load(f)
with open(get_inv_level_efficiency_tags, 'r') as f:
get_inv_level_efficiency_tags_dict = json.load(f)
df_raw_tags = get_tags_component_output_dict.get("df_raw_tags")
df_raw_tags = pd.DataFrame(df_raw_tags)
df_predicted_and_efficiency_tags = get_final_predicted_tags_dict.get("df_predicted_and_efficiency_tags")
df_predicted_and_efficiency_tags = pd.DataFrame(df_predicted_and_efficiency_tags)
inv_level_efficiency_tags = get_inv_level_efficiency_tags_dict.get("inv_level_efficiency_tags")
inv_level_efficiency_tags = pd.DataFrame(inv_level_efficiency_tags)
start_timestamp = timestamp_dict.get("start_timestamp")
end_timestamp = timestamp_dict.get("end_timestamp")
city = "bgrimmchonburi"
plant_efficiency_dict = dict()
logger.info(f'total inv available in {city} site are - '
f'{df_raw_tags[CommonConstants.inv_id].unique()}')
for each_panel in list(df_raw_tags['sub_id'].unique()):
df_raw_tags_copy = df_raw_tags.copy()
df_raw_tags_copy = df_raw_tags_copy[df_raw_tags_copy[CommonConstants.sub_id] == each_panel]
for inv_id in list(df_raw_tags_copy[CommonConstants.inv_id].unique()):
df = df_raw_tags_copy[df_raw_tags_copy[CommonConstants.inv_id] == inv_id]
df_inv = pd.DataFrame()
logger.info(f'total mppt available for {each_panel} - {inv_id} are - '
f'{df[CommonConstants.mppt_id].unique()}')
for mppt_id in list(df[CommonConstants.mppt_id].unique()):
df_mppt_level = df[df[CommonConstants.mppt_id] == mppt_id]
reformat_kairos_data = ReformatKairosData()
if mppt_id != 'mppt_1':
voltage_tag_row = df[(df[CommonConstants.mppt_id] == 'mppt_1') & (df['unit'] == 'Voltage')]
voltage_tag_row.reset_index(drop=True, inplace=True)
df_mppt_level = pd.concat([df_mppt_level, voltage_tag_row], ignore_index=True)
df_kairos_data = reformat_kairos_data.get_tags_data(df_input_tags=df_mppt_level,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
inv_id=inv_id, mppt_id=mppt_id,
city=city)
if df_inv.empty:
df_inv = df_kairos_data
else:
df_inv = pd.concat([df_inv, df_kairos_data], ignore_index=True)
logger.info(f'shape of input data for {each_panel} - {inv_id} & {mppt_id} - {df_inv.shape}')
logger.info(f'data fetching for {each_panel} - {inv_id} has been completed')
logger.info(f'final shape of input data after concatenation for overall '
f'{inv_id} - {df_inv.shape}')
df_predicted_and_efficiency_tags_copy = df_predicted_and_efficiency_tags.copy()
df_predicted_and_efficiency_tags_copy = df_predicted_and_efficiency_tags_copy[
df_predicted_and_efficiency_tags_copy[CommonConstants.sub_id] == each_panel]
df_inv_tags = df_predicted_and_efficiency_tags_copy[
df_predicted_and_efficiency_tags_copy[CommonConstants.inv_id] == inv_id]
# df_inv[CommonConstants.inv_id_mppt_id] = \
# df_inv[CommonConstants.inv_id] + '_' + df_inv[CommonConstants.mppt_id]
df_inv[CommonConstants.inv_id_mppt_id] = each_panel + '_' + df_inv[CommonConstants.inv_id] + '_' \
+ df_inv[CommonConstants.mppt_id]
try:
df_inv = df_inv[df_inv[CommonConstants.tilt_irradiance].notna()]
except Exception as e:
logger.debug(f'{CommonConstants.tilt_irradiance} is not present in dataframe for {inv_id}')
logger.exception(f'Exception - {e}')
try:
df_inv = df_inv[df_inv[CommonConstants.voltage_mppt].notna()]
except Exception as e:
logger.debug(f'{CommonConstants.voltage_mppt} is not present in dataframe for {inv_id}')
logger.exception(f'Exception - {e}')
df_inv.reset_index(drop=True, inplace=True)
logger.info(f'{inv_id} shape after removing null values - {df_inv.shape}')
try:
get_final_inference = Inference(df=df_inv, city=city, panel_id=each_panel)
x_test, predictions = get_final_inference.data_inference(inv_id=inv_id)
del get_final_inference
plant_efficiency_dict = calculations_and_data_push(x_test=x_test, predictions=predictions,
inv_id=inv_id,
plant_efficiency_dict=plant_efficiency_dict,
df_inv_tags=df_inv_tags,
inv_level_efficiency_tags=inv_level_efficiency_tags,
panel_id=each_panel)
logger.debug(f'data push for {inv_id} has been completed !')
except Exception as e:
logger.exception(f' Exception - {e}')
final_dict = {"plant_efficiency_dict": plant_efficiency_dict.to_dict(orient="records"),
"df_inv": df_inv.to_dict(orient="records")}
with open(output_path, 'w') as f:
json.dump(final_dict, f)
print(final_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
deployment:
environmentVar:
- name: KAIROS_URI
value: "https://iLens:iLensCLD$456@cloud.ilens.io/kairos/"
- name: METRIC_NAME
value: "project_264__ilens.live_data.raw"
- name: AGGREGATOR
value: "max"
- name: AGGREGATOR_VALUE
value: "15"
- name: AGGREGATOR_UNIT
value: "minutes"
- name: REQUIRED_TZ
value: "Asia/Bangkok"
- name: KAFKA_HOST
value: "192.168.0.220"
- name: KAFKA_PORT
value: "9092"
- name: KAFKA_TOPIC
value: "ilens_dev"
- name: START_RELATIVE
value: "0"
- name: START_HOUR_RELATIVE
value: "1"
- name: DAY_STARTING_HOUR
value: "5"
- name: START_MINUTE_RELATIVE
value: "0"
- name: START_SECOND_RELATIVE
value: "0"
- name: END_RELATIVE
value: "0"
- name: END_HOUR_RELATIVE
value: "1"
- name: END_MINUTE_RELATIVE
value: "59"
- name: END_SECOND_RELATIVE
value: "59"
pythonVersion: "3.9"
\ No newline at end of file