Commit d33b9882 authored by Sabari T's avatar Sabari T

lookup rule

parent 5e009feb
log:
file_name: ilens
file_name: lookup_rule
path: Log/
level: DEBUG
handler: rotating_file_handler
......
class SiteConfAggregate:
@staticmethod
def find_hierarchy_name(input_data):
id_match_dict = dict()
name_project_dict = {'_id': 0, 'site_name': 1}
concat_project_list = ['$site_name']
if input_data.get("dept_id"):
id_match_dict.update({'dept.dept_id': input_data["dept_id"]})
name_project_dict.update({"dept_name": "$dept.dept_name"})
concat_project_list.append('>')
concat_project_list.append('$dept_name')
if input_data.get("line_id"):
id_match_dict.update({'line.line_id': input_data["line_id"]})
name_project_dict.update({"line_name": "$line.line_name"})
concat_project_list.append('>')
concat_project_list.append('$line_name')
if input_data.get('equipment_id'):
id_match_dict.update({'equipment.equipment_id': input_data["equipment_id"]})
name_project_dict.update({"equipment_name": "$equipment.equipment_name"})
concat_project_list.append('>')
concat_project_list.append('$equipment_name')
query = [
{
'$match': {
'customer_project_id': input_data["project_id"],
'site_id': input_data["site_id"]
}
}, {
'$unwind': '$line'
}, {
'$unwind': '$equipment'
}, {
'$match': id_match_dict
}, {
'$project': name_project_dict
}, {
'$project': {
'itemName': {
'$concat': concat_project_list
}
}
}
]
return query
from typing import Optional, Dict, List
from scripts.constants import DatabaseNames, CollectionNames
from scripts.constants import SiteConfCollectionKeys
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class SiteConfSchema(MongoBaseSchema):
site_name: Optional[str]
site_info: Optional[Dict]
customer_project_id: Optional[str]
site_id: Optional[str]
product_encrypted: Optional[bool]
dept: Optional[List]
line: Optional[List]
equipment: Optional[List]
class SiteConf(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client, database=DatabaseNames.ilens_configuration,
collection=CollectionNames.site_conf)
@property
def key_customer_project_id(self):
return SiteConfCollectionKeys.KEY_CUSTOMER_PROJECT_ID
@property
def key_site_id(self):
return SiteConfCollectionKeys.KEY_SITE_ID
@property
def key_site_name(self):
return SiteConfCollectionKeys.KEY_SITE_NAME
@property
def key_process_id(self):
return SiteConfCollectionKeys.KEY_PROCESS_ID
def get_all_sites(self, filter_dict=None,
sort=None, skip=0, limit=None, site_id=None, **query):
"""
The following function will give all sites for the given set of
search parameters as keyword arguments
:param filter_dict:
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
if site_id is not None:
query.update({self.key_site_id: site_id})
sites = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if sites:
return list(sites)
return list()
def get_all_accessible_sites(self, customer_project_id, user_access_site_list):
query_json = {"$and": [
{self.key_customer_project_id: customer_project_id, self.key_site_id: {"$in": user_access_site_list}}]}
response = self.find(query=query_json)
if not response:
return list()
else:
return list(response)
def find_site_by_site_name(self, site_name, project_id):
query_json = {self.key_site_name: site_name, self.key_customer_project_id: project_id}
response = self.find_one(query=query_json)
if response:
return dict(response)
else:
return dict()
def find_site_by_site_id(self, site_id):
site = self.find_one(query={self.key_site_id: site_id})
if site:
return dict(site)
return dict()
def find_site_by_query(self, query):
site = self.find(query=query)
if site:
return site
return dict()
def find_one_site_by_query(self, query):
site = self.find_one(query=query)
if site:
return dict(site)
return dict()
def find_sites_by_project(self, project_id):
sites = self.find(query={self.key_customer_project_id: project_id})
if sites:
return list(sites)
return list()
def find_accessible_sites(self, project_id, accessible_sites):
query_dict = {self.key_customer_project_id: project_id, self.key_site_id: {"$in": accessible_sites}}
sites = self.find(query=query_dict)
if sites:
return list(sites)
return list()
def delete_one_site(self, site_id):
query = {self.key_site_id: site_id}
return self.delete_one(query=query)
def delete_many_site(self, query=None):
return self.delete_many(query)
def update_one_site(self, site_id, data):
query = {self.key_site_id: site_id}
return self.update_one(data=data, query=query)
def update_one_process(self, process_id, updated_data):
"""
The following function will update one tag in
tags collection based on the given query
"""
query_dict = {self.key_process_id: process_id}
return self.update_one(data=updated_data, query=query_dict)
def update_many_site(self, site_id, data):
query = {self.key_site_id: site_id}
response = self.update_many(query=query, data=data)
if response:
return list(response)
else:
return list()
def insert_one_site(self, data):
response = self.insert_one(data=data)
return response
def insert_many_sites(self, data):
response = self.insert_many(data=data)
if response:
return list(response)
else:
return list()
def distinct_site_by_key(self, query_key, project_id):
filter_dict = {self.key_customer_project_id: project_id}
response = self.distinct(query_key, filter_dict)
if not response:
return list()
return response
def find_site_by_aggregate(self, query):
site_details = self.aggregate(query)
if not site_details:
return list()
return list(site_details)
......@@ -4,6 +4,8 @@ from scripts.logging.logger import logger
from scripts.db.mongo import mongo_client
from scripts.db.mongo.ilens_configuration.aggregates.tags import TagsAggregate
from scripts.db.mongo.ilens_configuration.collections.tags import Tags
from scripts.db.mongo.ilens_configuration.collections.site_conf import SiteConf
from scripts.db.mongo.ilens_configuration.aggregates.site_conf import SiteConfAggregate
from scripts.handler.rule_configuration_handler import RuleConfigurationHandler
from scripts.errors.module_exceptions import RequiredFieldsMissing
......@@ -12,6 +14,8 @@ class RuleUpdate:
def __init__(self):
self.tags_mongo = Tags(mongo_client=mongo_client)
self.tags_aggregate = TagsAggregate
self.sites_mongo = SiteConf(mongo_client=mongo_client)
self.sites_aggregate = SiteConfAggregate
self.rule_configuration_handler = RuleConfigurationHandler()
def rule_update(self):
......@@ -48,7 +52,7 @@ class RuleUpdate:
"disable_all": "", # todo --> bool
"execute_on": {}, # todo --> Optional
"processOn": "", # todo
"project_id": "", # todo
"project_id": "project_099", # todo
"ruleName": "", # todo
"schedule": "", # todo --> bool --> Optional
"selected_device_meta": {}, # todo --> Optional
......@@ -167,11 +171,32 @@ class RuleUpdate:
static_calc_form_json["block_id"] = block_id
static_json["calcFormulaList"].append(static_calc_form_json)
for each_selected_tags in each_rule.get("selected_tags", []):
selected_device_array = each_selected_tags.split("$tags")
selected_device_array = each_selected_tags.split("$tag")
if selected_device_array:
selected_device = selected_device_array[0]
selected_device_dict["id"] = selected_device
static_json["Selected_device"].append(deepcopy(selected_device_dict))
device_exist = False
for each_selected_device in static_json.get("Selected_device", []):
if selected_device_dict["id"] == each_selected_device.get("id"):
device_exist = True
if not device_exist:
hierarchy_name_array = selected_device_dict["id"].split("$")
request_dict = {"project_id": static_json["project_id"]}
for each_level in hierarchy_name_array:
if 'site' in each_level:
request_dict.update({"site_id": each_level})
elif 'dept' in each_level:
request_dict.update({"dept_id": each_level})
elif 'line' in each_level:
request_dict.update({"line_id": each_level})
elif 'equipment' in each_level:
request_dict.update({"equipment_id": each_level})
name_hierarchy_query = self.sites_aggregate.find_hierarchy_name(request_dict)
name_hierarchy = self.sites_mongo.find_site_by_aggregate(name_hierarchy_query)
if name_hierarchy:
name_hierarchy = name_hierarchy[0]
selected_device_dict["itemName"] = name_hierarchy.get("itemName")
static_json["Selected_device"].append(deepcopy(selected_device_dict))
for each_required_field in required_fields:
if not static_json.get(each_required_field):
raise RequiredFieldsMissing # Raise error if required fields are not filled
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment