Commit d92738f4 authored by Sabari T's avatar Sabari T

lookup rule

parent f4e6527e
from scripts.errors.module_exceptions import RequiredFieldsMissing
from scripts.logging.logger import logger
from scripts.handler.rule_engine_update import RuleUpdate
def _main():
    """Run the rule update and report the outcome on stdout."""
    logger.debug("main block started")
    status = RuleUpdate().rule_update()
    if status:
        print("Created successfully")
    else:
        print("Some records are not created successfully, Please check the log file")


if __name__ == '__main__':
    try:
        _main()
    except RequiredFieldsMissing:
        # Specific, expected failure: the static JSON lacked mandatory keys.
        logger.exception("Required Fields are missing in the static JSON")
        print("Required Fields are missing in the static JSON")
    except Exception as e:
        # Catch-all boundary for the script entry point; logged with traceback.
        logger.exception("Exception in the main block" + str(e))
        print("Exception in the main block" + str(e))
from pydantic import BaseSettings
import sys
import yaml
# this method is to read the configuration from backup.conf
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
    """Load a YAML configuration file and return its contents as a dict.

    :param file_name: path to the YAML configuration file
    :return: all the configuration constants (parsed YAML document)
    :raises yaml.YAMLError: when the file is not valid YAML

    Fix: the original swallowed ``YAMLError`` and returned ``None``, which
    made the caller (``config["mongo_db"]``) fail later with an unrelated
    TypeError; the parse error is now printed and re-raised so the real
    cause is visible.
    """
    with open(file_name, 'r') as stream:
        try:
            # NOTE(review): full_load can construct arbitrary YAML tags;
            # fine for a trusted local config file, but safe_load would be
            # preferable — confirm nothing in service.yml needs full_load.
            return yaml.full_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            raise
# Configuration is loaded once at import time; every constants class below
# reads from this dict.
config = read_configuration("conf/service.yml")


class _Configuration(BaseSettings):
    # NOTE(review): the value is taken from the YAML file, not the process
    # environment; pydantic's BaseSettings would normally read MONGO_URI
    # from the environment — confirm which source is intended.
    MONGO_URI = config["mongo_db"]["URI"]


_conf = _Configuration()
# Name of the redis settings section in the YAML file.
redis_section = "REDIS"
class DB(object):
    """Namespace holding database connection settings."""

    class MongoDb:
        # Connection string sourced from the service configuration.
        uri = _conf.MONGO_URI
        # NOTE(review): printing the full URI can leak credentials to
        # stdout/logs — consider removing or masking.
        print(uri)
        if not uri:
            # Hard-fail at import time when no URI is configured.
            print("Error, environment variable MONGO_URI not set")
            sys.exit(1)
class LOG:
    """Logging constants read from the ``log`` section of the service config."""

    _section = config["log"]
    LOG_FILE_NAME = _section["file_name"]
    LOG_PATH = _section["path"]
    LOG_LEVEL = _section["level"]
    LOG_HANDLER = _section["handler"]
    # Rotation settings are stored as strings in YAML; coerce to int here.
    LOG_MAX_BYTES = int(_section["max_bytes"])
    LOG_BACKUP_COUNT = int(_section["back_up_count"])
    del _section
class RedisConfig:
    """Redis connection constants from the ``REDIS`` config section."""

    _section = config["REDIS"]
    redis_host = _section["REDIS_HOST"]
    # Port and database numbers arrive as strings from YAML; coerce to int.
    redis_port = int(_section["REDIS_PORT"])
    rules_redis_db = int(_section["rules_db"])
    alarms_redis_db = int(_section["alarms_db"])
    redis_login_db = int(_section["login_db"])
    del _section
class DatabaseNames:
    """Names of the Mongo databases used by this service."""

    # Main configuration database.
    ilens_configuration = "ilens_configuration"
    # Alias used for local/on-box deployments.
    local_host = "database"
class CollectionNames:
    """Names of the Mongo collections referenced by this service."""

    site_conf = "site_conf"
    tag_hierarchy = "tag_hierarchy"
    user = "user"
    user_role = "user_role"
    tags = "tags"
    rule_engine = "rule_engine"
    unique_id = "unique_id"
class TagsCollectionKeys:
    """Field names of documents in the ``tags`` collection."""

    KEY_ID = "id"
    KEY_TAG_NAME = "tag_name"
    KEY_TAG_TYPE = "tag_type"
    KEY_TAG_GROUP_ID = "tag_group_id"
    KEY_TAG_CATEGORY_ID = "tag_category_id"
    KEY_SYSTEM_TAG_TYPE = "system_tag_type"
    KEY_DATA_TYPE = "data_type"
    KEY_UNIT = "unit"
    KEY_DESCRIPTION = "description"
    KEY_DEFAULT = "default"
    KEY_VALUE_LIST = "value_list"
    KEY_PRODUCT_ENCRYPTED = "product_encrypted"
class SiteConfCollectionKeys:
    """Field names of documents in the ``site_conf`` collection."""

    KEY_SITE_ID = "site_id"
    KEY_SITE_NAME = "site_name"
    KEY_SITE_INFO = "site_info"
    KEY_CUSTOMER_PROJECT_ID = "customer_project_id"
    KEY_PROCESS_ID = "process_id"
    KEY_PRODUCT_ENCRYPTED = "product_encrypted"
class RuleCommonKeys:
    """Shared rule-document field names and schedule-bucket identifiers."""

    # Audit fields on rule documents.
    KEY_CREATED_BY = "created_by"
    KEY_CREATED_TIME = "created_time"
    KEY_LAST_UPDATED_BY = "last_updated_by"
    KEY_LAST_UPDATED_TIME = "last_updated_time"
    # Schedule time-bucket names used as Redis hash keys.
    YEAR = "year"
    MONTH = "month"
    WEEK = "week"
    DAY = "day_month"
    DAY_OF_WEEK = "day_week"
    HOUR = "hour"
    MINUTE = "minute"
    SHIFT = "shift"
    # Rule execution modes.
    SCHEDULE = "schedule"
    REALTIME = "realtime"
class UniqueIdKeys:
    """Field names of documents in the ``unique_id`` collection."""

    KEY_KEY = "key"
    KEY_ID = "id"
from scripts.constants import CollectionNames
class MongoEncryptionConstants:
    """Error messages and encryption settings for the Mongo layer.

    Fix: corrected the typo "eestablish" in the MONGO001 error message.
    """

    # mongo exception codes
    MONGO001 = "Server was unable to establish connection with MongoDB"
    MONGO002 = "Server faced a problem when inserting document(s) into MongoDB"
    MONGO003 = "Server faced a problem to find the document(s) with the given condition"
    MONGO004 = "Server faced a problem to delete the document(s) with the given condition"
    MONGO005 = "Server faced a problem to update the document(s) with the given condition and given data"
    MONGO006 = "Server faced a problem when aggregating the data"
    MONGO007 = "Server faced a problem when closing the connection with MongoDB"
    # mongo encryption keys
    key_encrypt_keys = "encrypt_keys"
    key_exclude_encryption = "exclude_encryption"
    product_encrypted = "product_encrypted"
    max_docs_per_batch = 5
    # cipher_key = "a985195aaa464e61"
    # Product based configurable constants
    # SECURITY(review): this is a hard-coded RSA *private* key committed to
    # source control. It should be rotated and loaded from a secret store /
    # environment instead of living in code.
    cipher_key = {
        'k': '-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEArVED5cr+tMtFtVmXl2O0cvQbEgoYSIFd8yvkmm6z7'
             'XAdX6Eg\nYkKez0ydTl26KOdJ18A7Kn8etWGe8nTkSGheJl9rn/J+lE1zpo4Zg/T3wDnM8FM3\nyuM26vpIb+0oJmNc9'
             'DkFXo4WtxRFZDytdETg/YyI+eJYDRDrZSrlqAzIDpAdLpv9\nUhsMhYQ+2n3PcauLeJb0dKPVTc6kSvGCs3LZ0WyTbRnQ'
             'yJMCWnaxzpSIUcH7qaqO\nKC/fBCKsZmRjRNSmQ3gepz4VnQKyJCm7CJk+cQiQMQzrspRPvhmGouHZUM36KjsG\n6ylx2'
             'Bu6OYy/HbrdRkJKNlv3u6BBL6Pn/ZJZGQIDAQABAoIBABI8eMhESnYbm1RI\nW8S8YzeIO1Pz13hDku7cArcEKG72kcSm'
             '58knAN5HjbK59nVI1tJ6gc84JnNH1Qlm\nZsG+p49qkWC4S3zPxHg1MfaaPzpM6qUr4G4656OkV5xdTBDz+gshd9Dp6vZ'
             'zDdUc\n9FRMTg8nqx79461mRxpzP8xloaQ0NcKBzFK9e3g/4i72LwgNP3E6xmESiu7goqJ1\nGOAI2mJie3TTY1z8sf4u'
             'iSFLMaFrExkq4z4KkwS7qF2nOJxhv8H/g9TGPNWrnzAw\nyBHwINBoUaJwiOT51xxIDLgNQiNoIFuaMKVu2l+rWtoQWKG'
             'iOnw1ZhYxeJCXByXC\nQqpAfgECgYEAwpzSfyot3PAlxm9iVK5Zc6lRdBq7Jazt7t91U6zeY7C4xzNG1Tuf\ncSYK3qRwl'
             'Mw2uXl9auxyV41rziX9sZhtFUnm4jcGv9MHeaAaSSPSsvrtZDFBS7ky\nl2Ixk1078LTZCLMYmAKCAr2XLmShBPSVcuaL'
             'kDRX4rvw7scWmMb86wECgYEA4/yC\nEAjXlL0WlOYDJ3J//Pg4iBtIedHXmn30goNuCBBaoYygXapeytEmU2q5hybQTMTX'
             '\nVl/vIAFiu0TX81VQ7LDLJaber/7GEsIT3x+xm0jFvOxFYVhT5b0s0z1CQolnRFsA\ndIwQ5u5GkP65hyJUa3ZMh+L6Vi'
             'sSCTKpAco9ZhkCgYAKFZ5CwKjHvhn3AmaSBMbV\n23xBC/GOrjtWGXY288pCWDH7AIk3G3PTpSkDCHpc+4gKbGU3WTFDoC'
             'xp7kYLId7l\nL4MrTban0gOdJdK234hXhfEvM+4yQlKAzbHL9RTaEET+0mj/14FtKu3elZBSdWoZ\nHiE1Q8EaGqsNdHuT'
             'RxxsAQKBgQCqw7enyveusPL5FE/IfDrhgArX55ehp0uWrEE4\ngLkp0RYRawOzJKlbwMy0LnxIfwoGdmnUiIbTsjYBjs8'
             'xs/WpU8LVsOeba3lxE21O\n8q5VYWy61T4ia9ZrjgbFMl0u+TwgNwlgQolmb5Lrh9/vGAejdjhcj+ZyJFCeExQE\nAzd6'
             'AQKBgBhe+FwMhTus961jqEKXBXm0/OaOgzOdgl/asuC8oLU7cAVD7sS2LFcU\nu7ofIVIG6cRWRruajIuCdlIcLOedTE4'
             'YL5jAuRL1TyVvxMm0FsrkWPABFrHWhsZs\nTSzpiOFJkLJTVnT7hlW/+m0qrKiW0zrFza0JaFwP/lj+hRrYGkOl\n'
             '-----END RSA PRIVATE KEY-----'}
    # Per-collection encryption policy: which keys to encrypt ("*" = all)
    # and which to leave in plaintext.
    encrypt_collection_dict = \
        {CollectionNames.user: {key_encrypt_keys: ["*"],
                                key_exclude_encryption: ["_id", "user_id", "client_id", "deleted_by",
                                                         "created_by", "username", "isdeleted", "email"]},
         CollectionNames.user_role: {key_encrypt_keys: ["*"],
                                     key_exclude_encryption: ["_id", "user_role_id", "user_role_name",
                                                              "client_id", "isdeleted", "default"]}}
\ No newline at end of file
from scripts.utils.mongo_util import MongoConnect
from scripts.config import DB
mongo_client = MongoConnect(uri=DB.MongoDb.uri)()
class TagsAggregate:
    """Canned Mongo aggregation pipelines for the ``tags`` collection."""

    # Builds one document mapping tag id -> tag name:
    # {"tag_id_name": {"<id>": "<tag_name>", ...}}
    tag_id_name_map = [
        {'$group': {'_id': None,
                    'tag_id_name': {'$push': {'k': '$id', 'v': '$tag_name'}}}},
        {'$project': {'_id': 0,
                      'tag_id_name': {'$arrayToObject': '$tag_id_name'}}},
    ]

    # Collects the ids of all derived tags into a single list:
    # [{"derived_tag_list": ["<id>", ...]}]
    derived_tags_list = [
        {"$match": {'system_tag_type': 'derived_tag'}},
        {"$group": {"_id": None, "derived_tag_list": {"$push": "$id"}}},
        {"$project": {"derived_tag_list": "$derived_tag_list"}},
    ]
from typing import Optional, List
from scripts.constants import DatabaseNames, CollectionNames
from scripts.utils.mongo_util import MongoCollectionBaseClass
class RuleEngine(MongoCollectionBaseClass):
    """Data-access wrapper for the ``rule_engine`` Mongo collection."""

    def __init__(self, mongo_client):
        super().__init__(mongo_client,
                         database=DatabaseNames.ilens_configuration,
                         collection=CollectionNames.rule_engine)

    def find_all_rules(self, query):
        """Return every rule matching *query* as a list (empty when none)."""
        cursor = self.find(query)
        return list(cursor) if cursor else list()

    def find_one_rule(self, query):
        """Return the first rule matching *query*, or an empty dict."""
        document = self.find_one(query=query)
        return document if document else dict()

    def update_one_rule(self, query_dict, data, upsert=False):
        """Update one rule matched by *query_dict* with *data*."""
        return self.update_one(data=data, query=query_dict, upsert=upsert)

    def insert_one_rule(self, data):
        """Insert a single rule document."""
        return self.insert_one(data)

    def delete_one_rule(self, query):
        """Delete one rule matched by *query*."""
        return self.delete_one(query=query)
from typing import List, Optional, Dict
from scripts.constants import DatabaseNames, CollectionNames, TagsCollectionKeys
from scripts.logging import logger
from scripts.utils.mongo_util import MongoCollectionBaseClass
class Tags(MongoCollectionBaseClass):
    """Data-access wrapper for the ``tags`` Mongo collection.

    Field names are exposed as read-only properties so query code does not
    hard-code collection key strings.

    Fix: ``find_one_tag`` had an if/else that returned the same value on
    both branches; collapsed to a single return (behavior unchanged).
    """

    def __init__(self, mongo_client):
        super().__init__(mongo_client, database=DatabaseNames.ilens_configuration,
                         collection=CollectionNames.tags)

    # --- field-name accessors -------------------------------------------
    @property
    def key_tag_category_id(self):
        return TagsCollectionKeys.KEY_TAG_CATEGORY_ID

    @property
    def key_tag_name(self):
        return TagsCollectionKeys.KEY_TAG_NAME

    @property
    def key_unit(self):
        return TagsCollectionKeys.KEY_UNIT

    @property
    def key_tag_type(self):
        return TagsCollectionKeys.KEY_TAG_TYPE

    @property
    def key_description(self):
        return TagsCollectionKeys.KEY_DESCRIPTION

    @property
    def key_id(self):
        return TagsCollectionKeys.KEY_ID

    @property
    def key_tag_group_id(self):
        return TagsCollectionKeys.KEY_TAG_GROUP_ID

    @property
    def key_default(self):
        return TagsCollectionKeys.KEY_DEFAULT

    @property
    def key_data_type(self):
        return TagsCollectionKeys.KEY_DATA_TYPE

    @property
    def key_system_tag_type(self):
        return TagsCollectionKeys.KEY_SYSTEM_TAG_TYPE

    @property
    def key_value_list(self):
        return TagsCollectionKeys.KEY_VALUE_LIST

    @property
    def key_product_encrypted(self):
        return TagsCollectionKeys.KEY_PRODUCT_ENCRYPTED

    # --- queries --------------------------------------------------------
    def find_all_tags(self, filter_dict=None,
                      sort=None, skip=0, limit=None, **query):
        """Return all tags matching the keyword-argument *query*.

        :param filter_dict: Mongo projection dict
        :param sort: sort specification
        :param skip: number of documents to skip
        :param limit: maximum number of documents
        :return: the raw cursor when matches exist, else an empty list
                 (NOTE(review): other finders return lists — confirm callers
                 only iterate this result)
        """
        all_tags = self.find(filter_dict=filter_dict, sort=sort, skip=skip,
                             limit=limit, query=query)
        if not all_tags:
            return list()
        return all_tags

    def find_tags_by_query(self, query, filter_dict=None):
        """Return tags matching the explicit *query* dict (cursor or [])."""
        all_tags = self.find(query=query, filter_dict=filter_dict)
        if all_tags:
            return all_tags
        return list()

    def find_one_tag(self, filter_dict=None, **query):
        """Return one tag matching the keyword-argument *query*.

        :param filter_dict: Mongo projection dict
        :param query: field=value filters
        :return: the document, or the falsy find_one result when absent
        """
        return self.find_one(filter_dict=filter_dict, query=query)

    def insert_one_tag(self, data):
        """Insert one tag document into the tags collection."""
        return self.insert_one(data)

    def insert_many_tags(self, data):
        """Insert many tag documents into the tags collection."""
        return self.insert_many(data)

    def update_one_tag(self, data, upsert=False, **query):
        """Update one tag matched by the keyword-argument *query*."""
        return self.update_one(data=data, upsert=upsert, query=query)

    def update_many_tags(self, data, query, upsert=False):
        """Update many tags matched by the explicit *query* dict."""
        return self.update_many(data=data, upsert=upsert, query=query)

    def delete_many_tags(self, **query):
        """Delete many tags matched by the keyword-argument *query*."""
        return self.delete_many(query=query)

    def delete_one_tag(self, **query):
        """Delete one tag matched by *query*; refuse an empty query.

        :return: the delete result, or False when no query was given
                 (guards against accidentally deleting an arbitrary doc)
        """
        if query:
            return self.delete_one(query=query)
        return False

    def distinct_tag(self, query_key):
        """Return the distinct values of *query_key* across the collection."""
        return self.distinct(query_key=query_key)

    def find_tags_with_list(self, tag_list):
        """Return the tag names (projection {tag_name: 1}) for names in *tag_list*."""
        query = {self.key_tag_name: {"$in": tag_list}}
        filter_dict = dict(tag_name=1, _id=0)
        tags = self.find(query=query, filter_dict=filter_dict)
        if not tags:
            return list()
        return list(tags)

    def find_tags_by_ids(self, tag_category_id, tag_list):
        """Return tags whose category matches OR whose id is in *tag_list*."""
        query = {"$or": [{self.key_tag_category_id: tag_category_id},
                         {self.key_id: {"$in": tag_list}}]}
        tags = self.find(query=query)
        if not tags:
            return list()
        return list(tags)

    def find_tags_by_aggregate(self, query):
        """Run the aggregation pipeline *query* and return results as a list."""
        tags = self.aggregate(query)
        if not tags:
            return list()
        return list(tags)

    def update_many_tags_by_group(self, selected_tags, tag_group_id):
        """Assign *tag_group_id* to every tag whose id is in *selected_tags*."""
        data = {self.key_tag_group_id: tag_group_id}
        query = {self.key_id: {"$in": selected_tags}}
        self.update_many(query=query, data=data)

    def find_by_aggregate(self, query):
        """Run the aggregation pipeline *query* and return results as a list."""
        tags = self.aggregate(query)
        if not tags:
            return list()
        return list(tags)

    def get_tag_name(self, tag_id):
        """Return the ``tag_name`` for *tag_id*, or "" when not found."""
        query = [
            {
                '$match': {
                    'id': tag_id
                }
            }, {
                '$project': {
                    '_id': 0,
                    'tag_name': '$tag_name',
                    'tag_id': '$tag_id'
                }
            }
        ]
        tags = list(self.aggregate(query))
        if not tags:
            return ""
        return tags[0]["tag_name"]
\ No newline at end of file
from typing import Optional
from scripts.constants import DatabaseNames, CollectionNames
from scripts.constants import UniqueIdKeys
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class UniqueIdSchema(MongoBaseSchema):
    """
    This is the Schema for the Mongo DB Collection.
    All datastore and general responses will be following the schema.
    """
    # Lookup key of the record (presumably the sequence/counter name —
    # TODO confirm against UniqueId.find_record callers).
    key: Optional[str]
    # Identifier value stored under that key.
    id: Optional[str]
    # Whether the stored document is product-encrypted.
    product_encrypted: Optional[bool] = False
class UniqueId(MongoCollectionBaseClass):
    """Data-access wrapper for the ``unique_id`` Mongo collection."""

    def __init__(self, mongo_client):
        super().__init__(mongo_client,
                         database=DatabaseNames.ilens_configuration,
                         collection=CollectionNames.unique_id)

    @property
    def key_key(self):
        # Field name of the lookup key in this collection.
        return UniqueIdKeys.KEY_KEY

    def find_one_record(self, **kwargs):
        """Return one record matching the keyword filters.

        :param kwargs: field=value filters
        :return: a populated UniqueIdSchema, or an empty one when not found
        """
        document = self.find_one(query=kwargs)
        if document:
            return UniqueIdSchema(**document)
        return UniqueIdSchema()

    def insert_record(self, record: UniqueIdSchema):
        """Insert *record* into the collection and return its ``id``."""
        self.insert_one(record.dict())
        return record.id

    def update_record(self, record: UniqueIdSchema):
        """Overwrite the record matching ``record.key``; return its ``id``."""
        self.update_one(query={self.key_key: record.key}, data=record.dict())
        return record.id

    def update_one_record(self, key, updated_data):
        """Update the record stored under *key* with *updated_data*."""
        return self.update_one(data=updated_data, query={self.key_key: key})

    def find_record(self, key, filter_dict=None):
        """Fetch the record stored under *key*.

        :param key: value of the ``key`` field to match
        :param filter_dict: Mongo projection dict
        :return: the document as a plain dict, or the raw falsy result
        """
        found = self.find_one(query={self.key_key: key}, filter_dict=filter_dict)
        if found:
            return dict(found)
        return found
from pydantic import BaseModel
class MongoBaseSchema(BaseModel):
    """
    This is a base schema for all mongo collection Schema.
    Collection document models subclass this instead of BaseModel directly.
    """
    pass
class BaseRequestSchema(BaseModel):
    """
    This is the base schema for input requests to the Collection classes.
    """
import redis
from scripts.config import RedisConfig
# Module-level Redis connection used for rule storage.
# decode_responses=True makes hget/hgetall return str instead of bytes.
rules_redis = redis.Redis(host=RedisConfig.redis_host, port=RedisConfig.redis_port,
                          db=int(RedisConfig.rules_redis_db), decode_responses=True)
class ILensException(Exception):
    """Root of the iLens application exception hierarchy."""


class RuleEngineException(ILensException):
    """Base class for rule-engine specific errors."""


class RequiredFieldsMissing(RuleEngineException):
    """Raised when mandatory fields are absent from the static JSON payload."""
import datetime
import json
from scripts.db.mongo import mongo_client
from scripts.db.mongo.ilens_configuration.aggregates.tags import TagsAggregate
from scripts.db.redis_connection import rules_redis
from scripts.constants import RuleCommonKeys
from scripts.logging.logger import logger
from scripts.utils.common_utils import CommonUtils
from scripts.db.mongo.ilens_configuration.collections.rule_engine import RuleEngine
from scripts.db.mongo.ilens_configuration.collections.tags import Tags
class RuleConfigurationHandler(object):
    """Create/delete rule-engine definitions in Mongo and mirror the
    executable rule blocks into Redis for the schedule/realtime evaluators.

    Fixes applied in this revision:
    * ``duplicate_rule_name_check`` compared ``find_one_rule``'s result to
      ``None``, but that method returns an empty dict when nothing matches,
      so every new rule was reported as a duplicate.
    * ``input_data.get('schedule', dict)`` defaulted to the dict *class*
      (calling ``.get`` on it raises TypeError); now defaults to ``dict()``.
    """

    def __init__(self):
        self.rules_redis = rules_redis
        self.rules_mongo = RuleEngine(mongo_client=mongo_client)
        self.tags_mongo = Tags(mongo_client=mongo_client)
        self.tags_aggregate = TagsAggregate
        self.common_utils = CommonUtils()

    def create_rule_engine(self, input_data):
        """Validate and persist a rule definition (create or edit).

        Saves the rule document in Mongo and pushes one JSON entry per rule
        block into Redis. Never raises: errors are logged and reported via
        ``{"status": "failed", "message": ...}``.
        """
        try:
            user_id = self.common_utils.get_usr_id(input_data=input_data)
            if not user_id:
                return {"status": "failed", "message": "Unauthorized user"}
            new_rule = False
            # An empty rule_engine_id means "create": allocate an id and
            # stamp creation metadata.
            if "rule_engine_id" in input_data and input_data["rule_engine_id"] == "":
                rule_name_exists = self.duplicate_rule_name_check(input_data=input_data)
                if rule_name_exists:
                    return {"status": "failed", "message": "Rule name already exists!!"}
                input_data["created_on"] = (datetime.datetime.now()).strftime("%d/%m/%Y")
                input_data["rule_engine_id"] = "rule_engine_" + self.common_utils.get_next_id("rule_engine")
                input_data["created_by"] = user_id
                new_rule = True
            input_data['disable_all'] = input_data.get('disable_all', False)
            insert_block_json_list = []
            schedule_start = None
            schedule_end = None
            schedule_type = 'realtime'
            execute_on = dict()
            rule_type = input_data['Selected_ruleType'].lower().replace(" ", "")
            if rule_type == RuleCommonKeys.SCHEDULE:
                # Fix: default must be a dict instance, not the dict class.
                schedule_data = input_data.get('schedule', dict())
                if schedule_data.get('schedule_Type') == 'recurring':
                    schedule_start = int(schedule_data['duration_startDate'])
                    schedule_end = int(schedule_data['duration_endDate'])
                schedule_type = schedule_data.get('schedule_Type', '')
                execute_on = self.get_execute_on(input_data.get("schedule", dict()))
            derived_tag_list = self.get_derived_tags()
            output_rule_block_map = self.get_rule_block_mapping(input_data['project_id'])
            if "calcFormulaList" in input_data:  # check if blocks exists or rule is empty
                # First pass: assign sequential block ids.
                counter = 1
                for each_block in input_data["calcFormulaList"]:  # Assign block id
                    each_block.update({"block_id": "block_" + str(counter)})
                    counter += 1
                # Second pass: build the Redis JSON per block and validate
                # the output/derived-tag constraints.
                for each_block_data in input_data["calcFormulaList"]:  # each block data of rule
                    rule_block_parent = []
                    json_to_redis = self.make_json_to_push_into_redis(rule_block_data=each_block_data,
                                                                      input_data=input_data)
                    json_to_redis['schedule_start'] = schedule_start
                    json_to_redis['schedule_end'] = schedule_end
                    if json_to_redis["transformation_type"] == "route":
                        json_to_redis["multi_tag"] = len(json_to_redis["output_tags"]) > 1
                    else:
                        cur_block_output_tag = ""
                        if each_block_data['completeTagId'] != "":
                            if each_block_data['completeTagId'] not in derived_tag_list:
                                # output tag can only be derived tag
                                return {"status": "failed", "message": "Rule output should be derived tag"}
                            # check if current block output is output of any other block outside rule
                            cur_block_output_tag = f"{each_block_data['output_devices'][0]}$" \
                                                   f"{each_block_data['completeTagId']}"
                            if cur_block_output_tag in output_rule_block_map:
                                if output_rule_block_map[cur_block_output_tag]['id'].split(".")[0] != input_data[
                                        'rule_engine_id']:
                                    raise Exception(
                                        f"Rule output of {each_block_data['block_id']} is output of a block of"
                                        f" rule {output_rule_block_map[cur_block_output_tag]['name']}")
                        tags_used_in_rule_block = json_to_redis["selectedTags"]
                        for each_used_tag in tags_used_in_rule_block:
                            # if a derived tag is input tag, it should be output of some other rule (cyclic restriction)
                            tag_value = each_used_tag.split("$")[-1]
                            if tag_value in derived_tag_list:
                                if each_used_tag not in output_rule_block_map:
                                    raise Exception(f"Derived tag used in {each_block_data['block_id']}"
                                                    " is not output of any other rule")
                            # check if this tag is used as output tag of another block within same rule
                            for each_block in input_data["calcFormulaList"]:
                                if each_block['completeTagId'] == '':
                                    continue  # lookup skip
                                if each_block['block_id'] == each_block_data['block_id']:  # skip if it is current block
                                    continue
                                block_result_tag = f"{each_block['output_devices'][0]}$" \
                                                   f"{each_block['completeTagId']}"
                                # check if block output is output of other block in the current rule
                                if cur_block_output_tag == block_result_tag:
                                    raise Exception("More than one block has same output tag")
                                if each_used_tag == block_result_tag:
                                    rule_block_parent.append(f"{input_data['rule_engine_id']}."
                                                             f"{each_block['block_id']}")
                            # check if this tag is used as output tag of any block of another rule
                            if each_used_tag in output_rule_block_map:
                                rule_block_parent.append(output_rule_block_map[each_used_tag]['id'])
                        each_block_data.update({"rule_block_parent": list(set(rule_block_parent))})
                        json_to_redis.update({"rule_block_parent": list(set(rule_block_parent))})
                        json_to_redis["multi_tag"] = len(json_to_redis["selectedTags"]) > 1
                    json_to_redis['rule_type'] = rule_type
                    json_to_redis['execute_on'] = execute_on
                    each_block_data.update({"execute_on": execute_on})
                    each_block_data.update({"execute_on_tag": json_to_redis['execute_on_tag']})
                    json_to_redis['previousTags'] = each_block_data.get('previousTags', [])
                    json_to_redis['schedule_type'] = schedule_type
                    insert_block_json_list.append(json_to_redis)
            if not new_rule:  # for edit delete the existing rule and create new
                delete_response = self.delete_rule_engine(input_data, delete_on_create=True)
                if delete_response['status'] != 'success':
                    return {"status": "failed", "message": "Failed to update rule!!"}
                input_data['created_on'] = delete_response.get('created_on')
            if rule_type.lower() == RuleCommonKeys.SCHEDULE:
                self.update_schedule_rule_to_redis(insert_block_json_list, execute_on,
                                                   input_data['rule_engine_id'], schedule_type)
            elif rule_type.lower().replace(" ", "") == RuleCommonKeys.REALTIME:
                self.update_realtime_rule_to_redis(insert_block_json_list)
            input_data["last_updated"] = (datetime.datetime.now()).strftime("%d/%m/%Y")
            input_data['execute_on'] = execute_on
            new_values = input_data
            # delete + upsert replaces the stored document wholesale
            self.rules_mongo.delete_one_rule({"rule_engine_id": input_data["rule_engine_id"]})
            self.rules_mongo.update_one_rule(query_dict={"rule_engine_id": input_data["rule_engine_id"]},
                                             data=new_values, upsert=True)
            logger.debug("Rule saved successfully")
            return {"status": "success", "message": "rule saved successfully",
                    "rule_id": input_data["rule_engine_id"]}
        except Exception as e:
            logger.exception("Exception in rule creation:" + str(e))
            return {"status": "failed", "message": str(e)}

    def get_rule_block_mapping(self, project_id):
        """Map every existing block's output tag to its owning rule/block.

        :param project_id: only rules of this project are considered
        :return: {"<device>$<tag>": {"id": "<rule_id>.<block_id>", "name": <ruleName>}}
        :raises Exception: when the rules cannot be fetched at all
        """
        output_rule_block_map = {}
        try:
            query = {"$or": [{"transformation_type": "validation_and_transformation"},
                             {"Selected_ruleType": "Schedule"}]}
            available_rules = self.rules_mongo.find_all_rules(query)
            for each_rule in available_rules:
                if each_rule.get('project_id', '') != project_id:
                    continue
                for each_block in each_rule['calcFormulaList']:
                    try:
                        if each_block['completeTagId'] == '':
                            continue
                        block_result_tag = f"{each_block['output_devices'][0]}${each_block['completeTagId']}"
                        rule_block_id = f"{each_rule['rule_engine_id']}.{each_block['block_id']}"
                        output_rule_block_map.update({block_result_tag: {"id": rule_block_id,
                                                                         "name": each_rule['ruleName']}})
                    except Exception as e:
                        # A malformed block should not abort the whole scan.
                        logger.exception(f"Exception in get_rule_block_mapping {e}")
            return output_rule_block_map
        except Exception as e:
            logger.exception(f"{e}")
            raise Exception("Error fetching mapping details")

    def get_derived_tags(self):
        """Return the ids of all derived tags.

        :raises Exception: when the aggregate query fails
        """
        derived_tag_list = []
        try:
            aggregate_query = self.tags_aggregate.derived_tags_list
            res = self.tags_mongo.find_by_aggregate(aggregate_query)
            if res:
                for each in res:
                    derived_tag_list += each['derived_tag_list']
            return derived_tag_list
        except Exception as e:
            logger.exception(f"Exception in getting derived tag list {e}")
            raise Exception("Failed to fetch derived tags")

    def delete_rule_engine(self, input_data, delete_on_create=False):
        """Delete rule data from Redis and (unless *delete_on_create*) Mongo.

        *delete_on_create* is used by ``create_rule_engine`` while editing:
        Redis entries are cleared but the Mongo document is left for the
        subsequent upsert.
        """
        try:
            rule_data = self.rules_mongo.find_one_rule(query={"rule_engine_id": input_data["rule_engine_id"]})
            if not rule_data:
                return {"status": "failed", "message": "Failed to delete rule"}
            created_on = rule_data.get('created_on', "")
            rule_type = rule_data.get('Selected_ruleType')
            for each_block in rule_data['calcFormulaList']:
                if rule_type and rule_type.lower().replace(" ", "") == RuleCommonKeys.SCHEDULE:
                    for each_tag, values in each_block.get('execute_on', dict()).items():
                        self.rules_redis.hdel(each_tag, input_data["rule_engine_id"])
                        for each_val in values:
                            self.rules_redis.hdel(f"{each_tag}_{each_val}", input_data["rule_engine_id"])
                    # if some tag is missed will delete here
                    for tag in [RuleCommonKeys.MINUTE, RuleCommonKeys.HOUR, RuleCommonKeys.DAY,
                                RuleCommonKeys.MONTH, RuleCommonKeys.SHIFT, RuleCommonKeys.WEEK,
                                RuleCommonKeys.DAY_OF_WEEK, RuleCommonKeys.YEAR]:
                        self.rules_redis.hdel(tag, input_data["rule_engine_id"])
                else:
                    self.remove_rule_from_redis(rule_data)
            if not delete_on_create:
                self.rules_mongo.delete_one_rule(query={"rule_engine_id": input_data["rule_engine_id"]})
            logger.debug("rule deleted successfully")
            return {"status": "success", "message": "rule deleted successfully", "created_on": created_on}
        except Exception as e:
            logger.exception("exceptions while deleting rule:" + str(e))
            return {"status": "failed", "message": "Unable to remove rule"}

    @staticmethod
    def make_json_to_push_into_redis(rule_block_data, input_data):
        """Build the per-block JSON document that is stored in Redis.

        Strips the heavy ``tagList`` entries from ``execute_on_tag``.
        NOTE(review): returns None when an exception occurs — callers index
        into the result, so a failure here surfaces later as a TypeError.
        """
        try:
            execute_on_tag_data = rule_block_data.get('execute_on_tag', [])
            execute_on_tag = [{k: v for k, v in d.items() if k != 'tagList'} for d in execute_on_tag_data]
            each_block_dict = {"rule_id": input_data["rule_engine_id"],
                               "rule": {"code": rule_block_data.get("code"),
                                        "parsedCode": rule_block_data.get("parsedCode")
                                        },
                               "result_tag_id": rule_block_data.get("completeTagId"),
                               "selectedTags": rule_block_data.get("selectedTags"),
                               "output_devices": rule_block_data.get("output_devices"),
                               "ruleName": input_data.get("ruleName"),
                               "process_on": input_data.get("processOn"),
                               "transformation_type": input_data["transformation_type"],
                               "output_type": rule_block_data.get("output_type"),
                               "mqtt": rule_block_data.get("mqtt"),
                               "kafka": rule_block_data.get("kafka"),
                               "rest": rule_block_data.get("rest"),
                               "data_store": rule_block_data.get("data_store"),
                               "output_tags": rule_block_data.get("output_tags"),
                               "block_id": rule_block_data.get("block_id"),
                               "disable": rule_block_data.get("disable", False),
                               "executeOnTag": rule_block_data.get("executeOnTag", False),
                               "rule_block_parent": [
                                   f"{input_data['rule_engine_id']}.{rule_block_data.get('block_id')}"],
                               "execute_on_tag": execute_on_tag
                               }
            return each_block_dict
        except Exception as e:
            logger.exception("unable to make json to push to redis:" + str(e))

    @staticmethod
    def get_execute_on(schedule_data):
        """Translate a schedule definition into Redis time-bucket keys.

        For a one-time schedule only the buckets that differ from "now" are
        emitted (e.g. a run later today needs only hour/minute keys).

        :return: {"<bucket>_<value>": ['*'] , ...} or
                 {"<interval type>": [ints], ...} for recurring schedules
        :raises Exception: on invalid schedule type or past one-time schedules
        """
        execute_on = {}
        try:
            if schedule_data['schedule_Type'].lower() == "onetime":
                execute_at = datetime.datetime.fromtimestamp(int(schedule_data['onetimeSchedule_DateAndTime']))
                if execute_at.year == datetime.datetime.now().year:
                    if execute_at.month == datetime.datetime.now().month:
                        if execute_at.day == datetime.datetime.now().day:
                            logger.debug("One Time Rule scheduled for current day")
                            cur_hr = datetime.datetime.now().hour
                            if execute_at.hour < cur_hr:
                                raise Exception("Rules can be configured only for future times")
                            elif execute_at.hour == cur_hr and execute_at.minute < datetime.datetime.now().minute:
                                raise Exception("Rules can be configured only for future times")
                        else:
                            execute_on = {f"{RuleCommonKeys.DAY}_{int(execute_at.day)}": ['*']}
                    else:
                        execute_on = {f"{RuleCommonKeys.MONTH}_{int(execute_at.month)}": ['*'],
                                      f"{RuleCommonKeys.DAY}_{int(execute_at.day)}": ['*']}
                else:
                    execute_on = {f"{RuleCommonKeys.YEAR}_{int(execute_at.year)}": ['*'],
                                  f"{RuleCommonKeys.MONTH}_{int(execute_at.month)}": ['*'],
                                  f"{RuleCommonKeys.DAY}_{int(execute_at.day)}": ['*']}
                execute_on[f"{RuleCommonKeys.MINUTE}_{int(execute_at.minute)}"] = ['*']
                execute_on[f"{RuleCommonKeys.HOUR}_{int(execute_at.hour)}"] = ['*']
            elif schedule_data['schedule_Type'] == "recurring":
                schedule_tag = schedule_data.get('scheduleInterval_Type')
                if schedule_tag:
                    custom_time = schedule_data['trigger_time'].split(":")
                    if int(custom_time[0]) == 0 and int(custom_time[1]) == 0:
                        # Midnight trigger: only the interval buckets matter.
                        execute_on[schedule_data.get('scheduleInterval_Type')] = list(
                            map(int, schedule_data[schedule_data.get('scheduleInterval_Type', [])]))
                    else:
                        execute_on[f"{RuleCommonKeys.HOUR}_{int(custom_time[0])}"] = ['*']
                        execute_on[f"{RuleCommonKeys.MINUTE}_{int(custom_time[1])}"] = ['*']
                        execute_on[schedule_data.get('scheduleInterval_Type')] = list(
                            map(int, schedule_data[schedule_data.get('scheduleInterval_Type', [])]))
            else:
                logger.debug("Inappropriate schedule type")
                raise Exception
            return execute_on
        except Exception as e:
            logger.exception(f"Exception while forming execute on {e}")
            raise Exception(e)

    def update_schedule_rule_to_redis(self, insert_block_json_list, execute_on, ruleid, schedule_type):
        """Store the rule's block list under each schedule bucket in Redis.

        :raises Exception: re-raised (without detail) after logging on failure
        """
        try:
            conn_obj = self.rules_redis
            if schedule_type.lower() == 'recurring':
                keys_list = list(execute_on.keys())
                if len(keys_list) == 1:
                    key = keys_list[0]
                    conn_obj.hset(key, ruleid, json.dumps(insert_block_json_list))
                else:
                    for each_tag in execute_on:
                        if each_tag in [RuleCommonKeys.MONTH, RuleCommonKeys.DAY, RuleCommonKeys.DAY_OF_WEEK,
                                        RuleCommonKeys.WEEK]:
                            if each_tag == RuleCommonKeys.DAY and len(execute_on[each_tag]) == 31:
                                # All days selected: treat as "every day",
                                # no per-day hash needed.
                                continue
                            else:
                                for val in execute_on[each_tag]:
                                    if val != '*':
                                        conn_obj.hset(f"{each_tag}_{val}", ruleid,
                                                      json.dumps(insert_block_json_list))
                                    else:
                                        conn_obj.hset(each_tag, ruleid, json.dumps(insert_block_json_list))
                        else:
                            conn_obj.hset(each_tag, ruleid, json.dumps(insert_block_json_list))
            else:
                # One-time schedules already carry fully-qualified bucket keys.
                for each_tag in execute_on:
                    conn_obj.hset(each_tag, ruleid, json.dumps(insert_block_json_list))
        except Exception as e:
            logger.exception(f"Exception in schedule rule to redis {e}")
            raise Exception

    def update_realtime_rule_to_redis(self, rule_blocks):
        """Store realtime rule blocks in Redis, keyed by each input tag.

        First clears entries under the blocks' previous tags, then writes
        the new tag -> {rule_id: [blocks]} mappings.
        """
        try:
            conn_obj = self.rules_redis
            deleted_tag_list = []
            for json_to_redis in rule_blocks:
                for each_tag in json_to_redis.get('previousTags', []):
                    if each_tag not in deleted_tag_list:
                        deleted_tag_list.append(each_tag)
                        conn_obj.hdel(each_tag, json_to_redis["rule_id"])
            for json_to_redis in rule_blocks:
                if json_to_redis["transformation_type"] == "route":
                    tags_used_in_rule = json_to_redis["output_tags"]
                else:
                    tags_used_in_rule = json_to_redis["selectedTags"]
                for each_selected_tag in tags_used_in_rule:
                    if conn_obj.exists(each_selected_tag):
                        rules_available = conn_obj.hget(each_selected_tag, json_to_redis["rule_id"])
                        if not rules_available:
                            conn_obj.hset(each_selected_tag, json_to_redis["rule_id"],
                                          json.dumps([json_to_redis]))
                        else:
                            # Append this block to the blocks already stored
                            # for the same rule under this tag.
                            rules_available = json.loads(rules_available)
                            rules_available.append(json_to_redis)
                            conn_obj.hset(each_selected_tag, json_to_redis["rule_id"],
                                          json.dumps(rules_available))
                    else:
                        conn_obj.hset(each_selected_tag, json_to_redis["rule_id"],
                                      json.dumps([json_to_redis]))
        except Exception as e:
            logger.exception("unable to insert rule into redis:" + str(e))

    def duplicate_rule_name_check(self, input_data):
        """Return True when a rule with the same ``ruleName`` already exists.

        Fix: ``find_one_rule`` returns an *empty dict* (never None) when no
        match is found, so the original ``is not None`` comparison reported
        every name as a duplicate; use truthiness instead.
        """
        exists = self.rules_mongo.find_one_rule(query={"ruleName": input_data["ruleName"]})
        return bool(exists)

    def remove_rule_from_redis(self, rule_data):
        """Delete a realtime rule's entries from every tag hash it occupies."""
        try:
            for each_block_data in rule_data["calcFormulaList"]:
                if rule_data["transformation_type"] == "route":
                    tags_used_in_rule = each_block_data["output_tags"]
                else:
                    tags_used_in_rule = each_block_data["selectedTags"]
                for each_selected_tag in tags_used_in_rule:
                    rules_available = self.rules_redis.hgetall(each_selected_tag)
                    if rule_data["rule_engine_id"] in rules_available.keys():
                        self.rules_redis.hdel(each_selected_tag, rule_data["rule_engine_id"])
        except Exception as e:
            logger.exception("Unable to clear REDIS cache:" + str(e))
from copy import deepcopy
from scripts.logging.logger import logger
from scripts.db.mongo import mongo_client
from scripts.db.mongo.ilens_configuration.aggregates.tags import TagsAggregate
from scripts.db.mongo.ilens_configuration.collections.tags import Tags
from scripts.handler.rule_configuration_handler import RuleConfigurationHandler
from scripts.errors.module_exceptions import RequiredFieldsMissing
class RuleUpdate:
    """Builds lookup-rule payloads in the rule-engine JSON format and hands
    them to RuleConfigurationHandler for creation."""

    def __init__(self):
        self.tags_mongo = Tags(mongo_client=mongo_client)
        self.tags_aggregate = TagsAggregate
        self.rule_configuration_handler = RuleConfigurationHandler()

    @staticmethod
    def _build_code_string(each_rule):
        """Return the textual ``lookup(...)`` expression for one rule.

        ``hierarchy_ste`` may be a single string or a list of strings.
        """
        hierarchy_ste = each_rule.get("hierarchy_ste")
        if isinstance(hierarchy_ste, list):
            code_name = ", ".join(hierarchy_ste)
        else:
            # FIX: guard against a missing key (the original concatenated None
            # and raised TypeError when "hierarchy_ste" was absent).
            code_name = hierarchy_ste or ""
        code_name += ", " + "'" + each_rule.get("lookup_name", "") + "'"
        code_name += ", " + "'" + each_rule.get("match_type", "") + "'"
        return "lookup" + "(" + code_name + ")"

    @staticmethod
    def _build_arguments(each_rule):
        """Return the esprima-style argument nodes of the lookup call:
        one Identifier per selected tag, then the lookup name and match type
        as string Literals."""
        arguments = []
        selected_tags = each_rule.get("selected_tags")
        if isinstance(selected_tags, list):
            for each_tag in selected_tags:
                arguments.append({"type": "Identifier", "name": each_tag})
        else:
            arguments.append({"type": "Identifier", "name": each_rule.get("selected_tags", "")})
        for literal_value in (each_rule.get("lookup_name", ""), each_rule.get("match_type", "")):
            arguments.append({"type": "Literal", "value": literal_value, "raw": "'" + literal_value + "'"})
        return arguments

    def _resolve_complete_tag_name(self, each_rule):
        """Return the tag name for complete_tag_id: either the one supplied in
        the payload, or the one looked up via the tag_id_name_map aggregate."""
        if each_rule.get("complete_tag_name"):
            return each_rule.get("complete_tag_name")
        complete_tag_name_query = self.tags_aggregate.tag_id_name_map
        complete_tag_name = self.tags_mongo.find_by_aggregate(complete_tag_name_query)
        if complete_tag_name:
            return complete_tag_name[0]["tag_id_name"][each_rule.get("complete_tag_id")]
        return ""

    def _build_calc_block(self, each_rule, each_rule_index):
        """Build one calcFormulaList block for the given payload entry."""
        return {
            "code": self._build_code_string(each_rule),
            "parsedCode": {
                "type": "Program",
                "body": [
                    {
                        "type": "ExpressionStatement",
                        "expression": {
                            "type": "CallExpression",
                            "callee": {
                                "type": "Identifier",
                                "name": "lookup"
                            },
                            "arguments": self._build_arguments(each_rule)
                        }
                    }
                ],
                "sourceType": "script"
            },
            "_previewValue": "NA",
            "calcuationValidObject": {
                "status": True,
                "message": "Correct , Valid rule"
            },
            "completeTagId": each_rule.get("complete_tag_id"),
            "selectedTags": each_rule.get("selected_tags", []),
            "previousTags": each_rule.get("selected_tags", []),
            # FIX: the first payload entry uses the key "output_device" while
            # the original code only read "output_devices"; accept both.
            "output_devices": each_rule.get("output_devices") or each_rule.get("output_device", []),
            "output_type": [
                "data_store"
            ],
            "mqtt": {"topic": None},
            "kafka": {"topic": None},
            "rest": {"url": None, "request_type": None, "payload": None},
            "data_store": {},
            "output_tags": [],
            "disable": False,
            "execute_on_tag": [],
            "block_id": "block_" + str(each_rule_index + 1),
            "rule_block_parent": [],
            "execute_on": {},
            "completeTagName": self._resolve_complete_tag_name(each_rule)
        }

    def rule_update(self):
        """Convert the hard-coded lookup payloads into rule-engine JSON and
        create them via RuleConfigurationHandler.

        :raises RequiredFieldsMissing: when any required field of the rule
            template is empty. NOTE(review): the static_json template below
            leaves every required field as an empty string, so this currently
            raises before any rule is created — confirm the template is meant
            to be filled in.
        """
        try:
            request_payload = [{
                "lookup_name": "Rule_lookup_one",
                "match_type": "exact",
                "hierarchy_ste": ["[Kite:AC VOLTAGE-1]",
                                  "[Kite:Temperature]"],
                "complete_tag_id": "tag_36980",
                "selected_tags": [
                    "site_305$tag_125",
                    "site_305$tag_100"
                ],
                "output_device": ["site_305"]
            },
                {
                    "lookup_name": "Rule_lookup_two",
                    "match_type": "exact",
                    "hierarchy_ste": "[Kite:AC VOLTAGE-1]",
                    "complete_tag_id": "tag_36980",
                    "selected_tags": ["site_305$tag_125"],
                    "output_devices": [
                        "site_305"
                    ]
                }]
            # Template of the rule-engine document; fields marked todo in the
            # original are still to be filled in by the author.
            static_json = {
                "rule_engine_id": "",
                "Selected_device": [],
                "Selected_ruleType": "",  # todo
                "calcFormulaList": [],
                "deviceDescription": "",  # todo
                "disable_all": "",  # todo --> bool
                "execute_on": {},  # todo --> Optional
                "processOn": "",  # todo
                "project_id": "",  # todo
                "ruleName": "",  # todo
                "schedule": "",  # todo --> bool --> Optional
                "selected_device_meta": {},  # todo --> Optional
                "transformation_type": "",  # todo --> Optional
                "user_id": "",  # todo
                "status": ""  # todo --> Optional
            }
            required_fields = ["Selected_ruleType", "deviceDescription", "disable_all",
                               "processOn", "project_id", "ruleName", "user_id"]  # required fields
            for each_rule_index, each_rule in enumerate(request_payload):
                static_json["calcFormulaList"].append(self._build_calc_block(each_rule, each_rule_index))
                for each_selected_tags in each_rule.get("selected_tags", []):
                    # FIX: tags look like "site_305$tag_125"; the original
                    # split on the literal "$tags", which never matched, so the
                    # device id kept the full tag string. Split on "$" instead.
                    selected_device = each_selected_tags.split("$", 1)[0]
                    static_json["Selected_device"].append({"id": selected_device, "itemName": ""})
                for each_required_field in required_fields:
                    if not static_json.get(each_required_field):
                        raise RequiredFieldsMissing  # Raise error if required fields are not filled
                print(static_json)
                return_json = self.rule_configuration_handler.create_rule_engine(static_json)
                print(return_json)
        except Exception as e:
            logger.exception(str(e))
            raise
import os
import logging
from scripts.config import LOG
from logging.handlers import RotatingFileHandler
# Logging configuration pulled from the service config (scripts.config.LOG).
__log_path__ = str(LOG.LOG_PATH)
__log_level__ = str(LOG.LOG_LEVEL)
__max_bytes__ = int(LOG.LOG_MAX_BYTES)
__handler_type__ = str(LOG.LOG_HANDLER)
_log_file_name__ = str(LOG.LOG_FILE_NAME)  # NOTE(review): name is missing a leading underscore vs. its siblings
__backup_count__ = int(LOG.LOG_BACKUP_COUNT)

# Base path (without the ".log" suffix) of the log file.
complete_log_path = os.path.join(__log_path__, _log_file_name__)

# FIX: the original created "<path>/<file_name>" itself as a directory, which
# left a junk directory beside the real "<path>/<file_name>.log" file. Only
# the parent log directory needs to exist.
os.makedirs(__log_path__, exist_ok=True)
def get_logger(log_file_name=complete_log_path, log_level=__log_level__, time_format="%Y-%m-%d %H:%M:%S",
               handler_type=__handler_type__, max_bytes=__max_bytes__, backup_count=__backup_count__):
    """
    Create (or reconfigure) a file logger writing to ``<log_file_name>.log``.

    :param log_file_name: base path of the log file; ".log" is appended
    :param log_level: level name such as "DEBUG"; stripped and upper-cased
    :param time_format: timestamp format for log records
    :param handler_type: "rotating_file_handler" selects RotatingFileHandler,
        anything else a plain FileHandler
    :param max_bytes: rotation size threshold (rotating handler only)
    :param backup_count: number of rotated backups kept (rotating handler only)
    :return: the configured logger instance
    """
    log_file = log_file_name + '.log'  # the original os.path.join had one arg: a no-op
    __logger__ = logging.getLogger(log_file_name)
    __logger__.setLevel(log_level.strip().upper())
    debug_formatter = '%(asctime)s - %(levelname)-6s - %(name)s - ' \
                      '[%(threadName)5s:%(filename)5s:%(funcName)5s():''%(lineno)s] - %(message)s'
    formatter_string = '%(asctime)s - %(levelname)-6s - %(name)s - %(levelname)3s - %(message)s'
    # NOTE(review): the verbose format is chosen whenever the configured level
    # string is already stripped/upper-case ("DEBUG" but also "INFO"); a
    # debug-level check was presumably intended -- confirm.
    if log_level.strip().upper() == log_level:
        formatter_string = debug_formatter
    formatter = logging.Formatter(formatter_string, time_format)
    # Drop handlers from any earlier configuration so records are not
    # duplicated (the original cleared this list in three separate places and
    # also built a StreamHandler it never attached).
    if __logger__.hasHandlers():
        __logger__.handlers.clear()
    if str(handler_type).lower() == "rotating_file_handler":
        # Rotating File Handler
        handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    else:
        # File Handler
        handler = logging.FileHandler(log_file)
    handler.setFormatter(formatter)
    __logger__.addHandler(handler)
    return __logger__


logger = get_logger()
from scripts.db.mongo import mongo_client
from scripts.db.mongo.ilens_configuration.collections.unique_id import UniqueId, UniqueIdSchema
from scripts.logging.logger import logger
class CommonUtils:
    """Shared helpers: sequential id generation and request-payload parsing."""

    def __init__(self):
        self.unique = UniqueId(mongo_client=mongo_client)

    def get_next_id(self, param1):
        """
        Return the next sequential id for the given key.

        The first call for a key seeds the counter document with id "100" and
        returns it; later calls increment the stored value, persist it and
        return it.
        NOTE(review): the read-then-update is not atomic; concurrent callers
        can receive the same id.

        :param param1: counter key
        :return: the id as a string
        """
        my_doc = self.unique.find_one_record(key=param1)
        if not my_doc:
            my_dict = UniqueIdSchema(key=param1, id="100")
            self.unique.insert_record(my_dict)
            return my_dict.id
        # The original wrapped the single record in a list and looped over it,
        # recomputing the increment twice; this is the same behavior flattened.
        record = self.unique.find_record(param1, {"_id": 0, "id": 1})
        next_id = str(int(record['id']) + 1)
        self.unique.update_one_record(param1, {"id": next_id})
        return next_id

    @staticmethod
    def get_usr_id(input_data=None):
        """
        Extract the user id from a request payload.

        :param input_data: request dict; may be None
        :return: the non-empty "user_id" value, or None
        """
        user_id = None
        # FIX: guard None/empty input explicitly; the original relied on
        # catching the TypeError raised by `"user_id" in None`, which also
        # logged a spurious exception for a perfectly normal call.
        try:
            if input_data and "user_id" in input_data and input_data["user_id"] != "":
                user_id = input_data["user_id"]
        except Exception as e:
            logger.exception(str(e))
        return user_id
"""
Mongo Utility
Author: Irfanuddin Shafi Ahmed
Reference: Pymongo Documentation
"""
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
class MongoConnect:
    """Thin wrapper owning a lazily-connecting MongoClient for one URI."""

    def __init__(self, uri):
        self.uri = uri
        # connect=False defers the actual connection until first use
        self.client = MongoClient(self.uri, connect=False)

    def __call__(self, *args, **kwargs):
        # Calling the wrapper hands back the underlying client.
        return self.client

    def __repr__(self):
        return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
    """
    Base class wrapping common CRUD helpers for one Mongo collection.

    Each call resolves ``self.client[self.database][self.collection]`` fresh,
    and any error from the driver is re-raised unchanged to the caller.
    """

    def __init__(self, mongo_client, database, collection):
        self.client = mongo_client
        self.database = database
        self.collection = collection

    def __repr__(self):
        return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"

    def _get_collection(self):
        """Resolve and return the collection handle for this instance."""
        return self.client[self.database][self.collection]

    def insert_one(self, data: Dict):
        """
        Insert one document into the collection.
        :param data: Document to be inserted
        :return: Inserted document id
        """
        try:
            return self._get_collection().insert_one(data).inserted_id
        except Exception:
            raise

    def insert_many(self, data: List):
        """
        Insert several documents into the collection.
        :param data: List of documents to be inserted
        :return: List of inserted document ids
        """
        try:
            return self._get_collection().insert_many(data).inserted_ids
        except Exception:
            raise

    # "Cursor" as a string forward reference so the annotation does not
    # require pymongo to be importable at class-definition time.
    def find(self,
             query: Dict,
             filter_dict: Optional[Dict] = None,
             sort=None,
             skip: Optional[int] = 0,
             limit: Optional[int] = None) -> "Cursor":
        """
        Query documents from the collection.
        :param query: Query dictionary
        :param filter_dict: Projection dictionary; defaults to hiding _id
        :param sort: List of (key, direction) tuples, e.g. [(key, -1), ...]
        :param skip: Number of leading documents to skip
        :param limit: Maximum number of documents to return (None = no limit)
        :return: Cursor over the matching documents
        """
        if sort is None:
            sort = []
        if filter_dict is None:
            filter_dict = {"_id": 0}
        try:
            collection = self._get_collection()
            if len(sort) > 0:
                cursor = collection.find(query, filter_dict).sort(sort).skip(skip)
            else:
                cursor = collection.find(query, filter_dict).skip(skip)
            if limit:
                cursor = cursor.limit(limit)
            return cursor
        except Exception:
            raise

    def find_one(self,
                 query: Dict,
                 filter_dict: Optional[Dict] = None):
        """
        Return the first document matching the query, or None.
        :param query: Query dictionary
        :param filter_dict: Projection dictionary; defaults to hiding _id
        """
        if filter_dict is None:
            filter_dict = {"_id": 0}
        try:
            return self._get_collection().find_one(query, filter_dict)
        except Exception:
            raise

    def update_one(self,
                   query: Dict,
                   data: Dict,
                   upsert: bool = False):
        """
        Apply a $set update to the first matching document.
        :param query: Filter selecting the document
        :param data: Fields to $set
        :param upsert: Insert a new document when nothing matches
        :return: Number of modified documents
        """
        try:
            response = self._get_collection().update_one(query, {"$set": data}, upsert=upsert)
            return response.modified_count
        except Exception:
            raise

    def update_many(self,
                    query: Dict,
                    data: Dict,
                    upsert: bool = False):
        """
        Apply a $set update to every matching document.
        :param query: Filter selecting the documents
        :param data: Fields to $set
        :param upsert: Insert a new document when nothing matches
        :return: Number of modified documents
        """
        try:
            # FIX: Collection.update() was deprecated and removed in PyMongo 4;
            # update_many() + modified_count is the supported equivalent of the
            # old reply's "nModified" field.
            response = self._get_collection().update_many(query, {"$set": data}, upsert=upsert)
            return response.modified_count
        except Exception:
            raise

    def delete_many(self, query: Dict):
        """
        Delete every document matching the query.
        :param query: Filter selecting the documents
        :return: Number of deleted documents
        """
        try:
            return self._get_collection().delete_many(query).deleted_count
        except Exception:
            raise

    def delete_one(self, query: Dict):
        """
        Delete the first document matching the query.
        :param query: Filter selecting the document
        :return: Number of deleted documents (0 or 1)
        """
        try:
            return self._get_collection().delete_one(query).deleted_count
        except Exception:
            raise

    def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
        """
        Return the distinct values of a key across matching documents.
        :param query_key: Key whose distinct values are wanted
        :param filter_json: Optional filter restricting the documents
        """
        try:
            return self._get_collection().distinct(query_key, filter_json)
        except Exception:
            raise

    def aggregate(self, pipelines: List, ):
        """
        Run an aggregation pipeline on the collection.
        :param pipelines: List of aggregation pipeline stages
        :return: Cursor over the aggregation results
        """
        try:
            return self._get_collection().aggregate(pipelines)
        except Exception:
            raise

    def drop(self):
        """Drop the entire collection."""
        try:
            return self._get_collection().drop()
        except Exception:
            raise

    def find_count(self, json_data, database_name, collection_name):
        """
        Count the documents matching a filter in an explicitly named
        database/collection (this method intentionally takes its own
        database/collection names instead of using self's).
        :param json_data: Filter dictionary
        :param database_name: The database to which the collection/documents belong
        :param collection_name: The collection to which the documents belong
        :return: Number of matching documents
        """
        try:
            # FIX: Cursor.count() was removed in PyMongo 4; count_documents()
            # is the supported replacement. Also drops the unused "as e".
            db = self.client[database_name]
            return db[collection_name].count_documents(json_data)
        except Exception:
            raise
class MongoAggregateBaseClass:
    """Run aggregation pipelines against any collection of one database."""

    def __init__(self, mongo_client, database, ):
        self.client = mongo_client
        self.database = database

    def aggregate(self, collection, pipelines: List, ):
        """
        Execute an aggregation pipeline on the named collection.
        :param collection: collection name within self.database
        :param pipelines: list of aggregation pipeline stages
        :return: whatever the driver's aggregate() returns
        """
        try:
            target = self.client[self.database][collection]
            return target.aggregate(pipelines)
        except Exception:
            raise
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment