Commit aae4a93f authored by tarun2512

Your commit message here
DESTINATION_MONGO_URI=mongodb://ilens:ilens4321@192.168.0.220:31589/?authSource=admin&directConnection=true
#MONGO_URI=mongodb://admin:iLens%23QAv513@192.168.0.217:30904/?authMechanism=DEFAULT&directConnection=true
SOURCE_MONGO_URI=mongodb://admin:UtAdm%23Mong771385@192.168.0.207:8098/?authMechanism=DEFAULT&directConnection=true
DESTINATION_REDIS_URI=redis://admin:iLensDevRedis@192.168.0.220:32642
#REDIS_URI=redis://admin:iLensQARedis@192.168.0.217:30910
SOURCE_REDIS_URI=redis://admin:iLensProdRedis@192.168.0.207:8213
SPACE_ID=space_127
PROJECT_ID=project_102
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQClilTaeHq6Zc+kWHCNl1O0btGRm7ct3O5zqWx1mwwLUWH14eft
Hi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULfENhwd/D7P3mnoRlktPT2t+tt
RRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw2hcqOYe/NGTkmm1PswIDAQAB
AoGAZPARR1l5NBkKYGKQ1rU0E+wSmx+AtVVmjF39RUSyNmB8Q+poebwSgsr58IKt
T6Yq6Tjyl0UAZTGmferCK0xJJrqyP0hMn4nNNut+acWMKyt+9YrA2FO+r5Jb9JuT
SK35xXnM4aZLGppgWJxRzctpIz+qkf6oLRSZme0AuiqcwYECQQDY+QDL3wbWplRW
bze0DsZRMkDAkNY5OCydvjte4SR/mmAzsrpNrS5NztWbaaQrefoPbsdYBPbd8rS7
C/s/0L1zAkEAw1EC5zt2STuhkcKLa/tL+bk8WHHHtf19aC9kBj1TvWBFh+JojWCo
86iK5fLcHzhyQx5Qi3E9LG2HvOWhS1iUwQJAKbEHHyWW2c4SLJ2oVXf1UYrXeGkc
UNhjclgobl3StpZCYAy60cwyNo9E6l0NR7FjhG2j7lzd1t4ZLkvqFmQU0wJATLPe
yQIwBLh3Te+xoxlQD+Tvzuf3/v9qpWSfClhBL4jEJYYDeynvj6iry3whd91J+hPI
m8o/tNfay5L+UcGawQJAAtbqQc7qidFq+KQYLnv5gPRYlX/vNM+sWstUAqvWdMze
JYUoTHKgiXnSZ4mizI6/ovsBOMJTb6o1OJCKQtYylw==
-----END RSA PRIVATE KEY-----
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQClilTaeHq6Zc+kWHCNl1O0btGR
m7ct3O5zqWx1mwwLUWH14eftHi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULf
ENhwd/D7P3mnoRlktPT2t+ttRRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw
2hcqOYe/NGTkmm1PswIDAQAB
-----END PUBLIC KEY-----
from dotenv import load_dotenv
import os
from scripts.core.migrate_asset_models import MigrateAssetModel
load_dotenv()
SPACE_ID = os.environ.get("SPACE_ID")
PROJECT_ID = os.environ.get("PROJECT_ID")
if __name__ == "__main__":
MigrateAssetModel(space_id=SPACE_ID, project_id=PROJECT_ID).insert_asset_model_details()
bcrypt~=4.0.1
cryptography==43.0.1
pydantic~=2.7.3
python-dotenv==1.0.1
ut-mongo-util[stable,encryption]==1.1.1
ut-redis-connector[stable]==0.3.1
import os
import pathlib
import shutil
from typing import Optional, Any
from dotenv import load_dotenv
from pydantic.v1 import Field, root_validator, BaseSettings
load_dotenv()
class _LoggVariables(BaseSettings):
LOG_LEVEL: str = Field(default="DEBUG")
ENABLE_FILE_LOG: Optional[Any] = Field(default=False)
ENABLE_CONSOLE_LOG: Optional[Any] = Field(default=True)
LOGS_MODULE_PATH: Optional[pathlib.Path] = Field(default="/code/data/default_catalog_meta_logs")
class _Databases(BaseSettings):
SOURCE_MONGO_URI: Optional[str]
DESTINATION_MONGO_URI: Optional[str]
SOURCE_REDIS_URI: Optional[str]
DESTINATION_REDIS_URI: Optional[str]
REDIS_SPACE_DB: int = Field(default=18)
REDIS_USER_ROLE_DB: Optional[int] = Field(default=21)
class _Security(BaseSettings):
ENCRYPTION_CONSTANTS_FILE_PATH: str = "scripts/config/mongo_encryption_constants.json"
USER_ENCRYPTION: bool = Field(default=True)
class _KeyPath(BaseSettings):
KEYS_PATH: Optional[pathlib.Path] = Field(default="data/keys")
PUBLIC: Optional[pathlib.Path]
PRIVATE: Optional[pathlib.Path]
@root_validator(allow_reuse=True)
def assign_values(cls, values):
if not os.path.isfile(os.path.join(values.get("KEYS_PATH"), "public")) or not os.path.isfile(
os.path.join(values.get("KEYS_PATH"), "private")
):
if not os.path.exists(values.get("KEYS_PATH")):
os.makedirs(values.get("KEYS_PATH"))
shutil.copy(os.path.join("assets", "keys", "public"), os.path.join(values.get("KEYS_PATH"), "public"))
shutil.copy(os.path.join("assets", "keys", "private"), os.path.join(values.get("KEYS_PATH"), "private"))
values["PUBLIC"] = os.path.join(values.get("KEYS_PATH"), "public")
values["PRIVATE"] = os.path.join(values.get("KEYS_PATH"), "private")
return values
DBConf = _Databases()
LoggVariables = _LoggVariables()
Security = _Security()
KeyPath = _KeyPath()
__all__ = [
"DBConf",
"LoggVariables",
"Security",
"KeyPath"
]
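# A minimal usage sketch (assumption: run from the repo root with the .env
# shown above in place). pydantic's BaseSettings fills any unset field from
# the environment, so load_dotenv() plus instantiation is all the wiring
# these settings classes need.
if __name__ == "__main__":
    print(DBConf.SOURCE_MONGO_URI)          # value of SOURCE_MONGO_URI from .env
    print(LoggVariables.LOG_LEVEL)          # "DEBUG" unless LOG_LEVEL overrides it
    print(KeyPath.PUBLIC, KeyPath.PRIVATE)  # paths ensured by assign_values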
from datetime import datetime
DEFAULT_USER_ROLES = [{
"space_id": "space_099",
"type": "edit",
"user_role_id": "user_role_096",
"access_levels": {
"userManagement": {
"access_level": True
},
"approvalInbox": {
"access_level": True
},
"artifacts": {
"access_level": True
},
},
"user_role_description": "Admin",
"user_role_name": "Admin",
"user_role_permissions": {
"userManagement": {
"key": "userManagement",
"name": "User Management",
"create": True,
"delete": True,
"edit": True,
"view": True,
"children": {
"users": {
"key": "users",
"name": "User",
"create": True,
"delete": True,
"edit": True,
"view": True
}
}
},
"approvalInbox": {
"key": "approvalInbox",
"name": "Approval Inbox",
"create": True,
"delete": True,
"edit": True,
"view": True
},
"artifacts": {
"key": "artifacts",
"name": "artifacts",
"create": True,
"delete": True,
"edit": True,
"view": True
}
},
"catalogPermission": True
},
{
"space_id": "space_099",
"type": "edit",
"user_role_id": "user_role_097",
"access_levels": {
"userManagement": {
"access_level": True
},
"approvalInbox": {
"access_level": True
},
"artifacts": {
"access_level": True
},
},
"user_role_description": "Approver",
"user_role_name": "Approver",
"user_role_permissions": {
"userManagement": {
"key": "userManagement",
"name": "User Management",
"create": False,
"delete": False,
"edit": False,
"view": False,
"children": {
"users": {
"key": "users",
"name": "User",
"create": False,
"delete": False,
"edit": False,
"view": False
}
}
},
"approvalInbox": {
"key": "approvalInbox",
"name": "Approval Inbox",
"create": True,
"delete": True,
"edit": True,
"view": True
},
"artifacts": {
"key": "artifacts",
"name": "artifacts",
"create": True,
"delete": True,
"edit": True,
"view": True
}
},
"catalogPermission": True
},
{
"space_id": "space_099",
"type": "edit",
"user_role_id": "user_role_098",
"access_levels": {
"userManagement": {
"access_level": True
},
"approvalInbox": {
"access_level": True
},
"artifacts": {
"access_level": True
},
},
"user_role_description": "Operator",
"user_role_name": "Operator",
"user_role_permissions": {
"userManagement": {
"key": "userManagement",
"name": "User Management",
"create": False,
"delete": False,
"edit": False,
"view": False,
"children": {
"users": {
"key": "users",
"name": "User",
"create": False,
"delete": False,
"edit": False,
"view": False
}
}
},
"approvalInbox": {
"key": "approvalInbox",
"name": "Approval Inbox",
"create": False,
"delete": False,
"edit": False,
"view": False
},
"artifacts": {
"key": "artifacts",
"name": "artifacts",
"create": True,
"delete": True,
"edit": True,
"view": True
}
},
"catalogPermission": True
}
]
DEFAULT_USER = {
"encryption_salt": {"dt_0": [], "dt_1": []},
"name": "CatalogAdmin",
"username": "CatalogAdmin",
"password": "",
"email": "support@rockwellautomation.com",
"user_type": "catalog_user",
"phonenumber": 9581388594,
"userrole": ["user_role_096"],
"user_id": "user_097",
"created_on": 1735796769,
"created_by": "user_097",
"product_encrypted": False,
"failed_attempts": 0,
"is_user_locked": False,
"last_failed_attempt": "2021-05-13 08:56:15",
"ilens_encrypted": False,
"passwordReset": None,
"tz": None,
"expires_on": "02/12/21 09:00 30 AM",
"disable_user": False,
"last_logged_in": 1735796769,
"last_failed_login": None,
"fixed_delay": 0,
"variable_delay": 0,
"space_id": "space_099",
"default_user": True,
}
DEFAULT_SPACE = {
"space_id": "space_099",
"userrole": ["user_role_096"],
"created_by": "user_097",
"updated_time": datetime.utcnow().isoformat() + "Z", # UTC in ISO-8601 format
"user_id": "user_097",
"updated_by": "user_097",
}
{
"encrypt_collection_dict" : {
"user": {
"encrypt_keys": ["phonenumber", "email"],
"exclude_encryption": []}
}
}
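# A hedged sketch of how these constants might be consumed; the actual
# field-level encryption lives in the private ut-mongo-util package, so the
# loop below is illustrative only.
import json

if __name__ == "__main__":
    with open("scripts/config/mongo_encryption_constants.json") as f:
        constants = json.load(f)
    for collection, conf in constants["encrypt_collection_dict"].items():
        # e.g. user encrypts: ['phonenumber', 'email']
        print(collection, "encrypts:", conf["encrypt_keys"])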
class DatabaseNames:
ilens_asset_model = "ilens_asset_model"
ilens_configuration = "ilens_configuration"
catalog = "catalog"
catalog_meta = "catalog_meta"
class CollectionNames:
tags = "tags"
tag_groups = "tag_groups"
tag_category = "tag_category"
units = "units"
unit_group = "unit_group"
process_conf = "process_conf"
materials = "materials"
asset_model_details = "asset_model_details"
asset_model_rule_engine = "asset_model_rule_engine"
industry_category = "industry_category"
asset_model = "asset_model"
asset_model_mapping = "asset_model_mapping"
artifact_meta = "artifact_meta"
unique_id = "unique_id"
class AggregationKeys:
match = "$match"
meta = "$meta"
unwind = "$unwind"
data = "$data"
date = "$date"
group = "$group"
push = "$push"
sum = "$sum"
exists = "$exists"
cond = "$cond"
value = "$value"
regex = "$regex"
remove = "$$REMOVE"
root = "$$ROOT"
tostring = "$toString"
ifnull = "$ifNull"
limit = "$limit"
site_id = "$site_id"
concat = "$concat"
count = "$count"
expr = "$expr"
eq = "$eq"
skip = "$skip"
agg_and = "$and"
replace_root = "$replaceRoot"
literal = "$literal"
sort = "$sort"
first = "$first"
options = "$options"
user_role_id = "$user_role_id"
user_role_name = "$user_role_name"
user_id = "$user_id"
username = "$username"
project = "$project"
project_id = "$project_id"
status = "$status"
addfields = "$addFields"
lookup = "$lookup"
split = "$split"
current_status = "$current_status"
meta_created_at = "$meta.created_at"
meta_created_by = "$meta.created_by"
arrayelemat = "$arrayElemAt"
system_tag_type = "$system_tag_type"
arraytoobject = "$arrayToObject"
user_username = "$user.username"
step_data = "$step_data"
meta_createdat = "meta.created_at"
fullpath = "$full_path"
name = "$name"
merge_objects = "$mergeObjects"
version_comments = "$version_comments"
subtract = "$subtract"
artifact_id = "$artifact_id"
KEY_RESOURCES = "$resources"
KEY_RESOURCE_RESOURCE_NAME_PLAIN = "resources.resource_name"
KEY_RESOURCE_RESOURCE_DESC_PLAIN = "resources.resource_description"
KEY_RESOURCE_RESOURCE_PATH = "$resources.resource_path"
KEY_RESOURCE_CATEGORY = "$resources.category"
KEY_RESOURCE_SUB_CATEGORY = "$resources.sub_category"
KEY_RESOURCE_RESOURCE_NAME = "$resources.resource_name"
KEY_RESOURCE_RESOURCE_ID = "$resources.resource_id"
KEY_RESOURCE_RESOURCE_TYPE = "$resources.resource_type"
KEY_ASSET_MODEL_ID = "$asset_model_id"
KEY_ASSET_VERSION = "$asset_version"
KEY_DROWN_SKETCH_BODY_PAGE_TYPE = "drown_sketch.body.page_type"
import datetime
import json
import os
import pytz
import shortuuid
from scripts.config.default_meta_catalog_constants import DEFAULT_USER_ROLES
from scripts.db.mongo import source_mongo_client, destination_mongo_client
from scripts.db.mongo.catalog.aggregations.asset_model_details import AssetDetailAggregate
from scripts.db.mongo.catalog.artifact_meta import ArtifactsMeta
from scripts.db.mongo.catalog.asset_model import AssetModelArtifacts
from scripts.db.mongo.catalog.asset_model_mapping import AssetModelMapping
from scripts.db.mongo.catalog.asset_model_rule_engine import AssetRuleEngineMeta
from scripts.db.mongo.ilens_asset_model.asset_model_details import AssetDetail
from scripts.db.mongo.ilens_asset_model.asset_model_rule_engine import AssetModelRuleEngine
from scripts.db.mongo.ilens_asset_model.industry_category import IndustryCategory
from scripts.db.mongo.ilens_configuration.materials import Materials
from scripts.db.mongo.ilens_configuration.process_conf import ProcessConf
from scripts.db.mongo.ilens_configuration.tag_category import TagCategory
from scripts.db.mongo.ilens_configuration.tag_group import TagGroups
from scripts.db.mongo.ilens_configuration.tags import Tags
from scripts.db.mongo.ilens_configuration.unit import Units
from scripts.db.mongo.ilens_configuration.unit_group import UnitGroups
from scripts.logging import logger
from scripts.utils.common_utils import CommonUtils
class MigrateAssetModel:
def __init__(self, project_id, space_id):
"""
The __init__ function is called when the class is instantiated.
It sets up the instance of the class, and defines all of its attributes.
:param self: Represent the instance of the class
:param : Pass the mongo client to the class
:return: The following:
"""
self.project_id = project_id
self.space_id = space_id
self.asset_detail_aggregate = AssetDetailAggregate()
self.common_utils = CommonUtils(space_id=self.space_id)
self.destination_asset_model_mapping_mongo = AssetModelMapping(mongo_client=destination_mongo_client, space_id=self.space_id)
self.destination_asset_model_rule_engine_mongo = AssetRuleEngineMeta(mongo_client=destination_mongo_client, space_id=self.space_id)
self.destination_artifact_meta = ArtifactsMeta(mongo_client=destination_mongo_client, space_id=self.space_id)
self.destination_asset_model = AssetModelArtifacts(mongo_client=destination_mongo_client, space_id=self.space_id)
self.source_process_conf = ProcessConf(mongo_client=source_mongo_client)
self.source_materials = Materials(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_units = Units(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_unit_group = UnitGroups(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_tags = Tags(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_tag_group = TagGroups(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_tag_category = TagCategory(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_asset_model_details = AssetDetail(mongo_client=source_mongo_client, project_id=self.project_id)
self.source_industry_category = IndustryCategory(mongo_client=source_mongo_client)
self.source_asset_model_rule_engine_conn = AssetModelRuleEngine(mongo_client=source_mongo_client, project_id=self.project_id)
def bind_unit_data(self, unit_id):
try:
if not unit_id:
return {}
unit_data = self.source_units.find_one({'id': unit_id})
unit_group_data = self.source_unit_group.find_one({'id': unit_data.get("unit_group_id")})
unit_data["unit_group"] = unit_group_data
return unit_data
except Exception as e:
logger.error(str(e))
def get_tags(self, source_tags_list):
try:
tags_list = list(self.source_tags.find({"tag_id": {"$in": source_tags_list}}))
destination_tag_data = []
for tag in tags_list:
tag_group_data = self.source_tag_group.find_one({"id": tag.get("tag_group_id", "")})
tag_category_data = self.source_tag_category.find_one({"tag_category_id": tag.get("tag_category_id", "")})
unit_data = self.bind_unit_data(tag.get("unit"))
tag["tag_group"] = tag_group_data
tag["tag_category"] = tag_category_data
tag["unit_data"] = unit_data
destination_tag_data.append(tag)
return destination_tag_data
except Exception as e:
logger.error(str(e))
def get_industry_category_details(self, industry_category_id):
try:
industry_category_details = self.source_industry_category.find_one({"industry_category_id": industry_category_id})
return industry_category_details
except Exception as e:
logger.error(str(e))
def get_material_data(self, asset_model_id, asset_version):
try:
asset_model_version = [asset_model_id + "$" + asset_version]
material_data = list(self.source_materials.find({"asset_models": {"$in": asset_model_version}}))
destination_material_data = []
for material in material_data:
unit_data = self.bind_unit_data(material.get("uom"))
material["unit_data"] = unit_data
destination_material_data.append(material)
return destination_material_data
except Exception as e:
logger.error(str(e))
def get_process_details(self, process_list):
try:
return list(self.source_process_conf.find({"process_id": {"$in": process_list}}))
except Exception as e:
logger.error(str(e))
@staticmethod
def generate_shortuuid_with_alpha_first() -> str:
# Generate a short UUID
short_uuid = shortuuid.uuid()
# Ensure the first character is alphabetic (a-z)
while not short_uuid[0].isalpha():
short_uuid = shortuuid.uuid()
return short_uuid
def generate_artifact_meta_json(self,
space_id, name, type_, source_details, source_id, artifact_id=None, image="", ver="1.0"
):
if not artifact_id:
artifact_id = self.generate_shortuuid_with_alpha_first()
if not image:
image = f"{artifact_id}.jpg"
data = {
"artifact_id": artifact_id,
"name": name,
"artifact_type": type_,
"ver": ver,
"image": image,
"status": "pending",
"meta": {
"published_by": source_details.get("published_by", ""),
"published_on": int(datetime.datetime.now(tz=pytz.timezone("UTC")).timestamp()),
},
"source_details": source_details,
"comments": "",
"space_id": space_id,
"source_id": source_id,
}
return data
def get_rule_data(self, catalog_rules, asset_model_mapping):
try:
space_rules = []
for rule in catalog_rules:
rule_data = self.source_asset_model_rule_engine_conn.find_one({"rule_engine_id": rule.get("rule_engine_id")})
rule_data["asset_model_id"] = asset_model_mapping.get("asset_model_id")
rule_data["asset_version"] = asset_model_mapping.get("asset_version")
rule_data["space_id"] = self.space_id
rule_data["artifact_id"] = asset_model_mapping.get("artifact_id")
rule_data["artifact_ver"] = asset_model_mapping.get("artifact_ver")
space_rules.append(rule_data)
# self.destination_asset_model_rule_engine_mongo.insert_many(data=space_rules)
except Exception as e:
logger.error(str(e))
def insert_asset_model_details(self):
try:
source_asset_model_details = list(self.source_asset_model_details.find({}, {"_id": 0}))
for asset_model in source_asset_model_details:
source_asset_model_id = asset_model.get("asset_model_id")
source_asset_version = asset_model.get("asset_version")
parameter_data = self.get_tags(asset_model.get("parameters", []))
industry_category_details = self.get_industry_category_details(asset_model.get("industry_category_id", ""))
material_data = self.get_material_data(source_asset_model_id, source_asset_version)
process_details = self.get_process_details(asset_model.get("processes", []))
artifact_ver = self.destination_artifact_meta.get_artifact_latest_version(
artifact_name=asset_model.get("asset_model_name"), artifact_type="asset_model"
)
artifact_meta_data = self.generate_artifact_meta_json(
space_id=self.space_id,
name=asset_model.get("asset_model_name"),
type_="asset_model",
source_details={},
source_id=source_asset_model_id,
artifact_id=None,
ver=artifact_ver,
)
asset_model_mapping = {
"asset_model_id": asset_model.get("asset_model_id"),
"asset_version": asset_model.get("asset_version"),
"artifact_id": artifact_meta_data.get("artifact_id"),
"artifact_ver": artifact_ver,
"space_id": self.space_id,
"parameter_details": parameter_data,
"industry_category_details": industry_category_details,
"materials": material_data,
"process_details": process_details
}
self.get_rule_data(asset_model.get("rules"), asset_model_mapping)
# self.destination_asset_model_mapping_mongo.insert_one(asset_model_mapping)
# self.destination_asset_model.insert_one(asset_model)
artifact_meta_data["source_details"]["industry"] = asset_model_mapping.get(
"industry_category_details", {}
).get("industry_category_name")
# self.destination_artifact_meta.insert_one(artifact_meta_data)
except Exception as e:
logger.error(str(e))
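# Standalone sketch of the ID scheme in generate_shortuuid_with_alpha_first:
# shortuuid output is resampled until the first character is a letter, so
# generated artifact_ids never start with a digit (assumes the shortuuid
# package is installed, as the imports above do).
import shortuuid

def alpha_first_uuid() -> str:
    uid = shortuuid.uuid()
    while not uid[0].isalpha():
        uid = shortuuid.uuid()
    return uid

if __name__ == "__main__":
    print(alpha_first_uuid())  # always letter-first, e.g. "fVx..."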
from scripts.config import DBConf
from scripts.utils.mongo_utils import MongoConnect
source_mongo_client = MongoConnect(uri=DBConf.SOURCE_MONGO_URI)()
destination_mongo_client = MongoConnect(uri=DBConf.DESTINATION_MONGO_URI)()
from scripts.constants.db_constants import AggregationKeys
class AssetDetailAggregate:
@staticmethod
def asset_model_version_list(project_id, asset_model_id):
return [
{
AggregationKeys.match: {
"project_id": project_id,
"asset_model_id": asset_model_id,
"asset_rule": {AggregationKeys.exists: False},
}
},
{AggregationKeys.project: {"_id": 0, "version_list": 1}},
{AggregationKeys.project: {"version_list": 1, "value": {"$size": "$version_list"}}},
{
AggregationKeys.group: {
"_id": None,
"value": {AggregationKeys.push: AggregationKeys.value},
"name": {AggregationKeys.push: "$version_list"},
}
},
{
AggregationKeys.project: {
"_id": 0,
"version_list": {
AggregationKeys.arrayelemat: [
AggregationKeys.name,
{"$indexOfArray": [AggregationKeys.value, {"$max": AggregationKeys.value}]},
]
},
}
},
]
@staticmethod
def fetch_resource_list(artifact_id, filters=None, filter_key=False):
query = [
{
AggregationKeys.match: {
"artifact_id": {"$in": artifact_id},
}
},
{AggregationKeys.unwind: AggregationKeys.KEY_RESOURCES},
{AggregationKeys.project: {"_id": 0, "asset_model_id": 1, "asset_version": 1, "resources": 1}},
{
AggregationKeys.project: {
"resource_path": AggregationKeys.KEY_RESOURCE_RESOURCE_PATH,
"category": AggregationKeys.KEY_RESOURCE_CATEGORY,
"sub_category": AggregationKeys.KEY_RESOURCE_SUB_CATEGORY,
"resource_name": AggregationKeys.KEY_RESOURCE_RESOURCE_NAME,
"resource_description": "$resources.resource_description",
"resource_id": AggregationKeys.KEY_RESOURCE_RESOURCE_ID,
"resource_type": AggregationKeys.KEY_RESOURCE_RESOURCE_TYPE,
"is_svg": {"$ifNull": ["$resources.is_svg", False]},
"resource_config": "$resources.resource_config",
"asset_model_id": AggregationKeys.KEY_ASSET_MODEL_ID,
"asset_version": AggregationKeys.KEY_ASSET_VERSION,
}
},
]
if filter_key:
if filters.get("search"):
search_query = {
"$or": [
{
AggregationKeys.KEY_RESOURCE_RESOURCE_NAME_PLAIN: {
AggregationKeys.regex: filters["search"],
AggregationKeys.options: "i",
}
},
{
AggregationKeys.KEY_RESOURCE_RESOURCE_DESC_PLAIN: {
AggregationKeys.regex: filters["search"],
AggregationKeys.options: "i",
}
},
]
}
else:
search_query = {
"$or": [
{
AggregationKeys.KEY_RESOURCE_RESOURCE_NAME_PLAIN: {
AggregationKeys.regex: "",
AggregationKeys.options: "i",
}
},
{
AggregationKeys.KEY_RESOURCE_RESOURCE_DESC_PLAIN: {
AggregationKeys.regex: "",
AggregationKeys.options: "i",
}
},
]
}
query.insert(3, {AggregationKeys.match: search_query})
if filters.get("sort_by"):
if filters["sort_by"] == "asc":
sort_value = 1
else:
sort_value = -1
sort_query = {AggregationKeys.KEY_RESOURCE_RESOURCE_NAME_PLAIN: sort_value}
query.insert(4, {AggregationKeys.sort: sort_query})
return query
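# Usage sketch (assumes the scripts package is importable): the static
# methods above only build pipelines; nothing executes until the returned
# list is handed to a collection's aggregate().
import json

if __name__ == "__main__":
    pipeline = AssetDetailAggregate.fetch_resource_list(
        artifact_id=["artifact_1"],
        filters={"search": "pump", "sort_by": "asc"},
        filter_key=True,
    )
    print(json.dumps(pipeline, indent=2))  # $match / $unwind / $project / $sort stages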
from pydantic import BaseModel
from scripts.db.redis_connection import destination_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import (
CollectionNames,
DatabaseNames,
)
from scripts.logging import logger
class ArtifactMetaSchema(BaseModel):
artifact_id: str
name: str
artifact_type: str
ver: float
image: str
status: str
meta: dict
source_details: dict
comments: str
space_id: str
source_id: str
class ArtifactsMeta(MongoCollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(
mongo_client, database=DatabaseNames.catalog, collection=CollectionNames.artifact_meta, space_db=destination_space_db
)
self.space_id = space_id
@property
def key_space_id(self):
return "space_id"
def fetch_artifacts_count(self):
try:
results = self.aggregate(
[
{"$match": {"status": "approved"}},
{
"$group": {
"_id": "$artifact_type",
"count": {"$sum": 1},
}
},
{
"$project": {
"name": "$_id",
"value": "$count",
"_id": 0,
}
},
]
)
if not results:
return None
return list(results)
except Exception as e:
logger.error(f"Error occurred in fetching artifacts due to {str(e)}")
def get_artifact_meta_by_aggregate(self, query: list):
return list(self.aggregate(pipelines=query))
def fetch_artifact_by_id(self, artifact_id):
return self.find_one({"artifact_id": artifact_id}, filter_dict={"_id": 0})
def get_artifact_latest_version(self, artifact_name, artifact_type):
artifact_older_record = self.find(
query={
"name": artifact_name,
"artifact_type": artifact_type,
},
filter_dict={"_id": 0, "ver": 1},
sort={"ver": -1},
limit=1,
)
artifact_older_record = list(artifact_older_record)
if artifact_older_record:
return f"{int(float(artifact_older_record[0]['ver']))+ 1}.0"
else:
return "1.0"
from typing import Dict, List, Optional
from pydantic import BaseModel
from scripts.db.redis_connection import destination_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import (
CollectionNames,
DatabaseNames,
)
class AssetDetailSchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
asset_model_id: Optional[str] = ""
allow_editing: Optional[bool] = True
asset_description: Optional[str] = ""
asset_version: Optional[str] = ""
asset_model_type: Optional[str] = ""
asset_model_icon: Optional[str] = ""
parameters: Optional[Dict] = {}
parameters_new: Optional[Dict] = {}
processes: Optional[list] = []
device_models: Optional[List] = []
events: Optional[List] = []
resources: Optional[Dict] = {}
others: Optional[Dict] = {}
class AssetModelArtifacts(MongoCollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(
mongo_client, database=DatabaseNames.catalog, collection=CollectionNames.asset_model, space_db=destination_space_db
)
self.project_id = space_id
@property
def key_asset_model_id(self):
return "asset_model_id"
@property
def key_asset_version(self):
return "asset_version"
@property
def key_space_id(self):
return "space_id"
def get_highest_asset_model_version(self, asset_model_name, space_id):
query = [
{"$match": {"asset_model_name": asset_model_name, "space_id": space_id}},
{
"$group": {
"_id": "$asset_model_id",
"highestVersion": {"$max": "$asset_version"},
}
},
]
res = list(self.aggregate(query))
if res:
return res[0].get("highestVersion", "0.0"), res[0].get("_id", "")
else:
return "0.0", ""
def insert_one_asset_detail(self, data):
"""
The following function will insert one asset detail in the
asset_model collection
:param self:
:param data:
:return:
"""
insert_data = data
return self.insert_one(insert_data)
def aggregate_asset_detail(self, filter_list: List):
if filter_list:
return self.aggregate(pipelines=filter_list)
from typing import Dict, List, Optional
from pydantic import BaseModel
from scripts.db.redis_connection import destination_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import (
CollectionNames,
DatabaseNames,
)
class AssetModelMappingSchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
asset_model_id: Optional[str] = ""
asset_version: Optional[str] = ""
artifact_id: Optional[str] = ""
parameter_details: Optional[List] = []
process_details: Optional[List] = []
industry_details: Optional[Dict] = {}
class AssetModelMapping(MongoCollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(
mongo_client,
database=DatabaseNames.catalog,
collection=CollectionNames.asset_model_mapping,
space_db=destination_space_db
)
self.project_id = space_id
@property
def key_asset_model_id(self):
return "asset_model_id"
@property
def key_artifact_id(self):
return "artifact_id"
@property
def key_asset_version(self):
return "asset_version"
@property
def key_space_id(self):
return "space_id"
def find_asset_detail_by_id(
self, space_id, artifact_id=None, asset_id=None, asset_version=None, asset_name=None, filter_dict=None
):
query = {}
query.update({self.key_space_id: space_id})
if asset_id:
query.update({self.key_asset_model_id: asset_id})
if artifact_id:
query.update({self.key_artifact_id: artifact_id})
if asset_version:
query.update({self.key_asset_version: asset_version})
if asset_name:
query.update({"asset_model_name": asset_name})
asset_list = self.find_one(query=query, filter_dict=filter_dict)
if asset_list:
return asset_list
return {}
def insert_one_asset_detail(self, data):
"""
The following function will insert one asset mapping in the
asset_model_mapping collection
:param self:
:param data:
:return:
"""
insert_data = data
return self.insert_one(insert_data)
from typing import Any, Union
from scripts.db.redis_connection import destination_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import (
CollectionNames,
DatabaseNames,
)
from scripts.logging import logger
class AssetRuleEngineMeta(MongoCollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(
mongo_client,
database=DatabaseNames.catalog,
collection=CollectionNames.asset_model_rule_engine,
space_db=destination_space_db
)
self.project_id = space_id
@property
def key_space_id(self):
return "space_id"
def fetch_rule_details(self, list_of_asset_model_ids):
try:
pipeline = [
{"$match": {"asset_model_id": {"$in": list_of_asset_model_ids}}},
{
"$project": {
"_id": 0,
"ruleName": "$basic_info.ruleName",
"deviceDescription": "$basic_info.deviceDescription",
"Selected_ruleType": "$basic_info.Selected_ruleType",
"disable_all": {
"$cond": {
"if": {"$eq": ["$basic_info.disable_all", "True"]},
"then": "Disabled",
"else": "Enabled",
}
},
}
},
]
result = list(self.aggregate(pipeline))
if not result:
result = []
return result
except Exception as e:
logger.error(f"Error occurred in the fetch rule details due to {str(e)}")
def find_by_space(
self, space_id: str, projections=None, sort=None, query_dict=None, limit=None, skip=0, **filters
) -> Union[Any, None]:
query = {self.key_space_id: space_id}
if query_dict:
query |= query_dict
if filters:
query.update(filters)
records = self.find(query, projections, sort=sort, limit=limit, skip=skip)
return list(records) if records else None
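# Note on `query |= query_dict` above: the in-place dict merge operator
# requires Python 3.9+. A quick equivalence check:
if __name__ == "__main__":
    query = {"space_id": "space_099"}
    query |= {"asset_model_id": "asset_123"}
    assert query == {"space_id": "space_099", "asset_model_id": "asset_123"}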
from typing import Dict, List, Optional
from pydantic import BaseModel
from scripts.constants.db_constants import DatabaseNames, CollectionNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class AssetDetailSchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
asset_model_id: Optional[str] = ""
allow_editing: Optional[bool] = True
asset_description: Optional[str] = ""
asset_version: Optional[str] = ""
asset_model_type: Optional[str] = ""
asset_model_icon: Optional[str] = ""
parameters: Optional[Dict] = {}
parameters_new: Optional[Dict] = {}
processes: Optional[list] = []
device_models: Optional[List] = []
events: Optional[List] = []
resources: Optional[Dict] = {}
others: Optional[Dict] = {}
class AssetDetail(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(
mongo_client, database=DatabaseNames.ilens_asset_model, collection=CollectionNames.asset_model_details, space_db=source_space_db
)
self.space_id = project_id
@property
def key_project_id(self):
return "project_id"
@property
def key_asset_model_id(self):
return "asset_model_id"
@property
def key_asset_version(self):
return "asset_version"
def find_asset_detail_by_param(self, **query):
asset_list = self.find(query)
return asset_list
def find_asset_detail_by_id(self, project_id, asset_id=None, asset_version=None, asset_name=None, filter_dict=None):
query = {}
query.update({self.key_project_id: project_id})
if asset_id:
query.update({self.key_asset_model_id: asset_id})
if asset_version:
query.update({self.key_asset_version: asset_version})
if asset_name:
query.update({"asset_model_name": asset_name})
asset_list = self.find_one(query=query, filter_dict=filter_dict)
if asset_list:
return asset_list
return {}
def find_assets(self, query):
all_assets = self.find(query=query)
if all_assets:
return list(all_assets)
return []
def get_highest_asset_model_version(self, asset_model_name, project_id):
query = [
{"$match": {"asset_model_name": asset_model_name, "project_id": project_id}},
{
"$group": {
"_id": "$asset_model_id",
"highestVersion": {"$max": "$asset_version"},
}
},
]
res = list(self.aggregate(query))
if res:
return res[0].get("highestVersion", "0.0"), res[0].get("_id", "")
else:
return "0.0", ""
def insert_one_asset_detail(self, data):
"""
The following function will insert one asset detail in the
asset_model_details collection
:param self:
:param data:
:return:
"""
insert_data = data
return self.insert_one(insert_data)
def delete_one_asset_detail(self, asset_id, asset_version):
query = {}
if bool(asset_id):
query.update({self.key_asset_model_id: asset_id})
if bool(asset_version):
query.update({self.key_asset_version: asset_version})
if query:
return self.delete_one(query)
else:
return False
def delete_one_asset_rule(self, project_id, asset_id, asset_version, rule_engine_id):
query = {
self.key_project_id: project_id,
self.key_asset_model_id: asset_id,
self.key_asset_version: asset_version,
"rules.rule_engine_id": rule_engine_id,
}
return self.delete_one(query)
def update_review_status(self, asset_model_id, asset_version, action, project_id, user_id):
query = {
self.key_asset_model_id: asset_model_id,
self.key_asset_version: asset_version,
self.key_project_id: project_id,
}
return self.update_one(query=query, data={"status": action, "action_user": user_id}, upsert=False)
def update_asset_detail(self, asset_id, data, asset_version=None, project_id=None, upsert=False):
query = {self.key_asset_model_id: asset_id}
if asset_version:
query.update({self.key_asset_version: asset_version})
if project_id:
query.update({self.key_project_id: project_id})
return self.update_one(query=query, data=data, upsert=upsert)
def update_many_asset_detail(self, asset_id, data, asset_version=None, project_id=None, upsert=False):
query = {self.key_asset_model_id: asset_id}
if asset_version:
query.update({self.key_asset_version: asset_version})
if project_id:
query.update({self.key_project_id: project_id})
return self.update_many(query=query, data=data, upsert=upsert)
def aggregate_asset_detail(self, filter_list: List):
if filter_list:
return self.aggregate(pipelines=filter_list)
def match_asset_des(self, obj_req):
query = [
{"$match": {"asset_description": {"$regex": obj_req, "$options": "i"}}},
{"$project": {"asset_description": 1, "asset_model_icon": 1, "_id": 0}},
]
return list(self.aggregate(pipelines=query))
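# What the case-insensitive $regex match in match_asset_des amounts to,
# sketched with Python's re module:
import re

if __name__ == "__main__":
    pattern = re.compile("boiler", re.IGNORECASE)
    assert pattern.search("Main BOILER unit") is not None
    # Mongo equivalent: {"asset_description": {"$regex": "boiler", "$options": "i"}}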
from scripts.constants.db_constants import DatabaseNames, CollectionNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class AssetModelRuleEngine(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(
mongo_client, database=DatabaseNames.ilens_asset_model, collection=CollectionNames.asset_model_rule_engine, space_db=source_space_db
)
self.space_id = project_id
from typing import Optional
from pydantic import BaseModel
from scripts.constants.db_constants import DatabaseNames, CollectionNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class IndustryCategorySchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
industry_category_name: str
description: Optional[str] = ""
industry_category_id: Optional[str] = ""
upload_icon: Optional[str] = ""
is_deleted: Optional[str] = ""
class IndustryCategory(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(
mongo_client, database=DatabaseNames.ilens_asset_model, collection=CollectionNames.industry_category, space_db=source_space_db
)
def find_one_industry_category(self, filter_dict=None, **query) -> IndustryCategorySchema:
"""
The following function will give one industry_category for a given set of
search parameters as keyword arguments
:param filter_dict:
:param query:
:return:
"""
industry = self.find_one(filter_dict=filter_dict, query=query)
if industry:
return IndustryCategorySchema(**industry)
else:
return industry
def insert_one_industry_category(self, data):
"""
The following function will insert one industry_category in the
industry_category collections
:param data:
:return:
"""
return self.insert_one(data)
def update_one_industry_category(self, data, query):
"""
The following function will update one industry_category in
industry_category collection based on the given query
:param data:
:param query:
:return:
"""
return self.update_one(data=data, upsert=False, query=query)
def update_many_industry_category(self, data, query):
"""
The following function will update many industry_category in
industry_category collection based on the given query
:param data:
:param query:
:return:
"""
return self.update_many(data=data, upsert=False, query=query)
def find_all_industry_category(self, sort=None, skip=0, limit=None, **query):
"""
The following function will give all industry_category for the given set of
search parameters as keyword arguments
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
filter_dict = {
"industry_category_name": 1,
"industry_category_id": 1,
"description": 1,
"is_deleted": 1,
"upload_icon": 1,
}
response = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if not response:
return []
return list(response)
def find_industry_list(self):
response = self.find({"is_deleted": False})
if not response:
return []
return list(response)
def fetch_all_industry_category(self, query):
"""
The following function will give all industry_category for the given set of
search parameters as keyword arguments
:param query:
:return:
"""
response = self.find(query=query)
return list(response) if response else []
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import DatabaseNames, CollectionNames
class Materials(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.materials, space_db=source_space_db)
self.space_id = project_id
from scripts.constants.db_constants import CollectionNames, DatabaseNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class ProcessConf(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(
mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.process_conf, space_db=source_space_db
)
self.space_id = project_id
def find_all_processes(self, query=None):
"""
The following function will give all processes for the given set of
search parameters as keyword arguments
:param self:
:param query:
:return:
"""
all_process = self.find(query=query)
if not all_process:
return []
return list(all_process)
def find_by_process_id(self, process_id):
"""
The following function will give one process for a given set of
search parameters as keyword arguments
:param process_id:
:return:
"""
one_process = self.find_one(query={"process_id": process_id})
return one_process
def find_by_project_id_process_name(self, process_name, project_id):
"""
The following function will give one process for the given
process name and project id
:param process_name:
:param project_id:
:return:
"""
one_process = self.find_one(query={"process_name": process_name, "project_id": project_id})
return one_process
def find_by_project_id(self, project_id):
"""
The following function will give all processes for the given
project id
:param project_id:
:return:
"""
query = {"customer_project_id": project_id}
many_process = self.find(query=query)
if not bool(many_process):
return []
return list(many_process)
def insert_one_process(self, data):
"""
The following function will insert one process in the
process_conf collection
:param self:
:param data:
:return:
"""
return self.insert_one(data)
def insert_many_process(self, data):
"""
The following function will insert many processes in the
process_conf collection
:param self:
:param data:
:return:
"""
return self.insert_many(data)
def update_one_process(self, process_id, data, upsert=False):
"""
The following function will update one process in
process_conf collection based on the given query
"""
query_dict = {"process_id": process_id}
return self.update_one(data=data, query=query_dict, upsert=upsert)
def delete_many_process(self, query):
"""
The following function will delete many processes in
process_conf collection based on the given query
:param self:
:param query:
:return:
"""
if bool(query):
response = self.delete_many(query=query)
return response
else:
return False
def delete_one_process(self, process_id):
"""
The following function will delete one process in
process_conf collection based on the given query
:param process_id:
:return:
"""
if process_id:
return self.delete_one(query={"process_id": process_id})
else:
return False
def find_process_by_aggregate(self, query):
process = self.aggregate(query)
if not process:
return []
return list(process)
from scripts.constants.db_constants import CollectionNames, DatabaseNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class TagCategory(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.tag_category, space_db=source_space_db)
self.space_id = project_id
from scripts.constants.db_constants import CollectionNames, DatabaseNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class TagGroups(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.tag_groups, space_db=source_space_db)
self.space_id = project_id
from scripts.constants.db_constants import CollectionNames, DatabaseNames
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class Tags(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.tags, space_db=source_space_db)
self.space_id = project_id
@property
def key_tag_id(self):
return "id"
@property
def key_tag_name(self):
return "tag_name"
def find_name_by_id(self, tag_id: str):
query = {self.key_tag_id: tag_id}
filter_dict = {self.key_tag_name: 1, "_id": 0}
record = self.find_one(query, filter_dict)
if not record:
return None
return record[self.key_tag_name]
def find_all_tags(self, sort=None, skip=0, limit=None, **query):
"""
The following function will give all tags for the given set of
search parameters as keyword arguments
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
filter_dict = {"id": 1, "tag_name": 1, "tag_category_name": 1, "tag_category_id": 1, "description": 1, "_id": 0}
response = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if not response:
return []
return list(response)
def find_tags(self, query):
all_tags = self.find(query=query)
if all_tags:
return list(all_tags)
return []
def find_tags_by_aggregate(self, query):
tags = self.aggregate(query)
if not tags:
return []
return list(tags)
def insert_one_tag(self, data):
"""
The following function will insert one tag in the
tags collections
:param data:
:return:
"""
return self.insert_one(data)
def insert_many_tags(self, data):
"""
The following function will insert many tags in the
tags collection
:param data:
:return:
"""
return self.insert_many(data)
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import DatabaseNames, CollectionNames
class Units(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.units, space_db=source_space_db)
self.space_id = project_id
from scripts.db.redis_connection import source_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import DatabaseNames, CollectionNames
class UnitGroups(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration, collection=CollectionNames.unit_group, space_db=source_space_db)
self.space_id = project_id
from ut_redis_connector import RedisConnector
from scripts.config import DBConf
source_connector = RedisConnector(DBConf.SOURCE_REDIS_URI)
destination_connector = RedisConnector(DBConf.DESTINATION_REDIS_URI)
destination_space_db = destination_connector.connect(db=int(DBConf.REDIS_SPACE_DB), decode_responses=True)
source_space_db = source_connector.connect(db=int(DBConf.REDIS_SPACE_DB), decode_responses=True)
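# Sketch: a connection to the user-role db would follow the same pattern
# (REDIS_USER_ROLE_DB is defined in scripts.config; ut-redis-connector is a
# private package, so connect()'s signature is taken on trust from the calls
# above).
if __name__ == "__main__":
    source_user_role_db = source_connector.connect(db=int(DBConf.REDIS_USER_ROLE_DB), decode_responses=True)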
import re
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from ut_mongo_util import CollectionBaseClass
from scripts.utils.decryption_util import MongoDataEncryption
from scripts.utils.mongo_utils import MongoCollectionBaseClass as UtilsMongoCollection
class UserCollectionKeys:
KEY_LANGUAGE = "language"
KEY_NAME = "name"
KEY_USER_ID = "user_id"
KEY_SPACE_ID = "space_id"
KEY_USERNAME = "username"
KEY_USER_ROLE = "userrole"
KEY_EMAIL = "email"
class UserSchema(BaseModel):
name: Optional[str] = ""
space_id: Optional[str] = ""
username: Optional[str] = ""
password: Optional[str] = ""
email: Optional[Any] = None
phonenumber: Optional[Any] = None
userrole: Optional[List[str]] = None
user_type: Optional[str] = ""
user_id: Optional[str] = ""
created_by: Optional[str] = ""
encryption_salt: Optional[Dict] = {}
passwordReset: Optional[Dict] = {}
failed_attempts: Optional[int] = 0
is_user_locked: Optional[bool] = False
last_failed_login: Optional[int] = 0
last_logged_in: Optional[int] = 0
last_failed_attempt: Optional[str] = ""
expires_on: Optional[str] = ""
disable_user: Optional[bool] = False
default_user: Optional[bool] = False
created_on: Optional[int] = 0
updated_by: Optional[str] = ""
updated_on: Optional[int] = 0
secret: Optional[str] = ""
password_added_on: Optional[int] = 0
default_space: Optional[str] = ""
fixed_delay: Optional[int] = 0
variable_delay: Optional[int] = 0
class User(CollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(mongo_client, database="catalog_meta", collection="user")
self.space_id = space_id
self.key_user_id = UserCollectionKeys.KEY_USER_ID
self.key_space_id = UserCollectionKeys.KEY_SPACE_ID
self.key_username = UserCollectionKeys.KEY_USERNAME
self.key_email = UserCollectionKeys.KEY_EMAIL
self.find_decrypted = UtilsMongoCollection.find_decrypted.__get__(self, UtilsMongoCollection)
self.get_decrypted_records = UtilsMongoCollection.get_decrypted_records.__get__(self, UtilsMongoCollection)
self.data_encryption = MongoDataEncryption()
def update_user(self, query, data):
"""
The following function will update one user in the
user collection
:param query:
:param data:
:return:
"""
return self.update_one(query=query, data=data, upsert=True)
def insert_one_user(self, data):
"""
The following function will insert one user in the
user collections
:param self:
:param data:
:return:
"""
data = self.data_encryption.encrypt_data(data, collection_name="user")
return self.insert_one(data)
def find_user(self, space_id, user_id=None, username=None, email=None, filter_dict=None):
query = {}
if user_id:
query[self.key_user_id] = user_id
if username:
query[self.key_username] = username
if email:
query[self.key_email] = re.compile(email, re.IGNORECASE)
user = self.find_decrypted(query=query, filter_dict=filter_dict)
if user:
return UserSchema(**user)
return user
@staticmethod
def get_users_list(space_id=None):
query_json = [
{
"$group": {
"_id": None,
"data": {"$push": {"k": {"$ifNull": ["$user_id", ""]}, "v": {"$ifNull": ["$username", ""]}}},
}
},
{"$replaceRoot": {"newRoot": {"$arrayToObject": "$data"}}},
]
if space_id:
query_json.insert(0, {"$match": {"space_id": space_id}})
return query_json
def users_list_by_aggregate(self, query: list):
return self.aggregate(pipelines=query)
def find_user_by_space_id(self, user_id, space_id):
user = self.find_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
if user:
return dict(user)
return user
def get_all_users(self, filter_dict=None, sort=None, skip=0, limit=None, **query):
users = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if users:
return list(users)
return []
def find_user_role_for_user_id(self, user_id, space_id):
query = {"user_id": user_id, "space_id": space_id}
filter_dict = {"userrole": 1, "_id": 0}
return self.find_one(query=query, filter_dict=filter_dict)
def find_base_user(self, space_id=None, user_id=None, username=None, email=None, filter_dict=None):
query = {}
if space_id:
query[self.key_space_id] = space_id
if user_id:
query[self.key_user_id] = user_id
if username:
query[self.key_username] = username
if email:
query[self.key_email] = re.compile(email, re.IGNORECASE)
if not (user := self.find_decrypted(query=query, filter_dict=filter_dict)):
return user
try:
return UserSchema(**user)
except Exception:
return user
def find_by_space(
self,
projections=None,
sort=None,
query_dict=None,
limit=None,
skip=0,
**filters,
) -> Union[Any, None]:
query = {}
if query_dict:
query |= query_dict
if filters:
query.update(filters)
records = self.find(query, projections, sort=sort, limit=limit, skip=skip)
if records:
records = self.get_decrypted_records(records)
return list(records) if records else []
def delete_one_user(self, user_id, space_id):
return self.delete_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
def update_one_user(self, query, data):
"""
The following function will update one user in the
user collection
:param self:
:param data:
:return:
"""
data = self.data_encryption.encrypt_data(data, collection_name="user")
return self.update_one(query=query, data=data, upsert=True)
def get_data_by_aggregate(self, query_json: list):
if aggregate_data := list(self.aggregate(query_json)):
aggregate_data = self.get_decrypted_records(aggregate_data)
return aggregate_data
return []
def find_by_aggregate(self, query_json: list):
if user_by_aggregate := list(self.aggregate(query_json)):
return user_by_aggregate
return []
def distinct_user(self, query_key, filter_json):
query = {self.key_user_id: filter_json}
return self.distinct(query_key=query_key, filter_json=query)
def find_user_by_param(self, **query):
user = self.get_decrypted_records(self.find(query))
if not bool(user):
user = []
return user
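# What the get_users_list pipeline above yields, mimicked in plain Python:
# $push collects [{"k": user_id, "v": username}, ...] pairs and
# $arrayToObject folds them into one {user_id: username} mapping.
if __name__ == "__main__":
    rows = [{"user_id": "user_097", "username": "CatalogAdmin"}]
    pairs = [{"k": r.get("user_id", ""), "v": r.get("username", "")} for r in rows]
    print({p["k"]: p["v"] for p in pairs})  # {'user_097': 'CatalogAdmin'}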
from typing import Optional
from ut_mongo_util import CollectionBaseClass
class UserCollectionKeys:
KEY_LANGUAGE = "language"
KEY_NAME = "name"
KEY_USER_ID = "user_id"
KEY_SPACE_ID = "space_id"
KEY_USERNAME = "username"
KEY_USER_ROLE = "user_role_name"
KEY_EMAIL = "email"
class UserRole(CollectionBaseClass):
def __init__(self, mongo_client, space_id=None):
super().__init__(mongo_client, database="catalog_meta", collection="user_role")
self.space_id = space_id
self.key_user_id = UserCollectionKeys.KEY_USER_ID
self.key_space_id = UserCollectionKeys.KEY_SPACE_ID
def update_user_role(self, query, data):
"""
The following function will update one user role in the
user_role collection
:param self:
:param data:
:return:
"""
return self.update_one(query=query, data=data, upsert=True)
def find_user(self, user_id):
user = self.find_one(query={"user_id": user_id})
if user:
return dict(user)
return user
def find_user_name(self, user_id, space_id: Optional[str]):
query = {"user_role_id": user_id, "space_id": space_id}
one_user = self.find_one(filter_dict={"user_role_name": 1, "_id": 0}, query=query)
if one_user is None:
return one_user
return one_user["user_role_name"]
@staticmethod
def get_users_list(space_id=None):
query_json = [
{
"$group": {
"_id": None,
"data": {"$push": {"k": {"$ifNull": ["$user_id", ""]}, "v": {"$ifNull": ["$username", ""]}}},
}
},
{"$replaceRoot": {"newRoot": {"$arrayToObject": "$data"}}},
]
if space_id:
query_json.insert(0, {"$match": {"space_id": space_id}})
return query_json
def users_list_by_aggregate(self, query: list):
return self.aggregate(pipelines=query)
def find_user_by_space_id(self, user_id, space_id):
user = self.find_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
if user:
return dict(user)
return user
def find_user_role_by_id(self, user_role_id, filter_dict=None):
return self.find_one(query={"user_role_id": user_role_id}, filter_dict=filter_dict)
def find_user_role_by_aggregate(self, query):
if role_by_aggregate := list(self.aggregate(query)):
return role_by_aggregate
return []
from ut_mongo_util import CollectionBaseClass, mongo_client
class UserCollectionKeys:
KEY_LANGUAGE = "language"
KEY_NAME = "name"
KEY_USER_ID = "user_id"
KEY_SPACE_ID = "space_id"
KEY_USERNAME = "username"
KEY_USER_ROLE = "userrole"
class UserSpace(CollectionBaseClass):
key_username = UserCollectionKeys.KEY_USERNAME
key_user_id = UserCollectionKeys.KEY_USER_ID
key_language = UserCollectionKeys.KEY_LANGUAGE
key_name = UserCollectionKeys.KEY_NAME
key_space_id = UserCollectionKeys.KEY_SPACE_ID
def __init__(self):
super().__init__(
mongo_client,
database="catalog_meta",
collection="user_space",
)
def fetch_user_space(self, user_id, space_id):
query = {self.key_user_id: user_id, self.key_space_id: space_id}
user = self.find_one(query=query)
return user
def fetch_user_space_with_details(self, user_id, space_id):
query = [
{"$match": {"user_id": user_id, "space_id": space_id}},
{"$lookup": {"from": "user", "localField": "user_id", "foreignField": "user_id", "as": "user_details"}},
{"$unwind": {"path": "$user_details"}},
{
"$project": {
"space_id": 1,
"AccessLevel": 1,
"access_group_ids": 1,
"userrole": 1,
"user_id": 1,
"name": "$user_details.name",
"email": "$user_details.email",
"username": "$user_details.username",
}
},
]
user = self.aggregate(query)
user_list = list(user)
if user_list:
return user_list[0]
else:
return None
def find_user_role_for_user_id(self, user_id, space_id):
query = {"user_id": user_id, "space_id": space_id}
filter_dict = {"userrole": 1, "_id": 0}
return self.find_one(query=query, filter_dict=filter_dict)
def update_one_user_space(self, data, user_id, space_id):
query = {self.key_user_id: user_id, "space_id": space_id}
return self.update_one(query=query, data=data, upsert=True)
def insert_one_user(self, data):
"""
The following function will insert one user space record in the
user_space collection
:param self:
:param data:
:return:
"""
return self.insert_one(data)
def delete_one_user_space(self, user_id, space_id):
return self.delete_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
from typing import Dict, Optional
from pydantic import BaseModel
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class WorkSpacesSchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
space_id: Optional[str] = ""
space_name: Optional[str] = ""
space_type: Optional[str] = ""
meta: Optional[Dict] = {}
user_id: Optional[str] = ""
source_meta: Optional[Dict] = {}
access_token: Optional[str] = ""
catalog_url: Optional[str] = ""
class WorkSpaces(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client, database="catalog_meta", collection="workspaces")
@property
def key_space_id(self):
return "space_id"
@property
def key_space_name(self):
return "space_name"
def find_space(self, space_id=None, space_name=None, filter_dict=None):
"""
The following function will give one record for a given set of
search parameters as keyword arguments
:param space_name:
:param filter_dict:
:param space_id:
:return:
"""
query = {}
if space_id:
query.update({self.key_space_id: space_id})
if space_name:
query.update({self.key_space_name: space_name})
record = self.find_one(query=query, filter_dict=filter_dict)
if not record:
return {}
return WorkSpacesSchema(**record).dict()
def find_space_by_query(self, query, filter_dict=None):
# find() returns a cursor, which is always truthy; materialise it so an
# empty result actually comes back as an empty list
return list(self.find(query=query, filter_dict=filter_dict))
def fetch_space_details(self):
query = {}
filter_dict = {self.key_space_id: 1, "_id": 0, self.key_space_name: 1}
records = self.find(query=query, filter_dict=filter_dict)
space_name_map = {}
for record in records:
space_name_map[record.get(self.key_space_id)] = record.get(self.key_space_name)
return space_name_map
def insert_one_space(self, data):
"""
The following function will insert one space in the
customer_spaces collections
:param self:
:param data:
:return:
"""
return self.insert_one(data)
def delete_one_space(self, space_id):
if space_id:
query = {self.key_space_id: space_id}
return self.delete_one(query)
else:
return False
def get_space_data_by_aggregate(self, query: list):
return list(self.aggregate(pipelines=query))
def update_one_space(self, data, space_id, upsert=False):
"""
The following function will update one step in
steps collection based on the given query
:param data:
:param upsert:
:param space_id:
:return:
"""
query = {"space_id": space_id}
response = self.update_one(data=data, upsert=upsert, query=query)
return response
def delete_workspaces(self, space_id_list):
query = {self.key_space_id: {"$in": space_id_list}}
# MongoCollectionBaseClass.delete_many already returns the deleted count
return self.delete_many(query)
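# Illustrative usage sketch (not part of the original file); the space values
# are hypothetical, and destination_mongo_client is assumed to be imported from
# scripts.db.mongo as elsewhere in this repo.
#
#     workspaces = WorkSpaces(mongo_client=destination_mongo_client)
#     workspaces.insert_one_space({"space_id": "space_127", "space_name": "demo"})
#     space = workspaces.find_space(space_id="space_127")
#     deleted_count = workspaces.delete_workspaces(["space_127"])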
class ILensErrors(Exception):
"""
Base Error Class
"""
def __init__(self, msg):
Exception.__init__(self, msg)
class ILensErrorsWithoutMessage(Exception):
"""Generic iLens Error"""
class ErrorMessages:
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Signature Expired"
ERROR003 = "Signature Not Valid"
ERROR004 = "User Record Not Found"
WORKSPACE_NAME_EXIST_ERROR = "Workspace name already exists. Please use a different name"
WORKSPACE_CATALOG_URL_ERROR = "Please add a valid catalog URL"
class JobCreationError(Exception):
"""
Raised when a Job Creation throws an exception.
Job Creation happens by adding a record to Mongo.
"""
class UnknownError(Exception):
pass
class DuplicateSpaceNameError(Exception):
pass
class KairosDBError(Exception):
pass
class UnauthorizedError(Exception):
pass
class ImageValidation(Exception):
pass
class ILensError(Exception):
pass
class NameExists(Exception):
pass
class InputRequestError(ILensError):
pass
class IllegalTimeSelectionError(ILensError):
pass
class DataNotFound(Exception):
pass
class AuthenticationError(ILensError):
"""
JWT Authentication Error
"""
class JWTDecodingError(Exception):
pass
class DuplicateReportNameError(Exception):
pass
class PathNotExistsException(Exception):
pass
class ImplementationError(Exception):
pass
class UserRoleNotFoundException(Exception):
pass
class CustomError(Exception):
pass
class IllegalToken(ILensErrors):
pass
class InvalidPasswordError(ILensErrors):
pass
class UserNotFound(ILensErrors):
pass
class TooManyRequestsError(Exception):
pass
class FixedDelayError(ILensErrors):
pass
class VariableDelayError(ILensErrors):
pass
class LicenceValidationError(Exception):
pass
class CustomAppError:
FAILED_TO_SAVE = "Failed to save app"
class WorkspaceNameExistError(ILensErrorsWithoutMessage):
pass
class GlobalCatalogError(Exception):
"""
Base class for generic Global Catalog errors.
"""
def __init__(self, msg):
Exception.__init__(self, msg)
class ILensException(Exception):
pass
class MongoException(ILensException):
pass
class MongoConnectionException(MongoException):
pass
class MongoQueryException(MongoException):
pass
class MongoEncryptionException(MongoException):
pass
class MongoRecordInsertionException(MongoQueryException):
pass
class MongoFindException(MongoQueryException):
pass
class MongoDeleteException(MongoQueryException):
pass
class MongoUpdateException(MongoQueryException):
pass
class MongoUnknownDatatypeException(MongoEncryptionException):
pass
class MongoDistictQueryException(MongoException):
pass
class MongoFindAndReplaceException(MongoException):
pass
class MongoObjectDeserializationException(MongoException):
pass
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
from scripts.config import LoggVariables
# this method reads the logger configuration from the given YAML file
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name) as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
# re-raise instead of silently returning None, which would crash later
# when config["logger"] is accessed
raise
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
logging_config["level"] = LoggVariables.LOG_LEVEL
def add_logging_level(level_name, level_num, method_name=None):
"""
Comprehensively adds a new logging level to the `logging` module and the
currently configured logging class.
`level_name` becomes an attribute of the `logging` module with the value
`level_num`. `method_name` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`). If `method_name` is not specified, `level_name.lower()` is
used.
To avoid accidental clobbering of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present.
Example
-------
> add_logging_level('TRACE', logging.DEBUG - 5)
> logging.getLogger(__name__).setLevel("TRACE")
> logging.getLogger(__name__).trace('that worked')
> logging.trace('so did this')
> logging.TRACE
"""
if not method_name:
method_name = level_name.lower()
if hasattr(logging, level_name):
raise AttributeError("{} already defined in logging module".format(level_name))
if hasattr(logging, method_name):
raise AttributeError("{} already defined in logging module".format(method_name))
if hasattr(logging.getLoggerClass(), method_name):
raise AttributeError("{} already defined in logger class".format(method_name))
def log_for_level(self, message, *args, **kwargs):
if self.isEnabledFor(level_num):
self._log(level_num, message, args, **kwargs)
def log_to_root(message, *args, **kwargs):
logging.log(level_num, message, *args, **kwargs)
logging.addLevelName(level_num, level_name)
setattr(logging, level_name, level_num)
setattr(logging.getLoggerClass(), method_name, log_for_level)
setattr(logging, method_name, log_to_root)
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger("ilens")
add_logging_level("QTRACE", logging.DEBUG - 5)
__logger__.setLevel(logging_config["level"].upper())
log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():%(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
file_path = LoggVariables.LOGS_MODULE_PATH
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"] and LoggVariables.ENABLE_FILE_LOG:
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(file_path, f"{logging_config['name']}.log")
temp_handler = RotatingFileHandler(
log_file, maxBytes=each_handler["max_bytes"], backupCount=each_handler["back_up_count"]
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"] and LoggVariables.ENABLE_CONSOLE_LOG:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
if temp_handler:
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
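# Illustrative usage sketch (not part of the original file): the custom QTRACE
# level registered above sits below DEBUG, so qtrace() messages only emit when
# the configured level is QTRACE or lower.
#
#     logger.qtrace("fine-grained trace message")
#     logger.info("regular info message")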
logger:
name: catalog_default_meta
level: INFO
handlers:
- type: RotatingFileHandler
file_path: data/catalog_default_meta/logs/
max_bytes: 100000000
back_up_count: 5
- type: SocketHandler
host: localhost
port: 23582
- type: StreamHandler
name: catalog_default_meta
import time
from scripts.db.mongo import destination_mongo_client
from scripts.db.redis_connection import destination_space_db
from scripts.utils.mongo_utils import MongoCollectionBaseClass
from scripts.constants.db_constants import (
CollectionNames,
DatabaseNames,
)
from scripts.logging import logger
class CommonUtils:
def __init__(self, space_id=None):
self.unique_id = MongoCollectionBaseClass(
mongo_client=destination_mongo_client,
database=DatabaseNames.catalog_meta,
collection=CollectionNames.unique_id,
space_db=destination_space_db
)
self.unique_id.space_id = space_id
@staticmethod
def get_user_meta(user_id=None, check_flag=False):
data_for_meta = {}
if check_flag:
data_for_meta["created_by"] = user_id
data_for_meta["created_on"] = int(time.time() * 1000)
data_for_meta["updated_by"] = user_id
data_for_meta["updated_on"] = int(time.time() * 1000)
return data_for_meta
def get_next_id(self, param):
try:
next_id_doc = self.unique_id.find_one(query={"key": param})
if not next_id_doc:
insert_dict = {"key": param, "id": 100}
self.unique_id.insert_one(data=insert_dict)
return insert_dict["id"]
else:
query = {"key": param}
count_value = int(next_id_doc["id"]) + 1
new_values = {"id": count_value}
self.unique_id.update_one(query=query, data=new_values, upsert=True)
return int(new_values["id"])
except Exception as e:
logger.exception(f"Exception in creating ID: {e}")
return None
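# Illustrative usage sketch (not part of the original file); "asset_model" is a
# hypothetical counter key, and the calls assume the unique_id collection is
# reachable through destination_mongo_client.
#
#     common_utils = CommonUtils(space_id="space_127")
#     next_id = common_utils.get_next_id("asset_model")  # 100 on first call, then 101, ...
#     meta = CommonUtils.get_user_meta(user_id="user_001", check_flag=True)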
import json
from functools import lru_cache
@lru_cache()
def get_db_name(redis_client, space_id: str, database: str, delimiter="__"):
if not space_id:
return database
val = redis_client.get(space_id)
if val is None:
raise ValueError(f"Unknown Space, Space ID: {space_id} Not Found!!!")
val = json.loads(val)
if not val:
return database
# Get the prefix flag to apply space_id prefix to any db
prefix_condition = bool(val.get("source_meta", {}).get("add_prefix_to_database"))
if prefix_condition:
# Get the prefix name from mongo or default to space_id
prefix_name = val.get("source_meta", {}).get("prefix") or space_id
return f"{prefix_name}{delimiter}{database}"
return database
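# Worked example (illustrative, not part of the original file); the database
# name is hypothetical. Assuming the Redis record for "space_127" is
# {"source_meta": {"add_prefix_to_database": true}} with no explicit "prefix":
#
#     db_name = get_db_name(redis_client, "space_127", "ilens_configuration")
#     # -> "space_127__ilens_configuration"; without the flag, "ilens_configuration"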
@lru_cache()
def get_redis_db_prefix(redis_client, space_id: str, delimiter="__"):
if not space_id:
return False
val = redis_client.get(space_id)
if val is None:
return False
val = json.loads(val)
if not val:
return False
# Get the prefix flag to apply space_id prefix to any db
prefix_condition = bool(val.get("source_meta", {}).get("add_prefix_to_database"))
if prefix_condition:
# Get the prefix name from mongo or default to space_id
prefix_name = val.get("source_meta", {}).get("prefix") or space_id
return f"{prefix_name}{delimiter}"
return False
def get_space_data_from_redis(redis_client, space_id: str):
record = redis_client.get(space_id)
if record is None:
raise ValueError(f"Unknown Space, Space ID: {space_id} Not Found!!!")
if record := json.loads(record):
return record
return None  # explicit: an empty deserialised record yields None
import base64
import copy
import hashlib
import json
import os
from datetime import datetime
from operator import itemgetter
from uuid import UUID
from Crypto import Random
from Crypto.Cipher import AES
from scripts.config import Security
from scripts.errors import CustomError
from scripts.errors.mongo_exceptions import (
MongoException,
MongoUnknownDatatypeException,
)
from scripts.logging import logger
from scripts.utils.jwt_util import JWT
exclude_encryption_datatypes = (
datetime,
UUID,
)
class MongoEncryptionConstants:
# mongo encryption keys
key_encrypt_keys = "encrypt_keys"
key_exclude_encryption = "exclude_encryption"
product_encrypted = "product_encrypted"
max_docs_per_batch = 5
cipher_key = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJrZXkiOiItLS0tLUJFR0lOIFJTQSBQUklWQVRFIEtFWS0tLS0tXG5NSUlFb3dJQkFBS0NBUUVBclZFRDVjcit0TXRGdFZtWGwyTzBjdlFiRWdvWVNJRmQ4eXZrbW02ejdYQWRYNkVnXG5Za0tlejB5ZFRsMjZLT2RKMThBN0tuOGV0V0dlOG5Ua1NHaGVKbDlybi9KK2xFMXpwbzRaZy9UM3dEbk04Rk0zXG55dU0yNnZwSWIrMG9KbU5jOURrRlhvNFd0eFJGWkR5dGRFVGcvWXlJK2VKWURSRHJaU3JscUF6SURwQWRMcHY5XG5VaHNNaFlRKzJuM1BjYXVMZUpiMGRLUFZUYzZrU3ZHQ3MzTFowV3lUYlJuUXlKTUNXbmF4enBTSVVjSDdxYXFPXG5LQy9mQkNLc1ptUmpSTlNtUTNnZXB6NFZuUUt5SkNtN0NKaytjUWlRTVF6cnNwUlB2aG1Hb3VIWlVNMzZLanNHXG42eWx4MkJ1Nk9ZeS9IYnJkUmtKS05sdjN1NkJCTDZQbi9aSlpHUUlEQVFBQkFvSUJBQkk4ZU1oRVNuWWJtMVJJXG5XOFM4WXplSU8xUHoxM2hEa3U3Y0FyY0VLRzcya2NTbTU4a25BTjVIamJLNTluVkkxdEo2Z2M4NEpuTkgxUWxtXG5ac0crcDQ5cWtXQzRTM3pQeEhnMU1mYWFQenBNNnFVcjRHNDY1Nk9rVjV4ZFRCRHorZ3NoZDlEcDZ2WnpEZFVjXG45RlJNVGc4bnF4Nzk0NjFtUnhwelA4eGxvYVEwTmNLQnpGSzllM2cvNGk3Mkx3Z05QM0U2eG1FU2l1N2dvcUoxXG5HT0FJMm1KaWUzVFRZMXo4c2Y0dWlTRkxNYUZyRXhrcTR6NEtrd1M3cUYybk9KeGh2OEgvZzlUR1BOV3JuekF3XG55Qkh3SU5Cb1VhSndpT1Q1MXh4SURMZ05RaU5vSUZ1YU1LVnUybCtyV3RvUVdLR2lPbncxWmhZeGVKQ1hCeVhDXG5RcXBBZmdFQ2dZRUF3cHpTZnlvdDNQQWx4bTlpVks1WmM2bFJkQnE3SmF6dDd0OTFVNnplWTdDNHh6TkcxVHVmXG5jU1lLM3FSd2xNdzJ1WGw5YXV4eVY0MXJ6aVg5c1podEZVbm00amNHdjlNSGVhQWFTU1BTc3ZydFpERkJTN2t5XG5sMkl4azEwNzhMVFpDTE1ZbUFLQ0FyMlhMbVNoQlBTVmN1YUxrRFJYNHJ2dzdzY1dtTWI4NndFQ2dZRUE0L3lDXG5FQWpYbEwwV2xPWURKM0ovL1BnNGlCdEllZEhYbW4zMGdvTnVDQkJhb1l5Z1hhcGV5dEVtVTJxNWh5YlFUTVRYXG5WbC92SUFGaXUwVFg4MVZRN0xETEphYmVyLzdHRXNJVDN4K3htMGpGdk94RllWaFQ1YjBzMHoxQ1FvbG5SRnNBXG5kSXdRNXU1R2tQNjVoeUpVYTNaTWgrTDZWaXNTQ1RLcEFjbzlaaGtDZ1lBS0ZaNUN3S2pIdmhuM0FtYVNCTWJWXG4yM3hCQy9HT3JqdFdHWFkyODhwQ1dESDdBSWszRzNQVHBTa0RDSHBjKzRnS2JHVTNXVEZEb0N4cDdrWUxJZDdsXG5MNE1yVGJhbjBnT2RKZEsyMzRoWGhmRXZNKzR5UWxLQXpiSEw5UlRhRUVUKzBtai8xNEZ0S3UzZWxaQlNkV29aXG5IaUUxUThFYUdxc05kSHVUUnh4c0FRS0JnUUNxdzdlbnl2ZXVzUEw1RkUvSWZEcmhnQXJYNTVlaHAwdVdyRUU0XG5nTGtwMFJZUmF3T3pKS2xid015MExueElmd29HZG1uVWlJYlRzallCanM4eHMvV3BVOExWc09lYmEzbHhFMjFPXG44cTVWWVd5NjFUNGlhOVpyamdiRk1sMHUrVHdnTndsZ1FvbG1iNUxyaDkvdkdBZWpkamhjaitaeUpGQ2VFeFFFXG5BemQ2QVFLQmdCaGUrRndNaFR1czk2MWpxRUtYQlhtMC9PYU9nek9kZ2wvYXN1QzhvTFU3Y0FWRDdzUzJMRmNVXG51N29mSVZJRzZjUldScnVhakl1Q2RsSWNMT2VkVEU0WUw1akF1UkwxVHlWdnhNbTBGc3JrV1BBQkZySFdoc1pzXG5UU3pwaU9GSmtMSlRWblQ3aGxXLyttMHFyS2lXMHpyRnphMEphRndQL2xqK2hScllHa09sXG4tLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLSIsImlzcyI6ImlsZW5zIiwiZXhwIjoxNzg4NTg2MzYyfQ.K6PrPcum1ACp9jQtqL3oNncnmXtTnPEOLYWCaaHmFMpLnAPAnKlYblsQkx4nv4pskJ3DBzSk6H-7Tnns4oejfaZI56wHhGz99JZN9mQ9JrQazZ01uccAwhcaOOMnMEny5J4Q6FB0OyyNIxSsScx2s21Vx-eJvOV1FOrCBjvZG78"
enc = AES.MODE_CBC
jwt = JWT()
try:
file_name = Security.ENCRYPTION_CONSTANTS_FILE_PATH
if not os.path.exists(file_name):
encrypt_collection_dict = {}
else:
with open(file_name) as f:
mongo_encryption_constants_data = json.load(f)
if "encrypt_collection_dict" in mongo_encryption_constants_data and Security.USER_ENCRYPTION:
encrypt_collection_dict = mongo_encryption_constants_data["encrypt_collection_dict"]
else:
encrypt_collection_dict = {}
except Exception as es:
encrypt_collection_dict = {}
logger.exception(f"Unable to fetch mongo encryption constants: {es}")
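# Sketch of the expected encryption-constants file layout. This is an
# assumption inferred from how encrypt_collection_dict is consumed below, not a
# verbatim copy of the real file; "user" is a hypothetical collection entry.
# A "*" in encrypt_keys means "encrypt every key except those listed in
# exclude_encryption".
#
#     {
#         "encrypt_collection_dict": {
#             "user": {
#                 "encrypt_keys": ["*"],
#                 "exclude_encryption": ["user_id", "space_id"]
#             }
#         }
#     }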
class AESCipher:
def __init__(self, key):
self.bs = AES.block_size
self.key = hashlib.sha256(key.encode()).digest()
def get_cipher(self, iv):
return AES.new(self.key, enc, iv)
def encrypt(self, raw):
raw = self._pad(raw)
iv = Random.new().read(AES.block_size)
cipher = self.get_cipher(iv)
return base64.b64encode(iv + cipher.encrypt(raw.encode()))
def decrypt(self, enc):
enc = base64.b64decode(enc)
iv = enc[: AES.block_size]
cipher = self.get_cipher(iv)
return self._unpad(cipher.decrypt(enc[AES.block_size :])).decode("utf-8")
def _pad(self, s):
return s + (self.bs - len(s) % self.bs) * chr(self.bs - len(s) % self.bs)
@staticmethod
def _unpad(s):
return s[: -ord(s[len(s) - 1 :])]
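# Illustrative round-trip sketch (not part of the original file); the key is a
# hypothetical placeholder. A fresh random IV is generated per encrypt() call
# and prepended to the ciphertext, so the same plaintext encrypts differently
# each time while still decrypting cleanly.
#
#     cipher = AESCipher(key="hypothetical-secret")
#     token = cipher.encrypt("hello world")
#     assert cipher.decrypt(token) == "hello world"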
class MongoDataEncryption:
def __init__(self):
self.decoded_aes = jwt.decode(MongoEncryptionConstants.cipher_key).get("key")
self.aes_cipher = AESCipher(key=self.decoded_aes)
def create_encrypted_string(self, payload):
return self.aes_cipher.encrypt(raw=json.dumps(payload))
def create_decrypted_string(self, payload):
decrypted_result = self.aes_cipher.decrypt(enc=payload)
try:
result = json.loads(decrypted_result)
except json.JSONDecodeError:
if isinstance(decrypted_result, bytes):
result = decrypted_result.decode("utf-8")
else:
result = decrypted_result
return result
def encrypt_data(self, json_data, collection_name):
"""
Encrypt the data in mongo based on the collection and key to be encrypted.
:param json_data: The data to be encrypted
:param collection_name: The collection where the document is stored
:return: Encrypted document based on product defined configuration.
"""
try:
if collection_name in encrypt_collection_dict.keys():
if isinstance(json_data, list):
encrypted_data = []
# iterate over the input documents, not the (initially empty) output list
for data in json_data:
dict_data = self.encrypt_dict_data(doc=data, collection_name=collection_name)
encrypted_data.append(dict_data)
elif isinstance(json_data, dict):
encrypted_data = self.encrypt_dict_data(doc=json_data, collection_name=collection_name)
else:
raise MongoUnknownDatatypeException(
"Unsupported datatype '{}' is being inserted to mongodb.".format(type(json_data))
)
else:
logger.info("Given data is not a part of the Mongo encryption setup. Skipping encryption")
if isinstance(json_data, dict):
encrypted_data = json_data
encrypted_data[MongoEncryptionConstants.product_encrypted] = False
else:
encrypted_data = json_data
return encrypted_data
except MongoException as e:
raise MongoException(str(e))
except Exception as e:
raise MongoException("Server faced a problem when encrypting the data --> {}".format(str(e)))
def encrypt_dict_data(self, doc, collection_name): # NOSONAR
"""
This method crawls the document and encrypts the keys that are marked for encryption.
Skips encrypting the keys with values of the datatypes defined in the tuple 'exclude_encryption_datatypes'.
Adds two new keys to the document 'product_encrypted' and 'encryption_salt'
Key product_encrypted - Is a boolean value which flags a document as encrypted by this utility.
Key encryption_salt - List of all the values that were excluded from encryption due to datatype constraints.
:param doc: The document considered for encryption
:param collection_name: The collection where the document resided.
This is needed for the utility to read the encryption configuration
:return: The input document with the relevant keys encrypted.
"""
try:
is_mlens_encrypted = False
encrypted_data = {}
encrypted_data["encryption_salt"] = {}
if "*" in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_encrypt_keys]:
# Forming encryption salt
for index, exclude_encryption_datatype in enumerate(exclude_encryption_datatypes):
if exclude_encryption_datatype not in [None, ""]:
encrypted_data["encryption_salt"][f"dt_{index}"] = self.search_datatype(
doc, exclude_encryption_datatype
)
sorted_path = sorted(
encrypted_data["encryption_salt"][f"dt_{index}"], key=itemgetter("p"), reverse=True
)
for _path in sorted_path:
# the document variable in this scope is named "doc", not "dict_data"
to_pop = self.remove_value_of_datatype_command(_path, "doc")
exec(to_pop)
for dt in encrypted_data["encryption_salt"]:
for path_index, _path in enumerate(encrypted_data["encryption_salt"][dt]):
encrypted_data["encryption_salt"][dt][path_index]["p"] = base64.b64encode(_path["p"].encode())
# Encrypting the data
for key in doc.keys():
if (
key
not in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_exclude_encryption]
):
encrypted_data[key] = {
"d": self.create_encrypted_string(payload=self.convert(doc[key])),
"t": base64.b64encode(type(doc[key]).__name__.encode()),
}
is_mlens_encrypted = True
else:
encrypted_data[key] = doc[key]
else:
for key in doc.keys():
if key in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_encrypt_keys]:
# Forming encryption salt
for index, exclude_encryption_datatype in enumerate(exclude_encryption_datatypes):
if exclude_encryption_datatype not in [None, ""]:
temp_dict_data = {}
temp_dict_data[key] = copy.deepcopy(doc[key])
encrypted_data["encryption_salt"][f"dt_{index}"] = self.search_datatype(
temp_dict_data, exclude_encryption_datatype
)
sorted_path = sorted(
encrypted_data["encryption_salt"][f"dt_{index}"],
key=itemgetter("p"),
reverse=True,
)
for _path in sorted_path:
# remove the excluded values from the actual document being encrypted
to_pop = self.remove_value_of_datatype_command(_path, "doc")
exec(to_pop)
for dt in encrypted_data["encryption_salt"]:
for path_index, _path in enumerate(encrypted_data["encryption_salt"][dt]):
encrypted_data["encryption_salt"][dt][path_index]["p"] = base64.b64encode(
_path["p"].encode()
)
# Encrypting the data
encrypted_data[key] = {
"d": self.create_encrypted_string(payload=self.convert(doc[key])),
"t": base64.b64encode(type(doc[key]).__name__.encode()),
}
is_mlens_encrypted = True
else:
encrypted_data[key] = doc[key]
encrypted_data[MongoEncryptionConstants.product_encrypted] = is_mlens_encrypted
if not encrypted_data[MongoEncryptionConstants.product_encrypted]:
del encrypted_data["encryption_salt"]
return encrypted_data
except MongoException as e:
raise MongoException(str(e))
except Exception as e:
raise MongoException("Server faced a problem when encrypting the data --> {}".format(str(e)))
def decrypt_data(self, dict_data, _collection_name): # NOSONAR
"""
This method decrypts all the data that is encrypted.
Keys that were excluded during encryption and have been added to the encryption_salt
will be added back to their original positions.
:param dict_data: The document that needs to be decrypted
:param _collection_name: The collection to which the document belongs to
:return: The decrypted data with the original data types intact
"""
try:
if _collection_name in encrypt_collection_dict.keys():
decrypted_data = {}
if "*" in encrypt_collection_dict[_collection_name][MongoEncryptionConstants.key_encrypt_keys]:
for key in dict_data.keys():
if key not in encrypt_collection_dict[_collection_name][
MongoEncryptionConstants.key_exclude_encryption
] and not isinstance(dict_data[key], exclude_encryption_datatypes):
if isinstance(dict_data[key], dict):
if "d" in dict_data[key].keys() and "t" in dict_data[key].keys():
decrypted_data[key] = self.decrypt_convert_proper_data_type(
data=self.create_decrypted_string(payload=dict_data[key]["d"]),
data_type=base64.b64decode(dict_data[key]["t"].decode()).decode(),
)
else:
decrypted_data[key] = dict_data[key]
else:
decrypted_data[key] = dict_data[key]
else:
decrypted_data[key] = dict_data[key]
else:
for key in dict_data.keys():
if key in encrypt_collection_dict[_collection_name][
MongoEncryptionConstants.key_encrypt_keys
] and not isinstance(dict_data[key], exclude_encryption_datatypes):
if isinstance(dict_data[key], dict):
if "d" in dict_data[key].keys() and "t" in dict_data[key].keys():
decrypted_data[key] = self.decrypt_convert_proper_data_type(
data=self.create_decrypted_string(payload=dict_data[key]["d"]),
data_type=base64.b64decode(dict_data[key]["t"].decode()).decode(),
)
else:
decrypted_data[key] = dict_data[key]
else:
decrypted_data[key] = dict_data[key]
else:
decrypted_data[key] = dict_data[key]
else:
decrypted_data = dict_data
return decrypted_data
except MongoException as e:
raise MongoException(str(e))
except Exception as e:
raise MongoException(f"Server faced a problem when decrypting the data: {str(e)}")
@staticmethod
def decrypt_convert_proper_data_type(data, data_type):
"""
Convert the de-serialized JSON object to the original data-type
:param data: The de-serialized data
:param data_type: The original data type to which the de-serialized data should be converted to
:return: The de-serialized data with its original data type.
"""
if data_type == "int":
return int(data)
elif data_type == "list":
return data
elif data_type == "dict":
return data
elif data_type == "bool":
return data
else:
return data.lstrip('"').rstrip('"')
def convert(self, data):
"""
Convert all byte-like objects into the proper data types.
This supports conversion of nested dict, list and tuples.
:param data:
:return:
"""
if isinstance(data, bytes):
return data.decode("ascii")
if isinstance(data, dict):
return dict(map(self.convert, data.items()))
if isinstance(data, tuple):
# materialise the map; returning a bare map object would not be a tuple
return tuple(map(self.convert, data))
if isinstance(data, list):
return list(map(self.convert, data))
return data
def search_datatype(self, _input, search_type, prev_datapoint_path=""): # NOSONAR
"""
Search for an excluded data type in a nested dictionary or list and record its path in the document.
This does not support the exclusion of data of types dict and list.
:param _input: The input data
:param search_type: The data type to be searched for to exclude.
:param prev_datapoint_path: The path of a value in a nested dict or nested list.
:return: List of dictionaries, with each dictionary containing the true value and its path.
"""
try:
output = []
current_datapoint = _input
current_datapoint_path = prev_datapoint_path
if search_type is dict:
raise CustomError("Searching for datatype dict is not supported!")
elif search_type is list:
raise CustomError("Searching for datatype list is not supported!")
else:
if isinstance(current_datapoint, dict):
for dkey in current_datapoint:
temp_datapoint_path = current_datapoint_path
temp_datapoint_path += "dict-{}.".format(dkey)
for index in self.search_datatype(current_datapoint[dkey], search_type, temp_datapoint_path):
output.append(index)
elif isinstance(current_datapoint, list):
for index in range(0, len(current_datapoint)):
temp_datapoint_path = current_datapoint_path
temp_datapoint_path += "list-{}.".format(index)
for index_1 in self.search_datatype(current_datapoint[index], search_type, temp_datapoint_path):
output.append(index_1)
elif isinstance(current_datapoint, search_type):
output.append({"p": current_datapoint_path, "v": current_datapoint})
output = filter(None, output)
return list(output)
except Exception:
raise CustomError(f"Server faced a problem when searching for instances of datatype --> '{search_type}' ")
@staticmethod
def remove_value_of_datatype_command(remove_value, var_name):
"""
This method produces the command for the value to be removed from a nested dict or list,
when given the path of that value in the source variable.
:param remove_value: The value (its path) to be removed.
:param var_name: The variable on which the exec function should run on to remove the non-serializable value.
:return: The final command that will run in the exec function to remove the value from a nested dict or list.
"""
temp_path = ""
individual_path_list = remove_value["p"].split(".")
individual_path_list.remove("")
if individual_path_list[len(individual_path_list) - 1].split("-")[0] == "dict":
orig_path = "del {var_name}{path}"
elif individual_path_list[len(individual_path_list) - 1].split("-")[0] == "list":
pop_index = ".pop({})".format(individual_path_list[len(individual_path_list) - 1].split("-")[1])
orig_path = "{var_name}{path}" + pop_index
individual_path_list.pop(len(individual_path_list) - 1)
else:
return
for _path_index, path in enumerate(individual_path_list):
if path.split("-")[0] == "dict":
temp_path += '["{}"]'.format(path.split("-")[1])
elif path.split("-")[0] == "list":
temp_path += "[{}]".format(path.split("-")[1])
orig_path = orig_path.format(path=temp_path, var_name=var_name)
return orig_path
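# Worked example (illustrative, not part of the original file), using the path
# notation produced by search_datatype above:
#
#     remove_value_of_datatype_command({"p": "dict-meta.dict-created_on."}, "doc")
#     # -> 'del doc["meta"]["created_on"]'
#     remove_value_of_datatype_command({"p": "dict-events.list-0."}, "doc")
#     # -> 'doc["events"].pop(0)'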
@staticmethod
def add_value_datatype_command(add_value, var_name, value):
"""
This method produces the command for the value to be added back to a nested dict or list,
when given the path of that value in the source variable.
:param add_value: The value (its path) to be added
:param var_name: The source variable name on which the exec function should run on.
:param value: The original non-serialized value.
:return: The command to be executed on the source variable.
"""
path_string = ""
temp_path_string = ""
individual_path_list = add_value["p"].split(".")
individual_path_list.remove("")
for _path_index, path in enumerate(individual_path_list):
if path.split("-")[0] == "dict":
temp_path_string = '["{}"]'.format(path.split("-")[1])
elif path.split("-")[0] == "list":
temp_path_string = "[{}]".format(path.split("-")[1])
else:
raise CustomError("Unsupported datatype given for add value")
path_string += temp_path_string
if individual_path_list[len(individual_path_list) - 1].split("-")[0] == "dict":
command = "{var_name}{path} = {value}".format(var_name=var_name, path=path_string, value=value)
elif individual_path_list[len(individual_path_list) - 1].split("-")[0] == "list":
# slice off the trailing "[index]" segment exactly; rstrip() strips a
# character set, not a suffix, so it could over-strip the path
command = "{var_name}{path}.append({value})".format(
var_name=var_name, path=path_string[: -len(temp_path_string)], value=value
)
else:
raise CustomError("Unsupported datatype given for add value")
return command
import jwt
from jwt.exceptions import (
ExpiredSignatureError,
InvalidSignatureError,
MissingRequiredClaimError,
)
from scripts.config import KeyPath
from scripts.errors import AuthenticationError, ErrorMessages
from scripts.logging import logger
class Secrets:
LOCK_OUT_TIME_MINS = 30
leeway_in_mins = 10
unique_key = "45c37939-0f75"
token = "8674cd1d-2578-4a62-8ab7-d3ee5f9a"
issuer = "ilens"
alg = "RS256"
SECRET_FOR_SUPPORT_LENS = "WeSupport24X7UnifyTwinX#"
ISS = "unifytwin"
AUD = "supportlens"
signature_key = "kliLensKLiLensKL"
signature_key_alg = ["HS256"]
class JWT:
def __init__(self):
self.max_login_age = Secrets.LOCK_OUT_TIME_MINS
self.issuer = Secrets.issuer
self.alg = Secrets.alg
self.public = KeyPath.PUBLIC
self.private = KeyPath.PRIVATE
def encode(self, payload):
try:
# the context manager closes the file; a finally-close on a possibly
# unbound f would itself raise if open() failed
with open(self.private) as f:
key = f.read()
return jwt.encode(payload, key, algorithm=self.alg)
except Exception as e:
logger.exception(f"Exception while encoding JWT: {str(e)}")
raise
def decode(self, token):
try:
with open(self.public) as f:
key = f.read()
# PyJWT expects a list of allowed algorithms
return jwt.decode(token, key, algorithms=[self.alg])
except Exception as e:
logger.exception(f"Exception while decoding JWT: {str(e)}")
raise
def validate(self, token):
try:
with open(self.public) as f:
key = f.read()
payload = jwt.decode(
token,
key,
algorithms=[self.alg],
leeway=Secrets.leeway_in_mins,  # note: PyJWT interprets leeway in seconds
options={"require": ["exp", "iss"]},
)
return payload
except InvalidSignatureError:
raise AuthenticationError(ErrorMessages.ERROR003)
except ExpiredSignatureError:
raise AuthenticationError(ErrorMessages.ERROR002)
except MissingRequiredClaimError:
raise AuthenticationError(ErrorMessages.ERROR002)
except Exception as e:
logger.exception(f"Exception while validating JWT: {str(e)}")
raise
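# Illustrative round-trip sketch (not part of the original file); assumes the
# RSA key pair at KeyPath.PRIVATE / KeyPath.PUBLIC exists. The claim values are
# hypothetical.
#
#     import time
#     token = JWT().encode({"iss": Secrets.issuer, "exp": int(time.time()) + 300})
#     claims = JWT().validate(token)  # raises AuthenticationError on bad tokens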
"""
Mongo Utility
Author: Irfanuddin Shafi Ahmed
Reference: Pymongo Documentation
"""
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
from scripts.logging import logger
from scripts.utils.db_name_util import get_db_name
from scripts.utils.decryption_util import MongoDataEncryption
# env vars are strings; parse truthiness explicitly, since bool("False") is True
META_SOFT_DEL: bool = os.getenv("META_SOFT_DEL", "true").lower() in ("1", "true", "yes")
add_fields = "$addFields"
match = "$match"
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception as e:
logger.error(f"Exception in connection {(str(e))}")
raise e
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
# space_db defaults to None so callers without space-aware routing still work
def __init__(self, mongo_client, database, collection, space_db=None, soft_delete: bool = META_SOFT_DEL):
self.client = mongo_client
self.database = database
self.collection = collection
self.space_db = space_db
self.soft_delete = soft_delete
self.data_encryption = MongoDataEncryption()
# Variables to preserve the initiated database name and space id
# (the database name can change during runtime)
self.__database = None
self.__space_id = None
def __repr__(self):
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
@property
def space_id(self):
# return the stored value; returning self.space_id here would recurse forever
return self.__space_id
@space_id.setter
def space_id(self, space_id):
self.__space_id = space_id
if self.__database is None:
# storing original db name if None
self.__database = self.database
self.database = get_db_name(
redis_client=self.space_db,
space_id=space_id,
database=self.__database,
)
def get_decrypted_records(self, records):
return [
self.data_encryption.decrypt_data(each_record, _collection_name="user")
for each_record in records
]
def find_decrypted(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
if mongo_response := collection.find_one(query, filter_dict):
mongo_response = [mongo_response]
mongo_response = self.get_decrypted_records(records=mongo_response)
return mongo_response[0]
else:
return mongo_response
except Exception as e:
logger.error(str(e))
raise
def insert_one(self, data: Dict):
"""
The function is used to insert a document into a collection in a Mongo database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception as e:
raise ValueError(str(e))
def insert_many(self, data: List):
"""
The function is used to insert documents into a collection in a Mongo database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception as e:
raise ValueError(str(e))
def find(
self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
skip: Optional[int] = 0,
collation: Optional[bool] = False,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo database.
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param collation: When True, apply an English-locale collation to the cursor
:param limit: Limit Number
:return: Cursor over the matching documents
"""
if sort is None:
sort = []
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
if collation:
cursor = cursor.collation({"locale": "en"})
return cursor
except Exception as e:
raise ValueError(str(e))
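# Illustrative usage sketch (not part of the original file); the field names
# are hypothetical. The returned cursor is lazy, so wrap it in list() to
# materialise the documents.
#
#     cursor = collection_obj.find(
#         query={"space_id": "space_127"},
#         filter_dict={"_id": 0, "name": 1},
#         sort=[("created_on", -1)],
#         skip=0,
#         limit=10,
#     )
#     records = list(cursor)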
def find_one(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
return collection.find_one(query, filter_dict)
except Exception as e:
raise ValueError(str(e))
def update_one(
self,
query: Dict,
data: Dict,
upsert: bool = False,
):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response
except Exception as e:
raise ValueError(str(e))
def update_many(self, query: Dict, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_many(query, {"$set": data}, upsert=upsert)
return response
except Exception as e:
raise ValueError(str(e))
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
if self.soft_delete:
soft_del_query = [
{match: query},
{add_fields: {"deleted": {"on": datetime.now(timezone.utc)}}},
{
"$merge": {
"into": {
"db": f"deleted__{database_name}",
"coll": collection_name,
},
}
},
]
collection.aggregate(soft_del_query)
response = collection.delete_many(query)
return response.deleted_count
except Exception as e:
raise ValueError(str(e))
def delete_one(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
if self.soft_delete:
soft_del_query = [
{match: query},
{add_fields: {"deleted": {"on": datetime.now(timezone.utc)}}},
{
"$merge": {
"into": {
"db": f"deleted__{database_name}",
"coll": collection_name,
},
}
},
]
collection.aggregate(soft_del_query)
response = collection.delete_one(query)
return response.deleted_count
except Exception as e:
raise ValueError(str(e))
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.distinct(query_key, filter_json)
except Exception as e:
raise ValueError(str(e))
def aggregate(
self,
pipelines: List,
):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.aggregate(pipelines)
except Exception as e:
raise ValueError(str(e))
def find_count(self, json_data, database_name, collection_name):
"""
:param json_data:
:param database_name: The database to which the collection/documents belong.
:param collection_name: The collection to which the documents belong.
:return:
"""
try:
db = self.client[database_name]
# Cursor.count() was removed in PyMongo 4.x; count_documents is the
# supported replacement
return db[collection_name].count_documents(json_data)
except Exception as e:
raise ValueError(str(e))
def find_record_count(self, json_data: dict):
"""
:param json_data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.count_documents(json_data)
except Exception as e:
raise ValueError(str(e))
def bulk_write(self, operation):
try:
database_name = self.database
collection_name = self.collection
database_connection = self.client[database_name]
database_connection[collection_name].bulk_write(operation)
return "success"
except Exception as e:
raise ValueError(str(e))
def create_mongo_index(self, index_list: list):
"""
params: index_list - ([("key1", -1),("key2", 1)])
Returns:
object:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
collection.create_index(index_list)
return True
except Exception as e:
raise ValueError(str(e))
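# Illustrative usage sketch (not part of the original file); database and
# collection names mirror ones used in this repo, but the query values are
# hypothetical and assume reachable Mongo/Redis connections plus an existing
# Redis record for the space.
#
#     base = MongoCollectionBaseClass(
#         mongo_client=destination_mongo_client,
#         database="catalog_meta",
#         collection="workspaces",
#         space_db=destination_space_db,
#     )
#     base.space_id = "space_127"  # reroutes reads/writes to the prefixed database
#     doc = base.find_one(query={"space_id": "space_127"})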
class MongoAggregateBaseClass:
def __init__(
self,
mongo_client,
database,
):
self.client = mongo_client
self.database = database
def aggregate(
self,
collection,
pipelines: List,
):
try:
database_name = self.database
collection_name = collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception as e:
logger.error(f"Failed to get the aggregate data {str(e)}")
raise e
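# Illustrative usage sketch (not part of the original file); the pipeline is a
# hypothetical count-by-space aggregation.
#
#     agg = MongoAggregateBaseClass(mongo_client=destination_mongo_client, database="catalog_meta")
#     result = list(agg.aggregate("workspaces", [{"$group": {"_id": "$space_id", "n": {"$sum": 1}}}]))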