Commit ad5678ea authored by dasharatha.vamshi's avatar dasharatha.vamshi

added 3nd component

parent 7e48b68c
# Inv and level efficiency tags
## Overview
- **Component Name** : Inv and level efficiency tags
- **Component Description** :
- **Component Type** : Transform type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
input |String |Required |None |inputPath | |
output |String |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParm
\ No newline at end of file
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os
function = \
[func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]
def read_data_from_yaml(path):
"""
It opens the file at the given path, reads the contents, and then parses the contents as YAML
:param path: The path to the YAML file
:return: A dictionary
"""
with open(path, "r") as stream:
return yaml.load(stream, Loader=yaml.FullLoader)
def get_component_yml():
"""
:param file_name:
:return:
"""
try:
requirements = list()
with open('requirements.txt', 'r') as file:
for line in file:
if "=" in line and "#" not in line:
requirements.append(line.strip())
elif "#" in line:
...
else:
logger.exception(f"Mentioned package does not have version {line.strip()}")
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements)
variables_path = "variables.yml"
if os.path.exists(variables_path):
yaml_data: dict = read_data_from_yaml(variables_path)
if yaml_data:
envs: dict = yaml_data.get("deployment", {}).get("environmentVar", [])
python_version: str = yaml_data.get("deployment", {}).get("pythonVersion", None)
if python_version is not None and python_version in ["3.7", "3.8", "3.9", "3.10"]:
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements, base_image=f"python:{python_version}")
date_function = date_function_yml + f" env:\n"
for env_var in envs:
date_function += f" {env_var['name']}: '{env_var['value']}'\n"
with open('component.yml', 'w') as file:
file.write(date_function)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
except Exception as e:
logger.exception(f"Unable to get the component yml {e}")
def create_table(data, key):
"""
:return:
"""
try:
rows_list = list()
for each_input in data.get(key, []):
rows_dict = dict()
rows_dict['name'] = each_input.get("name", '')
rows_dict['data_type'] = each_input.get('type', 'String')
if each_input.get('optional'):
req_opt = "Optional"
default_value = each_input.get('default', '')
else:
req_opt = "Required"
default_value = "None"
rows_dict['req_opt'] = req_opt
rows_dict['default_value'] = default_value
for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
if type(each_arg) == dict and rows_dict['name'] in each_arg.values():
rows_dict['Type'] = list(each_arg.keys())[0]
rows_dict['Description'] = each_input.get('description', '')
rows_dict['Example'] = ''
rows_list.append(list(rows_dict.values()))
if key == "inputs" and os.path.exists("variables.yml"):
yaml_data: dict = read_data_from_yaml("variables.yml")
if yaml_data:
env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
for each in env_var:
env_dict = dict()
env_dict['name'] = each.get("name")
env_dict['data_type'] = "String"
env_dict['req_opt'] = "Required"
env_dict['default_value'] = "None"
env_dict['Type'] = "env"
env_dict['description'] = ""
env_dict['example'] = ""
rows_list.append(list(env_dict.values()))
return rows_list
except Exception as e:
logger.exception(f"Unable to create the table for README.MD file {e}")
def create_readme():
"""
Function is to create the readme file for the given components details
:return: Create the README.MD file in the given path
"""
try:
note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
" OutputPath, PipelineParm"
column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
"Example"]
with open("component.yml", "r") as file:
data = yaml.safe_load(file)
if "inputs" in list(data.keys()) and "outputs" in list(data.keys()):
component_type = "Transform type"
elif "inputs" not in data:
component_type = "Input type"
else:
component_type = "Output type"
component_overview_json = dict()
component_overview_json['Component Name'] = data.get("name", " ")
component_overview_json['Component Description'] = data.get("description", " ")
component_overview_json['Component Type'] = component_type
rows_list_input = create_table(data, "inputs")
rows_list_output = create_table(data, "outputs")
rows_list = rows_list_input + rows_list_output
header = component_overview_json.get("Component Name")
table_header = " |".join(column_list) + "\n"
table_line = "--- |" * len(column_list) + "\n"
table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
table = table_header + table_line + table_body
readme = f"""
# {header}
## {"Overview"}
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
with open('README.md', 'w') as f:
f.write(readme)
except Exception as e:
logger.exception(f"Unable to create the README.MD file {e}")
if __name__ == "__main__":
get_component_yml()
create_readme()
name: Inv and level efficiency tags
inputs:
- {name: input}
outputs:
- {name: output}
implementation:
container:
image: python:3.9
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'pandas==1.3.*' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m
pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'pandas==1.3.*'
--user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def inv_and_level_efficiency_tags(input_path, output_path):
class CommonConstants:
dalmia_string_level_tags = 'dalmia_string_level_tags'
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
inv_id = 'inv_id'
mppt_id = 'mppt_id'
datetime = 'datetime'
predicted_current_mppt = 'predicted_current_mppt'
predicted_current_mppt_tag = 'predicted_current_mppt_tag'
actual_current_mppt = 'actual_current_mppt'
hour = 'hour'
skip_time = {"morning": {"start": 0, "end": 6},
"evening": {"start": 18, "end": 23}}
efficiency_mppt = 'efficiency_mppt'
efficiency_inv = 'efficiency_inv'
efficiency_plant = 'efficiency_plant'
tag_name = 'tag_name'
parameter_name = 'parameter_name'
timestamp = 'timestamp'
tag_id = 'tag_id'
efficiency_mppt_tag = 'efficiency_mppt_tag'
voltage = 'voltage'
current = 'current'
Potential = 'Potential'
Degradation = 'Degradation'
tilt_irradiance = 'tilt_irradiance'
voltage_mppt = 'voltage_mppt'
current_mppt = 'current_mppt'
date = 'date'
asia_kolkata_timezone = 'Asia/Kolkata'
asia_bangkok_timezone = 'Asia/Bangkok'
coefficient = 'coefficient'
cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
day = "day"
time = "time"
import pandas as pd
from loguru import logger
import json
try:
with open(input_path, 'r') as f:
get_tags_component_output_dict = json.load(f)
print(get_tags_component_output_dict)
df_efficiency_tags = get_tags_component_output_dict.get("df_efficiency_tags")
df_efficiency_tags = pd.DataFrame(df_efficiency_tags)
df_inv_tags = \
df_efficiency_tags[(df_efficiency_tags[CommonConstants.parameter_name] ==
CommonConstants.bgrim_tags_property_efficiency) &
(df_efficiency_tags[CommonConstants.mppt_id] ==
CommonConstants.bgrim_tags_property_efficiency)]
inv_substring = 'INVERTER'
data_with_substring = [data for data in df_inv_tags['inv_id'] if inv_substring in data]
df_inv_tags = df_inv_tags.loc[df_inv_tags['inv_id'].isin(data_with_substring)]
df_inv_tags.reset_index(drop=True, inplace=True)
df_plant_tag = \
df_efficiency_tags[(df_efficiency_tags[CommonConstants.parameter_name] ==
CommonConstants.bgrim_tags_property_efficiency)
& (df_efficiency_tags[CommonConstants.mppt_id] ==
CommonConstants.bgrim_tags_property_efficiency)]
data_with_no_substring = [data for data in df_plant_tag['inv_id'] if inv_substring not in data]
df_plant_tag = df_plant_tag.loc[df_plant_tag['inv_id'].isin(data_with_no_substring)]
df_plant_tag.reset_index(drop=True, inplace=True)
plant_efficiency_tag = df_plant_tag.iloc[0, df_plant_tag.columns.get_loc(CommonConstants.tag_id)]
logger.info(f'total no of inv level efficiency tags - {df_inv_tags.shape}')
logger.info(f'total no of plant level efficiency tags - {df_plant_tag.shape}')
final_dict = {"df_inv_tags": df_inv_tags.to_dict(orient="records"),
"plant_efficiency_tag": plant_efficiency_tag}
with open(output_path, 'w') as f:
json.dump(final_dict, f)
print(final_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
import argparse
_parser = argparse.ArgumentParser(prog='Inv and level efficiency tags', description='')
_parser.add_argument("--input", dest="input_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = inv_and_level_efficiency_tags(**_parsed_args)
args:
- --input
- {inputPath: input}
- --output
- {outputPath: output}
env:
loguru==0.5.3
pandas==1.3.*
\ No newline at end of file
from kfp.components import InputPath, OutputPath
def inv_and_level_efficiency_tags(input_path: InputPath(), output_path: OutputPath()):
class CommonConstants:
dalmia_string_level_tags = 'dalmia_string_level_tags'
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
inv_id = 'inv_id'
mppt_id = 'mppt_id'
datetime = 'datetime'
predicted_current_mppt = 'predicted_current_mppt'
predicted_current_mppt_tag = 'predicted_current_mppt_tag'
actual_current_mppt = 'actual_current_mppt'
hour = 'hour'
skip_time = {"morning": {"start": 0, "end": 6},
"evening": {"start": 18, "end": 23}}
efficiency_mppt = 'efficiency_mppt'
efficiency_inv = 'efficiency_inv'
efficiency_plant = 'efficiency_plant'
tag_name = 'tag_name'
parameter_name = 'parameter_name'
timestamp = 'timestamp'
tag_id = 'tag_id'
efficiency_mppt_tag = 'efficiency_mppt_tag'
voltage = 'voltage'
current = 'current'
Potential = 'Potential'
Degradation = 'Degradation'
tilt_irradiance = 'tilt_irradiance'
voltage_mppt = 'voltage_mppt'
current_mppt = 'current_mppt'
date = 'date'
asia_kolkata_timezone = 'Asia/Kolkata'
asia_bangkok_timezone = 'Asia/Bangkok'
coefficient = 'coefficient'
cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
day = "day"
time = "time"
import pandas as pd
from loguru import logger
import json
try:
with open(input_path, 'r') as f:
get_tags_component_output_dict = json.load(f)
print(get_tags_component_output_dict)
df_efficiency_tags = get_tags_component_output_dict.get("df_efficiency_tags")
df_efficiency_tags = pd.DataFrame(df_efficiency_tags)
df_inv_tags = \
df_efficiency_tags[(df_efficiency_tags[CommonConstants.parameter_name] ==
CommonConstants.bgrim_tags_property_efficiency) &
(df_efficiency_tags[CommonConstants.mppt_id] ==
CommonConstants.bgrim_tags_property_efficiency)]
inv_substring = 'INVERTER'
data_with_substring = [data for data in df_inv_tags['inv_id'] if inv_substring in data]
df_inv_tags = df_inv_tags.loc[df_inv_tags['inv_id'].isin(data_with_substring)]
df_inv_tags.reset_index(drop=True, inplace=True)
df_plant_tag = \
df_efficiency_tags[(df_efficiency_tags[CommonConstants.parameter_name] ==
CommonConstants.bgrim_tags_property_efficiency)
& (df_efficiency_tags[CommonConstants.mppt_id] ==
CommonConstants.bgrim_tags_property_efficiency)]
data_with_no_substring = [data for data in df_plant_tag['inv_id'] if inv_substring not in data]
df_plant_tag = df_plant_tag.loc[df_plant_tag['inv_id'].isin(data_with_no_substring)]
df_plant_tag.reset_index(drop=True, inplace=True)
plant_efficiency_tag = df_plant_tag.iloc[0, df_plant_tag.columns.get_loc(CommonConstants.tag_id)]
logger.info(f'total no of inv level efficiency tags - {df_inv_tags.shape}')
logger.info(f'total no of plant level efficiency tags - {df_plant_tag.shape}')
final_dict = {"df_inv_tags": df_inv_tags.to_dict(orient="records"),
"plant_efficiency_tag": plant_efficiency_tag}
with open(output_path, 'w') as f:
json.dump(final_dict, f)
print(final_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
deployment:
pythonVersion: "3.9"
\ No newline at end of file
......@@ -19,12 +19,13 @@ def forecast_pipeline(pipeline_param: dict, plant_info: dict):
"input_components/get_tags_component/component.yml")
get_final_predicted_tags = kfp.components.load_component_from_file(
"input_components/get_final_predicted_tags/component.yml")
get_inv_and_level_efficiency_tags = kfp.components.load_component_from_file(
"input_components/get_inv_and_level_efficiency_tags/component.yml")
# Calling the component
get_tags_function_task = get_tags_function_component(pipeline_param).set_memory_request('600M').set_memory_limit('1200M').\
set_cpu_request('700m').set_cpu_limit('1400m')
get_final_predicted_tags_task = get_final_predicted_tags(get_tags_function_task.output)
get_inv_and_level_efficiency_tags_task = get_inv_and_level_efficiency_tags(get_tags_function_task.output)
# Disabling cacheing for all the components
get_tags_function_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment