Commit 15ccf0a8 authored by harshavardhan.c

project structure setup.

parent a227b5ad
FROM python:3.9.10-slim
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt
ENTRYPOINT [ "python" ]
CMD [ "app.py" ]
# oee-services (siam-custom)

Custom OEE project for Siam.
Release Note:
Feature:
Patch:
# To specify a variable to be read from environment, use $<env_name>
[service]
host=0.0.0.0
port=6869
workers=1
module_name=$APP_NAME
enable_traceback = True
secure_cookie=$SECURE_COOKIE
allow_cross_origin = true
[LOGGING]
level=DEBUG
[postgres]
uri=$OEE_POSTGRES_URI
downtime_uri=$DOWNTIME_URI
[mongo]
uri=$MONGO_URI
[PATH_TO_SERVICES]
event_explorer=$EVENT_EXPLORER
metadata_proxy=$METADATA_PROXY
[DIRECTORY]
base_path = $BASE_PATH
mount_dir = $MOUNT_DIR
keys_path = data/keys
[REDIS]
uri=$REDIS_URI
login_db = 9
project_tags_db = 18
downtime_db=30
from scripts.utils.kafka_util import DataPush
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import time
from datetime import datetime
import pytz
from production_monitoring import ProductionMonitor
prod_mon = ProductionMonitor()
data_push = DataPush()
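# KPI name -> iLens tag hierarchy identifier (site$dept$line$equipment$tag)
# used when publishing the calculated values to Kafka.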
tag_mapping = {
"oee": "site_100$dept_100$line_100$equipment_101$tag_215",
"availability": "site_100$dept_100$line_100$equipment_101$tag_216",
"performance": "site_100$dept_100$line_100$equipment_101$tag_217",
"quality": "site_100$dept_100$line_100$equipment_101$tag_218",
"running_lot": "site_100$dept_100$line_100$equipment_101$tag_219",
"running_item": "site_100$dept_100$line_100$equipment_101$tag_220",
"target": "site_100$dept_100$line_100$equipment_101$tag_222",
"downtime": "site_100$dept_100$line_100$equipment_101$tag_223",
"setup_time": "site_100$dept_100$line_100$equipment_101$tag_225",
"running_time": "site_100$dept_100$line_100$equipment_101$tag_226"
}
def oee_update():
data = prod_mon.oee_mongo.find_record_by_not_status("completed")
if not data:
print("No jobs are running, waiting for job to start...")
return
print(f"Calculating OEE for {data.get('job')}")
data_dict = {}
if data.get("run_start_time"):
run_start_time = datetime.fromtimestamp(
data.get("run_start_time") // 1000,
tz=pytz.timezone("Asia/Bangkok")
)
downtime = prod_mon.automation_engine.get_downtime(
run_start_time=run_start_time,
production_end_time=datetime.now(tz=pytz.timezone("Asia/Bangkok"))
)
else:
downtime = 0
oee, availability, performance, quality = prod_mon.calculate_oee_params(data, downtime)
data_dict.update(
{
tag_mapping.get("running_lot"): data.get("job", ""), # job no
tag_mapping.get("running_item"): data.get("item", ""), # item no
tag_mapping.get("target"): data.get("qty_released", 0), # quality released
tag_mapping.get("oee"): oee,
tag_mapping.get("availability"): availability,
tag_mapping.get("performance"): performance,
tag_mapping.get("quality"): quality,
tag_mapping.get("downtime"): downtime,
tag_mapping.get("setup_time"): data.get("setup_time", 0),
tag_mapping.get("running_time"): data.get("running_time", 0),
}
)
message_dict = {
"data": data_dict,
"site_id": prod_mon.settings["automation"]["site_id"],
"gw_id": "",
"pd_id": "",
"p_id": prod_mon.settings["automation"]["project_id"],
"timestamp": int(time.time() * 1000),
"msg_id": 1,
"retain_flag": False
}
data_push.publish_message(message_dict)
good_count, units_produced = prod_mon.get_current_produced_count()
running_time = (datetime.now() - datetime.fromtimestamp(data.get("start_time") / 1000)).total_seconds() / 60
mongo_data = {
"good_count": good_count,
"units_produced": units_produced,
"running_time": running_time
}
data.update(mongo_data)
prod_mon.oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
if __name__ == '__main__':
while True:
oee_update()
time.sleep(3)
import json
import os
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from scripts.services.calculate_oee import calc_oee_router
from scripts.services.form_services import form_router
from scripts.services.meta_services import meta_service_router
from scripts.services.ui_services import ui_service_router
app = FastAPI(
title="iLens OEE V3",
version="5.5.0",
description="OEE App",
docs_url=os.environ.get("SW_DOCS_URL"),
openapi_url=os.environ.get("SW_OPENAPI_URL")
)
if os.environ.get("CORS_URLS").split(","):
app.add_middleware(
CORSMiddleware,
allow_origins=os.environ.get("CORS_URLS").split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE", "PUT"],
allow_headers=["*"]
)
@app.middleware("http")
async def echo_request_cookies(request: Request, call_next):
    # Echo the incoming request cookies back on the response as a
    # JSON-encoded "Cookie" header.
    cookies = request.cookies
    response: Response = await call_next(request)
    response.headers.append("Cookie", json.dumps(cookies))
    return response
app.include_router(calc_oee_router)
app.include_router(ui_service_router)
app.include_router(meta_service_router)
app.include_router(form_router)
import json
import time
from datetime import datetime
import pytz
from scripts.config import read_settings, DBConf
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.db.mongo.dbs.siam_oee import SiamOEE
from scripts.db.redis_conn import live_tags_db_object
class ProductionMonitor:
def __init__(self):
self.automation_engine = AutomationEngine()
self.settings = read_settings()
self.oee_mongo = SiamOEE()
self.machine_mode_tags = self.settings["automation"]["setup_time_logic"]["tags"]
self.manual_mode_tag = self.settings["automation"]["setup_time_logic"]["manual_mode_tag"]
self.auto_mode_tag = self.settings["automation"]["setup_time_logic"]["auto_mode_tag"]
self.production_count_tags = self.settings["automation"]["production_end_logic"]["tags"]
self.good_count_tag = self.settings["automation"]["production_end_logic"]["good_count_tag"]
self.inspection_count_tag = self.settings["automation"]["production_end_logic"]["inspection_count_tag"]
self.ng_count_tag = self.settings["automation"]["production_end_logic"]["ng_count_tag"]
@staticmethod
def get_redis_data(tag_list):
tag_data = {}
redis_response = live_tags_db_object.mget(tag_list)
for index, each_tag in enumerate(tag_list):
            _val = redis_response[index]
if not _val:
continue
_redis_resp = json.loads(_val)
hierarchy = each_tag.removeprefix(DBConf.REDIS_PREFIX)
            tag_data[hierarchy] = _redis_resp.get("value")
return tag_data
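    # Note: Redis values are JSON strings shaped like {"value": <reading>, ...};
    # get_redis_data returns a {hierarchy: reading} mapping for tags with data.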
def calculate_oee_params(self, data, downtime):
start_time = datetime.fromtimestamp(
data.get("start_time") // 1000, tz=pytz.timezone("Asia/Bangkok"))
end_time = datetime.now(tz=pytz.timezone("Asia/Bangkok"))
available_time = (end_time - start_time).total_seconds() / 60
if downtime > available_time:
downtime = 0
operating_time = available_time - downtime
availability = operating_time / available_time
good_count, units_produced = self.get_current_produced_count()
        good_count = good_count or 0
        units_produced = units_produced or 0
        cycle_time = data.get("cycle_time") or 0
        productive_time = units_produced / cycle_time if cycle_time else 0
        performance = productive_time / operating_time if operating_time else 0
        quality = good_count / units_produced if units_produced else 0
oee = availability * performance * quality
return oee * 100, availability * 100, performance * 100, quality * 100
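    # Worked example (illustrative numbers): 480 min available, 60 min
    # downtime -> operating_time = 420 and availability = 420/480 = 0.875;
    # 400 units at cycle_time 1 unit/min -> productive_time = 400 and
    # performance = 400/420 ~ 0.952; 380 good of 400 -> quality = 0.95;
    # OEE = 0.875 * 0.952 * 0.95 ~ 0.792, returned as 79.2%.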
def calculate_setup_time(self, production_start_time: datetime, tz):
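        # Poll the machine-mode tags until manual=0 and auto=1 (production
        # has started) or 10 minutes have elapsed; returns setup time in minutes.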
tag_list = [f"{DBConf.REDIS_PREFIX}{i}" for i in self.machine_mode_tags]
while True:
try:
if (datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() > 600:
return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
tag_data = self.get_redis_data(tag_list)
if tag_data.get(self.manual_mode_tag) == 0 and tag_data.get(self.auto_mode_tag) == 1:
print("production started!!!")
return round((datetime.now(tz=pytz.timezone(tz)) - production_start_time).total_seconds() / 60)
print(tag_data)
time.sleep(1)
except Exception as e:
print(e)
def check_mongo_for_finish(self, job_id, machine_type) -> bool:
        data = self.oee_mongo.find_record(job_id, machine_type) or {}
        return data.get("prod_status", "") == "completed"
def check_production_end(self, data):
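        # Block until Mongo marks the job completed or the Redis production
        # counters indicate the run has ended.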
total_count = data.get("actual_received_qty", data.get("qty_released"))
if total_count <= 0:
total_count = data.get("qty_released")
job_id = data.get("job", "")
machine_type = data.get("uf_process", "")
tag_list = [f"{DBConf.REDIS_PREFIX}{i}" for i in self.production_count_tags]
while True:
if self.check_mongo_for_finish(job_id, machine_type):
break
try:
tag_data = self.get_redis_data(tag_list)
if tag_data.get(self.good_count_tag, 0) >= total_count \
or tag_data.get(self.inspection_count_tag, 0) >= total_count \
or tag_data.get(self.inspection_count_tag, -1) == 0 \
or tag_data.get(self.good_count_tag, -1) == 0:
print("production ended")
break
time.sleep(1)
except Exception as e:
print(e)
return True
def check_production_run(self):
tag_list = [f"{DBConf.REDIS_PREFIX}{i}" for i in self.production_count_tags]
while True:
try:
tag_data = self.get_redis_data(tag_list)
                if tag_data.get(self.inspection_count_tag, 0) > 0:
return True
time.sleep(1)
except Exception as e:
print(e)
def get_current_produced_count(self):
tag_list = [f"{DBConf.REDIS_PREFIX}{i}" for i in self.production_count_tags]
try:
tag_data = self.get_redis_data(tag_list)
return (
tag_data.get(self.good_count_tag),
tag_data.get(self.ng_count_tag, 0) + tag_data.get(self.good_count_tag, 0)
)
        except Exception as e:
            print(e)
            return None, None
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import time
from production_monitoring import ProductionMonitor
production_mon = ProductionMonitor()
def check_produce_start():
data = production_mon.oee_mongo.find_record_by_status("running")
if not data:
print("No data found, waiting for batch to start running")
return
if data.get("prod_status") == "running":
print(f"{data.get('job')} is running ....")
if production_mon.check_production_run():
data["prod_status"] = "producing"
production_mon.oee_mongo.update_oee(
data,
data.get("job", ""),
data.get("uf_process", ""),
False
)
if __name__ == '__main__':
while True:
check_produce_start()
time.sleep(10)
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import asyncio
import logging
import time
from datetime import datetime
import pytz
from production_monitoring import ProductionMonitor
from scripts.config import read_settings
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.core.handlers.form_handler import FormHandler
from scripts.db.databases import oee_session
from scripts.db.mongo.dbs.siam_oee import SiamOEE
from scripts.schemas.form import FormDetails, EndProductionRequest
from scripts.utils.kafka_util import DataPush
from scripts.utils.security.encryption import create_token
production_mon = ProductionMonitor()
automation_engine = AutomationEngine()
oee_mongo = SiamOEE()
form_handler = FormHandler()
data_push = DataPush()
settings = read_settings()
def calculate_oee_params(data, downtime):
start_time = datetime.fromtimestamp(
data.get("start_time") // 1000, tz=pytz.timezone("Asia/Bangkok"))
end_time = datetime.now(tz=pytz.timezone("Asia/Bangkok"))
available_time = (end_time - start_time).total_seconds() / 60
if downtime > available_time:
downtime = 0
operating_time = available_time - downtime
availability = operating_time / available_time
good_count, units_produced = production_mon.get_current_produced_count()
    good_count = good_count or 0
    units_produced = units_produced or 0
    cycle_time = data.get("cycle_time") or 0
    productive_time = units_produced / cycle_time if cycle_time else 0
    performance = productive_time / operating_time if operating_time else 0
    quality = good_count / units_produced if units_produced else 0
oee = availability * performance * quality
return oee * 100, availability * 100, performance * 100, quality * 100
def update_oee():
tag_mapping = {
"oee": "site_100$dept_100$line_100$equipment_101$tag_215",
"availability": "site_100$dept_100$line_100$equipment_101$tag_216",
"performance": "site_100$dept_100$line_100$equipment_101$tag_217",
"quality": "site_100$dept_100$line_100$equipment_101$tag_218",
"running_lot": "site_100$dept_100$line_100$equipment_101$tag_219",
"running_item": "site_100$dept_100$line_100$equipment_101$tag_220",
"target": "site_100$dept_100$line_100$equipment_101$tag_222",
"downtime": "site_100$dept_100$line_100$equipment_101$tag_223"
}
data = oee_mongo.find_record_by_status("started")
if not data:
data = oee_mongo.find_record_by_status("producing")
if not data:
print("No data found, waiting for batch to start producing")
return
data_dict = {}
if data.get("run_start_time"):
run_start_time = datetime.fromtimestamp(data.get("run_start_time") // 1000, tz=pytz.timezone("Asia/Bangkok"))
downtime = automation_engine.get_downtime(
run_start_time=run_start_time,
production_end_time=datetime.now(tz=pytz.timezone("Asia/Bangkok"))
)
else:
downtime = 0
oee, availability, performance, quality = calculate_oee_params(data, downtime)
data_dict.update(
{
tag_mapping.get("running_lot"): data.get("job", ""), # job no
tag_mapping.get("running_item"): data.get("item", ""), # item no
tag_mapping.get("target"): data.get("qty_released", 0), # quality released
tag_mapping.get("oee"): oee,
tag_mapping.get("availability"): availability,
tag_mapping.get("performance"): performance,
tag_mapping.get("quality"): quality,
tag_mapping.get("downtime"): downtime,
}
)
message_dict = {
"data": data_dict,
"site_id": settings["automation"]["site_id"],
"gw_id": "",
"pd_id": "",
"p_id": settings["automation"]["project_id"],
"timestamp": int(time.time() * 1000),
"msg_id": 1,
"retain_flag": False
}
data_push.publish_message(message_dict)
def check_produce_start():
data = oee_mongo.find_record_by_status("running")
if not data:
print("No data found, waiting for batch to start running")
return
if data.get("prod_status") == "running":
print(f"{data.get('job')} is running ....")
if production_mon.check_production_run():
data["prod_status"] = "producing"
oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
def check_production_end():
data = oee_mongo.find_record_by_status("producing")
if not data:
print("No data found, waiting for batch to start producing")
return
print(f"{data.get('job')} is producing ....")
if production_mon.check_production_end(data):
data["prod_status"] = "completed"
data["end_time"] = int(time.time() * 1000)
form_details = FormDetails(**data.get("form_details", {})).dict()
end_production_request = EndProductionRequest(**form_details, submitted_data=dict(data=data),
date=int(time.time() * 1000))
cookies = {"login-token": create_token()}
session = oee_session()
try:
asyncio.run(form_handler.end_production(end_production_request, session, cookies))
except Exception as e:
logging.exception(e)
return
oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
del session
if __name__ == '__main__':
while True:
check_production_end()
time.sleep(10)
cryptography~=36.0.1
crypto~=1.4.1
fastapi~=0.74.1
httpx~=0.22.0
ilens-kafka-publisher==0.4.2
kafka-python~=2.0.2
pandas~=1.4.1
psycopg2-binary~=2.9.3
pydantic~=1.9.0
pyjwt~=2.3.0
pymongo~=4.0.1
pymssql~=2.2.4
pytest==3.2.4
python-dotenv~=0.19.2
pytz~=2021.3
pyyaml~=6.0
redis~=4.1.4
requests==2.26.0
sqlalchemy-utils~=0.38.2
sqlalchemy~=1.3.24
uvicorn~=0.17.5
"""
This file exposes configurations from config file and environments as Class Objects
"""
import shutil
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import os.path
import sys
from configparser import ConfigParser, BasicInterpolation
class EnvInterpolation(BasicInterpolation):
    """
    Interpolation which expands environment variables in values.
    """
    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        expanded = os.path.expandvars(value)
        # A value still starting with "$" means the referenced environment
        # variable is unset; resolve the option to None in that case.
        return expanded if not expanded.startswith('$') else None
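# Example (assuming APP_NAME=oee is exported): "module_name=$APP_NAME" resolves
# to "oee"; if APP_NAME is unset, the option resolves to None instead.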
try:
config = ConfigParser(interpolation=EnvInterpolation())
config.read(f"conf/application.conf")
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.stdout.flush()
sys.exit()
class Service:
HOST = config.get("service", "host")
PORT = config.getint("service", "port")
WORKERS = config.getint("service", "workers")
class DBConf:
POSTGRES_URI = config.get('postgres', 'uri')
if not POSTGRES_URI:
print("Error, environment variable OEE_POSTGRES_URI not set")
sys.exit(1)
DOWNTIME_URI = config.get('postgres', 'downtime_uri')
if not DOWNTIME_URI:
print("Error, environment variable DOWNTIME_URI not set")
sys.exit(1)
MONGO_URI = config.get('mongo', 'uri')
if not MONGO_URI:
print("Error, environment variable MONGO_URI not set")
sys.exit(1)
class PathToStorage(object):
BASE_PATH = config.get("DIRECTORY", "base_path")
if not BASE_PATH:
print("Error, environment variable BASE_PATH not set")
sys.exit(1)
MOUNT_DIR = config.get("DIRECTORY", "mount_dir")
if not MOUNT_DIR:
print("Error, environment variable MOUNT_DIR not set")
sys.exit(1)
MODULE_PATH = os.path.join(BASE_PATH, MOUNT_DIR)
FORM_IO_UPLOADS = os.path.join(MODULE_PATH, "form_io_uploads")
UPLOAD_FILE_PATH = os.path.join(MODULE_PATH, "csv_files")
LOGS_MODULE_PATH = f"{BASE_PATH}/logs{MOUNT_DIR}/"
class RedisConfig(object):
uri = config.get("REDIS", "uri")
    login_db = config.getint("REDIS", "login_db")
project_tags_db = config.getint("REDIS", "project_tags_db")
downtime_db = config.getint("REDIS", "downtime_db")
class KeyPath(object):
keys_path = config['DIRECTORY']['keys_path']
if not os.path.isfile(os.path.join(keys_path, "public")) or not os.path.isfile(
os.path.join(keys_path, "private")):
if not os.path.exists(keys_path):
os.makedirs(keys_path)
shutil.copy(os.path.join("assets", "keys", "public"), os.path.join(keys_path, "public"))
shutil.copy(os.path.join("assets", "keys", "private"), os.path.join(keys_path, "private"))
public = os.path.join(keys_path, "public")
private = os.path.join(keys_path, "private")
class Logging:
level = config.get("LOGGING", "level", fallback="INFO")
level = level if level else "INFO"
tb_flag = config.getboolean("LOGGING", "traceback", fallback=True)
tb_flag = tb_flag if tb_flag is not None else True
class StoragePaths:
    module_name = config.get('service', 'module_name')
if not module_name:
module_name = "downtime_oee"
base_path = os.path.join("data", module_name)
report_path = os.path.join(base_path, "reports")
class MQTTConf:
host = config["MQTT"]["host"]
port = int(config["MQTT"]["port"])
publish_base_topic = config["MQTT"]["publish_base_topic"]
from collections import namedtuple
class Endpoints:
calc_oee_base = "/calculator"
calculate_batch_oee = "/batch_oee/calculate"
update_batch_oee = "/batch_oee/update"
api_batches = "/batches"
    get_batch_oee_all = "/batch_oee/get/all"
    get_batch_oee_batch_id = "/batch_oee/get/one_batch"
api_get = "/get"
api_chart_data = "/chart_data"
get_layout = "/get_layout"
save_layout = "/save_layout"
# Meta services
api_hierarchy = "/hierarchy"
hierarchy_api = 'ilens_config/get_site_level_hierarchy'
class StatusCodes:
SUCCESS = [200, 201, 204]
class ResponseCodes:
SUCCESS = "success"
class UOM:
minutes = "mins"
seconds = "secs"
hours = "hrs"
millis = "msecs"
Divisor = namedtuple("Divisor", ["minutes", "seconds", "hours", "millis"])
    time_divs = Divisor(60000, 1000, 3600000, 1)
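    # e.g. a 90,000,000 ms span: / time_divs.minutes -> 1,500 min,
    # / time_divs.seconds -> 90,000 s, / time_divs.hours -> 25 h.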
class Secrets:
LOCK_OUT_TIME_MINS = 30
leeway_in_mins = 10
unique_key = '45c37939-0f75'
token = '8674cd1d-2578-4a62-8ab7-d3ee5f9a'
issuer = "ilens"
alg = "RS256"
class CommonKeys:
KEY_TYPE = "type"
KEY_CONTENT_TYPE = "content_type"
class DBConstants:
# Databases
db_events = "ilens_events"
db_metadata = "ilens_configuration"
db_ebpr = "ilens_ebpr"
db_ilens_assistant = "ilens_assistant"
collection_constants = "constants"
from scripts.config import DBConf
from scripts.utils.mongo_util import MongoConnect
mongo_client = MongoConnect(uri=DBConf.MONGO_URI)()
class Layouts:
default_layout = {
"type": "default",
"data": [
{
"label": "OEE",
"value": "oee",
"description": "Displays OEE percentage",
"show": True,
"category": "basicOeeKpis",
"cols": 3,
"rows": 20,
"minItemRows": 13,
"minItemCols": 3,
"x": 0,
"y": 0
},
{
"label": "Availability",
"value": "availability",
"description": "Displays availability percentage",
"show": True,
"category": "basicOeeKpis",
"cols": 3,
"rows": 20,
"minItemRows": 13,
"minItemCols": 3,
"x": 3,
"y": 0
},
{
"label": "Performance",
"value": "performance",
"description": "Displays performance percentage",
"category": "basicOeeKpis",
"show": True,
"cols": 3,
"rows": 20,
"minItemRows": 13,
"minItemCols": 3,
"x": 6,
"y": 0
},
{
"label": "Quality",
"value": "quality",
"category": "basicOeeKpis",
"description": "Displays quantity percentage",
"show": True,
"cols": 3,
"rows": 20,
"minItemRows": 13,
"minItemCols": 3,
"x": 9,
"y": 0
},
{
"label": "Total Units",
"value": "totalUnits",
"description": "Displays number of total units.",
"category": "quantity",
"show": True,
"cols": 3,
"rows": 7,
"minItemRows": 7,
"minItemCols": 2,
"x": 0,
"y": 20
},
{
"label": "Loss Analysis Graph",
"value": "lossAnalysisGraph",
"description": "Displays a graph showing loss analysis",
"category": "visualization",
"show": True,
"cols": 6,
"rows": 30,
"minItemRows": 30,
"minItemCols": 4,
"x": 0,
"y": 27
},
{
"label": "Good Units",
"value": "goodUnits",
"category": "quantity",
"description": "Displays number of good units.",
"show": True,
"cols": 3,
"rows": 7,
"minItemRows": 7,
"minItemCols": 2,
"x": 3,
"y": 20
},
{
"label": "Ideal Cycle per Unit",
"value": "idealCyclePerUnit",
"description": "Displays Ideal Cycle per Unit",
"category": "cyclePerUnit",
"show": True,
"cols": 3,
"rows": 7,
"minItemRows": 7,
"minItemCols": 2,
"x": 6,
"y": 20
},
{
"label": "Actual Cycle per Unit",
"value": "actualCyclePerUnit",
"description": "Displays Actual Cycle per Unit",
"category": "cyclePerUnit",
"show": True,
"cols": 3,
"rows": 7,
"minItemRows": 7,
"minItemCols": 2,
"x": 9,
"y": 20
},
{
"label": "Downtime Reason Log",
"value": "downtimeReasonLog",
"description": "Displays a table showing downtime logs",
"show": True,
"category": "detailedViews",
"cols": 6,
"rows": 29,
"minItemRows": 20,
"minItemCols": 6,
"x": 6,
"y": 27
}
]
}
class DonutChart:
base = {
"tooltip": {"formatter": "{a} <br/>{c} {b}"},
"series": {
"name": "",
"type": "gauge",
"radius": "100%",
"min": 0,
"max": 100,
"startAngle": 180,
"endAngle": 0,
"axisLine": {"lineStyle": {"width": 35, "color": []}},
"axisTick": {"show": False},
"splitLine": {"show": False},
"pointer": {"show": False},
"title": {"top": 20, "offsetCenter": ["0%", "-10%"], "fontSize": 25},
"center": ["50%", "70%"],
"detail": {
"color": "black",
"formatter": "Some text",
"fontSize": 15,
"offsetCenter": ["0%", "30%"],
},
"axisLabel": {"show": False},
"data": [],
},
}
chart_keys = ["availability", "performance", "quality", "oee"]
waterfall_chart_keys = [
"availability_loss",
"performance_loss",
"quality_loss",
"oee",
]
color_mapping = {
"oee": {"colors": ["#00AAFF", "#B8E1FF"]},
"availability": {"colors": ["#40D62B", "#BAF4C9"]},
"performance": {"colors": ["#FFAA00", "#FFEDB8"]},
"quality": {"colors": ["#AD35F4", "#EFC0F4"]},
}
class BaseWaterChart:
@staticmethod
def waterfall_series_builder(series):
data = [
["Total Time", series[0]],
["Availability Loss", series[1]],
["Performance Loss", series[2]],
["Quality Loss", series[3]],
["OEE", series[4]],
]
return data
waterfall_base = {
"type": "waterfall_chart_template",
"data": {
"backgroundColor": "white",
"toolbox": {
"orient": "vertical",
"show": True,
"right": 0,
"feature": {
"saveAsImage": {
"title": "Save as image",
"name": "OEE Waterfall Chart",
"pixelRatio": 3
}
}
},
"xAxis": [
{
"type": "category",
"name": "Metrics",
"axisTick": {
"alignWithLabel": True
},
"axisLabel": {
"interval": 0
},
"data": [
"Total Time",
"Availability Loss",
"Performance Loss",
"Quality Loss",
"OEE"
]
}
],
"yAxis": [
{
"type": "value",
"name": "",
"offset": 0,
"splitLine": {
"show": False
},
"axisTick": {
"alignWithLabel": True
},
"nameLocation": "end",
"min": 0,
"max": 100
}
],
"series": [
{
"type": "bar",
"color": "white",
"stack": "OEE",
"data": []
},
{
"label": {
"backgroundColor": "transparent",
"color": "black",
"distance": 5,
"padding": 1,
"position": "top",
"show": True
},
"type": "bar",
"color": "#4a0a10",
"stack": "OEE",
"data": []
}
],
"animation": True
}
}
from copy import deepcopy
from scripts.constants.db_connections import mongo_client
from scripts.constants.ui_constants import DonutChart, BaseWaterChart
from scripts.db.mongo.ilens_configuration.collections.constants import Constants
from scripts.schemas.batch_oee import WaterFallChart
class ChartMaker:
def __init__(self, project_id=None):
self.constants_con = Constants(mongo_client=mongo_client)
def main_creator(self, data, activity_length, overall=True):
chart_data = dict()
chart_data["waterfall"] = self.waterfall_chart(WaterFallChart(**data))
return self.donut_chart(data, chart_data, activity_length, overall)
@staticmethod
def waterfall_chart(data: WaterFallChart):
availability_to = 100 - data.availability_loss
performance_to = availability_to - data.performance_loss
quality_to = performance_to - data.quality_loss
oee_to = 0
first_series = [0, availability_to, performance_to, quality_to, oee_to]
second_series = [
100,
data.availability_loss,
data.performance_loss,
data.quality_loss,
data.oee,
]
        # work on a copy so the shared class-level template is not mutated
        waterfall_base = deepcopy(BaseWaterChart.waterfall_base)
waterfall_base["data"]["series"][0]["data"] = BaseWaterChart.waterfall_series_builder(
first_series
)
waterfall_base["data"]["series"][1]["data"] = BaseWaterChart.waterfall_series_builder(
second_series
)
return waterfall_base["data"]
@staticmethod
def donut_chart(data, chart_data, activity_length, overall: bool):
base = DonutChart.base
chart_keys = DonutChart.chart_keys
for each in chart_keys:
color_map = DonutChart.color_mapping[each]["colors"]
            # deepcopy so per-metric edits don't bleed into the shared template
            each_base = deepcopy(base)
actual_value = data[each]
rounded_value = round(data[each] / 100, 2)
each_base["series"]["axisLine"]["lineStyle"]["color"] = [
[rounded_value, color_map[0]],
[1, color_map[1]],
]
each_base["series"]["detail"]["formatter"] = (
each.capitalize().replace("_", " ")
if each != "oee"
else each.upper().replace("_", " ")
)
each_base["series"]["data"] = [
dict(value=actual_value, name=f"{actual_value}%")
]
chart_data[each] = deepcopy(each_base)
do_not_display = DonutChart.chart_keys + DonutChart.waterfall_chart_keys
for k, v in data.items():
if k not in do_not_display:
if k in ["total_units", "good_units"] and not overall:
v = round(v - activity_length, 2)
chart_data.update(
{
k: dict(label=k.capitalize().replace("_", " "), value=v)
}
)
return chart_data
import pandas as pd
from scripts.schemas.batch_oee import ChartResponse, ChartDBResponse
def processor(data):
db_response = ChartDBResponse(**data)
db_response.total_time = (
db_response.batch_end_time - db_response.batch_start_time
) / 60000
db_response.actual_cycle = round(
db_response.total_units / db_response.total_time, 2
)
db_response.ideal_cycle = round(db_response.cycle_time, 2)
db_response.good_units = round(
db_response.total_units - db_response.reject_units, 2
)
chart_response = ChartResponse(**db_response.dict())
return chart_response.dict()
def aggregator(data, activity_length):
df = pd.DataFrame(data)
df["total_time"] = (df["batch_end_time"] - df["batch_start_time"]) / 60000
df["actual_cycle"] = df["total_units"] / df["total_time"]
df["ideal_cycle"] = df["cycle_time"]
df["good_units"] = df["total_units"] - df["reject_units"]
df["reject_time"] = df["reject_units"] * (1 / df["ideal_cycle"])
agg_oee = df.sum().round(2)
availability = (agg_oee["total_time"] - agg_oee["downtime"]) / agg_oee["total_time"]
performance = agg_oee["productive_time"] / (
agg_oee["total_time"] - agg_oee["downtime"]
)
quality = (agg_oee["total_units"] - agg_oee["reject_units"]) / agg_oee[
"total_units"
]
    oee_overall = round(availability * performance * quality * 100, 2)
availability_loss = agg_oee["downtime"] / agg_oee["total_time"] * 100
quality_loss = agg_oee["reject_time"] / agg_oee["total_time"] * 100
chart_response = ChartResponse(
total_units=round(agg_oee["total_units"] - (len(df) * activity_length)),
reject_units=agg_oee["reject_units"],
oee=oee_overall,
availability=round(availability * 100, 2),
downtime=agg_oee["downtime"],
performance=round(performance * 100, 2),
quality=round(quality * 100, 2),
actual_cycle=agg_oee["actual_cycle"],
ideal_cycle=agg_oee["ideal_cycle"],
good_units=round(agg_oee["good_units"] - (len(df) * activity_length)),
availability_loss=availability_loss,
quality_loss=quality_loss,
performance_loss=round(100 - availability_loss - quality_loss - oee_overall, 2),
total_time=agg_oee["total_time"],
productive_time=agg_oee["productive_time"],
)
filtered = chart_response.dict()
remove_keys = ["productive_time", "downtime", "reject_units"]
    for each in remove_keys:
        filtered.pop(each, None)
return filtered
import time
from scripts.errors import ErrorCodes
from scripts.constants import UOM
from scripts.logging import logger
from scripts.schemas.batch_oee import BatchOEEDataSaveRequest, BatchOEEData
class OEECalculator:
@staticmethod
async def calculate_availability(operating_time, planned_prod_time):
if operating_time > planned_prod_time:
logger.error(ErrorCodes.ERR001)
raise ValueError(ErrorCodes.ERR001)
try:
return operating_time / planned_prod_time
except Exception as e:
logger.exception(e)
raise
@staticmethod
async def calculate_performance(units_produced, cycle_time, operating_time):
try:
if cycle_time == 0 or operating_time == 0:
logger.error(ErrorCodes.ERR002)
raise ValueError(ErrorCodes.ERR002)
productive_time = units_produced * (1 / cycle_time)
if productive_time > operating_time:
logger.error(ErrorCodes.ERR003)
raise ValueError(ErrorCodes.ERR003)
return productive_time / operating_time
except Exception as e:
logger.exception(e)
raise
@staticmethod
async def calculate_productive_time(units_produced, cycle_time):
try:
if cycle_time == 0:
logger.error(ErrorCodes.ERR002)
raise ValueError(ErrorCodes.ERR002)
return units_produced * (1 / cycle_time)
except Exception as e:
logger.exception(e)
raise
@staticmethod
async def calculate_quality(rejected_units, total_units):
if rejected_units > total_units:
logger.error(ErrorCodes.ERR004)
raise ValueError(ErrorCodes.ERR004)
try:
return (total_units - rejected_units) / total_units
except ZeroDivisionError:
return 0
except Exception as e:
logger.exception(e)
raise
@staticmethod
async def calculate_oee(availability, performance, quality):
try:
return availability * performance * quality
except Exception as e:
logger.exception(e)
raise
class OEELossesCalculator:
@staticmethod
async def calculate_availability_loss(downtime, available_time):
return (downtime / available_time) * 100
@staticmethod
async def calculate_quality_loss(reject_units, cycle_time, available_time):
return ((reject_units * (1 / cycle_time)) / available_time) * 100
@staticmethod
async def calculate_performance_loss(
oee_percentage, availability_loss, quality_loss
):
return 100 - availability_loss - quality_loss - oee_percentage
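    # Illustrative decomposition: 480 min available with 60 min downtime gives
    # availability_loss = 12.5; 20 rejects at cycle_time 1 give
    # quality_loss = (20 / 1) / 480 * 100 ~ 4.17; with OEE at 79.17%,
    # performance_loss = 100 - 12.5 - 4.17 - 79.17 ~ 4.16.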
class OEEEngine:
def __init__(self):
self.oee_calc = OEECalculator()
self.oee_loss_calc = OEELossesCalculator()
async def start_batch_oee_calc(
self,
product_info: BatchOEEDataSaveRequest
) -> BatchOEEData:
try:
logger.debug(f"Calculating OEE for {product_info.batch_id}")
# Start and End time should be in milliseconds since epoch.
if product_info.uom == UOM.minutes:
divisor = UOM.time_divs.minutes
elif product_info.uom == UOM.seconds:
divisor = UOM.time_divs.seconds
elif product_info.uom == UOM.hours:
divisor = UOM.time_divs.hours
elif product_info.uom == UOM.millis:
divisor = UOM.time_divs.millis
else:
divisor = UOM.time_divs.minutes
planned_production_time = (
product_info.batch_end_time - product_info.batch_start_time
) / divisor
operating_time = planned_production_time - product_info.downtime
availability = await self.oee_calc.calculate_availability(
operating_time=operating_time,
planned_prod_time=planned_production_time,
)
performance = await self.oee_calc.calculate_performance(
units_produced=product_info.total_units,
operating_time=operating_time,
cycle_time=product_info.cycle_time,
)
quality = await self.oee_calc.calculate_quality(
total_units=product_info.total_units,
rejected_units=product_info.reject_units,
)
oee = await self.oee_calc.calculate_oee(
availability=availability,
performance=performance,
quality=quality,
)
productive_time = await self.oee_calc.calculate_productive_time(
cycle_time=product_info.cycle_time,
units_produced=product_info.total_units,
)
availability_loss = await self.oee_loss_calc.calculate_availability_loss(
downtime=product_info.downtime,
available_time=planned_production_time,
)
quality_loss = await self.oee_loss_calc.calculate_quality_loss(
reject_units=product_info.reject_units,
available_time=planned_production_time,
cycle_time=product_info.cycle_time,
)
performance_loss = await self.oee_loss_calc.calculate_performance_loss(
oee_percentage=oee * 100,
availability_loss=availability_loss,
quality_loss=quality_loss,
)
oee_dict = {
"availability": availability * 100,
"performance": performance * 100,
"quality": quality * 100,
}
oee_loss = {
"availability_loss": availability_loss,
"quality_loss": quality_loss,
"performance_loss": performance_loss,
}
logger.debug(f"OEE: {product_info.batch_id}: {oee_dict}")
logger.debug(f"OEE Loss: {product_info.batch_id}: {oee_loss}")
batch_oee = BatchOEEData(
**product_info.dict(),
calculated_on=int(time.time() * 1000),
productive_time=productive_time,
availability=availability * 100,
performance=performance * 100,
quality=quality * 100,
availability_loss=availability_loss,
quality_loss=quality_loss,
performance_loss=performance_loss,
oee=oee * 100,
)
return batch_oee
except Exception:
raise
import time
from sqlalchemy.orm import Session
from scripts.core.engine.chart_creators import ChartMaker
from scripts.core.engine.oee_aggregator import processor, aggregator
from scripts.db_layer import batch_oee_table
from scripts.logging.logging import logger
from scripts.schemas.batch_oee import (
GetOEERequest,
GetOEERequestOneBatch,
GetBatches,
GetProducts,
ChartRequest,
)
from scripts.schemas.meta import LabelValue
from scripts.utils.mongo_util import MongoConnect
class APIHandler:
@staticmethod
async def get_oee_all(db: Session, get_oee_request: GetOEERequest):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
data = table_obj.get_oee_data_all(
start_time=get_oee_request.start_time,
end_time=get_oee_request.end_time,
hierarchy=get_oee_request.hierarchy,
)
return data
except Exception:
raise
finally:
del table_obj
@staticmethod
async def get_oee_batch(db: Session, get_oee_request: GetOEERequestOneBatch):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
data = table_obj.get_oee_data_batch_id(
hierarchy=get_oee_request.hierarchy, batch_id=get_oee_request.batch_id
)
return data
except Exception:
raise
finally:
del table_obj
@staticmethod
async def get_batches(db: Session, request_data: GetBatches):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
if not request_data.end_time:
request_data.end_time = int(time.time() * 1000)
data = table_obj.get_batches(
hierarchy=request_data.hierarchy,
start_time=request_data.start_time,
end_time=request_data.end_time,
)
return data
except Exception:
raise
finally:
del table_obj
@staticmethod
async def get_products(db: Session, request_data: GetProducts):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
data = table_obj.get_products(
hierarchy=request_data.hierarchy,
start_time=request_data.queryDate[0],
end_time=request_data.queryDate[1],
)
return [
LabelValue(
label=each[0], value=each[0], start_time=each[1], end_time=each[2]
) if isinstance(each, list) else LabelValue(
label=each['batch_id'], value=each['batch_id'],
start_time=each['batch_start_time'], end_time=each['batch_end_time']
)
for each in data
]
except Exception as e:
            logger.exception(e)
raise
finally:
del table_obj
@staticmethod
async def get_chart_data(db: Session, request_data: ChartRequest):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
if not request_data.hierarchy:
return dict()
chart_maker = ChartMaker()
data = table_obj.get_chart_data(
hierarchy=request_data.hierarchy,
start_time=request_data.queryDate[0],
end_time=request_data.queryDate[1],
product_id=request_data.productId,
aggregation=request_data.aggregation,
)
            if not request_data.aggregation or len(data) == 1:
                if isinstance(data, list):
                    data = data[0]
                raw_data = processor(data)
                # activity_length is not carried on ChartRequest; 0 is an
                # assumed neutral default here.
                return chart_maker.main_creator(raw_data, activity_length=0, overall=False)
            elif len(data) == 0:
                return dict()
            else:
                agg_data = aggregator(data, activity_length=0)
                return chart_maker.main_creator(agg_data, activity_length=0)
        except Exception:
            raise
finally:
del table_obj
from sqlalchemy.orm import Session
from scripts.constants import ResponseCodes
from scripts.core.engine.oee_calculator import OEEEngine
from scripts.db.psql.oee_discrete import DiscreteOEE
from scripts.db_layer import batch_oee_table
from scripts.errors import ILensError, ErrorCodes
from scripts.logging import logger
from scripts.schemas.batch_oee import BatchOEEDataRequest, BatchOEEData, GetOEERequestOneBatch, BatchOEEDataSaveRequest
from scripts.schemas.response_models import DefaultResponse
oee_engine = OEEEngine()
class CalculateBatchOEEHandler:
async def calculate_oee(self, db, product_info: BatchOEEDataRequest):
table_obj = DiscreteOEE(db=db)
try:
record_presence = table_obj.get_oee_data_batch_id(batch_id=product_info.batch_id,
hierarchy=product_info.hierarchy)
if not record_presence:
if not product_info.batch_start_time:
raise ILensError(ErrorCodes.ERR005)
product_info = BatchOEEDataSaveRequest(**product_info.dict(exclude_none=True))
oee_calculation = await oee_engine.start_batch_oee_calc(product_info=product_info)
await self.save_oee_data(oee_calculation, db)
response = DefaultResponse(
status=ResponseCodes.SUCCESS,
data=oee_calculation,
message="OEE saved Successfully",
)
return response
status = await self.update_oee_data(product_info.dict(exclude_none=True), record_presence, db)
response = DefaultResponse(
status=ResponseCodes.SUCCESS,
data=status,
message="OEE updated Successfully",
)
return response
except Exception as e:
logger.exception(f"Exception while saving oee record: {e}")
raise e
finally:
del table_obj
@staticmethod
async def save_oee_data(oee_data: BatchOEEData, db: Session):
try:
oee_table = BatchOEETable(**oee_data.dict())
table_obj = batch_oee_table.BatchOEETable(db=db)
table_obj.add_data(oee_table)
return True
except Exception as e:
raise e
finally:
del table_obj
@staticmethod
async def update_oee_data(product_info: dict, old_record: dict, db: Session):
try:
table_obj = batch_oee_table.BatchOEETable(db=db)
old_record.update(**product_info)
oee_calculation = await oee_engine.start_batch_oee_calc(
product_info=BatchOEEDataSaveRequest(**old_record))
filters = GetOEERequestOneBatch(batch_id=product_info["batch_id"], hierarchy=product_info["hierarchy"])
table_obj.update_record(filters=filters, update_obj=oee_calculation.dict())
return True
except Exception as e:
raise e
finally:
del table_obj
from datetime import datetime
import pandas as pd
import pytz
from sqlalchemy import create_engine
from scripts.config import DBConf
from scripts.config import Metadata
from scripts.core.engine.automation_engine import AutomationEngine
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler, BatchOEEDataRequest
from scripts.db.mongo.dbs.siam_oee import SiamOEE
from scripts.db_layer.job_table import JobTable
from scripts.logging.logging import logger as logging
from scripts.models.db_models import JobTable as JobSkeleton
from scripts.schemas.form import (GetRDBValues, CustomQuery, StartProductionRequest, EndProductionRequest,
StartProdJobModel, EndProdJobModel, CalculateOEE, FormSaveRequest, FormDetails,
EndProdJobDB)
from scripts.utils.common_utils import CommonUtils
class FormHandler:
def __init__(self):
self.oee_mongo = SiamOEE()
self.automation_engine = AutomationEngine()
@staticmethod
async def fetch_last_values(request_data: GetRDBValues):
try:
base_engine = create_engine(f"{DBConf.CLIENT_URI}{request_data.db_name}")
query = f"SELECT * from {request_data.table_name}"
            if request_data.primary_conditions:
                conditions = " AND ".join(
                    f"{column}='{val}'"
                    for column, val in request_data.primary_conditions.items()
                )
                query += f" WHERE {conditions}"
            query += " LIMIT 1"
table_data_df = pd.read_sql(query, base_engine)
del base_engine
table_data_df.rename(columns=request_data.column_to_property, inplace=True)
return table_data_df.to_dict(orient="records")[0]
except Exception as e:
logging.exception(e)
@staticmethod
async def custom_query_fetch(request_data: CustomQuery):
try:
base_engine = create_engine(f"{DBConf.CLIENT_URI}{request_data.db_name}")
table_data_df = pd.read_sql(request_data.query, base_engine)
del base_engine
table_data_df.rename(columns=request_data.column_to_property, inplace=True)
automation_eng = AutomationEngine()
current_time = datetime.now()
abs_start_time = automation_eng.get_absolute_start_time(button_click_time=current_time)
erp_table_data = table_data_df.to_dict(orient="records")[0]
erp_table_data.update({
"start_time": abs_start_time.strftime("%Y-%m-%d %H:%M")
})
return erp_table_data
except Exception as e:
logging.exception(e)
async def start_production(self, request_data: StartProductionRequest, db_session):
try:
table_data = JobTable(db_session)
request_data.submitted_data.update(tz=request_data.tz)
job_model = StartProdJobModel(**request_data.submitted_data.get("data", {}))
job_data = job_model.dict(exclude_none=True)
job_data.pop("tz", None)
row_data = JobSkeleton(**job_data)
table_data.add_data(row_data)
# TODO: Create mongo record with job details
job_data.update(form_details=FormDetails(**request_data.dict()).dict(),
prod_status="started")
self.oee_mongo.update_oee(job_data, job_model.job, job_model.uf_process)
except Exception as e:
logging.exception(e)
async def end_production(self, request_data: EndProductionRequest, db_session, request_cookies):
try:
table_data = JobTable(db_session)
job_model, db_data = await self.get_job_data(request_data)
table_data.update_record(job_model.job, job_model.uf_process, db_data.dict(exclude_none=True))
calculate_oee_payload = CalculateOEE(batch_start_time=job_model.start_time,
batch_end_time=job_model.end_time,
batch_id=job_model.job,
setup_time=job_model.setup_time,
cycle_time=job_model.cycle_time,
total_units=job_model.qty_released)
calculate_oee_payload.downtime = await self.get_oee_downtime(request_data.submitted_data["data"],
job_model.end_time, job_model.tz)
_ = await CalculateBatchOEEHandler().calculate_oee(db_session, BatchOEEDataRequest(
**calculate_oee_payload.dict()))
form_response = await self.save_to_form(request_data, request_cookies, job_model)
logging.info(f"FORM SAVE RESPONSE, {form_response}")
return "Form values updated successfully"
except Exception as e:
logging.exception(e)
return f"Server encountered an error during op: {e}"
async def get_job_data(self, request_data: EndProductionRequest):
request_data.submitted_data.update(tz=request_data.tz)
form_data = request_data.submitted_data.get("data", {})
job = form_data.get("job")
uf_process = form_data.get("uf_process")
data_from_mongo = self.oee_mongo.find_record(job, uf_process)
form_data.update(data_from_mongo)
job_model = EndProdJobModel(**form_data)
if not job_model.setup_time:
job_model.setup_time = 0
if data_from_mongo.get("units_produced"):
job_model.qty_released = data_from_mongo.get("units_produced")
db_data = EndProdJobDB(**job_model.dict())
return job_model, db_data
async def save_to_form(self, request_data: EndProductionRequest,
request_cookies,
job_model: EndProdJobModel):
end_date_time = datetime.fromtimestamp(job_model.end_time // 1000, tz=pytz.timezone(request_data.tz))
end_str = end_date_time.strftime("%Y-%m-%d %H:%M")
start_date_time = datetime.fromtimestamp(job_model.start_time // 1000, tz=pytz.timezone(request_data.tz))
start_str = start_date_time.strftime("%Y-%m-%d %H:%M")
tag_data = self.automation_engine.get_all_tags(end_date_time)
tag_data.update(**job_model.dict(exclude_none=True))
form_save_payload = FormSaveRequest(**request_data.dict())
form_save_payload.submitted_data["data"].update(**tag_data)
form_save_payload.submitted_data["data"].update(end_time=end_str, start_time=start_str)
form_response = await CommonUtils.hit_external_service(api_url=f"{Metadata.FORM_API}render/form?save=True",
payload=form_save_payload.dict(),
request_cookies=request_cookies)
form_save_payload.submitted_data["data"].update({"prod_status": "completed"})
self.oee_mongo.update_oee(form_save_payload.submitted_data["data"], job_model.job, job_model.uf_process)
return form_response
async def get_oee_downtime(self, data, end_time, tz):
if isinstance(end_time, int):
end_time = datetime.fromtimestamp(end_time // 1000, tz=pytz.timezone(tz))
run_start_time = data.get("start_time") if not data.get("run_start_time") else data.get("run_start_time")
if isinstance(run_start_time, int):
run_start_time = datetime.fromtimestamp(run_start_time // 1000, tz=pytz.timezone(tz))
else:
run_start_time = datetime.strptime(run_start_time, "%Y-%m-%d %H:%M")
run_start_time = run_start_time.replace(tzinfo=pytz.timezone(tz))
return self.automation_engine.get_downtime(run_start_time, end_time)
from scripts.db.mongo.dbs.oee_layouts import OeeLayoutCollection, OeeLayoutCollectionQueries
from scripts.schemas.layout import SaveLayoutRequest, GetLayoutRequest
class LayoutHandler:
def __init__(self):
self.layout_con = OeeLayoutCollectionQueries()
async def save_layout(self, layout_request: SaveLayoutRequest):
try:
data = self.layout_con.update_layout(
layout_request.dict(), layout_request.project_id
)
return data
except Exception:
raise
async def fetch_layout(self, layout_request: GetLayoutRequest):
try:
data: OeeLayoutCollection = self.layout_con.find_layout(
layout_request.project_id
)
return data.data
except Exception:
raise
# Copyright (c) NOIR
from sqlalchemy import Column, Integer, String, BigInteger, Float
from scripts.db.psql.databases import Base
class OEEDiscreteTable(Base):
__tablename__ = "oee_discrete"
id = Column(Integer, autoincrement=True, primary_key=True)
hierarchy = Column(String)
process = Column(String, nullable=True)
reference = Column(String)
prod_start_time = Column(BigInteger)
prod_end_time = Column(BigInteger)
total_downtime = Column(Float, default=0)
cycle_time = Column(Float)
total_units = Column(Float)
reject_units = Column(Float, default=0)
productive_time = Column(Float, default=0)
availability = Column(Float)
performance = Column(Float)
quality = Column(Float)
availability_loss = Column(Float)
quality_loss = Column(Float)
performance_loss = Column(Float)
oee = Column(Float)
calculated_on = Column(BigInteger)
uom = Column(String, default="mins")
setup_time = Column(Float, default=0)
from scripts.constants import DBConstants
database = DBConstants.db_metadata
from scripts.constants import DBConstants
collection_constants = DBConstants.collection_constants
from typing import Any, Optional
from pydantic import BaseModel
from scripts.constants import CommonKeys
from scripts.db.mongo.ilens_configuration import database
from scripts.db.mongo.ilens_configuration.collections import collection_constants
from scripts.utils.mongo_util import MongoCollectionBaseClass
class ConstantsSchema(BaseModel):
"""
This is the Schema for the Mongo DB Collection.
All datastore and general responses will be following the schema.
"""
type: Optional[str]
data: Optional[Any]
map_json: Optional[Any]
content_type: Optional[Any]
content: Optional[Any]
class Constants(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client, database=database,
collection=collection_constants)
@property
def key_type(self):
return CommonKeys.KEY_TYPE
@property
def key_content_type(self):
return CommonKeys.KEY_CONTENT_TYPE
    def find_constant_by_dict(self, _type):
        """
        Return one record for the given type as a plain dict, or an empty
        dict when no record is found.
        :param _type:
        :return:
        """
        record = self.find_one(query={self.key_type: _type})
        if not record:
            return dict()
        return dict(record)
def find_constant(self, _type, filter_dict=None):
"""
The following function will give one record for a given set of
search parameters as keyword arguments
:param _type:
:param filter_dict:
:return:
"""
query = {self.key_type: _type}
record = self.find_one(query=query, filter_dict=filter_dict)
if not record:
return dict()
return ConstantsSchema(**record).dict()
def find_constant_by_query(self, query, filter_dict=None):
record = self.find_one(query=query, filter_dict=filter_dict)
if not record:
return dict()
return ConstantsSchema(**record).dict()
def insert_one_constant(self, data):
"""
The following function will insert one tag in the
tags collections
:param self:
:param data:
:return:
"""
return self.insert_one(data)
def find_constant_by_content(self, content_type):
"""
The following function will give one record for a given set of
search parameters as keyword arguments
"""
query = {self.key_content_type: content_type}
record = self.find_one(query=query)
if not record:
return dict()
return record
def find_constant_by_aggregate(self, query):
constant = self.aggregate(query)
if not constant:
return list()
return list(constant)
def find_constant_dict(self, _type, filter_dict=None):
"""
The following function will give one record for a given set of
search parameters as keyword arguments
:param _type:
:param filter_dict:
:return:
"""
query = {self.key_type: _type}
record = self.find_one(query=query, filter_dict=filter_dict)
if not record:
return dict()
return dict(record)
import sys
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
from scripts.config import DBConf
from scripts.db.db_models import DownTimeCategory, DownTimeLog, DownTimeMasterError
from scripts.logging import logger
engine = create_engine(DBConf.POSTGRES_URI)
def create_default_psql_dependencies():
try:
if not database_exists(engine.url):
create_database(engine.url)
DownTimeCategory.__table__.create(bind=engine, checkfirst=True)
DownTimeMasterError.__table__.create(bind=engine, checkfirst=True)
DownTimeLog.__table__.create(bind=engine, checkfirst=True)
except Exception as e:
logger.error(f"Error occurred while creating: {e}", exc_info=True)
sys.exit()
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# from scripts.constants.app_constants import TableNames
from scripts.config import DBConf
# table_name = "UserDataEntryTable"
engine = create_engine(DBConf.POSTGRES_URI)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
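# Typical usage (illustrative): inject a per-request session with FastAPI's
# Depends, e.g. `def endpoint(db: Session = Depends(get_db)): ...`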
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session, defer
from scripts.db.db_models import OEEDiscreteTable
from scripts.errors import ILensError
from scripts.logging import logger
from scripts.utils.postgres_util import SQLDBUtils
class DiscreteOEE(SQLDBUtils):
def __init__(self, db: Session):
super().__init__(db)
self.table = OEEDiscreteTable
def get_oee_data_all(self, start_time, end_time, hierarchy):
try:
data = (
self.session.query(self.table)
.order_by(self.table.calculated_on)
.filter(
self.table.hierarchy == hierarchy,
self.table.calculated_on >= start_time,
self.table.calculated_on <= end_time,
)
)
if data:
return [jsonable_encoder(i) for i in data]
else:
return list()
except Exception as e:
logger.exception(e)
raise
def get_oee_data_batch_id(self, batch_id, hierarchy):
try:
data = (
self.session.query(self.table)
.order_by(self.table.calculated_on)
.filter(
self.table.hierarchy == hierarchy, self.table.batch_id == batch_id
)
.first()
)
if data:
return jsonable_encoder(data)
else:
return None
except Exception as e:
logger.exception(e)
raise
def get_batches(self, hierarchy, start_time, end_time):
try:
data = (
self.session.query(self.table.batch_id)
.order_by(self.table.calculated_on)
.filter(
self.table.hierarchy == hierarchy,
self.table.calculated_on >= start_time,
self.table.calculated_on <= end_time,
)
)
if data:
return [getattr(i, self.column_batch_id) for i in data]
else:
return list()
except Exception as e:
logger.exception(e)
raise
def get_products(self, hierarchy, start_time, end_time):
try:
data = (
self.session.query(
self.table.batch_id,
self.table.batch_start_time,
self.table.batch_end_time,
)
.order_by(self.table.calculated_on)
.filter(
self.table.hierarchy == hierarchy,
self.table.batch_start_time >= start_time,
self.table.batch_end_time <= end_time,
)
)
if data:
return [jsonable_encoder(each) for each in data]
else:
return list()
except Exception as e:
logger.exception(e)
raise
def get_chart_data(
self, start_time, end_time, hierarchy, product_id, aggregation=False
):
try:
if not aggregation:
data = (
self.session.query(self.table)
.filter(
self.table.hierarchy == hierarchy,
self.table.batch_id == product_id,
self.table.batch_start_time >= start_time,
self.table.batch_end_time <= end_time,
)
.first()
)
if data:
return jsonable_encoder(data)
else:
data = (
self.session.query(self.table)
.filter(
self.table.hierarchy == hierarchy,
self.table.batch_start_time >= start_time,
self.table.batch_end_time <= end_time,
)
.options(
defer(self.table.hierarchy),
defer(self.table.batch_id),
defer(self.table.uom),
)
)
if data:
return [jsonable_encoder(each) for each in data]
raise ILensError("Record(s) not found")
except Exception as e:
logger.exception(e)
raise
import redis
from scripts.config import RedisConfig
login_db = redis.from_url(RedisConfig.uri, db=int(RedisConfig.login_db), decode_responses=True)
project_details_db = redis.from_url(RedisConfig.uri, db=int(RedisConfig.project_tags_db), decode_responses=True)
downtime_db = redis.from_url(RedisConfig.uri, db=int(RedisConfig.downtime_db), decode_responses=True)
class ErrorMessages:
UNKNOWN = "Unknown Error occurred"
ERR001 = "Configurations not available, please verify the database."
ERR002 = "Data Not Found"
class ErrorCodes:
ERR001 = "ERR001 - Operating Time is greater than Planned Time"
ERR002 = "ERR002 - Zero Values are not allowed"
ERR003 = "ERR003 - Operating Time is less than Productive Time"
ERR004 = "ERR004 - Rejected Units is greater than Total Units"
ERR005 = "ERR005 - Batch Start time not supplied"
class UnknownError(Exception):
pass
class KairosDBError(Exception):
pass
class UnauthorizedError(Exception):
pass
class ILensError(Exception):
pass
class NameExists(Exception):
pass
class InputRequestError(ILensError):
pass
class IllegalTimeSelectionError(ILensError):
pass
class DataNotFound(Exception):
pass
class AuthenticationError(ILensError):
"""
JWT Authentication Error
"""
class JWTDecodingError(Exception):
pass
class DuplicateReportNameError(Exception):
pass
class PathNotExistsException(Exception):
pass
class ImplementationError(Exception):
pass
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
# this method is to read the configuration from backup.conf
from scripts.config import Logging
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
logging_config["level"] = Logging.level
enable_traceback: bool = Logging.tb_flag
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging_config["level"].upper())
    log_formatter = ('%(asctime)s - %(levelname)-6s - '
                     '[%(threadName)5s:%(funcName)5s():%(lineno)s] - %(message)s')
time_format = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"]:
if not os.path.exists("logs"):
os.makedirs("logs")
log_file = os.path.join("logs", f"{logging_config['name']}.log")
temp_handler = RotatingFileHandler(log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"])
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
        else:
            # skip unknown handler types instead of registering None
            continue
        __logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
logger:
name: downtime_oee
level: DEBUG
handlers:
- type: RotatingFileHandler
file_path: logs/
max_bytes: 100000000
back_up_count: 5
- type: StreamHandler
name: downtime_oee
from typing import Optional, Union, List
import time
from pydantic import BaseModel, validator, Field
class GetProducts(BaseModel):
queryDate: List[int]
hierarchy: str
project_id: Optional[str]
class WaterFallChart(BaseModel):
performance_loss: float
quality_loss: float
availability_loss: float
oee: float
@validator("*")
def round_float(cls, v):
return round(v, 2)
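The `validator("*")` hook rounds every field to two decimals at construction time; a minimal sketch:
chart = WaterFallChart(performance_loss=1.2345, quality_loss=0.5,
                       availability_loss=2.0, oee=96.1289)
assert chart.oee == 96.13  # rounded by the "*" validator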
class ChartRequest(BaseModel):
project_id: str
queryDate: List[int]
hierarchy: Optional[str]
productId: Optional[str]
aggregation: Optional[bool] = False
class ChartDBResponse(BaseModel):
total_units: float
reject_units: int
oee: int
availability: float
downtime: int
performance: int
performance_loss: float
quality: int
availability_loss: float
quality_loss: float
cycle_time: float
batch_start_time: int
batch_end_time: int
good_units: Optional[float]
actual_cycle: Optional[float]
ideal_cycle: Optional[float]
total_time: Optional[float]
    productive_time: int
@validator("*")
def round_float(cls, v):
return round(v, 2)
class ChartResponse(BaseModel):
total_units: float
reject_units: int
oee: int
availability: float
downtime: int
performance: int
quality: int
actual_cycle: float
ideal_cycle: float
good_units: float
availability_loss: float
quality_loss: float
performance_loss: float
total_time: float
productive_time: int
class DowntimeLogsRequest(BaseModel):
project_id: str
start_time: int
end_time: int
hierarchy: str
class ProductInfo(BaseModel):
product_start_time: int
product_end_time: int
hierarchy: str
product_id: str
setup_time: Optional[Union[float, int]] = 0
cycle_time: Union[float, int]
total_units: Union[float, int]
reject_units: Optional[Union[float, int]] = 0
class BatchOEEDataRequest(BaseModel):
batch_start_time: Optional[int]
batch_end_time: Optional[int]
downtime: Optional[Union[float, int]]
hierarchy: Optional[str]
batch_id: Optional[str]
setup_time: Optional[Union[float, int]]
cycle_time: Optional[Union[float, int]]
total_units: Optional[Union[float, int]]
reject_units: Optional[Union[float, int]]
uom: Optional[str]
class BatchOEEDataSaveRequest(BaseModel):
batch_start_time: Optional[int]
    batch_end_time: Optional[int] = Field(default_factory=lambda: int(time.time() * 1000))  # evaluated per request, not once at import
downtime: Optional[Union[float, int]] = 0
hierarchy: Optional[str]
batch_id: Optional[str]
setup_time: Optional[Union[float, int]] = 0
cycle_time: Union[float, int]
total_units: Optional[Union[float, int]] = 0
reject_units: Optional[Union[float, int]] = 0
uom: Optional[str] = "mins"
class BatchOEEData(BatchOEEDataRequest):
calculated_on: int
productive_time: float
availability: float
performance: float
quality: float
availability_loss: float
quality_loss: float
performance_loss: float
oee: float
class Config:
orm_mode = True
class GetOEERequest(BaseModel):
start_time: int
end_time: int
hierarchy: str
class GetOEERequestOneBatch(BaseModel):
hierarchy: str
batch_id: str
class GetBatches(GetOEERequest):
pass
from typing import Optional, Union, List
from pydantic import BaseModel
class GetLayoutRequest(BaseModel):
project_id: str
class LayoutData(BaseModel):
label: Optional[str]
value: Optional[str]
description: Optional[str]
show: Optional[bool]
category: Optional[str]
cols: Optional[int]
rows: Optional[int]
min_item_rows: Optional[int]
min_item_cols: Optional[int]
x: Optional[int]
y: Optional[int]
class SaveLayoutRequest(BaseModel):
data: List
project_id: str
from typing import Optional, Union, Dict
from pydantic import BaseModel
class GetHierarchyRequest(BaseModel):
type: str
filter: Dict = dict()
project_id: str
site_id: Optional[str]
node_id: Optional[str]
class LabelValue(BaseModel):
label: str
value: str
start_time: int
end_time: int
from typing import Optional, Any
from pydantic import BaseModel
class DefaultResponse(BaseModel):
status: str = "Failed"
message: Optional[str]
data: Optional[Any]
class DefaultFailureResponse(DefaultResponse):
error: Any
import traceback
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from pydantic import ValidationError
from scripts.constants import Endpoints
from scripts.core.handlers.batch_oee_calc_handler import CalculateBatchOEEHandler
from scripts.db.psql.databases import get_db
from scripts.logging import logger
from scripts.schemas.batch_oee import BatchOEEDataRequest
from scripts.schemas.response_models import DefaultFailureResponse
from scripts.errors import ILensError
import json
batch_oee_handler = CalculateBatchOEEHandler()
calc_oee_router = APIRouter(prefix=Endpoints.calc_oee_base, tags=["OEE Calculator"])
@calc_oee_router.post(Endpoints.calculate_batch_oee)
async def calculate_oee_for_batch(
product_info: BatchOEEDataRequest, db: Session = Depends(get_db)
):
try:
return await batch_oee_handler.calculate_oee(product_info=product_info, db=db)
except ILensError as error_code:
return DefaultFailureResponse(error=error_code.args[0])
except ValidationError as e:
return DefaultFailureResponse(error=json.loads(e.json()))
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
@calc_oee_router.post(Endpoints.update_batch_oee)
async def update_oee_for_batch(
        product_info: BatchOEEDataRequest, db: Session = Depends(get_db)
):
    try:
        return await batch_oee_handler.calculate_oee(product_info=product_info, db=db)
except ValidationError as e:
return DefaultFailureResponse(error=json.loads(e.json()))
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
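A request-body sketch for these endpoints, with field names taken from BatchOEEDataRequest (the timestamp and hierarchy values are illustrative; the concrete path comes from Endpoints.calculate_batch_oee):
payload = BatchOEEDataRequest(
    batch_start_time=1672531200000,   # illustrative epoch-millis
    batch_end_time=1672538400000,
    hierarchy="site_100$dept_100$line_100",
    cycle_time=1.5,
    total_units=1000,
    reject_units=25,
)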
import traceback
from fastapi import APIRouter, Request, Cookie
from scripts.constants import Endpoints, ResponseCodes
from scripts.core.handlers.meta_handler import MetaHandler
from scripts.logging.logging import logger
from scripts.schemas.meta import GetHierarchyRequest
from scripts.schemas.response_models import DefaultFailureResponse, DefaultResponse
meta_handler = MetaHandler()
meta_service_router = APIRouter(prefix=Endpoints.api_hierarchy, tags=["Meta Services"])
@meta_service_router.post(Endpoints.api_get)
async def find_hierarchy(get_hierarchy_request: GetHierarchyRequest, request: Request):
try:
data = await meta_handler.find_hierarchy(
get_hierarchy_request=get_hierarchy_request, request=request
)
return DefaultResponse(
data=data,
status=ResponseCodes.SUCCESS,
message="Hierarchies listed successfully",
)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
import traceback
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from scripts.constants import Endpoints, ResponseCodes
from scripts.core.handlers.api_handler import APIHandler
from scripts.core.handlers.layout_handler import LayoutHandler
from scripts.db.databases import get_db
from scripts.logging.logging import logger
from scripts.schemas.batch_oee import GetProducts, ChartRequest
from scripts.schemas.layout import GetLayoutRequest, SaveLayoutRequest
from scripts.schemas.response_models import DefaultFailureResponse, DefaultResponse
from scripts.errors import ILensError
api_handler = APIHandler()
layout_handler = LayoutHandler()
ui_service_router = APIRouter(prefix=Endpoints.api_batches, tags=["UI Services"])
@ui_service_router.post(Endpoints.api_get)
async def get_all_products(request_data: GetProducts, db: Session = Depends(get_db)):
try:
data = await api_handler.get_products(request_data=request_data, db=db)
return DefaultResponse(
data=data,
status=ResponseCodes.SUCCESS,
message="Products fetched successfully",
)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
@ui_service_router.post(Endpoints.api_chart_data)
async def get_chart_data(request_data: ChartRequest, db: Session = Depends(get_db)):
try:
data = await api_handler.get_chart_data(request_data=request_data, db=db)
return DefaultResponse(
data=data,
status=ResponseCodes.SUCCESS,
message="Chart data fetched successfully",
)
except ILensError as e:
return DefaultFailureResponse(error=e.args)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
@ui_service_router.post(Endpoints.get_layout)
async def get_layout(request_data: GetLayoutRequest):
try:
data = await layout_handler.fetch_layout(layout_request=request_data)
return DefaultResponse(
data=data,
status=ResponseCodes.SUCCESS,
message="Layout fetched successfully",
)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
@ui_service_router.post(Endpoints.save_layout)
async def save_layout(request_data: SaveLayoutRequest):
try:
data = await layout_handler.save_layout(layout_request=request_data)
return DefaultResponse(
data=data, status=ResponseCodes.SUCCESS, message="Layout saved successfully"
)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args)
# -------------Code Demo Backup----------------#
#
# @ui_service_router.post(Endpoints.get_batch_oee_all)
# async def get_all_batch_oee(get_oee_request: GetOEERequest,
# db: Session = Depends(get_db)):
# try:
# return await api_handler.get_oee_all(get_oee_request=get_oee_request, db=db)
# except Exception as e:
# tb = traceback.format_exc()
# logger.exception(e)
# logger.exception(tb)
# return DefaultFailureResponse(error=e.args)
#
#
# @ui_service_router.post(Endpoints.get_batch_oee_batch_id)
# async def get_all_batch_oee(get_oee_request: GetOEERequestOneBatch,
# db: Session = Depends(get_db)):
# try:
# return await api_handler.get_oee_batch(get_oee_request=get_oee_request, db=db)
# except Exception as e:
# tb = traceback.format_exc()
# logger.exception(e)
# logger.exception(tb)
# return DefaultFailureResponse(error=e.args)
from abc import abstractmethod
from urllib.parse import urlparse
import aiohttp
import httpx
import requests
from aiohttp import ClientResponse
from scripts.constants.app_constants import CommonStatusCode, Secrets
from scripts.logging import logger
from scripts.utils.security_utils.apply_encrytion_util import create_token
class AuthenticationError(Exception): ...
class ForbiddenError(Exception): ...
token = ""
secrets = {}
class BaseRequestHandler:
"""
BaseRequestHandler
"""
def __init__(self, url, time_out=None) -> None:
self.time_out = time_out
self.url = url
self.verify = False
@property
def get_timeout(self):
return self.time_out
def post(self, path="", json=None, data=None, update_args=True, **kwargs) -> requests.Response:
"""doc"""
url = self.get_url(path)
logger.debug(f"url path -- {url}")
response = requests.post(url=url, data=data, json=json, **self.prepare_args(**kwargs))
if response.status_code not in CommonStatusCode.SUCCESS_CODES:
logger.exception(f"status - {response.status_code}")
raise AuthenticationError(f"Connection Failure.. Status Code - {response.status_code}")
if update_args:
self.update_args(response=response)
return response
def get(self, path="", params=None, **kwargs) -> requests.Response:
"""doc"""
url = self.get_url(path)
# Commenting this as currently there is no use case for this
# self.update_args(response = response)
response = requests.get(url=url, params=params, **self.prepare_args(**kwargs))
if response.status_code == 401:
raise AuthenticationError("Not authorized")
if response.status_code == 403:
raise ForbiddenError("Not Permitted")
return response
def get_url(self, path):
return f"{self.url.rstrip('/')}/{path.lstrip('/').rstrip('/')}"
def verify_request(self):
if self.url_scheme(self.url) == 'https':
self.verify = True
return self.verify
@staticmethod
def url_scheme(url):
return urlparse(url).scheme
@abstractmethod
def prepare_args(self, **kwargs):
...
@abstractmethod
def update_args(self, **kwargs):
...
class HTTPXRequestHandler:
def __init__(self, url, time_out=None) -> None:
self.time_out = time_out
self.url = url
self.verify = False
@property
def get_timeout(self):
return self.time_out
def httpx_post(self, path="", json=None, data=None, update_args=True, **kwargs) -> requests.Response:
"""doc"""
url = self.get_url(path)
        logger.debug(f"url path -- {url}")
with httpx.Client() as client:
response = client.post(url=url, data=data, json=json, **self.prepare_args(**kwargs))
if response.status_code == 401:
raise AuthenticationError("Not authorized")
if response.status_code == 403:
raise ForbiddenError("Not Permitted")
if update_args:
self.update_args(response=response)
return response
def httpx_get(self, path="", params=None, **kwargs) -> requests.Response:
"""doc"""
url = self.get_url(path)
# Commenting this as currently there is no use case for this
# self.update_args(response = response)
with httpx.Client() as client:
response = client.get(url=url, params=params, **self.prepare_args(**kwargs))
if response.status_code == 401:
raise AuthenticationError("Not authorized")
if response.status_code == 403:
raise ForbiddenError("Not Permitted")
return response
def get_url(self, path):
return f"{self.url.rstrip('/')}/{path.lstrip('/').rstrip('/')}"
def verify_request(self):
if self.url_scheme(self.url) == 'https':
self.verify = True
return self.verify
@staticmethod
def url_scheme(url):
return urlparse(url).scheme
@abstractmethod
def prepare_args(self, **kwargs):
...
@abstractmethod
def update_args(self, **kwargs):
...
class AIOHTTPRequestHandler:
def __init__(self, url, time_out=None) -> None:
self.time_out = time_out
self.url = url
self.verify = False
@property
def get_timeout(self):
return self.time_out
async def aiohttp_post(self, path="", json=None, data=None, update_args=True, **kwargs) -> ClientResponse:
"""doc"""
url = self.get_url(path)
        logger.debug(f"url path -- {url}")
async with aiohttp.ClientSession() as client:
async with client.post(
url=url, data=data, json=json, **self.prepare_args(**kwargs)
) as resp:
response = resp
if response.status == 401:
raise AuthenticationError("Not authorized")
if response.status == 403:
raise ForbiddenError("Not Permitted")
if update_args:
self.update_args(response=response)
return response
async def aiohttp_get(self, path="", params=None, **kwargs) -> ClientResponse:
"""doc"""
url = self.get_url(path)
# Commenting this as currently there is no use case for this
# self.update_args(response = response)
async with aiohttp.ClientSession() as client:
async with client.get(
url=url, params=params, **self.prepare_args(**kwargs)
) as resp:
response = resp
if response.status == 401:
raise AuthenticationError("Not authorized")
if response.status == 403:
raise ForbiddenError("Not Permitted")
return response
def get_url(self, path):
return f"{self.url.rstrip('/')}/{path.lstrip('/').rstrip('/')}"
def verify_request(self):
if self.url_scheme(self.url) == 'https':
self.verify = True
return self.verify
@staticmethod
def url_scheme(url):
return urlparse(url).scheme
@abstractmethod
def prepare_args(self, **kwargs):
...
@abstractmethod
def update_args(self, **kwargs):
...
class ILensRequest(BaseRequestHandler, HTTPXRequestHandler, AIOHTTPRequestHandler):
"""
Utility to use ilens API's Directly
"""
def __init__(self, url, project_id) -> None:
super().__init__(url)
self.project_id = project_id
def prepare_args(self, **kwargs) -> dict:
"""
doc
"""
post_args = {}
post_args.update(**kwargs)
post_args.update(headers=self.get_headers(**kwargs))
post_args.update(timeout=self.get_timeout)
post_args.update(verify=self.verify_request())
post_args.update(cookies=self.get_cookies(**kwargs))
return post_args
    def update_args(self, **kwargs) -> bool:
        data = kwargs.get('response').headers.get('token')
        if data:
            global token
            token = data
        return bool(data)
def get_headers(self, **kwargs) -> dict:
headers = {'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'SAMEORIGIN', 'Cache-Control': 'no-store', 'projectId': self.project_id}
headers.update(kwargs.get("headers", {}))
return headers
@staticmethod
def get_cookies(**kwargs) -> dict:
cookies = {}
cookies.update(kwargs.get("cookies", {}))
return cookies
@staticmethod
def create_token(host: str = '127.0.0.1', user_id=None, internal_token=Secrets.token):
"""
This method is to create a cookie
"""
try:
if user_id is None:
user_id = "user_099"
new_token = create_token(
user_id=user_id,
ip=host,
token=internal_token,
)
return new_token
except Exception as e:
logger.exception(str(e))
raise
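A minimal usage sketch for the synchronous path; the service URL and endpoint path are assumptions for illustration, not endpoints defined in this repo:
client = ILensRequest(url="http://metadata-service.ilens-core:8989", project_id="project_01")
response = client.post(path="/api/v1/hierarchy", json={"type": "site"})  # raises AuthenticationError on non-2xx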
import json
from functools import lru_cache
@lru_cache()
def get_db_name(redis_client, project_id: str, database: str, delimiter="__"):
if not project_id:
return database
val = redis_client.get(project_id)
if val is None:
raise ValueError(
f"Unknown Project, Project ID: {project_id} Not Found!!!")
val = json.loads(val)
if not val:
return database
# Get the prefix flag to apply project_id prefix to any db
prefix_condition = bool(
val.get("source_meta", {}).get("add_prefix_to_database"))
if prefix_condition:
# Get the prefix name from mongo or default to project_id
prefix_name = val.get("source_meta", {}).get("prefix") or project_id
return f"{prefix_name}{delimiter}{database}"
return database
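How the prefix resolution behaves, shown with a tiny stand-in for the Redis client (illustrative only):
class _FakeRedis:  # stand-in for project_details_db, for illustration
    def get(self, key):
        return '{"source_meta": {"add_prefix_to_database": true, "prefix": "siam"}}'
print(get_db_name(_FakeRedis(), "project_01", "oee"))  # -> "siam__oee"
# with the flag absent or false, the plain database name "oee" comes back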
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
from scripts.db.redis_connections import project_details_db
from scripts.logging import logger
from scripts.utils.db_name_util import get_db_name
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception as e:
logger.exception(str(e))
raise
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
def __init__(self, mongo_client, database, collection):
self.client = mongo_client
self.database = database
self.collection = collection
        self.__database = None
        self.__project_id = None
def __repr__(self):
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
    @property
    def project_id(self):
        # return the stored id instead of recursing into the property itself
        return self.__project_id
    @project_id.setter
    def project_id(self, project_id):
        self.__project_id = project_id
        if self.__database is None:
            # storing original db name if None, so the prefix is not applied twice
            self.__database = self.database
        self.database = get_db_name(
            redis_client=project_details_db,
            project_id=project_id,
            database=self.__database)
def insert_one(self, data: Dict):
"""
        The function is used to insert a document into a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception as e:
logger.exception(str(e))
raise
def insert_many(self, data: List):
"""
        The function is used to insert documents into a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception as e:
logger.exception(str(e))
raise
def find(self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
collation: Optional[bool] = False,
skip: Optional[int] = 0,
limit: Optional[int] = None) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param collation: can add rules for lettercase and accent marks.
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = list()
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = collection.find(query, filter_dict, ).sort(sort).skip(skip)
else:
cursor = collection.find(query, filter_dict, ).skip(skip)
if limit:
cursor = cursor.limit(limit)
if collation:
cursor = cursor.collation({"locale": "en"})
return cursor
except Exception as e:
logger.exception(str(e))
raise
    def count_documents(self,
                        query: Dict,
                        limit: Optional[int] = 1) -> int:
        """
        The function is used to count documents in a given collection in a Mongo Database
        :param query: Query Dictionary
        :param limit: Limit Number
        :return: Number of matching documents
        """
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
cursor = collection.count_documents(query, limit=limit)
return cursor
except Exception as e:
logger.exception(str(e))
raise
def find_one(self,
query: Dict,
filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.exception(str(e))
raise
def update_one(self,
query: Dict,
data: Dict,
upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception as e:
logger.exception(str(e))
raise
def update_to_set(self,
query: Dict,
param: str,
data: Dict,
upsert: bool = False):
"""
:param upsert:
:param query:
:param param:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$addToSet": {param: data}}, upsert=upsert)
return response.modified_count
except Exception as e:
logger.exception(str(e))
raise
def update_many(self,
query: Dict,
data: Dict,
upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_many(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception as e:
logger.exception(str(e))
raise
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_many(query)
return response.deleted_count
except Exception as e:
logger.exception(str(e))
raise
def delete_one(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_one(query)
return response.deleted_count
except Exception as e:
logger.exception(str(e))
raise
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.distinct(query_key, filter_json)
return response
except Exception as e:
logger.exception(str(e))
raise
def aggregate(self, pipelines: List, ):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception as e:
logger.exception(str(e))
raise
class MongoAggregateBaseClass:
def __init__(self, mongo_client, database, ):
self.client = mongo_client
self.database = database
def aggregate(self, collection, pipelines: List, ):
try:
database_name = self.database
collection_name = collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception as e:
logger.exception(str(e))
raise
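A wiring sketch for the collection wrapper; the URI, database, and collection names are assumptions for illustration:
client = MongoConnect(uri="mongodb://localhost:27017")()  # illustrative URI
batches = MongoCollectionBaseClass(client, database="oee", collection="batch_data")
batches.project_id = "project_01"  # consults project_details_db in Redis via get_db_name()
record = batches.find_one({"batch_id": "batch_100"})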
import json
import paho.mqtt.client as mqtt
from scripts.config import MQTTConf
from scripts.logging import logger
def on_connect(client, userdata, flags, rc):
    # paho passes (client, userdata, flags, rc) to the on_connect callback
    logger.debug("Publisher Connected with result code " + str(rc))
def push_notification(notification, user_id):
    try:
        client = mqtt.Client()
        client.on_connect = on_connect
        client.connect(MQTTConf.host, MQTTConf.port, 30)
        client.loop_start()  # run the network loop so the QoS 1 publish can complete
        topic = f"{MQTTConf.publish_base_topic}/{user_id}/reports"
        message_info = client.publish(topic, json.dumps(notification), retain=False, qos=1)
        message_info.wait_for_publish()
        logger.info(f"Notification message published to {topic}")
        logger.debug(f"Notification: {notification}")
        client.loop_stop()
        client.disconnect()
        return True
    except Exception as e:
        logger.exception(f"Exception at MQTT Publish: {e}")
        return False
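A usage sketch; the payload shape is illustrative, while the topic is derived from MQTTConf.publish_base_topic and the user id:
push_notification({"type": "report_ready", "report_id": "rep_01"}, user_id="user_099")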
import importlib
import logging
from fastapi.encoders import jsonable_encoder
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import Session
from scripts.config import DBConf
class SQLDBUtils:
def __init__(self, db: Session):
self.session: Session = db
def close(self):
logging.debug("SQL Session closed")
self.session.close()
@property
def key_filter_expression(self):
return "expression"
@property
def key_filter_column(self):
return "column"
@property
def key_filter_value(self):
return "value"
def add_data(self, table):
self.session.add(table)
self.session.commit()
self.session.refresh(table)
return True
def bulk_insert(self, object_models):
self.session.bulk_save_objects(object_models)
self.session.commit()
return True
def filter_expression(self):
filter_expression = self.filter.get(self.key_filter_expression, "eq")
logging.debug(f"Filter expression: {filter_expression}")
return filter_expression
def filter_column(self):
column = self.filter.get(self.key_filter_column, None)
logging.debug(f"Filter column: {column}")
return column
def filter_value(self):
filter_value = self.filter.get(self.key_filter_value, None)
logging.debug(f"Filter value: {filter_value}")
return filter_value
def _filter(self, session_query, filters=None):
if filters is not None:
for _filter in filters:
self.filter = _filter
if self.filter_column() is None:
continue
session_query = self.get_session_query(session_query=session_query)
return session_query
    def get_session_query(self, session_query):
        try:
            expression = self.filter_expression()
            column = self.filter_column()
            value = self.filter_value()
            if expression == "eq":
                session_query = session_query.filter(column == value)
            elif expression == "le":  # strict less-than, by existing convention
                session_query = session_query.filter(column < value)
            elif expression == "ge":  # strict greater-than, by existing convention
                session_query = session_query.filter(column > value)
            elif expression == "lte":
                session_query = session_query.filter(column <= value)
            elif expression == "gte":
                session_query = session_query.filter(column >= value)
            elif expression == "neq":
                session_query = session_query.filter(column != value)
            elif expression == "none":  # IS NULL
                session_query = session_query.filter(column.is_(None))
            elif expression == "is_none":  # IS NOT NULL, by existing convention
                session_query = session_query.filter(column.is_not(None))
        except Exception as e:
            logging.error(f"Error occurred while filtering the session query {e}")
        return session_query
def insert_one(self, table, insert_json):
try:
row = table()
for k in insert_json:
setattr(row, k, insert_json[k])
self.session.merge(row)
self.session.commit()
return True
except Exception as e:
logging.error(f"Error while inserting the record {e}")
raise
def update(self, table, update_json, filters=None, insert=False, insert_id=None, update_one=False):
try:
logging.debug(filters)
row = self.session.query(table)
filtered_row = self._filter(session_query=row, filters=filters)
if update_one:
filtered_row = filtered_row.first()
if filtered_row is None:
logging.debug("There are no rows meeting the given update criteria.")
if insert:
logging.debug("Trying to insert a new record")
if insert_id is None:
logging.warning(
"ID not provided to insert record. Skipping insert."
)
return False
else:
update_json.update(insert_id)
if self.insert_one(table=table, insert_json=update_json):
return True
else:
return False
else:
return False
            else:
                logging.debug("Record available to update")
                if update_one:
                    for k in update_json:
                        # skip None values so partial payloads don't blank out columns
                        if update_json[k] is None:
                            continue
                        setattr(filtered_row, k, update_json[k])
                else:
                    # bulk update on the Query; setattr on a Query object has no effect
                    filtered_row.update(values=update_json)
                self.session.commit()
                return True
except Exception as e:
logging.error(f"Error while updating the record {e}")
raise
def delete(self, table, filters=None):
try:
# logging.trace(filters)
row = self.session.query(table)
filtered_row = self._filter(session_query=row, filters=filters)
if filtered_row is None:
logging.debug("There were no records to be deleted")
else:
filtered_row.delete()
self.session.commit()
return True
except Exception as e:
logging.error(f"Failed to delete a record {e}")
raise
def distinct_values_by_column(self, table, column, filters=None):
query = self.session.query(getattr(table, column).distinct().label(column))
query = self._filter(session_query=query, filters=filters)
distinct_values = [getattr(row, column) for row in query.all()]
return distinct_values
    def select_from_table(self, table=None, query=None, find_one=False):
        if query is None:
            query = f"select * from {table}"
        # iterate the result exactly once; printing it first would exhaust the cursor
        result = self.session.execute(text(query))
        response = [dict(zip(row.keys(), row.values())) for row in result]
        if find_one and response:
            return response[0]
        return response
def fetch_from_table(self, table, filter_text, limit_value, skip_value):
logging.debug(filter_text)
row = (
self.session.query(table)
            .filter(text(filter_text))
.limit(limit_value)
.offset(skip_value)
)
result = self.session.execute(row)
return [dict(zip(row.keys(), row.values())) for row in result]
    def execute_query(self, session, table=None, query=None):
        # `session` is kept for signature compatibility; self.session is used below
        try:
            if query is None:
                query = f"select * from {table}"
            result = self.session.execute(text(query))
# output = [dict(zip(row.keys(), row.values())) for row in result]
output = [x for x in result]
self.session.close()
return output
except Exception as e:
logging.error(f"Error occurred during execute_query: {e}")
def fetch_query(self, query):
try:
result = self.session.execute(query)
output = [jsonable_encoder(x) for x in result]
self.session.close()
return output
except Exception as e:
logging.error(f"Error occurred during execute_query: {e}")
def create_table(table_name):
    try:
        engine = create_engine(DBConf.POSTGRES_URI, echo=True)
        # dialect.has_table() needs a connection; the inspector handles that
        if not inspect(engine).has_table(table_name):
            table_models = importlib.import_module('scripts.db.db_models')
            orm_table = getattr(table_models, table_name)
            orm_table.__table__.create(bind=engine, checkfirst=True)
        return True
    except Exception as e:
        logging.error(f"Error occurred during start-up: {e}", exc_info=True)
        return False
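The filter dictionaries consumed by _filter()/update()/delete() follow the {column, expression, value} shape; a sketch against a hypothetical Batch ORM model:
rows = SQLDBUtils(db)._filter(
    db.query(Batch),  # Batch is a hypothetical ORM model, for illustration
    filters=[{"column": Batch.oee, "expression": "gte", "value": 85}],
).all()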
import base64
from Cryptodome import Random
from Cryptodome.Cipher import AES
class AESCipher:
    """
    A classical AES-CBC cipher. Handles data of any length thanks to PKCS#7-style
    padding and coerces unicode input to bytes; the key itself must be a valid
    AES key length (16, 24, or 32 bytes).
    """
def __init__(self, key):
self.bs = 16
self.key = AESCipher.str_to_bytes(key)
    @staticmethod
    def str_to_bytes(data):
        if isinstance(data, str):
            return data.encode('utf8')
        return data
def _pad(self, s):
return s + (self.bs - len(s) % self.bs) * AESCipher.str_to_bytes(chr(self.bs - len(s) % self.bs))
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def encrypt(self, raw):
raw = self._pad(AESCipher.str_to_bytes(raw))
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return base64.b64encode(iv + cipher.encrypt(raw)).decode('utf-8')
def decrypt(self, enc):
enc = base64.b64decode(enc)
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
data = self._unpad(cipher.decrypt(enc[AES.block_size:]))
return data.decode('utf-8')
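A round-trip sketch; the key below is an illustrative 16-byte value:
cipher = AESCipher("0123456789abcdef")  # illustrative 16-byte key
token = cipher.encrypt("secret-value")  # base64(iv + ciphertext)
assert cipher.decrypt(token) == "secret-value"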
import uuid
from datetime import timedelta, datetime
from scripts.constants import Secrets
from scripts.db.redis_connections import login_db
from scripts.utils.security_utils.jwt_util import JWT
jwt = JWT()
def create_token(user_id, ip, token, age=Secrets.LOCK_OUT_TIME_MINS, login_token=None):
"""
This method is to create a cookie
"""
try:
uid = login_token
if not uid:
uid = str(uuid.uuid4()).replace("-", "")
payload = {
"ip": ip,
"user_id": user_id,
"token": token,
"uid": uid,
"age": age
}
exp = datetime.utcnow() + timedelta(minutes=age)
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = {**payload, **_extras}
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(uid, new_token)
login_db.expire(uid, timedelta(minutes=age))
return uid
except Exception:
raise
from typing import Optional
from fastapi import Response, Request
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security.api_key import APIKeyBase, APIKeyCookie
from pydantic import BaseModel
class MetaInfoSchema(BaseModel):
project_id: Optional[str] = ""
user_id: Optional[str] = ""
language: Optional[str] = ""
class MetaInfoCookie(APIKeyBase):
"""
Project ID backend using a cookie.
"""
scheme: APIKeyCookie
cookie_name: str
def __init__(self, cookie_name: str = "projectId"):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.cookie_name = cookie_name
self.scheme_name = self.__class__.__name__
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
async def __call__(self, request: Request, response: Response):
cookies = request.cookies
cookie_json = {
"projectId": cookies.get("projectId", request.headers.get("projectId")),
"userId": cookies.get("user_id", cookies.get("userId", request.headers.get("userId"))),
"language": cookies.get("language", request.headers.get("language"))
}
return MetaInfoSchema(project_id=cookie_json["projectId"], user_id=cookie_json["userId"],
language=cookie_json["language"])
@staticmethod
def set_response_info(cookie_name, cookie_value, response: Response):
response.set_cookie(
cookie_name,
cookie_value,
samesite="strict",
httponly=True
)
response.headers[cookie_name] = cookie_value
from secrets import compare_digest
from fastapi import Response, Request, HTTPException
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from scripts.config import Service
from scripts.constants import Secrets
from scripts.db.redis_connections import login_db
from scripts.utils.security_utils.apply_encrytion_util import create_token
from scripts.utils.security_utils.jwt_util import JWT
class CookieAuthentication(APIKeyBase):
"""
Authentication backend using a cookie.
Internally, uses a JWT token to store the data.
"""
scheme: APIKeyCookie
cookie_name: str
cookie_secure: bool
def __init__(
self,
cookie_name: str = "login-token",
):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.scheme_name = self.__class__.__name__
self.cookie_name = cookie_name
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
self.login_redis = login_db
self.jwt = JWT()
async def __call__(self, request: Request, response: Response) -> str:
cookies = request.cookies
login_token = cookies.get("login-token")
if not login_token:
login_token = request.headers.get("login-token")
if not login_token:
raise HTTPException(status_code=401)
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=401)
try:
decoded_token = self.jwt.validate(token=jwt_token)
if not decoded_token:
raise HTTPException(status_code=401)
except Exception as e:
raise HTTPException(status_code=401, detail=e.args)
user_id = decoded_token.get("user_id")
_token = decoded_token.get("token")
if not compare_digest(Secrets.token, _token):
raise HTTPException(status_code=401)
if login_token != decoded_token.get("uid"):
raise HTTPException(status_code=401)
try:
new_token = create_token(
user_id=user_id,
ip=request.client.host,
token=Secrets.token,
login_token=login_token,
)
except Exception as e:
raise HTTPException(status_code=401, detail=e.args)
response.set_cookie(
'login-token',
new_token,
samesite='strict',
secure=Service.secure_cookie,
httponly=True,
max_age=Secrets.LOCK_OUT_TIME_MINS * 60,
)
response.headers['login-token'] = new_token
return user_id
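Typical wiring as a FastAPI dependency; the router and route path are illustrative:
from fastapi import APIRouter, Depends
auth_router = APIRouter()  # illustrative router
cookie_auth = CookieAuthentication()
@auth_router.get("/whoami")  # illustrative path
async def whoami(user_id: str = Depends(cookie_auth)):
    # CookieAuthentication validates and refreshes the login-token cookie
    return {"user_id": user_id}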
from datetime import timedelta
import jwt
from jwt.exceptions import (
    InvalidSignatureError,
    ExpiredSignatureError,
    MissingRequiredClaimError,
)
from scripts.config import KeyPath
from scripts.constants import Secrets
from scripts.errors import AuthenticationError, ErrorMessages
from scripts.logging import logger
class JWT:
def __init__(self):
self.max_login_age = Secrets.LOCK_OUT_TIME_MINS
self.issuer = Secrets.issuer
self.alg = Secrets.alg
self.public = KeyPath.public
self.private = KeyPath.private
    def encode(self, payload):
        try:
            # the context manager closes the key file; no finally-close needed
            with open(self.private, "r") as f:
                key = f.read()
            return jwt.encode(payload, key, algorithm=self.alg)
        except Exception as e:
            logger.exception(f'Exception while encoding JWT: {str(e)}')
            raise
    def validate(self, token):
        try:
            with open(self.public, "r") as f:
                key = f.read()
            return jwt.decode(
                token,
                key,
                algorithms=[self.alg],
                # PyJWT treats a bare number as seconds; pass a timedelta for minutes
                leeway=timedelta(minutes=Secrets.leeway_in_mins),
                options={"require": ["exp", "iss"]},
            )
        except InvalidSignatureError:
            raise AuthenticationError(ErrorMessages.ERROR003)
        except (ExpiredSignatureError, MissingRequiredClaimError):
            raise AuthenticationError(ErrorMessages.ERROR002)
        except Exception as e:
            logger.exception(f'Exception while validating JWT: {str(e)}')
            raise
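An encode/validate round trip, assuming the key files configured under KeyPath exist; validate() requires the exp and iss claims set here:
from datetime import datetime, timedelta
jwt_util = JWT()
token = jwt_util.encode({"user_id": "user_099", "iss": Secrets.issuer,
                         "exp": datetime.utcnow() + timedelta(minutes=30)})
decoded = jwt_util.validate(token)  # returns the claims dict on success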
import pytz
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import time
from datetime import datetime
from production_monitoring import ProductionMonitor
production_mon = ProductionMonitor()
def calculate_setup_time():
data = production_mon.oee_mongo.find_record_by_status("started")
if not data:
print("No data found, waiting for batch to start")
return
tz = data.get("form_details", {}).get("tz", "Asia/Bangkok")
prod_start_time = datetime.fromtimestamp(
int(data.get("start_time") / 1000),
tz=pytz.timezone(tz))
setup_time = production_mon.calculate_setup_time(prod_start_time, tz)
data["setup_time"] = setup_time
data["run_start_time"] = int(time.time() * 1000)
data["prod_status"] = "running"
production_mon.oee_mongo.update_oee(data, data.get("job", ""), data.get("uf_process", ""), False)
    while True:
        calculate_setup_time()
        time.sleep(10)
deployment:
  environmentVar:
    - name: MONGO_URI
      valueFrom:
        secretKeyRef:
          name: mongo-creds
          key: MONGO_URI
    - name: POSTGRES_URI
      value: "postgresql://ilens:iLens#4321@postgres-db-service.ilens-infra:5432/"
    - name: META_API
      value: "http://metadata-service.ilens-core:8989"
    - name: CORS_URLS
      value: "staging.ilens.io"
    - name: SW_DOCS_URL
      value: "/docs"
    - name: SW_OPENAPI_URL
      value: "/openapi.json"
    - name: ENABLE_CORS
      value: "True"
    - name: CLIENT_URI
      value: "mssql+pymssql://rik:@192.168.0.2:1433/"
    - name: FORM_API
      value: "http://form-management.ilens-core.svc.cluster.local:5121/"
\ No newline at end of file