Commit 5bf30852 authored by Yuvaraj Subramanian's avatar Yuvaraj Subramanian

yolo model

parent 76547106
# Default ignored files
/shelf/
/workspace.xml
<component name="ProjectDictionaryState">
<dictionary name="Akshay G">
<words>
<w>idxs</w>
</words>
</dictionary>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7 (SRF-Cylinders) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="removeUnused" value="true" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="3">
<item index="0" class="java.lang.String" itemvalue="pyaml" />
<item index="1" class="java.lang.String" itemvalue="flask_cors" />
<item index="2" class="java.lang.String" itemvalue="numpy" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N801" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnreachableCodeInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="app.flask_cors" />
<option value="requests" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (SRF-Cylinders) (2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/edge_app_v1.0.iml" filepath="$PROJECT_DIR$/.idea/edge_app_v1.0.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
FROM azracrilensai.azurecr.io/repository/ilens-ai/ilens-openvino-base:v1
RUN apt-get update
RUN apt-get install tzdata
RUN pip3 install --upgrade pip
RUN pip3 install minio cachetools
RUN pip3 install expiringdict
RUN pip3 install torch==1.6.0
RUN pip3 install torchvision==0.7.0 numpy pillow imutils pyYAML opencv-python
RUN pip3 install yolov5processor==0.0.3
ADD . /app
WORKDIR /app
CMD ["bash","app.sh"]
#import os
#os.environ["config"] = '{"MONGO_URI": "mongodb://svc-ilens:svc2345@192.168.3.220:21017","MONGO_DATABASE": "ilens_wps", "MONGO_DB": "ilens_wps", ' \
# '"MONGO_COLLECTION": "janusDeployment", "MONGO_KEY": "deploymentId", "MONGO_VALUE": "6d971066", "MONGO_COLL": "serviceConfiguration" }'
from edge_engine.edge_processor import ExecutePipeline
from edge_engine.edge_processor import Pubs
from scripts import fruits_model
from edge_engine.common.config import EDGE_CONFIG
if __name__ == '__main__':
pubs = Pubs()
mod = fruits_model(config=EDGE_CONFIG, model_config=EDGE_CONFIG["modelConfig"], pubs=pubs, device_id=EDGE_CONFIG['deviceId'])
ex = ExecutePipeline(mod)
ex.run_model()
source /opt/intel/openvino/bin/setupvars.sh
python3 app.py
\ No newline at end of file
FROM nvcr.io/nvidia/pytorch:20.03-py3
#FROM flare-detection-pytorch:v2.0
RUN rm -rf /var/lib/apt/lists/* && rm -rf /root/.cache/pip/
RUN apt-get update
RUN pip uninstall torch torchvision -y
RUN apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg
RUN pip install numpy==1.18.5
RUN pip install msgpack==0.5.6
RUN pip install minio requests cachetools pymongo~=3.10.1 wincertstore==0.2 opencv-contrib-python==4.2.0.34 certifi==2020.6.20 imutils Cython pillow PyYAML scipy yolov5processor==0.0.3
ADD . /app
WORKDIR /app
RUN pip install edge_engine-1.0.0a0-py3-none-any.whl
CMD ["bash","app.sh"]
\ No newline at end of file
pymongo~=3.10.1
# opencv-python~=4.2.0.34
# requests~=2.23.0
cachetools~=4.1.0
wincertstore==0.2
opencv-contrib-python==4.2.0.34
matplotlib
imutils==0.5.3
certifi==2020.6.20
from.face_model import fruits_model
import os
import sys
import json
from pymongo import MongoClient
MAIN_OS_VARIABLE = json.loads(os.environ.get('config'))
if MAIN_OS_VARIABLE is None:
sys.stderr.write("Configuration not found...")
sys.stderr.write("Exiting....")
sys.exit(1)
MONGO_URI = MAIN_OS_VARIABLE.get('MONGO_URI')
MONGO_SERVICE_DB = MAIN_OS_VARIABLE.get('MONGO_DB')
MONGO_SERVICE_COLL = MAIN_OS_VARIABLE.get('MONGO_COLL')
PASS_KEY = MAIN_OS_VARIABLE.get('PASS_KEY')
MONGO_DB_OBJ = MongoClient(MONGO_URI)[MONGO_SERVICE_DB]
HOST_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'hostConfig'}).get('config')
APP_MONGO_COLLECTION = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'appMongoConfig'}).get('config')
DEPLOYMENT_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'deploymentMongoConfig'}).get('config')
MINIO_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'minioConfig'}).get('config')
UI_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'uiConfiguration'}).get('config')
REGISTRY_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'registryConfig'}).get('config')
JANUS_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'janusConfig'}).get('config')
MODEL_SERVER_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'modelServerConfig'}).get('config')
MODEL_TRAINER_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'modelTrainerConfig'}).get('config')
MS_SERVER_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'msServerConfig'}).get('config')
THERMAL_CAM_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'thermalCamera'}).get('config')
class ModelConstants:
sink_layer = {'cls': 'cls_branch_concat_1/concat', 'bbox': 'loc_branch_concat_1/concat'}
model_detector_pth = "./data/model/face_mask_detection.xml"
id2class = {0: 'Mask', 1: 'No Mask'}
id2bool = {0: True, 1: False}
exec_net_device_name = 'CPU'
fallback_initial_temperature = 98.3
net_precision = 'FP32'
\ No newline at end of file
from edge_engine.ai.model.modelwraper import ModelWrapper
from scripts.utils.infocenter import MongoLogger
from edge_engine.common.logsetup import logger
from scripts.utils.notify_infocenter import NotificationFilter
import cv2
import base64
import numpy as np
from imutils.video import FPS
import datetime
import traceback
# from scripts.utils.yolo_params import YoloParams
from yolov5processor.infer import ExecuteInference
import os
from math import exp as exp
import time
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
class fruits_model(ModelWrapper):
def __init__(self, config, model_config, pubs, device_id):
super().__init__()
"""
init function
"""
self.config = config["config"]
self.device_id = device_id
self.rtp = pubs.rtp_write
logger.info("Loading model to the device")
self.mongologger = MongoLogger()
self.notify_bool = NotificationFilter(ttl_value=model_config.get('notification_ttl_value_sec'))
self.base_model_path = 'scripts/model/'
print("[INFO] loading yolov5-Fruits Detection Model")
self.model_detector_pth = os.path.join(self.base_model_path, "fruits_300.pt")
self.yp = ExecuteInference(weight=self.model_detector_pth)
# self.all_pred = []
# self.count = 0
# self.max_val = 0
# self.length_list = []
# self.frame_count = 0
# self.pre_time = time.time()
def _pre_process(self, x):
"""
Do preprocessing here, if any
:param x: payload
:return: payload
"""
return x
def _post_process(self, x):
"""
Apply post processing here, if any
:param x: payload
:return: payload
"""
self.rtp.publish(x) # video stream
return x
def send_payload(self, message, frame, label, bg_color, font_color, alert_sound):
"""
For selective sending of notification to the infocenter
:param _emp_name: employee name
:param _emp_temp: employee temperature recorded
:param _emp_id: employee id
:param message: message tp be shown
:param temp_exceedence_check: boolean temperature excedence or not
:param class_idx: str class mapping for mask detection
:param temperature_flag: risky or safe str
:param croped_face: cropped frame of face
:return: None
"""
payload = {"deviceId": self.device_id, "message": message,
"frame": 'data:image/jpeg;base64,' + base64.b64encode(
cv2.imencode('.jpg', frame)[1].tostring()).decode("utf-8"), "activity": label,
"bg_color": bg_color, "font_color": font_color, "alert_sound": alert_sound}
self.mongologger.insert_attendance_event_to_mongo(payload)
def _predict(self, obj):
try:
frame = obj['frame']
orig_image = cv2.resize(frame, (640, 400))
frame = self.process_frame(orig_image)
curr_time = time.time()
#logger.info("Time for 1 frame: {}".format(curr_time - self.pre_time))
#self.pre_time = curr_time
obj['frame'] = cv2.resize(frame, (self.config.get('FRAME_WIDTH'), self.config.get('FRAME_HEIGHT')))
#timestamp = datetime.datetime.now().replace(microsecond=0).isoformat()
#obj["timestamp"] = timestamp
#logger.info("cpu percent--> ", psutil.cpu_percent(), "cpu count--> ", str(psutil.cpu_count()))
#logger.info("virtual memory--> ", psutil.virtual_memory())
except Exception as e:
logger.exception(f"Error: {e}")
obj['frame'] = cv2.resize(obj['frame'], (self.config.get('FRAME_WIDTH'), self.config.get('FRAME_HEIGHT')))
traceback.print_exc()
obj["error"] = "{}".format(e)
obj["message"] = "{}".format("error processing frame")
obj["status"] = False
obj["timestamp"] = datetime.datetime.now().replace(microsecond=0).isoformat()
return obj
# @staticmethod
# def update_net(net1):
# for input_key in net1.inputs:
# if len(net1.inputs[input_key].layout) == 4:
# input_name = input_key
# # logger.info("Batch size is {}".format(net1.batch_size))
# # net.inputs[input_key].precision = 'FP16'
# elif len(net1.inputs[input_key].layout) == 2:
# input_info_name = input_key
# net1.inputs[input_key].precision = 'FP32'
# if net1.inputs[input_key].shape[1] != 3 and net1.inputs[input_key].shape[1] != 6 or \
# net1.inputs[input_key].shape[
# 0] != 1:
# pass
# # logger.error('Invalid input info. Should be 3 or 6 values length.')
# # --------------------------- Prepare output blobs ----------------------------------------------------
# # logger.info('Preparing output blobs')
# output_info = net1.outputs[next(iter(net1.outputs.keys()))]
# output_info.precision = "FP32"
# for output_key in net1.outputs:
# output_name, output_info = output_key, net1.outputs[output_key]
# # print("output")
# # print(output_name, output_info)
# # -----------------------------------------------------------------------------------------------------
# def check_zone(self, point, frame_size):
# frame_size = [frame_size[1], frame_size[0]]
# if point[0] <= frame_size[0] / 2 and point[1] <= frame_size[1] / 2:
# return 'Vilolation in Zone 1'
# elif (point[0] > frame_size[0] / 2 and point[1] < frame_size[1] / 2):
# return 'Vilolation in Zone 2'
# elif (point[0] > frame_size[0] / 2 and point[1] > frame_size[1] / 2):
# return 'Vilolation in Zone 3'
# else:
# return 'Vilolation in Zone 4'
def send_payload(self, message, frame, label, bg_color, font_color, alert_sound):
"""
For selective sending of notification to the infocenter
:param _emp_name: employee name
:param _emp_temp: employee temperature recorded
:param _emp_id: employee id
:param message: message tp be shown
:param temp_exceedence_check: boolean temperature excedence or not
:param class_idx: str class mapping for mask detection
:param temperature_flag: risky or safe str
:param croped_face: cropped frame of face
:return: None
"""
frame = cv2.resize(frame, (64, 64))
payload = {"deviceId": self.device_id, "message": message,
"frame": 'data:image/jpeg;base64,' + base64.b64encode(
cv2.imencode('.jpg', frame)[1].tostring()).decode("utf-8"), "activity": label,
"bg_color": bg_color, "font_color": font_color, "alert_sound": alert_sound}
self.mongologger.insert_attendance_event_to_mongo(payload)
def process_frame(self, frame):
pred = self.yp.predict(frame)
for det in pred:
conf = "{:.2f}".format(int(det['conf'] * 100))
if (int(det['conf'] * 100) > 60):
label = int(det['class'].item())
# print(label.item())
det = det['points']
if (label == 0):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 0, 255), 2)
cv2.putText(img=frame, text="Apple: Red Delicious " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(0, 0, 255), thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 1):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (255, 0, 255), 2)
cv2.putText(img=frame, text="Apple: Gaya " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(255, 0, 255),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 2):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 255, 0), 2)
cv2.putText(img=frame, text="Apple: Granny Smith " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(0, 255, 0),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 3):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (51, 87, 255), 2)
cv2.putText(img=frame, text="Orange " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(51, 87, 225),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 4):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 195, 255), 2)
cv2.putText(img=frame, text="Mango " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(0, 195, 255),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 5):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 0, 128), 2)
cv2.putText(img=frame, text="Kiwi Actinidia Deliciosa " + conf, org=(int(det[0]), int(det[1]) - 10),
color=(0, 0, 128),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
elif (label == 6):
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 128, 0), 2)
cv2.putText(img=frame, text="Kiwi Hardy" + conf, org=(int(det[0]), int(det[1]) - 10),
color=(0, 128, 0),
thickness=2,
fontScale=0.5, fontFace=cv2.LINE_AA)
return frame
# from.facenet_lite import FaceNetLite
from edge_engine.common.logsetup import logger
from pymongo import MongoClient
from scripts.common.config import MONGO_DB_OBJ, APP_MONGO_COLLECTION
class AlarmUtils:
def __init__(self):
self.attendance_alarm_configuration_collection = MONGO_DB_OBJ[
APP_MONGO_COLLECTION.get('alarmConfigurationCollection')]
self.attendance_tag_collection = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('tagConfiguration')]
self.attendance_alarm_priority_collection = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('alarmPriorityCollection')]
def get_alarms(self):
alarms = [each for each in self.attendance_alarm_configuration_collection.find({}, {'_id': 0})]
return alarms
def get_priority_name(self, priority):
data = self.attendance_alarm_priority_collection.find_one({'id': priority}, {'_id': 0})
priority_name = data.get('priority_name')
return priority_name
def build_condition_string(self, rule):
condition = rule['condition']
tag_name = self.attendance_tag_collection.find_one({'tag_id': rule['leftHandSide']['tag']}, {'Tag Name': 1})[
'Tag Name']
condition_value = rule['rightHandSide']['customValue']
condition_name = f"{tag_name} {condition} {condition_value}"
return condition_name, tag_name
def build_message(self, each_alarm, tag_value):
alarmTemplate = each_alarm['alarmTemplate']
alarmTemplate = alarmTemplate.replace('[Tag Name]',
self.build_condition_string(each_alarm['ruleSets'][0]['rules'][0])[1])
alarmTemplate = alarmTemplate.replace('[Tag Value]', str(tag_value))
return alarmTemplate
def process_data(self, tag_value, av_alarms):
# add mongo query to fetch alarms
if av_alarms:
event_decision = {
"Safe": False,
"Risky": False,
"Message": self.build_message(av_alarms[0], tag_value)
}
for each_alarm in av_alarms:
try:
event_decision = self.process_alarm(alarm_record=each_alarm, tag_value=tag_value,
event_decision=event_decision)
except Exception as e:
raise Exception(str(e))
else:
event_decision = {
"Safe": False,
"Risky": False,
"Message": ""
}
logger.debug(" no alarms found")
return event_decision
def process_alarm(self, alarm_record, tag_value, event_decision):
try:
try:
rule_set_operation = "and" if alarm_record["ruleSetAndOrOperationData"][
"isAnd"] else "or"
rule_set_outputs = []
for each_rule_set in alarm_record["ruleSets"]:
try:
rule_operation = "and" if each_rule_set["ruleAndOrOperationData"][
"isAnd"] else "or"
rule_output = []
for each_rule in each_rule_set["rules"]:
try:
primary_output = self.alarm_rule_parser(each_rule, tag_value)
if primary_output is not None:
rule_output.append(primary_output)
except Exception as e:
logger.exception("Exception in parsing rule:" + str(e))
temp_output = self.aggregate_result(rule_operation, rule_output)
if temp_output is not None:
rule_set_outputs.append(temp_output)
except Exception as e:
logger.exception("Exception during parsing of individual rule set:" + str(e))
final_output = self.aggregate_result(rule_set_operation, rule_set_outputs)
# print("alarm condition output:", final_output)
logger.debug(
"{} {} {} {}".format(self.build_condition_string(alarm_record['ruleSets'][0]['rules'][0])[0],
tag_value,
self.get_priority_name(alarm_record['priority']),
final_output))
if self.get_priority_name(alarm_record['priority']) == 'Safe' and final_output == True:
event_decision['Safe'] = True
elif self.get_priority_name(alarm_record['priority']) == 'Risky' and final_output == True:
event_decision['Risky'] = True
# if final output is True, condition satisfied, if not condition not satisfied
return event_decision
except Exception as e:
logger.exception("Exception during alarm rule parsing:" + str(e))
except Exception as e:
logger.exception("Exception during alarm rule parsing:" + str(e))
# logger.debug("EXIT: Alarm Parser")
def get_tag_value(self, tag_value):
try:
return tag_value
except Exception as e:
logger.exception(" Unable to fetch tag value:" + str(e))
def alarm_rule_parser(self, rule, tag_value):
try:
left_side_value = self.get_tag_value(tag_value)
right_opr = str(rule["rightHandSide"][rule["rightHandSide"]["compareOption"]])
final_opr = ""
final_opr += str(left_side_value) + " "
final_opr += rule["condition"] + " "
final_opr += str(right_opr) + " "
rule_result = eval(final_opr)
return rule_result
except Exception as e:
logger.exception("Exception in parsing rule in alarms:" + str(e))
return None
def aggregate_result(self, condition, output):
try:
final_response = ""
if len(output) == 0:
logger.debug("empty array found inside aggregate result")
return None
# logger.debug("OUTPUT:" + str(output))
for i in range(0, len(output)):
final_response += str(output[i]) + " "
if i <= len(output) - 2:
final_response += condition + " "
final_response = eval(final_response)
# logger.debug("Result: " + str(final_response))
return final_response
except Exception as e:
logger.exception("Exception in parsing:" + str(e))
return None
import traceback
from datetime import datetime
from uuid import uuid1
from pymongo import MongoClient
from scripts.common.config import MONGO_DB_OBJ, APP_MONGO_COLLECTION
from edge_engine.common.logsetup import logger
class MongoLogger:
def __init__(self):
self.attendance_event_collection = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('eventLogCollection')]
self.camera_configuration = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('cameraConfigurationCollection')]
self.camera_mapping_json = self.get_all_cameras()
def get_all_cameras(self):
camera_mapping_json = self.camera_configuration.find({'decommissioned': False},
{"_id": 0})
camera_json = {}
for each in camera_mapping_json:
camera_json[each['cameraId']] = each['cameraName']
return camera_json
def insert_attendance_event_to_mongo(self, data):
try:
input_data = {
"eventId": str(uuid1()).split('-')[0],
"cameraId": data['deviceId'],
"cameraName": self.camera_mapping_json.get(data['deviceId'], "Thermal Camera"),
"timestamp": datetime.now().strftime("%b %d %Y %H:%M:%S"),
"frame": data['frame'],
"eventtype": "Intrusion Detection",
"bg_color": data["bg_color"],
"font_color": data["font_color"],
"intrusion_message": data["message"],
"alert_sound": data["alert_sound"],
"logged_activity": data["activity"]}
logger.info("Pushing to Mongo..")
input_data["timestamp"] = datetime.now()
self.attendance_event_collection.insert(input_data)
except Exception as e:
logger.info(e)
logger.error(traceback.format_exc())
from cachetools import cached, TTLCache
import time
TTL = 1
class NotificationFilter(object):
def __init__(self, ttl_value=3):
"""init function
"""
global TTL
TTL = ttl_value
self.TTL = ttl_value
self.face_id_cache = {}
def _update_cache(self, name):
"""updates the cache with a name
Args:
name (str): Name of the person
"""
self.face_id_cache[name] = time.time()
def send_notification(self, name):
"""Returns a boolean if the notification center has to be notified
or not
Args:
name (str): name of person identified
Returns:
notify (bool): True if notification to be sent False if the
notification is not to be sent
"""
self._clean_up_cache()
if name is None:
notify = False
else:
is_present = self.face_id_cache.get(name, -1)
if is_present == -1:
notify = True
self._update_cache(name)
else:
notify = False
return notify
@cached(cache=TTLCache(maxsize=1024, ttl=TTL))
def _clean_up_cache(self):
"""Cleans up the cached name at regular interval
"""
key_to_delete = []
for key, value in self.face_id_cache.items():
if time.time() - value >= self.TTL:
key_to_delete.append(key)
for key in key_to_delete:
del self.face_id_cache[key]
if __name__ == "__main__":
obj = NotificationFilter()
print(obj.send_notification("Litesh"))
print(obj.send_notification("Litesh"))
time.sleep(3)
print(obj.send_notification("Litesh"))
time.sleep(1)
print(obj.send_notification("Litesh"))
import logging as log
class YoloParams:
# ------------------------------------------- Extracting layer parameters ------------------------------------------
# Magic numbers are copied from yolo samples
def __init__(self, param, side):
self.num = 3 if 'num' not in param else int(param['num'])
self.coords = 4 if 'coords' not in param else int(param['coords'])
self.classes = 80 if 'classes' not in param else int(param['classes'])
self.side = side
self.anchors = [10.0, 13.0, 16.0, 30.0, 33.0, 23.0, 30.0, 61.0, 62.0, 45.0, 59.0, 119.0, 116.0, 90.0, 156.0,
198.0,
373.0, 326.0] if 'anchors' not in param else [float(a) for a in param['anchors'].split(',')]
self.isYoloV3 = False
if param.get('mask'):
mask = [int(idx) for idx in param['mask'].split(',')]
self.num = len(mask)
maskedAnchors = []
for idx in mask:
maskedAnchors += [self.anchors[idx * 2], self.anchors[idx * 2 + 1]]
self.anchors = maskedAnchors
self.isYoloV3 = True # Weak way to determine but the only one.
def log_params(self):
params_to_print = {'classes': self.classes, 'num': self.num, 'coords': self.coords, 'anchors': self.anchors}
[log.info(" {:8}: {}".format(param_name, param)) for param_name, param in params_to_print.items()]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment