Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bgrimm-string-inference
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dasharatha.vamshi
bgrimm-string-inference
Commits
95c1d2b6
Commit
95c1d2b6
authored
Sep 18, 2023
by
dasharatha.vamshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added get tags
parent
b34dc33e
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
185 additions
and
151 deletions
+185
-151
input_components/get_tags_component/component.yml
input_components/get_tags_component/component.yml
+40
-28
input_components/get_tags_component/src/__pycache__/program.cpython-39.pyc
...get_tags_component/src/__pycache__/program.cpython-39.pyc
+0
-0
input_components/get_tags_component/src/program.py
input_components/get_tags_component/src/program.py
+40
-29
pipeline.yml
pipeline.yml
+105
-94
No files found.
input_components/get_tags_component/component.yml
View file @
95c1d2b6
...
...
@@ -30,14 +30,28 @@ implementation:
city = os.getenv("CITY")
db_ = os.getenv("MONGO_DB")
print(pipeline_param)
print("--",pipeline_param["MONGO_URI"])
print("--",
pipeline_param["MONGO_URI"])
# collections
collection_ = os.getenv("MONGO_COLLECTION")
mongo_uri_ = pipeline_param['MONGO_URI']
print("mongo_uri",mongo_uri_)
print("mongo_uri",
mongo_uri_)
project_id_ = pipeline_param['PROJECT_ID']
query_filter_ = pipeline_param['QUERY_FILTER']
try:
class CommonConstants:
bgrimm_string_level_tags = 'bgrimm_string_level_tags'
panel_id = 'panel_id'
sub_id = 'sub_id'
inv_id_mppt_id = 'inv_id_mppt_id'
tags_property_raw = 'raw'
tags_property_predicted = 'predicted'
tags_property_efficiency = 'efficiency'
bgrim_tags_property_efficiency = 'Efficiency'
tags_property_efficiency_inv = 'efficiency'
tags_property_efficiency_plant = 'efficiency_plant'
mppt_coefficients = 'mppt_coefficients'
class MongoConstants:
# DB
db = db_
...
...
@@ -92,49 +106,47 @@ implementation:
except Exception as e:
logger.exception(f'Exception - {e}')
tracemalloc.clear_traces()
mongo_conn = MongoConnect(uri=Mongo.mongo_uri, database=MongoConstants.db,
collection=MongoConstants.collection)
if mongo_conn is None:
logger.info(f'mongodb is not connected, please check')
else:
logger.info(f'mongodb is connected, mongo conn - {mongo_conn}')
tracemalloc.clear_traces()
logger.info(f'mongo conn - {mongo_conn}')
df_raw_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [
{"id": CommonConstants.bgrimm_string_level_tags}, {"city": city},
{"tags_property": CommonConstants.tags_property_raw}]})['input_data'], orient='index')
df_raw_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "raw"}]})
['input_data'], orient='index')
df_predicted_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [
{"id": CommonConstants.bgrimm_string_level_tags}, {"city": city},
{"tags_property": CommonConstants.tags_property_predicted}]})['input_data'], orient='index')
df_predicted_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
{"city": city},
{"tags_property": "predicted"}]})
['input_data'], orient='index')
df_efficiency_tags = pd.DataFrame.from_dict(mongo_conn.find_one({"$and": [
{"id": CommonConstants.bgrimm_string_level_tags}, {"city": city},
{"tags_property": CommonConstants.tags_property_efficiency}]})['input_data'], orient='index')
df_raw_tags.reset_index(inplace=True)
df_raw_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_predicted_tags.reset_index(inplace=True)
df_predicted_tags.rename(columns={'index': 'tag_name'}, inplace=True)
df_efficiency_tags.reset_index(inplace=True)
df_efficiency_tags.rename(columns={'index': 'tag_name'}, inplace=True)
try:
# df_coefficients = pd.DataFrame.from_dict(
# mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
# {"city": city},
# {"tags_property":
# "mppt_coefficients"}]})
# ['input_data'], orient='index')
df_coefficients = pd.DataFrame()
except Exception as er:
logger.exception(f"Coefficient dataframe unavailable with message: {er}")
df_coefficients = pd.DataFrame()
del mongo_conn
# df_coefficients = pd.DataFrame.from_dict(mongo_conn.find_one(
# {"$and": [{"id": CommonConstants.bgrimm_string_level_tags}, {"city": city},
# {"tags_property": CommonConstants.mppt_coefficients}]})['input_data'], orient='index')
df_coefficients.reset_index(inplace=True)
df_coefficients.rename(columns={'index': 'inv_id_mppt_id'
}, inplace=True)
#
df_coefficients.reset_index(inplace=True)
# df_coefficients.rename(columns={'index': CommonConstants.inv_id_mppt_id
}, inplace=True)
df_coefficients = pd.DataFrame()
tracemalloc.clear_traces()
tracemalloc.get_traced_memory()
del mongo_conn
final_dict = {"raw": df_raw_tags.to_dict('records'), "predicted": df_predicted_tags.to_dict('records'),
"coefficients": df_coefficients.to_dict('records')}
"coefficients": df_coefficients.to_dict('records'),
"efficiency": df_efficiency_tags.to_dict('records')}
print(final_dict)
return final_dict
except Exception as e:
...
...
input_components/get_tags_component/src/__pycache__/program.cpython-39.pyc
View file @
95c1d2b6
No preview for this file type
input_components/get_tags_component/src/program.py
View file @
95c1d2b6
def
get_tags_function
(
pipeline_param
:
dict
)
->
dict
:
import
pandas
as
pd
from
loguru
import
logger
...
...
@@ -9,14 +8,28 @@ def get_tags_function(pipeline_param: dict) -> dict:
city
=
os
.
getenv
(
"CITY"
)
db_
=
os
.
getenv
(
"MONGO_DB"
)
print
(
pipeline_param
)
print
(
"--"
,
pipeline_param
[
"MONGO_URI"
])
print
(
"--"
,
pipeline_param
[
"MONGO_URI"
])
# collections
collection_
=
os
.
getenv
(
"MONGO_COLLECTION"
)
mongo_uri_
=
pipeline_param
[
'MONGO_URI'
]
print
(
"mongo_uri"
,
mongo_uri_
)
print
(
"mongo_uri"
,
mongo_uri_
)
project_id_
=
pipeline_param
[
'PROJECT_ID'
]
query_filter_
=
pipeline_param
[
'QUERY_FILTER'
]
try
:
class
CommonConstants
:
bgrimm_string_level_tags
=
'bgrimm_string_level_tags'
panel_id
=
'panel_id'
sub_id
=
'sub_id'
inv_id_mppt_id
=
'inv_id_mppt_id'
tags_property_raw
=
'raw'
tags_property_predicted
=
'predicted'
tags_property_efficiency
=
'efficiency'
bgrim_tags_property_efficiency
=
'Efficiency'
tags_property_efficiency_inv
=
'efficiency'
tags_property_efficiency_plant
=
'efficiency_plant'
mppt_coefficients
=
'mppt_coefficients'
class
MongoConstants
:
# DB
db
=
db_
...
...
@@ -71,49 +84,47 @@ def get_tags_function(pipeline_param: dict) -> dict:
except
Exception
as
e
:
logger
.
exception
(
f
'Exception - {e}'
)
tracemalloc
.
clear_traces
()
mongo_conn
=
MongoConnect
(
uri
=
Mongo
.
mongo_uri
,
database
=
MongoConstants
.
db
,
collection
=
MongoConstants
.
collection
)
if
mongo_conn
is
None
:
logger
.
info
(
f
'mongodb is not connected, please check'
)
else
:
logger
.
info
(
f
'mongodb is connected, mongo conn - {mongo_conn}'
)
tracemalloc
.
clear_traces
()
logger
.
info
(
f
'mongo conn - {mongo_conn}'
)
df_raw_tags
=
pd
.
DataFrame
.
from_dict
(
mongo_conn
.
find_one
({
"$and"
:
[
{
"id"
:
CommonConstants
.
bgrimm_string_level_tags
},
{
"city"
:
city
},
{
"tags_property"
:
CommonConstants
.
tags_property_raw
}]})[
'input_data'
],
orient
=
'index'
)
df_raw_tags
=
pd
.
DataFrame
.
from_dict
(
mongo_conn
.
find_one
({
"$and"
:
[{
"id"
:
"bgrimm_string_level_tags"
},
{
"city"
:
city
},
{
"tags_property"
:
"raw"
}]})
[
'input_data'
],
orient
=
'index'
)
df_predicted_tags
=
pd
.
DataFrame
.
from_dict
(
mongo_conn
.
find_one
({
"$and"
:
[
{
"id"
:
CommonConstants
.
bgrimm_string_level_tags
},
{
"city"
:
city
},
{
"tags_property"
:
CommonConstants
.
tags_property_predicted
}]})[
'input_data'
],
orient
=
'index'
)
df_predicted_tags
=
pd
.
DataFrame
.
from_dict
(
mongo_conn
.
find_one
({
"$and"
:
[{
"id"
:
"bgrimm_string_level_tags"
},
{
"city"
:
city
},
{
"tags_property"
:
"predicted"
}]})
[
'input_data'
],
orient
=
'index'
)
df_efficiency_tags
=
pd
.
DataFrame
.
from_dict
(
mongo_conn
.
find_one
({
"$and"
:
[
{
"id"
:
CommonConstants
.
bgrimm_string_level_tags
},
{
"city"
:
city
},
{
"tags_property"
:
CommonConstants
.
tags_property_efficiency
}]})[
'input_data'
],
orient
=
'index'
)
df_raw_tags
.
reset_index
(
inplace
=
True
)
df_raw_tags
.
rename
(
columns
=
{
'index'
:
'tag_name'
},
inplace
=
True
)
df_predicted_tags
.
reset_index
(
inplace
=
True
)
df_predicted_tags
.
rename
(
columns
=
{
'index'
:
'tag_name'
},
inplace
=
True
)
df_efficiency_tags
.
reset_index
(
inplace
=
True
)
df_efficiency_tags
.
rename
(
columns
=
{
'index'
:
'tag_name'
},
inplace
=
True
)
try
:
# df_coefficients = pd.DataFrame.from_dict(
# mongo_conn.find_one({"$and": [{"id": "bgrimm_string_level_tags"},
# {"city": city},
# {"tags_property":
# "mppt_coefficients"}]})
# ['input_data'], orient='index')
df_coefficients
=
pd
.
DataFrame
()
except
Exception
as
er
:
logger
.
exception
(
f
"Coefficient dataframe unavailable with message: {er}"
)
df_coefficients
=
pd
.
DataFrame
()
del
mongo_conn
# df_coefficients = pd.DataFrame.from_dict(mongo_conn.find_one(
# {"$and": [{"id": CommonConstants.bgrimm_string_level_tags}, {"city": city},
# {"tags_property": CommonConstants.mppt_coefficients}]})['input_data'], orient='index')
df_coefficients
.
reset_index
(
inplace
=
True
)
df_coefficients
.
rename
(
columns
=
{
'index'
:
'inv_id_mppt_id'
},
inplace
=
True
)
#
df_coefficients.reset_index(inplace=True)
# df_coefficients.rename(columns={'index': CommonConstants.inv_id_mppt_id
}, inplace=True)
df_coefficients
=
pd
.
DataFrame
()
tracemalloc
.
clear_traces
()
tracemalloc
.
get_traced_memory
()
del
mongo_conn
final_dict
=
{
"raw"
:
df_raw_tags
.
to_dict
(
'records'
),
"predicted"
:
df_predicted_tags
.
to_dict
(
'records'
),
"coefficients"
:
df_coefficients
.
to_dict
(
'records'
)}
"coefficients"
:
df_coefficients
.
to_dict
(
'records'
),
"efficiency"
:
df_efficiency_tags
.
to_dict
(
'records'
)}
print
(
final_dict
)
return
final_dict
except
Exception
as
e
:
...
...
pipeline.yml
View file @
95c1d2b6
...
...
@@ -3,7 +3,7 @@ kind: Workflow
metadata
:
annotations
:
pipelines.kubeflow.org/kfp_sdk_version
:
1.8.18
pipelines.kubeflow.org/pipeline_compilation_time
:
'
2023-09-18T1
5:19:35.000810
'
pipelines.kubeflow.org/pipeline_compilation_time
:
'
2023-09-18T1
8:30:46.812596
'
pipelines.kubeflow.org/pipeline_spec
:
'
{"description":
"All
Components",
"inputs":
[{"description":
"",
"name":
"pipeline_param",
"type":
"JsonObject"},
{"description":
"",
"name":
"plant_info",
"type":
"JsonObject"}],
"name":
"Dalmia"}'
...
...
@@ -56,29 +56,36 @@ spec:
\
loguru
import
logger
\n
import
warnings
\n
import
tracemalloc
\n
import
\
\
os
\n
from
pymongo
import
MongoClient
\n
city
=
os.getenv(
\"
CITY
\"
)
\n\
\
db_
=
os.getenv(
\"
MONGO_DB
\"
)
\n
print(pipeline_param)
\n
print(
\"\
--
\"
,pipeline_param[
\"
MONGO_URI
\"
])
\n
#
collections
\n
collection_
=
\
--
\"
,
pipeline_param[
\"
MONGO_URI
\"
])
\n
#
collections
\n
collection_
=
\
\
os.getenv(
\"
MONGO_COLLECTION
\"
)
\n
mongo_uri_
=
pipeline_param['MONGO_URI']
\n\
\
print(
\"
mongo_uri
\"
,mongo_uri_)
\n
project_id_
=
pipeline_param['PROJECT_ID']
\n\
\
query_filter_
=
pipeline_param['QUERY_FILTER']
\n
try:
\n
class
\
\
MongoConstants:
\n
#
DB
\n
db
=
db_
\n
#
collections
\n\
\
collection
=
collection_
\n\n
class
Mongo:
\n
\
\
mongo_uri
=
mongo_uri_
\n
project_id
=
project_id_
\n
\
\
query_filter
=
query_filter_
\n\n
class
MongoConnect:
\n
\
\
def
__init__(self,
uri,
database,
collection):
\n
try:
\n\
\
self.uri
=
uri
\n
self.client
=
MongoClient(self.uri,
\
\
connect=False)
\n
self.database
=
database
\n
\
\
self.collection
=
collection
\n
except
Exception
\
\
as
e:
\n
logger.exception(f'Exception
-
{e}')
\n\n
\
\
@staticmethod
\n
def
data_dict(data,
city):
\n
\
\
try:
\n
req_dict
=
dict()
\n
req_dict['project_id']
\
\
=
Mongo.project_id
\n
req_dict['id']
=
Mongo.query_filter
\n\
\
req_dict['city']
=
city
\n
req_dict['input_data']
\
\
=
data
\n
return
req_dict
\n
except
Exception
\
\
as
e:
\n
logger.exception(f'Exception
-
{e}')
\n\n
\
\
def
insert_one(self,
data,
city):
\n
try:
\n
\
\
db
=
self.client[self.database]
\n
collection
\
\
=
db[self.collection]
\n
req_dict
=
self.data_dict(data=data,
\
\
city=city)
\n
response
=
collection.insert_one(req_dict)
\n\
\
print(
\"
mongo_uri
\"
,
mongo_uri_)
\n
project_id_
=
pipeline_param['PROJECT_ID']
\n\
\
query_filter_
=
pipeline_param['QUERY_FILTER']
\n
try:
\n\n
class
\
\
CommonConstants:
\n
bgrimm_string_level_tags
=
'bgrimm_string_level_tags'
\n\
\
panel_id
=
'panel_id'
\n
sub_id
=
'sub_id'
\n
\
\
inv_id_mppt_id
=
'inv_id_mppt_id'
\n
tags_property_raw
=
\
\
'raw'
\n
tags_property_predicted
=
'predicted'
\n
tags_property_efficiency
\
\
=
'efficiency'
\n
bgrim_tags_property_efficiency
=
'Efficiency'
\n\
\
tags_property_efficiency_inv
=
'efficiency'
\n
tags_property_efficiency_plant
\
\
=
'efficiency_plant'
\n
mppt_coefficients
=
'mppt_coefficients'
\n\
\n
class
MongoConstants:
\n
#
DB
\n
db
=
db_
\n\
\
#
collections
\n
collection
=
collection_
\n\n
\
\
class
Mongo:
\n
mongo_uri
=
mongo_uri_
\n
project_id
\
\
=
project_id_
\n
query_filter
=
query_filter_
\n\n
class
\
\
MongoConnect:
\n
def
__init__(self,
uri,
database,
collection):
\n\
\
try:
\n
self.uri
=
uri
\n
\
\
self.client
=
MongoClient(self.uri,
connect=False)
\n
\
\
self.database
=
database
\n
self.collection
=
collection
\n\
\
except
Exception
as
e:
\n
logger.exception(f'Exception
\
\
-
{e}')
\n\n
@staticmethod
\n
def
data_dict(data,
city):
\n\
\
try:
\n
req_dict
=
dict()
\n
\
\
req_dict['project_id']
=
Mongo.project_id
\n
req_dict['id']
\
\
=
Mongo.query_filter
\n
req_dict['city']
=
city
\n
\
\
req_dict['input_data']
=
data
\n
return
\
\
req_dict
\n
except
Exception
as
e:
\n
logger.exception(f'Exception
\
\
-
{e}')
\n\n
def
insert_one(self,
data,
city):
\n
\
\
try:
\n
db
=
self.client[self.database]
\n
\
\
collection
=
db[self.collection]
\n
req_dict
\
\
=
self.data_dict(data=data,
city=city)
\n
response
=
collection.insert_one(req_dict)
\n\
\
return
response.inserted_id
\n
except
Exception
\
\
as
e:
\n
logger.exception(f'Exception
-
{e}')
\n\n
\
\
def
find_one(self,
query,
filter_dict=None):
\n
try:
\n\
...
...
@@ -87,55 +94,52 @@ spec:
\
collection
=
db[self.collection]
\n
response
\
\
=
collection.find_one(query,
filter_dict)
\n
return
response
\n\
\
except
Exception
as
e:
\n
logger.exception(f'Exception
\
\
-
{e}')
\n\n
mongo_conn
=
MongoConnect(uri=Mongo.mongo_uri,
database=MongoConstants.db,
\n
\
\
collection=MongoConstants.collection)
\n\
\
-
{e}')
\n\n
tracemalloc.clear_traces()
\n
mongo_conn
=
MongoConnect(uri=Mongo.mongo_uri,
\
\
database=MongoConstants.db,
\n
collection=MongoConstants.collection)
\n\
\
if
mongo_conn
is
None:
\n
logger.info(f'mongodb
is
not
\
\
connected,
please
check')
\n
else:
\n
logger.info(f'mongodb
\
\
is
connected,
mongo
conn
-
{mongo_conn}')
\n\n
df_raw_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({
\"\
$and
\"
:
[{
\"
id
\"
:
\"
bgrimm_string_level_tags
\"
},
\n
\
\
{
\"
city
\"
:
city},
\n\
\
\
\
{
\"
tags_property
\"
:
\"
raw
\"
}]})
\n
\
\
['input_data'],
orient='index')
\n\n
df_predicted_tags
\
\
=
pd.DataFrame.from_dict(mongo_conn.find_one({
\"
$and
\"
:
[{
\"
id
\"
:
\"
bgrimm_string_level_tags
\"\
},
\n
\
\
{
\"
city
\"
:
city},
\n
\
\
{
\"
tags_property
\"
:
\"
predicted
\"\
}]})
\n
['input_data'],
\
\
connected,
please
check')
\n
else:
\n
tracemalloc.clear_traces()
\n\
\
logger.info(f'mongo
conn
-
{mongo_conn}')
\n\n
df_raw_tags
\
\
=
pd.DataFrame.from_dict(mongo_conn.find_one({
\"
$and
\"
:
[
\n
\
\
{
\"
id
\"
:
CommonConstants.bgrimm_string_level_tags},
{
\"
city
\"
:
city},
\n\
\
{
\"
tags_property
\"
:
CommonConstants.tags_property_raw}]})['input_data'],
\
\
orient='index')
\n\n
df_predicted_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({
\"\
$and
\"
:
[
\n
{
\"
id
\"
:
CommonConstants.bgrimm_string_level_tags},
\
\
{
\"
city
\"
:
city},
\n
{
\"
tags_property
\"
:
CommonConstants.tags_property_predicted}]})['input_data'],
\
\
orient='index')
\n\n
df_efficiency_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({
\"\
$and
\"
:
[
\n
{
\"
id
\"
:
CommonConstants.bgrimm_string_level_tags},
\
\
{
\"
city
\"
:
city},
\n
{
\"
tags_property
\"
:
CommonConstants.tags_property_efficiency}]})['input_data'],
\
\
orient='index')
\n\n
df_raw_tags.reset_index(inplace=True)
\n
\
\
df_raw_tags.rename(columns={'index':
'tag_name'},
inplace=True)
\n\
\
df_predicted_tags.reset_index(inplace=True)
\n
df_predicted_tags.rename(columns={'index':
\
\
'tag_name'},
inplace=True)
\n\n
try:
\n
#
df_coefficients
\
\
=
pd.DataFrame.from_dict(
\n
#
mongo_conn.find_one({
\"\
$and
\"
:
[{
\"
id
\"
:
\"
bgrimm_string_level_tags
\"
},
\n
#
\
\
{
\"
city
\"
:
city},
\n
#
\
\
{
\"
tags_property
\"
:
\n
#
\
\
\"
mppt_coefficients
\"
}]})
\n
\
\
#
['input_data'],
orient='index')
\n
df_coefficients
\
\
=
pd.DataFrame()
\n
except
Exception
as
er:
\n
logger.exception(f
\"\
Coefficient
dataframe
unavailable
with
message:
{er}
\"
)
\n
df_coefficients
\
\
=
pd.DataFrame()
\n\n
del
mongo_conn
\n\n
df_coefficients.reset_index(inplace=True)
\n\
\
df_coefficients.rename(columns={'index':
'inv_id_mppt_id'},
inplace=True)
\n\
\n
tracemalloc.clear_traces()
\n
tracemalloc.get_traced_memory()
\n\
\
final_dict
=
{
\"
raw
\"
:
df_raw_tags.to_dict('records'),
\"
predicted
\"\
:
df_predicted_tags.to_dict('records'),
\n
\"
coefficients
\"\
:
df_coefficients.to_dict('records')}
\n
print(final_dict)
\n
\
\
return
final_dict
\n
except
Exception
as
e:
\n
logger.exception(f'Exception
\
\
-
{e}')
\n\n
def
_serialize_json(obj)
->
str:
\n
if
isinstance(obj,
str):
\n\
\
return
obj
\n
import
json
\n\n
def
default_serializer(obj):
\n\
\
if
hasattr(obj,
'to_struct'):
\n
return
obj.to_struct()
\n\
\
else:
\n
raise
TypeError(
\n
\"
Object
of
\
\
type
'%s'
is
not
JSON
serializable
and
does
not
have
.to_struct()
method.
\"\
\n
%
obj.__class__.__name__)
\n\n
return
json.dumps(obj,
\
\
default=default_serializer,
sort_keys=True)
\n\n
import
json
\n
import
argparse
\n\
_parser
=
argparse.ArgumentParser(prog='Get
tags
function',
description='')
\n\
_parser.add_argument(
\"
--pipeline-param
\"
,
dest=
\"
pipeline_param
\"
,
type=json.loads,
\
\
required=True,
default=argparse.SUPPRESS)
\n
_parser.add_argument(
\"
----output-paths
\"\
,
dest=
\"
_output_paths
\"
,
type=str,
nargs=1)
\n
_parsed_args
=
vars(_parser.parse_args())
\n\
_output_files
=
_parsed_args.pop(
\"
_output_paths
\"
,
[])
\n\n
_outputs
=
get_tags_function(**_parsed_args)
\n\
\n
_outputs
=
[_outputs]
\n\n
_output_serializers
=
[
\n
_serialize_json,
\n\
\n
]
\n\n
import
os
\n
for
idx,
output_file
in
enumerate(_output_files):
\n
try:
\n\
\
os.makedirs(os.path.dirname(output_file))
\n
except
OSError:
\n\
\
'tag_name'},
inplace=True)
\n
df_efficiency_tags.reset_index(inplace=True)
\n\
\
df_efficiency_tags.rename(columns={'index':
'tag_name'},
inplace=True)
\n\
\n
#
df_coefficients
=
pd.DataFrame.from_dict(mongo_conn.find_one(
\n\
\
#
{
\"
$and
\"
:
[{
\"
id
\"
:
CommonConstants.bgrimm_string_level_tags},
\
\
{
\"
city
\"
:
city},
\n
#
{
\"
tags_property
\"
:
CommonConstants.mppt_coefficients}]})['input_data'],
\
\
orient='index')
\n\n
#
df_coefficients.reset_index(inplace=True)
\n\
\
#
df_coefficients.rename(columns={'index':
CommonConstants.inv_id_mppt_id},
\
\
inplace=True)
\n\n
df_coefficients
=
pd.DataFrame()
\n
\
\
tracemalloc.clear_traces()
\n
del
mongo_conn
\n
final_dict
\
\
=
{
\"
raw
\"
:
df_raw_tags.to_dict('records'),
\"
predicted
\"
:
df_predicted_tags.to_dict('records'),
\n\
\
\"
coefficients
\"
:
df_coefficients.to_dict('records'),
\n\
\
\"
efficiency
\"
:
df_efficiency_tags.to_dict('records')}
\n\
\
print(final_dict)
\n
return
final_dict
\n
except
\
\
Exception
as
e:
\n
logger.exception(f'Exception
-
{e}')
\n\n
def
_serialize_json(obj)
\
\
->
str:
\n
if
isinstance(obj,
str):
\n
return
obj
\n
import
json
\n\
\n
def
default_serializer(obj):
\n
if
hasattr(obj,
'to_struct'):
\n\
\
return
obj.to_struct()
\n
else:
\n
raise
TypeError(
\n\
\
\"
Object
of
type
'%s'
is
not
JSON
serializable
and
does
not
\
\
have
.to_struct()
method.
\"\n
%
obj.__class__.__name__)
\n\
\n
return
json.dumps(obj,
default=default_serializer,
sort_keys=True)
\n\
\n
import
json
\n
import
argparse
\n
_parser
=
argparse.ArgumentParser(prog='Get
\
\
tags
function',
description='')
\n
_parser.add_argument(
\"
--pipeline-param
\"\
,
dest=
\"
pipeline_param
\"
,
type=json.loads,
required=True,
default=argparse.SUPPRESS)
\n\
_parser.add_argument(
\"
----output-paths
\"
,
dest=
\"
_output_paths
\"
,
type=str,
\
\
nargs=1)
\n
_parsed_args
=
vars(_parser.parse_args())
\n
_output_files
=
_parsed_args.pop(
\"\
_output_paths
\"
,
[])
\n\n
_outputs
=
get_tags_function(**_parsed_args)
\n\n
_outputs
\
\
=
[_outputs]
\n\n
_output_serializers
=
[
\n
_serialize_json,
\n\n
]
\n\n
import
\
\
os
\n
for
idx,
output_file
in
enumerate(_output_files):
\n
try:
\n
\
\
os.makedirs(os.path.dirname(output_file))
\n
except
OSError:
\n
\
\
pass
\n
with
open(output_file,
'w')
as
f:
\n
f.write(_output_serializers[idx](_outputs[idx]))
\n
"
env
:
-
name
:
MONGO_DB
...
...
@@ -158,7 +162,7 @@ spec:
metadata
:
annotations
:
pipelines.kubeflow.org/arguments.parameters
:
'
{"pipeline_param":
"{{inputs.parameters.pipeline_param}}"}'
pipelines.kubeflow.org/component_ref
:
'
{"digest":
"
9e20b98ab76dc4ecd38d0caee7b8cddc09335ccb97bab77593a108981db06ae8
",
pipelines.kubeflow.org/component_ref
:
'
{"digest":
"
43d2ece064d7a148052c83f4fba205fbc195cf65444819b9815d7c50aedf011d
",
"url":
"input_components/get_tags_component/component.yml"}'
pipelines.kubeflow.org/component_spec
:
'
{"implementation":
{"container":
{"args":
["--pipeline-param",
{"inputValue":
"pipeline_param"},
"----output-paths",
...
...
@@ -171,10 +175,16 @@ spec:
-u
\"$program_path\"
\"$@\"\n",
"def
get_tags_function(pipeline_param):\n
import
pandas
as
pd\n
from
loguru
import
logger\n
import
warnings\n
import
tracemalloc\n
import
os\n
from
pymongo
import
MongoClient\n
city
=
os.getenv(\"CITY\")\n
db_
=
os.getenv(\"MONGO_DB\")\n
print(pipeline_param)\n
print(\"--\",pipeline_param[\"MONGO_URI\"])\n
#
collections\n
collection_
=
os.getenv(\"MONGO_COLLECTION\")\n
mongo_uri_
=
pipeline_param['
'
MONGO_URI'
'
]\n
print(\"mongo_uri\",mongo_uri_)\n
project_id_
=
pipeline_param['
'
PROJECT_ID'
'
]\n
query_filter_
=
pipeline_param['
'
QUERY_FILTER'
'
]\n
try:\n
class
=
os.getenv(\"CITY\")\n
db_
=
os.getenv(\"MONGO_DB\")\n
print(pipeline_param)\n
print(\"--\",
pipeline_param[\"MONGO_URI\"])\n
#
collections\n
collection_
=
os.getenv(\"MONGO_COLLECTION\")\n
mongo_uri_
=
pipeline_param['
'
MONGO_URI'
'
]\n
print(\"mongo_uri\",
mongo_uri_)\n
project_id_
=
pipeline_param['
'
PROJECT_ID'
'
]\n
query_filter_
=
pipeline_param['
'
QUERY_FILTER'
'
]\n
try:\n\n
class
CommonConstants:\n
bgrimm_string_level_tags
=
'
'
bgrimm_string_level_tags'
'
\n
panel_id
=
'
'
panel_id'
'
\n
sub_id
=
'
'
sub_id'
'
\n
inv_id_mppt_id
=
'
'
inv_id_mppt_id'
'
\n
tags_property_raw
=
'
'
raw'
'
\n
tags_property_predicted
=
'
'
predicted'
'
\n
tags_property_efficiency
=
'
'
efficiency'
'
\n
bgrim_tags_property_efficiency
=
'
'
Efficiency'
'
\n
tags_property_efficiency_inv
=
'
'
efficiency'
'
\n
tags_property_efficiency_plant
=
'
'
efficiency_plant'
'
\n
mppt_coefficients
=
'
'
mppt_coefficients'
'
\n\n
class
MongoConstants:\n
#
DB\n
db
=
db_\n
#
collections\n
collection
=
collection_\n\n
class
Mongo:\n
mongo_uri
=
mongo_uri_\n
project_id
=
project_id_\n
query_filter
=
query_filter_\n\n
class
...
...
@@ -195,31 +205,32 @@ spec:
filter_dict
is
None:\n
filter_dict
=
{\"_id\":
0}\n
db
=
self.client[self.database]\n
collection
=
db[self.collection]\n
response
=
collection.find_one(query,
filter_dict)\n
return
response\n
except
Exception
as
e:\n
logger.exception(f'
'
Exception
-
{e}'
'
)\n\n
mongo_conn
Exception
as
e:\n
logger.exception(f'
'
Exception
-
{e}'
'
)\n\n
tracemalloc.clear_traces()\n
mongo_conn
=
MongoConnect(uri=Mongo.mongo_uri,
database=MongoConstants.db,\n
collection=MongoConstants.collection)\n
if
mongo_conn
is
None:\n
logger.info(f'
'
mongodb
is
not
connected,
please
check'
'
)\n
else:\n
logger.info(f'
'
mongodb
is
connected,
mongo
conn
-
{mongo_conn}'
'
)\n\n
df_raw_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({\"$and\":
[{\"id\":
\"bgrimm_string_level_tags\"},\n
{\"city\":
city},\n
{\"tags_property\":
\"raw\"}]})\n
['
'
input_data'
'
],
please
check'
'
)\n
else:\n
tracemalloc.clear_traces()\n
logger.info(f'
'
mongo
conn
-
{mongo_conn}'
'
)\n\n
df_raw_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({\"$and\":
[\n
{\"id\":
CommonConstants.bgrimm_string_level_tags},
{\"city\":
city},\n
{\"tags_property\":
CommonConstants.tags_property_raw}]})['
'
input_data'
'
],
orient='
'
index'
'
)\n\n
df_predicted_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({\"$and\":
[{\"id\":
\"bgrimm_string_level_tags\"},\n
{\"city\":
city},\n
{\"tags_property\":
\"predicted\"}]})\n
['
'
input_data'
'
],
[\n
{\"id\":
CommonConstants.bgrimm_string_level_tags},
{\"city\":
city},\n
{\"tags_property\":
CommonConstants.tags_property_predicted}]})['
'
input_data'
'
],
orient='
'
index'
'
)\n\n
df_efficiency_tags
=
pd.DataFrame.from_dict(mongo_conn.find_one({\"$and\":
[\n
{\"id\":
CommonConstants.bgrimm_string_level_tags},
{\"city\":
city},\n
{\"tags_property\":
CommonConstants.tags_property_efficiency}]})['
'
input_data'
'
],
orient='
'
index'
'
)\n\n
df_raw_tags.reset_index(inplace=True)\n
df_raw_tags.rename(columns={'
'
index'
'
:
'
'
tag_name'
'
},
inplace=True)\n
df_predicted_tags.reset_index(inplace=True)\n
df_predicted_tags.rename(columns={'
'
index'
'
:
'
'
tag_name'
'
},
inplace=True)\n
\n
try:\n
#
df_coefficients
=
pd.DataFrame.from_dict(\n
#
mongo_conn.find_one(
{\"$and\":
[{\"id\":
\"bgrimm_string_level_tags\"},\n
#
{\"ci
ty\":
city},\n
#
{\"tags_property\":\n
#
\"mppt_coefficients\"}]})\n
#
['
'
input_data'
'
],
orient='
'
index'
'
)\n
df_coefficients
=
pd.DataFrame()\n
except
Exception
as
er:\n
logger.exception(f\"Coefficient
dataframe
unavailable
with
message:
{er}\")\n
df_coefficients
=
pd.DataFrame()\n
\n
del
mongo_conn\n
\n
df_coefficients.reset_index(inplace=True)\n
df_coefficients.rename(columns={'
'
index'
'
:
'
'
inv_id_mppt_id'
'
},
inplace=True)\n\n
tracemalloc.clear_traces()\n
tracemalloc.get_traced_memory()\n
final_dict
=
{\"raw\":
df_raw_tags.to_dict('
'
records'
'
),
\"predicted\":
df_predicted_tags.to_dict('
'
records'
'
),\n
\"coefficients
\":
df_
coefficient
s.to_dict('
'
records'
'
)}\n
print(final_dict)\n
return
'
'
tag_name'
'
},
inplace=True)\n
df_efficiency_tags.reset_index(inplace=True)\n
df_efficiency_tags.rename(columns={'
'
index'
'
:
'
'
tag_name'
'
},
inplace=True)\n\n
#
df_coefficients
=
pd.DataFrame.from_dict(mongo_conn.find_one(\n
#
{\"$and\":
[{\"id\":
CommonConstants.bgrimm_string_level_tags},
{\"city\":
city},\n
#
{\"tags_proper
ty\":
CommonConstants.mppt_coefficients}]})['
'
input_data'
'
],
orient='
'
index'
'
)\n\n
#
df_coefficients.reset_index(inplace=True)\n
#
df_coefficients.rename(columns={'
'
index'
'
:
CommonConstants.inv_id_mppt_id},
inplace=True)\n\n
df_coefficients
=
pd.DataFrame()\n
tracemalloc.clear_traces()
\n
del
mongo_conn\n
final_dict
=
{\"raw\":
df_raw_tags.to_dict('
'
records'
'
),
\"predicted\":
df_predicted_tags.to_dict('
'
records'
'
),\n
\"coefficients\":
df_coefficients.to_dict('
'
records'
'
),\n
\"efficiency
\":
df_
efficiency_tag
s.to_dict('
'
records'
'
)}\n
print(final_dict)\n
return
final_dict\n
except
Exception
as
e:\n
logger.exception(f'
'
Exception
-
{e}'
'
)\n\ndef
_serialize_json(obj)
->
str:\n
if
isinstance(obj,
str):\n
return
obj\n
import
json\n\n
def
default_serializer(obj):\n
if
hasattr(obj,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment