Commit 8323efc6 authored by hemanthkumar.pasham's avatar hemanthkumar.pasham

Merge branch 'develop' of...

Merge branch 'develop' of https://gitlab-pm.knowledgelens.com/Harshavardhan.C/oee-services into develop
parents 65322bc7 fae9999e
# NOTE(review): this entire region is a corrupted merge artifact. A standalone
# OEE-update script (tag_mapping / oee_update / __main__ loop) and the
# MachineOEECalculator module are interleaved line-by-line. The text is
# preserved verbatim below; only review comments are added. It is NOT valid
# Python as it stands and must be untangled against both branch versions.
from scripts.utils.kafka_util import DataPush
if __name__ == '__main__':
# NOTE(review): indentation was lost in this dump; the setup below presumably
# ran under the __main__ guard -- confirm against the original file.
from dotenv import load_dotenv
load_dotenv()
import os
import time
from datetime import datetime
import pytz
from production_monitoring import ProductionMonitor
prod_mon = ProductionMonitor()
data_push = DataPush()
# Hard-coded hierarchy tag ids for the single monitored equipment.
tag_mapping = {
"oee": "site_100$dept_100$line_100$equipment_101$tag_215",
"availability": "site_100$dept_100$line_100$equipment_101$tag_216",
"performance": "site_100$dept_100$line_100$equipment_101$tag_217",
"quality": "site_100$dept_100$line_100$equipment_101$tag_218",
"running_lot": "site_100$dept_100$line_100$equipment_101$tag_219",
"running_item": "site_100$dept_100$line_100$equipment_101$tag_220",
"target": "site_100$dept_100$line_100$equipment_101$tag_222",
"downtime": "site_100$dept_100$line_100$equipment_101$tag_223",
"setup_time": "site_100$dept_100$line_100$equipment_101$tag_225",
"running_time": "site_100$dept_100$line_100$equipment_101$tag_226"
}
def oee_update():
data = prod_mon.oee_mongo.find_record_by_not_status("completed")
if not data:
print("No jobs are running, waiting for job to start...")
return
print(f"Calculating OEE for {data.get('job')}")
data_dict = {}
if data.get("run_start_time"):
run_start_time = datetime.fromtimestamp(
data.get("run_start_time") // 1000,
tz=pytz.timezone("Asia/Bangkok")
)
downtime = prod_mon.automation_engine.get_downtime(
run_start_time=run_start_time,
production_end_time=datetime.now(tz=pytz.timezone("Asia/Bangkok"))
)
else:
downtime = 0
# NOTE(review): import block from the other branch spliced into the middle of
# oee_update(); belongs at the top of the MachineOEECalculator module.
from datetime import datetime, timedelta
from scripts.constants import CommonConstants, TagCategoryConstants
from scripts.core.engine.oee_calculator import OEEEngine
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.core.handlers.common_handler import CommonHandler
from scripts.logging import logger
from scripts.schemas.batch_oee import MachineOEERequest, BatchOEEData, OEEDataInsertRequest, OEEDataSaveRequest
from scripts.utils.common_utils import CommonUtils
from scripts.utils.kafka_util import DataPush
oee, availability, performance, quality = prod_mon.calculate_oee_params(data, downtime)
data_dict.update(
{
tag_mapping.get("running_lot"): data.get("job", ""), # job no
tag_mapping.get("running_item"): data.get("item", ""), # item no
tag_mapping.get("target"): data.get("qty_released", 0), # quality released
tag_mapping.get("oee"): oee,
tag_mapping.get("availability"): availability,
tag_mapping.get("performance"): performance,
tag_mapping.get("quality"): quality,
tag_mapping.get("downtime"): downtime,
tag_mapping.get("setup_time"): data.get("setup_time", 0),
tag_mapping.get("running_time"): data.get("running_time", 0),
# NOTE(review): class definition spliced in mid-expression -- the dict
# literal opened above is never closed before this `class` statement.
class MachineOEECalculator:
def __init__(self, project_id=None):
self.common_util = CommonUtils()
self.batch_oee_handler = CalculateBatchOEEHandler()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_engine = OEEEngine()
self.data_push = DataPush()
def calculate_machine_oee(self, request_data: MachineOEERequest):
try:
hierarchy_dict = self.common_handler.get_valid_oee_monitoring_hierarchy(project_id=request_data.project_id)
now = datetime.today() - timedelta(days=2)
oee_start_time = datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).strftime(
CommonConstants.USER_META_TIME_FORMAT)
oee_end_time = datetime.now().strftime(CommonConstants.USER_META_TIME_FORMAT)
for k, v in hierarchy_dict.items():
site_id = k.split("$")[0]
downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=k, project_id=request_data.project_id)
input_data = OEEDataInsertRequest(prod_start_time=oee_start_time,
prod_end_time=oee_end_time, downtime=downtime,
hierarchy=k, cycle_time=os.environ.get("CYCLE_TIME", default=5),
tz=request_data.tz,
project_id=request_data.project_id)
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
input_data=input_data)
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
data_dict = {
v[TagCategoryConstants.OEE_OUTPUT_CATEGORY]: oee_response.oee,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_CATEGORY]: oee_response.performance,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_CATEGORY]: oee_response.quality,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_LOSS_CATEGORY]: oee_response.quality_loss,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY]: oee_response.performance_loss,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_CATEGORY]: oee_response.availability,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY]: oee_response.availability_loss
}
)
# NOTE(review): both branches' message dicts merged -- duplicate "site_id"
# and "p_id" keys (later literal wins in Python); keep one branch's values.
message_dict = {
"data": data_dict,
"site_id": prod_mon.settings["automation"]["site_id"],
"site_id": site_id,
"gw_id": "",
"pd_id": "",
"p_id": prod_mon.settings["automation"]["project_id"],
"p_id": request_data.project_id,
"timestamp": int(time.time() * 1000),
"msg_id": 1,
"retain_flag": False
}
# NOTE(review): two publish calls, one per branch -- keep exactly one.
self.data_push.publish_message(message_dict)
data_push.publish_message(message_dict)
good_count, units_produced = prod_mon.get_current_produced_count()
running_time = (datetime.now() - datetime.fromtimestamp(data.get("start_time") / 1000)).total_seconds() / 60
mongo_data = {
"good_count": good_count,
"units_produced": units_produced,
"running_time": running_time
}
data.update(mongo_data)
prod_mon.oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
except Exception as e:
logger.exception(f"Exception Occurred while calculating oee for the hierarchy {e.args}")
return
# NOTE(review): two __main__ loops merged -- the 3s oee_update loop and the
# 10s MachineOEECalculator loop; only one can survive per file.
if __name__ == '__main__':
while True:
oee_update()
time.sleep(3)
projects_list = os.environ.get("OEE_PROJECTS", default="project_170")
monitor_start_time = os.environ.get("OEE_START_TIME", default="00:00")
for project in projects_list.split(","):
MachineOEECalculator().calculate_machine_oee(
request_data=MachineOEERequest(project_id=project, monitor_time="00:00",
tz="Asia/Kolkata"))
time.sleep(10)
import time
from datetime import datetime
from typing import List
import pytz
from scripts.constants import OEETagMappingKeys
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.db.redis_connections import live_tags_db
from scripts.logging import logger
from scripts.utils.common_utils import CommonUtils
from scripts.utils.db_name_util import get_project_specific_key
class ProductionMonitor:
    """Reads live production tag values from Redis and derives OEE KPIs for a job."""

    def __init__(self, project_id=None):
        self.automation_engine = AutomationEngine()
        self.common_util = CommonUtils()

    @staticmethod
    def get_redis_data(tags_list: List, project_id: str):
        """Fetch the current value of each tag in *tags_list* from the live-tags Redis DB.

        Returns {prefixed_tag_id: value}; missing keys map to whatever the
        client returns for them (typically None).
        """
        redis_project_prefix = get_project_specific_key(project_id=project_id)
        updated_tag_list = [f'{redis_project_prefix}{_tag}' for _tag in tags_list]
        redis_response = live_tags_db.mget(updated_tag_list)
        # BUG FIX: MGET returns one value per requested key, in key order. The
        # previous code looped over the response and zipped the FULL key list
        # against each single value, pairing tag ids with the elements/bytes
        # of one value instead of with the values themselves.
        return dict(zip(updated_tag_list, redis_response))

    @staticmethod
    def calculate_setup_time(production_start_time: datetime, tz, data):
        """Poll until production leaves manual mode (or 10 minutes elapse).

        Returns the elapsed setup time in whole minutes.
        """
        while True:
            try:
                # Hard cap: after 10 minutes report the elapsed time regardless of mode.
                if (datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() > 600:
                    return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
                if data.get(OEETagMappingKeys.OEE_MANUAL_MODE_TAG) == 0 and data.get(
                        OEETagMappingKeys.OEE_AUTO_MODE_TAG) == 1:
                    print("production started!!!")
                    return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
                time.sleep(1)
            except Exception as e:
                # Best-effort poll loop: swallow transient errors and keep polling.
                print(e)

    def calculate_oee_params(self, data, tz):
        """Compute (oee, availability, performance, quality) as percentages.

        Returns None when the job carries no tag list or on any failure
        (the exception is logged, mirroring the original behavior).
        """
        try:
            start_time = datetime.fromtimestamp(
                data.get("prod_start_time") // 1000, tz=pytz.timezone(tz))
            end_time = datetime.now(tz=pytz.timezone(tz))
            available_time = (end_time - start_time).total_seconds() / 60
            # if downtime > available_time:
            downtime = self.common_util.get_downtime_details_by_hierarchy(hierarchy=data["hierarchy"],
                                                                          project_id=data["project_id"],
                                                                          user_id=data.get("trigger_by"))
            operating_time = available_time - downtime
            oee_tags_list = data.get("oee_tags_list")
            project_id = data.get("project_id")
            if not oee_tags_list:
                return
            tags_data = self.get_redis_data(tags_list=oee_tags_list, project_id=project_id)
            good_count, units_produced = self.get_current_produced_count(input_data=tags_data)
            good_count = good_count or 0
            units_produced = units_produced or 0
            productive_time = units_produced * (1 / data.get("cycle_time"))
            # BUG FIX: guard the divisions -- when the job has just started
            # (available_time == 0) or downtime consumes the whole window
            # (operating_time == 0) the original raised ZeroDivisionError.
            performance = productive_time / operating_time if operating_time else 0
            availability = operating_time / available_time if available_time else 0
            quality = good_count / units_produced if units_produced else 0
            oee = availability * performance * quality
            return oee * 100, availability * 100, performance * 100, quality * 100
        except Exception as e:
            # BUG FIX: log e.args, not e.args[0] -- exceptions raised without
            # arguments made the handler itself raise IndexError.
            logger.exception(f"Exception occurred while updating the production batch {e.args}")

    @staticmethod
    def check_production_run(input_data: dict):
        """Block until the inspection tag reports a positive value, then return True."""
        while True:
            try:
                if input_data.get(OEETagMappingKeys.OEE_INSPECTION_TAG, 0) > 0:
                    return True
                time.sleep(1)
            except Exception as e:
                # Best-effort poll loop: swallow transient errors and keep polling.
                print(e)

    @staticmethod
    def get_current_produced_count(input_data: dict):
        """Return (good_count, total_units) where total = NG + good.

        good_count may be None when the good-count tag is absent (the original
        used .get without a default there); callers normalize None to 0.
        """
        try:
            return (
                input_data.get(OEETagMappingKeys.OEE_GOOD_COUNT_TAG),
                input_data.get(OEETagMappingKeys.OEE_NG_TAG, 0) + input_data.get(OEETagMappingKeys.OEE_GOOD_COUNT_TAG,
                                                                                 0)
            )
        except Exception as e:
            print(e)
# Script entry point: load env config before importing project modules that
# read it at import time.
if __name__ == '__main__':
# NOTE(review): indentation was stripped in this dump; the lines below
# presumably sit inside the __main__ guard -- confirm against the original.
from dotenv import load_dotenv
load_dotenv()
import asyncio
import logging
import time
from datetime import datetime
import pytz
from production_monitoring import ProductionMonitor
from scripts.config import read_settings
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.core.handlers.form_handler import FormHandler
from scripts.db.databases import oee_session
from scripts.db.mongo.dbs.siam_oee import SiamOEE
from scripts.schemas.form import FormDetails, EndProductionRequest
from scripts.utils.kafka_util import DataPush
from scripts.utils.security.encryption import create_token
# Module-level singletons shared by the helper functions below.
production_mon = ProductionMonitor()
automation_engine = AutomationEngine()
oee_mongo = SiamOEE()
form_handler = FormHandler()
data_push = DataPush()
settings = read_settings()
def calculate_oee_params(data, downtime):
    """Compute (oee, availability, performance, quality) for a job, as percentages.

    `data` is the job's Mongo record ("start_time" in epoch-ms, "cycle_time"
    in units/minute); `downtime` is in minutes. Timezone is fixed to
    Asia/Bangkok as in the rest of this script.
    """
    start_time = datetime.fromtimestamp(
        data.get("start_time") // 1000, tz=pytz.timezone("Asia/Bangkok"))
    end_time = datetime.now(tz=pytz.timezone("Asia/Bangkok"))
    available_time = (end_time - start_time).total_seconds() / 60
    # A downtime larger than the elapsed window is treated as bogus data.
    if downtime > available_time:
        downtime = 0
    operating_time = available_time - downtime
    # BUG FIX: guard the divisions -- when the job has just started
    # (available_time == 0) the original raised ZeroDivisionError, crashing
    # the caller's polling loop (there is no try/except here).
    availability = operating_time / available_time if available_time else 0
    good_count, units_produced = production_mon.get_current_produced_count()
    good_count = good_count or 0
    units_produced = units_produced or 0
    productive_time = units_produced * (1 / data.get("cycle_time"))
    performance = productive_time / operating_time if operating_time else 0
    quality = good_count / units_produced if units_produced else 0
    oee = availability * performance * quality
    return oee * 100, availability * 100, performance * 100, quality * 100
def update_oee():
    """Compute OEE KPIs for the active job and publish them to Kafka.

    Looks for a job in status "started", falling back to "producing"; does
    nothing when neither exists.
    """
    # Hard-coded hierarchy tag ids for the single monitored equipment.
    tag_mapping = {
        "oee": "site_100$dept_100$line_100$equipment_101$tag_215",
        "availability": "site_100$dept_100$line_100$equipment_101$tag_216",
        "performance": "site_100$dept_100$line_100$equipment_101$tag_217",
        "quality": "site_100$dept_100$line_100$equipment_101$tag_218",
        "running_lot": "site_100$dept_100$line_100$equipment_101$tag_219",
        "running_item": "site_100$dept_100$line_100$equipment_101$tag_220",
        "target": "site_100$dept_100$line_100$equipment_101$tag_222",
        "downtime": "site_100$dept_100$line_100$equipment_101$tag_223"
    }
    record = oee_mongo.find_record_by_status("started") or oee_mongo.find_record_by_status("producing")
    if not record:
        print("No data found, waiting for batch to start producing")
        return
    if record.get("run_start_time"):
        # run_start_time is stored as epoch milliseconds.
        run_started_at = datetime.fromtimestamp(record.get("run_start_time") // 1000,
                                                tz=pytz.timezone("Asia/Bangkok"))
        downtime = automation_engine.get_downtime(
            run_start_time=run_started_at,
            production_end_time=datetime.now(tz=pytz.timezone("Asia/Bangkok"))
        )
    else:
        downtime = 0
    oee, availability, performance, quality = calculate_oee_params(record, downtime)
    payload = {
        tag_mapping.get("running_lot"): record.get("job", ""),  # job no
        tag_mapping.get("running_item"): record.get("item", ""),  # item no
        tag_mapping.get("target"): record.get("qty_released", 0),  # quality released
        tag_mapping.get("oee"): oee,
        tag_mapping.get("availability"): availability,
        tag_mapping.get("performance"): performance,
        tag_mapping.get("quality"): quality,
        tag_mapping.get("downtime"): downtime,
    }
    data_push.publish_message({
        "data": payload,
        "site_id": settings["automation"]["site_id"],
        "gw_id": "",
        "pd_id": "",
        "p_id": settings["automation"]["project_id"],
        "timestamp": int(time.time() * 1000),
        "msg_id": 1,
        "retain_flag": False
    })
# Promote a job from "running" to "producing" once production output is seen.
def check_produce_start():
data = oee_mongo.find_record_by_status("running")
if not data:
print("No data found, waiting for batch to start running")
return
if data.get("prod_status") == "running":
print(f"{data.get('job')} is running ....")
# NOTE(review): check_production_run() is called with no arguments here, but
# the ProductionMonitor.check_production_run defined earlier in this dump
# requires an input_data dict -- confirm which class version is imported.
if production_mon.check_production_run():
data["prod_status"] = "producing"
oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
# Detect the end of a "producing" job, mark it completed, and submit the
# end-of-production form via the async form handler.
def check_production_end():
data = oee_mongo.find_record_by_status("producing")
if not data:
print("No data found, waiting for batch to start producing")
return
print(f"{data.get('job')} is producing ....")
if production_mon.check_production_end(data):
data["prod_status"] = "completed"
data["end_time"] = int(time.time() * 1000)
form_details = FormDetails(**data.get("form_details", {})).dict()
end_production_request = EndProductionRequest(**form_details, submitted_data=dict(data=data),
date=int(time.time() * 1000))
cookies = {"login-token": create_token()}
session = oee_session()
try:
asyncio.run(form_handler.end_production(end_production_request, session, cookies))
except Exception as e:
logging.exception(e)
return
oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
# NOTE(review): `del session` drops the reference but does not necessarily
# close the DB session; if oee_session() returns a SQLAlchemy session, call
# session.close() (ideally in a finally block) -- confirm the session type.
del session
# Poll for production end every 10 seconds, forever.
if __name__ == '__main__':
while True:
check_production_end()
time.sleep(10)
......@@ -75,6 +75,7 @@ class DBConstants:
collection_constants = "constants"
collection_tag_hierarchy = "tag_hierarchy"
collection_oee_layouts = "oee_layouts"
collection_lookup_table = "lookup_table"
class EndpointConstants:
......@@ -84,6 +85,18 @@ class EndpointConstants:
class TagCategoryConstants:
    """Env-overridable display names of the tag categories used to locate OEE tags."""
    TOTAL_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Total Produced Units")
    # BUG FIX: previously read the "TOTAL_UNITS_CATEGORY" env var (copy-paste),
    # so REJECT_UNITS_CATEGORY could never be overridden independently.
    REJECT_UNITS_CATEGORY = os.environ.get("REJECT_UNITS_CATEGORY", default="OEE - Reject Units")
    OEE_OUTPUT_CATEGORY = os.environ.get("OEE_OUTPUT_CATEGORY", default="OEE Output Category")
    OEE_OUTPUT_PERFORMANCE_CATEGORY = os.environ.get("OEE_OUTPUT_PERFORMANCE_CATEGORY",
                                                     default="OEE Output Performance Category")
    OEE_OUTPUT_QUALITY_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_CATEGORY", default="OEE Output Quality Category")
    OEE_OUTPUT_AVAILABILITY_CATEGORY = os.environ.get("OEE_OUTPUT_AVAILABILITY_CATEGORY",
                                                      default="OEE Output Availability Category")
    # BUG FIX: previously read the "OEE_OUTPUT_PERFORMANCE_CATEGORY" env var
    # (copy-paste), colliding with OEE_OUTPUT_PERFORMANCE_CATEGORY above.
    OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY",
                                                          default="OEE Output Performance Loss Category")
    OEE_OUTPUT_QUALITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
                                                      default="OEE Output Quality Loss Category")
    OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY",
                                                           default="OEE Output Availability Loss Category")
class CommonConstants:
......
from copy import deepcopy
import pandas as pd
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse
from scripts.config import DBConf
from scripts.core.engine.oee_calculator import OEETagFinder
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.core.handlers.common_handler import CommonHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists, OutputTagsList
from scripts.errors import DataNotFound
from scripts.logging import logger
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse, ChartRequest, OEEDataInsertRequest
from scripts.utils.common_utils import CommonUtils
from scripts.utils.kairos_db_util import BaseQuery
from scripts.utils.kairos_db_util.df_formation_util import create_kairos_df
from scripts.utils.kairos_db_util.query_kairos import KairosQuery
# NOTE(review): this class is a corrupted merge artifact -- the old
# @staticmethod aggregator(data, activity_length) and the new
# aggregator(self, request_data) are interleaved, there are two __init__
# headers back to back, and a stranded tail of the old method sits inside the
# new one. Text preserved verbatim; only review comments added. Must be
# untangled against both branch versions before this file can run.
class OEEAggregator:
# NOTE(review): two __init__ signatures from the two branches -- keep the
# project_id variant and delete the other line.
def __init__(self):
def __init__(self, project_id=None):
self.common_util = CommonUtils()
self.base_query = BaseQuery()
self.oee_tag_finder = OEETagFinder()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_handler = CalculateBatchOEEHandler(project_id=project_id)
def processor(self, data):
db_response = ChartDBResponse(**data)
......@@ -24,43 +40,52 @@ class OEEAggregator:
chart_response = ChartResponse(**db_response.dict())
return chart_response.dict()
# NOTE(review): old-branch aggregator starts here (staticmethod, takes raw
# rows); the new-branch aggregator begins at the second `def aggregator`.
@staticmethod
def aggregator(data, activity_length=1):
df = pd.DataFrame(data)
df["total_time"] = (df["batch_end_time"] - df["batch_start_time"]) / 60000
df["actual_cycle"] = df["total_units"] / df["total_time"]
df["ideal_cycle"] = df["cycle_time"]
df["good_units"] = df["total_units"] - df["reject_units"]
df["reject_time"] = df["reject_units"] * (1 / df["ideal_cycle"])
agg_oee = df.sum().round(2)
availability = (agg_oee["total_time"] - agg_oee["downtime"]) / agg_oee["total_time"]
performance = agg_oee["productive_time"] / (
agg_oee["total_time"] - agg_oee["downtime"]
)
quality = (agg_oee["total_units"] - agg_oee["reject_units"]) / agg_oee[
"total_units"
]
oee_overall = round(availability * performance * quality, 2) * 100
availability_loss = agg_oee["downtime"] / agg_oee["total_time"] * 100
quality_loss = agg_oee["reject_time"] / agg_oee["total_time"] * 100
chart_response = ChartResponse(
total_units=round(agg_oee["total_units"] - (len(df) * activity_length)),
reject_units=agg_oee["reject_units"],
oee=oee_overall,
availability=round(availability * 100, 2),
downtime=agg_oee["downtime"],
performance=round(performance * 100, 2),
quality=round(quality * 100, 2),
actual_cycle=agg_oee["actual_cycle"],
ideal_cycle=agg_oee["ideal_cycle"],
good_units=round(agg_oee["good_units"] - (len(df) * activity_length)),
availability_loss=availability_loss,
quality_loss=quality_loss,
performance_loss=round(100 - availability_loss - quality_loss - oee_overall, 2),
total_time=agg_oee["total_time"],
productive_time=agg_oee["productive_time"],
def aggregator(self, request_data: ChartRequest):
try:
start_time = int(self.common_util.pendulum_conversion(request_data.queryDate[0], tz=request_data.tz,
timestamp=True)) * 1000
end_time = int(self.common_util.pendulum_conversion(request_data.queryDate[-1], tz=request_data.tz,
timestamp=True)) * 1000
hierarchy_tags = self.common_handler.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**request_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
output_tags_dict = self.common_handler.tag_hierarchy_handler.get_output_tags_for_oee(
input_data=OutputTagsList(**request_data.dict()))
if not output_tags_dict or not output_tags_dict.get(request_data.hierarchy):
return {}
updated_dict = self.common_handler.validate_hierarchy_tags(output_tags_dict[request_data.hierarchy])
new_columns_dict = self.common_handler.get_oee_keys_mapping_dict(output_tags_dict[request_data.hierarchy])
tags_list = list(updated_dict.values())
group_by_tags_list = deepcopy(tags_list)
group_by_tags_list.append(DBConf.KAIROS_DEFAULT_FULL_TAG)
tags_list.extend([total_units_tag_id, reject_units_tag_id])
if not tags_list:
return {}
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
data = kairos_util.query(
self.base_query.form_generic_query(tags_list=tags_list,
project_id=request_data.project_id,
start_epoch=start_time, end_epoch=end_time))
master_df = pd.DataFrame()
data = [data] if not isinstance(data, list) else data
for each_data in data:
master_df = create_kairos_df(
master_df=master_df,
response_data=each_data,
tags_list=tags_list,
group_by_tags=group_by_tags_list,
tz=request_data.tz
)
# NOTE(review): stranded tail of the OLD aggregator (references
# chart_response which does not exist in the new method).
filtered = chart_response.dict()
remove_keys = ["productive_time", "downtime", "reject_units"]
[filtered.pop(each, None) for each in remove_keys]
return filtered
if master_df.empty:
raise DataNotFound
master_df_columns = list(master_df.columns)
input_data = {"prod_start_time": start_time, "prod_end_time": end_time}
input_data.update(request_data.dict(exclude_none=True))
input_schema = OEEDataInsertRequest(**input_data)
input_schema.total_units, input_schema.reject_units = self.oee_handler.get_data_for_tags(
input_data=input_schema)
except Exception as e:
# NOTE(review): typo -- logger.execption should be logger.exception;
# as written this line raises AttributeError when reached.
logger.execption(f'Exception occurred while plotting the dashboard {e.args}')
......@@ -95,23 +95,17 @@ class APIHandler:
if not request_data.hierarchy:
return dict()
chart_maker = ChartMaker()
if request_data.reference_id:
data = table_obj.get_chart_data(
hierarchy=request_data.hierarchy,
prod_start_time=request_data.queryDate[0],
prod_end_time=request_data.queryDate[1],
reference_id=request_data.reference_id,
aggregation=request_data.aggregation,
tz=request_data.tz
reference_id=request_data.reference_id
)
if not request_data.aggregation or len(data) == 1:
if isinstance(data, list):
data = data[0]
raw_data = self.oee_agg.processor(data)
return chart_maker.main_creator(raw_data, overall=False)
elif len(data) == 0:
return dict()
else:
agg_data = self.oee_agg.aggregator(data)
agg_data = self.oee_agg.aggregator(request_data=request_data)
return chart_maker.main_creator(agg_data)
except Exception as e:
......
......@@ -9,6 +9,7 @@ from scripts.config import DBConf
from scripts.constants import ResponseCodes, CommonConstants, TagCategoryConstants
from scripts.core.engine.oee_calculator import OEEEngine, OEETagFinder
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists
from scripts.db.psql.oee_discrete import DiscreteOEE
from scripts.db.redis_connections import oee_production_db
from scripts.errors import DataNotFound
......@@ -125,9 +126,7 @@ class CalculateBatchOEEHandler:
end_epoch = int(
datetime.strptime(input_data.prod_end_time, CommonConstants.USER_META_TIME_FORMAT).astimezone(
tz=pytz.timezone(input_data.tz)).timestamp()) * 1000
# hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(GetTagsLists(**input_data.dict()))
hierarchy_tags = {TagCategoryConstants.TOTAL_UNITS_CATEGORY: "site_114$line_1306$equipment_5812$tag_100",
TagCategoryConstants.REJECT_UNITS_CATEGORY: "site_114$line_1306$equipment_5812$tag_60538"}
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(GetTagsLists(**input_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
......@@ -155,4 +154,5 @@ class CalculateBatchOEEHandler:
reject_units_value = master_df[f'{reject_units_tag_id}_diff'].sum()
return total_units_value, reject_units_value
except Exception as e:
logger.exception(f'Exception occurred while fetching tag details{e.args}')
raise
import os
from scripts.constants import TagCategoryConstants
from scripts.constants.db_connections import mongo_client
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.ilens_configuration.collections.lookup_table import LookupTable
from scripts.db.mongo.schema.tag_hierarchy import OutputTagsList
from scripts.logging import logger
class CommonHandler:
    """Shared lookup and validation helpers for OEE monitoring hierarchies."""

    def __init__(self, project_id=None):
        self.lookup_table = LookupTable(project_id=project_id, mongo_client=mongo_client)
        self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)

    def fetch_oee_hierarchy_from_look_up(self, project_id):
        """Return the OEE monitoring lookup mapping for *project_id* ({} on failure)."""
        oee_lookup_name = os.environ.get("OEE_LOOKUP_NAME", default="oee_monitoring")
        try:
            lookup_data = self.lookup_table.map_lookup_keys(lookup_name=oee_lookup_name, project_id=project_id)
            if not lookup_data:
                # (was a placeholder-less f-string)
                logger.debug("Lookup details not found for OEE")
                return {}
            return lookup_data
        except Exception as e:
            logger.exception(f"Failed to fetch hierarchy details from lookup {e.args}")
            return {}

    def get_valid_oee_monitoring_hierarchy(self, project_id):
        """Return {hierarchy: {category: tag_id}} for hierarchies with a full set of output tags."""
        valid_hierarchies_dict = {}
        try:
            oee_lookup_dict = self.fetch_oee_hierarchy_from_look_up(project_id=project_id)
            hierarchy_list = list(oee_lookup_dict.values())
            if not hierarchy_list:
                logger.debug(f'Hierarchy details not found for the project {project_id} for OEE Monitoring!!!!')
                return {}
            tags_dict_by_hierarchy = self.tag_hierarchy_handler.get_output_tags_for_oee(
                OutputTagsList(project_id=project_id, hierarchy_list=hierarchy_list))
            for each_hierarchy in tags_dict_by_hierarchy:
                if not tags_dict_by_hierarchy.get(each_hierarchy):
                    logger.debug(f'Tag details not found for the hierarchy {each_hierarchy} for OEE Monitoring!!!!')
                    continue
                updated_dict = self.validate_hierarchy_tags(tags_dict_by_hierarchy[each_hierarchy])
                if not updated_dict:
                    # Hierarchy missing at least one required output tag -- skip it.
                    continue
                valid_hierarchies_dict.update({each_hierarchy: updated_dict})
            return valid_hierarchies_dict
        except Exception as e:
            logger.exception(f"Exception Occurred while fetching hierarchy details for monitoring oee {e.args}")
            return {}

    @staticmethod
    def validate_hierarchy_tags(request_dict: dict):
        """Return {category: tag_id} if every required OUTPUT category is mapped, else {}.

        TOTAL/REJECT units categories are inputs and therefore not required here.
        """
        hierarchy_dict = {}
        # All non-dunder attributes of TagCategoryConstants are category names.
        category_constants_list = [v for k, v in TagCategoryConstants.__dict__.items() if not k.startswith("__")]
        try:
            for _each_tag in category_constants_list:
                if _each_tag in {TagCategoryConstants.TOTAL_UNITS_CATEGORY, TagCategoryConstants.REJECT_UNITS_CATEGORY}:
                    continue
                if not request_dict.get(_each_tag):
                    # (fixed the missing space before the brace in this message)
                    logger.debug(f"OEE Output Tag not configured for the hierarchy {list(request_dict.keys())[0]}")
                    return {}
                hierarchy_dict.update({_each_tag: request_dict[_each_tag]})
            return hierarchy_dict
        except Exception as e:
            logger.exception(f'Exception Occurred while validating the tags for a hierarchy {e.args}')
            return {}

    @staticmethod
    def get_oee_keys_mapping_dict(input_dict: dict):
        """Map each tag id in *input_dict* to its short OEE result-key name."""
        # Ordered pairs preserve the original if/elif first-match semantics.
        category_to_key = [
            (TagCategoryConstants.TOTAL_UNITS_CATEGORY, "total_units"),
            (TagCategoryConstants.REJECT_UNITS_CATEGORY, "reject_units"),
            (TagCategoryConstants.OEE_OUTPUT_CATEGORY, "oee"),
            (TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_CATEGORY, "availability"),
            (TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_CATEGORY, "performance"),
            (TagCategoryConstants.OEE_OUTPUT_QUALITY_CATEGORY, "quality"),
            (TagCategoryConstants.OEE_OUTPUT_QUALITY_LOSS_CATEGORY, "quality_loss"),
            (TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY, "availability_loss"),
            # BUG FIX: performance_loss was missing from the original chain,
            # so its tag never received a result-key mapping.
            (TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY, "performance_loss"),
        ]
        final_dict = {}
        for k, v in input_dict.items():
            for category, key_name in category_to_key:
                if k == category:
                    final_dict[v] = key_name
                    break
        return final_dict
from datetime import datetime
import pandas as pd
import pytz
from sqlalchemy import create_engine
from scripts.config import DBConf
from scripts.config import Metadata
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler, OEEDataInsertRequest
from scripts.db.mongo.dbs.siam_oee import SiamOEE
from scripts.db_layer.job_table import JobTable
from scripts.logging.logging import logger as logging
from scripts.models.db_models import JobTable as JobSkeleton
from scripts.schemas.form import (GetRDBValues, CustomQuery, StartProductionRequest, EndProductionRequest,
StartProdJobModel, EndProdJobModel, CalculateOEE, FormSaveRequest, FormDetails,
EndProdJobDB)
from scripts.utils.common_utils import CommonUtils
class FormHandler:
def __init__(self):
self.oee_mongo = SiamOEE()
self.automation_engine = AutomationEngine()
@staticmethod
async def fetch_last_values(request_data: GetRDBValues):
try:
base_engine = create_engine(f"{DBConf.CLIENT_URI}{request_data.db_name}")
query = f"SELECT * from {request_data.table_name}"
if request_data.primary_conditions:
query += " WHERE "
for column, val in request_data.primary_conditions.items():
query += f"{column}='{val}'"
query += " LIMIT 1"
table_data_df = pd.read_sql(query, base_engine)
del base_engine
table_data_df.rename(columns=request_data.column_to_property, inplace=True)
return table_data_df.to_dict(orient="records")[0]
except Exception as e:
logging.exception(e)
@staticmethod
async def custom_query_fetch(request_data: CustomQuery):
try:
base_engine = create_engine(f"{DBConf.CLIENT_URI}{request_data.db_name}")
table_data_df = pd.read_sql(request_data.query, base_engine)
del base_engine
table_data_df.rename(columns=request_data.column_to_property, inplace=True)
automation_eng = AutomationEngine()
current_time = datetime.now()
abs_start_time = automation_eng.get_absolute_start_time(button_click_time=current_time)
erp_table_data = table_data_df.to_dict(orient="records")[0]
erp_table_data.update({
"start_time": abs_start_time.strftime("%Y-%m-%d %H:%M")
})
return erp_table_data
except Exception as e:
logging.exception(e)
async def start_production(self, request_data: StartProductionRequest, db_session):
try:
table_data = JobTable(db_session)
request_data.submitted_data.update(tz=request_data.tz)
job_model = StartProdJobModel(**request_data.submitted_data.get("data", {}))
job_data = job_model.dict(exclude_none=True)
job_data.pop("tz", None)
row_data = JobSkeleton(**job_data)
table_data.add_data(row_data)
# TODO: Create mongo record with job details
job_data.update(form_details=FormDetails(**request_data.dict()).dict(),
prod_status="started")
self.oee_mongo.update_oee(job_data, job_model.job, job_model.uf_process)
except Exception as e:
logging.exception(e)
async def end_production(self, request_data: EndProductionRequest, db_session, request_cookies):
try:
table_data = JobTable(db_session)
job_model, db_data = await self.get_job_data(request_data)
table_data.update_record(job_model.job, job_model.uf_process, db_data.dict(exclude_none=True))
calculate_oee_payload = CalculateOEE(batch_start_time=job_model.prod_start_time,
batch_end_time=job_model.prod_end_time,
batch_id=job_model.job,
setup_time=job_model.setup_time,
cycle_time=job_model.cycle_time,
total_units=job_model.qty_released)
calculate_oee_payload.downtime = await self.get_oee_downtime(request_data.submitted_data["data"],
job_model.prod_end_time, job_model.tz)
_ = await CalculateBatchOEEHandler().calculate_oee(db_session, OEEDataInsertRequest(
**calculate_oee_payload.dict()))
form_response = await self.save_to_form(request_data, request_cookies, job_model)
logging.info(f"FORM SAVE RESPONSE, {form_response}")
return "Form values updated successfully"
except Exception as e:
logging.exception(e)
return f"Server encountered an error during op: {e}"
async def get_job_data(self, request_data: EndProductionRequest):
request_data.submitted_data.update(tz=request_data.tz)
form_data = request_data.submitted_data.get("data", {})
job = form_data.get("job")
uf_process = form_data.get("uf_process")
data_from_mongo = self.oee_mongo.find_record(job, uf_process)
form_data.update(data_from_mongo)
job_model = EndProdJobModel(**form_data)
if not job_model.setup_time:
job_model.setup_time = 0
if data_from_mongo.get("units_produced"):
job_model.qty_released = data_from_mongo.get("units_produced")
db_data = EndProdJobDB(**job_model.dict())
return job_model, db_data
async def save_to_form(self, request_data: EndProductionRequest,
                       request_cookies,
                       job_model: EndProdJobModel):
    """Assemble and submit the final form payload.

    Combines the latest tag values with the job fields and the formatted
    start/end times, saves the form via the external form service, then
    marks the job record as completed in Mongo.
    """
    zone = pytz.timezone(request_data.tz)
    # prod_* times are epoch milliseconds — TODO confirm against producer.
    end_date_time = datetime.fromtimestamp(job_model.prod_end_time // 1000, tz=zone)
    start_date_time = datetime.fromtimestamp(job_model.prod_start_time // 1000, tz=zone)
    tag_data = self.automation_engine.get_all_tags(end_date_time)
    tag_data.update(**job_model.dict(exclude_none=True))
    payload = FormSaveRequest(**request_data.dict())
    payload.submitted_data["data"].update(**tag_data)
    payload.submitted_data["data"].update(
        end_time=end_date_time.strftime("%Y-%m-%d %H:%M"),
        start_time=start_date_time.strftime("%Y-%m-%d %H:%M"),
    )
    form_response = await CommonUtils.hit_external_service(
        api_url=f"{Metadata.FORM_API}render/form?save=True",
        payload=payload.dict(),
        request_cookies=request_cookies,
    )
    payload.submitted_data["data"].update({"prod_status": "completed"})
    self.oee_mongo.update_oee(payload.submitted_data["data"], job_model.job, job_model.uf_process)
    return form_response
async def get_oee_downtime(self, data, end_time, tz):
    """Return the downtime between the run start and *end_time*.

    :param data: submitted form data; run start is taken from
        ``run_start_time`` falling back to ``start_time`` (either epoch
        milliseconds or a "%Y-%m-%d %H:%M" string).
    :param end_time: production end, epoch milliseconds or an aware datetime.
    :param tz: IANA timezone name used to interpret both timestamps.
    """
    zone = pytz.timezone(tz)
    if isinstance(end_time, int):
        # Epoch milliseconds -> aware datetime.
        end_time = datetime.fromtimestamp(end_time // 1000, tz=zone)
    run_start_time = data.get("run_start_time") or data.get("start_time")
    if isinstance(run_start_time, int):
        run_start_time = datetime.fromtimestamp(run_start_time // 1000, tz=zone)
    else:
        # BUGFIX: replace(tzinfo=pytz.timezone(...)) attaches the zone's LMT
        # offset (e.g. +05:53 instead of +05:30 for Asia/Kolkata); pytz
        # requires localize() to get the correct offset.
        run_start_time = zone.localize(datetime.strptime(run_start_time, "%Y-%m-%d %H:%M"))
    return self.automation_engine.get_downtime(run_start_time, end_time)
......@@ -10,7 +10,7 @@ class LayoutHandler:
async def save_layout(self, layout_request: SaveLayoutRequest):
try:
data = self.oee_layout_conn.update_layout(
data=layout_request.dict(), project_id=layout_request.project_id
data=layout_request.dict(), project_id=layout_request.project_id, upsert=True
)
return data
except Exception:
......
import re
from scripts.constants.db_connections import mongo_client
from scripts.db.mongo.ilens_configuration.aggregations.tag_hierarchy import TagHierarchyAggregate
from scripts.db.mongo.ilens_configuration.collections.tag_hierarchy import TagHierarchy
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists, OutputTagsList
from scripts.logging import logger
class TagHierarchyHandler:
......@@ -11,9 +14,29 @@ class TagHierarchyHandler:
def get_tags_list_by_hierarchy(self, input_data: GetTagsLists):
    """Fetch the tag list grouped by category for a single hierarchy node.

    NOTE(review): the merge left both diff versions of this method
    interleaved (duplicate keyword line and duplicate except clause);
    this keeps the post-merge version — confirm against the repo.
    """
    try:
        aggregate_query = TagHierarchyAggregate.tag_aggregate(
            project_id=input_data.project_id,
            # Escape the '$' separators so they match literally in the Mongo $regex.
            hierarchy_id=re.escape(f'{input_data.hierarchy}$tag'))
        tags_list = self.tag_hierarchy_conn.get_tag_hierarchy_by_aggregate(query=aggregate_query)
        tags_list = tags_list[0] if tags_list else {}
        return tags_list
    except Exception as e:
        logger.exception(f"failed to fetch tags_list by hierarchy {e.args}")
        raise
def get_output_tags_for_oee(self, input_data: OutputTagsList):
    """Fetch OEE output tags, selecting by the first non-empty of
    ``hierarchy_list``, ``hierarchy_level`` or ``hierarchy``.

    Returns ``{}`` when no selector is supplied; raises on query failure.
    """
    try:
        if input_data.hierarchy_list:
            # BUGFIX: escape each alternative separately. re.escape() on the
            # joined string also escaped the '|' separators, so the regex
            # matched the literal pipe-joined string and never matched any
            # id when more than one hierarchy was given.
            hierarchy_str = "|".join(re.escape(f'{_each}$tag') for _each in input_data.hierarchy_list)
        elif input_data.hierarchy_level:
            # Level name is used as a raw regex fragment on purpose — TODO confirm.
            hierarchy_str = input_data.hierarchy_level
        elif input_data.hierarchy:
            hierarchy_str = re.escape(f'{input_data.hierarchy}$tag')
        else:
            return {}
        aggregate_query = TagHierarchyAggregate.tag_aggregate_by_hierarchy_list(project_id=input_data.project_id,
                                                                                hierarchy_str=hierarchy_str)
        tags_list = self.tag_hierarchy_conn.get_tag_hierarchy_by_aggregate(query=aggregate_query)
        tags_list = tags_list[0] if tags_list else {}
        return tags_list
    except Exception as e:
        logger.exception(f"failed to fetch output tags list by hierarchy {e.args}")
        raise
......@@ -6,7 +6,7 @@ class TagHierarchyAggregate:
'$match': {
'project_id': project_id,
'id': {
'$regex': f'{hierarchy_id}\\$tag'
'$regex': f'{hierarchy_id}'
}
}
}, {
......@@ -25,7 +25,7 @@ class TagHierarchyAggregate:
'_id': None,
'data': {
'$push': {
'k': '$tag_category.tag_category_name',
'k': {'$ifNull': ['$tag_category.tag_category_name', ""]},
'v': '$id'
}
}
......@@ -39,3 +39,116 @@ class TagHierarchyAggregate:
}
]
return query
@staticmethod
def tag_aggregate_by_hierarchy_list(project_id, hierarchy_str):
    """Build the aggregation pipeline that maps each parent hierarchy id to
    a ``{tag_category_name: tag_id}`` object.

    Ids look like ``site$dept$line$equip$tag``; the pipeline strips the
    last '$'-separated segment to recover the parent hierarchy, joins the
    tag-category name via $lookup, then groups tags per parent and folds
    everything into a single document keyed by hierarchy id.
    """
    dollar = {'$literal': '$'}
    split_id = {'$split': ['$id', dollar]}

    # 1. Restrict to this project and the requested hierarchy pattern.
    match_stage = {'$match': {'project_id': project_id, 'id': {'$regex': hierarchy_str}}}

    # 2. Drop the trailing segment of the '$'-separated id.
    drop_leaf = {'$addFields': {'new_hierarchy': {
        '$slice': [split_id, 0, {'$subtract': [{'$size': split_id}, 1]}]
    }}}

    # 3. Re-join the remaining segments with '$' into hierarchy_id.
    rejoin = {'$addFields': {'hierarchy_id': {'$reduce': {
        'input': '$new_hierarchy',
        'initialValue': '',
        'in': {'$cond': {
            'if': {'$eq': [{'$indexOfArray': ['$new_hierarchy', '$$this']}, 0]},
            'then': {'$concat': ['$$value', '$$this']},
            'else': {'$concat': ['$$value', dollar, '$$this']}
        }}
    }}}}

    # 4. Attach the tag-category document for each tag.
    lookup_category = {'$lookup': {'from': 'tag_category',
                                   'localField': 'tag_category_id',
                                   'foreignField': 'tag_category_id',
                                   'as': 'tag_category'}}
    unwind_category = {'$unwind': {'path': '$tag_category'}}

    # 5. Per parent hierarchy, collect {category-name: tag-id} pairs
    #    (missing category names become "").
    group_by_parent = {'$group': {
        '_id': '$hierarchy_id',
        'data': {'$push': {'k': {'$ifNull': ['$tag_category.tag_category_name', ""]},
                           'v': '$id'}}
    }}

    # 6. Fold all parents into one document and promote it to the root.
    group_all = {'$group': {'_id': None,
                            'data': {'$push': {'k': '$_id',
                                               'v': {'$arrayToObject': '$data'}}}}}
    to_root = {'$replaceRoot': {'newRoot': {'$arrayToObject': '$data'}}}

    return [match_stage, drop_leaf, rejoin, lookup_category,
            unwind_category, group_by_parent, group_all, to_root]
......@@ -26,7 +26,7 @@ class LookupTable(MongoCollectionBaseClass):
@property
def key_type(self):
    """Name of the key field used when looking up constants in this collection.

    NOTE(review): the merge left two consecutive returns here
    (``CommonKeys.KEY_TYPE`` then ``"lookup_name"``); the post-merge
    version keeps the literal — confirm against the repo.
    """
    return "lookup_name"
def find_constant_by_dict(self, _type):
"""
......
from typing import Optional, List
from pydantic import BaseModel
class GetTagsLists(BaseModel):
    """Request payload for fetching the tag list under one hierarchy node."""
    project_id: str  # iLens project identifier
    hierarchy: str  # hierarchy node id; ids elsewhere look like "site_100$dept_100" — confirm
class OutputTagsList(BaseModel):
    """Request payload for fetching OEE output tags.

    The three hierarchy selectors are alternatives; handlers pick the
    first non-empty one in the order list, level, single node.
    """
    project_id: str  # iLens project identifier
    hierarchy_list: Optional[List]  # multiple hierarchy node ids
    hierarchy_level: Optional[str]  # level name used as a regex fragment, e.g. "equipment"
    hierarchy: Optional[str]  # single hierarchy node id
    class Config:
        schema_extra = {
            "example": {
                "project_id": "project_099",
                "hierarchy_list": ["site_101", "site_102"],
                "hierarchy_level": "equipment"
            }
        }
from fastapi.encoders import jsonable_encoder
from sqlalchemy import func
from sqlalchemy.orm import Session, defer
from sqlalchemy.orm import Session
from scripts.db.db_models import OEEDiscreteTable
from scripts.errors import ILensError
......@@ -97,10 +97,9 @@ class DiscreteOEE(SQLDBUtils):
raise
def get_chart_data(
self, prod_start_time, prod_end_time, hierarchy, reference_id, tz, aggregation=False
self, prod_start_time, prod_end_time, hierarchy, reference_id
):
try:
if not aggregation:
data = (
self.session.query(self.table)
.filter(
......@@ -113,22 +112,6 @@ class DiscreteOEE(SQLDBUtils):
)
if data:
return jsonable_encoder(data)
else:
data = (
self.session.query(self.table)
.filter(
self.table.hierarchy == hierarchy,
self.table.prod_start_time >= prod_start_time,
self.table.prod_end_time <= prod_end_time,
)
.options(
defer(self.table.hierarchy),
defer(self.table.batch_id),
defer(self.table.uom),
)
)
if data:
return [jsonable_encoder(each) for each in data]
raise ILensError("Record(s) not found")
except Exception as e:
logger.exception(e)
......
from datetime import datetime
from typing import Optional, Union, List
from pydantic import BaseModel, validator
......@@ -42,10 +43,10 @@ class WaterFallChart(BaseModel):
class ChartRequest(BaseModel):
project_id: str
queryDate: List[str]
queryDate: List[str] = [datetime.now().replace(hour=00, minute=00, second=00),
datetime.now().replace(hour=23, minute=59, second=59)]
hierarchy: Optional[str]
reference_id: Optional[str]
aggregation: Optional[bool] = False
tz: Optional[str] = "Asia/kolkata"
class Config:
......@@ -70,7 +71,6 @@ class ChartDBResponse(BaseModel):
reject_units: int
oee: int
availability: float
downtime: int
performance: int
performance_loss: float
quality: int
......@@ -228,3 +228,9 @@ class GetOeeServices(BaseModel):
project_id: str
meta: dict
oee_tag_mapping: str
class MachineOEERequest(BaseModel):
    """Request payload for machine-level OEE monitoring."""
    project_id: str  # iLens project identifier
    monitor_time: Optional[str] = "00:00"  # presumably "HH:MM" day-start for monitoring — TODO confirm
    tz: Optional[str] = "Asia/Kolkata"  # IANA timezone name
from fastapi import APIRouter
from scripts.logging import logger
from scripts.constants import Endpoints
from scripts.schemas.batch_oee import GetOeeServices
from scripts.core.handlers import oee_handlers
from scripts.logging import logger
from scripts.schemas.batch_oee import GetOeeServices
oee_services = APIRouter(prefix=Endpoints.oee_services, tags=["OEE Calculator"])
......@@ -11,7 +12,7 @@ oee_services = APIRouter(prefix=Endpoints.oee_services, tags=["OEE Calculator"])
async def oee_tag_mapping(oee_tag_mapping_list: GetOeeServices):
try:
tag_mapping = oee_tag_mapping_list.dict()
result = oee_handlers.Oee_Services.oee_tag_mapping(oee_tag_mapping_list)
result = oee_handlers.OEEServices.oee_tag_mapping(oee_tag_mapping_list)
return result
except Exception as e:
......@@ -22,7 +23,7 @@ async def oee_tag_mapping(oee_tag_mapping_list: GetOeeServices):
async def get_oee_tag_mapping(get_oee_tags: GetOeeServices):
try:
get_oee_tags = get_oee_tags.dict()
return_json = oee_handlers.Oee_Services.get_oee_tag_mapping(get_oee_tags)
return_json = oee_handlers.OEEServices.get_oee_tag_mapping(get_oee_tags)
return return_json
except Exception as e:
......@@ -33,7 +34,7 @@ async def get_oee_tag_mapping(get_oee_tags: GetOeeServices):
async def delete_oee_tagging(delete_oee_tags: GetOeeServices):
try:
delete_oee_tags = delete_oee_tags.dict()
return_json = oee_handlers.Oee_Services.delete_oee_tags(project_id=delete_oee_tags["project_id"])
return_json = oee_handlers.OEEServices.delete_oee_tags(project_id=delete_oee_tags["project_id"])
return return_json
except Exception as e:
......@@ -44,7 +45,7 @@ async def delete_oee_tagging(delete_oee_tags: GetOeeServices):
async def update_oee_tagging(update_oee_tags: GetOeeServices):
try:
update_oee_tags = update_oee_tags.dict()
return_json = oee_handlers.Oee_Services.update_oee_tags(update_oee_tags)
return_json = oee_handlers.OEEServices.update_oee_tags(update_oee_tags)
return return_json
except Exception as e:
......
......@@ -5,7 +5,7 @@ import pytz
import shortuuid
from scripts.config import PathToServices
from scripts.constants import Secrets, EndpointConstants, UOM
from scripts.constants import Secrets, EndpointConstants, UOM, CommonConstants
from scripts.db.redis_connections import project_details_db
from scripts.logging import logger
from scripts.utils.auth_util import ILensRequest, AuthenticationError
......@@ -103,6 +103,12 @@ class CommonUtils:
localized_dt = localized_tz.localize(datetime_with_tz)
return localized_dt
@staticmethod
def pendulum_conversion(date_str, tz, output_format=CommonConstants.USER_META_TIME_FORMAT, timestamp=False):
    """Parse *date_str* in timezone *tz*.

    Returns the epoch timestamp (float seconds) when *timestamp* is
    truthy, otherwise the datetime formatted with *output_format*.
    """
    parsed = pendulum.parse(date_str, tz=tz)
    if timestamp:
        return parsed.timestamp()
    return parsed.strftime(output_format)
@staticmethod
def get_next_id(_=None) -> str:
    """Return a new short UUID; the ignored parameter lets this be used directly as a factory/callback."""
    return shortuuid.uuid()
......
from ilens_kafka_publisher.v2 import KafkaPublisher
from scripts.config import KafkaConf
from scripts.db.redis_connections import partition_db
from scripts.logging import logger
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment