Commit d8a46221 authored by hemanthkumar.pasham's avatar hemanthkumar.pasham

Added healthcheck

parents 8323efc6 0d043bc9
# siam-custom
# OEE Services
OEE Monitoring
Custom OEE project for Siam
![js-standard-style](https://img.shields.io/badge/OEE%20Monitoring%20-Beta-yellow?style=for-the-badge&logo=python&logoColor=blue)
[![FastAPI 0.73.0](https://img.shields.io/badge/FastAPI-005571?style=for-the-badge&logo=fastapi)](https://fastapi.tiangolo.com/)
[![Python 3.9](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/release/python-390/)
[![pydantic 1.9](https://img.shields.io/badge/Pydantic-1.9-pink.svg)](https://pypi.org/project/pydantic/)
<img src="https://unifytwin.com/assets/imgs/UT_logo.png" width="300" align="right"></img>
# UnifyTwin OEE v6.5
By [Unify Twin](https://unifytwin.com)
OEE is used for monitoring and tracking performance of the machine.
### Technologies
Developed using following libraries:
- Python 3.9
## Getting Started
The instructions below will get you a copy of the project up and running on your local machine for development and testing purposes.
*See deployment section for notes on how to deploy the project on a live system.*
### Prerequisites
**Note:** This project requires **python >=v3.9**.
### Setting Up Development Environment
Follow the instructions below to setup development environment.
**Install Python Package dependencies**
```bash
pip install -r requirements.txt
```
### Environment variables
Add environment variables in .env file to run application.
**Start the Application**
```bash
python app.py --port <port_number>
# or
uvicorn main:app --host 0.0.0.0 --port <port> --proxy-headers
```
**Start OEE Agent**
```bash
python live_dashboard.py --project_id <project_id>
```
Not specifying the port will make the app start in the port specified in the config file.
## Authors
* **Irfanuddin**
* **Harshavardhan**
* **Irfan Rayachuru**
* **Sudheer**
## Maintainers
* **Irfanuddin**
* **Harshavardhan**
Release Note:
1.OEE monitoring with tags
2.Live OEE for equipment level
Feature:
......
......@@ -2,19 +2,30 @@ if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import argparse
import os
import time
from datetime import datetime, timedelta
from datetime import datetime
import pytz
from scripts.constants import CommonConstants, TagCategoryConstants
from scripts.constants.db_connections import mongo_client
from scripts.core.engine.oee_calculator import OEEEngine
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.core.handlers.common_handler import CommonHandler
from scripts.db.mongo.ilens_configuration.aggregations.customer_projects import ProjectAggregate
from scripts.db.mongo.ilens_configuration.collections.customer_projects import CustomerProjects
from scripts.errors import DataNotFound
from scripts.logging import logger
from scripts.schemas.batch_oee import MachineOEERequest, BatchOEEData, OEEDataInsertRequest, OEEDataSaveRequest
from scripts.utils.common_utils import CommonUtils
from scripts.utils.kafka_util import DataPush
customer_conn = CustomerProjects(mongo_client=mongo_client)
customer_agg = ProjectAggregate()
class MachineOEECalculator:
def __init__(self, project_id=None):
......@@ -25,36 +36,64 @@ class MachineOEECalculator:
self.data_push = DataPush()
def calculate_machine_oee(self, request_data: MachineOEERequest):
try:
hierarchy_dict = self.common_handler.get_valid_oee_monitoring_hierarchy(project_id=request_data.project_id)
now = datetime.today() - timedelta(days=2)
hierarchy_dict = self.common_handler.get_valid_oee_monitoring_hierarchy(request_data)
now = datetime.today()
start_timestamp = int(datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).timestamp()) * 1000
oee_start_time = datetime.strptime(request_data.monitor_time, '%H:%M').replace(year=now.year,
month=now.month,
day=now.day).strftime(
CommonConstants.USER_META_TIME_FORMAT)
oee_end_time = datetime.now().strftime(CommonConstants.USER_META_TIME_FORMAT)
curr_end_time = datetime.now().astimezone(tz=pytz.timezone(request_data.tz))
curr_timestamp = int(curr_end_time.timestamp()) * 1000
oee_end_time = curr_end_time.strftime(CommonConstants.USER_META_TIME_FORMAT)
for k, v in hierarchy_dict.items():
site_id = k.split("$")[0]
if not v.get(TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY):
logger.debug(f"Design Parameter(CYCLE TIME) not found for selected hierarchy {v}")
continue
cycle_time_id = v[TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY].split("$")[-1]
cycle_time = self.common_handler.get_cycle_time_value_from_hierarchy(tag_id=cycle_time_id)
if isinstance(cycle_time, bool) and not cycle_time:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
continue
cal_type = self.common_util.get_uom_type(uom_type=os.environ.get("DEFAULT_UOM_TYPE", default="mins"))
downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=k, project_id=request_data.project_id)
hierarchy=k, project_id=request_data.project_id, uom_type=cal_type,
filters={"start_time": [start_timestamp, curr_timestamp]})
if downtime is None:
logger.debug("Downtime for selected hierarchy got None, Updating to zero")
downtime = 0
input_data = OEEDataInsertRequest(prod_start_time=oee_start_time,
prod_end_time=oee_end_time, downtime=downtime,
hierarchy=k, cycle_time=os.environ.get("CYCLE_TIME", default=5),
hierarchy=k, cycle_time=cycle_time,
tz=request_data.tz,
project_id=request_data.project_id)
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
input_data=input_data)
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
try:
input_data.total_units, input_data.reject_units = self.batch_oee_handler.get_data_for_tags(
input_data=input_data)
oee_response: BatchOEEData = self.oee_engine.start_batch_oee_calc(
request_data=OEEDataSaveRequest(**input_data.dict()))
except DataNotFound:
logger.exception(f"Data Not Found for selected Hierarchy --- {k}")
continue
except Exception as e:
logger.exception(
f"Exception Occurred while calculating oee {e.args}, skipping oee calculation for hierarchy - {k} ")
continue
data_dict = {
v[TagCategoryConstants.OEE_OUTPUT_TOTAL_UNITS_CATEGORY]: oee_response.total_units,
v[TagCategoryConstants.OEE_OUTPUT_REJECT_UNITS_CATEGORY]: oee_response.reject_units,
v[TagCategoryConstants.OEE_OUTPUT_CATEGORY]: oee_response.oee,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_CATEGORY]: oee_response.performance,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_CATEGORY]: oee_response.quality,
v[TagCategoryConstants.OEE_OUTPUT_QUALITY_LOSS_CATEGORY]: oee_response.quality_loss,
v[TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY]: oee_response.performance_loss,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_CATEGORY]: oee_response.availability,
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY]: oee_response.availability_loss
v[TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY]: oee_response.availability_loss,
v[TagCategoryConstants.OEE_OUTPUT_DOWNTIME_CATEGORY]: oee_response.downtime
}
message_dict = {
"data": data_dict,
......@@ -62,7 +101,7 @@ class MachineOEECalculator:
"gw_id": "",
"pd_id": "",
"p_id": request_data.project_id,
"timestamp": int(time.time() * 1000),
"timestamp": curr_timestamp,
"msg_id": 1,
"retain_flag": False
}
......@@ -74,11 +113,39 @@ class MachineOEECalculator:
if __name__ == '__main__':
ap = argparse.ArgumentParser()
ap.add_argument(
"--skip",
"-sk",
required=False,
default=0,
help="Skip Records.",
)
ap.add_argument(
"--limit",
"-li",
required=False,
default=10,
help="Limit Records",
)
ap.add_argument(
"--project_id",
"-p",
required=False,
default=None,
help="project_id",
)
arguments = vars(ap.parse_args())
while True:
projects_list = os.environ.get("OEE_PROJECTS", default="project_170")
project_dict = customer_conn.find_project_by_aggregate(
customer_agg.get_tz_mapping_query_with_project_id(arguments['project_id']))
project_dict = project_dict[0] if project_dict else {}
monitor_start_time = os.environ.get("OEE_START_TIME", default="00:00")
for project in projects_list.split(","):
# for project in projects_list.split(","):
for k, v in project_dict.items():
MachineOEECalculator().calculate_machine_oee(
request_data=MachineOEERequest(project_id=project, monitor_time="00:00",
tz="Asia/Kolkata"))
time.sleep(10)
request_data=MachineOEERequest(monitor_time="00:00", skip=arguments['skip'], limit=arguments['limit'],
project_id=k, tz=v))
sleep_time_in_mins = int(os.environ.get("AUTOMATION_SLEEP_TIME_IN_MINS", default=1))
sleep_time_in_seconds = sleep_time_in_mins * 60
time.sleep(sleep_time_in_seconds)
......@@ -2,7 +2,7 @@ import os
from dataclasses import dataclass, field
from typing import Optional
from fastapi import FastAPI, Depends
from fastapi import FastAPI, Depends, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from scripts.db.psql.create_default_tables import create_default_psql_dependencies
......@@ -23,13 +23,21 @@ class FastAPIConfig:
openapi_url: Optional[str] = os.environ.get("SW_OPENAPI_URL")
app = FastAPI(**FastAPIConfig().__dict__)
@app.get("/api/oee_services/health_check")
async def ping():
return {"status": 200}
SECURE_ACCESS = True if os.environ.get("SECURE_ACCESS") in {'true', 'True', True} else False
if SECURE_ACCESS:
app_config_dict = FastAPIConfig().__dict__
app_config_dict.update({"dependencies": [Depends(auth)]})
app = FastAPI(**app_config_dict)
app.include_router(route, dependencies=[Depends(auth)])
app.include_router(tag_service_router, dependencies=[Depends(auth)])
else:
app = FastAPI(**FastAPIConfig().__dict__)
app.include_router(route)
app.include_router(tag_service_router)
if os.environ.get("ENABLE_CORS") in (True, "true", "True") and os.environ.get(
"CORS_URLS"
......@@ -43,9 +51,6 @@ if os.environ.get("ENABLE_CORS") in (True, "true", "True") and os.environ.get(
)
print(os.environ.get("CORS_URLS").split(","))
app.include_router(route)
app.include_router(tag_service_router)
@app.on_event("startup")
async def startup_event():
......
......@@ -76,6 +76,9 @@ class DBConstants:
collection_tag_hierarchy = "tag_hierarchy"
collection_oee_layouts = "oee_layouts"
collection_lookup_table = "lookup_table"
collection_tags = "tags"
collection_customer_projects = "customer_projects"
collection_asset="assets"
class EndpointConstants:
......@@ -86,6 +89,9 @@ class TagCategoryConstants:
TOTAL_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Total Produced Units")
REJECT_UNITS_CATEGORY = os.environ.get("TOTAL_UNITS_CATEGORY", default="OEE - Reject Units")
OEE_OUTPUT_CATEGORY = os.environ.get("OEE_OUTPUT_CATEGORY", default="OEE Output Category")
OEE_OUTPUT_DOWNTIME_CATEGORY = os.environ.get("OEE_OUTPUT_DOWNTIME_CATEGORY",
default="OEE Output Downtime Category")
OEE_CYCLE_DESIGN_CATEGORY = os.environ.get("OEE_CYCLE_DESIGN_CATEGORY", default="OEE Cycle Design Category")
OEE_OUTPUT_PERFORMANCE_CATEGORY = os.environ.get("OEE_OUTPUT_PERFORMANCE_CATEGORY",
default="OEE Output Performance Category")
OEE_OUTPUT_QUALITY_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_CATEGORY", default="OEE Output Quality Category")
......@@ -95,6 +101,10 @@ class TagCategoryConstants:
default="OEE Output Performance Loss Category")
OEE_OUTPUT_QUALITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Quality Loss Category")
OEE_OUTPUT_TOTAL_UNITS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Total Units Category")
OEE_OUTPUT_REJECT_UNITS_CATEGORY = os.environ.get("OEE_OUTPUT_QUALITY_LOSS_CATEGORY",
default="OEE Output Reject Units Category")
OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY = os.environ.get("OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY",
default="OEE Output Availability Loss Category")
......@@ -150,3 +160,34 @@ class TagHierarchyKeys:
KEY_ASSET_MODEL_ID = "asset_model_id"
KEY_ASSET_VERSION = "asset_version"
KEY_additional_fields = "additional_fields"
class CustomerProjectKeys:
KEY_CUSTOMER_PROJECT_ID = "customer_project_id"
KEY_CUSTOMER_PROJECT_NAME = "customer_project_name"
KEY_SITE_TEMPLT_ID = "site_templt_id"
KEY_PROCESS_TEMPLT_ID = "process_templt_id"
KEY_SOURCE_META = "source_meta"
KEY_ADD_PREFIX_TO_DATABASE = "add_prefix_to_database"
class TagsCollectionKeys:
KEY_TAG_NAME = "tag_name"
KEY_UNIT = "unit"
KEY_TAG_TYPE = "tag_type"
KEY_DESCRIPTION = "description"
KEY_ID = "id"
KEY_TAG_GROUP_ID = "tag_group_id"
KEY_DATA_TYPE = "data_type"
KEY_DEFAULT = "default"
KEY_SYSTEM_TAG_TYPE = "system_tag_type"
KEY_VALUE_LIST = "value_list"
KEY_PRODUCT_ENCRYPTED = "product_encrypted"
KEY_TAG_CATEGORY_ID = "tag_category_id"
class AssetCollectionKeys:
KEY_EQUIPMENT_ID = "equipment_id"
KEY_STATUS = "status"
KEY_TAG_TYPE = "tag_type"
KEY_HIERARCHY = "hierarchy"
......@@ -3,13 +3,12 @@ from copy import deepcopy
import pandas as pd
from scripts.config import DBConf
from scripts.core.engine.oee_calculator import OEETagFinder
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.core.engine.oee_calculator import OEETagFinder, OEECalculator
from scripts.core.handlers.common_handler import CommonHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists, OutputTagsList
from scripts.errors import DataNotFound
from scripts.errors import DataNotFound, ILensError
from scripts.logging import logger
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse, ChartRequest, OEEDataInsertRequest
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse, ChartRequest
from scripts.utils.common_utils import CommonUtils
from scripts.utils.kairos_db_util import BaseQuery
from scripts.utils.kairos_db_util.df_formation_util import create_kairos_df
......@@ -21,23 +20,22 @@ class OEEAggregator:
self.common_util = CommonUtils()
self.base_query = BaseQuery()
self.oee_tag_finder = OEETagFinder()
self.oee_calc = OEECalculator()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_handler = CalculateBatchOEEHandler(project_id=project_id)
def processor(self, data):
db_response = ChartDBResponse(**data)
duration = self.common_util.get_duration(meta=data, difference=True, tz=data["tz"])
cal_type = self.common_util.get_uom_type(uom_type=data["uom"])
db_response.total_time = self.common_util.get_diff_duration_in_int(input_time=duration, return_type=cal_type)
db_response.actual_cycle = round(
db_response.total_units / db_response.total_time, 2
def processor(self, input_data: ChartDBResponse):
duration = self.common_util.get_duration(meta=input_data.dict(), difference=True, tz=input_data.tz)
cal_type = self.common_util.get_uom_type(uom_type=input_data.uom)
input_data.total_time = self.common_util.get_diff_duration_in_int(input_time=duration, return_type=cal_type)
input_data.actual_cycle = round(
input_data.total_units / input_data.total_time, 2
)
db_response.ideal_cycle = round(db_response.cycle_time, 2)
db_response.good_units = round(
db_response.total_units - db_response.reject_units, 2
input_data.ideal_cycle = round(input_data.cycle_time, 2)
input_data.good_units = round(
input_data.total_units - input_data.reject_units, 2
)
chart_response = ChartResponse(**db_response.dict())
chart_response = ChartResponse(**input_data.dict())
return chart_response.dict()
def aggregator(self, request_data: ChartRequest):
......@@ -48,18 +46,22 @@ class OEEAggregator:
timestamp=True)) * 1000
hierarchy_tags = self.common_handler.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**request_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
output_tags_dict = self.common_handler.tag_hierarchy_handler.get_output_tags_for_oee(
input_data=OutputTagsList(**request_data.dict()))
if not output_tags_dict or not output_tags_dict.get(request_data.hierarchy):
return {}
raise ILensError("Data not found for selected Filters!!!")
cycle_time_tag_id = self.oee_tag_finder.get_cycle_time_tag_id(input_data=hierarchy_tags)
cycle_time_value = self.common_handler.get_cycle_time_value_from_hierarchy(
tag_id=cycle_time_tag_id.split("$")[-1])
if isinstance(cycle_time_value, bool) and not cycle_time_value:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
raise ILensError("Cycle Design Parameters not found")
updated_dict = self.common_handler.validate_hierarchy_tags(output_tags_dict[request_data.hierarchy])
new_columns_dict = self.common_handler.get_oee_keys_mapping_dict(output_tags_dict[request_data.hierarchy])
tags_list = list(updated_dict.values())
group_by_tags_list = deepcopy(tags_list)
group_by_tags_list.append(DBConf.KAIROS_DEFAULT_FULL_TAG)
tags_list.extend([total_units_tag_id, reject_units_tag_id])
if not tags_list:
return {}
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
......@@ -75,17 +77,24 @@ class OEEAggregator:
response_data=each_data,
tags_list=tags_list,
group_by_tags=group_by_tags_list,
tz=request_data.tz
tz=request_data.tz,
df_diff=False
)
if master_df.empty:
raise DataNotFound
master_df_columns = list(master_df.columns)
input_data = {"prod_start_time": start_time, "prod_end_time": end_time}
input_data.update(request_data.dict(exclude_none=True))
input_schema = OEEDataInsertRequest(**input_data)
input_schema.total_units, input_schema.reject_units = self.oee_handler.get_data_for_tags(
input_data=input_schema)
master_df.rename(columns=new_columns_dict, inplace=True)
final_records = master_df.to_dict('records')
oee_json = final_records[-1] if final_records else {}
oee_json.update({"productive_time": self.oee_calc.calculate_productive_time(cycle_time=cycle_time_value,
units_produced=oee_json[
"total_units"]),
"tz": request_data.tz, "cycle_time": cycle_time_value,
"prod_start_time": request_data.queryDate[0],
"prod_end_time": request_data.queryDate[-1]
})
res = self.processor(input_data=ChartDBResponse(**oee_json))
return res
except Exception as e:
logger.execption(f'Exception occurred while plotting the dashboard {e.args}')
logger.exception(f'Exception occurred while plotting the dashboard {e.args}')
raise
......@@ -56,6 +56,8 @@ class OEECalculator:
if rejected_units > total_units:
logger.error(ErrorCodes.ERR004)
raise ValueError(ErrorCodes.ERR004)
if not rejected_units and not total_units:
return 0
try:
return (total_units - rejected_units) / total_units
except ZeroDivisionError:
......@@ -104,6 +106,13 @@ class OEETagFinder:
raise ValueError(ErrorCodes.ERR007)
return input_data.get(category_name)
@staticmethod
def get_cycle_time_tag_id(input_data: dict, category_name=TagCategoryConstants.OEE_CYCLE_DESIGN_CATEGORY):
if not input_data.get(category_name):
logger.error(ErrorCodes.ERR008)
raise ValueError(ErrorCodes.ERR008)
return input_data.get(category_name)
class OEEEngine:
def __init__(self):
......
......@@ -13,7 +13,7 @@ from scripts.schemas.batch_oee import (
GetOEERequestOneBatch,
GetBatches,
BatchesGet,
ChartRequest,
ChartRequest, ChartDBResponse,
)
from scripts.schemas.meta import LabelValue
......@@ -102,7 +102,8 @@ class APIHandler:
prod_end_time=request_data.queryDate[1],
reference_id=request_data.reference_id
)
raw_data = self.oee_agg.processor(data)
db_response = ChartDBResponse(**data)
raw_data = self.oee_agg.processor(db_response)
return chart_maker.main_creator(raw_data, overall=False)
else:
agg_data = self.oee_agg.aggregator(request_data=request_data)
......
......@@ -6,10 +6,11 @@ import pytz
from sqlalchemy.orm import Session
from scripts.config import DBConf
from scripts.constants import ResponseCodes, CommonConstants, TagCategoryConstants
from scripts.constants import ResponseCodes, CommonConstants
from scripts.core.engine.oee_calculator import OEEEngine, OEETagFinder
from scripts.core.handlers.common_handler import CommonHandler
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.schema.tag_hierarchy import GetTagsLists
from scripts.db.mongo.schema.tag_hierarchy import OutputTagsList, GetTagsLists
from scripts.db.psql.oee_discrete import DiscreteOEE
from scripts.db.redis_connections import oee_production_db
from scripts.errors import DataNotFound
......@@ -29,6 +30,7 @@ class CalculateBatchOEEHandler:
def __init__(self, project_id=None):
self.common_util = CommonUtils()
self.base_query = BaseQuery()
self.common_handler = CommonHandler(project_id=project_id)
self.oee_tag_finder = OEETagFinder()
self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)
......@@ -38,7 +40,18 @@ class CalculateBatchOEEHandler:
record_presence = table_obj.get_oee_data_by_reference_id(reference_id=request_data.reference_id,
hierarchy=request_data.hierarchy,
project_id=request_data.project_id)
request_data.total_units, request_data.reject_units = self.get_data_for_tags(input_data=request_data)
self.tag_hierarchy_handler.get_output_tags_for_oee(OutputTagsList(**request_data.dict()))
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**request_data.dict()))
cycle_time_tag_id = self.oee_tag_finder.get_cycle_time_tag_id(input_data=hierarchy_tags)
cycle_time_value = self.common_handler.get_cycle_time_value_from_hierarchy(
tag_id=cycle_time_tag_id.split("$")[-1])
if isinstance(cycle_time_value, bool) and not cycle_time_value:
logger.debug(f"OEE Cycle Design parameter details not found for selected hierarchy")
raise ValueError("Cycle Design Parameters not found")
request_data.cycle_time = cycle_time_value
request_data.total_units, request_data.reject_units = self.get_data_for_tags(input_data=request_data,
hierarchy_tags=hierarchy_tags)
redis_key = f"{request_data.project_id}${request_data.reference_id}"
if not record_presence:
if not request_data.prod_start_time:
......@@ -53,8 +66,19 @@ class CalculateBatchOEEHandler:
CommonConstants.USER_META_TIME_FORMAT).astimezone(
tz=pytz.timezone(request_data.tz)).isoformat()
request_data = OEEDataSaveRequest(**request_data.dict(exclude_none=True))
request_data.downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=request_data.hierarchy, project_id=request_data.project_id)
start_timestamp = int(
self.common_util.pendulum_conversion(request_data.prod_start_time, timestamp=True,
tz=request_data.tz)) * 1000
curr_timestamp = int(
self.common_util.pendulum_conversion(request_data.prod_end_time, timestamp=True,
tz=request_data.tz)) * 1000
downtime = self.common_util.get_downtime_details_by_hierarchy(
hierarchy=request_data.hierarchy, project_id=request_data.project_id,
filters={"start_time": [start_timestamp, curr_timestamp]})
if downtime is None:
logger.debug("Downtime for selected hierarchy got None, Updating to zero")
downtime = 0
request_data.downtime = downtime
oee_calculation = oee_engine.start_batch_oee_calc(request_data=request_data)
self.save_oee_data(oee_calculation, db)
if not request_data.prod_end_time:
......@@ -116,7 +140,7 @@ class CalculateBatchOEEHandler:
except Exception as e:
raise e
def get_data_for_tags(self, input_data: OEEDataInsertRequest):
def get_data_for_tags(self, input_data: OEEDataInsertRequest, hierarchy_tags=None):
total_units_value = 0
reject_units_value = 0
try:
......@@ -126,7 +150,9 @@ class CalculateBatchOEEHandler:
end_epoch = int(
datetime.strptime(input_data.prod_end_time, CommonConstants.USER_META_TIME_FORMAT).astimezone(
tz=pytz.timezone(input_data.tz)).timestamp()) * 1000
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(GetTagsLists(**input_data.dict()))
if not hierarchy_tags:
hierarchy_tags = self.tag_hierarchy_handler.get_tags_list_by_hierarchy(
GetTagsLists(**input_data.dict()))
total_units_tag_id = self.oee_tag_finder.get_total_units_tag_id(input_data=hierarchy_tags)
reject_units_tag_id = self.oee_tag_finder.get_reject_units_tag_id(input_data=hierarchy_tags)
kairos_util = KairosQuery(url=DBConf.KAIROS_URL)
......
......@@ -3,15 +3,20 @@ import os
from scripts.constants import TagCategoryConstants
from scripts.constants.db_connections import mongo_client
from scripts.core.handlers.tag_handler import TagHierarchyHandler
from scripts.db.mongo.ilens_configuration.collections.asset_overview import AssetOverview
from scripts.db.mongo.ilens_configuration.collections.lookup_table import LookupTable
from scripts.db.mongo.ilens_configuration.collections.tags import Tags
from scripts.db.mongo.schema.tag_hierarchy import OutputTagsList
from scripts.logging import logger
from scripts.schemas.batch_oee import MachineOEERequest
class CommonHandler:
def __init__(self, project_id=None):
self.lookup_table = LookupTable(project_id=project_id, mongo_client=mongo_client)
self.tag_hierarchy_handler = TagHierarchyHandler(project_id=project_id)
self.tag_conn = Tags(mongo_client=mongo_client, project_id=project_id)
self.asset_conn = AssetOverview(mongo_client=mongo_client, project_id=project_id)
def fetch_oee_hierarchy_from_look_up(self, project_id):
oee_lookup_name = os.environ.get("OEE_LOOKUP_NAME", default="oee_monitoring")
......@@ -25,16 +30,20 @@ class CommonHandler:
logger.exception(f"Failed to fetch hierarchy details from lookup {e.args}")
return {}
def get_valid_oee_monitoring_hierarchy(self, project_id):
def get_valid_oee_monitoring_hierarchy(self, request_data: MachineOEERequest):
valid_hierarchies_dict = {}
try:
oee_lookup_dict = self.fetch_oee_hierarchy_from_look_up(project_id=project_id)
hierarchy_list = list(oee_lookup_dict.values())
hierarchy_list = []
asset_data = self.asset_conn.find_by_query(query=dict(project_id=request_data.project_id, monitor_oee=True),
skip=request_data.skip,
limit=request_data.limit)
for each in asset_data:
hierarchy_list.append(each.get('hierarchy'))
if not hierarchy_list:
logger.debug(f'Hierarchy details not found for the project {project_id} for OEE Monitoring!!!!')
logger.debug(f'Hierarchy details for {request_data.project_id} not found for OEE Monitoring!!!!')
return {}
tags_dict_by_hierarchy = self.tag_hierarchy_handler.get_output_tags_for_oee(
OutputTagsList(project_id=project_id, hierarchy_list=hierarchy_list))
OutputTagsList(project_id=request_data.project_id, hierarchy_list=hierarchy_list))
for each_hierarchy in tags_dict_by_hierarchy:
if not tags_dict_by_hierarchy.get(each_hierarchy):
logger.debug(f'Tag details not found for the hierarchy {each_hierarchy} for OEE Monitoring!!!!')
......@@ -57,7 +66,8 @@ class CommonHandler:
if _each_tag in {TagCategoryConstants.TOTAL_UNITS_CATEGORY, TagCategoryConstants.REJECT_UNITS_CATEGORY}:
continue
if not request_dict.get(_each_tag):
logger.debug(f"OEE Output Tag not configured for the hierarchy{list(request_dict.keys())[0]}")
logger.debug(
f"OEE Output Tag Category <{_each_tag}> not configured for the hierarchy")
return {}
hierarchy_dict.update({_each_tag: request_dict[_each_tag]})
return hierarchy_dict
......@@ -69,9 +79,9 @@ class CommonHandler:
def get_oee_keys_mapping_dict(input_dict: dict):
final_dict = {}
for k, v in input_dict.items():
if k == TagCategoryConstants.TOTAL_UNITS_CATEGORY:
if k == TagCategoryConstants.OEE_OUTPUT_TOTAL_UNITS_CATEGORY:
final_dict.update({v: "total_units"})
elif k == TagCategoryConstants.REJECT_UNITS_CATEGORY:
elif k == TagCategoryConstants.OEE_OUTPUT_REJECT_UNITS_CATEGORY:
final_dict.update({v: "reject_units"})
elif k == TagCategoryConstants.OEE_OUTPUT_CATEGORY:
final_dict.update({v: "oee"})
......@@ -85,4 +95,21 @@ class CommonHandler:
final_dict.update({v: "quality_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_AVAILABILITY_LOSS_CATEGORY:
final_dict.update({v: "availability_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_PERFORMANCE_LOSS_CATEGORY:
final_dict.update({v: "performance_loss"})
elif k == TagCategoryConstants.OEE_OUTPUT_DOWNTIME_CATEGORY:
final_dict.update({v: "downtime"})
return final_dict
def get_cycle_time_value_from_hierarchy(self, tag_id: str):
try:
tag_data = self.tag_conn.find_tag_by_id(tag_id=tag_id)
if tag_data.data_type.lower() != "integer":
return False
parameter_value = tag_data.data_quality_info.get("basic", {}).get("value")
if isinstance(parameter_value, int) and not parameter_value:
return False
return parameter_value
except Exception as e:
logger.exception(f'Exception occurred while fetching cycle value from tag {e.args}')
return False
import traceback
from scripts.constants.db_connections import mongo_client
from scripts.constants.db_connections import mongo_connection
from scripts.db.mongo.ilens_configuration.collections import collection_constants
from scripts.logging import logger
from scripts.schemas.oee_config_schema import Oee_Tag_Mapping_List, Get_Oee_Tag, Update_Oee_Tags, Get_Project_Id
......
......@@ -25,7 +25,7 @@ class TagHierarchyHandler:
def get_output_tags_for_oee(self, input_data: OutputTagsList):
try:
if input_data.hierarchy_list:
hierarchy_str = re.escape("|".join([f'{_each}$tag' for _each in input_data.hierarchy_list]))
hierarchy_str = "|".join([re.escape(f'{_each}$tag') for _each in input_data.hierarchy_list])
elif input_data.hierarchy_level:
hierarchy_str = input_data.hierarchy_level
elif input_data.hierarchy:
......
class ProjectAggregate:
@staticmethod
def get_tz_mapping_query_with_project_id(project_id=None):
query_json = [
{'$group': {
'_id': None,
'data': {
'$push': {
'k': {'$ifNull': ['$customer_project_id', '']},
'v': {'$ifNull': ['$timezone', '']},
}
},
}}, {'$replaceRoot': {'newRoot': {'$arrayToObject': '$data'}}}]
if project_id:
query_json.insert(0, {'$match': {'customer_project_id': project_id}})
return query_json
from typing import Optional
from scripts.constants import DBConstants, AssetCollectionKeys
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class AssetOverviewSchema(MongoBaseSchema):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
equipment_id: Optional[str]
status: Optional[str]
hierarchy: Optional[str]
class AssetOverview(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DBConstants.db_metadata,
collection=DBConstants.collection_asset)
self.project_id = project_id
@property
def key_equipment_id(self):
return AssetCollectionKeys.KEY_EQUIPMENT_ID
@property
def key_status(self):
return AssetCollectionKeys.KEY_STATUS
@property
def key_hierarchy(self):
return AssetCollectionKeys.KEY_HIERARCHY
def update_asset_detail(self, hierarchy, data):
"""
The following function will update the record based on hierarchy
"""
query = {self.key_hierarchy: hierarchy}
return self.update_one(query=query, data=data, upsert=True)
def insert_one_asset_status(self, data):
"""
The following function will insert one tag in the
tags collections
:param data:
:return:
"""
return self.insert_one(data)
def find_one_asset_data(self, hierarchy):
"""
The following function will fetch one asset overview data
"""
query = {self.key_hierarchy: hierarchy}
return self.find_one(query=query)
def delete_many_tags(self, **query):
"""
The following function will delete many tag in
tags collection based on the given query
:param query:
:return:
"""
return self.delete_many(query=query)
def delete_one_tag(self, **query):
"""
The following function will delete one tag in
tags collection based on the given query
:param query:
:return:
"""
if query:
return self.delete_one(query=query)
else:
return False
def distinct_tag(self, query_key):
"""
Get a list of distinct values for `key` among all documents
in the result set of this query.
:param query_key:
:return:
"""
return self.distinct(query_key=query_key)
def find_by_query(self, query, skip=None, limit=None, filter_dict=None):
return list(self.find(query=query, filter_dict=filter_dict, skip=skip, limit=limit))
def get_asset_data_by_aggregate(self, query_json: list):
return list(self.aggregate(query_json))
from typing import Optional, Dict
from scripts.constants import DBConstants, CustomerProjectKeys
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class CustomerProjectsSchema(MongoBaseSchema):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
customer_project_name: Optional[str]
description: Optional[str]
site_templt_id: Optional[str]
logo_name: Optional[str]
logo_url: Optional[str]
process_templt_id: Optional[str]
update_details: Optional[Dict]
user_id: Optional[str]
customer_project_id: Optional[str]
product_encrypted: Optional[bool]
timezone: Optional[str]
country: Optional[str]
client_name: Optional[str]
add_prefix_to_database: Optional[bool]
class CustomerProjects(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DBConstants.db_metadata,
collection=DBConstants.collection_customer_projects)
self.project_id = project_id
@property
def key_customer_project_id(self):
return CustomerProjectKeys.KEY_CUSTOMER_PROJECT_ID
@property
def key_customer_project_name(self):
return CustomerProjectKeys.KEY_CUSTOMER_PROJECT_NAME
def find_project(self, project_id=None, project_name=None, filter_dict=None):
"""
The following function will give one record for a given set of
search parameters as keyword arguments
:param filter_dict:
:param project_id:
:param project_name
:return:
"""
query = dict()
if project_id:
query.update({self.key_customer_project_id: project_id})
if project_name:
query.update({self.key_customer_project_name: project_name})
record = self.find_one(query=query, filter_dict=filter_dict)
if not record:
return dict()
return CustomerProjectsSchema(**record).dict()
def find_project_by_query(self, query, filter_dict=None):
record = self.find(query=query, filter_dict=filter_dict)
if record:
return record
return list()
def insert_one_project(self, data):
"""
The following function will insert one project in the
customer_projects collections
:param self:
:param data:
:return:
"""
return self.insert_one(data)
def update_one_project(self, project_id, data):
query = {self.key_customer_project_id: project_id}
return self.update_one(query=query, data=data)
def delete_one_project(self, project_id):
if project_id:
query = {self.key_customer_project_id: project_id}
return self.delete_one(query)
else:
return False
def find_project_by_aggregate(self, query):
return list(self.aggregate(pipelines=query))
from typing import List, Optional, Dict, Any
from scripts.constants import TagsCollectionKeys, DBConstants
from scripts.db.mongo.schema import MongoBaseSchema
from scripts.utils.mongo_util import MongoCollectionBaseClass
class TagsSchema(MongoBaseSchema):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
tag_name: Optional[str]
unit: Optional[str]
tag_type: Optional[str]
description: Optional[str]
id: Optional[str]
tag_group_id: Optional[str]
data_type: Optional[str]
default: Optional[bool]
system_tag_type: Optional[str] = ""
value_list: Optional[List]
product_encrypted: Optional[bool]
tag_category_id: Optional[str]
additional_fields: Optional[Dict] = dict()
attributes: Optional[List] = list()
data_quality_info: Optional[Dict]
tag_label: Optional[Any] = ""
class Tags(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(mongo_client, database=DBConstants.db_metadata,
collection=DBConstants.collection_tags)
self.project_id = project_id
@property
def key_tag_category_id(self):
return TagsCollectionKeys.KEY_TAG_CATEGORY_ID
@property
def key_tag_name(self):
return TagsCollectionKeys.KEY_TAG_NAME
@property
def key_unit(self):
return TagsCollectionKeys.KEY_UNIT
@property
def key_tag_type(self):
return TagsCollectionKeys.KEY_TAG_TYPE
@property
def key_description(self):
return TagsCollectionKeys.KEY_DESCRIPTION
@property
def key_id(self):
return TagsCollectionKeys.KEY_ID
@property
def key_tag_group_id(self):
return TagsCollectionKeys.KEY_TAG_GROUP_ID
@property
def key_default(self):
return TagsCollectionKeys.KEY_DEFAULT
@property
def key_data_type(self):
return TagsCollectionKeys.KEY_DATA_TYPE
@property
def key_system_tag_type(self):
return TagsCollectionKeys.KEY_SYSTEM_TAG_TYPE
@property
def key_value_list(self):
return TagsCollectionKeys.KEY_VALUE_LIST
@property
def key_product_encrypted(self):
return TagsCollectionKeys.KEY_PRODUCT_ENCRYPTED
def find_all_tags(self, filter_dict=None,
sort=None, skip=0, limit=None, **query):
"""
The following function will give all tags for the given set of
search parameters as keyword arguments
:param filter_dict:
:param sort:
:param skip:
:param limit:
:param query:
:return:
"""
all_tags = self.find(filter_dict=filter_dict, sort=sort, skip=skip, limit=limit, query=query)
if not all_tags:
return list()
return all_tags
def find_tags(self, query):
all_tags = self.find(query=query)
if all_tags:
return list(all_tags)
return list()
def find_tags_by_query(self, query, filter_dict=None):
all_tags = self.find(query=query, filter_dict=filter_dict)
if all_tags:
return list(all_tags)
return list()
def find_one_tag(self, filter_dict=None, **query) -> TagsSchema:
"""
The following function will give one tag for a given set of
search parameters as keyword arguments
:param filter_dict:
:param query:
:return:
"""
tag = self.find_one(filter_dict=filter_dict, query=query)
if tag:
# tag["tag_category_id"] = [tag.get("tag_category_id","")] \
# if isinstance(tag.get("tag_category_id",""), str) \
# else tag.get("tag_category_id")
try:
return TagsSchema(**tag)
except Exception as e:
raise e
else:
return tag
def insert_one_tag(self, data):
"""
The following function will insert one tag in the
tags collections
:param data:
:return:
"""
return self.insert_one(data)
def insert_many_tags(self, data):
"""
The following function will insert many tags in the
tags collection
:param data:
:return:
"""
return self.insert_many(data)
def update_one_tag(self, data, upsert=False, **query):
"""
The following function will update one tag in
tags collection based on the given query
:param data:
:param upsert:
:param query:
:return:
"""
return self.update_one(data=data, upsert=upsert, query=query)
def update_many_tags(self, data, query, upsert=False):
"""
The following function will update many tags in
tags collection based on the given query
:param data:
:param upsert:
:param query:
:return:
"""
return self.update_many(data=data, upsert=upsert, query=query)
def delete_many_tags(self, **query):
"""
The following function will delete many tag in
tags collection based on the given query
:param query:
:return:
"""
return self.delete_many(query=query)
def delete_one_tag(self, **query):
"""
The following function will delete one tag in
tags collection based on the given query
:param query:
:return:
"""
if query:
return self.delete_one(query=query)
else:
return False
def distinct_tag(self, query_key):
"""
Get a list of distinct values for `key` among all documents
in the result set of this query.
:param query_key:
:return:
"""
return self.distinct(query_key=query_key)
def find_tags_with_list(self, tag_list):
query = {self.key_tag_name: {"$in": tag_list}}
filter_dict = dict(tag_name=1, _id=0)
tags = self.find(query=query, filter_dict=filter_dict)
if not tags:
return list()
return list(tags)
def find_tags_by_ids(self, tag_category_id, tag_list):
query = {"$or": [{self.key_tag_category_id: tag_category_id}, {self.key_id: {"$in": tag_list}}]}
tags = self.find(query=query)
if not tags:
return list()
return list(tags)
def find_tag_by_id(self, tag_id):
query = {self.key_id: tag_id}
res = self.find_one(query=query)
if not res:
return TagsSchema(**{})
return TagsSchema(**res)
def find_tags_by_aggregate(self, query):
tags = self.aggregate(query)
if not tags:
return list()
return list(tags)
def update_many_tags_by_group(self, selected_tags, tag_group_id):
data = {self.key_tag_group_id: tag_group_id}
query = {self.key_id: {"$in": selected_tags}}
self.update_many(query=query, data=data)
def find_by_aggregate(self, query):
tags = self.aggregate(query)
if not tags:
return list()
return list(tags)
def get_tag_name(self, tag_id):
query = [
{
'$match': {
'id': tag_id
}
}, {
'$project': {
'_id': 0,
'tag_name': '$tag_name',
'tag_id': '$tag_id'
}
}
]
tags = self.aggregate(query)
tags = [x for x in tags]
if not tags:
return ""
return tags[0]["tag_name"]
......@@ -18,6 +18,7 @@ class ErrorCodes:
ERR005 = "ERR005 - Batch Start time not supplied"
ERR006 = "ERR006 - Total Units Tag not found!!"
ERR007 = "ERR007 - Reject Units Tag not found!!"
ERR008 = "ERR007 - Cycle Time Tag not found!!"
class UnknownError(Exception):
......
......@@ -5,11 +5,10 @@ from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
# this method is to read the configuration from backup.conf
from scripts.config import Logging
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
......
......@@ -3,6 +3,7 @@ from typing import Optional, Union, List
from pydantic import BaseModel, validator
from scripts.constants import CommonConstants
from scripts.utils.common_utils import CommonUtils
common_utils = CommonUtils()
......@@ -86,6 +87,7 @@ class ChartDBResponse(BaseModel):
productive_time: int
downtime: int
tz: Optional[str] = "Asia/Kolkata"
uom: Optional[str] = "mins"
@validator("*")
def round_float(cls, v):
......@@ -132,7 +134,7 @@ class ProductInfo(BaseModel):
class OEEDataInsertRequest(BaseModel):
prod_start_time: str
prod_end_time: str
prod_end_time: Optional[str] = datetime.now().strftime(CommonConstants.USER_META_TIME_FORMAT)
prod_status: Optional[str] = "running"
downtime: Optional[Union[float, int]]
hierarchy: str
......@@ -231,6 +233,8 @@ class GetOeeServices(BaseModel):
class MachineOEERequest(BaseModel):
project_id: str
skip: Optional[int]=0
limit: Optional[int]=10
project_id: Optional[str]
monitor_time: Optional[str] = "00:00"
tz: Optional[str] = "Asia/Kolkata"
......@@ -7,7 +7,7 @@ from scripts.constants import Endpoints, ResponseCodes
from scripts.core.handlers.api_handler import APIHandler
from scripts.core.handlers.layout_handler import LayoutHandler
from scripts.db.psql.databases import get_db
from scripts.errors import ILensError
from scripts.errors import ILensError, DataNotFound
from scripts.logging import logger
from scripts.schemas.batch_oee import BatchesGet, ChartRequest
from scripts.schemas.layout import GetLayoutRequest, SaveLayoutRequest
......@@ -47,8 +47,10 @@ async def get_chart_data(request_data: ChartRequest, db: Session = Depends(get_d
status=ResponseCodes.SUCCESS,
message="Chart data fetched successfully",
)
except DataNotFound:
return DefaultResponse(message="Data not Found for selected filters", status="info")
except ILensError as e:
return DefaultFailureResponse(error=e.args)
return DefaultResponse(message=e.args[0], status="info")
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
......@@ -86,29 +88,3 @@ async def save_layout(request_data: SaveLayoutRequest, meta: MetaInfoSchema = De
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
# -------------Code Demo Backup----------------#
#
# @ui_service_router.post(Endpoints.get_batch_oee_all)
# async def get_all_batch_oee(get_oee_request: GetOEERequest,
# db: Session = Depends(get_db)):
# try:
# return await api_handler.get_oee_all(get_oee_request=get_oee_request, db=db)
# except Exception as e:
# tb = traceback.format_exc()
# logger.exception(e)
# logger.exception(tb)
# return DefaultFailureResponse(error=e.args)
#
#
# @ui_service_router.post(Endpoints.get_batch_oee_batch_id)
# async def get_all_batch_oee(get_oee_request: GetOEERequestOneBatch,
# db: Session = Depends(get_db)):
# try:
# return await api_handler.get_oee_batch(get_oee_request=get_oee_request, db=db)
# except Exception as e:
# tb = traceback.format_exc()
# logger.exception(e)
# logger.exception(tb)
# return DefaultFailureResponse(error=e.args)
......@@ -44,14 +44,17 @@ class CommonUtils:
logger.exception(f"Exception in getting data: {e}")
raise
def get_downtime_details_by_hierarchy(self, hierarchy, project_id, user_id=None):
def get_downtime_details_by_hierarchy(self, hierarchy, project_id, user_id=None, uom_type="minutes",
filters: dict = None):
connection_obj = ILensRequest(url=PathToServices.downtime_proxy,
project_id=project_id)
try:
cookies = {'login-token': self.create_token(user_id=user_id), "user_id": user_id}
input_data = {"project_id": project_id, "hierarchy": hierarchy, "display_type": uom_type}
if filters:
input_data.update({"filters": filters})
downtime_response = connection_obj.post(path=EndpointConstants.hierarchy_downtime,
json={"project_id": project_id,
"hierarchy": hierarchy})
json=input_data, cookies=cookies)
response = downtime_response.json()
return response.get("data", 0)
except AuthenticationError:
......
......@@ -34,7 +34,7 @@ def default_group_by(group_by_dict):
raise DataFrameFormationError(ErrorMessages.DF_ERROR2)
def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, group_by):
def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, group_by, df_diff):
"""Helper function for parsing Kairos data and forming dataframe for each results"""
df = pd.DataFrame(columns=["timestamp", "value"])
try:
......@@ -53,7 +53,8 @@ def form_df(metric, tags, values, df_index, tz, group_by_tags, grouping_order, g
df[df_index] = df[df_index].dt.tz_localize('UTC').dt.tz_convert(timezone)
df.set_index(df_index, inplace=True)
df.sort_index(inplace=True)
df[f'{selected_hierarchy}_diff'] = df[selected_hierarchy].diff()
if df_diff:
df[f'{selected_hierarchy}_diff'] = df[selected_hierarchy].diff()
except Exception as e:
logging.exception(e)
return df
......@@ -72,7 +73,7 @@ def get_statistics(df, metadata):
def create_kairos_df(response_data, tags_list: List, group_by_tags: List, master_df=None, df_index='timestamp',
tz='Asia/kolkata'):
tz='Asia/kolkata', df_diff=True):
"""
Definition for creating kairos DataFrame
"""
......@@ -92,7 +93,8 @@ def create_kairos_df(response_data, tags_list: List, group_by_tags: List, master
df_index=df_index,
tz=tz,
group_by_tags=group_by_tags,
grouping_order=tags_list
grouping_order=tags_list,
df_diff=df_diff
)
df = df[~df.index.duplicated()]
master_df = master_df.combine_first(df)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment