Commit 48ab7364 authored by dasharatha.vamshi

added tags component

parent 6e4b5fbc
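# GitLab CI configuration (presumably .gitlab-ci.yml): runs the per-project Kubeflow trigger script on a shell runner, restricted to the pipeline_with_diff_conf branch.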
stages:
- kubeflow-pipeline
variables:
KUBEFLOW_SCRIPT: /home/gitlab-runner/monitor/kubeflow-pipeline/$CI_PROJECT_NAME/trigger.sh
kubeflow-pipeline-deployment:
stage: kubeflow-pipeline
script:
- sh $KUBEFLOW_SCRIPT
only:
- pipeline_with_diff_conf
tags:
- shell
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.9 (degradation)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="SonarLintModuleSettings">
<option name="uniqueId" value="634b4c16-9fe9-4041-91b5-6f2b44688507" />
</component>
</module>
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="32">
<item index="0" class="java.lang.String" itemvalue="pandas" />
<item index="1" class="java.lang.String" itemvalue="scipy" />
<item index="2" class="java.lang.String" itemvalue="pydantic" />
<item index="3" class="java.lang.String" itemvalue="pytz" />
<item index="4" class="java.lang.String" itemvalue="simplejson" />
<item index="5" class="java.lang.String" itemvalue="scikit-learn" />
<item index="6" class="java.lang.String" itemvalue="configparser" />
<item index="7" class="java.lang.String" itemvalue="cytoolz" />
<item index="8" class="java.lang.String" itemvalue="lz4" />
<item index="9" class="java.lang.String" itemvalue="matplotlib" />
<item index="10" class="java.lang.String" itemvalue="cffi" />
<item index="11" class="java.lang.String" itemvalue="cloudpickle" />
<item index="12" class="java.lang.String" itemvalue="distributed" />
<item index="13" class="java.lang.String" itemvalue="graphviz" />
<item index="14" class="java.lang.String" itemvalue="typing-extensions" />
<item index="15" class="java.lang.String" itemvalue="lightgbm" />
<item index="16" class="java.lang.String" itemvalue="pymongo" />
<item index="17" class="java.lang.String" itemvalue="PyYAML" />
<item index="18" class="java.lang.String" itemvalue="SQLAlchemy" />
<item index="19" class="java.lang.String" itemvalue="sklearn" />
<item index="20" class="java.lang.String" itemvalue="python-dotenv" />
<item index="21" class="java.lang.String" itemvalue="psycopg2" />
<item index="22" class="java.lang.String" itemvalue="numpy" />
<item index="23" class="java.lang.String" itemvalue="requests" />
<item index="24" class="java.lang.String" itemvalue="loguru" />
<item index="25" class="java.lang.String" itemvalue="kafka-python" />
<item index="26" class="java.lang.String" itemvalue="mlflow" />
<item index="27" class="java.lang.String" itemvalue="sqlparse" />
<item index="28" class="java.lang.String" itemvalue="azure-storage-blob" />
<item index="29" class="java.lang.String" itemvalue="catboost" />
<item index="30" class="java.lang.String" itemvalue="pycaret" />
<item index="31" class="java.lang.String" itemvalue="optuna" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="scripts.core.data.data_import.KairosQueryBuilder.input_tag" />
<option value="scripts.core.data.data_import.KairosQueryBuilder.metric_name" />
<option value="scripts.core.data.data_import.KairosQueryBuilder.aggregation_name" />
<option value="scripts.core.data.data_import.DataPuller.postgres_engine" />
<option value="scripts.core.data.data_import.end" />
</list>
</option>
</inspection_tool>
</profile>
</component>
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (degradation)" project-jdk-type="Python SDK" />
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/bgrimm-string-train.iml" filepath="$PROJECT_DIR$/.idea/bgrimm-string-train.iml" />
</modules>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
{
"plant_details": [
{
"run_name": "Forecast B.Grimm",
"pipeline_name": "Forecast B.grimm",
"plant_info": {
"plant_id": "bgrimm_plant",
"city": "bgrimm",
"input": {
"site_101$dept_102$line_105$equipment_220$tag_480": "module_temperature",
"site_101$dept_102$line_105$equipment_220$tag_478": "tilt_irradiance",
"site_101$dept_102$line_105$equipment_217$tag_435": "today_export"
},
"output": {
"site_101$dept_102$line_105$equipment_217$tag_514": "forecasted_module_temp",
"site_101$dept_102$line_105$equipment_217$tag_512": "forecasted_irradiance",
"site_101$dept_102$line_105$equipment_217$tag_511": "forecasted_energy",
"site_101$dept_102$line_105$equipment_217$tag_513": "total_forecasted_energy"
},
"previous_day_tags": {
"site_101$dept_102$line_105$equipment_217$tag_515": "previous_day_energy_forecasting"
},
"moving_avg": {
"site_101$dept_102$line_105$equipment_217$tag_516": "rolling"
}
}
}
]
}
{"plant_details" : [{"run_name": "Forecast Kadapa", "pipeline_name": "Forecast Kadapa", "plant_info": {"plant_id": "kadapa_plant", "city": "kadapa", "input": {"site_101$dept_139$line_370$equipment_3982$tag_15830": "module_temperature", "site_101$dept_139$line_370$equipment_3982$tag_15828": "tilt_irradiance", "site_101$dept_139$line_370$equipment_3982$tag_15853": "today_export"}, "output": {"site_101$dept_139$line_370$equipment_3982$tag_15884": "forecasted_module_temp", "site_101$dept_139$line_370$equipment_3982$tag_15883": "forecasted_irradiance", "site_101$dept_139$line_370$equipment_3982$tag_15885": "forecasted_energy", "site_101$dept_139$line_370$equipment_3982$tag_15891": "total_forecasted_energy"}, "previous_day_tags": {"site_101$dept_139$line_370$equipment_3982$tag_15906": "previous_day_energy_forecasting"}, "moving_avg": {"site_101$dept_139$line_370$equipment_3982$tag_16227": "rolling"}}}, {"run_name": "Forecast bcw", "pipeline_name": "Forecast bcw", "plant_info": {"plant_id": "bcw_plant", "city": "bcw", "input": {"site_111$dept_145$line_442$equipment_5886$tag_15830": "module_temperature", "site_111$dept_145$line_442$equipment_5886$tag_15828": "tilt_irradiance", "site_111$dept_145$line_442$equipment_5886$tag_15853": "today_export"}, "output": {"site_111$dept_145$line_442$equipment_5886$tag_15883": "forecasted_irradiance", "site_111$dept_145$line_442$equipment_5886$tag_15884": "forecasted_module_temp", "site_111$dept_145$line_442$equipment_5886$tag_15885": "forecasted_energy", "site_111$dept_145$line_442$equipment_5886$tag_15891": "total_forecasted_energy"}, "previous_day_tags": {"site_111$dept_145$line_442$equipment_5886$tag_15906": "previous_day_energy_forecasting"}, "moving_avg": {"site_111$dept_145$line_442$equipment_5886$tag_16227": "rolling"}}}, {"run_name": "Forecast lanka", "pipeline_name": "Forecast lanka", "plant_info": {"plant_id": "lanka_plant", "city": "lanka", "input": {"site_104$dept_144$line_430$equipment_5866$tag_15830": "module_temperature", "site_104$dept_144$line_430$equipment_5866$tag_15828": "tilt_irradiance", "site_104$dept_144$line_430$equipment_5866$tag_15853": "today_export"}, "output": {"site_104$dept_144$line_430$equipment_5866$tag_15883": "forecasted_irradiance", "site_104$dept_144$line_430$equipment_5866$tag_15884": "forecasted_module_temp", "site_104$dept_144$line_430$equipment_5866$tag_15885": "forecasted_energy", "site_104$dept_144$line_430$equipment_5866$tag_15891": "total_forecasted_energy"}, "previous_day_tags": {"site_104$dept_144$line_430$equipment_5866$tag_15906": "previous_day_energy_forecasting"}, "moving_avg": {"site_104$dept_144$line_430$equipment_5866$tag_16227": "rolling"}}}, {"run_name": "Forecast ariyalur", "pipeline_name": "Forecast ariyalur", "plant_info": {"plant_id": "ariyalur_plant", "city": "ariyalur", "input": {"site_107$dept_140$line_371$equipment_4115$tag_15830": "module_temperature", "site_107$dept_140$line_371$equipment_4115$tag_15828": "tilt_irradiance", "site_107$dept_140$line_371$equipment_4115$tag_15853": "today_export"}, "output": {"site_107$dept_140$line_371$equipment_4115$tag_15884": "forecasted_module_temp", "site_107$dept_140$line_371$equipment_4115$tag_15883": "forecasted_irradiance", "site_107$dept_140$line_371$equipment_4115$tag_15885": "forecasted_energy", "site_107$dept_140$line_371$equipment_4115$tag_15891": "total_forecasted_energy"}, "previous_day_tags": {"site_107$dept_140$line_371$equipment_4115$tag_15906": "previous_day_energy_forecasting"}, "moving_avg": 
{"site_107$dept_140$line_371$equipment_4115$tag_16227": "rolling"}}}, {"run_name": "Forecast dalmiapuram", "pipeline_name": "Forecast dalmiapuram", "plant_info": {"plant_id": "dalmiapuram_plant", "city": "dalmiapuram", "input": {"site_103$dept_142$line_417$equipment_5635$tag_15830": "module_temperature", "site_103$dept_142$line_417$equipment_5635$tag_16155": "tilt_irradiance", "site_103$dept_142$line_417$equipment_5635$tag_15853": "today_export"}, "output": {"site_103$dept_142$line_417$equipment_5635$tag_15883": "forecasted_irradiance", "site_103$dept_142$line_417$equipment_5635$tag_15884": "forecasted_module_temp", "site_103$dept_142$line_417$equipment_5635$tag_15885": "forecasted_energy", "site_103$dept_142$line_417$equipment_5635$tag_15891": "total_forecasted_energy"}, "previous_day_tags": {"site_103$dept_142$line_417$equipment_5635$tag_15906": "previous_day_energy_forecasting"}, "moving_avg": {"site_103$dept_142$line_417$equipment_5635$tag_16227": "rolling"}}}]}
import yaml
import kfp
import logging
import os
import json
from dotenv import load_dotenv
load_dotenv('pipeline-conf.env')
logging.basicConfig(level=logging.INFO, format='%(asctime)s :: %(levelname)s :: %(message)s')
deployment_yaml = os.getenv("DEPLOYMENT_YAML")
kubeflow_uri = os.getenv("KUBEFLOW_URI")
login_token = os.getenv("LOGIN_TOKEN")
pipeline_version = os.getenv("PIPELINE_VERSION")
experiment_name = os.getenv("EXPERIMENT_NAME")
cron_expression = os.getenv("CRON_EXPRESSION")
conf_path = os.getenv("CONF_LIST", '')
variables = os.getenv('VARIABLES', 'false')
is_recurring_run = os.environ.get("RECURRING_RUN", 'false')
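# Authenticate to the Kubeflow Pipelines API with the login-token session cookie; all pipelines, experiments and runs below are created in the project-099 namespace.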
client = kfp.Client(host=kubeflow_uri, cookies=f'login-token={login_token}', namespace='project-099')
def pipeline_config():
"""
Function used to configure the pipeline
:return: Uploads the pipeline and triggers the run in Kubeflow
"""
try:
run_name = os.getenv('RUN_NAME', '')
pipeline_name = os.getenv('PIPELINE_NAME', '')
if conf_path:
with open(conf_path, 'r') as f:
# Load the JSON data from the file
data = json.load(f)
config_list = data.get("plant_details")
logging.info("Loading the Plant JSON Data")
else:
config_list = [{'pipeline_name': pipeline_name, 'run_name': run_name}]
for each_config in config_list:
pipeline_name = each_config.get('pipeline_name', 'default_pipeline')
run_name = each_config.get('run_name', 'default_run')
pipeline_version_name = f"{pipeline_name}-{pipeline_version}"
pipeline_id = client.get_pipeline_id(pipeline_name)
# Pipeline name
pipeline_id = upload_pipeline(
pipeline_id=pipeline_id, pipeline_name=pipeline_name, pipeline_version_name=pipeline_version_name)
experiment = client.create_experiment(experiment_name)
logging.info(f"Creating Experiment with the name {experiment_name}")
get_recurring_runs = client.list_recurring_runs(experiment_id=experiment.id, page_size=100)
runs_list = list()
if get_recurring_runs.jobs:
runs_list = get_recurring_runs.jobs
for each_run in runs_list:
if each_run.name == run_name:
client.disable_job(each_run.id)
# Open the YAML file and load its contents into a Python dictionary
if variables == "true" and os.path.exists('variables.yml'):
final_json = add_pipeline_param()
else:
final_json = dict()
plant_info = each_config.get('plant_info', {})
logging.info(f"Getting plant_info {plant_info}")
if plant_info:
final_json["plant_info"] = plant_info
logging.info("plant_info is existing")
trigger_run(
experiment_id=experiment.id, run_name=run_name, final_json=final_json, pipeline_id=pipeline_id)
except Exception as e:
logging.exception(f"Unable to Trigger the Kubeflow env {e}")
def resource_allocation():
"""
This function allocates CPU and memory resources for each container component in the pipeline
:return:
"""
try:
memory_limit = os.getenv("MEMORY_LIMIT", '0M')
cpu_limit = os.getenv("CPU_LIMIT", '0m')
memory_request = os.getenv("MEMORY_REQUEST", '0M')
cpu_request = os.getenv('CPU_REQUEST', '0m')
with open(deployment_yaml, 'r') as f:
data = yaml.safe_load(f)
components_list = data.get('spec', {}).get('templates', [])
for each_comp in components_list:
if 'container' in each_comp:
each_comp['container']['resources'] = {"limits": {"memory": memory_limit, "cpu": cpu_limit},
"requests": {"memory": memory_request, "cpu": cpu_request}}
data['spec']['templates'] = components_list
with open(deployment_yaml, 'w') as file:
yaml.dump(data, file)
except Exception as e:
logging.exception(f"Unable to allocate resources {e}")
def add_pipeline_param():
"""
This function reads variables.yml and passes its entries as pipeline parameters for pipeline execution
:return:
"""
try:
logging.info("variables.yml file is existing")
with open('variables.yml', 'r') as yaml_file:
yaml_data = yaml.load(yaml_file, Loader=yaml.FullLoader)
pipeline_param = dict()
for each in yaml_data.get("deployment", []).get("environmentVar", []):
if 'valueFrom' not in list(each.keys()):
pipeline_param[each.get('name')] = each.get('value')
final_json = dict()
final_json["pipeline_param"] = pipeline_param
logging.info("Adding pipeline parameters")
return final_json
except Exception as e:
logging.exception(f"Unable to add the pipeline param {e}")
def upload_pipeline(pipeline_id, pipeline_version_name, pipeline_name):
"""
Function used to upload the pipeline to Kubeflow
:param pipeline_id: Pipeline id
:param pipeline_version_name: Pipeline version name
:param pipeline_name: Pipeline Name
:return: Returns Uploaded pipeline ID
"""
try:
if pipeline_id:
pipeline_list = client.list_pipeline_versions(pipeline_id=pipeline_id, page_size=100)
if pipeline_list.versions:
for each_version in pipeline_list.versions:
if each_version.name == pipeline_version_name:
client.delete_pipeline_version(each_version.id)
client.upload_pipeline_version(
pipeline_package_path=deployment_yaml, pipeline_version_name=pipeline_version_name,
pipeline_name=pipeline_name)
logging.info(
f"Uploaded Pipeline version with pipeline name {pipeline_name} and pipeline version {pipeline_version_name}")
else:
pipeline = client.upload_pipeline(deployment_yaml, pipeline_name=pipeline_name)
pipeline_id = pipeline.id
logging.info(f"Uploaded Pipeline version with pipeline name {pipeline_name}")
return pipeline_id
except Exception as e:
logging.exception(f"Unable to upload the pipeline {e}")
def trigger_run(experiment_id, run_name, final_json, pipeline_id):
"""
Function is used to trigger the Run
:param experiment_id: Experiment Id
:param run_name: Run name
:param final_json: json where the pipeline params are present
:param pipeline_id: Pipeline Id
:return: Creates and triggers the Run
"""
try:
if is_recurring_run == "true":
logging.info("Recurring run")
logging.info("Starting to create an recurring run")
client.create_recurring_run(
experiment_id=experiment_id, job_name=run_name, pipeline_id=pipeline_id,
cron_expression=cron_expression, params=final_json)
logging.info(f"Successfully Triggered the Recurring Run with run name as {run_name}")
else:
client.run_pipeline(experiment_id, run_name, pipeline_id=pipeline_id, params=final_json)
logging.info(f"Successfully Triggered Run with run name as {run_name}")
except Exception as e:
logging.info(f"Unable to trigger the Run {e}")
if __name__ == "__main__":
is_allocation = os.getenv('IS_ALLOCATION', 'false')
if is_allocation == "true":
resource_allocation()
pipeline_config()
# Get tags function
## Overview
- **Component Name** : Get tags function
- **Component Description** :
- **Component Type** : Transform type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
pipeline_param |JsonObject |Required |None |inputValue | |
MONGO_DB |String |Required |None |env | |
MONGO_COLLECTION |String |Required |None |env | |
CITY |String |Required |None |env | |
Output |JsonObject |Required |None |outputPath | |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParam
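
For orientation, a minimal sketch of the `pipeline_param` object this component reads (key names are taken from the component source; the values shown are placeholders, not real endpoints):

```python
# Hypothetical pipeline_param payload for the "Get tags function" component.
# MONGO_DB, MONGO_COLLECTION and CITY are supplied separately as environment variables.
pipeline_param = {
    "MONGO_URI": "mongodb://<user>:<password>@<host>:<port>/",
    "PROJECT_ID": "project_099",
    "QUERY_FILTER": "bgrimm_string_level_tags",
}
```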
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os
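# Pick the single function defined directly in src.program (imported names are filtered out by the getmodule check); it becomes the KFP component entry point.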
function = \
[func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]
def read_data_from_yaml(path):
"""
It opens the file at the given path, reads the contents, and then parses the contents as YAML
:param path: The path to the YAML file
:return: A dictionary
"""
with open(path, "r") as stream:
return yaml.load(stream, Loader=yaml.FullLoader)
def get_component_yml():
"""
Builds component.yml for the component function: pins packages from requirements.txt as packages_to_install and, if variables.yml exists, appends its environment variables under env.
:return:
"""
try:
requirements = list()
with open('requirements.txt', 'r') as file:
for line in file:
if "=" in line and "#" not in line:
requirements.append(line.strip())
elif "#" in line:
...
else:
logger.exception(f"Mentioned package does not have version {line.strip()}")
date_function_yml = kfp.components.func_to_component_text(
function, packages_to_install=requirements)
variables_path = "variables.yml"
if os.path.exists(variables_path):
yaml_data: dict = read_data_from_yaml(variables_path)
if yaml_data:
envs: dict = yaml_data.get("deployment", {}).get("environmentVar", [])
date_function = date_function_yml + f" env:\n"
for env_var in envs:
date_function += f" {env_var['name']}: '{env_var['value']}'\n"
with open('component.yml', 'w') as file:
file.write(date_function)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
else:
with open('component.yml', 'w') as file:
file.write(date_function_yml)
except Exception as e:
logger.exception(f"Unable to get the component yml {e}")
def create_table(data, key):
"""
Builds the README parameter-table rows for the given section ('inputs' or 'outputs') of component.yml; for 'inputs', environment variables from variables.yml are appended as extra rows.
:return: A list of row-value lists.
"""
try:
rows_list = list()
for each_input in data.get(key, []):
rows_dict = dict()
rows_dict['name'] = each_input.get("name", '')
rows_dict['data_type'] = each_input.get('type', 'String')
if each_input.get('optional'):
req_opt = "Optional"
default_value = each_input.get('default', '')
else:
req_opt = "Required"
default_value = "None"
rows_dict['req_opt'] = req_opt
rows_dict['default_value'] = default_value
for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
if isinstance(each_arg, dict) and rows_dict['name'] in each_arg.values():
rows_dict['Type'] = list(each_arg.keys())[0]
rows_dict['Description'] = each_input.get('description', '')
rows_dict['Example'] = ''
rows_list.append(list(rows_dict.values()))
if key == "inputs" and os.path.exists("variables.yml"):
yaml_data: dict = read_data_from_yaml("variables.yml")
if yaml_data:
env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
for each in env_var:
env_dict = dict()
env_dict['name'] = each.get("name")
env_dict['data_type'] = "String"
env_dict['req_opt'] = "Required"
env_dict['default_value'] = "None"
env_dict['Type'] = "env"
env_dict['description'] = ""
env_dict['example'] = ""
rows_list.append(list(env_dict.values()))
return rows_list
except Exception as e:
logger.exception(f"Unable to create the table for README.MD file {e}")
def create_readme():
"""
Function creates the README.md file from the given component's details
:return: Creates the README.md file in the given path
"""
try:
note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
" OutputPath, PipelineParm"
column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
"Example"]
with open("component.yml", "r") as file:
data = yaml.safe_load(file)
if "inputs" in list(data.keys()) and "outputs" in list(data.keys()):
component_type = "Transform type"
elif "inputs" not in data:
component_type = "Input type"
else:
component_type = "Output type"
component_overview_json = dict()
component_overview_json['Component Name'] = data.get("name", " ")
component_overview_json['Component Description'] = data.get("description", " ")
component_overview_json['Component Type'] = component_type
rows_list_input = create_table(data, "inputs")
rows_list_output = create_table(data, "outputs")
rows_list = rows_list_input + rows_list_output
header = component_overview_json.get("Component Name")
table_header = " |".join(column_list) + "\n"
table_line = "--- |" * len(column_list) + "\n"
table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
table = table_header + table_line + table_body
readme = f"""
# {header}
## {"Overview"}
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
with open('README.md', 'w') as f:
f.write(readme)
except Exception as e:
logger.exception(f"Unable to create the README.MD file {e}")
if __name__ == "__main__":
get_component_yml()
create_readme()
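# component.yml (generated): the kfp.components.func_to_component_text output for get_tags_function, with the environmentVar entries from variables.yml appended under env by get_component_yml().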
name: Get tags function
inputs:
- {name: pipeline_param, type: JsonObject}
outputs:
- {name: Output, type: JsonObject}
implementation:
container:
image: python:3.7
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'pytz==2021.3' 'pymongo~=3.12.1' 'pandas==1.3.*' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'pytz==2021.3'
'pymongo~=3.12.1' 'pandas==1.3.*' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def get_tags_function(pipeline_param):
import pandas as pd
from loguru import logger
import warnings
import tracemalloc
import os
from pymongo import MongoClient
city = os.getenv("CITY")
db_ = os.getenv("MONGO_DB")
print(pipeline_param)
print("--",pipeline_param["MONGO_URI"])
# collections
collection_ = os.getenv("MONGO_COLLECTION")
mongo_uri_ = pipeline_param['MONGO_URI']
print("mongo_uri",mongo_uri_)
project_id_ = pipeline_param['PROJECT_ID']
query_filter_ = pipeline_param['QUERY_FILTER']
try:
class MongoConstants:
# DB
db = db_
# collections
collection = collection_
class Mongo:
mongo_uri = mongo_uri_
project_id = project_id_
query_filter = query_filter_
class MongoConnect:
def __init__(self, uri, database, collection):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
self.database = database
self.collection = collection
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def data_dict(data, city):
try:
req_dict = dict()
req_dict['project_id'] = Mongo.project_id
req_dict['id'] = Mongo.query_filter
req_dict['city'] = city
req_dict['input_data'] = data
return req_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def insert_one(self, data, city):
try:
db = self.client[self.database]
collection = db[self.collection]
req_dict = self.data_dict(data=data, city=city)
response = collection.insert_one(req_dict)
return response.inserted_id
except Exception as e:
logger.exception(f'Exception - {e}')
def find_one(self, query, filter_dict=None):
try:
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[self.database]
collection = db[self.collection]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.exception(f'Exception - {e}')
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection)
if mongo_conn is None:
logger.info(f'mongodb is not connected, please check')
else:
logger.info(f'mongodb is connected, mongo conn - {mongo_conn}')
df_raw_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "raw"}]})
['input_data'], orient='index')
df_predicted_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "predicted"}]})
['input_data'], orient='index')
df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
try:
# df_coefficients = pd.DataFrame.from_dict(
# mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
# {"city": city},
# {"tags_property":
# "mppt_coefficients"}]})
# ['input_data'], orient='index')
df_coefficients = pd.DataFrame()
except Exception as er:
logger.exception(f"Coefficient dataframe unavailable with message: {er}")
df_coefficients = pd.DataFrame()
del mongo_conn
df_coefficients.reset_index(inplace=True)
df_coefficients.rename(columns={'index': 'inv_id_mppt_id'}, inplace=True)
tracemalloc.clear_traces()
tracemalloc.get_traced_memory()
final_dict = {"raw": df_raw_tags.to_dict('records'), "predicted": df_predicted_tags.to_dict('records'),
"coefficients": df_coefficients.to_dict('records')}
print(final_dict)
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def _serialize_json(obj) -> str:
if isinstance(obj, str):
return obj
import json
def default_serializer(obj):
if hasattr(obj, 'to_struct'):
return obj.to_struct()
else:
raise TypeError(
"Object of type '%s' is not JSON serializable and does not have .to_struct() method."
% obj.__class__.__name__)
return json.dumps(obj, default=default_serializer, sort_keys=True)
import json
import argparse
_parser = argparse.ArgumentParser(prog='Get tags function', description='')
_parser.add_argument("--pipeline-param", dest="pipeline_param", type=json.loads, required=True, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = get_tags_function(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [
_serialize_json,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
args:
- --pipeline-param
- {inputValue: pipeline_param}
- '----output-paths'
- {outputPath: Output}
env:
MONGO_DB: 'ilens_ai'
MONGO_COLLECTION: 'bgrimmStringTags'
CITY: 'bgrimmchonburi'
loguru==0.5.3
pytz==2021.3
pymongo~=3.12.1
pandas==1.3.*
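# src/program.py (the src.program module imported by readme.py): the component function that gets wrapped into component.yml.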
def get_tags_function(pipeline_param: dict) -> dict:
import pandas as pd
from loguru import logger
import warnings
import tracemalloc
import os
from pymongo import MongoClient
city = os.getenv("CITY")
db_ = os.getenv("MONGO_DB")
print(pipeline_param)
print("--",pipeline_param["MONGO_URI"])
# collections
collection_ = os.getenv("MONGO_COLLECTION")
mongo_uri_ = pipeline_param['MONGO_URI']
print("mongo_uri",mongo_uri_)
project_id_ = pipeline_param['PROJECT_ID']
query_filter_ = pipeline_param['QUERY_FILTER']
try:
class MongoConstants:
# DB
db = db_
# collections
collection = collection_
class Mongo:
mongo_uri = mongo_uri_
project_id = project_id_
query_filter = query_filter_
class MongoConnect:
def __init__(self, uri, database, collection):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
self.database = database
self.collection = collection
except Exception as e:
logger.exception(f'Exception - {e}')
@staticmethod
def data_dict(data, city):
try:
req_dict = dict()
req_dict['project_id'] = Mongo.project_id
req_dict['id'] = Mongo.query_filter
req_dict['city'] = city
req_dict['input_data'] = data
return req_dict
except Exception as e:
logger.exception(f'Exception - {e}')
def insert_one(self, data, city):
try:
db = self.client[self.database]
collection = db[self.collection]
req_dict = self.data_dict(data=data, city=city)
response = collection.insert_one(req_dict)
return response.inserted_id
except Exception as e:
logger.exception(f'Exception - {e}')
def find_one(self, query, filter_dict=None):
try:
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[self.database]
collection = db[self.collection]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.exception(f'Exception - {e}')
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection)
if mongo_conn is None:
logger.info(f'mongodb is not connected, please check')
else:
logger.info(f'mongodb is connected, mongo conn - {mongo_conn}')
df_raw_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "raw"}]})
['input_data'], orient='index')
df_predicted_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "predicted"}]})
['input_data'], orient='index')
df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
try:
# df_coefficients = pd.DataFrame.from_dict(
# mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
# {"city": city},
# {"tags_property":
# "mppt_coefficients"}]})
# ['input_data'], orient='index')
df_coefficients = pd.DataFrame()
except Exception as er:
logger.exception(f"Coefficient dataframe unavailable with message: {er}")
df_coefficients = pd.DataFrame()
del mongo_conn
df_coefficients.reset_index(inplace=True)
df_coefficients.rename(columns={'index': 'inv_id_mppt_id'}, inplace=True)
tracemalloc.clear_traces()
tracemalloc.get_traced_memory()
final_dict = {"raw": df_raw_tags.to_dict('records'), "predicted": df_predicted_tags.to_dict('records'),
"coefficients": df_coefficients.to_dict('records')}
print(final_dict)
return final_dict
except Exception as e:
logger.exception(f'Exception - {e}')
deployment:
environmentVar:
- name: MONGO_DB
value: "ilens_ai"
- name: MONGO_COLLECTION
value: "bgrimmStringTags"
- name: CITY
value: "bgrimmchonburi"
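# pipeline-conf.env: loaded by the trigger script via load_dotenv(); RECURRING_RUN, VARIABLES and IS_ALLOCATION are string flags compared against "true".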
DEPLOYMENT_YAML = "pipeline.yml"
KUBEFLOW_URI = "https://kubeflow-qa.unifytwin.com/pipeline/"
LOGIN_TOKEN = "f45548dae2ed4c3882e9c13a62426560"
PIPELINE_NAME = " "
PIPELINE_VERSION = "v1.5"
EXPERIMENT_NAME = "Bgrimm Experiment"
RUN_NAME = " "
CRON_EXPRESSION = " "
RECURRING_RUN = false
CONF_LIST = conf.json
VARIABLES = true
IS_ALLOCATION = true
MEMORY_LIMIT = 1Gi
CPU_LIMIT = 0.5
MEMORY_REQUEST = 30Mi
CPU_REQUEST = 0.1
NAMESPACE = "project-099"
import kfp
from kfp import dsl
from kubernetes import client as k8s_client
from kubernetes.client import (V1EnvVar)
from loguru import logger
@dsl.pipeline(name="Dalmia", description="All Components")
def forecast_pipeline(pipeline_param: dict, plant_info: dict):
"""
:param pipeline_param:
:param plant_info:
:return:
"""
try:
# Loading the component from the above yaml file
get_tags_function_component = kfp.components.load_component_from_file(
"input_components/get_tags_component/component.yml")
# Calling the component
get_tags_function_task = get_tags_function_component(pipeline_param).set_memory_request('600M').set_memory_limit('1200M').\
set_cpu_request('700m').set_cpu_limit('1400m')
# get_tags_function_task.add_volume(k8s_client.V1Volume(
# name="get-data-volume",
# secret=k8s_client.V1SecretVolumeSource(secret_name="MONGO_URI"))
# )
#
# get_tags_function_task.add_env_variable(
# V1EnvVar(
# name="MONGO_URI",
# value_from=k8s_client.V1EnvVarSource(secret_key_ref=k8s_client.V1SecretKeySelector(
# name="mongo-uri",
# key="MONGO_URI"
# )
# )
# )
# )
# Disabling caching for all the components
get_tags_function_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# read_from_kairos_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# data_preprocess_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_forecast_data_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_1.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_data_recaster_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_2.execution_options.caching_strategy.max_cache_staleness = "P0D"
# get_rolling_avg_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
# write_data_processor_task_3.execution_options.caching_strategy.max_cache_staleness = "P0D"
except Exception as e:
logger.exception(f"Unable to Perform the execution {e}")
if __name__ == "__main__":
kfp.compiler.Compiler().compile(forecast_pipeline, "pipeline.yml")
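# Pipeline-level environment variables (presumably variables.yml): entries without a valueFrom key are forwarded as pipeline parameters by add_pipeline_param() in the trigger script.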
deployment:
environmentVar:
- name: MONGO_URI
value: "mongodb://admin:iLens#QAv513@192.168.0.217:30904/"
- name: APP_NAME
value: "audit-management"
- name: POSTGRES_URI
value: 'postgresql://ilens:iLens#4321@postgres-db-service.ilens-infra:5432/'
- name: KAFKA_HOST
value: "kafka-0.kafka-headless.ilens-infra.svc.cluster.local"
- name: KAFKA_PORT
value: "9092"
- name: KAFKA_TOPIC
value: ilens_dev
- name: REDIS_URI
value: 'redis://redis-db-service.ilens-infra:6379'
- name: KAIROS_URI
value: 'https://iLens:iLensCLD$456@cloud.ilens.io/kairos/'
- name: PROJECT_ID
value: "project_099"
- name: QUERY_FILTER
value: "bgrimm_string_level_tags"