Commit 50912331 authored by harshavardhan.c's avatar harshavardhan.c

removal of production_monitoring.py

parent 9c8c4d96
import time
from datetime import datetime
from typing import List
import pytz
from scripts.constants import OEETagMappingKeys
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.db.redis_connections import live_tags_db
from scripts.logging import logger
from scripts.utils.common_utils import CommonUtils
from scripts.utils.db_name_util import get_project_specific_key
class ProductionMonitor:
def __init__(self, project_id=None):
self.automation_engine = AutomationEngine()
self.common_util = CommonUtils()
@staticmethod
def get_redis_data(tags_list: List, project_id: str):
tag_data = {}
redis_project_prefix = get_project_specific_key(project_id=project_id)
updated_tag_list = [f'{redis_project_prefix}{_tag}' for _tag in tags_list]
redis_response = live_tags_db.mget(updated_tag_list)
for _value in redis_response:
tag_data.update(zip(updated_tag_list, _value))
return tag_data
@staticmethod
def calculate_setup_time(production_start_time: datetime, tz, data):
while True:
try:
if (datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() > 600:
return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
if data.get(OEETagMappingKeys.OEE_MANUAL_MODE_TAG) == 0 and data.get(
OEETagMappingKeys.OEE_AUTO_MODE_TAG) == 1:
print("production started!!!")
return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
time.sleep(1)
except Exception as e:
print(e)
def calculate_oee_params(self, data, tz):
try:
start_time = datetime.fromtimestamp(
data.get("prod_start_time") // 1000, tz=pytz.timezone(tz))
end_time = datetime.now(tz=pytz.timezone(tz))
available_time = (end_time - start_time).total_seconds() / 60
# if downtime > available_time:
downtime = self.common_util.get_downtime_details_by_hierarchy(hierarchy=data["hierarchy"],
project_id=data["project_id"],
user_id=data.get("trigger_by"))
operating_time = available_time - downtime
oee_tags_list = data.get("oee_tags_list")
project_id = data.get("project_id")
if not oee_tags_list:
return
tags_data = self.get_redis_data(tags_list=oee_tags_list, project_id=project_id)
good_count, units_produced = self.get_current_produced_count(input_data=tags_data)
if not good_count:
good_count = 0
if not units_produced:
units_produced = 0
productive_time = units_produced * (1 / data.get("cycle_time"))
performance = productive_time / operating_time
availability = operating_time / available_time
if units_produced:
quality = good_count / units_produced
else:
quality = 0
oee = availability * performance * quality
return oee * 100, availability * 100, performance * 100, quality * 100
except Exception as e:
logger.exception(f"Exception occurred while updating the production batch {e.args[0]}")
@staticmethod
def check_production_run(input_data: dict):
while True:
try:
if input_data.get(OEETagMappingKeys.OEE_INSPECTION_TAG, 0) > 0:
return True
time.sleep(1)
except Exception as e:
print(e)
@staticmethod
def get_current_produced_count(input_data: dict):
try:
return (
input_data.get(OEETagMappingKeys.OEE_GOOD_COUNT_TAG),
input_data.get(OEETagMappingKeys.OEE_NG_TAG, 0) + input_data.get(OEETagMappingKeys.OEE_GOOD_COUNT_TAG,
0)
)
except Exception as e:
print(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment