Commit 5eb827c1 authored by harshavardhan.c's avatar harshavardhan.c

dashboard view for the hierarachy level oee.

parent 4121d04d
import pytz
if __name__ == '__main__':
from dotenv import load_dotenv
......@@ -34,19 +36,27 @@ class MachineOEECalculator:
try:
hierarchy_dict = self.common_handler.get_valid_oee_monitoring_hierarchy(project_id=request_data.project_id)
now = datetime.today() - timedelta(days=2)
now = datetime.today() - timedelta(days=3)
oee_start_time = datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).strftime(
CommonConstants.USER_META_TIME_FORMAT)
oee_end_time = datetime.now().strftime(CommonConstants.USER_META_TIME_FORMAT)
oee_end_time = datetime.now().astimezone(tz=pytz.timezone(request_data.tz)).strftime(CommonConstants.USER_META_TIME_FORMAT)
for k, v in hierarchy_dict.items():
site_id = k.split("$")[0]
if not v.get(TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY):
logger.debug(f"Design Parameter(CYCLE TIME) not found for selected hierarchy {v}")
continue
cycle_time_id = v[TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY].split("$")[-1]
cycle_time = self.common_handler.get_cycle_time_value_from_hierarchy(tag_id=cycle_time_id)
if isinstance(cycle_time, bool) and not cycle_time:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
continue
downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=k, project_id=request_data.project_id)
input_data = OEEDataInsertRequest(prod_start_time=oee_start_time,
prod_end_time=oee_end_time, downtime=downtime,
hierarchy=k, cycle_time=os.environ.get("CYCLE_TIME", default=5),
hierarchy=k, cycle_time=cycle_time,
tz=request_data.tz,
project_id=request_data.project_id)
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
......@@ -54,13 +64,16 @@ class MachineOEECalculator:
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
data_dict = {
v[TagCategoryConstants.OEE_OUTPUT_TOTAL_UNITS_CATEGORY]: oee_response.total_units,
v[TagCategoryConstants.OEE_OUTPUT_REJECT_UNITS_CATEGORY]: oee_response.reject_units,
v[TagCategoryConstants.OEE_OUTPUT_CATEGORY]: oee_response.oee,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_CATEGORY]: oee_response.performance,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_CATEGORY]: oee_response.quality,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_LOSS_CATEGORY]: oee_response.quality_loss,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY]: oee_response.performance_loss,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_CATEGORY]: oee_response.availability,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY]: oee_response.availability_loss
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY]: oee_response.availability_loss,
v[TagCategoryConstants.OEE_OUTPUT_DOWNTIME_CATEGORY]: oee_response.downtime
}
message_dict = {
"data": data_dict,
......
......@@ -76,6 +76,7 @@ class DBConstants:
collection_tag_hierarchy = "tag_hierarchy"
collection_oee_layouts = "oee_layouts"
collection_lookup_table = "lookup_table"
collection_tags = "tags"
collection_customer_projects = "customer_projects"
......@@ -87,6 +88,9 @@ class TagCategoryConstants:
TOTAL_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Total Produced Units")
REJECT_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Reject Units")
OEE_OUTPUT_CATEGORY = os.environ.get("OEE_OUTPUT_CATEGORY", default="OEE Output Category")
OEE_OUTPUT_DOWNTIME_CATEGORY = os.environ.get("OEE_OUTPUT_DOWNTIME_CATEGORY",
default="OEE Output Downtime Category")
OEE_CYCLE_DESIGN_CATEGORY = os.environ.get("OEE_CYCLE_DESIGN_CATEGORY", default="OEE Cycle Design Category")
OEE_OUTPUT_PERFORMANCE_CATEGORY = os.environ.get("OEE_OUTPUT_PERFORMANCE_CATEGORY",
default="OEE Output Performance Category")
OEE_OUTPUT_QUALITY_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_CATEGORY", default="OEE Output Quality Category")
......@@ -96,6 +100,10 @@ class TagCategoryConstants:
default="OEE Output Performance Loss Category")
OEE_OUTPUT_QUALITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Quality Loss Category")
OEE_OUTPUT_TOTAL_UNITS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Total Units Category")
OEE_OUTPUT_REJECT_UNITS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Reject Units Category")
OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY",
default="OEE Output Availability Loss Category")
......@@ -160,3 +168,18 @@ class CustomerProjectKeys:
KEY_PROCESS_TEMPLT_ID = "process_templt_id"
KEY_SOURCE_META = "source_meta"
KEY_ADD_PREFIX_TO_DATABASE = "add_prefix_to_database"
class TagsCollectionKeys:
KEY_TAG_NAME = "tag_name"
KEY_UNIT = "unit"
KEY_TAG_TYPE = "tag_type"
KEY_DESCRIPTION = "description"
KEY_ID = "id"
KEY_TAG_GROUP_ID = "tag_group_id"
KEY_DATA_TYPE = "data_type"
KEY_DEFAULT = "default"
KEY_SYSTEM_TAG_TYPE = "system_tag_type"
KEY_VALUE_LIST = "value_list"
KEY_PRODUCT_ENCRYPTED = "product_encrypted"
KEY_TAG_CATEGORY_ID = "tag_category_id"
......@@ -3,13 +3,12 @@ from copy import deepcopy
import pandas as pd
from scripts.config import DBConf
from scripts.core.engine.oee_calculator import OEETagFinder
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.core.engine.oee_calculator import OEETagFinder, OEECalculator
from scripts.core.handlers.common_handler import CommonHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists, OutputTagsList
from scripts.errors import DataNotFound
from scripts.logging import logger
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse, ChartRequest, OEEDataInsertRequest
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse, ChartRequest
from scripts.utils.common_utils import CommonUtils
from scripts.utils.kairos_db_util import BaseQuery
from scripts.utils.kairos_db_util.df_formation_util import create_kairos_df
......@@ -21,23 +20,22 @@ class OEEAggregator:
self.common_util = CommonUtils()
self.base_query = BaseQuery()
self.oee_tag_finder = OEETagFinder()
self.oee_calc = OEECalculator()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_handler = CalculateBatchOEEHandler(project_id=project_id)
def processor(self, data):
db_response = ChartDBResponse(**data)
duration = self.common_util.get_duration(meta=data, difference=True, tz=data["tz"])
cal_type = self.common_util.get_uom_type(uom_type=data["uom"])
db_response.total_time = self.common_util.get_diff_duration_in_int(input_time=duration, return_type=cal_type)
db_response.actual_cycle = round(
db_response.total_units / db_response.total_time, 2
def processor(self, input_data: ChartDBResponse):
duration = self.common_util.get_duration(meta=input_data.dict(), difference=True, tz=input_data.tz)
cal_type = self.common_util.get_uom_type(uom_type=input_data.uom)
input_data.total_time = self.common_util.get_diff_duration_in_int(input_time=duration, return_type=cal_type)
input_data.actual_cycle = round(
input_data.total_units / input_data.total_time, 2
)
db_response.ideal_cycle = round(db_response.cycle_time, 2)
db_response.good_units = round(
db_response.total_units - db_response.reject_units, 2
input_data.ideal_cycle = round(input_data.cycle_time, 2)
input_data.good_units = round(
input_data.total_units - input_data.reject_units, 2
)
chart_response = ChartResponse(**db_response.dict())
chart_response = ChartResponse(**input_data.dict())
return chart_response.dict()
def aggregator(self, request_data: ChartRequest):
......@@ -48,18 +46,22 @@ class OEEAggregator:
timestamp=True)) * 1000
hierarchy_tags = self.common_handler.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**request_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
output_tags_dict = self.common_handler.tag_hierarchy_handler.get_output_tags_for_oee(
input_data=OutputTagsList(**request_data.dict()))
if not output_tags_dict or not output_tags_dict.get(request_data.hierarchy):
return {}
cycle_time_tag_id = self.oee_tag_finder.get_cycle_time_tag_id(input_data=hierarchy_tags)
cycle_time_value = self.common_handler.get_cycle_time_value_from_hierarchy(
tag_id=cycle_time_tag_id.split("$")[-1])
if isinstance(cycle_time_value, bool) and not cycle_time_value:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
raise ValueError("Cycle Design Parameters not found")
updated_dict = self.common_handler.validate_hierarchy_tags(output_tags_dict[request_data.hierarchy])
new_columns_dict = self.common_handler.get_oee_keys_mapping_dict(output_tags_dict[request_data.hierarchy])
tags_list = list(updated_dict.values())
group_by_tags_list = deepcopy(tags_list)
group_by_tags_list.append(DBConf.KAIROS_DEFAULT_FULL_TAG)
tags_list.extend([total_units_tag_id, reject_units_tag_id])
if not tags_list:
return {}
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
......@@ -75,17 +77,24 @@ class OEEAggregator:
response_data=each_data,
tags_list=tags_list,
group_by_tags=group_by_tags_list,
tz=request_data.tz
tz=request_data.tz,
df_diff=False
)
if master_df.empty:
raise DataNotFound
master_df_columns = list(master_df.columns)
input_data = {"prod_start_time": start_time, "prod_end_time": end_time}
input_data.update(request_data.dict(exclude_none=True))
input_schema = OEEDataInsertRequest(**input_data)
input_schema.total_units, input_schema.reject_units = self.oee_handler.get_data_for_tags(
input_data=input_schema)
master_df.rename(columns=new_columns_dict, inplace=True)
final_records = master_df.to_dict('records')
oee_json = final_records[-1] if final_records else {}
oee_json.update({"productive_time": self.oee_calc.calculate_productive_time(cycle_time=cycle_time_value,
units_produced=oee_json[
"total_units"]),
"tz": request_data.tz, "cycle_time": cycle_time_value,
"prod_start_time": request_data.queryDate[0],
"prod_end_time": request_data.queryDate[-1]
})
res = self.processor(input_data=ChartDBResponse(**oee_json))
return res
except Exception as e:
logger.execption(f'Exception occurred while plotting the dashboard {e.args}')
logger.exception(f'Exception occurred while plotting the dashboard {e.args}')
raise
......@@ -104,6 +104,13 @@ class OEETagFinder:
raise ValueError(ErrorCodes.ERR007)
return input_data.get(category_name)
@staticmethod
def get_cycle_time_tag_id(input_data: dict, category_name=TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY):
if not input_data.get(category_name):
logger.error(ErrorCodes.ERR008)
raise ValueError(ErrorCodes.ERR008)
return input_data.get(category_name)
class OEEEngine:
def __init__(self):
......
......@@ -13,7 +13,7 @@ from scripts.schemas.batch_oee import (
GetOEERequestOneBatch,
GetBatches,
BatchesGet,
ChartRequest,
ChartRequest, ChartDBResponse,
)
from scripts.schemas.meta import LabelValue
......@@ -102,7 +102,8 @@ class APIHandler:
prod_end_time=request_data.queryDate[1],
reference_id=request_data.reference_id
)
raw_data = self.oee_agg.processor(data)
db_response = ChartDBResponse(**data)
raw_data = self.oee_agg.processor(db_response)
return chart_maker.main_creator(raw_data, overall=False)
else:
agg_data = self.oee_agg.aggregator(request_data=request_data)
......
......@@ -6,10 +6,11 @@ import pytz
from sqlalchemy.orm import Session
from scripts.config import DBConf
from scripts.constants import ResponseCodes, CommonConstants, TagCategoryConstants
from scripts.constants import ResponseCodes, CommonConstants
from scripts.core.engine.oee_calculator import OEEEngine, OEETagFinder
from scripts.core.handlers.common_handler import CommonHandler
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists
from scripts.db.mongo.schema.tag_hierarchy import OutputTagsList, GetTagsLists
from scripts.db.psql.oee_discrete import DiscreteOEE
from scripts.db.redis_connections import oee_production_db
from scripts.errors import DataNotFound
......@@ -29,6 +30,7 @@ class CalculateBatchOEEHandler:
def __init__(self, project_id=None):
self.common_util = CommonUtils()
self.base_query = BaseQuery()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_tag_finder = OEETagFinder()
self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)
......@@ -38,7 +40,18 @@ class CalculateBatchOEEHandler:
record_presence = table_obj.get_oee_data_by_reference_id(reference_id=request_data.reference_id,
hierarchy=request_data.hierarchy,
project_id=request_data.project_id)
request_data.total_units, request_data.reject_units = self.get_data_for_tags(input_data=request_data)
self.tag_hierarchy_handler.get_output_tags_for_oee(OutputTagsList(**request_data.dict()))
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**request_data.dict()))
cycle_time_tag_id = self.oee_tag_finder.get_cycle_time_tag_id(input_data=hierarchy_tags)
cycle_time_value = self.common_handler.get_cycle_time_value_from_hierarchy(
tag_id=cycle_time_tag_id.split("$")[-1])
if isinstance(cycle_time_value, bool) and not cycle_time_value:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
raise ValueError("Cycle Design Parameters not found")
request_data.cycle_time = cycle_time_value
request_data.total_units, request_data.reject_units = self.get_data_for_tags(input_data=request_data,
hierarchy_tags=hierarchy_tags)
redis_key = f"{request_data.project_id}${request_data.reference_id}"
if not record_presence:
if not request_data.prod_start_time:
......@@ -116,7 +129,7 @@ class CalculateBatchOEEHandler:
except Exception as e:
raise e
def get_data_for_tags(self, input_data: OEEDataInsertRequest):
def get_data_for_tags(self, input_data: OEEDataInsertRequest, hierarchy_tags=None):
total_units_value = 0
reject_units_value = 0
try:
......@@ -126,7 +139,9 @@ class CalculateBatchOEEHandler:
end_epoch = int(
datetime.strptime(input_data.prod_end_time, CommonConstants.USER_META_TIME_FORMAT).astimezone(
tz=pytz.timezone(input_data.tz)).timestamp()) * 1000
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(GetTagsLists(**input_data.dict()))
if not hierarchy_tags:
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**input_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
......
......@@ -3,8 +3,8 @@ import os
from scripts.constants import TagCategoryConstants
from scripts.constants.db_connections import mongo_client
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.ilens_configuration.collections.customer_projects import CustomerProjects
from scripts.db.mongo.ilens_configuration.collections.lookup_table import LookupTable
from scripts.db.mongo.ilens_configuration.collections.tags import Tags
from scripts.db.mongo.schema.tag_hierarchy import OutputTagsList
from scripts.logging import logger
......@@ -13,6 +13,7 @@ class CommonHandler:
def __init__(self, project_id=None):
self.lookup_table = LookupTable(project_id=project_id, mongo_client=mongo_client)
self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)
self.tag_conn = Tags(mongo_client=mongo_client, project_id=project_id)
def fetch_oee_hierarchy_from_look_up(self, project_id):
oee_lookup_name = os.environ.get("OEE_LOOKUP_NAME", default="oee_monitoring")
......@@ -58,7 +59,8 @@ class CommonHandler:
if _each_tag in {TagCategoryConstants.TOTAL_UNITS_CATEGORY, TagCategoryConstants.REJECT_UNITS_CATEGORY}:
continue
if not request_dict.get(_each_tag):
logger.debug(f"OEE Output Tag not configured for the hierarchy{list(request_dict.keys())[0]}")
logger.debug(
f"OEE Output Tag Category <{_each_tag}> not configured for the hierarchy")
return {}
hierarchy_dict.update({_each_tag: request_dict[_each_tag]})
return hierarchy_dict
......@@ -70,9 +72,9 @@ class CommonHandler:
def get_oee_keys_mapping_dict(input_dict: dict):
final_dict = {}
for k, v in input_dict.items():
if k == TagCategoryConstants.TOTAL_UNITS_CATEGORY:
if k == TagCategoryConstants.OEE_OUTPUT_TOTAL_UNITS_CATEGORY:
final_dict.update({v: "total_units"})
elif k == TagCategoryConstants.REJECT_UNITS_CATEGORY:
elif k == TagCategoryConstants.OEE_OUTPUT_REJECT_UNITS_CATEGORY:
final_dict.update({v: "reject_units"})
elif k == TagCategoryConstants.OEE_OUTPUT_CATEGORY:
final_dict.update({v: "oee"})
......@@ -86,4 +88,21 @@ class CommonHandler:
final_dict.update({v: "quality_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY:
final_dict.update({v: "availability_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY:
final_dict.update({v: "performance_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_DOWNTIME_CATEGORY:
final_dict.update({v: "downtime"})
return final_dict
def get_cycle_time_value_from_hierarchy(self, tag_id: str):
try:
tag_data = self.tag_conn.find_tag_by_id(tag_id=tag_id)
if tag_data.data_type.lower() != "integer":
return False
parameter_value = tag_data.data_quality_info.get("basic", {}).get("value")
if isinstance(parameter_value, int) and not parameter_value:
return False
return parameter_value
except Exception as e:
logger.exception(f'Exception occurred while fetching cycle value from tag {e.args}')
return False
from typing import List, Optional, Dict, Any
from scripts.constants import TagsCollectionKeys, DBConstants
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class TagsSchema(MongoBaseSchema):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
tag_name: Optional[str]
unit: Optional[str]
tag_type: Optional[str]
description: Optional[str]
id: Optional[str]
tag_group_id: Optional[str]
data_type: Optional[str]
default: Optional[bool]
system_tag_type: Optional[str] = ""
value_list: Optional[List]
product_encrypted: Optional[bool]
tag_category_id: Optional[str]
additional_fields: Optional[Dict] = dict()
attributes: Optional[List] = list()
data_quality_info: Optional[Dict]
tag_label: Optional[Any] = ""
class Tags(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DBConstants.db_metadata,
collection=DBConstants.collection_tags)
self.project_id = project_id
@property
def key_tag_category_id(self):
return TagsCollectionKeys.KEY_TAG_CATEGORY_ID
@property
def key_tag_name(self):
return TagsCollectionKeys.KEY_TAG_NAME
@property
def key_unit(self):
return TagsCollectionKeys.KEY_UNIT
@property
def key_tag_type(self):
return TagsCollectionKeys.KEY_TAG_TYPE
@property
def key_description(self):
return TagsCollectionKeys.KEY_DESCRIPTION
@property
def key_id(self):
return TagsCollectionKeys.KEY_ID
@property
def key_tag_group_id(self):
return TagsCollectionKeys.KEY_TAG_GROUP_ID
@property
def key_default(self):
return TagsCollectionKeys.KEY_DEFAULT
@property
def key_data_type(self):
return TagsCollectionKeys.KEY_DATA_TYPE
@property
def key_system_tag_type(self):
return TagsCollectionKeys.KEY_SYSTEM_TAG_TYPE
@property
def key_value_list(self):
return TagsCollectionKeys.KEY_VALUE_LIST
@property
def key_product_encrypted(self):
return TagsCollectionKeys.KEY_PRODUCT_ENCRYPTED
def find_all_tags(self, filter_dict=None,
sort=None, skip=0, limit=None, **query):
"""
The following function will give all tags for the given set of
search parameters as keyword arguments
:param filter_dict:
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
all_tags = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if not all_tags:
return list()
return all_tags
def find_tags(self, query):
all_tags = self.find(query=query)
if all_tags:
return list(all_tags)
return list()
def find_tags_by_query(self, query, filter_dict=None):
all_tags = self.find(query=query, filter_dict=filter_dict)
if all_tags:
return list(all_tags)
return list()
def find_one_tag(self, filter_dict=None, **query) -> TagsSchema:
"""
The following function will give one tag for a given set of
search parameters as keyword arguments
:param filter_dict:
:param query:
:return:
"""
tag = self.find_one(filter_dict=filter_dict, query=query)
if tag:
# tag["tag_category_id"] = [tag.get("tag_category_id","")] \
# if isinstance(tag.get("tag_category_id",""), str) \
# else tag.get("tag_category_id")
try:
return TagsSchema(**tag)
except Exception as e:
raise e
else:
return tag
def insert_one_tag(self, data):
"""
The following function will insert one tag in the
tags collections
:param data:
:return:
"""
return self.insert_one(data)
def insert_many_tags(self, data):
"""
The following function will insert many tags in the
tags collection
:param data:
:return:
"""
return self.insert_many(data)
def update_one_tag(self, data, upsert=False, **query):
"""
The following function will update one tag in
tags collection based on the given query
:param data:
:param upsert:
:param query:
:return:
"""
return self.update_one(data=data, upsert=upsert, query=query)
def update_many_tags(self, data, query, upsert=False):
"""
The following function will update many tags in
tags collection based on the given query
:param data:
:param upsert:
:param query:
:return:
"""
return self.update_many(data=data, upsert=upsert, query=query)
def delete_many_tags(self, **query):
"""
The following function will delete many tag in
tags collection based on the given query
:param query:
:return:
"""
return self.delete_many(query=query)
def delete_one_tag(self, **query):
"""
The following function will delete one tag in
tags collection based on the given query
:param query:
:return:
"""
if query:
return self.delete_one(query=query)
else:
return False
def distinct_tag(self, query_key):
"""
Get a list of distinct values for `key` among all documents
in the result set of this query.
:param query_key:
:return:
"""
return self.distinct(query_key=query_key)
def find_tags_with_list(self, tag_list):
query = {self.key_tag_name: {"$in": tag_list}}
filter_dict = dict(tag_name=1, _id=0)
tags = self.find(query=query, filter_dict=filter_dict)
if not tags:
return list()
return list(tags)
def find_tags_by_ids(self, tag_category_id, tag_list):
query = {"$or": [{self.key_tag_category_id: tag_category_id}, {self.key_id: {"$in": tag_list}}]}
tags = self.find(query=query)
if not tags:
return list()
return list(tags)
def find_tag_by_id(self, tag_id):
query = {self.key_id: tag_id}
res = self.find_one(query=query)
if not res:
return TagsSchema(**{})
return TagsSchema(**res)
def find_tags_by_aggregate(self, query):
tags = self.aggregate(query)
if not tags:
return list()
return list(tags)
def update_many_tags_by_group(self, selected_tags, tag_group_id):
data = {self.key_tag_group_id: tag_group_id}
query = {self.key_id: {"$in": selected_tags}}
self.update_many(query=query, data=data)
def find_by_aggregate(self, query):
tags = self.aggregate(query)
if not tags:
return list()
return list(tags)
def get_tag_name(self, tag_id):
query = [
{
'$match': {
'id': tag_id
}
}, {
'$project': {
'_id': 0,
'tag_name': '$tag_name',
'tag_id': '$tag_id'
}
}
]
tags = self.aggregate(query)
tags = [x for x in tags]
if not tags:
return ""
return tags[0]["tag_name"]
......@@ -18,6 +18,7 @@ class ErrorCodes:
ERR005 = "ERR005 - Batch Start time not supplied"
ERR006 = "ERR006 - Total Units Tag not found!!"
ERR007 = "ERR007 - Reject Units Tag not found!!"
ERR008 = "ERR007 - Cycle Time Tag not found!!"
class UnknownError(Exception):
......
......@@ -5,11 +5,10 @@ from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
# this method is to read the configuration from backup.conf
from scripts.config import Logging
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
......
......@@ -86,6 +86,7 @@ class ChartDBResponse(BaseModel):
productive_time: int
downtime: int
tz: Optional[str] = "Asia/Kolkata"
uom: Optional[str] = "mins"
@validator("*")
def round_float(cls, v):
......
......@@ -34,7 +34,7 @@ def default_group_by(group_by_dict):
raise DataFrameFormationError(ErrorMessages.DF_ERROR2)
def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, group_by):
def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, group_by, df_diff):
"""Helper function for parsing Kairos data and forming dataframe for each results"""
df = pd.DataFrame(columns=["timestamp", "value"])
try:
......@@ -53,6 +53,7 @@ def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, g
df[df_index] = df[df_index].dt.tz_localize('UTC').dt.tz_convert(timezone)
df.set_index(df_index, inplace=True)
df.sort_index(inplace=True)
if df_diff:
df[f'{selected_hierarchy}_diff'] = df[selected_hierarchy].diff()
except Exception as e:
logging.exception(e)
......@@ -72,7 +73,7 @@ def get_statistics(df, metadata):
def create_kairos_df(response_data, tags_list: List, group_by_tags: List, master_df=None, df_index='timestamp',
tz='Asia/kolkata'):
tz='Asia/kolkata', df_diff=True):
"""
Definition for creating kairos DataFrame
"""
......@@ -92,7 +93,8 @@ def create_kairos_df(response_data, tags_list: List, group_by_tags: List, master
df_index=df_index,
tz=tz,
group_by_tags=group_by_tags,
grouping_order=tags_list
grouping_order=tags_list,
df_diff=df_diff
)
df = df[~df.index.duplicated()]
master_df = master_df.combine_first(df)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment