Commit a625753f authored by Sabari T's avatar Sabari T

tag_hierarchy update script

parent f085a545
# Default ignored files
/shelf/
/workspace.xml
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="57">
<item index="0" class="java.lang.String" itemvalue="scipy" />
<item index="1" class="java.lang.String" itemvalue="pydantic" />
<item index="2" class="java.lang.String" itemvalue="Flask-Cors" />
<item index="3" class="java.lang.String" itemvalue="waitress" />
<item index="4" class="java.lang.String" itemvalue="xhtml2pdf" />
<item index="5" class="java.lang.String" itemvalue="reportlab" />
<item index="6" class="java.lang.String" itemvalue="numpy" />
<item index="7" class="java.lang.String" itemvalue="requests" />
<item index="8" class="java.lang.String" itemvalue="gunicorn" />
<item index="9" class="java.lang.String" itemvalue="Flask-Compress" />
<item index="10" class="java.lang.String" itemvalue="pandas" />
<item index="11" class="java.lang.String" itemvalue="word2number" />
<item index="12" class="java.lang.String" itemvalue="paho-mqtt" />
<item index="13" class="java.lang.String" itemvalue="pycryptodome" />
<item index="14" class="java.lang.String" itemvalue="PyPDF2" />
<item index="15" class="java.lang.String" itemvalue="pytz" />
<item index="16" class="java.lang.String" itemvalue="python-jose" />
<item index="17" class="java.lang.String" itemvalue="pyjwt" />
<item index="18" class="java.lang.String" itemvalue="bson" />
<item index="19" class="java.lang.String" itemvalue="ujson" />
<item index="20" class="java.lang.String" itemvalue="getmac" />
<item index="21" class="java.lang.String" itemvalue="boto3" />
<item index="22" class="java.lang.String" itemvalue="botocore" />
<item index="23" class="java.lang.String" itemvalue="captcha" />
<item index="24" class="java.lang.String" itemvalue="python-dotenv" />
<item index="25" class="java.lang.String" itemvalue="cryptography" />
<item index="26" class="java.lang.String" itemvalue="boltons" />
<item index="27" class="java.lang.String" itemvalue="iteration-utilities" />
<item index="28" class="java.lang.String" itemvalue="pymongo" />
<item index="29" class="java.lang.String" itemvalue="pyyaml" />
<item index="30" class="java.lang.String" itemvalue="werkzeug" />
<item index="31" class="java.lang.String" itemvalue="six" />
<item index="32" class="java.lang.String" itemvalue="flask-cors" />
<item index="33" class="java.lang.String" itemvalue="gevent" />
<item index="34" class="java.lang.String" itemvalue="flask" />
<item index="35" class="java.lang.String" itemvalue="redis" />
<item index="36" class="java.lang.String" itemvalue="bcrypt" />
<item index="37" class="java.lang.String" itemvalue="Werkzeug" />
<item index="38" class="java.lang.String" itemvalue="pyaml" />
<item index="39" class="java.lang.String" itemvalue="aniso8601" />
<item index="40" class="java.lang.String" itemvalue="pycryptodomex" />
<item index="41" class="java.lang.String" itemvalue="Jinja2" />
<item index="42" class="java.lang.String" itemvalue="iteration_utilities" />
<item index="43" class="java.lang.String" itemvalue="rsa" />
<item index="44" class="java.lang.String" itemvalue="greenlet" />
<item index="45" class="java.lang.String" itemvalue="simple-crypt" />
<item index="46" class="java.lang.String" itemvalue="PyYAML" />
<item index="47" class="java.lang.String" itemvalue="pillow" />
<item index="48" class="java.lang.String" itemvalue="xlsxwriter" />
<item index="49" class="java.lang.String" itemvalue="pyopenssl" />
<item index="50" class="java.lang.String" itemvalue="APScheduler" />
<item index="51" class="java.lang.String" itemvalue="Flask" />
<item index="52" class="java.lang.String" itemvalue="openpyxl" />
<item index="53" class="java.lang.String" itemvalue="crypto" />
<item index="54" class="java.lang.String" itemvalue="PyJWT" />
<item index="55" class="java.lang.String" itemvalue="configparser" />
<item index="56" class="java.lang.String" itemvalue="msal" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
<option value="N801" />
<option value="N812" />
<option value="N803" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyShadowingBuiltinsInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredNames">
<list>
<option value="type" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="scripts.constants.app_configuration.configparser" />
<option value="str.__setitem__" />
<option value="dict.__getitem__" />
<option value="dict.update" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/tag-hierarchy-update.iml" filepath="$PROJECT_DIR$/.idea/tag-hierarchy-update.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CommitMessageInspectionProfile">
<profile version="1.0">
<inspection_tool class="CommitFormat" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="CommitNamingConvention" enabled="true" level="WARNING" enabled_by_default="true" />
</profile>
</component>
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
2021-06-17 20:53:31 - DEBUG - Log/ilens - [MainThread:main.py:<module>():6] - main block started
2021-06-17 20:53:55 - INFO - Log/ilens - [MainThread:update_handler.py:tag_hierarchy_update():22] - Records fetched from following collection Successfully
2021-06-17 20:53:55 - INFO - Log/ilens - [MainThread:update_handler.py:tag_hierarchy_update():23] - database_name: ilens_configuration collection_name: site_conf
2021-06-17 20:53:56 - INFO - Log/ilens - [MainThread:update_handler.py:tag_hierarchy_update():27] - Dropped tag_hierarchy collection from ilens_configuration database
2021-06-17 20:54:46 - INFO - Log/ilens - [MainThread:update_handler.py:tag_hierarchy_update():30] - Updated the following collection Successfully
2021-06-17 20:54:46 - INFO - Log/ilens - [MainThread:update_handler.py:tag_hierarchy_update():31] - database_name: ilens_configuration collection_name: tag_hierarchy
###tag_hierarchy collection upgrade
\ No newline at end of file
log:
file_name: ilens
path: Log/
level: DEBUG
handler: rotating_file_handler
max_bytes: 10000000
back_up_count: 10
mongo_db:
URI: mongodb://192.168.0.220:2717/
\ No newline at end of file
from scripts.logging.logger import logger
from scripts.handler.update_handler import UpdateHandler
if __name__ == '__main__':
try:
logger.debug("main block started")
status = UpdateHandler().tag_hierarchy_update()
if status:
print("Updated successfully")
else:
print("Some records are not updated successfully, Please check the log file")
except Exception as e:
logger.exception("Exception in the main block" + str(e))
from pydantic import BaseSettings
import sys
import yaml
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.full_load(stream)
except yaml.YAMLError as exc:
print(exc)
config = read_configuration("conf/service.yml")
class _Configuration(BaseSettings):
MONGO_URI = config["mongo_db"]["URI"]
_conf = _Configuration()
class DB(object):
class MongoDb:
uri = _conf.MONGO_URI
print(uri)
if not uri:
print("Error, environment variable MONGO_URI not set")
sys.exit(1)
class LOG:
LOG_FILE_NAME = config["log"]["file_name"]
LOG_PATH = config["log"]["path"]
LOG_LEVEL = config["log"]["level"]
LOG_HANDLER = config["log"]["handler"]
LOG_MAX_BYTES = int(config["log"]["max_bytes"])
LOG_BACKUP_COUNT = int(config["log"]["back_up_count"])
class DatabaseNames:
ilens_configuration = "ilens_configuration"
local_host = "database"
class CollectionNames:
site_conf = "site_conf"
tag_hierarchy = "tag_hierarchy"
user = "user"
user_role = "user_role"
class TagHierarchyKeys:
KEY_ID = "id"
KEY_SITE_ID = "site_id"
KEY_SITE_NAME = "site_name"
KEY_DEPT_ID = "dept_id"
KEY_DEPT_NAME = "dept_name"
KEY_LINE_ID = "line_id"
KEY_LINE_NAME = "line_name"
KEY_EQUIPMENT_ID = "equipment_id"
KEY_EQUIPMENT_NAME = "equipment_name"
KEY_PARAMETER_ID = "parameter_id"
KEY_THING_MODEL_ID = "thing_model_id"
KEY_THING_MODEL_VERSION = "thing_model_version"
KEY_PROJECT_ID = "project_id"
class SiteConfCollectionKeys:
KEY_SITE_NAME = "site_name"
KEY_SITE_INFO = "site_info"
KEY_CUSTOMER_PROJECT_ID = "customer_project_id"
KEY_SITE_ID = "site_id"
KEY_PRODUCT_ENCRYPTED = "product_encrypted"
KEY_PROCESS_ID = "process_id"
from scripts.constants import CollectionNames
class MongoEncryptionConstants:
# mongo exception codes
MONGO001 = "Server was unable to eestablish connection with MongoDB"
MONGO002 = "Server faced a problem when inserting document(s) into MongoDB"
MONGO003 = "Server faced a problem to find the document(s) with the given condition"
MONGO004 = "Server faced a problem to delete the document(s) with the given condition"
MONGO005 = "Server faced a problem to update the document(s) with the given condition and given data"
MONGO006 = "Server faced a problem when aggregating the data"
MONGO007 = "Server faced a problem when closing the connection with MongoDB"
# mongo encryption keys
key_encrypt_keys = "encrypt_keys"
key_exclude_encryption = "exclude_encryption"
product_encrypted = "product_encrypted"
max_docs_per_batch = 5
# cipher_key = "a985195aaa464e61"
# Product based configurable constants
cipher_key = {
'k': '-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEArVED5cr+tMtFtVmXl2O0cvQbEgoYSIFd8yvkmm6z7'
'XAdX6Eg\nYkKez0ydTl26KOdJ18A7Kn8etWGe8nTkSGheJl9rn/J+lE1zpo4Zg/T3wDnM8FM3\nyuM26vpIb+0oJmNc9'
'DkFXo4WtxRFZDytdETg/YyI+eJYDRDrZSrlqAzIDpAdLpv9\nUhsMhYQ+2n3PcauLeJb0dKPVTc6kSvGCs3LZ0WyTbRnQ'
'yJMCWnaxzpSIUcH7qaqO\nKC/fBCKsZmRjRNSmQ3gepz4VnQKyJCm7CJk+cQiQMQzrspRPvhmGouHZUM36KjsG\n6ylx2'
'Bu6OYy/HbrdRkJKNlv3u6BBL6Pn/ZJZGQIDAQABAoIBABI8eMhESnYbm1RI\nW8S8YzeIO1Pz13hDku7cArcEKG72kcSm'
'58knAN5HjbK59nVI1tJ6gc84JnNH1Qlm\nZsG+p49qkWC4S3zPxHg1MfaaPzpM6qUr4G4656OkV5xdTBDz+gshd9Dp6vZ'
'zDdUc\n9FRMTg8nqx79461mRxpzP8xloaQ0NcKBzFK9e3g/4i72LwgNP3E6xmESiu7goqJ1\nGOAI2mJie3TTY1z8sf4u'
'iSFLMaFrExkq4z4KkwS7qF2nOJxhv8H/g9TGPNWrnzAw\nyBHwINBoUaJwiOT51xxIDLgNQiNoIFuaMKVu2l+rWtoQWKG'
'iOnw1ZhYxeJCXByXC\nQqpAfgECgYEAwpzSfyot3PAlxm9iVK5Zc6lRdBq7Jazt7t91U6zeY7C4xzNG1Tuf\ncSYK3qRwl'
'Mw2uXl9auxyV41rziX9sZhtFUnm4jcGv9MHeaAaSSPSsvrtZDFBS7ky\nl2Ixk1078LTZCLMYmAKCAr2XLmShBPSVcuaL'
'kDRX4rvw7scWmMb86wECgYEA4/yC\nEAjXlL0WlOYDJ3J//Pg4iBtIedHXmn30goNuCBBaoYygXapeytEmU2q5hybQTMTX'
'\nVl/vIAFiu0TX81VQ7LDLJaber/7GEsIT3x+xm0jFvOxFYVhT5b0s0z1CQolnRFsA\ndIwQ5u5GkP65hyJUa3ZMh+L6Vi'
'sSCTKpAco9ZhkCgYAKFZ5CwKjHvhn3AmaSBMbV\n23xBC/GOrjtWGXY288pCWDH7AIk3G3PTpSkDCHpc+4gKbGU3WTFDoC'
'xp7kYLId7l\nL4MrTban0gOdJdK234hXhfEvM+4yQlKAzbHL9RTaEET+0mj/14FtKu3elZBSdWoZ\nHiE1Q8EaGqsNdHuT'
'RxxsAQKBgQCqw7enyveusPL5FE/IfDrhgArX55ehp0uWrEE4\ngLkp0RYRawOzJKlbwMy0LnxIfwoGdmnUiIbTsjYBjs8'
'xs/WpU8LVsOeba3lxE21O\n8q5VYWy61T4ia9ZrjgbFMl0u+TwgNwlgQolmb5Lrh9/vGAejdjhcj+ZyJFCeExQE\nAzd6'
'AQKBgBhe+FwMhTus961jqEKXBXm0/OaOgzOdgl/asuC8oLU7cAVD7sS2LFcU\nu7ofIVIG6cRWRruajIuCdlIcLOedTE4'
'YL5jAuRL1TyVvxMm0FsrkWPABFrHWhsZs\nTSzpiOFJkLJTVnT7hlW/+m0qrKiW0zrFza0JaFwP/lj+hRrYGkOl\n'
'-----END RSA PRIVATE KEY-----'}
encrypt_collection_dict = \
{CollectionNames.user: {key_encrypt_keys: ["*"],
key_exclude_encryption: ["_id", "user_id", "client_id", "deleted_by",
"created_by", "username", "isdeleted", "email"]},
CollectionNames.user_role: {key_encrypt_keys: ["*"],
key_exclude_encryption: ["_id", "user_role_id", "user_role_name",
"client_id", "isdeleted", "default"]}}
\ No newline at end of file
from scripts.utils.mongo_util import MongoConnect
from scripts.config import DB
mongo_client = MongoConnect(uri=DB.MongoDb.uri)()
This diff is collapsed.
from scripts.constants import DatabaseNames, CollectionNames
from scripts.constants import SiteConfCollectionKeys
from scripts.utils.mongo_util import MongoCollectionBaseClass
class SiteConf(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration,
collection=CollectionNames.site_conf)
@property
def key_customer_project_id(self):
return SiteConfCollectionKeys.KEY_CUSTOMER_PROJECT_ID
@property
def key_site_id(self):
return SiteConfCollectionKeys.KEY_SITE_ID
@property
def key_site_name(self):
return SiteConfCollectionKeys.KEY_SITE_NAME
@property
def key_process_id(self):
return SiteConfCollectionKeys.KEY_PROCESS_ID
def get_all_sites(self, filter_dict=None,
sort=None, skip=0, limit=None, site_id=None, **query):
"""
The following function will give all sites for the given set of
search parameters as keyword arguments
:param site_id:
:param filter_dict:
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
if site_id is not None:
query.update({self.key_site_id: site_id})
sites = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if sites:
return list(sites)
return list()
def find_site_by_site_name(self, site_name, project_id):
query_json = {self.key_site_name: site_name, self.key_customer_project_id: project_id}
response = self.find_one(query=query_json)
if response:
return dict(response)
else:
return dict()
def find_site_by_site_id(self, site_id, filter_dict=None):
site = self.find_one(query={self.key_site_id: site_id}, filter_dict=filter_dict)
if site:
return dict(site)
return dict()
def delete_one_site(self, site_id):
if site_id:
query = {self.key_site_id: site_id}
return self.delete_one(query=query)
else:
return False
def delete_many_site(self, query=None):
return self.delete_many(query)
def update_one_site(self, site_id, data, upsert=False):
query = {self.key_site_id: site_id}
return self.update_one(data=data, query=query, upsert=upsert)
def update_one_process(self, process_id, updated_data):
"""
The following function will update one tag in
tags collection based on the given query
"""
query_dict = {self.key_process_id: process_id}
return self.update_one(data=updated_data, query=query_dict)
def update_many_site(self, site_id, data):
query = {self.key_site_id: site_id}
response = self.update_many(query=query, data=data)
if response:
return list(response)
else:
return list()
def insert_one_site(self, data):
response = self.insert_one(data=data)
return response
def insert_many_sites(self, data):
response = self.insert_many(data=data)
if response:
return list(response)
else:
return list()
def find_site_by_aggregate(self, query):
record = self.aggregate(query)
if record:
return record
else:
return list()
from scripts.constants import DatabaseNames, CollectionNames, TagHierarchyKeys
from scripts.utils.mongo_util import MongoCollectionBaseClass
class TagHierarchy(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration,
collection=CollectionNames.tag_hierarchy)
@property
def key_id(self):
return TagHierarchyKeys.KEY_ID
@property
def key_site_id(self):
return TagHierarchyKeys.KEY_SITE_ID
@property
def key_dept_id(self):
return TagHierarchyKeys.KEY_DEPT_ID
@property
def key_line_id(self):
return TagHierarchyKeys.KEY_LINE_ID
@property
def key_equipment_id(self):
return TagHierarchyKeys.KEY_EQUIPMENT_ID
@property
def key_parameter_id(self):
return TagHierarchyKeys.KEY_PARAMETER_ID
@property
def key_thing_model_id(self):
return TagHierarchyKeys.KEY_THING_MODEL_ID
@property
def key_thing_model_version(self):
return TagHierarchyKeys.KEY_THING_MODEL_VERSION
@property
def key_dept_name(self):
return TagHierarchyKeys.KEY_DEPT_NAME
@property
def key_site_name(self):
return TagHierarchyKeys.KEY_SITE_NAME
@property
def key_line_name(self):
return TagHierarchyKeys.KEY_LINE_NAME
@property
def key_equipment_name(self):
return TagHierarchyKeys.KEY_EQUIPMENT_NAME
@property
def key_project_id(self):
return TagHierarchyKeys.KEY_PROJECT_ID
def find_all_tag_hierarchy(self, filter_dict=None,
sort=None, skip=0, limit=None, query=None):
"""
The following function will give all tag_hierarchy for the given set of
search parameters as keyword arguments
:param filter_dict:
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
if query is None:
query = {}
all_tag_hierarchy = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if not all_tag_hierarchy:
return list()
return list(all_tag_hierarchy)
def find_one_tag_hierarchy(self, filter_dict=None, **query):
"""
The following function will give one tag_hierarchy for a given set of
search parameters as keyword arguments
:param filter_dict:
:param query:
:return:
"""
tag_hierarchy = self.find_one(filter_dict=filter_dict, query=query)
if tag_hierarchy:
return tag_hierarchy
else:
return {}
def insert_one_tag_hierarchy(self, data):
"""
The following function will insert one tag_hierarchy in the
tag_hierarchy collections
:param data:
:return:
"""
return self.insert_one(data)
def insert_many_tag_hierarchy(self, data):
"""
The following function will insert many tag_hierarchy in the
tag_hierarchy collection
:param data:
:return:
"""
return self.insert_many(data)
def update_one_tag_hierarchy(self, query, data, upsert=False):
"""
The following function will update one tag_hierarchy in
tag_hierarchy collection based on the given query
:param query:
:param data:
:param upsert:
:return:
"""
return self.update_one(data=data, upsert=upsert, query=query)
def update_many_tag_hierarchy(self, data, query, upsert=False):
"""
The following function will update many tag_hierarchy in
tag_hierarchy collection based on the given query
:param data:
:param upsert:
:param query:
:return:
"""
return self.update_many(data=data, upsert=upsert, query=query)
def delete_many_tag_hierarchy(self, query):
"""
The following function will delete many tag_hierarchy in
tag_hierarchy collection based on the given query
:param query:
:return:
"""
return self.delete_many(query=query)
def delete_one_tag_hierarchy(self, **query):
"""
The following function will delete one tag_hierarchy in
tag_hierarchy collection based on the given query
:param query:
:return:
"""
return self.delete_one(query=query)
def drop_tag_hierarchy_collection(self):
"""
The following function will delete whole tag_hierarchy collection
:return:
"""
return self.drop()
from scripts.logging.logger import logger
from scripts.db.mongo import mongo_client
from scripts.db.mongo.aggregates.site_conf import SiteConfAggregate
from scripts.db.mongo.collections.site_conf import SiteConf
from scripts.db.mongo.collections.tag_hierarchy import TagHierarchy
class UpdateHandler:
def __init__(self):
self.site_conf = SiteConf(mongo_client=mongo_client)
self.tag_hierarchy = TagHierarchy(mongo_client=mongo_client)
self.site_conf_aggregate = SiteConfAggregate
def tag_hierarchy_update(self):
status = True
try:
aggregate_query = self.site_conf_aggregate.aggregate_array
details = []
for each_query in aggregate_query:
temp_details = list(self.site_conf.find_site_by_aggregate(each_query))
details.extend(temp_details)
logger.info("Records fetched from following collection Successfully")
logger.info(f"database_name: {self.site_conf.database} collection_name: {self.site_conf.collection}")
print("Records Fetched Successfully")
try:
self.tag_hierarchy.drop_tag_hierarchy_collection()
logger.info(f"Dropped {self.tag_hierarchy.collection} collection from {self.site_conf.database} database")
# for each_detail in details:
self.tag_hierarchy.insert_many_tag_hierarchy(details)
logger.info("Updated the following collection Successfully")
logger.info(
f"database_name: {self.site_conf.database} collection_name: {self.tag_hierarchy.collection}")
print("Updated the collection Successfully")
except Exception as e:
logger.exception(str(e))
status = False
return status
except Exception as e:
logger.exception(str(e))
raise
import os
import logging
from scripts.config import LOG
from logging.handlers import RotatingFileHandler
__log_path__ = str(LOG.LOG_PATH)
__log_level__ = str(LOG.LOG_LEVEL)
__max_bytes__ = int(LOG.LOG_MAX_BYTES)
__handler_type__ = str(LOG.LOG_HANDLER)
_log_file_name__ = str(LOG.LOG_FILE_NAME)
__backup_count__ = int(LOG.LOG_BACKUP_COUNT)
complete_log_path = os.path.join(__log_path__, _log_file_name__)
if not os.path.isdir(complete_log_path):
os.makedirs(complete_log_path)
def get_logger(log_file_name=complete_log_path, log_level=__log_level__, time_format="%Y-%m-%d %H:%M:%S",
handler_type=__handler_type__, max_bytes=__max_bytes__, backup_count=__backup_count__):
"""
Creates a rotating log
"""
log_file = os.path.join(log_file_name + '.log')
__logger__ = logging.getLogger(log_file_name)
__logger__.setLevel(log_level.strip().upper())
debug_formatter = '%(asctime)s - %(levelname)-6s - %(name)s - ' \
'[%(threadName)5s:%(filename)5s:%(funcName)5s():''%(lineno)s] - %(message)s'
formatter_string = '%(asctime)s - %(levelname)-6s - %(name)s - %(levelname)3s - %(message)s'
if log_level.strip().upper() == log_level:
formatter_string = debug_formatter
formatter = logging.Formatter(formatter_string, time_format)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
if __logger__.hasHandlers():
__logger__.handlers.clear()
if str(handler_type).lower() == "rotating_file_handler":
# Rotating File Handler
handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
handler.setFormatter(formatter)
if __logger__.hasHandlers():
__logger__.handlers.clear()
__logger__.addHandler(handler)
else:
# File Handler
hdlr_service = logging.FileHandler(log_file)
hdlr_service.setFormatter(formatter)
if __logger__.hasHandlers():
__logger__.handlers.clear()
__logger__.addHandler(hdlr_service)
return __logger__
logger = get_logger()
"""
Mongo Utility
Author: Irfanuddin Shafi Ahmed
Reference: Pymongo Documentation
"""
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception:
raise
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
def __init__(self, mongo_client, database, collection):
self.client = mongo_client
self.database = database
self.collection = collection
def __repr__(self):
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
def insert_one(self, data: Dict):
"""
The function is used to inserting a document to a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception:
raise
def insert_many(self, data: List):
"""
The function is used to inserting documents to a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception:
raise
def find(self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
skip: Optional[int] = 0,
limit: Optional[int] = None) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = list()
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = collection.find(query, filter_dict, ).sort(sort).skip(skip)
else:
cursor = collection.find(query, filter_dict, ).skip(skip)
if limit:
cursor = cursor.limit(limit)
return cursor
except Exception:
raise
def find_one(self,
query: Dict,
filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
response = collection.find_one(query, filter_dict)
return response
except Exception:
raise
def update_one(self,
query: Dict,
data: Dict,
upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception:
raise
def update_many(self,
query: Dict,
data: Dict,
upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update(query, {"$set": data}, upsert=upsert)
return response.get("nModified")
except Exception:
raise
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_many(query)
return response.deleted_count
except Exception:
raise
def delete_one(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_one(query)
return response.deleted_count
except Exception:
raise
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.distinct(query_key, filter_json)
return response
except Exception:
raise
def aggregate(self, pipelines: List, ):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception:
raise
def drop(self):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.drop()
return response
except Exception:
raise
def find_count(self, json_data, database_name, collection_name):
"""
:param json_data:
:param database_name: The database to which the collection/ documents belongs to.
:param collection_name: The collection to which the documents belongs to.
:return:
"""
try:
db = self.client[database_name]
mongo_response = db[collection_name].find(json_data).count()
return mongo_response
except Exception as e:
raise
class MongoAggregateBaseClass:
def __init__(self, mongo_client, database, ):
self.client = mongo_client
self.database = database
def aggregate(self, collection, pipelines: List, ):
try:
database_name = self.database
collection_name = collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception:
raise
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment