Commit 7088b4d6 authored by tarun2512's avatar tarun2512

first commit

parent b13c29cd
MONGO_URI=mongodb://admin:iLens%23QAv513@192.168.0.217:30904/?authMechanism=DEFAULT&directConnection=true
BASE_URI=https://qa.unifytwin.com/
LOGIN_TOKEN=e4b4059ab01b488abdbc865f8d4cb543
USER_ID=fpWx7Aw4NJzzXWxcE6w5dU
PROJECT_ID=project_287
APP_NAME=FT Energy Manager
PREFIX=project_287
ZIP_NAME=FT Energy Manager.zip
BASE_PATH=/code/data
MOUNT_DIR=app_zip
\ No newline at end of file
import logging
from pymongo import MongoClient
import time
import shutil
from dotenv import load_dotenv
import os
import json
load_dotenv()
import sys
import zipfile
import httpx
from app_clone_script_helper import CloneScriptHelper
root_path = os.getcwd()
print(f'ROOT PATH ---> {root_path}')
sys.path.append(root_path)
MONGO_URI = os.environ.get("MONGO_URI")
BASE_URI = os.environ.get("BASE_URI")
LOGIN_TOKEN = os.environ.get("LOGIN_TOKEN")
USER_ID = os.environ.get("USER_ID")
PROJECT_ID = os.environ.get("PROJECT_ID")
APP_NAME = os.environ.get("APP_NAME")
PREFIX = os.environ.get("PREFIX")
ZIP_NAME = os.environ.get("ZIP_NAME")
BASE_PATH = os.environ.get("BASE_PATH")
MOUNT_DIR = os.environ.get("MOUNT_DIR")
client = MongoClient(MONGO_URI)
logging.basicConfig(level=logging.DEBUG, # Set the minimum logging level
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
dashboard_image_uri = f"{BASE_URI}visual4.0/dashboard/snap_dashboard"
hierarchy_service_url = f"{BASE_URI}/hry/hry/fetch_accessible_data"
cookie = {"login-token": LOGIN_TOKEN, "userId": USER_ID, "projectId": PROJECT_ID}
class Database:
def __init__(self, prefix=None):
self.assistant = client[f"{prefix}__ilens_assistant"] if prefix else client["ilens_assistant"]
self.configuration = client[f"{prefix}__ilens_configuration"] if prefix else client["ilens_configuration"]
self.widget = client[f"{prefix}__ilens_widget"] if prefix else client["ilens_widget"]
class Connection(Database):
def __init__(self, prefix=None):
super().__init__(prefix)
self.dashboard = self.widget["dashboard"]
self.customer_apps = self.configuration["customer_apps"]
self.lookup_table = self.configuration["lookup_table"]
self.category_apps = self.configuration["category_apps"]
self.dashboard_category = self.widget["category"]
self.widget_col = self.widget["widget"]
self.user_conn = self.configuration["user"]
self.hierarchy_details = self.configuration["hierarchy_details"]
def push_category(folder_path, folder_name, project_id, prefix=None):
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
category_data = CloneScriptHelper().read_json_file(file_path)
for data in category_data:
data["project_id"] = project_id
query = {"dashboard_category_id": data.get("dashboard_category_id")}
connection.dashboard_category.update_one(query, {"$set": data}, upsert=True)
def push_app_category(folder_path, folder_name, project_id, prefix=None):
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
category_data = CloneScriptHelper().read_json_file(file_path)
for data in category_data:
data['project_id'] = project_id
query = {"app_category_id": data.get("app_category_id")}
connection.category_apps.update_one(query, {"$set": data}, upsert=True)
def push_lookups(folder_path, folder_name, prefix=None):
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
category_data = CloneScriptHelper().read_json_file(file_path)
for data in category_data:
query = {"lookup_id": data.get("lookup_id")}
connection.lookup_table.update_one(query, {"$set": data}, upsert=True)
def fetch_hierarchy_details(project_id, node_id):
payload = {"project_id": project_id, "node_id": node_id, "fetch_path_names": True, "fetch_child": False}
with httpx.Client() as client:
response = client.post(
url=hierarchy_service_url,
timeout=20,
json=payload,
cookies=cookie,
headers=cookie,
)
if response.status_code == 200:
result = response.json()
if not len(result["data"]):
return {}
data = result["data"][0]
full_path = data.get("full_path", " ")
return {
"id": full_path,
"name": data.get("full_name", " "),
"node_id": data.get("node_id", " "),
"site_id": full_path.split("$")[0],
"type": data.get("type", " "),
"lat": data.get("info").get("latitude", " "),
"long": data.get("info").get("longitude", " "),
"label": data.get("full_name", " "),
"value": full_path,
"disabled": False,
"checked": True,
}
def manipulate_widget_data(widget_data, project_id, hierarchy_list):
widget_data["project_id"] = project_id
if "filterList" in widget_data["widget_data"]["cData"]["chartOptions"]["filter"] and "filtersData" in widget_data["widget_data"]["cData"]["chartOptions"]["filter"]:
widget_data["widget_data"]["cData"]["chartOptions"]["filter"]["filterList"] = hierarchy_list
widget_data["widget_data"]["cData"]["chartOptions"]["filter"]["filtersData"] = hierarchy_list
def push_widget(folder_path, folder_name, project_id, prefix=None):
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
widget_data = CloneScriptHelper().read_json_file(file_path)
hierarchy_details = connection.hierarchy_details.find({}, {"_id": 0})
hierarchy_list = []
for hierarchy in hierarchy_details:
hierarchy_data = fetch_hierarchy_details(project_id, hierarchy.get("node_id"))
hierarchy_list.append(hierarchy_data)
for data in widget_data:
manipulate_widget_data(data, project_id, hierarchy_list)
query = {"widget_id": data.get("widget_id")}
connection.widget_col.update_one(query, {"$set": data}, upsert=True)
def manipulate_dashboard_data(dashboard_data, user_details, project_id):
dashboard_data["user_details"] = {
"username": user_details.get("username"),
"email": user_details.get("email"),
"name": user_details.get("name"),
"user_id": user_details.get("user_id"),
"access_group_ids": []}
dashboard_data["owner"] = user_details.get("user_id")
dashboard_data["project_id"] = project_id
dashboard_data["shared_group"] = []
dashboard_data["shared_user"] = []
dashboard_data["sharing_info"]["users"] = []
dashboard_data["sharing_info"]["userGroups"] = []
def push_dashboard(folder_path, folder_name, user_details, project_id, prefix=None):
dashboard_image_push(folder_path, prefix)
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
dashboard_data = CloneScriptHelper().read_json_file(file_path)
for data in dashboard_data:
manipulate_dashboard_data(data, user_details, project_id)
query = {"dashboard_id": data.get("dashboard_id")}
connection.dashboard.update_one(query, {"$set": data}, upsert=True)
def dashboard_image_push(folder_path, prefix=None):
image_list = CloneScriptHelper().list_images_in_folder(folder_path)
if not image_list:
return
for image in image_list:
image_path = os.path.join(folder_path, image)
with open(image_path, 'rb') as image_file:
dashboard_id = image.split(".")[0]
form_data = {'image': (image_file.name, image_file, 'image'), "project_id": prefix, "dashboard_id": dashboard_id}
data = {"dashboard_id": dashboard_id, "project_id": prefix}
request = httpx.Request('POST', dashboard_image_uri, files=form_data, headers=cookie, cookies=cookie, data=data)
with httpx.Client() as client:
response = client.send(request)
if response.status_code == 200:
print("Image successfully sent.")
else:
print(f"Failed to send image. Status code: {response.status_code}")
def manipulate_app_data(each_children_obj, required_type, user_id, project_id):
if not each_children_obj.get("children", []):
view_item = each_children_obj.get("view_item", {})
if view_item:
each_children_obj['view_item']['project_id'] = project_id
each_children_obj['view_item']['owner'] = user_id
hierarchy_stepper = each_children_obj.get("hierarchyStepper", [])
if hierarchy_stepper:
for each_hierarchy_stepper in hierarchy_stepper:
manipulate_app_data(each_hierarchy_stepper, required_type, user_id, project_id)
else:
for each_child in each_children_obj.get("children", []):
manipulate_app_data(each_child, required_type, user_id, project_id)
def push_app(folder_path, folder_name, user_id, project_id, prefix=None):
connection = Connection(prefix=prefix)
file_path = os.path.join(folder_path, f"{folder_name}.json")
app_data = CloneScriptHelper().read_json_file(file_path)
for data in app_data:
data["access_list"]["users"].append(user_id)
data["project_id"] = project_id
data['meta']['created_by'] = user_id
data["meta"]['last_updated_by'] = user_id
manipulate_app_data(data, "dashboard_id", user_id, project_id)
query = {"dashboard_category_id": data.get("dashboard_category_id")}
connection.customer_apps.update_one(query, {"$set": data}, upsert=True)
def push_data(app_name, folder_name, user_id, project_id, prefix=None):
folder_split_data = folder_name.split(".")
connection = Connection()
user_details = connection.user_conn.find_one({"user_id": user_id}, {"_id": 0})
collection_name = folder_split_data[1]
folder_path = os.path.join(app_name, folder_name)
if collection_name == "category":
push_category(folder_path, folder_name, project_id, prefix)
elif collection_name == "widget":
push_widget(folder_path, folder_name, project_id, prefix)
elif collection_name == "dashboard":
push_dashboard(folder_path, folder_name, user_details, project_id, prefix)
elif collection_name == "customer_apps":
push_app(folder_path, folder_name, user_id, project_id, prefix)
elif collection_name == "category_apps":
push_app_category(folder_path, folder_name, project_id, prefix)
elif collection_name == "lookup_table":
push_lookups(folder_path, folder_name, prefix)
def extract_app_zip(app_zip_name, app_name, user_id, project_id, prefix=None):
zip_path = os.path.join(BASE_PATH, MOUNT_DIR, f"{app_name}.zip")
CloneScriptHelper().unzip_file(zip_path, zip_path)
folders_list = CloneScriptHelper().list_folders(app_name)
for folder in folders_list:
if folder in ["app_image", "app_logo"]:
continue
push_data(app_name, folder, user_id, project_id, prefix)
if __name__ == "__main__":
extract_app_zip(APP_NAME, USER_ID, PROJECT_ID, PREFIX)
import os
import zipfile
import json
from PIL import Image
class CloneScriptHelper:
def __int__(self):
pass
@staticmethod
def list_folders(path):
# Get a list of all items (files and directories) in the specified path
items = os.listdir(path)
# Filter out only directories
folders = [item for item in items if os.path.isdir(os.path.join(path, item))]
return folders
@staticmethod
def unzip_file(zip_filename, extract_path):
with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
zip_ref.extractall(extract_path)
print(f"File {zip_filename} successfully extracted to {extract_path}")
@staticmethod
def read_json_file(file_path):
with open(file_path, 'r') as file:
data = json.load(file)
return data
@staticmethod
def is_image(file_path):
try:
with Image.open(file_path):
return True
except (IOError, OSError):
return False
def list_images_in_folder(self, folder_path):
all_files = os.listdir(folder_path)
image_files = [file for file in all_files if self.is_image(os.path.join(folder_path, file))]
return image_files
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment