Commit 49bd76d4 authored by shreya.m


Merge branch 'develop' of https://gitlab-pm.knowledgelens.com/Harshavardhan.C/oee-services into dev_shreya

 Conflicts:
	scripts/constants/__init__.py
	scripts/core/handlers/oee_handlers.py
	scripts/utils/common_utils.py
parents 3288814e f85e10e2
@@ -52,4 +52,11 @@ topic=$KAFKA_TOPIC
 enable_sites_partition=$ENABLE_KAFKA_PARTITION
 split_key=$KAFKA_PARTITION_KEY
 round_robin_enable=$ROUND_ROBIN_PARTITION
-partition_db=13
\ No newline at end of file
+partition_db=13
+
+[KAIROS_DB]
+uri = $KAIROS_URI
+default_full_tag_level = c3
+default_tag_level = c5
+default_site_tag_level = c1
+kairos_cache_time=$KAIROS_CACHE_TIME
\ No newline at end of file
@@ -7,6 +7,7 @@ from fastapi.middleware.cors import CORSMiddleware
 from scripts.db.psql.create_default_tables import create_default_psql_dependencies
 from scripts.services import route
+from scripts.services.tag_list_services import tag_service_router
 from scripts.utils.security_utils.decorators import CookieAuthentication

 auth = CookieAuthentication()
@@ -43,6 +44,7 @@ if os.environ.get("ENABLE_CORS") in (True, "true", "True") and os.environ.get(
     print(os.environ.get("CORS_URLS").split(","))

 app.include_router(route)
+app.include_router(tag_service_router)

 @app.on_event("startup")
...
@@ -25,4 +25,6 @@ paho-mqtt~=1.5.0
 pendulum~=2.1.2
 requests~=2.27.1
 aiohttp~=3.8.1
-numpy==1.22.3
\ No newline at end of file
+numpy==1.22.3
+python-dateutil~=2.8.2
+configparser~=3.5.3
\ No newline at end of file
@@ -52,6 +52,11 @@ class DBConf:
     if not MONGO_URI:
         print("Error, environment variable MONGO_URI not set")
         sys.exit(1)
+    KAIROS_URL = config.get("KAIROS_DB", "uri")
+    KAIROS_DEFAULT_FULL_TAG = config.get("KAIROS_DB", "default_full_tag_level")
+    KAIROS_DEFAULT_SITE_TAG = config.get("KAIROS_DB", "default_site_tag_level")
+    KAIROS_DEFAULT_TAG = config.get("KAIROS_DB", "default_tag_level")
+    KAIROS_CACHE_TIME = config.getint("KAIROS_DB", "kairos_cache_time")

 class PathToStorage(object):
...
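Note: the [KAIROS_DB] keys above carry $-placeholders, so the raw file has to be expanded against the environment before configparser sees it. A minimal sketch, assuming a hypothetical conf path and that expansion happens via os.path.expandvars:

    import configparser
    import os

    # Hypothetical path; expand $KAIROS_URI etc. from the environment first.
    raw = os.path.expandvars(open("conf/application.conf").read())
    config = configparser.ConfigParser()
    config.read_string(raw)

    kairos_url = config.get("KAIROS_DB", "uri")
    cache_time = config.getint("KAIROS_DB", "kairos_cache_time")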
@@ -20,15 +20,18 @@ class Endpoints:
     hierarchy_api = 'ilens_config/get_site_level_hierarchy'

     # OEE SERVICES
-    api_oee_services = "/oee_services"
-    api_save_tag = "/save"
-    api_delete_tag = "/delete"
-    api_update_tag = "/update"
-    api_get_tag = "/get"
+    oee_services = "/oee"
+    oee_services = "/oee_post"
+
+    # tag_lists
+    api_tags_lists = "/tag_lists"
+    api_tags = "/tags"

 class StatusCodes:
-    SUCCESS = [200, 201, 204]
+    SUCCESS = "success"
+    FAILED = "failed"
+    SUCCESS_CODES = [200, 201, 204]

 class ResponseCodes:
@@ -67,14 +70,23 @@ class DBConstants:
     db_ilens_assistant = "ilens_assistant"
     collection_constants = "constants"
+    collection_lookup_table = "lookup_table"
     collection_tag_hierarchy = "tag_hierarchy"

+class EndpointConstants:
+    hierarchy_downtime = "/downtime_log/get/overall_downtime"
+
+class TagCategoryConstants:
+    TOTAL_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Total Produced Units")
+    REJECT_UNITS_CATEGORY = os.environ.get("REJECT_UNITS_CATEGORY", default="OEE - Reject Units")

 class CommonConstants:
     ui = 'ui_datetime_format'
     utc = 'utc_datetime_format'
     nsc = 'no_special_chars_datetime_format'
+    USER_META_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
     umtf = 'user_meta_time_format'
     __temporary_format__ = '%Y-%m-%dT%H:%M:%S+0530'
     __iso_format__ = '%Y-%m-%dT%H:%M:%S%z'
@@ -89,14 +101,6 @@ class CommonConstants:
     __time_format__ = "%H:%M:%S"

-class OEETagMappingKeys:
-    OEE_GOOD_COUNT_TAG = os.environ.get("OEE_GOOD_TAG", default="good_count_tag")
-    OEE_INSPECTION_TAG = os.environ.get("OEE_INSPECTION_TAG", default="oee_inspection_tag")
-    OEE_MANUAL_MODE_TAG = os.environ.get("OEE_INSPECTION_TAG", default="oee_manual_mode_tag")
-    OEE_AUTO_MODE_TAG = os.environ.get("OEE_INSPECTION_TAG", default="oee_auto_mode_tag")
-    OEE_NG_TAG = os.environ.get("OEE_NG_TAG", default="oee_ng_tag")

 class CommonStatusCode:
     SUCCESS_CODES = (
         200,
...
@@ -3,7 +3,6 @@ from copy import deepcopy
 from scripts.constants.db_connections import mongo_client
 from scripts.constants.ui_constants import DonutChart, BaseWaterChart
 from scripts.db.mongo.ilens_configuration.collections.constants import Constants
 from scripts.schemas.batch_oee import WaterFallChart
@@ -11,10 +10,10 @@ class ChartMaker:
     def __init__(self, project_id=None):
         self.constants_con = Constants(mongo_client=mongo_client)

-    def main_creator(self, data, activity_length, overall=True):
+    def main_creator(self, data, overall=True):
         chart_data = dict()
         chart_data["waterfall"] = self.waterfall_chart(WaterFallChart(**data))
-        return self.donut_chart(data, chart_data, activity_length, overall)
+        return self.donut_chart(data, chart_data, overall)

     @staticmethod
     def waterfall_chart(data: WaterFallChart):
@@ -40,7 +39,7 @@ class ChartMaker:
         return waterfall_base["data"]

     @staticmethod
-    def donut_chart(data, chart_data, activity_length, overall: bool):
+    def donut_chart(data, chart_data, overall: bool):
         base = DonutChart.base
         chart_keys = DonutChart.chart_keys
         for each in chart_keys:
@@ -65,7 +64,7 @@ class ChartMaker:
         for k, v in data.items():
             if k not in do_not_display:
                 if k in ["total_units", "good_units"] and not overall:
-                    v = round(v - activity_length, 2)
+                    v = round(v, 2)
                 chart_data.update(
                     {
                         k: dict(label=k.capitalize().replace("_", " "), value=v)
...
 from datetime import datetime

-from scripts.constants import UOM
+import pendulum
+import pytz
+
+from scripts.constants import UOM, TagCategoryConstants
 from scripts.errors import ErrorCodes
 from scripts.logging import logger
 from scripts.schemas.batch_oee import OEEDataSaveRequest, BatchOEEData
@@ -87,6 +90,22 @@ class OEELossesCalculator:
         return 100 - availability_loss - quality_loss - oee_percentage

+class OEETagFinder:
+    @staticmethod
+    def get_total_units_tag_id(input_data: dict, category_name=TagCategoryConstants.TOTAL_UNITS_CATEGORY):
+        if not input_data.get(category_name):
+            logger.error(ErrorCodes.ERR006)
+            raise ValueError(ErrorCodes.ERR006)
+        return input_data.get(category_name)
+
+    @staticmethod
+    def get_reject_units_tag_id(input_data: dict, category_name=TagCategoryConstants.REJECT_UNITS_CATEGORY):
+        if not input_data.get(category_name):
+            logger.error(ErrorCodes.ERR007)
+            raise ValueError(ErrorCodes.ERR007)
+        return input_data.get(category_name)

 class OEEEngine:
     def __init__(self):
         self.oee_calc = OEECalculator()
@@ -103,28 +122,35 @@ class OEEEngine:
         # Start and End time should be in milliseconds since epoch.
         if request_data.uom == UOM.minutes:
             divisor = UOM.time_divs.minutes
+            cal_type = "minutes"
         elif request_data.uom == UOM.seconds:
             divisor = UOM.time_divs.seconds
+            cal_type = "seconds"
         elif request_data.uom == UOM.hours:
             divisor = UOM.time_divs.hours
+            cal_type = "hours"
         elif request_data.uom == UOM.millis:
             divisor = UOM.time_divs.millis
+            cal_type = "microseconds"
         else:
             divisor = UOM.time_divs.minutes
+            cal_type = "minutes"

-        planned_production_time = self.common_util.get_duration(tz=request_data.tz, meta=request_data.dict(),
-                                                                difference=True) / divisor
-        operating_time = planned_production_time - request_data.downtime
+        duration = self.common_util.get_duration(tz=request_data.tz, meta=request_data.dict(),
+                                                 difference=True)
+        planned_production_time = self.get_updated_planned_production_time(
+            input_time=duration, return_type=cal_type)
+        # operating time is production time
+        production_time = planned_production_time - request_data.downtime
         availability = self.oee_calc.calculate_availability(
-            operating_time=operating_time,
+            operating_time=production_time,
             planned_prod_time=planned_production_time,
         )
         performance = self.oee_calc.calculate_performance(
             units_produced=request_data.total_units,
-            operating_time=operating_time,
+            operating_time=production_time,
             cycle_time=request_data.cycle_time,
         )
@@ -178,7 +204,7 @@ class OEEEngine:
             batch_oee = BatchOEEData(
                 **request_data.dict(),
-                calculated_on=datetime.now().astimezone(tz=request_data.tz).isoformat(),
+                calculated_on=datetime.now().astimezone(tz=pytz.timezone(request_data.tz)).isoformat(),
                 productive_time=productive_time,
                 availability=availability * 100,
                 performance=performance * 100,
@@ -190,5 +216,19 @@ class OEEEngine:
             )
             return batch_oee
-        except Exception:
+        except Exception as e:
+            logger.exception(f"Exception occurred while calculating batch oee {e.args}")
             raise
+
+    @staticmethod
+    def get_updated_planned_production_time(input_time: pendulum.Duration, return_type):
+        if return_type == "minutes":
+            return input_time.in_minutes()
+        elif return_type == "seconds":
+            return input_time.in_seconds()
+        elif return_type == "hours":
+            return input_time.in_hours()
+        elif return_type == "microseconds":
+            return input_time.total_seconds()
+        else:
+            return input_time.in_minutes()
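Note: a quick sketch of how the pendulum duration returned by get_duration(difference=True) behaves under the conversions used above; in_hours() truncates to whole hours, and the "microseconds" branch actually yields seconds via total_seconds():

    import pendulum

    d = pendulum.duration(hours=2, minutes=30)  # hypothetical production window

    d.in_minutes()     # 150
    d.in_seconds()     # 9000
    d.in_hours()       # 2 (whole hours only)
    d.total_seconds()  # 9000.0 (what the "microseconds" branch returns)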
 import json
 from datetime import datetime

+import pandas as pd
 import pytz
 from sqlalchemy.orm import Session

-from scripts.constants import ResponseCodes
-from scripts.core.engine.oee_calculator import OEEEngine
+from scripts.config import DBConf
+from scripts.constants import ResponseCodes, CommonConstants, TagCategoryConstants
+from scripts.core.engine.oee_calculator import OEEEngine, OEETagFinder
+from scripts.core.handlers.tag_handler import TagHierarchyHandler
 from scripts.db.psql.oee_discrete import DiscreteOEE
 from scripts.db.redis_connections import oee_production_db
+from scripts.errors import DataNotFound
 from scripts.logging import logger
 from scripts.schemas.batch_oee import OEEDataInsertRequest, BatchOEEData, OEEDataSaveRequest
 from scripts.schemas.response_models import DefaultResponse
 from scripts.utils.common_utils import CommonUtils
+from scripts.utils.kairos_db_util import BaseQuery
+from scripts.utils.kairos_db_util.df_formation_util import create_kairos_df
+from scripts.utils.kairos_db_util.query_kairos import KairosQuery

 oee_engine = OEEEngine()
@@ -20,6 +27,9 @@ class CalculateBatchOEEHandler:
     def __init__(self, project_id=None):
         self.common_util = CommonUtils()
+        self.base_query = BaseQuery()
+        self.oee_tag_finder = OEETagFinder()
+        self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)

     def calculate_oee(self, db, request_data: OEEDataInsertRequest):
         table_obj = DiscreteOEE(db=db)
@@ -27,12 +37,20 @@ class CalculateBatchOEEHandler:
         record_presence = table_obj.get_oee_data_by_reference_id(reference_id=request_data.reference_id,
                                                                  hierarchy=request_data.hierarchy,
                                                                  project_id=request_data.project_id)
+        request_data.total_units, request_data.reject_units = self.get_data_for_tags(input_data=request_data)
         redis_key = f"{request_data.project_id}${request_data.reference_id}"
         if not record_presence:
             if not request_data.prod_start_time:
                 request_data.prod_start_time = datetime.now().astimezone(
                     tz=pytz.timezone(request_data.tz)).isoformat()
+            else:
+                request_data.prod_start_time = datetime.strptime(request_data.prod_start_time,
+                                                                 CommonConstants.USER_META_TIME_FORMAT).astimezone(
+                    tz=pytz.timezone(request_data.tz)).isoformat()
+            if request_data.prod_end_time:
+                request_data.prod_end_time = datetime.strptime(request_data.prod_end_time,
+                                                               CommonConstants.USER_META_TIME_FORMAT).astimezone(
+                    tz=pytz.timezone(request_data.tz)).isoformat()
             request_data = OEEDataSaveRequest(**request_data.dict(exclude_none=True))
             request_data.downtime = self.common_util.get_downtime_details_by_hierarchy(
                 hierarchy=request_data.hierarchy, project_id=request_data.project_id)
@@ -95,3 +113,45 @@ class CalculateBatchOEEHandler:
             return True
         except Exception as e:
             raise e
+    def get_data_for_tags(self, input_data: OEEDataInsertRequest):
+        total_units_value = 0
+        reject_units_value = 0
+        try:
+            start_epoch = int(
+                datetime.strptime(input_data.prod_start_time, CommonConstants.USER_META_TIME_FORMAT).astimezone(
+                    tz=pytz.timezone(input_data.tz)).timestamp()) * 1000
+            end_epoch = int(
+                datetime.strptime(input_data.prod_end_time, CommonConstants.USER_META_TIME_FORMAT).astimezone(
+                    tz=pytz.timezone(input_data.tz)).timestamp()) * 1000
+            # hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(GetTagsLists(**input_data.dict()))
+            hierarchy_tags = {TagCategoryConstants.TOTAL_UNITS_CATEGORY: "site_114$line_1306$equipment_5812$tag_100",
+                              TagCategoryConstants.REJECT_UNITS_CATEGORY: "site_114$line_1306$equipment_5812$tag_60538"}
+            total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
+            reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
+            kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
+            data = kairos_util.query(
+                self.base_query.form_generic_query(tags_list=[total_units_tag_id, reject_units_tag_id],
+                                                   project_id=input_data.project_id,
+                                                   start_epoch=start_epoch, end_epoch=end_epoch))
+            master_df = pd.DataFrame()
+            data = [data] if not isinstance(data, list) else data
+            for each_data in data:
+                master_df = create_kairos_df(
+                    master_df=master_df,
+                    response_data=each_data,
+                    tags_list=[total_units_tag_id, reject_units_tag_id],
+                    group_by_tags=[total_units_tag_id, reject_units_tag_id, DBConf.KAIROS_DEFAULT_FULL_TAG],
+                    tz=input_data.tz
+                )
+            if master_df.empty:
+                raise DataNotFound
+            master_df_columns = list(master_df.columns)
+            if f'{total_units_tag_id}_diff' not in master_df_columns:
+                return total_units_value, reject_units_value
+            total_units_value = master_df[f'{total_units_tag_id}_diff'].sum()
+            if f'{reject_units_tag_id}_diff' in master_df_columns:
+                reject_units_value = master_df[f'{reject_units_tag_id}_diff'].sum()
+            return total_units_value, reject_units_value
+        except Exception:
+            raise
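Note: the tag values are cumulative counters, so summing the per-sample diff column recovers the units produced inside the window (last reading minus first). A minimal pandas sketch with a hypothetical tag column:

    import pandas as pd

    df = pd.DataFrame({"tag_100": [1000, 1012, 1030, 1051]})
    df["tag_100_diff"] = df["tag_100"].diff()  # NaN, 12, 18, 21

    total_units = df["tag_100_diff"].sum()     # 51.0 == 1051 - 1000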
@@ -2,9 +2,11 @@ import traceback
 from scripts.db.mongo.ilens_assistant.collection.tag_instance_data import TagInstanceData
 from scripts.schemas.batch_oee import GetOeeServices
+from scripts.constants.db_connections import mongo_client
+from scripts.db.mongo.ilens_configuration.collections import collection_constants
 from scripts.logging import logger
 from scripts.utils.common_utils import CommonUtils
+from scripts.utils.mongo_util import MongoCollectionBaseClass
 from scripts.schemas.oee_config_schema import Oee_Tag_Mapping_List, Get_Oee_Tag, Update_Oee_Tags, Get_Project_Id
 from scripts.constants import CommonKeys
 from scripts.db.mongo.ilens_configuration import database
...
from scripts.constants.db_connections import mongo_client
from scripts.db.mongo.ilens_configuration.aggregations.tag_hierarchy import TagHierarchyAggregate
from scripts.db.mongo.ilens_configuration.collections.tag_hierarchy import TagHierarchy
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists


class TagHierarchyHandler:
    def __init__(self, project_id=None):
        self.tag_hierarchy_conn = TagHierarchy(mongo_client=mongo_client, project_id=project_id)

    def get_tags_list_by_hierarchy(self, input_data: GetTagsLists):
        try:
            aggregate_query = TagHierarchyAggregate.tag_aggregate(project_id=input_data.project_id,
                                                                  hierarchy_id=input_data.hierarchy)
            tag_lists = self.tag_hierarchy_conn.get_tag_hierarchy_by_aggregate(query=aggregate_query)
            tag_lists = tag_lists[0] if tag_lists else {}
            return tag_lists
        except Exception:
            raise
@@ -12,8 +12,7 @@ class OEEDiscreteTable(Base):
     reference_id = Column(String, nullable=True, unique=True)
     prod_start_time = Column(TIMESTAMP(timezone=True), nullable=False)
     prod_end_time = Column(TIMESTAMP(timezone=True), nullable=True)
-    total_downtime = Column(Float, default=0)
-    planned_units = Column(Float, nullable=False)
+    downtime = Column(Float, default=0)
     total_units = Column(Float)
     reject_units = Column(Float, default=0)
     cycle_time = Column(Float)
...
class TagHierarchyAggregate:
    @staticmethod
    def tag_aggregate(project_id, hierarchy_id):
        query = [
            {
                '$match': {
                    'project_id': project_id,
                    'id': {
                        '$regex': f'{hierarchy_id}\\$tag'
                    }
                }
            }, {
                '$lookup': {
                    'from': 'tag_category',
                    'localField': 'tag_category_id',
                    'foreignField': 'tag_category_id',
                    'as': 'tag_category'
                }
            }, {
                '$unwind': {
                    'path': '$tag_category'
                }
            }, {
                '$group': {
                    '_id': None,
                    'data': {
                        '$push': {
                            'k': '$tag_category.tag_category_name',
                            'v': '$id'
                        }
                    }
                }
            }, {
                '$replaceRoot': {
                    'newRoot': {
                        '$arrayToObject': '$data'
                    }
                }
            }
        ]
        return query
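Note: the $group/$replaceRoot stages flatten every matched tag into a single document keyed by category name, which is exactly the shape OEETagFinder consumes. A sketch with hypothetical documents:

    # Hypothetical docs after $match + $lookup + $unwind:
    #   {"id": "site_1$line_1$equipment_1$tag_10",
    #    "tag_category": {"tag_category_name": "OEE - Total Produced Units"}}
    #   {"id": "site_1$line_1$equipment_1$tag_11",
    #    "tag_category": {"tag_category_name": "OEE - Reject Units"}}
    #
    # $group pushes {"k": ..., "v": ...} pairs, and $arrayToObject turns them into:
    #   {"OEE - Total Produced Units": "site_1$line_1$equipment_1$tag_10",
    #    "OEE - Reject Units": "site_1$line_1$equipment_1$tag_11"}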
from pydantic import BaseModel


class GetTagsLists(BaseModel):
    project_id: str
    hierarchy: str
@@ -2,6 +2,12 @@ class ErrorMessages:
     UNKNOWN = "Unknown Error occurred"
     ERR001 = "Configurations not available, please verify the database."
     ERR002 = "Data Not Found"
+    K_ERROR1 = "Data Not Found in Time series Database"
+    K_ERROR2 = "Time series Database returned with an error"
+    K_ERROR3 = "Communication Error with Time series Database"
+    DF_ERROR1 = "Error occurred while forming Dataframe"
+    DF_ERROR2 = "Given group-by parameters are invalid"
+    META_ERROR1 = "Tags not Found in Meta"

 class ErrorCodes:
@@ -10,6 +16,8 @@ class ErrorCodes:
     ERR003 = "ERR003 - Operating Time is less than Productive Time"
     ERR004 = "ERR004 - Rejected Units is greater than Total Units"
     ERR005 = "ERR005 - Batch Start time not supplied"
+    ERR006 = "ERR006 - Total Units Tag not found!!"
+    ERR007 = "ERR007 - Reject Units Tag not found!!"

 class UnknownError(Exception):
@@ -64,3 +72,11 @@ class PathNotExistsException(Exception):

 class ImplementationError(Exception):
     pass

+class ILensErrors(Exception):
+    """Generic iLens Error"""
+
+class DataFrameFormationError(ILensErrors):
+    """Raise when there is an error during dataframe formation"""
@@ -2,10 +2,9 @@ from typing import Optional, Union, List

 from pydantic import BaseModel, validator
+from scripts.utils.common_utils import CommonUtils

-# from scripts.utils.common_utils import CommonUtils
-#
-# common_utils = CommonUtils()
+common_utils = CommonUtils()

 class GetProducts(BaseModel):
@@ -96,11 +95,11 @@ class ProductInfo(BaseModel):

 class OEEDataInsertRequest(BaseModel):
-    prod_start_time: Optional[str]
-    prod_end_time: Optional[str]
+    prod_start_time: str
+    prod_end_time: str
     prod_status: Optional[str] = "running"
     downtime: Optional[Union[float, int]]
-    hierarchy: Optional[str]
+    hierarchy: str
     reference_id: Optional[str]
     setup_time: Optional[Union[float, int]]
     cycle_time: Optional[Union[float, int]]
@@ -109,7 +108,7 @@ class OEEDataInsertRequest(BaseModel):
     tz: Optional[str] = 'Asia/Kolkata'
     project_id: str
     trigger_by: Optional[str]
-    uom: Optional[str]
+    uom: Optional[str] = "mins"

     class Config:
         schema_extra = {
@@ -120,29 +119,30 @@ class OEEDataInsertRequest(BaseModel):
                 "tz": "Asia/Kolkata",
                 "project_id": "project_099",
                 "reference_id": "reference_id",
-                "downtime": "",
-                "setup_time": "",
-                "cycle_time": "",
-                "total_units": "",
-                "reject_units": "",
-                "uom": "'"
+                "downtime": float(0),
+                "setup_time": float(0),
+                "cycle_time": float(5),
+                "total_units": float(0),
+                "reject_units": float(0),
+                "uom": "mins"
             }
         }

-    # @validator('prod_start_time')
-    # def date_format_validator_start_date(cls, v):
-    #     common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
-    #     return v
-    #
-    # @validator('prod_end_time')
-    # def date_format_validator_end_date(cls, v):
-    #     common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
-    #     return v
+    @validator('prod_start_time')
+    def date_format_validator_start_date(cls, v):
+        common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
+        return v
+
+    @validator('prod_end_time')
+    def date_format_validator_end_date(cls, v):
+        common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
+        return v
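Note: with the validators re-enabled, malformed timestamps are rejected at parse time. A minimal sketch, assuming CommonUtils.check_date_format raises on a format mismatch:

    from scripts.schemas.batch_oee import OEEDataInsertRequest

    OEEDataInsertRequest(
        project_id="project_099",
        hierarchy="site_100$dept_100$line_100$equipment_100",
        prod_start_time="2022-04-22 19:49:00",  # matches "%Y-%m-%d %H:%M:%S"
        prod_end_time="22/04/2022",             # would now fail validation
    )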

 class OEEDataSaveRequest(BaseModel):
-    prod_start_time: Optional[str]
-    prod_end_time: Optional[str]
+    prod_start_time: str
+    prod_end_time: str
+    prod_status: Optional[str] = "running"
     downtime: Optional[Union[float, int]] = 0
     hierarchy: Optional[str]
     reference_id: Optional[str]
@@ -152,38 +152,11 @@ class OEEDataSaveRequest:
     reject_units: Optional[Union[float, int]] = 0
     uom: Optional[str] = "mins"
     tz: Optional[str] = 'Asia/Kolkata'
+    trigger_by: Optional[str]
     project_id: str

-    class Config:
-        schema_extra = {
-            "example": {
-                "hierarchy": "site_100$dept_100$line_100$equipment_100",
-                "prod_start_time": "2022-04-22 19:49:00",
-                "prod_end_time": "2022-04-22 19:49:00",
-                "tz": "Asia/Kolkata",
-                "project_id": "project_099",
-                "reference_id": "reference_id",
-                "downtime": "",
-                "setup_time": "",
-                "cycle_time": "",
-                "total_units": "",
-                "reject_units": "",
-                "uom": "'"
-            }
-        }
-
-    # @validator('prod_start_time')
-    # def date_format_validator_start_date(cls, v):
-    #     common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
-    #     return v
-    #
-    # @validator('prod_end_time')
-    # def date_format_validator_end_date(cls, v):
-    #     common_utils.check_date_format(v, "%Y-%m-%d %H:%M:%S")
-    #     return v

-class BatchOEEData(OEEDataInsertRequest):
+class BatchOEEData(OEEDataSaveRequest):
     calculated_on: str
     productive_time: float
     availability: float
...
from fastapi import APIRouter

from scripts.constants import Endpoints, ResponseCodes
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists
from scripts.schemas.response_models import DefaultFailureResponse, DefaultResponse

tag_service_router = APIRouter(prefix=Endpoints.api_tags, tags=["Tags Services"])


@tag_service_router.post(Endpoints.api_tags_lists)
def get_tags_list(request_data: GetTagsLists):
    try:
        tags_list = TagHierarchyHandler()
        data = tags_list.get_tags_list_by_hierarchy(request_data)
        return DefaultResponse(
            data=data,
            status=ResponseCodes.SUCCESS,
            message="Tags Categories fetched successfully",
        )
    except Exception as e:
        return DefaultFailureResponse(error=e)
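Note: with prefix Endpoints.api_tags ("/tags") and route Endpoints.api_tags_lists ("/tag_lists"), the new endpoint resolves to POST /tags/tag_lists. A usage sketch against a hypothetical local deployment:

    import httpx

    resp = httpx.post(
        "http://localhost:8000/tags/tag_lists",  # hypothetical host/port
        json={"project_id": "project_099",
              "hierarchy": "site_100$line_100$equipment_100"},
    )
    print(resp.json())  # {"status": ..., "data": {"<category name>": "<tag id>"}, ...}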
 from datetime import datetime
 import time

 import pendulum
+import pytz
+import shortuuid

 from scripts.constants.db_connections import mongo_client
 from scripts.config import PathToServices
+from scripts.constants import Secrets, EndpointConstants
+from scripts.db.redis_connections import project_details_db
 from scripts.constants import Secrets, CommonKeys
 from scripts.logging import logger
 from scripts.utils.auth_util import ILensRequest, AuthenticationError
+from scripts.utils.db_name_util import get_db_name
 from scripts.utils.security_utils.apply_encrytion_util import create_token
@@ -37,7 +43,7 @@ class CommonUtils:
             diff = to_time - from_time
             if difference:
                 return diff
-            return f"{int(diff.in_hours())} hours {int(diff.minutes)} minutes"
+            return f"{int(diff.in_hours())} hours {int(diff.in_hours())} minutes"
         except Exception as e:
             logger.exception(f"Exception in getting data: {e}")
             raise
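Note: the replacement line formats the minutes slot with in_hours() as well, so the hour count is printed twice; pendulum already exposes the minutes remainder directly, as this sketch shows:

    import pendulum

    diff = pendulum.datetime(2022, 4, 22, 12, 0) - pendulum.datetime(2022, 4, 22, 9, 15)
    diff.in_hours()  # 2  -> total whole hours
    diff.minutes     # 45 -> minutes component of the remainder
    f"{int(diff.in_hours())} hours {int(diff.minutes)} minutes"  # "2 hours 45 minutes"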
@@ -47,7 +53,7 @@ class CommonUtils:
                                     project_id=project_id)
         try:
             cookies = {'login-token': self.create_token(user_id=user_id), "user_id": user_id}
-            downtime_response = connection_obj.post(path="/downtime_log/get/overall_downtime",
+            downtime_response = connection_obj.post(path=EndpointConstants.hierarchy_downtime,
                                                     json={"project_id": project_id,
                                                           "hierarchy": hierarchy})
             response = downtime_response.json()
@@ -79,6 +85,39 @@ class CommonUtils:
             logger.exception(str(e))
             raise

+    @staticmethod
+    def ts_from_format_as_timezone(_date, _format, tz):
+        date = datetime.strptime(_date, _format) \
+            .astimezone(pytz.timezone(tz)).timestamp() * 1000
+        return date
+
+    @staticmethod
+    def convert_to_timestamp(x, date_format=None) -> int:
+        if not isinstance(x, int):
+            if isinstance(x, str):
+                x = int(datetime.strptime(x, date_format).timestamp() * 1000)
+            else:
+                x = int(x.timestamp() * 1000)
+        return x
+
+    @staticmethod
+    def datetime_from_str(date_str, date_format, tz):
+        localized_tz = pytz.timezone(tz)
+        datetime_with_tz = datetime.strptime(f"{date_str}", date_format)
+        localized_dt = localized_tz.localize(datetime_with_tz)
+        return localized_dt
+
+    @staticmethod
+    def get_next_id(_=None) -> str:
+        return shortuuid.uuid()
+
+    @staticmethod
+    def metric_name_by_project(metric, project_id):
+        """For DB splitting enabled projects"""
+        db_splitting_enabled = get_db_name(project_id=project_id,
+                                           database=metric, redis_client=project_details_db)
+        return db_splitting_enabled

     def get_user_meta(self, user_id=None, check_flag=False):
         data_for_meta = {}
         if check_flag:
...
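Note: the file now mixes two ways of attaching a timezone, and they are not equivalent: strptime(...).astimezone(tz) interprets the naive datetime in the server's local zone before converting, while localize() (used in datetime_from_str) stamps the given wall time as already being in that zone:

    from datetime import datetime
    import pytz

    naive = datetime.strptime("2022-04-22 19:49:00", "%Y-%m-%d %H:%M:%S")
    tz = pytz.timezone("Asia/Kolkata")

    naive.astimezone(tz)  # 19:49 read as server-local time, then converted to IST
    tz.localize(naive)    # 19:49 taken as IST wall time directly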
from copy import deepcopy
from dataclasses import dataclass
from typing import List

from scripts.config import DBConf
from scripts.utils.common_utils import CommonUtils


@dataclass
class KairosConstants:
    kairos_std_metric_name: str = 'ilens.live_data.raw'
    kairos_complete_tag_key = "complete_tag"
    kairos_column_names: tuple = ("timestamp", "value",)
    kairos_time_unit: str = "ms"
    kairos_version_api: str = "/api/v1/version"
    kairos_health_check: str = "/api/v1/health/status"
    kairos_query_api: str = "/api/v1/datapoints/query"
    kairos_write_data_api: str = "/api/v1/datapoints"
    kairos_time_keys = ("start_absolute", "end_absolute",)
    kairos_empty_result = {
        "queries": [
            {
                "sample_size": 0,
                "results": [
                    {
                        "name": kairos_std_metric_name, "tags": {}, "values": []
                    }
                ]
            }
        ]
    }
    kairos_query_levels = [{
        "label": "Parameter",
        "value": DBConf.KAIROS_DEFAULT_TAG
    }, {
        "label": "Complete Tag",
        "value": DBConf.KAIROS_DEFAULT_FULL_TAG
    }]
    default_selection = [
        {
            "label": "Live Data",
            "metric": "ilens.live_data.raw"
        },
        {
            "label": "Error",
            "metric": "ilens.live_data.error"
        }
    ]
    # note: this doesn't work perfectly for months (31 days) or years (365 days)
    SECONDS_IN_UNIT = {
        'milliseconds': 0.001,
        'seconds': 1,
        'minutes': 60,
        'hours': 3600,
        'days': 86400,
        'weeks': 604800,
        'months': 2678400,
        'years': 31536000
    }
    # constants used in get_range_needed
    FETCH_BEFORE = 'prepend'
    FETCH_AFTER = 'append'
    FETCH_ALL = 'overwrite'

class BaseQuery:
    def __init__(self):
        self.common_util = CommonUtils()

    metric_query = {
        "tags": {
            f"{DBConf.KAIROS_DEFAULT_FULL_TAG}": [
                "site$dept$equipment$tag"
            ]
        },
        "name": KairosConstants.kairos_std_metric_name,
        "group_by": [
            {
                "name": "tag",
                "tags": [
                    f"{DBConf.KAIROS_DEFAULT_FULL_TAG}"
                ]
            }
        ],
        "aggregators": [
            {
                "name": "last",
                "sampling": {
                    "value": "1",
                    "unit": "hours"
                },
                "align_start_time": True
            }
        ]
    }

    kairos_query = {
        "metrics": [],
        "plugins": [],
        "cache_time": DBConf.KAIROS_CACHE_TIME,
        "start_absolute": 0,
        "end_absolute": 0
    }

    static_kairos_query = {
        "metrics": [
            {
                "tags": {
                    "c2": [
                        "dept_1238"
                    ]
                },
                "name": KairosConstants.kairos_std_metric_name,
                "group_by": [
                    {
                        "name": "tag",
                        "tags": [
                            "c2"
                        ]
                    }
                ]
            }
        ],
        "plugins": [],
        "cache_time": 0
    }

    site_status_query = {
        "metrics": [
            {
                "tags": {
                    "c1": "sitelist"  # parameter
                },
                "name": KairosConstants.kairos_std_metric_name,
                "group_by": [
                    {
                        "name": "tag",
                        "tags": [
                            "c1"
                        ]
                    }
                ],
                "aggregators": "aggregators"  # parameter
            }
        ],
        "plugins": [],
        "cache_time": 0,
        "start_relative": "start_relative"  # parameter
    }
    def form_generic_query(self, tags_list: List, start_epoch: int, end_epoch: int, project_id: str,
                           metric_name=KairosConstants.kairos_std_metric_name, category=DBConf.KAIROS_DEFAULT_FULL_TAG):
        metric_name = self.common_util.metric_name_by_project(metric=metric_name, project_id=project_id)
        group_by_tags = deepcopy(tags_list)
        group_by_tags.append(category)
        query = {
            "metrics": [
                {
                    "tags": {
                        category: tags_list
                    },
                    "name": metric_name,
                    "group_by": [
                        {
                            "name": "tag",
                            "tags": group_by_tags
                        }
                    ]
                }
            ],
            "plugins": [],
            "cache_time": 0,
            "start_absolute": start_epoch,
            "end_absolute": end_epoch
        }
        return query
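Note: with hypothetical inputs (and assuming metric_name_by_project returns the metric unchanged for non-split projects), form_generic_query yields a standard KairosDB query body; "c3" is the configured default_full_tag_level:

    BaseQuery().form_generic_query(
        tags_list=["site_1$line_1$eq_1$tag_10"],
        start_epoch=1650630540000, end_epoch=1650634140000,
        project_id="project_099",
    )
    # -> {"metrics": [{"tags": {"c3": ["site_1$line_1$eq_1$tag_10"]},
    #                  "name": "ilens.live_data.raw",
    #                  "group_by": [{"name": "tag",
    #                                "tags": ["site_1$line_1$eq_1$tag_10", "c3"]}]}],
    #     "plugins": [], "cache_time": 0,
    #     "start_absolute": 1650630540000, "end_absolute": 1650634140000}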
import logging
from typing import List

import pandas as pd

from scripts.config import DBConf
from scripts.errors import DataFrameFormationError, ErrorMessages
from . import KairosConstants


def get_group_by(group_by_dict, group_by_tag_list, grouping_order):
    hierarchy_id = []
    if DBConf.KAIROS_DEFAULT_FULL_TAG in group_by_tag_list:
        return default_group_by(group_by_dict)
    for tag in grouping_order:
        for each_group in group_by_dict:
            if each_group.get("name") != "tag":  # Only group by tag is supported
                continue
            all_groups = each_group.get("group", {})
            if tag in group_by_tag_list and tag in all_groups:
                hierarchy_id.append(all_groups[tag])
    if not hierarchy_id:
        raise DataFrameFormationError(ErrorMessages.DF_ERROR2)
    return "$".join(hierarchy_id)


def default_group_by(group_by_dict):
    for each_group in group_by_dict:
        if each_group.get("name") != "tag":  # Only group by tag is supported
            continue
        all_groups = each_group.get("group", {})
        if DBConf.KAIROS_DEFAULT_FULL_TAG in all_groups:
            return all_groups[DBConf.KAIROS_DEFAULT_FULL_TAG]
    raise DataFrameFormationError(ErrorMessages.DF_ERROR2)
def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, group_by):
    """Helper function for parsing Kairos data and forming a dataframe for each result"""
    df = pd.DataFrame(columns=["timestamp", "value"])
    try:
        selected_hierarchy = get_group_by(group_by, group_by_tags, grouping_order)
        if not tags or not values:
            new_column_name = selected_hierarchy
            df.rename(columns={"value": new_column_name}, inplace=True)
            df.set_index(df_index, inplace=True)
            return df
        df = pd.DataFrame(values, columns=KairosConstants().kairos_column_names)
        if KairosConstants().kairos_std_metric_name in metric:
            new_column_name = selected_hierarchy
            df.rename(columns={"value": new_column_name}, inplace=True)
        timezone = tz
        df[df_index] = pd.to_datetime(df[df_index], unit=KairosConstants.kairos_time_unit)
        df[df_index] = df[df_index].dt.tz_localize('UTC').dt.tz_convert(timezone)
        df.set_index(df_index, inplace=True)
        df.sort_index(inplace=True)
        df[f'{selected_hierarchy}_diff'] = df[selected_hierarchy].diff()
    except Exception as e:
        logging.exception(e)
    return df
def get_statistics(df, metadata):
    try:
        if metadata.enable_stats:
            metadata.data_statistics = {
                "min": df.min().round(2).to_dict(),
                "max": df.max().round(2).to_dict(),
                "avg": df.mean().round(2).to_dict()
            }
    except Exception as e:
        logging.exception(e)
def create_kairos_df(response_data, tags_list: List, group_by_tags: List, master_df=None, df_index='timestamp',
                     tz='Asia/Kolkata'):
    """
    Definition for creating a Kairos DataFrame
    """
    if master_df is None:
        master_df = pd.DataFrame()
    try:
        for each_response in response_data.get("queries"):
            if each_response["sample_size"] == 0:
                continue
            for each_result in each_response["results"]:
                df = form_df(
                    metric=each_result.get("name"),
                    tags=each_result.get("tags"),
                    values=each_result.get("values"),
                    group_by=each_result.get("group_by"),
                    df_index=df_index,
                    tz=tz,
                    group_by_tags=group_by_tags,
                    grouping_order=tags_list
                )
                df = df[~df.index.duplicated()]
                master_df = master_df.combine_first(df)
        master_df = master_df.fillna(0)
        return master_df
    except Exception as e:
        raise DataFrameFormationError(e)
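Note: chunked queries can return overlapping timestamps, which is why each frame is de-duplicated on its index and merged with combine_first. A minimal sketch:

    import pandas as pd

    # Hypothetical frames from two chunked responses sharing one timestamp:
    a = pd.DataFrame({"tag_100": [1, 2]}, index=pd.to_datetime(["10:00", "10:01"]))
    b = pd.DataFrame({"tag_100": [2, 3]}, index=pd.to_datetime(["10:01", "10:02"]))

    merged = a.combine_first(b)  # union of indices; values from 'a' win on overlap
    len(merged)                  # 3 rows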
import asyncio
import logging
import math
from copy import deepcopy
from datetime import datetime, timedelta
from pprint import pformat
from typing import Dict, Tuple

import httpx

from scripts.constants import StatusCodes
from scripts.errors import KairosDBError, ErrorMessages
from scripts.utils.common_utils import CommonUtils
from . import KairosConstants

kairos_const = KairosConstants()
httpx_timeout = httpx.Timeout(30, read=None)


class KairosQuery:
    def __init__(self, url):
        self.url = url
        self.common_util = CommonUtils()
    def query(self, query_data: dict) -> dict:
        """
        An API interface to query the Kairos REST API with a standard query given as input
        :param query_data: query json/dictionary as given in the
            documentation: https://kairosdb.github.io/docs/restapi/QueryMetrics.html#id3
        :return: Dictionary with data, if available
        """
        try:
            with httpx.Client() as client:
                logging.debug(pformat(query_data))
                r = client.post(
                    self.url + kairos_const.kairos_query_api,
                    json=query_data,
                    timeout=httpx_timeout
                )
            if r.status_code not in StatusCodes.SUCCESS_CODES:
                logging.error(f"KAIROS RETURNED {r.status_code}")
                raise KairosDBError
            logging.debug(f"status code: {r.status_code}")
            self.validate_results(r.json())
            return r.json()
        except Exception as e:
            logging.exception(e)
            raise
    async def query_async(self, query_data: dict, client=None) -> dict:
        """
        An API interface to query the Kairos REST API with a standard query given as input
        :param client: httpx.AsyncClient
        :param query_data: query json/dictionary as given in the
            documentation: https://kairosdb.github.io/docs/restapi/QueryMetrics.html#id3
        :return: Dictionary with data, if available
        """
        try:
            if client:
                return await self.perform_query(query_data, client)
            else:
                async with httpx.AsyncClient() as client:
                    return await self.perform_query(query_data, client)
        except Exception as e:
            logging.exception(e)
            return kairos_const.kairos_empty_result

    async def perform_query(self, query_data: dict, client):
        # httpx.AsyncClient.post returns a coroutine, so it must be awaited
        r = await client.post(self.url + kairos_const.kairos_query_api, json=query_data,
                              timeout=httpx_timeout)
        logging.debug(f"status code: {r.status_code}, elapsed time: {r.elapsed}")
        self.validate_results(r.json())
        return r.json()
    @staticmethod
    def validate_results(response) -> None:
        """
        Validates the response from Kairos. If errors are found, it raises an error.
        """
        if not isinstance(response, dict):
            raise KairosDBError(ErrorMessages.K_ERROR2)
        if "errors" in response:
            logging.error(f"Kairos returned with an error: {response.get('errors')}")
            raise KairosDBError(ErrorMessages.K_ERROR2)

    @staticmethod
    def get_timedelta(value):
        """input has keys value, unit. common inputs noted: start_relative, end_relative"""
        seconds = int(value['value']) * kairos_const.SECONDS_IN_UNIT[value['unit']]
        return timedelta(seconds=seconds)

    def get_needed_absolute_time_range(self, time_range, now=None):
        """
        Create date-times from Kairos timestamp data.
        :param time_range: dict, containing Kairos timestamp data: keys {start,end}_{relative,absolute}.
        :param now: datetime.datetime, optional. Set to remove drift in time during execution.
        :return: 2-tuple (start, end), both datetime.datetime. end may be NoneType.
        """
        if not now:
            now = datetime.now()
        if time_range.get('start_absolute'):
            start = datetime.fromtimestamp(int(time_range['start_absolute']) / 1000)
        else:
            td = self.get_timedelta(time_range.get('start_relative'))
            start = now - td
        if time_range.get('end_absolute'):
            end = datetime.fromtimestamp(int(time_range['end_absolute']) / 1000)
        elif time_range.get('end_relative'):
            td = self.get_timedelta(time_range.get('end_relative'))
            end = now - td
        else:
            end = None
        return start, end
    def get_chunked_time_ranges(self, time_range):
        """
        Given a long Kairos range, return N timestamp pairs so we can parallelize COLD calls (new->old).
        This implements up to second precision.
        :param time_range: dict, generated by populate_time_range containing kairos-formatted keys.
        :return: list of 2-tuples of datetime.datetime
        """
        chunk_length = 3600  # 1 hour default
        num_chunks = 10
        now = datetime.now()
        start_time, end_time = self.get_needed_absolute_time_range(time_range, now)
        if not end_time:
            end_time = now
        start_time = start_time.replace(microsecond=0)
        end_time = end_time.replace(microsecond=0)
        elapsed_secs = (end_time - start_time).total_seconds()
        if elapsed_secs <= chunk_length:
            return (self.common_util.convert_to_timestamp(start_time),
                    self.common_util.convert_to_timestamp(end_time)),
        else:
            # need to increase chunk length to fit into max chunks
            if elapsed_secs > chunk_length * num_chunks:
                chunk_length = int(elapsed_secs / num_chunks)
            # need to downsize max chunks because we can use them of size chunk_length
            else:
                num_chunks = int(math.ceil(elapsed_secs / chunk_length))
        chunks = []
        length_td = timedelta(seconds=chunk_length)
        # end_time is mutated below here
        for _ in range(num_chunks):
            start = end_time - length_td
            # Sanity check: make sure we limit the earliest chunk if it's partial
            if start < start_time:
                start = start_time
            # Add an offset so the chunks don't overlap.
            start += timedelta(seconds=1)
            chunks.append(
                (self.common_util.convert_to_timestamp(start), self.common_util.convert_to_timestamp(end_time)))
            end_time -= length_td
        return chunks
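Note: a worked example of the chunking arithmetic for a hypothetical five-hour window:

    # start = 2022-04-22 00:00:00, end = 2022-04-22 05:00:00
    # elapsed = 18000 s; 18000 > 3600 but <= 3600 * 10,
    # so num_chunks = ceil(18000 / 3600) = 5 one-hour chunks, newest first:
    #   (04:00:01, 05:00:00)
    #   (03:00:01, 04:00:00)
    #   (02:00:01, 03:00:00)
    #   (01:00:01, 02:00:00)
    #   (00:00:01, 01:00:00)
    # The +1 s offset keeps adjacent chunks from overlapping; a partial earliest
    # chunk is clamped to start_time before the offset is applied.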
    @staticmethod
    async def form_query(query: Dict, start_time: int, end_time: int):
        """
        Helper function for forming a query after chunking
        """
        __new_query__ = deepcopy(query)
        __new_query__.update(
            {
                kairos_const.kairos_time_keys[0]: start_time,
                kairos_const.kairos_time_keys[1]: end_time
            }
        )
        return __new_query__
    async def parallel_query(self, query) -> Tuple:
        """
        A helper function with a built-in algorithm to split queries into a given number of chunks.
        The number of chunks is configurable; some aggregations may not work well when using this.
        The primary purpose of this function is to query a large amount of data in parallel to reduce
        query time and load.
        """
        logging.debug(f"kairos query: {query}")
        time_range = {key: query[key] for key in kairos_const.kairos_time_keys}
        time_chunks = self.get_chunked_time_ranges(time_range)
        # form_query is a coroutine, so the gather must be awaited before iterating
        queries = await asyncio.gather(*[
            self.form_query(
                query, start_time=i[0],
                end_time=i[1]) for i in time_chunks
        ])
        async with httpx.AsyncClient() as client:
            results = await asyncio.gather(*[self.query_async(i, client) for i in queries])
        return results
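Note: a minimal usage sketch with a hypothetical KairosDB URL and epoch range; parallel_query only requires that the query carry start_absolute and end_absolute:

    import asyncio

    from scripts.utils.kairos_db_util import BaseQuery
    from scripts.utils.kairos_db_util.query_kairos import KairosQuery

    kq = KairosQuery(url="http://kairosdb:8080")  # hypothetical URL
    query = {
        "metrics": [BaseQuery.metric_query],  # or one built via form_generic_query
        "plugins": [],
        "cache_time": 0,
        "start_absolute": 1650630540000,
        "end_absolute": 1650648540000,
    }
    results = asyncio.run(kq.parallel_query(query))  # tuple of per-chunk responses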