Commit 21d7bc6c authored by harshavardhan.c's avatar harshavardhan.c

oee_automation_changes.

parent 1dd752e0
import pytz
from scripts.errors import DataNotFound
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import os
import time
from datetime import datetime, timedelta
from datetime import datetime
from scripts.constants import CommonConstants, TagCategoryConstants
from scripts.constants.db_connections import mongo_client
......@@ -35,7 +37,11 @@ class MachineOEECalculator:
def calculate_machine_oee(self, request_data: MachineOEERequest):
try:
hierarchy_dict = self.common_handler.get_valid_oee_monitoring_hierarchy(project_id=request_data.project_id)
now = datetime.today() - timedelta(days=1)
# now = datetime.today() - timedelta(days=1)
now = datetime.today()
start_timestamp = int(datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).timestamp()) * 1000
oee_start_time = datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).strftime(
......@@ -55,16 +61,28 @@ class MachineOEECalculator:
continue
cal_type = self.common_util.get_uom_type(uom_type=os.environ.get("DEFAULT_UOM_TYPE", default="mins"))
downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=k, project_id=request_data.project_id, uom_type=cal_type)
hierarchy=k, project_id=request_data.project_id, uom_type=cal_type,
filters={"start_time": [start_timestamp, curr_timestamp]})
if downtime is None:
logger.debug("Downtime for selected hierarchy got None, Updating to zero")
downtime = 0
input_data = OEEDataInsertRequest(prod_start_time=oee_start_time,
prod_end_time=oee_end_time, downtime=downtime,
hierarchy=k, cycle_time=cycle_time,
tz=request_data.tz,
project_id=request_data.project_id)
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
input_data=input_data)
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
try:
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
input_data=input_data)
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
except DataNotFound:
logger.exception(f"Data Not Found for selected Hierarchy --- {k}")
continue
except Exception as e:
logger.exception(
f"Exception Occurred while calculating oee {e.args}, skipping oee calculation for hierarchy - {k} ")
continue
data_dict = {
v[TagCategoryConstants.OEE_OUTPUT_TOTAL_UNITS_CATEGORY]: oee_response.total_units,
v[TagCategoryConstants.OEE_OUTPUT_REJECT_UNITS_CATEGORY]: oee_response.reject_units,
......
......@@ -56,6 +56,8 @@ class OEECalculator:
if rejected_units > total_units:
logger.error(ErrorCodes.ERR004)
raise ValueError(ErrorCodes.ERR004)
if not rejected_units and not total_units:
return 0
try:
return (total_units - rejected_units) / total_units
except ZeroDivisionError:
......
......@@ -25,7 +25,7 @@ class TagHierarchyHandler:
def get_output_tags_for_oee(self, input_data: OutputTagsList):
try:
if input_data.hierarchy_list:
hierarchy_str = re.escape("|".join([f'{_each}$tag' for _each in input_data.hierarchy_list]))
hierarchy_str = "|".join([re.escape(f'{_each}$tag') for _each in input_data.hierarchy_list])
elif input_data.hierarchy_level:
hierarchy_str = input_data.hierarchy_level
elif input_data.hierarchy:
......
......@@ -3,6 +3,7 @@ from typing import Optional, Union, List
from pydantic import BaseModel, validator
from scripts.constants import CommonConstants
from scripts.utils.common_utils import CommonUtils
common_utils = CommonUtils()
......@@ -133,7 +134,7 @@ class ProductInfo(BaseModel):
class OEEDataInsertRequest(BaseModel):
prod_start_time: str
prod_end_time: str
prod_end_time: Optional[str] = datetime.now().strftime(CommonConstants.USER_META_TIME_FORMAT)
prod_status: Optional[str] = "running"
downtime: Optional[Union[float, int]]
hierarchy: str
......
......@@ -44,15 +44,17 @@ class CommonUtils:
logger.exception(f"Exception in getting data: {e}")
raise
def get_downtime_details_by_hierarchy(self, hierarchy, project_id, user_id=None, uom_type="minutes"):
def get_downtime_details_by_hierarchy(self, hierarchy, project_id, user_id=None, uom_type="minutes",
filters: dict = None):
connection_obj = ILensRequest(url=PathToServices.downtime_proxy,
project_id=project_id)
try:
cookies = {'login-token': self.create_token(user_id=user_id), "user_id": user_id}
input_data = {"project_id": project_id, "hierarchy": hierarchy, "display_type": uom_type}
if filters:
input_data.update({"filters": filters})
downtime_response = connection_obj.post(path=EndpointConstants.hierarchy_downtime,
json={"project_id": project_id,
"hierarchy": hierarchy,
"display_type": uom_type}, cookies=cookies)
json=input_data, cookies=cookies)
response = downtime_response.json()
return response.get("data", 0)
except AuthenticationError:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment