Commit 7e48b68c authored by dasharatha.vamshi's avatar dasharatha.vamshi

added 2nd component

parent 95c1d2b6
# Get final predicted tags
## Overview
- **Component Name** : Get final predicted tags
- **Component Description** :
- **Component Type** : Transform type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
input |String |Required |None |inputPath | |
output |String |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParm
\ No newline at end of file
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os

# Pick out the single component function defined in src/program.py.
# inspect.getmembers lists every function visible on the module, and the
# getmodule filter drops names that were merely imported into it; [0] then
# assumes program.py defines exactly one function of its own -- TODO confirm
# this still holds if program.py ever gains a second helper.
function = \
    [func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]
def read_data_from_yaml(path):
    """
    Parse the YAML document stored at *path* and return its contents.

    :param path: Filesystem path of the YAML file to read.
    :return: The parsed document (usually a dict); None for an empty file.
    """
    with open(path, "r") as fh:
        parsed = yaml.load(fh, Loader=yaml.FullLoader)
    return parsed
def get_component_yml():
    """
    Generate ``component.yml`` for the module-level component ``function``.

    Reads pinned dependencies from ``requirements.txt``, converts the
    function into a KFP component spec via
    ``kfp.components.func_to_component_text``, optionally applies the
    python version and environment variables declared in ``variables.yml``,
    and writes the final spec to ``component.yml``.

    :return: None; any failure is logged instead of raised.
    """
    try:
        requirements = []
        with open('requirements.txt', 'r') as file:
            for line in file:
                if "=" in line and "#" not in line:
                    requirements.append(line.strip())
                elif "#" in line:
                    # Comment lines are intentionally ignored.
                    pass
                else:
                    # Unpinned packages are skipped but reported.  This is not
                    # an exception path, so use logger.error -- logger.exception
                    # here would attach a misleading (empty) traceback.
                    logger.error(f"Mentioned package does not have version {line.strip()}")
        component_text = kfp.components.func_to_component_text(
            function, packages_to_install=requirements)
        variables_path = "variables.yml"
        if os.path.exists(variables_path):
            yaml_data: dict = read_data_from_yaml(variables_path)
            if yaml_data:
                deployment: dict = yaml_data.get("deployment", {})
                envs: list = deployment.get("environmentVar", [])
                python_version: str = deployment.get("pythonVersion", None)
                if python_version in ["3.7", "3.8", "3.9", "3.10"]:
                    # Rebuild the spec with the requested base image; only a
                    # known-good python tag is accepted.
                    component_text = kfp.components.func_to_component_text(
                        function, packages_to_install=requirements,
                        base_image=f"python:{python_version}")
                if envs:
                    # Append the env section only when there is at least one
                    # entry -- a bare " env:" header with no children would
                    # leave an empty/null mapping in the generated YAML.
                    component_text += f" env:\n"
                    for env_var in envs:
                        component_text += f"  {env_var['name']}: '{env_var['value']}'\n"
        # Single write site replaces the three duplicated with-open blocks of
        # the previous revision.
        with open('component.yml', 'w') as file:
            file.write(component_text)
    except Exception as e:
        logger.exception(f"Unable to get the component yml {e}")
def create_table(data, key):
    """
    Build README table rows for one section of a component spec.

    :param data: Parsed ``component.yml`` spec as a dict.
    :param key: Section to tabulate, ``"inputs"`` or ``"outputs"``.
    :return: List of 7-cell row lists
             ``[name, datatype, required/optional, default, type,
             description, example]``; an empty list on failure.
    """
    try:
        rows_list = []
        for each_input in data.get(key, []):
            rows_dict = dict()
            rows_dict['name'] = each_input.get("name", '')
            rows_dict['data_type'] = each_input.get('type', 'String')
            if each_input.get('optional'):
                rows_dict['req_opt'] = "Optional"
                rows_dict['default_value'] = each_input.get('default', '')
            else:
                rows_dict['req_opt'] = "Required"
                rows_dict['default_value'] = "None"
            # Default the Type cell so every row keeps exactly seven cells:
            # previously the key was only set when a container arg matched,
            # which silently misaligned the generated markdown table.
            rows_dict['Type'] = ''
            for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
                # args entries look like {inputPath: input} / {outputValue: x};
                # match by value to recover the KFP parameter type.
                if isinstance(each_arg, dict) and rows_dict['name'] in each_arg.values():
                    rows_dict['Type'] = list(each_arg.keys())[0]
            rows_dict['Description'] = each_input.get('description', '')
            rows_dict['Example'] = ''
            rows_list.append(list(rows_dict.values()))
        if key == "inputs" and os.path.exists("variables.yml"):
            yaml_data: dict = read_data_from_yaml("variables.yml")
            if yaml_data:
                env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
                for each in env_var:
                    # Environment variables are documented as required String
                    # inputs of Type "env".
                    rows_list.append([each.get("name"), "String", "Required",
                                      "None", "env", "", ""])
        return rows_list
    except Exception as e:
        logger.exception(f"Unable to create the table for README.MD file {e}")
        # Return an empty list (not None) so create_readme can still
        # concatenate input and output rows after a failure.
        return []
def create_readme():
    """
    Create the README.md file describing the component.

    Reads the generated ``component.yml``, infers the component type from
    which sections it declares, renders a markdown parameter table via
    ``create_table`` and writes everything to ``README.md`` in the current
    directory.

    :return: None; failures are logged instead of raised.
    """
    try:
        note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
        note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
                 " OutputPath, PipelineParm"
        column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
                       "Example"]
        with open("component.yml", "r") as file:
            data = yaml.safe_load(file)
        # Component type is inferred from the declared sections:
        # both inputs and outputs -> Transform, no inputs -> Input, else Output.
        if "inputs" in list(data.keys()) and "outputs" in list(data.keys()):
            component_type = "Transform type"
        elif "inputs" not in data:
            component_type = "Input type"
        else:
            component_type = "Output type"
        component_overview_json = dict()
        component_overview_json['Component Name'] = data.get("name", " ")
        component_overview_json['Component Description'] = data.get("description", " ")
        component_overview_json['Component Type'] = component_type
        rows_list_input = create_table(data, "inputs")
        rows_list_output = create_table(data, "outputs")
        rows_list = rows_list_input + rows_list_output
        header = component_overview_json.get("Component Name")
        # Assemble the markdown table: header row, "--- |" separator row, then
        # one "cell |cell |..." line per row.
        table_header = " |".join(column_list) + "\n"
        table_line = "--- |" * len(column_list) + "\n"
        table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
        table = table_header + table_line + table_body
        # NOTE(review): the template lines below are intentionally flush-left
        # so the emitted markdown carries no leading indentation -- matches the
        # rendered README of this component; confirm against the original file.
        readme = f"""
# {header}
## {"Overview"}
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
        with open('README.md', 'w') as f:
            f.write(readme)
    except Exception as e:
        logger.exception(f"Unable to create the README.MD file {e}")
if __name__ == "__main__":
    # Regenerate the KFP component spec first, then the README derived from it.
    get_component_yml()
    create_readme()
name: Get final predicted tags
inputs:
- {name: input}
outputs:
- {name: output}
implementation:
container:
image: python:3.9
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'pandas==1.3.*' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m
pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'pandas==1.3.*'
--user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def get_final_predicted_tags(input_path, output_path):
class CommonConstants:
dalmia_string_level_tags = 'dalmia_string_level_tags'
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
inv_id = 'inv_id'
mppt_id = 'mppt_id'
datetime = 'datetime'
predicted_current_mppt = 'predicted_current_mppt'
predicted_current_mppt_tag = 'predicted_current_mppt_tag'
actual_current_mppt = 'actual_current_mppt'
hour = 'hour'
skip_time = {"morning": {"start": 0, "end": 6},
"evening": {"start": 18, "end": 23}}
efficiency_mppt = 'efficiency_mppt'
efficiency_inv = 'efficiency_inv'
efficiency_plant = 'efficiency_plant'
tag_name = 'tag_name'
parameter_name = 'parameter_name'
timestamp = 'timestamp'
tag_id = 'tag_id'
efficiency_mppt_tag = 'efficiency_mppt_tag'
voltage = 'voltage'
current = 'current'
Potential = 'Potential'
Degradation = 'Degradation'
tilt_irradiance = 'tilt_irradiance'
voltage_mppt = 'voltage_mppt'
current_mppt = 'current_mppt'
date = 'date'
asia_kolkata_timezone = 'Asia/Kolkata'
asia_bangkok_timezone = 'Asia/Bangkok'
coefficient = 'coefficient'
cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
day = "day"
time = "time"
import pandas as pd
from loguru import logger
import json
try:
with open(input_path, 'r') as f:
get_tags_component_output_dict = json.load(f)
print(get_tags_component_output_dict)
df_predicted_tags = get_tags_component_output_dict.get("df_predicted_tags")
df_efficiency_tags = get_tags_component_output_dict.get("df_efficiency_tags")
df_predicted_tags = pd.DataFrame(df_predicted_tags)
df_efficiency_tags = pd.DataFrame(df_efficiency_tags)
df_predicted_tags = df_predicted_tags.copy()
df_efficiency_tags_copy = df_efficiency_tags.copy()
df_predicted_tags.rename(columns={CommonConstants.tag_id: CommonConstants.predicted_current_mppt_tag},
inplace=True)
df_efficiency_tags_copy.rename(columns={CommonConstants.tag_id: CommonConstants.efficiency_mppt_tag},
inplace=True)
df_predicted_tags[CommonConstants.inv_id_mppt_id] = df_predicted_tags[CommonConstants.sub_id] + '_' + \
df_predicted_tags[CommonConstants.inv_id] + '_' + \
df_predicted_tags[CommonConstants.mppt_id]
df_efficiency_tags_copy[CommonConstants.inv_id_mppt_id] = df_efficiency_tags_copy[CommonConstants.sub_id] \
+ '_' + \
df_efficiency_tags_copy[CommonConstants.inv_id] \
+ '_' + \
df_efficiency_tags_copy[CommonConstants.mppt_id]
df_predicted_and_efficiency_tags = pd.merge(df_predicted_tags,
df_efficiency_tags_copy[[CommonConstants.efficiency_mppt_tag,
CommonConstants.inv_id_mppt_id]],
on=CommonConstants.inv_id_mppt_id,
how='left')
df_predicted_and_efficiency_tags.reset_index(drop=True, inplace=True)
final_dict = {"df_predicted_and_efficiency_tags": df_predicted_and_efficiency_tags.to_dict(orient="records")}
with open(output_path, 'w') as f:
json.dump(final_dict, f)
print(final_dict)
except Exception as e:
logger.exception(f'Exception - {e}')
import argparse
_parser = argparse.ArgumentParser(prog='Get final predicted tags', description='')
_parser.add_argument("--input", dest="input_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = get_final_predicted_tags(**_parsed_args)
args:
- --input
- {inputPath: input}
- --output
- {outputPath: output}
env:
loguru==0.5.3
pandas==1.3.*
\ No newline at end of file
from kfp.components import InputPath, OutputPath
def get_final_predicted_tags(input_path: InputPath(), output_path: OutputPath()):
    """
    KFP component: attach efficiency tag ids to the predicted-current tag rows.

    Loads the upstream component's JSON payload from *input_path* (keys
    "df_predicted_tags" and "df_efficiency_tags", each a list of records),
    renames each frame's ``tag_id`` column to a role-specific name, builds a
    composite ``inv_id_mppt_id`` key from the sub/inv/mppt id columns on both
    frames, left-joins the efficiency tag id onto the predicted rows, and
    writes the merged records to *output_path* as JSON under the key
    "df_predicted_and_efficiency_tags".

    :param input_path: Path of the input JSON file (injected by KFP).
    :param output_path: Path of the output JSON file (injected by KFP).
    :return: None; failures are logged, not raised.
    """

    # Project-wide column/tag-name constants.  Only sub_id, inv_id, mppt_id,
    # inv_id_mppt_id, tag_id, predicted_current_mppt_tag and
    # efficiency_mppt_tag are referenced below; the rest are kept for parity
    # with the shared CommonConstants used by sibling components.
    class CommonConstants:
        dalmia_string_level_tags = 'dalmia_string_level_tags'
        bgrimm_string_level_tags = 'bgrimm_string_level_tags'
        panel_id = 'panel_id'
        sub_id = 'sub_id'
        inv_id_mppt_id = 'inv_id_mppt_id'
        tags_property_raw = 'raw'
        tags_property_predicted = 'predicted'
        tags_property_efficiency = 'efficiency'
        bgrim_tags_property_efficiency = 'Efficiency'
        tags_property_efficiency_inv = 'efficiency'
        tags_property_efficiency_plant = 'efficiency_plant'
        mppt_coefficients = 'mppt_coefficients'
        inv_id = 'inv_id'
        mppt_id = 'mppt_id'
        datetime = 'datetime'
        predicted_current_mppt = 'predicted_current_mppt'
        predicted_current_mppt_tag = 'predicted_current_mppt_tag'
        actual_current_mppt = 'actual_current_mppt'
        hour = 'hour'
        skip_time = {"morning": {"start": 0, "end": 6},
                     "evening": {"start": 18, "end": 23}}
        efficiency_mppt = 'efficiency_mppt'
        efficiency_inv = 'efficiency_inv'
        efficiency_plant = 'efficiency_plant'
        tag_name = 'tag_name'
        parameter_name = 'parameter_name'
        timestamp = 'timestamp'
        tag_id = 'tag_id'
        efficiency_mppt_tag = 'efficiency_mppt_tag'
        voltage = 'voltage'
        current = 'current'
        Potential = 'Potential'
        Degradation = 'Degradation'
        tilt_irradiance = 'tilt_irradiance'
        voltage_mppt = 'voltage_mppt'
        current_mppt = 'current_mppt'
        date = 'date'
        asia_kolkata_timezone = 'Asia/Kolkata'
        asia_bangkok_timezone = 'Asia/Bangkok'
        coefficient = 'coefficient'
        cumulative_actual_current_mppt = 'cumulative_actual_current_mppt'
        cumulative_predicted_current_mppt = 'cumulative_predicted_current_mppt'
        day = "day"
        time = "time"

    # Imports are local so kfp can serialize the function body standalone.
    import pandas as pd
    from loguru import logger
    import json
    try:
        with open(input_path, 'r') as f:
            get_tags_component_output_dict = json.load(f)
        print(get_tags_component_output_dict)
        df_predicted_tags = get_tags_component_output_dict.get("df_predicted_tags")
        df_efficiency_tags = get_tags_component_output_dict.get("df_efficiency_tags")
        # Record lists -> DataFrames; assumes each record carries the sub_id,
        # inv_id, mppt_id and tag_id columns -- TODO confirm against the
        # upstream get_tags component's output schema.
        df_predicted_tags = pd.DataFrame(df_predicted_tags)
        df_efficiency_tags = pd.DataFrame(df_efficiency_tags)
        df_predicted_tags = df_predicted_tags.copy()
        df_efficiency_tags_copy = df_efficiency_tags.copy()
        # Give each frame's tag_id column a role-specific name so both survive
        # the merge without a suffix collision.
        df_predicted_tags.rename(columns={CommonConstants.tag_id: CommonConstants.predicted_current_mppt_tag},
                                 inplace=True)
        df_efficiency_tags_copy.rename(columns={CommonConstants.tag_id: CommonConstants.efficiency_mppt_tag},
                                       inplace=True)
        # Composite join key: "<sub_id>_<inv_id>_<mppt_id>" (string concat, so
        # these columns are presumably strings -- verify upstream).
        df_predicted_tags[CommonConstants.inv_id_mppt_id] = df_predicted_tags[CommonConstants.sub_id] + '_' + \
            df_predicted_tags[CommonConstants.inv_id] + '_' + \
            df_predicted_tags[CommonConstants.mppt_id]
        df_efficiency_tags_copy[CommonConstants.inv_id_mppt_id] = df_efficiency_tags_copy[CommonConstants.sub_id] \
            + '_' + \
            df_efficiency_tags_copy[CommonConstants.inv_id] \
            + '_' + \
            df_efficiency_tags_copy[CommonConstants.mppt_id]
        # Left join keeps every predicted row, attaching the efficiency tag id
        # where a matching sub/inv/mppt combination exists.
        df_predicted_and_efficiency_tags = pd.merge(df_predicted_tags,
                                                    df_efficiency_tags_copy[[CommonConstants.efficiency_mppt_tag,
                                                                             CommonConstants.inv_id_mppt_id]],
                                                    on=CommonConstants.inv_id_mppt_id,
                                                    how='left')
        df_predicted_and_efficiency_tags.reset_index(drop=True, inplace=True)
        final_dict = {"df_predicted_and_efficiency_tags": df_predicted_and_efficiency_tags.to_dict(orient="records")}
        with open(output_path, 'w') as f:
            json.dump(final_dict, f)
        print(final_dict)
    except Exception as e:
        logger.exception(f'Exception - {e}')
deployment:
pythonVersion: "3.9"
\ No newline at end of file
......@@ -16,7 +16,7 @@ pipeline_param |JsonObject |Required |None |inputValue | |
MONGO_DB |String |Required |None |env | |
MONGO_COLLECTION |String |Required |None |env | |
CITY |String |Required |None |env | |
Output |JsonObject |Required |None |outputPath | |
output_text |String |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
......
......@@ -2,7 +2,7 @@ name: Get tags function
inputs:
- {name: pipeline_param, type: JsonObject}
outputs:
- {name: Output, type: JsonObject}
- {name: output_text, type: String}
implementation:
container:
image: python:3.9
......@@ -20,7 +20,12 @@ implementation:
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def get_tags_function(pipeline_param):
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def get_tags_function(pipeline_param, output_text_path):
import pandas as pd
from loguru import logger
import warnings
......@@ -144,59 +149,29 @@ implementation:
df_coefficients = pd.DataFrame()
tracemalloc.clear_traces()
del mongo_conn
final_dict = {"raw": df_raw_tags.to_dict('records'), "predicted": df_predicted_tags.to_dict('records'),
"coefficients": df_coefficients.to_dict('records'),
"efficiency": df_efficiency_tags.to_dict('records')}
final_dict = {"df_raw_tags": df_raw_tags.to_dict('records'),
"df_predicted_tags": df_predicted_tags.to_dict('records'),
"df_coefficients": df_coefficients.to_dict('records'),
"df_efficiency_tags": df_efficiency_tags.to_dict('records')}
print(final_dict)
return final_dict
with open(output_text_path, 'w') as f:
json.dump(final_dict, f)
except Exception as e:
logger.exception(f'Exception - {e}')
def _serialize_json(obj) -> str:
if isinstance(obj, str):
return obj
import json
def default_serializer(obj):
if hasattr(obj, 'to_struct'):
return obj.to_struct()
else:
raise TypeError(
"Object of type '%s' is not JSON serializable and does not have .to_struct() method."
% obj.__class__.__name__)
return json.dumps(obj, default=default_serializer, sort_keys=True)
import json
import argparse
_parser = argparse.ArgumentParser(prog='Get tags function', description='')
_parser.add_argument("--pipeline-param", dest="pipeline_param", type=json.loads, required=True, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parser.add_argument("--output-text", dest="output_text_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = get_tags_function(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_json,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
args:
- --pipeline-param
- {inputValue: pipeline_param}
- '----output-paths'
- {outputPath: Output}
- --output-text
- {outputPath: output_text}
env:
MONGO_DB: 'ilens_ai'
MONGO_COLLECTION: 'bgrimmStringTags'
......
def get_tags_function(pipeline_param: dict) -> dict:
import json
from kfp.components import OutputPath
def get_tags_function(pipeline_param: dict, output_text_path: OutputPath(str)):
import pandas as pd
from loguru import logger
import warnings
......@@ -122,10 +127,12 @@ def get_tags_function(pipeline_param: dict) -> dict:
df_coefficients = pd.DataFrame()
tracemalloc.clear_traces()
del mongo_conn
final_dict = {"raw": df_raw_tags.to_dict('records'), "predicted": df_predicted_tags.to_dict('records'),
"coefficients": df_coefficients.to_dict('records'),
"efficiency": df_efficiency_tags.to_dict('records')}
final_dict = {"df_raw_tags": df_raw_tags.to_dict('records'),
"df_predicted_tags": df_predicted_tags.to_dict('records'),
"df_coefficients": df_coefficients.to_dict('records'),
"df_efficiency_tags": df_efficiency_tags.to_dict('records')}
print(final_dict)
return final_dict
with open(output_text_path, 'w') as f:
json.dump(final_dict, f)
except Exception as e:
logger.exception(f'Exception - {e}')
......@@ -17,38 +17,17 @@ def forecast_pipeline(pipeline_param: dict, plant_info: dict):
# Loading the component from the above yaml file
get_tags_function_component = kfp.components.load_component_from_file(
"input_components/get_tags_component/component.yml")
get_final_predicted_tags = kfp.components.load_component_from_file(
"input_components/get_final_predicted_tags/component.yml")
# Calling the component
get_tags_function_task = get_tags_function_component(pipeline_param).set_memory_request('600M').set_memory_limit('1200M').\
set_cpu_request('700m').set_cpu_limit('1400m')
get_final_predicted_tags_task = get_final_predicted_tags(get_tags_function_task.output)
# get_tags_function_task.add_volume(k8s_client.V1Volume(
# name="get-data-volume",
# secret=k8s_client.V1SecretVolumeSource(secret_name="MONGO_URI"))
# )
#
# get_tags_function_task.add_env_variable(
# V1EnvVar(
# name="MONGO_URI",
# value_from=k8s_client.V1EnvVarSource(secret_key_ref=k8s_client.V1SecretKeySelector(
# name="mongo-uri",
# key="MONGO_URI"
# )
# )
# )
# )
# Disabling cacheing for all the components
get_tags_function_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# read_from_kairos_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# data_preprocess_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_forecast_data_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_1.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_data_recaster_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_2.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_rolling_avg_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_3.execution_options.caching_strategy.max_cache_staleness = "P0D"
except Exception as e:
logger.exception(f"Unable to Perform the execution {e}")
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment