Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
A
app_install
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tarun.madamanchi
app_install
Commits
def51ec8
Commit
def51ec8
authored
Jan 23, 2024
by
tarun2512
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fourth commit
parent
49a0eb81
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
312 additions
and
7 deletions
+312
-7
.env
.env
+3
-5
app.py
app.py
+1
-1
app_clone_script_copy.py
app_clone_script_copy.py
+0
-1
app_clone_script_new.py
app_clone_script_new.py
+308
-0
No files found.
.env
View file @
def51ec8
MONGO_URI=mongodb://@192.168.0.217:30904/?authMechanism=DEFAULT&directConnection=true
BASE_URI=https://qa.unifytwin.com/
LOGIN_TOKEN=dd350397643246daa19ac63c0060d430
MONGO_URI=mongodb://admin:iLens%23@192.168.0.217:30904/?authMechanism=DEFAULT&directConnection=true
USER_ID=user_099
PROJECT_ID=project_
287
PROJECT_ID=project_
306
APP_NAME=FT Energy Manager
PREFIX=project_
287
PREFIX=project_
306
BASE_PATH=/code/data
MOUNT_DIR=app_zip
\ No newline at end of file
app.py
View file @
def51ec8
from
dotenv
import
load_dotenv
import
os
load_dotenv
()
from
app_clone_script_
copy
import
extract_app_zip
from
app_clone_script_
new
import
extract_app_zip
from
image_move_script
import
get_image_names
,
move_images
USER_ID
=
os
.
environ
.
get
(
"USER_ID"
)
...
...
app_clone_script_copy.py
View file @
def51ec8
...
...
@@ -139,7 +139,6 @@ def fetch_hierarchy_details(project_id, node_id):
json
=
payload
,
cookies
=
cookie
,
headers
=
cookie
,
)
if
response
.
status_code
==
200
:
result
=
response
.
json
()
...
...
app_clone_script_new.py
0 → 100644
View file @
def51ec8
import
logging
from
pymongo
import
MongoClient
from
dotenv
import
load_dotenv
import
time
import
os
load_dotenv
()
import
sys
import
shutil
from
app_clone_script_helper
import
CloneScriptHelper
from
image_move_script
import
move_images
,
get_image_names
root_path
=
os
.
getcwd
()
print
(
f
'ROOT PATH ---> {root_path}'
)
sys
.
path
.
append
(
root_path
)
MONGO_URI
=
os
.
environ
.
get
(
"MONGO_URI"
)
USER_ID
=
os
.
environ
.
get
(
"USER_ID"
)
PROJECT_ID
=
os
.
environ
.
get
(
"PROJECT_ID"
)
APP_NAME
=
os
.
environ
.
get
(
"APP_NAME"
)
PREFIX
=
os
.
environ
.
get
(
"PREFIX"
)
BASE_PATH
=
os
.
environ
.
get
(
"BASE_PATH"
)
MOUNT_DIR
=
os
.
environ
.
get
(
"MOUNT_DIR"
)
client
=
MongoClient
(
MONGO_URI
)
logging
.
basicConfig
(
level
=
logging
.
DEBUG
,
# Set the minimum logging level
format
=
'
%(asctime)
s -
%(name)
s -
%(levelname)
s -
%(message)
s'
)
folder1_path
=
os
.
path
.
join
(
BASE_PATH
,
MOUNT_DIR
,
APP_NAME
,
"app_image"
)
folder2_path
=
os
.
path
.
join
(
BASE_PATH
,
MOUNT_DIR
,
APP_NAME
,
"app_logo"
)
destination_folder
=
os
.
path
.
join
(
BASE_PATH
,
"metadata/images/apps"
)
class
Database
:
def
__init__
(
self
,
prefix
=
None
):
self
.
assistant
=
client
[
f
"{prefix}__ilens_assistant"
]
if
prefix
else
client
[
"ilens_assistant"
]
self
.
configuration
=
client
[
f
"{prefix}__ilens_configuration"
]
if
prefix
else
client
[
"ilens_configuration"
]
self
.
widget
=
client
[
f
"{prefix}__ilens_widget"
]
if
prefix
else
client
[
"ilens_widget"
]
class
Connection
(
Database
):
def
__init__
(
self
,
prefix
=
None
):
super
()
.
__init__
(
prefix
)
self
.
dashboard
=
self
.
widget
[
"dashboard"
]
self
.
customer_apps
=
self
.
configuration
[
"customer_apps"
]
self
.
lookup_table
=
self
.
configuration
[
"lookup_table"
]
self
.
category_apps
=
self
.
configuration
[
"category_apps"
]
self
.
dashboard_category
=
self
.
widget
[
"category"
]
self
.
widget_col
=
self
.
widget
[
"widget"
]
self
.
user_conn
=
self
.
configuration
[
"user"
]
self
.
hierarchy_details
=
self
.
configuration
[
"hierarchy_details"
]
self
.
unique_id
=
self
.
configuration
[
"unique_id"
]
def
get_next_id
(
param
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
next_id_doc
=
connection
.
unique_id
.
find_one
({
"key"
:
param
})
if
not
next_id_doc
:
insert_dict
=
{
"key"
:
param
,
"id"
:
100
}
connection
.
unique_id
.
insert_one
(
insert_dict
)
return
insert_dict
[
"id"
]
else
:
query
=
{
"key"
:
param
}
count_value
=
int
(
next_id_doc
[
"id"
])
+
1
new_values
=
{
"id"
:
count_value
}
connection
.
unique_id
.
update_one
(
query
,
{
"$set"
:
new_values
})
return
int
(
new_values
[
"id"
])
def
push_category
(
folder_path
,
folder_name
,
project_id
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
file_path
=
os
.
path
.
join
(
folder_path
,
f
"{folder_name}.json"
)
category_data
=
CloneScriptHelper
()
.
read_json_file
(
file_path
)
for
data
in
category_data
:
data
[
"project_id"
]
=
project_id
query
=
{
"dashboard_category_id"
:
data
.
get
(
"dashboard_category_id"
)}
connection
.
dashboard_category
.
update_one
(
query
,
{
"$set"
:
data
},
upsert
=
True
)
def
push_app_category
(
folder_path
,
folder_name
,
user_details
,
project_id
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
file_path
=
os
.
path
.
join
(
folder_path
,
f
"{folder_name}.json"
)
app_category_data
=
CloneScriptHelper
()
.
read_json_file
(
file_path
)
zip_app_category_data
=
{}
for
app_category
in
app_category_data
:
zip_app_category_data
[
app_category
.
get
(
'name'
)]
=
app_category
.
get
(
'app_category_id'
)
existed_app_category_data
=
list
(
connection
.
category_apps
.
find
(
{
"name"
:
{
"$in"
:
list
(
zip_app_category_data
.
keys
())},
"project_id"
:
project_id
},
{
"_id"
:
0
}
)
)
existed_app_category_mapping_data
=
{}
for
existed_app_category
in
existed_app_category_data
:
existed_app_category_mapping_data
[
existed_app_category
.
get
(
'name'
)]
=
existed_app_category
app_category_mapping
=
{}
for
app_category
in
app_category_data
:
old_app_category_id
=
app_category
.
get
(
"app_category_id"
)
if
app_category
.
get
(
'name'
)
in
existed_app_category_mapping_data
:
app_category_data
=
existed_app_category_mapping_data
.
get
(
app_category
.
get
(
"name"
))
app_category_data
[
"project_id"
]
=
project_id
app_category_data
[
'created_at'
]
=
time
.
time
()
app_category_data
[
'created_by'
]
=
user_details
.
get
(
'user_id'
,
''
)
query
=
{
"app_category_id"
:
app_category_data
.
get
(
"app_category_id"
)}
connection
.
category_apps
.
update_one
(
query
,
{
"$set"
:
app_category_data
},
upsert
=
True
)
app_category_mapping
[
old_app_category_id
]
=
app_category_data
.
get
(
"app_category_id"
)
continue
app_category
[
"project_id"
]
=
project_id
app_category
[
'created_at'
]
=
time
.
time
()
app_category
[
'created_by'
]
=
user_details
.
get
(
'user_id'
,
''
)
app_category
[
"app_category_id"
]
=
"app_category_"
+
str
(
get_next_id
(
"app_category"
,
prefix
))
query
=
{
"name"
:
app_category
.
get
(
"name"
)}
connection
.
category_apps
.
update_one
(
query
,
{
"$set"
:
app_category
},
upsert
=
True
)
app_category_mapping
[
old_app_category_id
]
=
app_category
.
get
(
"app_category_id"
)
return
app_category_mapping
def
push_lookups
(
folder_path
,
folder_name
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
file_path
=
os
.
path
.
join
(
folder_path
,
f
"{folder_name}.json"
)
category_data
=
CloneScriptHelper
()
.
read_json_file
(
file_path
)
for
data
in
category_data
:
query
=
{
"lookup_id"
:
data
.
get
(
"lookup_id"
)}
connection
.
lookup_table
.
update_one
(
query
,
{
"$set"
:
data
},
upsert
=
True
)
def
get_children_level_mapping
(
project_id
:
str
,
accessible_node_id
=
None
):
match_query
=
{
"$or"
:
[{
"node_id"
:
{
"$in"
:
accessible_node_id
}}],
}
if
accessible_node_id
is
True
:
match_query
=
{
"project_id"
:
project_id
,
}
return
[
{
"$match"
:
match_query
},
{
"$project"
:
{
"_id"
:
0
,
"name"
:
1
,
"node_id"
:
1
}},
{
"$group"
:
{
"_id"
:
None
,
"mapping"
:
{
"$push"
:
{
"k"
:
"$node_id"
,
"v"
:
"$name"
}}}},
{
"$project"
:
{
"_id"
:
0
,
"mapping"
:
{
"$arrayToObject"
:
"$mapping"
}}},
]
def
form_hierarchy_parent
(
project_id
,
accessible_node_id
,
hierarchy_data
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
if
not
accessible_node_id
:
return
_aggregate
=
get_children_level_mapping
(
project_id
=
project_id
,
accessible_node_id
=
accessible_node_id
)
_mapping
=
list
(
connection
.
hierarchy_details
.
aggregate
(
_aggregate
))
mapping
=
_mapping
[
0
]
.
get
(
"mapping"
,
{})
if
_mapping
else
{}
if
not
hierarchy_data
.
get
(
"path"
):
hierarchy_data
.
update
({
"full_path"
:
hierarchy_data
[
"node_id"
],
"full_name"
:
hierarchy_data
[
"name"
]})
else
:
path
=
hierarchy_data
[
"path"
]
path
.
append
(
hierarchy_data
[
"node_id"
])
parent_name
=
{
_id
:
mapping
.
get
(
_id
)
for
_id
in
path
}
if
None
in
list
(
parent_name
.
values
()):
return
hierarchy_data
.
update
({
"full_path"
:
"$"
.
join
(
path
),
"full_name"
:
">"
.
join
(
list
(
parent_name
.
values
()))})
def
fetch_hierarchy_details
(
project_id
,
node_id
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
parent_hierarchy_details
=
connection
.
hierarchy_details
.
find_one
({
"project_id"
:
project_id
,
"node_id"
:
node_id
},
{
"_id"
:
0
})
form_hierarchy_parent
(
project_id
,
parent_hierarchy_details
.
get
(
"path"
),
parent_hierarchy_details
,
prefix
)
full_path
=
parent_hierarchy_details
.
get
(
"full_path"
,
parent_hierarchy_details
.
get
(
"node_id"
,
""
))
return
{
"id"
:
full_path
,
"name"
:
parent_hierarchy_details
.
get
(
"full_name"
,
parent_hierarchy_details
.
get
(
"name"
,
""
)),
"node_id"
:
parent_hierarchy_details
.
get
(
"node_id"
,
" "
),
"site_id"
:
full_path
.
split
(
"$"
)[
0
],
"type"
:
parent_hierarchy_details
.
get
(
"type"
,
" "
),
"lat"
:
parent_hierarchy_details
.
get
(
"info"
)
.
get
(
"latitude"
,
" "
),
"long"
:
parent_hierarchy_details
.
get
(
"info"
)
.
get
(
"longitude"
,
" "
),
"label"
:
parent_hierarchy_details
.
get
(
"full_name"
,
parent_hierarchy_details
.
get
(
"name"
)),
"value"
:
full_path
,
"disabled"
:
False
,
"checked"
:
True
,
}
def
manipulate_widget_data
(
widget_data
,
project_id
,
hierarchy_list
):
widget_data
[
"project_id"
]
=
project_id
if
"filterList"
in
widget_data
[
"widget_data"
][
"cData"
][
"chartOptions"
][
"filter"
]
and
"filtersData"
in
widget_data
[
"widget_data"
][
"cData"
][
"chartOptions"
][
"filter"
]:
for
filter_data
in
widget_data
[
"widget_data"
][
"cData"
][
"chartOptions"
][
"filter"
][
"filterList"
]:
filter_data
[
'value'
]
=
hierarchy_list
for
filter_data
in
widget_data
[
"widget_data"
][
"cData"
][
"chartOptions"
][
"filter"
][
"filtersData"
]:
filter_data
[
'value'
]
=
hierarchy_list
def
push_widget
(
folder_path
,
folder_name
,
project_id
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
file_path
=
os
.
path
.
join
(
folder_path
,
f
"{folder_name}.json"
)
widget_data
=
CloneScriptHelper
()
.
read_json_file
(
file_path
)
hierarchy_details
=
connection
.
hierarchy_details
.
find
({},
{
"_id"
:
0
})
hierarchy_list
=
[]
for
hierarchy
in
hierarchy_details
:
hierarchy_data
=
fetch_hierarchy_details
(
project_id
,
hierarchy
.
get
(
"node_id"
),
prefix
)
hierarchy_list
.
append
(
hierarchy_data
)
for
data
in
widget_data
:
manipulate_widget_data
(
data
,
project_id
,
hierarchy_list
)
query
=
{
"widget_id"
:
data
.
get
(
"widget_id"
)}
connection
.
widget_col
.
update_one
(
query
,
{
"$set"
:
data
},
upsert
=
True
)
def
manipulate_dashboard_data
(
dashboard_data
,
user_details
,
project_id
):
dashboard_data
[
"user_details"
]
=
{
"username"
:
user_details
.
get
(
"username"
),
"email"
:
user_details
.
get
(
"email"
),
"name"
:
user_details
.
get
(
"name"
),
"user_id"
:
user_details
.
get
(
"user_id"
),
"access_group_ids"
:
[]}
dashboard_data
[
"owner"
]
=
user_details
.
get
(
"user_id"
)
dashboard_data
[
"project_id"
]
=
project_id
dashboard_data
[
"shared_group"
]
=
[]
dashboard_data
[
"shared_user"
]
=
[]
dashboard_data
[
"sharing_info"
][
"users"
]
=
[]
dashboard_data
[
"sharing_info"
][
"userGroups"
]
=
[]
def
push_dashboard
(
folder_path
,
folder_name
,
user_details
,
project_id
,
prefix
=
None
):
dashboard_image_push
(
folder_path
)
connection
=
Connection
(
prefix
=
prefix
)
file_path
=
os
.
path
.
join
(
folder_path
,
f
"{folder_name}.json"
)
dashboard_data
=
CloneScriptHelper
()
.
read_json_file
(
file_path
)
for
data
in
dashboard_data
:
manipulate_dashboard_data
(
data
,
user_details
,
project_id
)
query
=
{
"dashboard_id"
:
data
.
get
(
"dashboard_id"
)}
connection
.
dashboard
.
update_one
(
query
,
{
"$set"
:
data
},
upsert
=
True
)
def
dashboard_image_push
(
folder_path
):
image_list
=
CloneScriptHelper
()
.
list_images_in_folder
(
folder_path
)
if
not
image_list
:
return
destination_fold
=
os
.
path
.
join
(
BASE_PATH
,
"/visualisation/snapshots"
)
for
image
in
image_list
:
image_path
=
os
.
path
.
join
(
folder_path
,
image
)
image_name
=
os
.
path
.
basename
(
image_path
)
destination_path
=
os
.
path
.
join
(
destination_fold
,
image_name
)
shutil
.
move
(
image_path
,
destination_path
)
def
manipulate_app_data
(
each_children_obj
,
required_type
,
user_id
,
project_id
):
if
not
each_children_obj
.
get
(
"children"
,
[]):
view_item
=
each_children_obj
.
get
(
"view_item"
,
{})
if
view_item
:
each_children_obj
[
'view_item'
][
'project_id'
]
=
project_id
each_children_obj
[
'view_item'
][
'owner'
]
=
user_id
hierarchy_stepper
=
each_children_obj
.
get
(
"hierarchyStepper"
,
[])
if
hierarchy_stepper
:
for
each_hierarchy_stepper
in
hierarchy_stepper
:
manipulate_app_data
(
each_hierarchy_stepper
,
required_type
,
user_id
,
project_id
)
else
:
for
each_child
in
each_children_obj
.
get
(
"children"
,
[]):
manipulate_app_data
(
each_child
,
required_type
,
user_id
,
project_id
)
def
push_app
(
folder_path
,
collection_data
,
user_id
,
user_details
,
project_id
,
prefix
=
None
):
connection
=
Connection
(
prefix
=
prefix
)
app_file_path
=
os
.
path
.
join
(
folder_path
,
collection_data
.
get
(
"customer_apps"
),
f
"{collection_data.get('customer_apps')}.json"
)
app_data
=
CloneScriptHelper
()
.
read_json_file
(
app_file_path
)
category_file_path
=
os
.
path
.
join
(
folder_path
,
collection_data
.
get
(
"category_apps"
))
new_category_data
=
push_app_category
(
category_file_path
,
collection_data
.
get
(
"category_apps"
),
user_details
,
project_id
,
prefix
)
dashboard_file_path
=
os
.
path
.
join
(
folder_path
,
collection_data
.
get
(
"dashboard"
))
push_dashboard
(
dashboard_file_path
,
collection_data
.
get
(
"dashboard"
),
user_details
,
project_id
,
prefix
)
category_file_path
=
os
.
path
.
join
(
folder_path
,
collection_data
.
get
(
"category"
))
push_category
(
category_file_path
,
collection_data
.
get
(
"category"
),
project_id
,
prefix
)
widget_file_path
=
os
.
path
.
join
(
folder_path
,
collection_data
.
get
(
"widget"
))
push_widget
(
widget_file_path
,
collection_data
.
get
(
"widget"
),
project_id
,
prefix
)
for
data
in
app_data
:
data
[
"access_list"
][
"users"
]
.
append
(
user_id
)
data
[
"project_id"
]
=
project_id
data
[
'meta'
][
'created_by'
]
=
user_id
data
[
"meta"
][
'last_updated_by'
]
=
user_id
manipulate_app_data
(
data
,
"dashboard_id"
,
user_id
,
project_id
)
data
[
'app_category_id'
]
=
new_category_data
.
get
(
data
.
get
(
'app_category_id'
))
query
=
{
"app_id"
:
data
.
get
(
"app_id"
)}
connection
.
customer_apps
.
update_one
(
query
,
{
"$set"
:
data
},
upsert
=
True
)
print
(
"apps got pushed"
)
def
push_data
(
folder_path
,
collection_data
,
user_id
,
project_id
,
prefix
=
None
):
connection
=
Connection
()
user_details
=
connection
.
user_conn
.
find_one
({
"user_id"
:
user_id
},
{
"_id"
:
0
})
push_app
(
folder_path
,
collection_data
,
user_id
,
user_details
,
project_id
,
prefix
)
image1_path
=
get_image_names
(
folder1_path
)
image2_path
=
get_image_names
(
folder2_path
)
if
image1_path
and
image2_path
:
# Call the function with the list of image paths and the destination folder
move_images
([
f
"{folder1_path}/{image1_path[0]}"
,
f
"{folder2_path}/{image2_path[0]}"
],
destination_folder
)
def
extract_app_zip
(
app_name
,
user_id
,
project_id
,
prefix
=
None
):
zip_path
=
os
.
path
.
join
(
BASE_PATH
,
MOUNT_DIR
,
f
"{app_name}.zip"
)
folder_path
=
os
.
path
.
join
(
BASE_PATH
,
MOUNT_DIR
,
app_name
)
CloneScriptHelper
()
.
unzip_file
(
zip_path
,
folder_path
)
folders_list
=
CloneScriptHelper
()
.
list_folders
(
folder_path
)
folders_list
.
remove
(
"app_image"
)
folders_list
.
remove
(
"app_logo"
)
collection_names
=
{}
for
folder
in
folders_list
:
collection_names
[
folder
.
split
(
"."
)[
1
]]
=
folder
push_data
(
folder_path
,
collection_names
,
user_id
,
project_id
,
prefix
)
if
__name__
==
"__main__"
:
extract_app_zip
(
APP_NAME
,
USER_ID
,
PROJECT_ID
,
PREFIX
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment