Commit 7cbcbe88 authored by dasharatha.vamshi

init

parent f073416e
import kfp
from kfp import dsl
from loguru import logger


@dsl.pipeline(name="Test", description="Test Component")
def test_pipeline():
    """
    Pipeline that runs the "Execute sql component".
    :return: None
    """
    try:
        # Loading the component from the component yaml file
        execute_sql_file_component = kfp.components.load_component_from_file("postgres/execute_sql/component.yml")
        # Calling the component
        create_execute_sql_file_component_task = execute_sql_file_component()
        # Disabling caching for all the components
        create_execute_sql_file_component_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    except Exception as e:
        logger.exception(f"Unable to perform the execution {e}")


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(test_pipeline, "postgres-pipeline.yml")
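
# postgres-pipeline.yml: Argo Workflow specification produced by compiling test_pipeline above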
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: test-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.19, pipelines.kubeflow.org/pipeline_compilation_time: '2023-04-04T16:34:48.557149',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Test Component", "name":
"Test"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.19}
spec:
entrypoint: test
templates:
- name: execute-sql-component
container:
args: []
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'azure-storage-blob==12.14.1' 'psycopg2==2.9.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'azure-storage-blob==12.14.1'
'psycopg2==2.9.1' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def execute_sql_component():
import io
import warnings
import os
import psycopg2
from azure.storage.blob import BlobServiceClient
from loguru import logger
warnings.filterwarnings('ignore')
"""
Function to execute sql files
"""
uri = os.getenv("uri")
db_name = os.getenv("db_name")
azure_file_path = os.getenv("azure_file_path")
az_connection_string = os.getenv("az_connection_string")
az_container_name = os.getenv("az_container_name")
def execute_sql_file(conn, sql_file_data):
"""
Execute a SQL script file using the provided PostgreSQL connection.
Parameters:
conn (psycopg2.extensions.connection): The PostgreSQL connection object.
sql_file_data (str): The SQL script file data.
Returns:
None
"""
with conn.cursor() as cur:
cur.execute(sql_file_data)
conn.commit()
def get_azure_blob_data(connection_string, container_name, blob_path):
logger.info(f"Getting azure blob data from {blob_path}")
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(blob_path)
blob_data = blob_client.download_blob().readall()
text_wrapper = io.TextIOWrapper(io.BytesIO(blob_data), encoding='utf-8')
sql_query = text_wrapper.read()
return sql_query
postgres_uri = f"{uri}/{db_name}"
logger.info("Started Executing the files...")
logger.info(f"Files to execute are {azure_file_path}")
try:
connection = psycopg2.connect(postgres_uri)
logger.info("Connection Successful")
files_list = azure_file_path.split(',')
for file in files_list:
file_data = get_azure_blob_data(az_connection_string, az_container_name, file)
logger.info(f"Executing script of file {file}")
execute_sql_file(connection, file_data)
# Close the connection
connection.close()
except Exception as e:
logger.error(f"Failed because of error {e}")
import argparse
_parser = argparse.ArgumentParser(prog='Execute sql component', description='')
_parsed_args = vars(_parser.parse_args())
_outputs = execute_sql_component(**_parsed_args)
env:
- {name: db_name, value: project_181__batch_details}
- {name: uri, value: 'postgresql://ilens:iLens#4321@192.168.0.217:30909'}
- {name: azure_file_path, value: 'kubeflow_test/delete_view.sql,kubeflow_test/create_view.sql'}
- {name: az_connection_string, value: DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net}
- {name: az_container_name, value: kubeflow-test}
image: python:3.7
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.19
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [], "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3
-m pip install --quiet --no-warn-script-location ''loguru==0.5.3'' ''azure-storage-blob==12.14.1''
''psycopg2==2.9.1'' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
--quiet --no-warn-script-location ''loguru==0.5.3'' ''azure-storage-blob==12.14.1''
''psycopg2==2.9.1'' --user) && \"$0\" \"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def execute_sql_component():\n import io\n import warnings\n import
os\n import psycopg2\n from azure.storage.blob import BlobServiceClient\n from
loguru import logger\n\n warnings.filterwarnings(''ignore'')\n \"\"\"\n Function
to execute sql files\n \"\"\"\n uri = os.getenv(\"uri\")\n db_name
= os.getenv(\"db_name\")\n azure_file_path = os.getenv(\"azure_file_path\")\n az_connection_string
= os.getenv(\"az_connection_string\")\n az_container_name = os.getenv(\"az_container_name\")\n\n def
execute_sql_file(conn, sql_file_data):\n \"\"\"\n Execute
a SQL script file using the provided PostgreSQL connection.\n\n Parameters:\n conn
(psycopg2.extensions.connection): The PostgreSQL connection object.\n sql_file_data
(str): The SQL script file data.\n\n Returns:\n None\n \"\"\"\n with
conn.cursor() as cur:\n cur.execute(sql_file_data)\n conn.commit()\n\n def
get_azure_blob_data(connection_string, container_name, blob_path):\n logger.info(f\"Getting
azure blob data from {blob_path}\")\n blob_service_client = BlobServiceClient.from_connection_string(connection_string)\n container_client
= blob_service_client.get_container_client(container_name)\n blob_client
= container_client.get_blob_client(blob_path)\n blob_data = blob_client.download_blob().readall()\n text_wrapper
= io.TextIOWrapper(io.BytesIO(blob_data), encoding=''utf-8'')\n sql_query
= text_wrapper.read()\n return sql_query\n\n postgres_uri = f\"{uri}/{db_name}\"\n logger.info(\"Started
Executing the files...\")\n logger.info(f\"Files to execute are {azure_file_path}\")\n try:\n connection
= psycopg2.connect(postgres_uri)\n logger.info(\"Connection Successful\")\n files_list
= azure_file_path.split('','')\n for file in files_list:\n file_data
= get_azure_blob_data(az_connection_string, az_container_name, file)\n logger.info(f\"Executing
script of file {file}\")\n execute_sql_file(connection, file_data)\n #
Close the connection\n connection.close()\n except Exception as
e:\n logger.error(f\"Failed because of error {e}\")\n\nimport argparse\n_parser
= argparse.ArgumentParser(prog=''Execute sql component'', description='''')\n_parsed_args
= vars(_parser.parse_args())\n\n_outputs = execute_sql_component(**_parsed_args)\n"],
"env": {"az_connection_string": "DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net",
"az_container_name": "kubeflow-test", "azure_file_path": "kubeflow_test/delete_view.sql,kubeflow_test/create_view.sql",
"db_name": "project_181__batch_details", "uri": "postgresql://ilens:iLens#4321@192.168.0.217:30909"},
"image": "python:3.7"}}, "name": "Execute sql component"}', pipelines.kubeflow.org/component_ref: '{"digest":
"db935ea8300fd1188fc7568b405b18fa001db9d1f074bf03db7906467c1ca575", "url":
"postgres/execute_sql/component.yml"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
- name: test
dag:
tasks:
- {name: execute-sql-component, template: execute-sql-component}
arguments:
parameters: []
serviceAccountName: pipeline-runner
# Execute sql component
## Overview
- **Component Name** : Execute sql component
- **Component Description** : Executes SQL script files stored in Azure Blob Storage against a PostgreSQL database.
- **Component Type** : Input type
## Component Param
Variable Name |Datatype |Required/Optional |Default Value |Type |Description |Example
--- |--- |--- |--- |--- |--- |--- |
db_name |String |Required |None |env |Name of the PostgreSQL database to run the SQL scripts against |
uri |String |Required |None |env |PostgreSQL connection URI (the database name is appended at runtime) |
azure_file_path |String |Required |None |env |Comma-separated blob paths of the SQL files to execute |
az_connection_string |String |Required |None |env |Azure Storage account connection string |
az_container_name |String |Required |None |env |Azure Blob Storage container that holds the SQL files |
> Note 1 : Available Component types are: Input, Transform, Output.
> Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues, OutputPath, PipelineParam
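
## Example Usage
A minimal sketch of using this component in a pipeline (it assumes the component file is available at `postgres/execute_sql/component.yml`, as in this repository's test pipeline). The database and Azure Storage settings listed above are baked into the component's `env` section from `variables.yml` at build time.

```python
import kfp
from kfp import dsl


@dsl.pipeline(name="Test", description="Test Component")
def test_pipeline():
    # Load the component definition and create a task from it
    execute_sql = kfp.components.load_component_from_file("postgres/execute_sql/component.yml")
    task = execute_sql()
    # Disable caching so the SQL scripts run on every pipeline execution
    task.execution_options.caching_strategy.max_cache_staleness = "P0D"


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(test_pipeline, "postgres-pipeline.yml")
```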
import kfp
from loguru import logger
from src import program
import yaml
import inspect
import os

# Select the single component function defined in src.program via introspection
function = \
    [func[1] for func in inspect.getmembers(program, inspect.isfunction) if inspect.getmodule(func[1]) == program][0]

def read_data_from_yaml(path):
    """
    Open the file at the given path and parse its contents as YAML.
    :param path: The path to the YAML file
    :return: A dictionary with the parsed YAML content
    """
    with open(path, "r") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)

def get_component_yml():
    """
    Build component.yml for the component function, pinning the packages listed in
    requirements.txt and appending any environment variables found in variables.yml.
    :return: None
    """
    try:
        requirements = list()
        with open('requirements.txt', 'r') as file:
            for line in file:
                if "=" in line and "#" not in line:
                    requirements.append(line.strip())
                elif "#" in line:
                    ...
                else:
                    logger.error(f"Mentioned package does not have a pinned version {line.strip()}")
        date_function_yml = kfp.components.func_to_component_text(
            function, packages_to_install=requirements)
        variables_path = "variables.yml"
        if os.path.exists(variables_path):
            yaml_data: dict = read_data_from_yaml(variables_path)
            if yaml_data:
                envs: list = yaml_data.get("deployment", {}).get("environmentVar", [])
                date_function = date_function_yml + " env:\n"
                for env_var in envs:
                    date_function += f" {env_var['name']}: '{env_var['value']}'\n"
                with open('component.yml', 'w') as file:
                    file.write(date_function)
            else:
                with open('component.yml', 'w') as file:
                    file.write(date_function_yml)
        else:
            with open('component.yml', 'w') as file:
                file.write(date_function_yml)
    except Exception as e:
        logger.exception(f"Unable to get the component yml {e}")

def create_table(data, key):
    """
    Build the README table rows for the given section ("inputs" or "outputs")
    of the component spec.
    :return: A list of rows, each row being a list of cell values
    """
    try:
        rows_list = list()
        for each_input in data.get(key, []):
            rows_dict = dict()
            rows_dict['name'] = each_input.get("name", '')
            rows_dict['data_type'] = each_input.get('type', 'String')
            if each_input.get('optional'):
                req_opt = "Optional"
                default_value = each_input.get('default', '')
            else:
                req_opt = "Required"
                default_value = "None"
            rows_dict['req_opt'] = req_opt
            rows_dict['default_value'] = default_value
            for each_arg in data.get('implementation', {}).get('container', {}).get('args', []):
                if isinstance(each_arg, dict) and rows_dict['name'] in each_arg.values():
                    rows_dict['Type'] = list(each_arg.keys())[0]
            rows_dict['Description'] = each_input.get('description', '')
            rows_dict['Example'] = ''
            rows_list.append(list(rows_dict.values()))
        if key == "inputs" and os.path.exists("variables.yml"):
            yaml_data: dict = read_data_from_yaml("variables.yml")
            if yaml_data:
                env_var = yaml_data.get("deployment", {}).get("environmentVar", [])
                for each in env_var:
                    env_dict = dict()
                    env_dict['name'] = each.get("name")
                    env_dict['data_type'] = "String"
                    env_dict['req_opt'] = "Required"
                    env_dict['default_value'] = "None"
                    env_dict['Type'] = "env"
                    env_dict['description'] = ""
                    env_dict['example'] = ""
                    rows_list.append(list(env_dict.values()))
        return rows_list
    except Exception as e:
        logger.exception(f"Unable to create the table for the README.md file {e}")

def create_readme():
    """
    Create the README.md file from the component details in component.yml.
    :return: Writes README.md to the current directory
    """
    try:
        note_1 = "Note 1 : Available Component types are: Input, Transform, Output."
        note_2 = "Note 2 : Available Environment types are: env, InputValues, InputPath, OutputValues," \
                 " OutputPath, PipelineParam"
        column_list = ["Variable Name", "Datatype", "Required/Optional", "Default Value", "Type", "Description",
                       "Example"]
        with open("component.yml", "r") as file:
            data = yaml.safe_load(file)
        if "inputs" in data and "outputs" in data:
            component_type = "Transform type"
        elif "inputs" not in data:
            component_type = "Input type"
        else:
            component_type = "Output type"
        component_overview_json = dict()
        component_overview_json['Component Name'] = data.get("name", " ")
        component_overview_json['Component Description'] = data.get("description", " ")
        component_overview_json['Component Type'] = component_type
        rows_list_input = create_table(data, "inputs")
        rows_list_output = create_table(data, "outputs")
        rows_list = rows_list_input + rows_list_output
        header = component_overview_json.get("Component Name")
        table_header = " |".join(column_list) + "\n"
        table_line = "--- |" * len(column_list) + "\n"
        table_body = "\n".join(map(lambda x: " |".join(x), rows_list))
        table = table_header + table_line + table_body
        readme = f"""
# {header}
## Overview
- **Component Name** : {component_overview_json.get("Component Name")}
- **Component Description** : {component_overview_json.get("Component Description")}
- **Component Type** : {component_overview_json.get("Component Type")}
## Component Param
{table}
> {note_1}
> {note_2}
"""
        with open('README.md', 'w') as f:
            f.write(readme)
    except Exception as e:
        logger.exception(f"Unable to create the README.md file {e}")

if __name__ == "__main__":
    get_component_yml()
    create_readme()
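
# component.yml: component specification generated by get_component_yml() above and loaded by the test pipeline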
name: Execute sql component
implementation:
container:
image: python:3.7
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'loguru==0.5.3' 'azure-storage-blob==12.14.1' 'psycopg2==2.9.1' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'loguru==0.5.3' 'azure-storage-blob==12.14.1'
'psycopg2==2.9.1' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def execute_sql_component():
import io
import warnings
import os
import psycopg2
from azure.storage.blob import BlobServiceClient
from loguru import logger
warnings.filterwarnings('ignore')
"""
Function to execute sql files
"""
uri = os.getenv("uri")
db_name = os.getenv("db_name")
azure_file_path = os.getenv("azure_file_path")
az_connection_string = os.getenv("az_connection_string")
az_container_name = os.getenv("az_container_name")
def execute_sql_file(conn, sql_file_data):
"""
Execute a SQL script file using the provided PostgreSQL connection.
Parameters:
conn (psycopg2.extensions.connection): The PostgreSQL connection object.
sql_file_data (str): The SQL script file data.
Returns:
None
"""
with conn.cursor() as cur:
cur.execute(sql_file_data)
conn.commit()
def get_azure_blob_data(connection_string, container_name, blob_path):
logger.info(f"Getting azure blob data from {blob_path}")
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(blob_path)
blob_data = blob_client.download_blob().readall()
text_wrapper = io.TextIOWrapper(io.BytesIO(blob_data), encoding='utf-8')
sql_query = text_wrapper.read()
return sql_query
postgres_uri = f"{uri}/{db_name}"
logger.info("Started Executing the files...")
logger.info(f"Files to execute are {azure_file_path}")
try:
connection = psycopg2.connect(postgres_uri)
logger.info("Connection Successful")
files_list = azure_file_path.split(',')
for file in files_list:
file_data = get_azure_blob_data(az_connection_string, az_container_name, file)
logger.info(f"Executing script of file {file}")
execute_sql_file(connection, file_data)
# Close the connection
connection.close()
except Exception as e:
logger.error(f"Failed because of error {e}")
import argparse
_parser = argparse.ArgumentParser(prog='Execute sql component', description='')
_parsed_args = vars(_parser.parse_args())
_outputs = execute_sql_component(**_parsed_args)
args: []
env:
db_name: 'project_181__batch_details'
uri: 'postgresql://ilens:iLens#4321@192.168.0.217:30909'
azure_file_path: 'kubeflow_test/delete_view.sql,kubeflow_test/create_view.sql'
az_connection_string: 'DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net'
az_container_name: 'kubeflow-test'
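
# requirements.txt: pinned packages installed into the component container; the build script passes these to packages_to_install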
loguru==0.5.3
azure-storage-blob==12.14.1
psycopg2==2.9.1
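
# src.program: defines the execute_sql_component function that the build script converts into the component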
def execute_sql_component():
    """
    Function to execute sql files stored in Azure Blob Storage against a PostgreSQL database.
    """
    import io
    import warnings
    import os
    import psycopg2
    from azure.storage.blob import BlobServiceClient
    from loguru import logger
    warnings.filterwarnings('ignore')
    # Connection and file details are supplied through environment variables
    uri = os.getenv("uri")
    db_name = os.getenv("db_name")
    azure_file_path = os.getenv("azure_file_path")
    az_connection_string = os.getenv("az_connection_string")
    az_container_name = os.getenv("az_container_name")

    def execute_sql_file(conn, sql_file_data):
        """
        Execute a SQL script file using the provided PostgreSQL connection.
        Parameters:
            conn (psycopg2.extensions.connection): The PostgreSQL connection object.
            sql_file_data (str): The SQL script file data.
        Returns:
            None
        """
        with conn.cursor() as cur:
            cur.execute(sql_file_data)
            conn.commit()

    def get_azure_blob_data(connection_string, container_name, blob_path):
        """Download the blob at the given path and return its contents as text."""
        logger.info(f"Getting azure blob data from {blob_path}")
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(blob_path)
        blob_data = blob_client.download_blob().readall()
        text_wrapper = io.TextIOWrapper(io.BytesIO(blob_data), encoding='utf-8')
        sql_query = text_wrapper.read()
        return sql_query

    postgres_uri = f"{uri}/{db_name}"
    logger.info("Started Executing the files...")
    logger.info(f"Files to execute are {azure_file_path}")
    try:
        connection = psycopg2.connect(postgres_uri)
        logger.info("Connection Successful")
        files_list = azure_file_path.split(',')
        for file in files_list:
            file_data = get_azure_blob_data(az_connection_string, az_container_name, file)
            logger.info(f"Executing script of file {file}")
            execute_sql_file(connection, file_data)
        # Close the connection
        connection.close()
    except Exception as e:
        logger.error(f"Failed because of error {e}")
deployment:
  environmentVar:
    - name: db_name
      value: 'project_181__batch_details'
    - name: uri
      value: "postgresql://ilens:iLens#4321@192.168.0.217:30909"
    - name: azure_file_path
      value: 'kubeflow_test/delete_view.sql,kubeflow_test/create_view.sql'
    - name: az_connection_string
      value: 'DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net'
    - name: az_container_name
      value: 'kubeflow-test'