Commit 8688eaf7 authored by tarun2512's avatar tarun2512

message

parents
MONGO_URI=mongodb://ilens:ilens4321@192.168.0.220:31589/?authSource=admin&directConnection=true
REDIS_URI=redis://admin:iLensDevRedis@192.168.0.220:32642
PASSWORD=Admin@090
\ No newline at end of file
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQClilTaeHq6Zc+kWHCNl1O0btGRm7ct3O5zqWx1mwwLUWH14eft
Hi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULfENhwd/D7P3mnoRlktPT2t+tt
RRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw2hcqOYe/NGTkmm1PswIDAQAB
AoGAZPARR1l5NBkKYGKQ1rU0E+wSmx+AtVVmjF39RUSyNmB8Q+poebwSgsr58IKt
T6Yq6Tjyl0UAZTGmferCK0xJJrqyP0hMn4nNNut+acWMKyt+9YrA2FO+r5Jb9JuT
SK35xXnM4aZLGppgWJxRzctpIz+qkf6oLRSZme0AuiqcwYECQQDY+QDL3wbWplRW
bze0DsZRMkDAkNY5OCydvjte4SR/mmAzsrpNrS5NztWbaaQrefoPbsdYBPbd8rS7
C/s/0L1zAkEAw1EC5zt2STuhkcKLa/tL+bk8WHHHtf19aC9kBj1TvWBFh+JojWCo
86iK5fLcHzhyQx5Qi3E9LG2HvOWhS1iUwQJAKbEHHyWW2c4SLJ2oVXf1UYrXeGkc
UNhjclgobl3StpZCYAy60cwyNo9E6l0NR7FjhG2j7lzd1t4ZLkvqFmQU0wJATLPe
yQIwBLh3Te+xoxlQD+Tvzuf3/v9qpWSfClhBL4jEJYYDeynvj6iry3whd91J+hPI
m8o/tNfay5L+UcGawQJAAtbqQc7qidFq+KQYLnv5gPRYlX/vNM+sWstUAqvWdMze
JYUoTHKgiXnSZ4mizI6/ovsBOMJTb6o1OJCKQtYylw==
-----END RSA PRIVATE KEY-----
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQClilTaeHq6Zc+kWHCNl1O0btGR
m7ct3O5zqWx1mwwLUWH14eftHi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULf
ENhwd/D7P3mnoRlktPT2t+ttRRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw
2hcqOYe/NGTkmm1PswIDAQAB
-----END PUBLIC KEY-----
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQClilTaeHq6Zc+kWHCNl1O0btGRm7ct3O5zqWx1mwwLUWH14eft
Hi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULfENhwd/D7P3mnoRlktPT2t+tt
RRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw2hcqOYe/NGTkmm1PswIDAQAB
AoGAZPARR1l5NBkKYGKQ1rU0E+wSmx+AtVVmjF39RUSyNmB8Q+poebwSgsr58IKt
T6Yq6Tjyl0UAZTGmferCK0xJJrqyP0hMn4nNNut+acWMKyt+9YrA2FO+r5Jb9JuT
SK35xXnM4aZLGppgWJxRzctpIz+qkf6oLRSZme0AuiqcwYECQQDY+QDL3wbWplRW
bze0DsZRMkDAkNY5OCydvjte4SR/mmAzsrpNrS5NztWbaaQrefoPbsdYBPbd8rS7
C/s/0L1zAkEAw1EC5zt2STuhkcKLa/tL+bk8WHHHtf19aC9kBj1TvWBFh+JojWCo
86iK5fLcHzhyQx5Qi3E9LG2HvOWhS1iUwQJAKbEHHyWW2c4SLJ2oVXf1UYrXeGkc
UNhjclgobl3StpZCYAy60cwyNo9E6l0NR7FjhG2j7lzd1t4ZLkvqFmQU0wJATLPe
yQIwBLh3Te+xoxlQD+Tvzuf3/v9qpWSfClhBL4jEJYYDeynvj6iry3whd91J+hPI
m8o/tNfay5L+UcGawQJAAtbqQc7qidFq+KQYLnv5gPRYlX/vNM+sWstUAqvWdMze
JYUoTHKgiXnSZ4mizI6/ovsBOMJTb6o1OJCKQtYylw==
-----END RSA PRIVATE KEY-----
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQClilTaeHq6Zc+kWHCNl1O0btGR
m7ct3O5zqWx1mwwLUWH14eftHi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULf
ENhwd/D7P3mnoRlktPT2t+ttRRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw
2hcqOYe/NGTkmm1PswIDAQAB
-----END PUBLIC KEY-----
from dotenv import load_dotenv
import os
# Load .env BEFORE importing the scripts.* modules below: their module-level
# settings objects (pydantic BaseSettings) read the environment at import time.
load_dotenv()
from scripts.default_workspace_creation import WorkspaceCreation
from scripts.default_user_role_creation import UserRoleCreation
from scripts.default_user_creation import UserCreation

# Plaintext password for the default catalog user, supplied via env / .env.
PASSWORD = os.environ.get("PASSWORD")

if __name__ == "__main__":
    # Seed order: central workspace first, then default roles, then the user.
    WorkspaceCreation().global_catalog_workspace_creation()
    UserRoleCreation().create_user_role()
    UserCreation().create_default_user(PASSWORD)
bcrypt~=4.0.1
cryptography==43.0.1
pydantic~=2.7.3
python-dotenv==1.0.1
ut-mongo-util[stable,encryption]==1.1.1
ut-redis-connector[stable]==0.3.1
\ No newline at end of file
import os
import pathlib
import shutil
from typing import Optional, Any
from dotenv import load_dotenv
from pydantic.v1 import Field, root_validator, BaseSettings
load_dotenv()
class _LoggVariables(BaseSettings):
    """Logging-related settings, read from the environment."""

    # Root log level applied to the logger configuration.
    LOG_LEVEL: str = Field(default="DEBUG")
    # Handler toggles; typed Optional[Any] because env values arrive as strings.
    ENABLE_FILE_LOG: Optional[Any] = Field(default=False)
    ENABLE_CONSOLE_LOG: Optional[Any] = Field(default=True)
    # Directory for log files when file logging is enabled.
    LOGS_MODULE_PATH: Optional[pathlib.Path] = Field(default="/code/data/default_catalog_meta_logs")
class _Databases(BaseSettings):
    """Datastore connection settings, read from the environment (.env)."""

    # Connection URIs; no defaults, so they must be present in the environment.
    MONGO_URI: Optional[str]
    REDIS_URI: Optional[str]
    # Redis logical DB numbers: workspace metadata and role-permission hashes.
    REDIS_SPACE_DB: int = Field(default=18)
    REDIS_USER_ROLE_DB: Optional[int] = Field(default=21)
class _Security(BaseSettings):
    """Field-encryption settings for Mongo documents."""

    # JSON file declaring which collections/fields get encrypted
    # (see scripts/config/mongo_encryption_constants.json).
    ENCRYPTION_CONSTANTS_FILE_PATH: str = "scripts/config/mongo_encryption_constants.json"
    # Master switch for encrypting user records.
    USER_ENCRYPTION: bool = Field(default=True)
class _KeyPath(BaseSettings):
    """RSA key-pair locations; ensures the pair exists on settings creation."""

    # Directory holding the "public"/"private" key files.
    KEYS_PATH: Optional[pathlib.Path] = Field(default="data/keys")
    # Resolved absolute-ish paths, filled in by the validator below.
    PUBLIC: Optional[pathlib.Path]
    PRIVATE: Optional[pathlib.Path]

    @root_validator(allow_reuse=True)
    def assign_values(cls, values):
        # If either key file is missing, (re)create the directory and copy the
        # bundled defaults from assets/keys, then record the final paths.
        if not os.path.isfile(os.path.join(values.get("KEYS_PATH"), "public")) or not os.path.isfile(
            os.path.join(values.get("KEYS_PATH"), "private")
        ):
            if not os.path.exists(values.get("KEYS_PATH")):
                os.makedirs(values.get("KEYS_PATH"))
            # NOTE(review): falls back to keys checked into the repo — confirm
            # these are placeholders and not production keys.
            shutil.copy(os.path.join("assets", "keys", "public"), os.path.join(values.get("KEYS_PATH"), "public"))
            shutil.copy(os.path.join("assets", "keys", "private"), os.path.join(values.get("KEYS_PATH"), "private"))
        values["PUBLIC"] = os.path.join(values.get("KEYS_PATH"), "public")
        values["PRIVATE"] = os.path.join(values.get("KEYS_PATH"), "private")
        return values
# Singleton settings instances, each reading its values from the environment
# (populated earlier in this module by load_dotenv()).
DBConf = _Databases()
LoggVariables = _LoggVariables()
Security = _Security()
KeyPath = _KeyPath()

# Public API of this config module.
__all__ = [
    "DBConf",
    "LoggVariables",
    "Security",
    "KeyPath"
]
\ No newline at end of file
from datetime import datetime
def _build_default_role(user_role_id: str, role_name: str) -> dict:
    """Build one default user-role record for the central workspace.

    The three default roles (Admin / Reviewer / Operator) are identical except
    for their id, name, and description, so a single factory builds them; each
    call returns fresh dict objects, so the roles share no mutable state.

    :param user_role_id: unique role id (e.g. "user_role_096")
    :param role_name: role name, also used as the description
    :return: a role document ready for the user_role collection
    """
    return {
        "space_id": "space_099",
        "type": "edit",
        "user_role_id": user_role_id,
        "access_levels": {
            "userManagement": {"access_level": True},
            "approvalInbox": {"access_level": True},
            "artifacts": {"access_level": True},
            "assetExplorer": {
                "access_level": True,
                # All asset-explorer sub-sections are disabled by default.
                "children": {
                    "overview": False,
                    "parameters": False,
                    "alarms": False,
                    "events": False,
                    "serviceHistory": False,
                    "rules": False,
                    "resources": False,
                    "dataMapping": False,
                    "digitalTwin": False,
                    "materials": False,
                    "auditLogs": False,
                    "insights": False,
                    "treeView": False,
                    "parameterData": False,
                },
            },
        },
        "user_role_description": role_name,
        "user_role_name": role_name,
        "user_role_permissions": {
            "userManagement": {
                "key": "userManagement",
                "name": "User Management",
                "create": True,
                "delete": True,
                "edit": True,
                "view": True,
                "children": {
                    "users": {
                        "key": "users",
                        "name": "User",
                        "create": True,
                        "delete": True,
                        "edit": True,
                        "view": True,
                    }
                },
            },
            "approvalInbox": {
                "key": "approvalInbox",
                "name": "Approval Inbox",
                "create": True,
                "delete": True,
                "edit": True,
                "view": True,
            },
            "artifacts": {
                "key": "artifacts",
                "name": "artifacts",
                "create": True,
                "delete": True,
                "edit": True,
                "view": True,
            },
        },
        "catalogPermission": True,
    }


# Default roles seeded into the user_role collection on first run.
DEFAULT_USER_ROLES = [
    _build_default_role("user_role_096", "Admin"),
    _build_default_role("user_role_097", "Reviewer"),
    _build_default_role("user_role_098", "Operator"),
]
# Default catalog user seeded on first run.
# NOTE(review): ships a hard-coded personal email and phone number — confirm
# these should be defaults rather than configuration.
DEFAULT_USER = {
    # Per-field salts used by the Mongo field-encryption layer.
    "encryption_salt": {"dt_0": [], "dt_1": []},
    "name": "CatalogUser",
    "username": "cataloguser",
    # Filled with a bcrypt hash at creation time (see UserCreation.create_default_user).
    "password": "",
    "email": "tarun.madamanchi@rockwellautomation.com",
    "user_type": "catalog_user",
    "phonenumber": 9581388594,
    "userrole": ["user_role_096"],
    "user_id": "user_097",
    "created_by": "user_097",
    "product_encrypted": False,
    "failed_attempts": 0,
    "is_user_locked": False,
    "last_failed_attempt": "2021-05-13 08:56:15",
    "ilens_encrypted": False,
    "passwordReset": None,
    "tz": None,
    "expires_on": "02/12/21 09:00 30 AM",
    "disable_user": False,
    "last_logged_in": 1735796769,
    "last_failed_login": None,
    "fixed_delay": 0,
    "variable_delay": 0,
    "space_id": "space_099",
    "default_user": True,
}
# Default user↔workspace mapping record for the central workspace; timestamp
# is computed once at import time.
DEFAULT_SPACE = {
    "space_id": "space_099",
    "userrole": ["user_role_096"],
    "created_by": "user_097",
    "updated_time": datetime.utcnow().isoformat() + "Z",  # UTC in ISO-8601 format
    "user_id": "user_097",
    "updated_by": "user_097",
}
\ No newline at end of file
{
"encrypt_collection_dict" : {
"user": {
"encrypt_keys": ["phonenumber", "email"],
"exclude_encryption": []}
}
}
from ut_redis_connector import RedisConnector

from scripts.config import DBConf

# Shared Redis connections, created once at import time and reused by the
# seeding scripts.
connector = RedisConnector(DBConf.REDIS_URI)
# Workspace (space) metadata DB. The settings already type these DB numbers
# as int, so the redundant int() cast the original applied to one (and not
# the other) is dropped for consistency.
space_db = connector.connect(db=DBConf.REDIS_SPACE_DB, decode_responses=True)
# Per-role permission hashes (written by UserRoleCreation.save_permissions).
user_role_permissions_redis = connector.connect(db=DBConf.REDIS_USER_ROLE_DB, decode_responses=True)
import re
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from ut_mongo_util import CollectionBaseClass
from scripts.utils.decryption_util import MongoDataEncryption
from scripts.utils.mongo_utils import MongoCollectionBaseClass as UtilsMongoCollection
class UserCollectionKeys:
    """Field-name constants for documents in the ``user`` collection."""

    KEY_LANGUAGE = "language"
    KEY_NAME = "name"
    KEY_USER_ID = "user_id"
    KEY_SPACE_ID = "space_id"
    KEY_USERNAME = "username"
    KEY_USER_ROLE = "userrole"
    KEY_EMAIL = "email"
class UserSchema(BaseModel):
    """Pydantic view of a ``user`` document.

    Every field is optional with a benign default so records returned under a
    projection (filter_dict) still validate.
    """

    name: Optional[str] = ""
    space_id: Optional[str] = ""
    username: Optional[str] = ""
    password: Optional[str] = ""
    # email/phonenumber are Any because they may be stored encrypted.
    email: Optional[Any] = None
    phonenumber: Optional[Any] = None
    userrole: Optional[List[str]] = None
    user_type: Optional[str] = ""
    user_id: Optional[str] = ""
    created_by: Optional[str] = ""
    encryption_salt: Optional[Dict] = {}
    passwordReset: Optional[Dict] = {}
    # Lockout / throttling bookkeeping.
    failed_attempts: Optional[int] = 0
    is_user_locked: Optional[bool] = False
    last_failed_login: Optional[int] = 0
    last_logged_in: Optional[int] = 0
    last_failed_attempt: Optional[str] = ""
    expires_on: Optional[str] = ""
    disable_user: Optional[bool] = False
    default_user: Optional[bool] = False
    created_on: Optional[int] = 0
    updated_by: Optional[str] = ""
    updated_on: Optional[int] = 0
    secret: Optional[str] = ""
    password_added_on: Optional[int] = 0
    default_space: Optional[str] = ""
    fixed_delay: Optional[int] = 0
    variable_delay: Optional[int] = 0
class User(CollectionBaseClass):
    """Accessor for the ``user`` collection in ``catalog_meta_dub``.

    Encrypts configured fields on write (MongoDataEncryption) and decrypts on
    read via helpers borrowed from the utils Mongo base class.
    """

    def __init__(self, mongo_client, space_id=None):
        super().__init__(mongo_client, database="catalog_meta_dub", collection="user")
        self.space_id = space_id
        self.key_user_id = UserCollectionKeys.KEY_USER_ID
        self.key_space_id = UserCollectionKeys.KEY_SPACE_ID
        self.key_username = UserCollectionKeys.KEY_USERNAME
        self.key_email = UserCollectionKeys.KEY_EMAIL
        # Borrow the decryption-aware read helpers from the utils base class by
        # binding its unbound methods to this instance (this class extends the
        # ut_mongo_util base instead of the utils one).
        self.find_decrypted = UtilsMongoCollection.find_decrypted.__get__(self, UtilsMongoCollection)
        self.get_decrypted_records = UtilsMongoCollection.get_decrypted_records.__get__(self, UtilsMongoCollection)
        self.data_encryption = MongoDataEncryption()

    def update_user(self, query, data):
        """Upsert the user record matching ``query`` with ``data``."""
        return self.update_one(query=query, data=data, upsert=True)

    def insert_one_user(self, data):
        """Encrypt the configured fields of ``data`` and insert the record."""
        data = self.data_encryption.encrypt_data(data, collection_name="user")
        return self.insert_one(data)

    def find_user(self, space_id, user_id=None, username=None, email=None, filter_dict=None):
        """Return the first matching user as a UserSchema, or the falsy result.

        NOTE(review): ``space_id`` is accepted but never added to the query,
        and the email match here is exact while find_base_user matches
        case-insensitively — confirm both are intended.
        """
        query = {}
        if user_id:
            query[self.key_user_id] = user_id
        if username:
            query[self.key_username] = username
        if email:
            # A dead assignment that built a case-insensitive regex and was
            # immediately overwritten has been removed; exact matching (the
            # original effective behavior) is preserved.
            query[self.key_email] = email
        user = self.find_decrypted(query=query, filter_dict=filter_dict)
        if user:
            return UserSchema(**user)
        return user

    @staticmethod
    def get_users_list(space_id=None):
        """Build an aggregation pipeline producing a {user_id: username} map,
        optionally restricted to one space."""
        query_json = [
            {
                "$group": {
                    "_id": None,
                    "data": {"$push": {"k": {"$ifNull": ["$user_id", ""]}, "v": {"$ifNull": ["$username", ""]}}},
                }
            },
            {"$replaceRoot": {"newRoot": {"$arrayToObject": "$data"}}},
        ]
        if space_id:
            query_json.insert(0, {"$match": {"space_id": space_id}})
        return query_json

    def users_list_by_aggregate(self, query: list):
        """Run the given aggregation pipeline and return its cursor."""
        return self.aggregate(pipelines=query)

    def find_user_by_space_id(self, user_id, space_id):
        """Return the raw (undecrypted) record for (user_id, space_id) as a dict, or None."""
        user = self.find_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
        if user:
            return dict(user)
        return user

    def get_all_users(self, filter_dict=None, sort=None, skip=0, limit=None, **query):
        """Return all users matching the keyword filters as a list ([] if none)."""
        users = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
        if users:
            return list(users)
        return []

    def find_user_role_for_user_id(self, user_id, space_id):
        """Return only the ``userrole`` field for the given user/space pair."""
        query = {"user_id": user_id, "space_id": space_id}
        filter_dict = {"userrole": 1, "_id": 0}
        return self.find_one(query=query, filter_dict=filter_dict)

    def find_base_user(self, space_id=None, user_id=None, username=None, email=None, filter_dict=None):
        """Find one decrypted user, matching email case-insensitively.

        Returns a UserSchema when the record validates, the raw record when it
        does not, or the falsy lookup result when nothing matches.
        """
        query = {}
        if space_id:
            query[self.key_space_id] = space_id
        if user_id:
            query[self.key_user_id] = user_id
        if username:
            query[self.key_username] = username
        if email:
            query[self.key_email] = re.compile(email, re.IGNORECASE)
        if not (user := self.find_decrypted(query=query, filter_dict=filter_dict)):
            return user
        try:
            return UserSchema(**user)
        except Exception:
            # Projection may have removed fields the schema expects; fall back
            # to the raw record rather than failing the lookup.
            return user

    def find_by_space(
        self,
        projections=None,
        sort=None,
        query_dict=None,
        limit=None,
        skip=0,
        **filters,
    ) -> Union[Any, None]:
        """Return decrypted records matching ``query_dict`` merged with the
        keyword filters, or [] when nothing matches."""
        query = {}
        if query_dict:
            query |= query_dict
        if filters:
            query.update(filters)
        records = self.find(query, projections, sort=sort, limit=limit, skip=skip)
        if records:
            records = self.get_decrypted_records(records)
        return list(records) if records else []

    def delete_one_user(self, user_id, space_id):
        """Delete the user record for the given user/space pair."""
        return self.delete_one(query={self.key_user_id: user_id, self.key_space_id: space_id})

    def update_one_user(self, query, data):
        """Encrypt the configured fields of ``data`` and upsert the record
        matching ``query`` (docstring fixed: this updates, not inserts)."""
        data = self.data_encryption.encrypt_data(data, collection_name="user")
        return self.update_one(query=query, data=data, upsert=True)

    def get_data_by_aggregate(self, query_json: list):
        """Run the aggregation and return decrypted results ([] if empty)."""
        if aggregate_data := list(self.aggregate(query_json)):
            aggregate_data = self.get_decrypted_records(aggregate_data)
            return aggregate_data
        return []

    def find_by_aggregate(self, query_json: list):
        """Run the aggregation and return raw results ([] if empty)."""
        if user_by_aggregate := list(self.aggregate(query_json)):
            return user_by_aggregate
        return []

    def distinct_user(self, query_key, filter_json):
        """Return distinct values of ``query_key`` among users whose user_id
        matches ``filter_json``."""
        query = {self.key_user_id: filter_json}
        return self.distinct(query_key=query_key, filter_json=query)

    def find_user_by_param(self, **query):
        """Return decrypted users matching the keyword filters, or []."""
        user = self.get_decrypted_records(self.find(query))
        return user if user else []
from typing import Optional
from ut_mongo_util import CollectionBaseClass
class UserCollectionKeys:
    """Field-name constants for documents in the ``user_role`` collection."""

    KEY_LANGUAGE = "language"
    KEY_NAME = "name"
    KEY_USER_ID = "user_id"
    KEY_SPACE_ID = "space_id"
    KEY_USERNAME = "username"
    KEY_USER_ROLE = "user_role_name"
    KEY_EMAIL = "email"
class UserRole(CollectionBaseClass):
    """Accessor for the ``user_role`` collection in ``catalog_meta_dub``."""

    def __init__(self, mongo_client, space_id=None):
        super().__init__(mongo_client, database="catalog_meta_dub", collection="user_role")
        self.space_id = space_id
        self.key_user_id = UserCollectionKeys.KEY_USER_ID
        self.key_space_id = UserCollectionKeys.KEY_SPACE_ID

    def update_user_role(self, query, data):
        """Upsert the user-role record matching ``query`` with ``data``."""
        return self.update_one(query=query, data=data, upsert=True)

    def find_user(self, user_id):
        """Return the record for ``user_id`` as a plain dict, or the falsy lookup result."""
        record = self.find_one(query={"user_id": user_id})
        return dict(record) if record else record

    def find_user_name(self, user_id, space_id: Optional[str]):
        """Return the role name for (user_role_id, space_id), or None when absent."""
        record = self.find_one(
            filter_dict={"user_role_name": 1, "_id": 0},
            query={"user_role_id": user_id, "space_id": space_id},
        )
        return None if record is None else record["user_role_name"]

    @staticmethod
    def get_users_list(space_id=None):
        """Build an aggregation pipeline producing a {user_id: username} map,
        optionally restricted to one space."""
        pipeline = []
        if space_id:
            pipeline.append({"$match": {"space_id": space_id}})
        pipeline.append(
            {
                "$group": {
                    "_id": None,
                    "data": {"$push": {"k": {"$ifNull": ["$user_id", ""]}, "v": {"$ifNull": ["$username", ""]}}},
                }
            }
        )
        pipeline.append({"$replaceRoot": {"newRoot": {"$arrayToObject": "$data"}}})
        return pipeline

    def users_list_by_aggregate(self, query: list):
        """Run the given aggregation pipeline and return its cursor."""
        return self.aggregate(pipelines=query)

    def find_user_by_space_id(self, user_id, space_id):
        """Return the record for (user_id, space_id) as a plain dict, or the falsy lookup result."""
        record = self.find_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
        return dict(record) if record else record

    def find_user_role_by_id(self, user_role_id, filter_dict=None):
        """Return the role record matching ``user_role_id`` under ``filter_dict``."""
        return self.find_one(query={"user_role_id": user_role_id}, filter_dict=filter_dict)

    def find_user_role_by_aggregate(self, query):
        """Run the aggregation and return results as a list ([] when empty)."""
        matches = list(self.aggregate(query))
        return matches if matches else []
from ut_mongo_util import CollectionBaseClass, mongo_client
class UserCollectionKeys:
    """Field-name constants for documents in the ``user_space`` collection."""

    KEY_LANGUAGE = "language"
    KEY_NAME = "name"
    KEY_USER_ID = "user_id"
    KEY_SPACE_ID = "space_id"
    KEY_USERNAME = "username"
    KEY_USER_ROLE = "userrole"
class UserSpace(CollectionBaseClass):
    """Accessor for the ``user_space`` collection (user ↔ workspace mapping)."""

    key_username = UserCollectionKeys.KEY_USERNAME
    key_user_id = UserCollectionKeys.KEY_USER_ID
    key_language = UserCollectionKeys.KEY_LANGUAGE
    key_name = UserCollectionKeys.KEY_NAME
    key_space_id = UserCollectionKeys.KEY_SPACE_ID

    def __init__(self):
        super().__init__(
            mongo_client,
            database="catalog_meta_dub",
            collection="user_space",
        )

    def fetch_user_space(self, user_id, space_id):
        """Return the raw user-space record for (user_id, space_id), or None."""
        return self.find_one(query={self.key_user_id: user_id, self.key_space_id: space_id})

    def fetch_user_space_with_details(self, user_id, space_id):
        """Return the user-space record joined with the user's profile fields.

        $lookup against the ``user`` collection, flatten the first match, and
        project the combined fields; None when nothing matches.
        """
        pipeline = [
            {"$match": {"user_id": user_id, "space_id": space_id}},
            {"$lookup": {"from": "user", "localField": "user_id", "foreignField": "user_id", "as": "user_details"}},
            {"$unwind": {"path": "$user_details"}},
            {
                "$project": {
                    "space_id": 1,
                    "AccessLevel": 1,
                    "access_group_ids": 1,
                    "userrole": 1,
                    "user_id": 1,
                    "name": "$user_details.name",
                    "email": "$user_details.email",
                    "username": "$user_details.username",
                }
            },
        ]
        matches = list(self.aggregate(pipeline))
        return matches[0] if matches else None

    def find_user_role_for_user_id(self, user_id, space_id):
        """Return only the ``userrole`` field for the given user/space pair."""
        return self.find_one(
            query={"user_id": user_id, "space_id": space_id},
            filter_dict={"userrole": 1, "_id": 0},
        )

    def update_one_user_space(self, data, user_id, space_id):
        """Upsert the user-space record identified by (user_id, space_id)."""
        return self.update_one(
            query={self.key_user_id: user_id, "space_id": space_id},
            data=data,
            upsert=True,
        )

    def insert_one_user(self, data):
        """Insert one user-space record."""
        return self.insert_one(data)

    def delete_one_user_space(self, user_id, space_id):
        """Delete the user-space record for the given user/space pair."""
        return self.delete_one(query={self.key_user_id: user_id, self.key_space_id: space_id})
from typing import Dict, Optional
from pydantic import BaseModel
from scripts.utils.mongo_utils import MongoCollectionBaseClass
class WorkSpacesSchema(BaseModel):
    """
    This is the Schema for the Mongo DB Collection.
    All datastore and general responses will be following the schema.
    Every field is optional with a benign default so projected records validate.
    """

    space_id: Optional[str] = ""
    space_name: Optional[str] = ""
    space_type: Optional[str] = ""
    meta: Optional[Dict] = {}
    user_id: Optional[str] = ""
    source_meta: Optional[Dict] = {}
    access_token: Optional[str] = ""
    catalog_url: Optional[str] = ""
class WorkSpaces(MongoCollectionBaseClass):
    """Accessor for the ``workspaces`` collection in ``catalog_meta_dub``."""

    def __init__(self, mongo_client):
        super().__init__(mongo_client, database="catalog_meta_dub", collection="workspaces")

    @property
    def key_space_id(self):
        # Field-name constant, exposed as a property.
        return "space_id"

    @property
    def key_space_name(self):
        return "space_name"

    def find_space(self, space_id=None, space_name=None, filter_dict=None):
        """
        Return one workspace matching the given id and/or name as a validated
        dict, or {} when nothing matches.
        :param space_name:
        :param filter_dict: Mongo projection applied to the lookup
        :param space_id:
        :return: dict shaped by WorkSpacesSchema, or {}
        """
        query = {}
        if space_id:
            query.update({self.key_space_id: space_id})
        if space_name:
            query.update({self.key_space_name: space_name})
        record = self.find_one(query=query, filter_dict=filter_dict)
        if not record:
            return {}
        return WorkSpacesSchema(**record).dict()

    def find_space_by_query(self, query, filter_dict=None):
        """Return the raw find() result for ``query``, or [] when falsy."""
        record = self.find(query=query, filter_dict=filter_dict)
        if record:
            return record
        return []

    def fetch_space_details(self):
        """Return a {space_id: space_name} map over all workspaces ({} if none)."""
        query = {}
        filter_dict = {self.key_space_id: 1, "_id": 0, self.key_space_name: 1}
        records = self.find(query=query, filter_dict=filter_dict)
        if records:
            space_name_mapp = {}
            for record in records:
                space_name_mapp[record.get(self.key_space_id)] = record.get(self.key_space_name)
            return space_name_mapp
        return {}

    def insert_one_space(self, data):
        """
        Insert one workspace record into the collection.
        :param data: workspace document
        :return: insert result from the base class
        """
        return self.insert_one(data)

    def delete_one_space(self, space_id):
        """Delete the workspace with ``space_id``; False when no id is given."""
        if space_id:
            query = {self.key_space_id: space_id}
            return self.delete_one(query)
        else:
            return False

    def get_space_data_by_aggregate(self, query: list):
        """Run the aggregation pipeline and return its results as a list."""
        return list(self.aggregate(pipelines=query))

    def update_one_space(self, data, space_id, upsert=False):
        """
        Upsert/update the workspace identified by ``space_id``.
        :param data: replacement/updated fields
        :param upsert: create the record when it does not exist
        :param space_id:
        :return: update result from the base class
        """
        query = {"space_id": space_id}
        response = self.update_one(data=data, upsert=upsert, query=query)
        return response

    def delete_workspaces(self, space_id_list):
        """Delete all workspaces whose id is in ``space_id_list``; returns the count."""
        query = {self.key_space_id: {"$in": space_id_list}}
        response = self.delete_many(query)
        return response.deleted_count
import re
from copy import deepcopy
import bcrypt
from ut_mongo_util import mongo_client
from scripts.config.default_meta_catalog_constants import DEFAULT_USER
from scripts.db.user import User
from scripts.errors import CustomError
from scripts.logging import logger
class UserCreation:
    """Creates the default catalog user with a bcrypt-hashed password."""

    def __init__(self):
        """
        Set up the Mongo-backed ``User`` collection accessor used by the
        creation helpers below.
        """
        self.user_conn = User(mongo_client=mongo_client)

    @staticmethod
    def validate_password_strength(password):
        """
        Return True when ``password`` is 8–64 characters and contains a
        lowercase letter, an uppercase letter, a digit and one of ``!@#$%^&*``
        with no whitespace; False otherwise (including on unexpected errors).
        """
        try:
            logger.info("Validate password strength")
            conditions = [
                len(password) > 7,
                len(password) < 65,
                re.search("[a-z]", password) is not None,
                re.search(r"\d", password) is not None,
                re.search("[A-Z]", password) is not None,
                re.search("[!@#$%^&*]", password) is not None,
                not re.search("\\s", password),
            ]
            password_validation_status = all(conditions)
        except Exception as e:
            logger.error(f"Error occurred while validating the password strength : {str(e)}")
            password_validation_status = False
        return password_validation_status

    def encrypt_password(self, password):
        # Hash (not encrypt/decrypt — original comment was wrong) the plaintext
        # password with bcrypt after validating its strength; raises
        # CustomError when the password is too weak.
        if not self.validate_password_strength(password):
            message = (
                "Password should contain minimum of 8 characters with at least a symbol, "
                "one upper and one lower case letters and a number"
            )
            raise CustomError(message)
        hash_pass = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
        if isinstance(hash_pass, bytes):
            hash_pass = hash_pass.decode()
        return hash_pass

    def create_default_user(self, password):
        """Insert DEFAULT_USER (with the hashed password) unless it already exists."""
        try:
            enc_password = self.encrypt_password(password)
            admin_user = deepcopy(DEFAULT_USER)
            admin_user.update({"password": enc_password})
            user_record = self.user_conn.find_user_by_space_id(
                user_id=admin_user.get("user_id"), space_id=admin_user.get("space_id")
            )
            if not user_record:
                self.user_conn.insert_one_user(admin_user)
        except Exception as e:
            # NOTE(review): all errors (including a weak-password CustomError)
            # are only logged, so a failed seed is silent — confirm intended.
            logger.error(f"Error creating public default data {str(e)}")
import json
import os
from ut_mongo_util import mongo_client
from scripts.config.default_meta_catalog_constants import DEFAULT_USER_ROLES
from scripts.db.user import User
from scripts.db.user_role import UserRole
from scripts.db.redis_connection import user_role_permissions_redis
from scripts.logging import logger
class UserRolesPermissionKeys:
    """Permission keys kept when filtering role permissions into Redis.

    Overridable via the comma-separated ``USER_ROLE_KEYS`` environment
    variable. Entries are stripped and de-duplicated in order (the historical
    default string listed ``attachLicense`` twice); the list is only used for
    membership tests, so de-duplication is behavior-neutral.
    """

    required_keys = list(
        dict.fromkeys(
            each_key.strip()
            for each_key in os.environ.get(
                "USER_ROLE_KEYS",
                default="edit, create, view, publish, delete, "
                "clone, share,attachLicense, operator,"
                "auditLogs, screenshotRestriction, dashboard, title, auditLogsDownload, attachLicense, feedback, "
                "uploadLicense",
            ).split(",")
        )
    )
class UserRoleCreation:
    """Seeds the default user roles and mirrors role permissions into Redis."""

    def __init__(self):
        """Set up Mongo accessors for the user_role and user collections."""
        self.user_role_conn = UserRole(mongo_client=mongo_client)
        self.user_conn = User(mongo_client=mongo_client)

    @staticmethod
    def save_permissions(p_id, permissions, user_role_id):
        """Store each permission group, filtered to the allowed keys, in the
        Redis hash keyed ``<p_id>__<user_role_id>``."""
        allowed_keys = UserRolesPermissionKeys.required_keys
        for permission_name, permissions_allowed in permissions.items():
            permissions_filtered = {
                key: value for key, value in permissions_allowed.items() if key in allowed_keys
            }
            user_role_permissions_redis.hset(
                f"{p_id}__{user_role_id}", permission_name, json.dumps(permissions_filtered)
            )

    def user_role_redis_update(self):
        """
        Mirror every user's role permissions into Redis.

        :return: True on success, False when any error occurs.
        """
        try:
            logger.info("Updating redis with user role details")
            space_details = self.user_conn.find({}, {"_id": 0, "space_id": 1, "userrole": 1})
            for each_space in space_details:
                all_roles = UserRole(mongo_client=mongo_client).find({"user_role_id": {"$in": each_space["userrole"]}})
                for user_r in all_roles:
                    user_role_id = user_r["user_role_id"]
                    permissions = user_r["user_role_permissions"]
                    self.save_permissions(each_space["space_id"], permissions, user_role_id)
            logger.info("Updated redis with user role details")
            return True
        except Exception as e:
            # Fix: the original passed str(e) as an extra positional argument
            # with no %s placeholder, which breaks the log call's formatting.
            logger.exception("Failed to update redis with user roles: %s", str(e))
            return False

    def create_user_role(self):
        """Upsert each role in DEFAULT_USER_ROLES, keyed by user_role_id."""
        try:
            for user_role in DEFAULT_USER_ROLES:
                self.user_role_conn.update_user_role({"user_role_id": user_role.get("user_role_id")}, user_role)
        except Exception as e:
            # Fix: the original message ("Error creating public default data")
            # was copy-pasted from the user-creation module and misleading.
            logger.error(f"Error creating default user roles {str(e)}")
import json
import logging
import time
from copy import deepcopy
from ut_mongo_util import mongo_client
from scripts.config.default_meta_catalog_constants import DEFAULT_SPACE
from scripts.db.user_space import UserSpace
from scripts.db.workspaces import WorkSpaces
from scripts.db.redis_connection import space_db
from scripts.errors import WorkspaceNameExistError
from scripts.schema import CreateWorkspace
class WorkspaceCreation:
    """Creates the central (global catalog) workspace and its Redis metadata."""

    def __init__(self):
        """Set up the workspace and user-space collection accessors."""
        self.workspace_conn = WorkSpaces(mongo_client=mongo_client)
        self.user_space_conn = UserSpace()

    def validate_name_catalog(self, workspace_name, space_id):
        """Return the existing workspace matching name/id ({} when absent);
        None when the lookup itself fails (error is logged)."""
        try:
            existing_space_details = self.workspace_conn.find_space(space_name=workspace_name, space_id=space_id)
            return existing_space_details
        except Exception as e:
            logging.error(f"Error occurred in the validate name in catalog {str(e)}")

    @staticmethod
    def set_or_update_redis(redis_client, add_prefix_to_database, space_id):
        """
        Checks if a key exists in Redis and inserts or updates it with the given value.
        Dynamically builds the value based on `add_prefix_to_database` and `space_id`.
        :param redis_client: Redis client instance.
        :param add_prefix_to_database: Boolean flag indicating if the prefix should be added.
        :param space_id: The space_id used as the prefix for the database.
        """
        key = space_id
        # Build the dynamic source_meta dictionary; the prefix is the space_id
        # only when prefixing is enabled.
        source_meta = {
            "add_prefix_to_database": add_prefix_to_database,
            "prefix": space_id if add_prefix_to_database else "",
        }
        # Prepare the JSON object to store in Redis
        value_json = json.dumps({"source_meta": source_meta})
        try:
            # Insert or update the key in Redis
            redis_client.set(key, value_json)
            logging.info(f"Key '{key}' has been set or updated successfully.")
        except Exception as e:
            logging.error(f"Error occurred while setting/updating key '{key}': {e}")
            raise

    def global_catalog_workspace_creation(self):
        """
        Idempotently create the central workspace (space_099), its Redis
        source-meta entry, and the default user-space mapping.

        :return: {"space_id": ...} on success or when the space already
                 exists; None on unexpected errors (logged, not raised).
        """
        try:
            user_id = "user_097"
            # Fix: no deepcopy needed — the model instance is freshly
            # constructed right here and shared with no one.
            data = CreateWorkspace(
                space_id="space_099", space_name="Central Workspace", space_type="public", user_id=user_id
            )
            existing_space_details = self.validate_name_catalog(workspace_name=data.space_name, space_id=data.space_id)
            if existing_space_details:
                logging.debug(f"It is already existing space_name is {str(data.space_name)}")
                return {"space_id": data.space_id}
            data.meta.update({"updated_at": int(time.time() * 1000), "created_by": user_id, "updated_by": user_id})
            count = self.workspace_conn.update_one_space(data.dict(), data.space_id, upsert=True)
            logging.debug(f"Updated Count {str(count)} ")
            self.set_or_update_redis(space_db, add_prefix_to_database=False, space_id=data.space_id)
            self.user_space_conn.update_one_user_space(
                DEFAULT_SPACE, DEFAULT_SPACE.get("user_id"), DEFAULT_SPACE.get("space_id")
            )
            return {"space_id": data.space_id}
        except WorkspaceNameExistError:
            # Fix: re-raise the caught exception instance instead of raising
            # the class (which discarded the message and original traceback).
            raise
        except Exception as e:
            logging.error(f"Error occurred in the global catalog creation due to {str(e)}")
\ No newline at end of file
class ILensErrors(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
"""
Base Error Class
"""
class ILensErrorsWithoutMessage(Exception):
"""Generic iLens Error"""
class ErrorMessages:
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Signature Expired"
ERROR003 = "Signature Not Valid"
ERROR004 = "User Record Not Found"
WORKSPACE_NAME_EXIST_ERROR = "Workspace Name Exist. Please Use different name"
WORKSPACE_CATALOG_URL_ERROR = "Please add valid catalog url"
class JobCreationError(Exception):
"""
Raised when a Job Creation throws an exception.
Job Creation happens by adding a record to Mongo.
"""
class UnknownError(Exception):
pass
class DuplicateSpaceNameError(Exception):
pass
class KairosDBError(Exception):
pass
class UnauthorizedError(Exception):
pass
class ImageValidation(Exception):
pass
class ILensError(Exception):
pass
class NameExists(Exception):
pass
class InputRequestError(ILensError):
pass
class IllegalTimeSelectionError(ILensError):
pass
class DataNotFound(Exception):
pass
class AuthenticationError(ILensError):
    """
    JWT Authentication Error
    """
class JWTDecodingError(Exception):
    """Raised when a JWT cannot be decoded."""
    pass
class DuplicateReportNameError(Exception):
    """Raised when a report is created with a name that already exists."""
    pass
class PathNotExistsException(Exception):
    """Raised when an expected filesystem path does not exist."""
    pass
class ImplementationError(Exception):
    """Raised when a code path is reached that is not (yet) implemented correctly."""
    pass
class UserRoleNotFoundException(Exception):
    """Raised when a user's role record cannot be found."""
    pass
class CustomError(Exception):
    """General-purpose error used by the encryption utilities below."""
    pass
class IllegalToken(ILensErrors):
    """Raised for tokens that are malformed or not acceptable (carries a message)."""
    pass
class InvalidPasswordError(ILensErrors):
    """Raised when a supplied password is incorrect (carries a message)."""
    pass
class UserNotFound(ILensErrors):
    """Raised when a user record cannot be located (carries a message)."""
    pass
class TooManyRequestsError(Exception):
    """Raised when a caller exceeds the allowed request rate."""
    pass
class FixedDelayError(ILensErrors):
    """Raised for fixed-delay handling failures (carries a message)."""
    pass
class VariableDelayError(ILensErrors):
    """Raised for variable-delay handling failures (carries a message)."""
    pass
class LicenceValidationError(Exception):
    """Raised when licence validation fails."""
    pass
class CustomAppError:
    """Message constants for custom-app operations (note: not an exception)."""
    FAILED_TO_SAVE = "Failed to save app"
class WorkspaceNameExistError(ILensErrorsWithoutMessage):
    """Raised when a workspace name already exists (see global catalog creation)."""
    pass
class GlobalCatalogError(Exception):
    """Base error class for global-catalog operations; carries a message.

    BUG FIX: the original had a stray "Base Error Class" string expression
    after ``__init__``'s body (a no-op statement); folded into this class
    docstring instead.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
class ILensException(Exception):
    """Root of the Mongo-related exception hierarchy."""
    pass
class MongoException(ILensException):
    """Base class for all MongoDB-related failures."""
    pass
class MongoConnectionException(MongoException):
    """Raised when a connection to MongoDB cannot be established."""
    pass
class MongoQueryException(MongoException):
    """Base class for failures while executing Mongo queries."""
    pass
class MongoEncryptionException(MongoException):
    """Base class for failures in the Mongo field-encryption utilities."""
    pass
class MongoRecordInsertionException(MongoQueryException):
    """Raised when inserting a record fails."""
    pass
class MongoFindException(MongoQueryException):
    """Raised when a find query fails."""
    pass
class MongoDeleteException(MongoQueryException):
    """Raised when a delete operation fails."""
    pass
class MongoUpdateException(MongoQueryException):
    """Raised when an update operation fails."""
    pass
class MongoUnknownDatatypeException(MongoEncryptionException):
    """Raised when a datatype unsupported by the encryption layer is supplied."""
    pass
class MongoDistictQueryException(MongoException):
    """Raised when a distinct query fails."""
    pass
class MongoFindAndReplaceException(MongoException):
    """Raised when a find-and-replace operation fails."""
    pass
class MongoObjectDeserializationException(MongoException):
    """Raised when a stored object cannot be deserialized."""
    pass
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
from scripts.config import LoggVariables
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
    """
    Load a YAML configuration file.

    :param file_name: path to the YAML configuration file
    :return: the parsed configuration mapping, or None when parsing fails
    :raises OSError: when the file cannot be opened (propagated, as before)
    """
    with open(file_name, encoding="utf-8") as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as e:
            # Narrowed from a bare ``except Exception`` so unrelated
            # programming errors are no longer silently swallowed; malformed
            # YAML is still reported and None returned, as before.
            print(f"Failed to load Configuration. Error: {e}")
# Load the "logger" section of the YAML config once at import time; the
# effective log level is overridden from the environment-driven LoggVariables.
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
logging_config["level"] = LoggVariables.LOG_LEVEL
def add_logging_level(level_name, level_num, method_name=None):
    """
    Register a custom logging level with the `logging` module.

    `level_name` is added as an attribute of the `logging` module with the
    value `level_num`, and a convenience method named `method_name`
    (defaulting to `level_name.lower()`) is attached both to the `logging`
    module itself and to the class returned by `logging.getLoggerClass()`.

    To avoid accidental clobbering of existing attributes, an
    `AttributeError` is raised if any of the names already exist.

    Example
    -------
    > add_logging_level('TRACE', logging.DEBUG - 5)
    > logging.getLogger(__name__).setLevel("TRACE")
    > logging.getLogger(__name__).trace('that worked')
    > logging.trace('so did this')
    > logging.TRACE
    """
    method_name = method_name or level_name.lower()
    logger_cls = logging.getLoggerClass()
    # Guard against clobbering anything already defined.
    if hasattr(logging, level_name):
        raise AttributeError(f"{level_name} already defined in logging module")
    if hasattr(logging, method_name):
        raise AttributeError(f"{method_name} already defined in logging module")
    if hasattr(logger_cls, method_name):
        raise AttributeError(f"{method_name} already defined in logger class")

    def log_for_level(self, message, *args, **kwargs):
        # Per-logger method: honors the logger's effective level.
        if self.isEnabledFor(level_num):
            self._log(level_num, message, args, **kwargs)

    def log_to_root(message, *args, **kwargs):
        # Module-level convenience: delegates to the root logger.
        logging.log(level_num, message, *args, **kwargs)

    logging.addLevelName(level_num, level_name)
    setattr(logging, level_name, level_num)
    setattr(logger_cls, method_name, log_for_level)
    setattr(logging, method_name, log_to_root)
def get_logger():
    """
    Creates a rotating log

    Builds the shared "ilens" logger from ``logging_config`` (loaded from
    logger_conf.yml) and the environment-driven LoggVariables toggles.
    NOTE(review): add_logging_level raises AttributeError if QTRACE is
    already registered, so calling get_logger() twice in one process
    would fail here — presumably it is only called once at import time.
    """
    __logger__ = logging.getLogger("ilens")
    # Custom QTRACE level sits just below DEBUG.
    add_logging_level("QTRACE", logging.DEBUG - 5)
    __logger__.setLevel(logging_config["level"].upper())
    log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():" + "%(lineno)s] - %(message)s"
    time_format = "%Y-%m-%d %H:%M:%S"
    file_path = LoggVariables.LOGS_MODULE_PATH
    formatter = logging.Formatter(log_formatter, time_format)
    # Each config entry is dispatched on its "type"; handlers disabled via
    # LoggVariables fall through to the final else and are skipped.
    for each_handler in logging_config["handlers"]:
        if each_handler["type"] in ["RotatingFileHandler"] and LoggVariables.ENABLE_FILE_LOG:
            if not os.path.exists(file_path):
                os.makedirs(file_path)
            log_file = os.path.join(file_path, f"{logging_config['name']}.log")
            temp_handler = RotatingFileHandler(
                log_file, maxBytes=each_handler["max_bytes"], backupCount=each_handler["back_up_count"]
            )
            temp_handler.setFormatter(formatter)
        elif each_handler["type"] in ["SocketHandler"]:
            # Socket handler ships pickled records; no formatter needed.
            temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
        elif each_handler["type"] in ["StreamHandler"] and LoggVariables.ENABLE_CONSOLE_LOG:
            temp_handler = StreamHandler()
            temp_handler.setFormatter(formatter)
        else:
            temp_handler = None
        if temp_handler:
            __logger__.addHandler(temp_handler)
    return __logger__
# Module-level singleton logger, configured once at import time.
logger = get_logger()
# Logging configuration consumed by the scripts/logging module (loaded with
# yaml.safe_load); each "handlers" entry is dispatched on its "type" key.
logger:
  name: catalog_default_meta
  level: INFO
  handlers:
    - type: RotatingFileHandler
      file_path: data/catalog_default_meta/logs/
      max_bytes: 100000000
      back_up_count: 5
    - type: SocketHandler
      host: localhost
      port: 23582
    - type: StreamHandler
name: catalog_default_meta
import time
from typing import Optional
from pydantic import Field, BaseModel
class CreateWorkspace(BaseModel):
    """Request payload for creating a workspace/space."""

    space_id: str
    space_name: str
    space_type: str
    user_id: str = Field(default="user_097")
    # default_factory yields a fresh dict per instance instead of one shared
    # default object across all model instances.
    source_meta: Optional[dict] = Field(default_factory=lambda: {"add_prefix_to_database": False})
    # BUG FIX: the original used default={"created_at": int(time.time())},
    # which evaluated time.time() once at import, freezing "created_at" for
    # every instance; default_factory evaluates at instantiation time.
    meta: Optional[dict] = Field(default_factory=lambda: {"created_at": int(time.time())})
    access_token: Optional[str] = Field(default=None)
    catalog_url: Optional[str] = Field(default=None)
import json
from functools import lru_cache
@lru_cache()
def get_db_name(redis_client, space_id: str, database: str, delimiter="__"):
    """
    Resolve the physical Mongo database name for a logical space.

    Looks up the space record in Redis; when the space's
    ``source_meta.add_prefix_to_database`` flag is set, the database name is
    prefixed with ``source_meta.prefix`` (or the space id) and *delimiter*.
    Results are memoized per (client, space_id, database, delimiter) by
    lru_cache, so later changes to the Redis record are not re-read here.

    :raises ValueError: when *space_id* is non-empty but unknown to Redis.
    """
    if not space_id:
        return database
    raw = redis_client.get(space_id)
    if raw is None:
        raise ValueError(f"Unknown Space, Space ID: {space_id} Not Found!!!")
    space_config = json.loads(raw)
    if not space_config:
        return database
    source_meta = space_config.get("source_meta", {})
    if not source_meta.get("add_prefix_to_database"):
        # Prefixing disabled for this space: use the database name as-is.
        return database
    # Prefix from the space record, falling back to the space id itself.
    prefix = source_meta.get("prefix") or space_id
    return f"{prefix}{delimiter}{database}"
@lru_cache()
def get_redis_db_prefix(redis_client, space_id: str, delimiter="__"):
    """
    Return the database prefix ("<prefix><delimiter>") for a space, or False.

    False is returned in every no-prefix case: empty space id, space absent
    from Redis, empty space record, or prefixing disabled in source_meta.
    Memoized via lru_cache (same staleness caveat as get_db_name).
    """
    if not space_id:
        return False
    raw = redis_client.get(space_id)
    if raw is None:
        return False
    space_config = json.loads(raw)
    if not space_config:
        return False
    source_meta = space_config.get("source_meta", {})
    if not source_meta.get("add_prefix_to_database"):
        return False
    prefix = source_meta.get("prefix") or space_id
    return f"{prefix}{delimiter}"
def get_space_data_from_redis(redis_client, space_id: str):
    """
    Fetch and deserialize the space record for *space_id* from Redis.

    :raises ValueError: when the key is entirely absent from Redis.
    :return: the parsed record when truthy; implicitly None when the stored
             record deserializes to an empty/falsy value.
    """
    raw = redis_client.get(space_id)
    if raw is None:
        raise ValueError(f"Unknown Space, Space ID: {space_id} Not Found!!!")
    parsed = json.loads(raw)
    if parsed:
        return parsed
import base64
import copy
import hashlib
import json
import os
from datetime import datetime
from operator import itemgetter
from uuid import UUID
from Crypto import Random
from Crypto.Cipher import AES
from scripts.config import Security
from scripts.errors import CustomError
from scripts.errors.mongo_exceptions import (
MongoException,
MongoUnknownDatatypeException,
)
from scripts.logging import logger
from scripts.utils.jwt_util import JWT
# Datatypes that are never AES-encrypted (they are not JSON-serializable);
# values of these types are moved into the document's "encryption_salt"
# by MongoDataEncryption.encrypt_dict_data.
exclude_encryption_datatypes = (
    datetime,
    UUID,
)
class MongoEncryptionConstants:
    """Constant keys and limits for the Mongo field-level encryption setup."""

    # mongo encryption keys
    key_encrypt_keys = "encrypt_keys"  # config key: list of fields to encrypt ("*" = all)
    key_exclude_encryption = "exclude_encryption"  # config key: fields to leave in plaintext
    product_encrypted = "product_encrypted"  # flag stamped on documents this utility encrypted
    max_docs_per_batch = 5
    # SECURITY NOTE(review): the ``cipher_key`` below is a hard-coded JWT that
    # embeds an RSA private key in source control; it should be rotated and
    # supplied via configuration or a secret store instead.
cipher_key = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJrZXkiOiItLS0tLUJFR0lOIFJTQSBQUklWQVRFIEtFWS0tLS0tXG5NSUlFb3dJQkFBS0NBUUVBclZFRDVjcit0TXRGdFZtWGwyTzBjdlFiRWdvWVNJRmQ4eXZrbW02ejdYQWRYNkVnXG5Za0tlejB5ZFRsMjZLT2RKMThBN0tuOGV0V0dlOG5Ua1NHaGVKbDlybi9KK2xFMXpwbzRaZy9UM3dEbk04Rk0zXG55dU0yNnZwSWIrMG9KbU5jOURrRlhvNFd0eFJGWkR5dGRFVGcvWXlJK2VKWURSRHJaU3JscUF6SURwQWRMcHY5XG5VaHNNaFlRKzJuM1BjYXVMZUpiMGRLUFZUYzZrU3ZHQ3MzTFowV3lUYlJuUXlKTUNXbmF4enBTSVVjSDdxYXFPXG5LQy9mQkNLc1ptUmpSTlNtUTNnZXB6NFZuUUt5SkNtN0NKaytjUWlRTVF6cnNwUlB2aG1Hb3VIWlVNMzZLanNHXG42eWx4MkJ1Nk9ZeS9IYnJkUmtKS05sdjN1NkJCTDZQbi9aSlpHUUlEQVFBQkFvSUJBQkk4ZU1oRVNuWWJtMVJJXG5XOFM4WXplSU8xUHoxM2hEa3U3Y0FyY0VLRzcya2NTbTU4a25BTjVIamJLNTluVkkxdEo2Z2M4NEpuTkgxUWxtXG5ac0crcDQ5cWtXQzRTM3pQeEhnMU1mYWFQenBNNnFVcjRHNDY1Nk9rVjV4ZFRCRHorZ3NoZDlEcDZ2WnpEZFVjXG45RlJNVGc4bnF4Nzk0NjFtUnhwelA4eGxvYVEwTmNLQnpGSzllM2cvNGk3Mkx3Z05QM0U2eG1FU2l1N2dvcUoxXG5HT0FJMm1KaWUzVFRZMXo4c2Y0dWlTRkxNYUZyRXhrcTR6NEtrd1M3cUYybk9KeGh2OEgvZzlUR1BOV3JuekF3XG55Qkh3SU5Cb1VhSndpT1Q1MXh4SURMZ05RaU5vSUZ1YU1LVnUybCtyV3RvUVdLR2lPbncxWmhZeGVKQ1hCeVhDXG5RcXBBZmdFQ2dZRUF3cHpTZnlvdDNQQWx4bTlpVks1WmM2bFJkQnE3SmF6dDd0OTFVNnplWTdDNHh6TkcxVHVmXG5jU1lLM3FSd2xNdzJ1WGw5YXV4eVY0MXJ6aVg5c1podEZVbm00amNHdjlNSGVhQWFTU1BTc3ZydFpERkJTN2t5XG5sMkl4azEwNzhMVFpDTE1ZbUFLQ0FyMlhMbVNoQlBTVmN1YUxrRFJYNHJ2dzdzY1dtTWI4NndFQ2dZRUE0L3lDXG5FQWpYbEwwV2xPWURKM0ovL1BnNGlCdEllZEhYbW4zMGdvTnVDQkJhb1l5Z1hhcGV5dEVtVTJxNWh5YlFUTVRYXG5WbC92SUFGaXUwVFg4MVZRN0xETEphYmVyLzdHRXNJVDN4K3htMGpGdk94RllWaFQ1YjBzMHoxQ1FvbG5SRnNBXG5kSXdRNXU1R2tQNjVoeUpVYTNaTWgrTDZWaXNTQ1RLcEFjbzlaaGtDZ1lBS0ZaNUN3S2pIdmhuM0FtYVNCTWJWXG4yM3hCQy9HT3JqdFdHWFkyODhwQ1dESDdBSWszRzNQVHBTa0RDSHBjKzRnS2JHVTNXVEZEb0N4cDdrWUxJZDdsXG5MNE1yVGJhbjBnT2RKZEsyMzRoWGhmRXZNKzR5UWxLQXpiSEw5UlRhRUVUKzBtai8xNEZ0S3UzZWxaQlNkV29aXG5IaUUxUThFYUdxc05kSHVUUnh4c0FRS0JnUUNxdzdlbnl2ZXVzUEw1RkUvSWZEcmhnQXJYNTVlaHAwdVdyRUU0XG5nTGtwMFJZUmF3T3pKS2xid015MExueElmd29HZG1uVWlJYlRzallCanM4eHMvV3BVOExWc09lYmEzbHhFMjFPXG44cTVWWVd5NjFUNGlhOVpyamdiRk1sMHUrVHdnTndsZ1Fvb
G1iNUxyaDkvdkdBZWpkamhjaitaeUpGQ2VFeFFFXG5BemQ2QVFLQmdCaGUrRndNaFR1czk2MWpxRUtYQlhtMC9PYU9nek9kZ2wvYXN1QzhvTFU3Y0FWRDdzUzJMRmNVXG51N29mSVZJRzZjUldScnVhakl1Q2RsSWNMT2VkVEU0WUw1akF1UkwxVHlWdnhNbTBGc3JrV1BBQkZySFdoc1pzXG5UU3pwaU9GSmtMSlRWblQ3aGxXLyttMHFyS2lXMHpyRnphMEphRndQL2xqK2hScllHa09sXG4tLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLSIsImlzcyI6ImlsZW5zIiwiZXhwIjoxNzg4NTg2MzYyfQ.K6PrPcum1ACp9jQtqL3oNncnmXtTnPEOLYWCaaHmFMpLnAPAnKlYblsQkx4nv4pskJ3DBzSk6H-7Tnns4oejfaZI56wHhGz99JZN9mQ9JrQazZ01uccAwhcaOOMnMEny5J4Q6FB0OyyNIxSsScx2s21Vx-eJvOV1FOrCBjvZG78"
# AES block-cipher mode used by AESCipher below.
enc = AES.MODE_CBC
# Shared JWT helper used to unwrap the AES key from the cipher_key token.
jwt = JWT()
try:
    file_name = Security.ENCRYPTION_CONSTANTS_FILE_PATH
    if not os.path.exists(file_name):
        # No constants file: field-level encryption is effectively disabled.
        encrypt_collection_dict = {}
    else:
        with open(file_name) as f:
            mongo_encryption_constants_data = json.load(f)
        # Encryption config is honored only when the feature flag is on.
        if "encrypt_collection_dict" in mongo_encryption_constants_data and Security.USER_ENCRYPTION:
            encrypt_collection_dict = mongo_encryption_constants_data["encrypt_collection_dict"]
        else:
            encrypt_collection_dict = {}
except Exception as es:
    # Best-effort: fall back to "no encryption" rather than failing import.
    encrypt_collection_dict = {}
    logger.exception(f" Unable to fetch mongo encryption constants:{str(es)}")
class AESCipher:
    """AES-CBC cipher with a SHA-256-derived key and PKCS7-style padding."""

    def __init__(self, key):
        # Derive a fixed-length 256-bit key from the arbitrary key string.
        self.bs = AES.block_size
        self.key = hashlib.sha256(key.encode()).digest()

    def get_cipher(self, iv):
        """Build a fresh CBC cipher object for the given IV."""
        return AES.new(self.key, enc, iv)

    def encrypt(self, raw):
        """Pad and encrypt *raw*, returning base64(iv + ciphertext) bytes."""
        padded = self._pad(raw)
        iv = Random.new().read(AES.block_size)
        ciphertext = self.get_cipher(iv).encrypt(padded.encode())
        return base64.b64encode(iv + ciphertext)

    def decrypt(self, enc):
        """Reverse encrypt(): base64-decode, split off the IV, decrypt, unpad."""
        blob = base64.b64decode(enc)
        iv, body = blob[: AES.block_size], blob[AES.block_size :]
        plaintext = self.get_cipher(iv).decrypt(body)
        return self._unpad(plaintext).decode("utf-8")

    def _pad(self, s):
        # Pad with N copies of chr(N) so the length is a block multiple.
        pad_len = self.bs - len(s) % self.bs
        return s + pad_len * chr(pad_len)

    @staticmethod
    def _unpad(s):
        # Last byte encodes the pad length.
        return s[: -ord(s[len(s) - 1 :])]
class MongoDataEncryption:
    """
    Encrypts/decrypts Mongo documents per the collection configuration in
    ``encrypt_collection_dict``. Values of the datatypes listed in
    ``exclude_encryption_datatypes`` are recorded in an "encryption_salt"
    instead of being encrypted.
    """

    def __init__(self):
        # The AES key is shipped inside a signed JWT constant; unwrap it once.
        self.decoded_aes = jwt.decode(MongoEncryptionConstants.cipher_key).get("key")
        self.aes_cipher = AESCipher(key=self.decoded_aes)

    def create_encrypted_string(self, payload):
        """Serialize *payload* to JSON and AES-encrypt it."""
        return self.aes_cipher.encrypt(raw=json.dumps(payload))

    def create_decrypted_string(self, payload):
        """Decrypt *payload* and JSON-deserialize the result when possible."""
        decrypted_result = self.aes_cipher.decrypt(enc=payload)
        try:
            result = json.loads(decrypted_result)
        except json.JSONDecodeError:
            # Not JSON: return the raw decrypted text instead.
            if isinstance(decrypted_result, bytes):
                result = decrypted_result.decode("utf-8")
            else:
                result = decrypted_result
        return result

    def encrypt_data(self, json_data, collection_name):
        """
        Encrypt the data in mongo based on the collection and key to be encrypted.
        :param json_data: The data to be encrypted (a dict or a list of dicts)
        :param collection_name: The collection where the document is stored
        :return: Encrypted document based on product defined configuration.
        :raises MongoUnknownDatatypeException: for non-dict/list input on a
            configured collection.
        """
        try:
            if collection_name in encrypt_collection_dict.keys():
                if isinstance(json_data, list):
                    encrypted_data = []
                    # BUG FIX: the original iterated over the just-created
                    # empty ``encrypted_data`` list instead of ``json_data``,
                    # so list inputs were silently returned empty/unencrypted.
                    for data in json_data:
                        dict_data = self.encrypt_dict_data(doc=data, collection_name=collection_name)
                        encrypted_data.append(dict_data)
                elif isinstance(json_data, dict):
                    encrypted_data = self.encrypt_dict_data(doc=json_data, collection_name=collection_name)
                else:
                    raise MongoUnknownDatatypeException(
                        "Unsupported datatype '{}' is being inserted to mongodb.".format(type(json_data))
                    )
            else:
                logger.info("Given data is not a part of the Mongo encryption setup. Skipping encryption")
                if isinstance(json_data, dict):
                    # NOTE: this aliases (and mutates) the caller's dict,
                    # stamping product_encrypted=False on it.
                    encrypted_data = json_data
                    encrypted_data[MongoEncryptionConstants.product_encrypted] = False
                else:
                    encrypted_data = json_data
            return encrypted_data
        except MongoException as e:
            raise MongoException(str(e))
        except Exception as e:
            raise MongoException("Server faced a problem when encrypting the data --> {}".format(str(e)))

    def encrypt_dict_data(self, doc, collection_name):  # NOSONAR
        """
        This method crawls the document and encrypts the keys that are marked for encryption.
        Skips encrypting the keys with values of the datatypes defines in the tuple 'exclude_encryption_datatypes'
        Adds two new keys to the document 'product_encrypted' and 'encryption_salt'
        Key product_encrypted - Is a boolean value which flags a document as encrypted by this utility.
        Key encryption_salt - List of all the values that were excluded from encryption due to datatype constraints.
        :param doc: The document considered for encryption
        :param collection_name: The collection where the document resided.
        This is needed for the utility to read the encryption configuration
        :return: The input document with the relevant keys encrypted.
        """
        try:
            is_mlens_encrypted = False
            encrypted_data = {}
            encrypted_data["encryption_salt"] = {}
            if "*" in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_encrypt_keys]:
                # Forming encryption salt
                for index, exclude_encryption_datatype in enumerate(exclude_encryption_datatypes):
                    if exclude_encryption_datatype not in [None, ""]:
                        encrypted_data["encryption_salt"][f"dt_{index}"] = self.search_datatype(
                            doc, exclude_encryption_datatype
                        )
                        sorted_path = sorted(
                            encrypted_data["encryption_salt"][f"dt_{index}"], key=itemgetter("p"), reverse=True
                        )
                        for _path in sorted_path:
                            # NOTE(review): "dict_data" is not defined in this
                            # scope, so this exec() raises NameError whenever an
                            # excluded-datatype value is present (converted to
                            # MongoException below). It likely should target
                            # "doc" — TODO confirm before changing.
                            # SECURITY NOTE: exec() runs commands built from
                            # document key paths; ensure keys are trusted.
                            to_pop = self.remove_value_of_datatype_command(_path, "dict_data")
                            exec(to_pop)
                for dt in encrypted_data["encryption_salt"]:
                    for path_index, _path in enumerate(encrypted_data["encryption_salt"][dt]):
                        encrypted_data["encryption_salt"][dt][path_index]["p"] = base64.b64encode(_path["p"].encode())
                # Encrypting the data
                for key in doc.keys():
                    if (
                        key
                        not in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_exclude_encryption]
                    ):
                        encrypted_data[key] = {
                            "d": self.create_encrypted_string(payload=self.convert(doc[key])),
                            "t": base64.b64encode(type(doc[key]).__name__.encode()),
                        }
                        is_mlens_encrypted = True
                    else:
                        encrypted_data[key] = doc[key]
            else:
                for key in doc.keys():
                    if key in encrypt_collection_dict[collection_name][MongoEncryptionConstants.key_encrypt_keys]:
                        # Forming encryption salt
                        for index, exclude_encryption_datatype in enumerate(exclude_encryption_datatypes):
                            if exclude_encryption_datatype not in [None, ""]:
                                temp_dict_data = {}
                                temp_dict_data[key] = copy.deepcopy(doc[key])
                                encrypted_data["encryption_salt"][f"dt_{index}"] = self.search_datatype(
                                    temp_dict_data, exclude_encryption_datatype
                                )
                                sorted_path = sorted(
                                    encrypted_data["encryption_salt"][f"dt_{index}"],
                                    key=itemgetter("p"),
                                    reverse=True,
                                )
                                for _path in sorted_path:
                                    # NOTE(review): same undefined "dict_data"
                                    # issue as above; "temp_dict_data" looks
                                    # like the intended target — TODO confirm.
                                    to_pop = self.remove_value_of_datatype_command(_path, "dict_data")
                                    exec(to_pop)
                        for dt in encrypted_data["encryption_salt"]:
                            for path_index, _path in enumerate(encrypted_data["encryption_salt"][dt]):
                                encrypted_data["encryption_salt"][dt][path_index]["p"] = base64.b64encode(
                                    _path["p"].encode()
                                )
                        # Encrypting the data
                        encrypted_data[key] = {
                            "d": self.create_encrypted_string(payload=self.convert(doc[key])),
                            "t": base64.b64encode(type(doc[key]).__name__.encode()),
                        }
                        is_mlens_encrypted = True
                    else:
                        encrypted_data[key] = doc[key]
            encrypted_data[MongoEncryptionConstants.product_encrypted] = is_mlens_encrypted
            if not encrypted_data[MongoEncryptionConstants.product_encrypted]:
                # Nothing was encrypted, so the salt carries no information.
                del encrypted_data["encryption_salt"]
            return encrypted_data
        except MongoException as e:
            raise MongoException(str(e))
        except Exception as e:
            raise MongoException("Server faced a problem when encrypting the data --> {}".format(str(e)))

    def decrypt_data(self, dict_data, _collection_name):  # NOSONAR
        """
        This method decrypts all the data that is encrypted.
        Keys that were excluded during encryption and have been added to the encryption_salt
        will be added back to their original positions.
        :param dict_data: The document that needs to be decrypted
        :param _collection_name: The collection to which the document belongs to
        :return: The decrypted data with the original data types intact
        """
        try:
            if _collection_name in encrypt_collection_dict.keys():
                decrypted_data = {}
                if "*" in encrypt_collection_dict[_collection_name][MongoEncryptionConstants.key_encrypt_keys]:
                    for key in dict_data.keys():
                        if key not in encrypt_collection_dict[_collection_name][
                            MongoEncryptionConstants.key_exclude_encryption
                        ] and not isinstance(dict_data[key], exclude_encryption_datatypes):
                            # Encrypted values are {"d": ciphertext, "t": b64(type name)}.
                            if isinstance(dict_data[key], dict):
                                if "d" in dict_data[key].keys() and "t" in dict_data[key].keys():
                                    decrypted_data[key] = self.decrypt_convert_proper_data_type(
                                        data=self.create_decrypted_string(payload=dict_data[key]["d"]),
                                        data_type=base64.b64decode(dict_data[key]["t"].decode()).decode(),
                                    )
                                else:
                                    decrypted_data[key] = dict_data[key]
                            else:
                                decrypted_data[key] = dict_data[key]
                        else:
                            decrypted_data[key] = dict_data[key]
                else:
                    for key in dict_data.keys():
                        if key in encrypt_collection_dict[_collection_name][
                            MongoEncryptionConstants.key_encrypt_keys
                        ] and not isinstance(dict_data[key], exclude_encryption_datatypes):
                            if isinstance(dict_data[key], dict):
                                if "d" in dict_data[key].keys() and "t" in dict_data[key].keys():
                                    decrypted_data[key] = self.decrypt_convert_proper_data_type(
                                        data=self.create_decrypted_string(payload=dict_data[key]["d"]),
                                        data_type=base64.b64decode(dict_data[key]["t"].decode()).decode(),
                                    )
                                else:
                                    decrypted_data[key] = dict_data[key]
                            else:
                                decrypted_data[key] = dict_data[key]
                        else:
                            decrypted_data[key] = dict_data[key]
            else:
                # Collection not configured for encryption: pass through.
                decrypted_data = dict_data
            return decrypted_data
        except MongoException as e:
            raise MongoException(str(e))
        except Exception as e:
            raise MongoException(f"Server faced a problem when decrypting the data: {str(e)}")

    @staticmethod
    def decrypt_convert_proper_data_type(data, data_type):
        """
        Convert the de-serialized JSON object to the original data-type
        :param data: The de-serialized data
        :param data_type: The original data type to which the de-serialized data should be converted to
        :return: The de-serialized data with it's original data type.
        """
        if data_type == "int":
            return int(data)
        elif data_type == "list":
            return data
        elif data_type == "dict":
            return data
        elif data_type == "bool":
            return data
        else:
            # Strip stray JSON quoting left on plain strings.
            return data.lstrip('"').rstrip('"')

    def convert(self, data):
        """
        Convert all byte-like objects into the proper data types.
        This supports conversion of nested dict, list and tuples.
        :param data:
        :return:
        """
        if isinstance(data, bytes):
            return data.decode("ascii")
        if isinstance(data, dict):
            return dict(map(self.convert, data.items()))
        if isinstance(data, tuple):
            # BUG FIX: the original returned a lazy ``map`` object here,
            # which json.dumps cannot serialize for a bare tuple value.
            # A real tuple keeps the dict() consumption above working and
            # is JSON-serializable.
            return tuple(map(self.convert, data))
        if isinstance(data, list):
            return list(map(self.convert, data))
        return data

    def search_datatype(self, _input, search_type, prev_datapoint_path=""):  # NOSONAR
        """
        Search for an excluded data type in a nested dictionary or list and record it's path in the document.
        This does not support the exclusion of data of types dict and list.
        :param _input: The input data
        :param search_type: The data type to be searched for to exclude.
        :param prev_datapoint_path: The path of a value in a nested dict or nested list.
        :return: List of dictionaries, with each dictionary containing the true value and it's path.
        """
        try:
            output = []
            current_datapoint = _input
            current_datapoint_path = prev_datapoint_path
            if search_type is dict:
                raise CustomError("Searching for datatype dict is not supported!")
            elif search_type is list:
                raise CustomError("Searching for datatype list is not supported!")
            else:
                if isinstance(current_datapoint, dict):
                    for dkey in current_datapoint:
                        temp_datapoint_path = current_datapoint_path
                        temp_datapoint_path += "dict-{}.".format(dkey)
                        for index in self.search_datatype(current_datapoint[dkey], search_type, temp_datapoint_path):
                            output.append(index)
                elif isinstance(current_datapoint, list):
                    for index in range(0, len(current_datapoint)):
                        temp_datapoint_path = current_datapoint_path
                        temp_datapoint_path += "list-{}.".format(index)
                        for index_1 in self.search_datatype(current_datapoint[index], search_type, temp_datapoint_path):
                            output.append(index_1)
                elif isinstance(current_datapoint, search_type):
                    output.append({"p": current_datapoint_path, "v": current_datapoint})
            output = filter(None, output)
            return list(output)
        except Exception:
            raise CustomError(f"Server faced a problem when searching for instances of datatype --> '{search_type}' ")

    @staticmethod
    def remove_value_of_datatype_command(remove_value, var_name):
        """
        This method produces the command for the value to be removed from a nested dict or list,
        when given the path of that value in the source variable.
        :param remove_value: The value (it's path) to be removed.
        :param var_name: The variable on which the exec function should run on to remove the non-serializable value.
        :return: The final command that will run in the exec function to remove the value from a nested dict or list.
        """
        temp_path = ""
        individual_path_list = remove_value["p"].split(".")
        individual_path_list.remove("")
        if individual_path_list[len(individual_path_list) - 1].split("-")[0] == "dict":
            orig_path = "del {var_name}{path}"
        elif individual_path_list[len(individual_path_list) - 1].split("-")[0] == "list":
            pop_index = ".pop({})".format(individual_path_list[len(individual_path_list) - 1].split("-")[1])
            orig_path = "{var_name}{path}" + pop_index
            individual_path_list.pop(len(individual_path_list) - 1)
        else:
            # Unknown path segment type: nothing to remove.
            return
        for _path_index, path in enumerate(individual_path_list):
            if path.split("-")[0] == "dict":
                temp_path += '["{}"]'.format(path.split("-")[1])
            elif path.split("-")[0] == "list":
                temp_path += "[{}]".format(path.split("-")[1])
        orig_path = orig_path.format(path=temp_path, var_name=var_name)
        return orig_path

    @staticmethod
    def add_value_datatype_command(add_value, var_name, value):
        """
        This method produces the command for the value to be added back to a nested dict or list,
        when given the path of that value in the source variable.
        :param add_value: The value (it's path) to be added
        :param var_name: The source variable name on which the exec function should run on.
        :param value: The original non-serialized value.
        :return: The command to be executed on the source variable.
        """
        path_string = ""
        temp_path_string = ""
        individual_path_list = add_value["p"].split(".")
        individual_path_list.remove("")
        for _path_index, path in enumerate(individual_path_list):
            if path.split("-")[0] == "dict":
                temp_path_string = '["{}"]'.format(path.split("-")[1])
            elif path.split("-")[0] == "list":
                temp_path_string = "[{}]".format(path.split("-")[1])
            else:
                raise CustomError("Unsupported datatype given for add value")
            path_string += temp_path_string
        if individual_path_list[len(individual_path_list) - 1].split("-")[0] == "dict":
            command = "{var_name}{path} = {value}".format(var_name=var_name, path=path_string, value=value)
        elif individual_path_list[len(individual_path_list) - 1].split("-")[0] == "list":
            # NOTE(review): str.rstrip strips a *character set*, not a suffix,
            # and the template below re-appends a literal "]" — this pairing
            # looks fragile for multi-level paths. Left as-is pending
            # confirmation of the intended path format.
            command = "{var_name}{path}].append({value})".format(
                var_name=var_name, path=path_string.rstrip(temp_path_string), value=value
            )
        else:
            raise CustomError("Unsupported datatype given for add value")
        return command
import jwt
from jwt.exceptions import (
ExpiredSignatureError,
InvalidSignatureError,
MissingRequiredClaimError,
)
from scripts.config import KeyPath
from scripts.errors import AuthenticationError, ErrorMessages
from scripts.logging import logger
class Secrets:
    """JWT/signature configuration constants.

    SECURITY NOTE(review): these tokens and keys are hard-coded in source
    control; they should be supplied via configuration or a secret store.
    """

    LOCK_OUT_TIME_MINS = 30
    # Clock-skew leeway passed to jwt.decode during validation.
    leeway_in_mins = 10
    unique_key = "45c37939-0f75"
    token = "8674cd1d-2578-4a62-8ab7-d3ee5f9a"
    issuer = "ilens"
    alg = "RS256"
    SECRET_FOR_SUPPORT_LENS = "WeSupport24X7UnifyTwinX#"
    ISS = "unifytwin"
    AUD = "supportlens"
    signature_key = "kliLensKLiLensKL"
    signature_key_alg = ["HS256"]
class JWT:
    """RS256 JWT helper reading PEM key material from KeyPath files."""

    def __init__(self):
        self.max_login_age = Secrets.LOCK_OUT_TIME_MINS
        self.issuer = Secrets.issuer
        self.alg = Secrets.alg
        self.public = KeyPath.PUBLIC  # path to the PEM public key file
        self.private = KeyPath.PRIVATE  # path to the PEM private key file

    def encode(self, payload):
        """
        Sign *payload* with the private key and return the token.

        BUG FIX: removed the ``finally: f.close()`` after the ``with``
        block — the context manager already closes the file, and when
        open() itself failed, ``f`` was unbound and the finally clause
        raised NameError, masking the real exception.
        """
        try:
            with open(self.private) as f:
                key = f.read()
                return jwt.encode(payload, key, algorithm=self.alg)
        except Exception as e:
            logger.exception(f"Exception while encoding JWT: {str(e)}")
            raise

    def decode(self, token):
        """Decode and verify *token* with the public key."""
        try:
            with open(self.public) as f:
                key = f.read()
                return jwt.decode(token, key, algorithms=self.alg)
        except Exception as e:
            # BUG FIX: the original log message said "encoding" here.
            logger.exception(f"Exception while decoding JWT: {str(e)}")
            raise

    def validate(self, token):
        """
        Decode *token* while enforcing the required claims (exp, iss).

        :raises AuthenticationError: on an invalid signature, an expired
            signature, or a missing required claim.
        """
        try:
            with open(self.public) as f:
                key = f.read()
                payload = jwt.decode(
                    token,
                    key,
                    algorithms=self.alg,
                    leeway=Secrets.leeway_in_mins,
                    options={"require": ["exp", "iss"]},
                )
            return payload
        except InvalidSignatureError:
            raise AuthenticationError(ErrorMessages.ERROR003)
        except ExpiredSignatureError:
            raise AuthenticationError(ErrorMessages.ERROR002)
        except MissingRequiredClaimError:
            # NOTE(review): maps missing claims to ERROR002 ("Signature
            # Expired") exactly as the original did; a claim-specific
            # message may be intended — confirm before changing.
            raise AuthenticationError(ErrorMessages.ERROR002)
        except Exception as e:
            logger.exception(f"Exception while validating JWT: {str(e)}")
            raise
"""
Mongo Utility
Author: Irfanuddin Shafi Ahmed
Reference: Pymongo Documentation
"""
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
from scripts.db.redis_connection import space_db
from scripts.logging import logger
from scripts.utils.db_name_util import get_db_name
from scripts.utils.decryption_util import MongoDataEncryption
# META_SOFT_DEL toggles copy-before-delete archiving in MongoCollectionBaseClass.
# BUG FIX: os.getenv returns a *string*, so the original
# ``os.getenv("META_SOFT_DEL", True)`` was truthy for any set value (even
# "false"), making the flag impossible to disable via the environment.
# Parse the common false spellings explicitly; unset still defaults to True.
META_SOFT_DEL: bool = os.getenv("META_SOFT_DEL", "true").strip().lower() not in ("false", "0", "no")
# Aggregation stage names used by the soft-delete pipelines below.
add_fields = "$addFields"
match = "$match"
class MongoConnect:
    """Thin wrapper that owns a lazily-connecting MongoClient."""

    def __init__(self, uri):
        """
        :param uri: Mongo connection string.
        :raises Exception: re-raises whatever MongoClient construction raises.
        """
        try:
            self.uri = uri
            # connect=False defers the actual connection until first use.
            self.client = MongoClient(self.uri, connect=False)
        except Exception as e:
            logger.error(f"Exception in connection {(str(e))}")
            raise e

    def __call__(self, *args, **kwargs):
        # Calling the wrapper instance returns the underlying client.
        return self.client

    def __repr__(self):
        # NOTE(review): server_info() performs a live round-trip to the
        # server, so repr() may block or raise when Mongo is unreachable.
        return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
    def __init__(self, mongo_client, database, collection, soft_delete: bool = META_SOFT_DEL):
        """
        :param mongo_client: shared MongoClient (or compatible) instance
        :param database: logical database name (may be re-pointed via the
                         space_id property setter)
        :param collection: collection this instance operates on
        :param soft_delete: when truthy, deletes first archive the matching
                            documents into a ``deleted__<database>`` database
        """
        self.client = mongo_client
        self.database = database
        self.collection = collection
        self.soft_delete = soft_delete
        self.data_encryption = MongoDataEncryption()
        # Variable to preserve initiated database
        # (if database name changes during runtime)
        self.__database = None
def __repr__(self):
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
@property
def space_id(self):
return self.space_id
@space_id.setter
def space_id(self, space_id):
if self.__database is None:
# storing original db name if None
self.__database = self.database
self.database = get_db_name(
redis_client=space_db,
space_id=space_id,
database=self.__database,
)
def get_decrypted_records(self, records):
return [
self.data_encryption.decrypt_data(each_record, _collection_name="user")
for each_record in records
]
def find_decrypted(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
if mongo_response := collection.find_one(query, filter_dict):
mongo_response = [mongo_response]
mongo_response = self.get_decrypted_records(records=mongo_response)
return mongo_response[0]
else:
return mongo_response
except Exception as e:
logger.error(str(e))
raise
def insert_one(self, data: Dict):
"""
The function is used to inserting a document to a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception as e:
raise ValueError(str(e))
def insert_many(self, data: List):
"""
The function is used to inserting documents to a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception as e:
raise ValueError(str(e))
def find(
self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
skip: Optional[int] = 0,
collation: Optional[bool] = False,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = []
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
if collation:
cursor = cursor.collation({"locale": "en"})
return cursor
except Exception as e:
raise ValueError(str(e))
def find_one(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
return collection.find_one(query, filter_dict)
except Exception as e:
raise ValueError(str(e))
def update_one(
self,
query: Dict,
data: Dict,
upsert: bool = False,
):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response
except Exception as e:
raise ValueError(str(e))
def update_many(self, query: Dict, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_many(query, {"$set": data}, upsert=upsert)
return response
except Exception as e:
raise ValueError(str(e))
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
if self.soft_delete:
soft_del_query = [
{match: query},
{add_fields: {"deleted": {"on": datetime.now(timezone.utc).replace(tzinfo=timezone.utc)}}},
{
"$merge": {
"into": {
"db": f"deleted__{database_name}",
"coll": collection_name,
},
}
},
]
collection.aggregate(soft_del_query)
response = collection.delete_many(query)
return response.deleted_count
except Exception as e:
raise ValueError(str(e))
def delete_one(self, query: Dict):
    """Delete the first document matching ``query``.

    When ``self.soft_delete`` is truthy, matched documents are first
    copied into the mirror database ``deleted__<database>`` (same
    collection name) via a ``$merge`` aggregation, stamped with a
    ``deleted.on`` UTC timestamp, before the single delete is issued.

    :param query: Mongo filter selecting the document.
    :return: number of documents deleted (0 or 1).
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        database_name = self.database
        collection_name = self.collection
        collection = self.client[database_name][collection_name]
        if self.soft_delete:
            # NOTE(review): `match` / `add_fields` are module-level
            # stage-name constants (presumably "$match" / "$addFields")
            # defined elsewhere in this file — confirm.
            soft_del_query = [
                {match: query},
                # datetime.now(timezone.utc) is already tz-aware; the
                # former .replace(tzinfo=timezone.utc) was a no-op.
                {add_fields: {"deleted": {"on": datetime.now(timezone.utc)}}},
                {
                    "$merge": {
                        "into": {
                            "db": f"deleted__{database_name}",
                            "coll": collection_name,
                        },
                    }
                },
            ]
            collection.aggregate(soft_del_query)
        response = collection.delete_one(query)
        return response.deleted_count
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise ValueError(str(e)) from e
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
    """Return the distinct values of ``query_key`` among matching documents.

    :param query_key: field name to collect distinct values for.
    :param filter_json: optional Mongo filter restricting the documents.
    :return: list of distinct values.
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        target = self.client[self.database][self.collection]
        return target.distinct(query_key, filter_json)
    except Exception as e:
        raise ValueError(str(e))
def aggregate(self, pipelines: List):
    """Run an aggregation pipeline on the bound collection.

    :param pipelines: list of aggregation stage documents.
    :return: the pymongo ``CommandCursor`` over the results.
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        target = self.client[self.database][self.collection]
        return target.aggregate(pipelines)
    except Exception as e:
        raise ValueError(str(e))
def find_count(self, json_data, database_name, collection_name):
    """Count the documents matching ``json_data``.

    :param json_data: Mongo filter document.
    :param database_name: The database to which the collection/ documents belongs to.
    :param collection_name: The collection to which the documents belongs to.
    :return: number of matching documents.
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        db = self.client[database_name]
        # Cursor.count() was deprecated in PyMongo 3.x and removed in
        # 4.x, so find(...).count() crashes on modern drivers;
        # count_documents() is the supported server-side count.
        return db[collection_name].count_documents(json_data)
    except Exception as e:
        raise ValueError(str(e)) from e
def find_record_count(self, json_data: dict):
    """Count documents in the bound collection matching ``json_data``.

    :param json_data: Mongo filter document.
    :return: number of matching documents.
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        target = self.client[self.database][self.collection]
        return target.count_documents(json_data)
    except Exception as e:
        raise ValueError(str(e))
def bulk_write(self, operation):
    """Execute a prepared list of bulk-write operations.

    :param operation: list of pymongo operation objects
        (InsertOne / UpdateOne / DeleteOne, ...).
    :return: the literal string ``"success"`` (the driver's
        BulkWriteResult is intentionally discarded).
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        target = self.client[self.database][self.collection]
        target.bulk_write(operation)
        return "success"
    except Exception as e:
        raise ValueError(str(e))
def create_mongo_index(self, index_list: list):
    """Create an index on the bound collection.

    :param index_list: key/direction pairs, e.g. ``[("key1", -1), ("key2", 1)]``.
    :return: True on success.
    :raises ValueError: wrapping any underlying pymongo error.
    """
    try:
        target = self.client[self.database][self.collection]
        target.create_index(index_list)
        return True
    except Exception as e:
        raise ValueError(str(e))
class MongoAggregateBaseClass:
    """Helper for running aggregations on arbitrary collections of one database.

    Unlike the per-collection DAO above, the collection name is supplied
    per call rather than fixed at construction time.
    """

    def __init__(self, mongo_client, database):
        # Raw pymongo client and the database name all calls target.
        self.client = mongo_client
        self.database = database

    def aggregate(self, collection, pipelines: List):
        """Run ``pipelines`` on ``collection`` and return the result cursor.

        :param collection: name of the collection to aggregate over.
        :param pipelines: list of aggregation stage documents.
        :raises Exception: re-raises the underlying pymongo error after logging.
        """
        try:
            target = self.client[self.database][collection]
            return target.aggregate(pipelines)
        except Exception as e:
            logger.error(f"Failed to get the aggregate data {str(e)}")
            raise e
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment