Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jk_edge_code_api_integration
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sikhin.vc
jk_edge_code_api_integration
Commits
513178d5
Commit
513178d5
authored
May 04, 2023
by
Sikhin VC
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
removing mongo dependency
parent
9faa06a5
Pipeline
#68894
canceled with stage
Changes
9
Pipelines
1
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
134 additions
and
2609 deletions
+134
-2609
app.py
app.py
+1
-1
edge_engine/common/config.py
edge_engine/common/config.py
+4
-50
scripts/cement_counter.py
scripts/cement_counter.py
+8
-10
scripts/cement_counter.py.bkp
scripts/cement_counter.py.bkp
+0
-1324
scripts/cement_counter3.py
scripts/cement_counter3.py
+0
-528
scripts/cement_counter_for_acc_final - Copy.py
scripts/cement_counter_for_acc_final - Copy.py
+0
-568
scripts/common/config.py
scripts/common/config.py
+0
-6
scripts/utils/edge_utils.py
scripts/utils/edge_utils.py
+120
-121
scripts/utils/infocenter.py
scripts/utils/infocenter.py
+1
-1
No files found.
app.py
View file @
513178d5
import
os
#
os.environ["config"]="{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://svc-ilens:svc2345@192.168.3.220:21017\", \"MONGO_DATABASE\": \"ilens_wps\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": \"deploymentId\", \"MONGO_VALUE\": \"_acc_test_7b5692781\", \"MONGO_COLL\": \"serviceConfiguration\", \"MONGO_DB\": \"ilens_wps
\"}"
#
os.environ["config"]="{\"MONGO_VALUE\": \"jkv1_dba3e4e8
\"}"
from
edge_engine.edge_processor
import
ExecutePipeline
from
edge_engine.edge_processor
import
Pubs
from
scripts
import
CementBagCounter
...
...
edge_engine/common/config.py
View file @
513178d5
...
...
@@ -22,38 +22,11 @@ def licence_validator(payload):
return
False
def
get_config_from_mongo
(
mongo_uri
,
dbname
,
basecollection
,
key
,
value
):
mongo
=
MongoClient
(
mongo_uri
)
db
=
mongo
[
dbname
]
config
=
db
[
basecollection
]
.
find_one
({
key
:
value
},
{
"_id"
:
False
})
return
config
def
load_conf
(
config
,
mongo_uri
,
dbname
):
mongo
=
MongoClient
(
mongo_uri
)
db
=
mongo
[
dbname
]
pub_configs
=
[]
for
conf
in
config
[
'pubConfigs'
]:
if
conf
[
"type"
]
.
lower
()
in
[
"mqtt"
,
"mongo"
,]:
key
=
conf
[
"key"
]
value
=
conf
[
"value"
]
collection
=
conf
[
"conectionCollection"
]
pub_conf
=
db
[
collection
]
.
find_one
({
key
:
value
},
{
"_id"
:
False
})
pub_conf
.
update
(
conf
)
pub_configs
.
append
(
pub_conf
)
else
:
pub_configs
.
append
(
conf
)
config
[
'pubConfigs'
]
=
pub_configs
return
config
# """
# {
...
...
@@ -81,35 +54,16 @@ CONFIG_ENV = json.loads(os.environ.get('config', default=None))
# MONGO_COLLECTION = CONFIG_ENV.get('MONGO_COLLECTION', None)
# MONGO_KEY = CONFIG_ENV.get('MONGO_KEY', None)
DEPLOYMENT_ID
=
CONFIG_ENV
.
get
(
'MONGO_VALUE'
,
None
)
#
# if MONGO_URI == None \
# or MONGO_DATABASE is None \
# or MONGO_COLLECTION is None \
# or MONGO_KEY is None \
# or MONGO_VALUE is None:
# sys.stderr.write("invalid mongo config \n")
# sys.exit(1)
#
# EDGE_CONFIG = get_config_from_mongo(
# mongo_uri=MONGO_URI,
# dbname=MONGO_DATABASE, basecollection=MONGO_COLLECTION,
# key=MONGO_KEY, value=MONGO_VALUE
# )
#
# print("edge config1 , : ", EDGE_CONFIG)
# if EDGE_CONFIG is None:
# sys.stderr.write("invalid EDGE_CONFIG config \n")
# sys.exit(1)
#
# EDGE_CONFIG=load_conf(EDGE_CONFIG, mongo_uri=MONGO_URI,
# dbname=MONGO_DATABASE)
# deployment_id = "jkoverlappingremoval_e4958a70"
# deployment_id = os.environ.get('deployment_id')
# print("deployment id", DEPLOYMENT_ID)
url
=
f
'http://192.168.2.228:2325/custom/get_janus?deploymentId={DEPLOYMENT_ID}'
EDGE_CONFIG
=
requests
.
get
(
url
)
.
json
()[
"data"
]
LAST_COUNT
=
requests
.
get
(
url
)
.
json
()[
"latest_count"
]
print
(
"EDGE CONFIG"
)
print
(
EDGE_CONFIG
)
# print("edge config2 , : ", EDGE_CONFIG)
DEVICE_ID
=
EDGE_CONFIG
[
"deviceId"
]
DATA_PATH
=
EDGE_CONFIG
[
"inputConf"
]
.
get
(
'dataPath'
,
os
.
path
.
join
(
os
.
getcwd
(),
"data"
.
format
()))
...
...
scripts/cement_counter.py
View file @
513178d5
...
...
@@ -9,18 +9,18 @@ import numpy as np
from
edge_engine.ai.model.modelwraper
import
ModelWrapper
from
edge_engine.common.logsetup
import
logger
from
expiringdict
import
ExpiringDict
from
pymongo
import
MongoClient
# from scripts.common.config import MONGO_URI
from
scripts.common.config
import
MONGO_URI
from
scripts.common.constants
import
JanusDeploymentConstants
from
scripts.utils.centroidtracker
import
CentroidTracker
from
scripts.utils.edge_utils
import
get_extra_fields
#
from scripts.utils.edge_utils import get_extra_fields
from
scripts.utils.helpers
import
box_iou2
from
scripts.utils.image_utils
import
draw_circles_on_frame
,
resize_to_64_64
from
scripts.utils.infocenter
import
MongoLogger
from
scripts.utils.tracker
import
Tracker
from
scripts.utils.yolov5_trt
import
YoloV5TRT
#
from scripts.utils.yolov5_trt import YoloV5TRT
# from sklearn.utils.linear_assignment_ import linear_assignment
from
scipy.optimize
import
linear_sum_assignment
as
linear_assignment
from
edge_engine.common.config
import
EDGE_CONFIG
class
CementBagCounter
(
ModelWrapper
):
...
...
@@ -34,12 +34,11 @@ class CementBagCounter(ModelWrapper):
logger
.
info
(
f
"Config Received {self.config}"
)
self
.
device_id
=
device_id
self
.
rtp
=
pubs
.
rtp_write
self
.
mongo_logger
=
MongoLogger
()
self
.
frame_skip
=
self
.
config
.
get
(
'frame_skip'
,
False
)
engine_file_path
=
"data/jk_v5_cam_42.engine"
self
.
classes
=
{
0
:
'cement_bag'
}
self
.
yolo_v5_wrapper
=
YoloV5TRT
(
engine_file_path
,
model_config
.
get
(
'conf_thresh'
,
0.7
),
model_config
.
get
(
'iou_thresh'
,
0.15
))
#
self.yolo_v5_wrapper = YoloV5TRT(engine_file_path, model_config.get('conf_thresh', 0.7),
#
model_config.get('iou_thresh', 0.15))
# TRT Additions stop
...
...
@@ -57,8 +56,7 @@ class CementBagCounter(ModelWrapper):
self
.
uncounted_objects
=
ExpiringDict
(
max_len
=
model_config
.
get
(
"uncounted_obj_length"
,
50
),
max_age_seconds
=
model_config
.
get
(
"uncounted_obj_age"
,
60
))
self
.
janus_metadata
=
ExpiringDict
(
max_age_seconds
=
120
,
max_len
=
1
)
# self.mongo_alarm_coll = MongoClient(MONGO_URI)['ilens_events']['triggered_alarms']
self
.
camera_details
=
self
.
mongo_logger
.
get_camera_details
(
self
.
device_id
)
# self.camera_details = EDGE_CONFIG[self.device_id]
self
.
frame_count
=
0
self
.
frame
=
None
self
.
centroid_distance
=
10
...
...
@@ -240,7 +238,7 @@ class CementBagCounter(ModelWrapper):
Get the line coordinates from the deployment JSON
"""
if
not
self
.
janus_metadata
.
get
(
'metadata'
):
self
.
janus_metadata
[
'metadata'
]
=
get_extra_fields
(
self
.
device_id
)
self
.
janus_metadata
[
'metadata'
]
=
EDGE_CONFIG
[
"extra_fields"
]
_coordinates
=
[
self
.
janus_metadata
[
'metadata'
]
.
get
(
coordinate_key
)
for
coordinate_key
in
JanusDeploymentConstants
.
LINE_COORDINATES
]
...
...
scripts/cement_counter.py.bkp
deleted
100644 → 0
View file @
9faa06a5
import cv2
import json
import base64
import numpy as np
from scipy.spatial import distance
from expiringdict import ExpiringDict
import time
from edge_engine.common.logsetup import logger
from scripts.utils.infocenter import MongoLogger
# from yolov5processor.infer import ExecuteInference
from scripts.utils.edge_utils import get_extra_fields
from edge_engine.ai.model.modelwraper import ModelWrapper
from scripts.utils.centroidtracker import CentroidTracker
from scripts.common.constants import JanusDeploymentConstants
from scripts.utils.image_utils import draw_circles_on_frame, resize_to_64_64
from pymongo import MongoClient
from scripts.common.config import MONGO_URI
from uuid import uuid4
import cv2
import base64
import datetime
import numpy as np
import imutils
from collections import deque
from expiringdict import ExpiringDict
from sklearn.utils.linear_assignment_ import linear_assignment
from edge_engine.common.logsetup import logger
from edge_engine.ai.model.modelwraper import ModelWrapper
from scripts.utils.tracker import Tracker
from scripts.utils.helpers import box_iou2
from scripts.utils.edge_utils import Utilities
from scripts.utils.infocenter import MongoLogger
from scripts.utils.model_tracker import ModelCountTracker
from scripts.common.constants import JanusDeploymentConstants
# TRT Additions start
# from yolov5processor.infer import ExecuteInference
from scripts.utils.yolov5_trt import YoloV5TRT
# TRT Additions stop
from scripts.utils.relay_util import RelayHandler
class CementBagCounter(ModelWrapper):
def __init__(self, config, model_config, pubs, device_id):
super().__init__()
"""
init function
"""
self.config = config["config"]
self.device_id = device_id
self.rtp = pubs.rtp_write
self.mongo_logger = MongoLogger()
self.frame_skip = self.config.get('frame_skip', False)
# TRT Additions start
# model = "data/acc_v6.pt"
# self.yp = ExecuteInference(weight=model,
# gpu=model_config.get("gpu", False),
# agnostic_nms=model_config.get("agnostic_nms", True),
# iou=model_config.get("iou", 0.2),
# confidence=model_config.get("confidence", 0.4))
engine_file_path = "data/acc_v14.engine"
# with open("/home/ilens/container_weights/classes.json", 'r') as f:
# self.classes = json.loads(f.read())
# self.classes = {int(k): v for k, v in self.classes.items()}
self.classes = {0: 'ambuja_plus', 1: 'acc_gold', 2: 'acc_suraksha_power_plus', 3: 'ambuja_buildcem', 4: 'mrp', 5: 'acc_suraksha_power', 6: 'acc_nfr', 7: 'acc_concrete_plus'}
self.yolo_v5_wrapper = YoloV5TRT(engine_file_path, model_config.get('conf_thresh', 0.5),
model_config.get('iou_thresh', 0.4))
# TRT Additions stop
self.print_eu_dist = model_config.get('print_eu_dist', 200)
self.ct1 = CentroidTracker(maxDisappeared=5)
self.ct2 = CentroidTracker(maxDisappeared=5)
self.count = 0
self.cement_bag = 0
self.count_suraksha = 0
self.count_whitecem = 0
self.count_gold = 0
self.tracker_list = []
self.max_age = 3
self.min_hits = 0
self.track_id_list = deque([str(i) for i in range(1, 50)])
self.prev_annotation = []
self.mrp_counter = 0
self.count_nfr = 0
self.count_suraksha_power = 0
self.count_concrete_plus = 0
self.count_ambuja_plus = 0
# self.prev_class_name = None
self.initial_object_position = None
self.uncounted_objects = ExpiringDict(max_len=model_config.get("uncounted_obj_length", 50),
max_age_seconds=model_config.get("uncounted_obj_age", 60))
self.janus_metadata = ExpiringDict(max_age_seconds=120, max_len=1)
self.mongo_alarm_coll = MongoClient(MONGO_URI)['ilens_events']['triggered_alarms']
self.camera_details = self.mongo_logger.get_camera_details(self.device_id)
self.black_white_ratio_dict = {'ambuja_plus': 1.2, 'acc_gold': 1.2, 'acc_suraksha_power_plus': 1.2,
'ambuja_buildcem': 1.2, 'acc_suraksha_power': 1.2, 'acc_nfr': 1.2,
'acc_concrete_plus': 1.2}
def _pre_process(self, x):
"""
Do preprocessing here, if any
:param x: payload
:return: payload
"""
return x
def _post_process(self, x):
"""
Apply post processing here, if any
:param x: payload
:return: payload
"""
self.rtp.publish(x) # video stream
return x
def send_payload(self, frame, bag_type='', label='CementBagDetected', bg_color="#474520", font_color="#FFFF00",
alert_sound=None,
message="Cement Bag Detected!", mrp_frmae='', mrp_roi = '', mrp = ''):
"""
Insert event to Mongo
:param message:
:param frame:
:param label:
:param bg_color:
:param font_color:
:param alert_sound:
:return: None
"""
payload = {"deviceId": self.device_id, "message": message,
"frame": 'data:image/jpeg;base64,' + base64.b64encode(
cv2.imencode('.jpg', frame)[1].tostring()).decode("utf-8"), "activity": label,
"bg_color": bg_color, "font_color": font_color, "alert_sound": alert_sound, "bag_type": bag_type,
"mrp_frmae": mrp_frmae, "mrp_roi" : mrp_roi, "mrp" : mrp}
self.mongo_logger.insert_attendance_event_to_mongo(payload)
def kalman_tracker(
self,
bboxs,
img,
):
z_box = bboxs
x_box = []
if len(self.tracker_list) > 0:
for trk in self.tracker_list:
x_box.append(trk.box)
matched, unmatched_dets, unmatched_trks = self.assign_detections_to_trackers(x_box, z_box, iou_thrd=0.03)
# Deal with matched detections
if matched.size > 0:
for trk_idx, det_idx in matched:
z = z_box[det_idx]
z = np.expand_dims(z, axis=0).T
tmp_trk = self.tracker_list[trk_idx]
tmp_trk.kalman_filter(z)
xx = tmp_trk.x_state.T[0].tolist()
xx = [xx[0], xx[2], xx[4], xx[6]]
x_box[trk_idx] = xx
tmp_trk.box = xx
tmp_trk.hits += 1
# Deal with unmatched detections
if len(unmatched_dets) > 0:
for idx in unmatched_dets:
z = z_box[idx]
z = np.expand_dims(z, axis=0).T
tmp_trk = Tracker() # Create a new tracker
x = np.array([[z[0], 0, z[1], 0, z[2], 0, z[3], 0]]).T
tmp_trk.x_state = x
tmp_trk.predict_only()
xx = tmp_trk.x_state
xx = xx.T[0].tolist()
xx = [xx[0], xx[2], xx[4], xx[6]]
tmp_trk.box = xx
tmp_trk.id = self.track_id_list.popleft() # assign an ID for the tracker
self.tracker_list.append(tmp_trk)
x_box.append(xx)
# Deal with unmatched tracks
if len(unmatched_trks) > 0:
for trk_idx in unmatched_trks:
tmp_trk = self.tracker_list[trk_idx]
tmp_trk.no_losses += 1
tmp_trk.predict_only()
xx = tmp_trk.x_state
xx = xx.T[0].tolist()
xx = [xx[0], xx[2], xx[4], xx[6]]
tmp_trk.box = xx
x_box[trk_idx] = xx
# The list of tracks to be annotated
good_tracker_list = []
objects = []
boxs = []
for trk in self.tracker_list:
if (trk.hits >= self.min_hits) and (trk.no_losses <= self.max_age):
good_tracker_list.append(trk)
x_cv2 = trk.box
left, top, right, bottom = x_cv2[1], x_cv2[0], x_cv2[3], x_cv2[2]
centroid = [int(left + ((right - left) / 2)), bottom]
objects.append([int(trk.id), centroid])
boxs.append(x_cv2)
deleted_tracks = filter(lambda _x: _x.no_losses > self.max_age, self.tracker_list)
for trk in deleted_tracks:
self.track_id_list.append(trk.id)
self.tracker_list = [x for x in self.tracker_list if x.no_losses <= self.max_age]
# print("object is ", str(objects))
return img, objects, boxs
@staticmethod
def assign_detections_to_trackers(
trackers,
detections,
iou_thrd=0.3,
):
"""
From current list of trackers and new detections, output matched detections,
un matched trackers, unmatched detections.
"""
iou_mat = np.zeros((len(trackers), len(detections)), dtype=np.float32)
for t, trk in enumerate(trackers):
for d, det in enumerate(detections):
iou_mat[t, d] = box_iou2(trk, det)
matched_idx = linear_assignment(-iou_mat)
unmatched_trackers, unmatched_detections = [], []
for t, trk in enumerate(trackers):
if t not in matched_idx[:, 0]:
unmatched_trackers.append(t)
for d, det in enumerate(detections):
if d not in matched_idx[:, 1]:
unmatched_detections.append(d)
matches = []
for m in matched_idx:
if iou_mat[m[0], m[1]] < iou_thrd:
unmatched_trackers.append(m[0])
unmatched_detections.append(m[1])
else:
matches.append(m.reshape(1, 2))
if len(matches) == 0:
matches = np.empty((0, 2), dtype=int)
else:
matches = np.concatenate(matches, axis=0)
return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
def get_line_coordinates(self):
"""
Get the line coordinates from the deployment JSON
"""
if not self.janus_metadata.get('metadata'):
self.janus_metadata['metadata'] = get_extra_fields(self.device_id)
_coordinates = [self.janus_metadata['metadata'].get(coordinate_key) for coordinate_key in
JanusDeploymentConstants.LINE_COORDINATES]
_alignment = self.janus_metadata['metadata'].get(JanusDeploymentConstants.ALIGNMENT_KEY) \
# _coordinates = [550, 200, 555, 1100]
#
# _alignment = "vertical"
return _alignment, _coordinates
def line_point_position(self, point):
"""
Get the position of point w.r.t. the line
:param point: point to be compared
:return: boolean
"""
_alignment, line_coordinates = self.get_line_coordinates()
assert len(line_coordinates) == 4, "Line coordinates variable is invalid"
assert len(point) == 2, "Point variable is invalid"
_slope = (line_coordinates[3] - line_coordinates[1]) / (line_coordinates[2] - line_coordinates[0])
_point_equation_value = point[1] - line_coordinates[1] - _slope * (point[0] - line_coordinates[0])
if _point_equation_value > 0:
return True
else:
return False
def insert_alarm_event(self, message="MRP Missing", asset_hierarchy=""):
data = {
"device_instance_id": asset_hierarchy,
"device_instance_ids": asset_hierarchy,
"alarm_id": "alarm_configuration_212",
"triggered_devices": [
asset_hierarchy
],
"tag_value": 5.0,
"start_time": None,
"end_time": "",
"current_level": 0,
"id": "alarm_event_1567101",
"trigger_time": [
{
"start_time": None,
"counter": 0
}
],
"trigger_levels": [
{
"timestamp": None,
"notificaton_profile": [
{
"usersOrUserGroup": [
{
"value": "access_group_100",
"type": "access_group",
"label": "ACC Admin"
}
],
"notificationProfile": [
"alarm_notify_type_4"
],
"emailIds": [],
"phoneNumbers": [],
"notificationTone": "",
"isNotificationToneShow": True,
"triggers": [
{
"device_instance_id": None,
"tags": None,
"customValueType": None,
"customValue": None,
"counter": 1
}
],
"counter": 1,
"enable_custom": True
}
]
}
],
"priority": "alarm_priority_type_109",
"tag_id": "",
"template": "MRP Missing",
"acknowledge": True,
"alarmName": message,
"alarmType": "Alarm",
"created_by": "user_100",
"project_id": "project_101",
"product_encrypted": False,
"start_time_in_epoch": None,
"show_data_viz": True,
"alarm_condition": message,
"tag_id_list": [
asset_hierarchy
],
"tag_value_json": {
asset_hierarchy: 5.0
},
"alarm_tag_list": [
asset_hierarchy
],
"acknowledged_at": "2022-01-06 20:51:22",
"acknowledged_by": None
}
epoch = int(time.time()) * 1000
time_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
data['start_time_in_epoch'] = epoch
data['trigger_time'][0]['start_time'] = epoch
data['start_time'] = time_string
data['trigger_levels'][0]['timestamp'] = time_string
data['acknowledged_at'] = time_string
data['id'] = f"alarm_event_{str(uuid4()).split('-')[0]}"
self.mongo_alarm_coll.insert_one(data)
def validate_point_position(self, point):
"""
Validate the position of the point w.r.t. the line
:param point: centroid
:return: bool
"""
_alignment, line_coordinates = self.get_line_coordinates()
assert _alignment in [JanusDeploymentConstants.VERTICAL, JanusDeploymentConstants.HORIZONTAL], \
"Invalid alignment variable"
if _alignment == JanusDeploymentConstants.VERTICAL:
# _alignment, line_coordinates = self.get_line_coordinates()
# assert _alignment in ["horizontal", "vertical"], \
# "Invalid alignment variable"
# print(point)
# if _alignment == "vertical":
line_y2 = line_coordinates[3]
line_y1 = line_coordinates[1]
if line_y1 < point[1] < line_y2 or line_y2 < point[1] < line_y1:
return True
else:
return False
else:
line_x2 = line_coordinates[2]
line_x1 = line_coordinates[0]
if line_x1 < point[0] < line_x2 or line_x2 < point[0] < line_x1:
return True
else:
return False
# def verifying_cement_bag_type(self, previous_class, current_class):
# if(previous_class == current_class):
# print("bag changed")
def update_bag_count(self, frame, detection_objects, class_name, detections):
"""
Maintains the bag counts
:param frame: image
:param detection_objects: detection object having object id and centroids
"""
# for class_name, (objectID, centroid) in zip(classes, detection_objects):
for (object_id, det, class_detected) in zip(
detection_objects, detections, class_name
):
centroid = object_id[1]
object_id = object_id[0]
logger.debug(detections)
# print(object_id)
frame = draw_circles_on_frame(
frame, centroid, radius=10, color=(0, 0, 255), thickness=-1
)
if self.validate_point_position(centroid):
logger.debug("centroid detected")
if not isinstance(self.initial_object_position, bool):
logger.debug("Initializing the initial object position")
# self.initial_object_position = self.line_point_position(point=centroid)
self.initial_object_position = True
logger.debug(self.initial_object_position)
_point_position = self.line_point_position(point=centroid)
# print("object ID is : ", str(objectID))
logger.debug(self.uncounted_objects)
# Check point in the same side as the initial object
if _point_position == self.initial_object_position:
logger.debug("same side only")
# print(class_name)
# Check the object is not already counted
if object_id not in self.uncounted_objects:
self.uncounted_objects[object_id] = centroid
elif object_id in self.uncounted_objects:
# print("************")
# print(class_detected)
self.uncounted_objects.pop(object_id, None)
if class_detected == "acc_gold":
gold_flag = self.yellow_thresholding(
detections, frame, class_detected
)
if gold_flag:
self.count_gold += 1
else:
self.count_suraksha += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_gold")
# self.prev_class_name = "acc_gold"
class_detected = "acc_suraksha_power_plus"
(
mrp_result,
mrp_frame,
mrp_roi,
) = self.distances(detections, frame, class_detected)
if mrp_result:
if gold_flag:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_gold",
message="ACC GOLD, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_gold}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power_plus",
message="ACC SURAKSHA PP, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_suraksha}, Print Found: True"
)
else:
if gold_flag:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_gold",
message="ACC GOLD, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_gold}, Print Found: False"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power_plus",
message="ACC SURAKSHA PP, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_suraksha}, Print Found: False"
)
elif class_detected == "acc_suraksha_power_plus":
self.count_suraksha += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_suraksha")
# self.prev_class_name = "acc_suraksha"
logger.debug(self.count_suraksha)
(
mrp_result,
mrp_frame,
mrp_roi
) = self.distances(detections, frame, class_detected)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power_plus",
message="ACC SURAKSHA PP, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_suraksha}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power_plus",
message="ACC SURAKSHA PP, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_suraksha}, Print Found: False"
)
elif class_detected == "ambuja_buildcem":
self.count_whitecem += 1
# self.verifying_cement_bag_type(self.prev_class_name, "ambuja_buildcem")
# self.prev_class_name = "ambuja_buildcem"
(
mrp_result,
mrp_frame,
mrp_roi
) = self.distances(detections, frame, class_detected)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="ambuja_buildcem",
message="AMBUJA BUILDCEM, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_whitecem}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="ambuja_buildcem",
message="AMBUJA BUILDCEM, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_whitecem}, Print Found: False"
)
elif class_detected == "acc_nfr":
self.count_nfr += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_nfr")
# self.prev_class_name = "acc_nfr"
logger.debug(self.count_nfr)
(
mrp_result,
mrp_frame,
mrp_roi,
) = self.distances(detections, frame, class_detected)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_nfr",
message="ACC NFR, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(f"Count: {self.count_nfr}, Print Found: True")
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_nfr",
message="ACC NFR, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(f"Count: {self.count_nfr}, Print Found: False")
elif class_detected == "acc_suraksha_power":
self.count_suraksha_power += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_suraksha_power")
# self.prev_class_name = "acc_suraksha_power"
logger.debug(self.count_suraksha_power)
(
mrp_result,
mrp_frame,
mrp_roi,
) = self.distances(detections, frame, class_detected)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power",
message="ACC SURAKSHA POWER, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_suraksha_power}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_suraksha_power",
message="ACC SURAKSHA POWER, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_suraksha_power}, Print Found: False"
)
elif class_detected == "acc_concrete_plus":
self.count_concrete_plus += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_nfr")
# self.prev_class_name = "acc_nfr"
logger.debug(self.count_concrete_plus)
(
mrp_result,
mrp_frame,
mrp_roi,
) = self.distances(detections, frame, class_detected)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_concrete_plus",
message="ACC CONCRETE PLUS, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_concrete_plus}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="acc_concrete_plus",
message="ACC CONCRETE PLUS, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_concrete_plus}, Print Found: False"
)
elif class_detected == "ambuja_plus":
self.count_ambuja_plus += 1
# self.verifying_cement_bag_type(self.prev_class_name, "acc_nfr")
# self.prev_class_name = "acc_nfr"
logger.debug(self.count_ambuja_plus)
(
mrp_result,
mrp_frame,
mrp_roi,
) = self.distances(detections, frame, class_detected)
self.text = "COUNT : {ambuja_count}/10".format(
ambuja_count=self.count_ambuja_plus
)
if mrp_result:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="ambuja_plus",
message="Ambuja PLUS, MRP:YES",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
mrp="PASS",
)
logger.info(
f"Count: {self.count_ambuja_plus}, Print Found: True"
)
else:
self.send_payload(
resize_to_64_64(frame=frame),
bag_type="ambuja_plus",
message="Ambuja PLUS, MRP:NO",
mrp_frmae=mrp_frame,
mrp_roi=mrp_roi,
)
logger.info(
f"Count: {self.count_ambuja_plus}, Print Found: False"
)
frame = draw_circles_on_frame(
frame, centroid, radius=10, color=(0, 255, 0), thickness=-1
)
# cv2.waitKey(0)
# if centroid['has_print']:
# self.send_payload(resize_to_64_64(frame=frame), message='Print Detected!')
# logger.info(f"Count: {self.count}, Print Found: True")
# else:
# self.send_payload(resize_to_64_64(frame=frame), message='Print Missing!')
# logger.info(f"Count: {self.count}, Print Found: False")
else:
frame = draw_circles_on_frame(
frame, centroid, radius=10, color=(0, 255, 0), thickness=-1
)
# count_text_gold = "ACC_GOLD: " + str(self.count_gold)
# count_text_suraksha = "ACC_SURAKSHA_PLUS: " + str(self.count_suraksha)
# count_text_whitecem = "ACC_WHITE_CEM: " + str(self.count_whitecem)
# count_text_suraksha_power = "ACC_SURAKSHA_POWER: " + str(self.count_suraksha_power)
# count_text_nfr = "ACC_NFR: " + str(self.count_nfr)
# cv2.putText(frame, count_text_gold, (1300, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_suraksha, (1300, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_whitecem, (1300, 600), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_suraksha_power, (1300, 800), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_nfr, (1300, 1000), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, self.prev_class_name, (1000, 800), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
return frame
def yellow_thresholding(self, detections, frame, class_detected):
gold_flag = False
mrp_cord = []
cem_bag_cord = []
add_mrp = ""
for det in detections:
if det["class"] == "mrp":
mrp_cord.append(det["points"])
else:
cem_bag_cord.append(det["points"])
for c_cord in cem_bag_cord:
bag_width = c_cord[2] - c_cord[0]
if bag_width > 500:
roi = frame[c_cord[1] + 40: c_cord[3], c_cord[0]: c_cord[2] - 80]
original = roi.copy()
# cv2.imshow("roi", roi)
# cv2.waitKey(0)
image = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
lower = np.array([10, 70, 0], dtype="uint8")
upper = np.array([45, 255, 255], dtype="uint8")
mask = cv2.inRange(image, lower, upper)
cnts = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cnts = cnts[0] if len(cnts) == 2 else cnts[1]
# print("contours")
# print(len(cnts))
for c in cnts:
x, y, w, h = cv2.boundingRect(c)
# cv2.imshow("bag with yellow region: {w}".format(w=w), original)
# print(x, y, w, h)
if w > 100:
gold_flag = True
cv2.rectangle(original, (x, y), (x + w, y + h), (36, 255, 12), 2)
#
break
# if gold_flag:
# print("Final Bag: ACC Gold")
# else:
# print("Final Bag: ACC SURAKSHA PP")
return gold_flag
def draw_line_over_image(self, frame, color=(255, 255, 255)):
"""
Draws line over the counting line
:param frame: frame for
:param color:
:return:
"""
_alignment, line_coordinates = self.get_line_coordinates()
assert len(line_coordinates) == 4, "Line coordinates variable is invalid"
# return cv2.line(frame, (line_coordinates[0], line_coordinates[1]), (line_coordinates[2], line_coordinates[3]),
# color, 3)
self.drawline(
frame,
(line_coordinates[0], line_coordinates[1]),
(line_coordinates[2], line_coordinates[3]),
color,
thickness=3,
)
return frame
@staticmethod
def drawline(img, pt1, pt2, color, thickness=1, style="dotted", gap=20):
dist = ((pt1[0] - pt2[0]) ** 2 + (pt1[1] - pt2[1]) ** 2) ** 0.5
pts = []
for i in np.arange(0, dist, gap):
r = i / dist
x = int((pt1[0] * (1 - r) + pt2[0] * r) + 0.5)
y = int((pt1[1] * (1 - r) + pt2[1] * r) + 0.5)
p = (x, y)
pts.append(p)
if style == "dotted":
for p in pts:
cv2.circle(img, p, thickness, color, -1)
else:
s = pts[0]
e = pts[0]
i = 0
for p in pts:
s = e
e = p
if i % 2 == 1:
cv2.line(img, s, e, color, thickness)
i += 1
def crop_polygon(self, cement_bag_img):
pts = np.array([[100, 180], [530, 110], [555, 190], [100, 260]])
## (1) Crop the bounding rect
rect = cv2.boundingRect(pts)
x, y, w, h = rect
croped = cement_bag_img[y: y + h, x: x + w].copy()
## (2) make mask
pts = pts - pts.min(axis=0)
mask = np.zeros(croped.shape[:2], np.uint8)
cv2.drawContours(mask, [pts], -1, (255, 255, 255), -1, cv2.LINE_AA)
## (3) do bit-op
dst = cv2.bitwise_and(croped, croped, mask=mask)
## (4) add the white background
bg = np.ones_like(croped, np.uint8) * 255
cv2.bitwise_not(bg, bg, mask=mask)
mrp_tag = bg + dst
# import random
#
# cv2.imwrite(
# "E:\\acc_new\\mrp_region_only\\{random_number}.jpg".format(
# random_number=random.randint(1, 100000)
# ),
# mrp_tag,
# )
return mrp_tag
def mrp_digit_count(self, img, detected_class):
digit_num = {"ambuja_plus": 17, "acc_gold": 13, "acc_suraksha_power_plus": 17}
height, width = img.shape[:2]
# print(height, width)
# img[45: 95, 290:379] = [255, 255, 255]
# img[25: 95, 290:500] = [255, 255, 255]
bag_type = detected_class
blank_image = np.zeros((300, 800, 3), np.uint8)
blank_image[:, :] = (255, 255, 255)
l_img = blank_image.copy() # (600, 900, 3)
x_offset = y_offset = 20
l_img[y_offset: y_offset + height, x_offset: x_offset + width] = img.copy()
# cv2.imshow("l_img", l_img)
img = l_img
if bag_type == "ambuja_plus":
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
th, threshed = cv2.threshold(v, 220, 255, cv2.THRESH_BINARY_INV)
image = cv2.bitwise_and(img, img, mask=threshed)
# cv2.imshow("thresholded_image", image)
# pre-process the image by resizing it, converting it to
# graycale, blurring it, and computing an edge map
image = imutils.resize(image, height=150)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 200, 255)
# cv2.imshow("edged", edged)
thresh = cv2.threshold(
blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)[1]
# print("After Threshold")
# cv2.imshow("thresh", thresh)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_OPEN, kernel, np.ones((5, 5), np.uint8), iterations=2
)
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_CLOSE, kernel, None, None, 1, cv2.BORDER_REFLECT101
)
# cv2.imshow("thresh2", thresh)
cnts = cv2.findContours(
thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
)
cnts = imutils.grab_contours(cnts)
cv2.drawContours(
thresh,
cnts,
-1,
(0, 0, 255),
1,
)
x = self.find_chars(cnts, thresh)
if bag_type == "acc_suraksha_power_plus":
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
th, threshed = cv2.threshold(v, 220, 255, cv2.THRESH_BINARY_INV)
image = cv2.bitwise_and(img, img, mask=threshed)
# pre-process the image by resizing it, converting it to
# graycale, blurring it, and computing an edge map
image = imutils.resize(image, height=150)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 200, 255)
thresh = cv2.threshold(
blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)[1]
# print("After Threshold")
# cv2.imshow("thresh4", thresh)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_OPEN, kernel, np.ones((5, 5), np.uint8), iterations=2
)
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_CLOSE, kernel, None, None, 1, cv2.BORDER_REFLECT101
)
# print("After Morphology")
# cv2.imshow("thresh5", thresh)
# find contours in the thresholded image, then initialize the
# digit contours lists
cnts = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
cnts = imutils.grab_contours(cnts)
cv2.drawContours(
thresh,
cnts,
-1,
(0, 255, 0),
1,
)
# cv2.imshow("thresh6", thresh)
x = self.find_chars(cnts, thresh)
if bag_type == "acc_gold":
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
# cv2.imshow("img", img)
th, threshed = cv2.threshold(v, 220, 255, cv2.THRESH_BINARY_INV)
image = cv2.bitwise_and(img, img, mask=threshed)
# cv2.imshow("bitwise and", image)
# pre-process the image by resizing it, converting it to
# graycale, blurring it, and computing an edge map
image = imutils.resize(image, height=150)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 200, 255)
thresh = cv2.threshold(
blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)[1]
# cv2.imshow("thresh7", thresh)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_OPEN, kernel, np.ones((5, 5), np.uint8), iterations=2
)
thresh = cv2.morphologyEx(
thresh, cv2.MORPH_CLOSE, kernel, None, None, 1, cv2.BORDER_REFLECT101
)
# cv2.imshow("thresh9", thresh)
# find contours in the thresholded image, then initialize the
# digit contours lists
cnts = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
cnts = imutils.grab_contours(cnts)
cv2.drawContours(
thresh,
cnts,
-1,
(0, 255, 0),
1,
)
# cv2.imshow("thresh10", thresh)
x = self.find_chars(cnts, thresh)
# cv2.imshow("thresh11", thresh)
if x >= (digit_num[detected_class] - 2):
return x, True
else:
return x, False
def distances(self, detections, frame, class_detected):
mrp_cord = []
cem_bag_cord = []
add_mrp = ""
mrp_roi = ""
for det in detections:
if det["class"] == "mrp":
mrp_cord.append(det["points"])
else:
cem_bag_cord.append(det["points"])
# if mrp_cord == []:
# return False, add_mrp, mrp_roi
for c_cord in cem_bag_cord:
bag_width = c_cord[2] - c_cord[0]
if bag_width > 500:
roi = frame[c_cord[1]: c_cord[3], c_cord[0]: c_cord[2]]
# cv2.imwrite("E:\\acc_new\\masked_bag_ambuja_\\{count}.jpg".format(count = self.count), roi)
# self.count+= 1
h, w, _ = roi.shape
# cv2.imshow("cement_bag", roi)
roi_half_h = roi[int((h / 2) - 0): h, 0:w]
roi_half_v = roi_half_h[50: int((h / 2)), 0: w - 150]
# cv2.imshow("roi", roi)
mrp_roi = "data:image/jpeg;base64," + base64.b64encode(
cv2.imencode(".jpg", roi)[1].tostring()
).decode("utf-8")
extra_values = get_extra_fields(self.device_id)
mrp_detect = extra_values.get(JanusDeploymentConstants.MRP_DETECT_KEY)
if mrp_detect is not None and mrp_detect.lower() == "yes":
mrp_add = self.mrp_image(roi_half_v, class_detected)
# cv2.imshow("mrp_add", mrp_add)
if mrp_add is not None:
# mrp_region = self.crop_polygon(roi)
# # cv2.imshow("mrp_region_", mrp_region)
# mrp_digits, mrp_status = self.mrp_digit_count(
# mrp_region, class_detected
# )
# # cv2.waitKey(0)
# if mrp_status:
# mrp_check = "PASS"
# else:
# mrp_check = "FAIL"
# cv2.imshow("mrp yes", cv2.resize(mrp_add, (400, 300)))
self.mrp_counter = 0
add_mrp = "data:image/jpeg;base64," + base64.b64encode(
cv2.imencode(".jpg", mrp_add)[1].tostring()
).decode("utf-8")
return True, add_mrp, mrp_roi
else:
# cv2.imshow("mrp no", mrp_add)
self.mrp_counter += 1
if self.mrp_counter >= 5:
self.mrp_counter = 0
logger.debug("activate relay")
self.insert_alarm_event(
asset_hierarchy=self.camera_details.get(
"asset_hierarchy", ""
),
message=self.camera_details.get("asset_name", "")
+ " - "
+ class_detected
+ " : MRP missed",
)
RelayHandler().update_relay_status(
self.camera_details.get("belt_relay_ep", ""),
dict(triggerStatus="stop"),
)
logger.debug(
"Stopped the relay because of 5 consecutive MRP misses"
)
return False, add_mrp, mrp_roi
def inference(
self,
frame,
classes,
):
# TRT Additions start
# dets = self.yp.predict(frame)
result_boxes, result_scores, result_classid = self.yolo_v5_wrapper.infer(frame)
dets = [{"points": list(points), "conf": conf, "class": self.classes.get(class_id)} for points, conf, class_id
in
zip(result_boxes, result_scores, result_classid)]
# TRT Additions stop
class_name = list()
bboxs = []
if dets:
for i in dets:
if (i["class"] in classes):
class_name.append(i["class"])
# cv2.rectangle(frame, (i["points"][0], i["points"][1]), (i["points"][2], i["points"][3]), (255, 255, 0), 2)
bboxs.append([i["points"][1], i["points"][0], i["points"][3], i["points"][2]])
# frame = cv2.rectangle(frame, (bboxs[0][0], bboxs[0][1]), (bboxs[0][2], bboxs[0][3]),(255, 255, 0) , 2)
return bboxs, frame, dets, class_name
def _predict(self, obj):
class_list = ["acc_gold", "acc_suraksha_power_plus", "ambuja_buildcem", "acc_nfr", "acc_suraksha_power",
"acc_concrete_plus", "ambuja_plus"]
mrp = ["mrp"]
try:
frame = obj['frame']
dets, frame, _dets, class_name = self.inference(frame, class_list)
frame = self.draw_line_over_image(frame)
frame, objects, boxs = self.kalman_tracker(dets, frame)
frame = self.update_bag_count(frame=frame, detection_objects=objects, class_name=class_name,
detections=_dets)
logger.debug("self.uncounted_objects --> {}".format(self.uncounted_objects))
obj['frame'] = cv2.resize(frame, (self.config.get('FRAME_WIDTH'), self.config.
get('FRAME_HEIGHT')))
except Exception as e:
logger.exception(f"Error: {e}", exc_info=True)
obj['frame'] = cv2.resize(obj['frame'], (self.config.get('FRAME_WIDTH'), self.config.get('FRAME_HEIGHT')))
return obj
def detect_mrp(self, img, class_detected):
# print("Finding Pixels")
black_pixel = 0
white_pixel = 100
# img_bgr = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
pixels = img.reshape(-1, 3)
for pixel in pixels:
if pixel[0] == pixel[1] == pixel[2] == 0:
black_pixel += 1
else:
white_pixel += 1
black_to_white = (white_pixel / black_pixel) * 100
black_white_ratio = self.black_white_ratio_dict[
"{class_name}".format(class_name=class_detected)
]
# print("black_white_ratio")
# print(black_white_ratio)
# cv2.imshow("black white ratio {black_white_ratio1}.jpg".format(black_white_ratio1=str(black_to_white)),
# img)
# if(black_to_white == 14.99204665959703):
# cv2.imwrite("black white ratio {black_white_ratio1}".format(black_white_ratio1=str(black_to_white)), img)
if int(black_to_white) < int(black_white_ratio):
flag = False
else:
# print("MRP present, finding MRP region..")
flag = True
return flag
def mrp_image(self, img, class_detected):
ROI = None
try:
# cv2.imshow("roi image", img)
img_cp = img.copy()
rgb_planes = cv2.split(img_cp)
kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
result_planes = []
for plane in rgb_planes:
dilated_img = cv2.dilate(plane, np.ones((7, 7), np.uint8))
bg_img = cv2.medianBlur(dilated_img, 21)
diff_img = 255 - cv2.absdiff(plane, bg_img)
result_planes.append(diff_img)
result = cv2.merge(result_planes)
# print("Original Image but cropped,and shadow removed")
# cv2.imshow("shadow_removed", result)_predict
shadow_removed = result.copy()
result = cv2.filter2D(src=result, ddepth=-1, kernel=kernel)
# cv2_imshow(result)
img_cp = result
# Convert BGR to HSV
hsv = cv2.cvtColor(img_cp, cv2.COLOR_BGR2HSV)
# define range of black color in HSV
lower_val = np.array([0, 0, 160])
upper_val = np.array([180, 230, 200])
# Threshold the HSV image to get only black colors
mask = cv2.inRange(hsv, lower_val, upper_val)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(img_cp, img_cp, mask=mask)
mrp_flag = self.detect_mrp(res, class_detected)
if not mrp_flag:
return ROI
else:
# cv2.imshow("res", res)
return res
except Exception as e:
logger.exception(f"Error: {e}", exc_info=True)
return ROI
\ No newline at end of file
scripts/cement_counter3.py
deleted
100644 → 0
View file @
9faa06a5
import
cv2
import
base64
import
numpy
as
np
from
scipy.spatial
import
distance
from
expiringdict
import
ExpiringDict
from
edge_engine.common.logsetup
import
logger
from
scripts.utils.infocenter
import
MongoLogger
from
yolov5processor.infer
import
ExecuteInference
from
scripts.utils.edge_utils
import
get_extra_fields
from
edge_engine.ai.model.modelwraper
import
ModelWrapper
from
scripts.utils.centroidtracker
import
CentroidTracker
from
scripts.common.constants
import
JanusDeploymentConstants
from
scripts.utils.image_utils
import
draw_circles_on_frame
,
resize_to_64_64
from
scripts.utils.edge_utils
import
Utilities
from
collections
import
deque
from
scripts.utils.tracker
import
Tracker
from
scripts.utils.helpers
import
box_iou2
from
sklearn.utils.linear_assignment_
import
linear_assignment
class
CementBagCounter
(
ModelWrapper
):
def
__init__
(
self
,
config
,
model_config
,
pubs
,
device_id
):
super
()
.
__init__
()
"""
init function
"""
self
.
config
=
config
[
"config"
]
self
.
device_id
=
device_id
self
.
rtp
=
pubs
.
rtp_write
self
.
mongo_logger
=
MongoLogger
()
self
.
frame_skip
=
self
.
config
.
get
(
'frame_skip'
,
False
)
model
=
"data/ACC_v3.pt"
self
.
yp
=
ExecuteInference
(
weight
=
model
,
gpu
=
model_config
.
get
(
"gpu"
,
False
),
agnostic_nms
=
model_config
.
get
(
"agnostic_nms"
,
True
),
iou
=
model_config
.
get
(
"iou"
,
0.2
),
confidence
=
model_config
.
get
(
"confidence"
,
0.4
))
self
.
print_eu_dist
=
model_config
.
get
(
'print_eu_dist'
,
200
)
self
.
ct1
=
CentroidTracker
(
maxDisappeared
=
5
)
self
.
ct2
=
CentroidTracker
(
maxDisappeared
=
5
)
self
.
frame_skipping
=
{
"skip_current_frame"
:
True
,
"detection_value"
:
None
}
self
.
count
=
0
self
.
cement_bag
=
0
self
.
count_suraksha
=
0
self
.
count_whitecem
=
0
self
.
count_gold
=
0
self
.
mrp_counter
=
0
self
.
initial_object_position
=
Utilities
.
get_direction
(
self
.
device_id
)
self
.
tracker_list
=
[]
self
.
max_age
=
15
self
.
min_hits
=
10
self
.
track_id_list
=
deque
([
str
(
i
)
for
i
in
range
(
1
,
50
)])
self
.
prev_annotation
=
[]
self
.
initial_object_position
=
None
self
.
uncounted_objects
=
ExpiringDict
(
max_len
=
model_config
.
get
(
"uncounted_obj_length"
,
50
),
max_age_seconds
=
model_config
.
get
(
"uncounted_obj_age"
,
60
))
self
.
janus_metadata
=
ExpiringDict
(
max_age_seconds
=
120
,
max_len
=
1
)
def
_pre_process
(
self
,
x
):
"""
Do preprocessing here, if any
:param x: payload
:return: payload
"""
return
x
def
_post_process
(
self
,
x
):
"""
Apply post processing here, if any
:param x: payload
:return: payload
"""
self
.
rtp
.
publish
(
x
)
# video stream
return
x
def
send_payload
(
self
,
frame
,
label
=
'CementBagDetected'
,
bg_color
=
"#474520"
,
font_color
=
"#FFFF00"
,
alert_sound
=
None
,
message
=
"Cement Bag Detected!"
):
"""
Insert event to Mongo
:param message:
:param frame:
:param label:
:param bg_color:
:param font_color:
:param alert_sound:
:return: None
"""
payload
=
{
"deviceId"
:
self
.
device_id
,
"message"
:
message
,
"frame"
:
'data:image/jpeg;base64,'
+
base64
.
b64encode
(
cv2
.
imencode
(
'.jpg'
,
frame
)[
1
]
.
tostring
())
.
decode
(
"utf-8"
),
"activity"
:
label
,
"bg_color"
:
bg_color
,
"font_color"
:
font_color
,
"alert_sound"
:
alert_sound
}
self
.
mongo_logger
.
insert_attendance_event_to_mongo
(
payload
)
def
track_bags
(
self
,
tracker_obj
,
dets
,
im0
,
filter_name
,
centroid_color
=
(
255
,
0
,
0
)):
"""
Track the bags using Centroid based tracking
:param dets: prediction output
:param tracker_obj: prediction output
:param filter_name: prediction output
:param im0: raw frame
:param centroid_color: color given to the centroid marking
:return: centroid points, frame
"""
bags
=
list
()
classes
=
list
()
for
det
in
dets
:
if
(
det
[
"class"
]
in
filter_name
):
bags
.
append
(
np
.
array
(
det
[
'points'
])
.
astype
(
"int"
))
classes
.
append
(
det
[
"class"
])
objects
=
tracker_obj
.
update
(
bags
)
objects
.
pop
(
"frame"
,
None
)
if
centroid_color
is
not
False
:
for
(
objectID
,
centroid
)
in
objects
.
items
():
if
centroid
[
'has_print'
]:
centroid_color
=
(
0
,
255
,
0
)
cv2
.
putText
(
im0
,
str
(
objectID
),
(
centroid
[
'centroid'
][
0
]
-
10
,
centroid
[
'centroid'
][
1
]
-
10
),
cv2
.
FONT_HERSHEY_SIMPLEX
,
1
,
centroid_color
,
2
,
cv2
.
LINE_AA
)
cv2
.
circle
(
im0
,
(
centroid
[
'centroid'
][
0
],
centroid
[
'centroid'
][
1
]),
8
,
centroid_color
,
-
1
)
return
objects
,
classes
,
im0
def
kalman_tracker
(
self
,
bboxs
,
img
,
):
z_box
=
bboxs
x_box
=
[]
if
len
(
self
.
tracker_list
)
>
0
:
for
trk
in
self
.
tracker_list
:
x_box
.
append
(
trk
.
box
)
matched
,
unmatched_dets
,
unmatched_trks
=
self
.
assign_detections_to_trackers
(
x_box
,
z_box
,
iou_thrd
=
0.01
)
# Deal with matched detections
if
matched
.
size
>
0
:
for
trk_idx
,
det_idx
in
matched
:
z
=
z_box
[
det_idx
]
z
=
np
.
expand_dims
(
z
,
axis
=
0
)
.
T
tmp_trk
=
self
.
tracker_list
[
trk_idx
]
tmp_trk
.
kalman_filter
(
z
)
xx
=
tmp_trk
.
x_state
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
x_box
[
trk_idx
]
=
xx
tmp_trk
.
box
=
xx
tmp_trk
.
hits
+=
1
# Deal with unmatched detections
if
len
(
unmatched_dets
)
>
0
:
for
idx
in
unmatched_dets
:
z
=
z_box
[
idx
]
z
=
np
.
expand_dims
(
z
,
axis
=
0
)
.
T
tmp_trk
=
Tracker
()
# Create a new tracker
x
=
np
.
array
([[
z
[
0
],
0
,
z
[
1
],
0
,
z
[
2
],
0
,
z
[
3
],
0
]])
.
T
tmp_trk
.
x_state
=
x
tmp_trk
.
predict_only
()
xx
=
tmp_trk
.
x_state
xx
=
xx
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
tmp_trk
.
box
=
xx
tmp_trk
.
id
=
self
.
track_id_list
.
popleft
()
# assign an ID for the tracker
self
.
tracker_list
.
append
(
tmp_trk
)
x_box
.
append
(
xx
)
# Deal with unmatched tracks
if
len
(
unmatched_trks
)
>
0
:
for
trk_idx
in
unmatched_trks
:
tmp_trk
=
self
.
tracker_list
[
trk_idx
]
tmp_trk
.
no_losses
+=
1
tmp_trk
.
predict_only
()
xx
=
tmp_trk
.
x_state
xx
=
xx
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
tmp_trk
.
box
=
xx
x_box
[
trk_idx
]
=
xx
# The list of tracks to be annotated
good_tracker_list
=
[]
objects
=
[]
boxs
=
[]
for
trk
in
self
.
tracker_list
:
if
(
trk
.
hits
>=
self
.
min_hits
)
and
(
trk
.
no_losses
<=
self
.
max_age
):
good_tracker_list
.
append
(
trk
)
x_cv2
=
trk
.
box
left
,
top
,
right
,
bottom
=
x_cv2
[
1
],
x_cv2
[
0
],
x_cv2
[
3
],
x_cv2
[
2
]
centroid
=
[
int
(
left
+
((
right
-
left
)
/
2
)),
bottom
]
objects
.
append
([
int
(
trk
.
id
),
centroid
])
boxs
.
append
(
x_cv2
)
deleted_tracks
=
filter
(
lambda
_x
:
_x
.
no_losses
>
self
.
max_age
,
self
.
tracker_list
)
for
trk
in
deleted_tracks
:
self
.
track_id_list
.
append
(
trk
.
id
)
self
.
tracker_list
=
[
x
for
x
in
self
.
tracker_list
if
x
.
no_losses
<=
self
.
max_age
]
print
(
"object is "
,
str
(
objects
))
return
img
,
objects
,
boxs
@
staticmethod
def
assign_detections_to_trackers
(
trackers
,
detections
,
iou_thrd
=
0.3
,
):
"""
From current list of trackers and new detections, output matched detections,
un matched trackers, unmatched detections.
"""
iou_mat
=
np
.
zeros
((
len
(
trackers
),
len
(
detections
)),
dtype
=
np
.
float32
)
for
t
,
trk
in
enumerate
(
trackers
):
for
d
,
det
in
enumerate
(
detections
):
iou_mat
[
t
,
d
]
=
box_iou2
(
trk
,
det
)
matched_idx
=
linear_assignment
(
-
iou_mat
)
unmatched_trackers
,
unmatched_detections
=
[],
[]
for
t
,
trk
in
enumerate
(
trackers
):
if
t
not
in
matched_idx
[:,
0
]:
unmatched_trackers
.
append
(
t
)
for
d
,
det
in
enumerate
(
detections
):
if
d
not
in
matched_idx
[:,
1
]:
unmatched_detections
.
append
(
d
)
matches
=
[]
for
m
in
matched_idx
:
if
iou_mat
[
m
[
0
],
m
[
1
]]
<
iou_thrd
:
unmatched_trackers
.
append
(
m
[
0
])
unmatched_detections
.
append
(
m
[
1
])
else
:
matches
.
append
(
m
.
reshape
(
1
,
2
))
if
len
(
matches
)
==
0
:
matches
=
np
.
empty
((
0
,
2
),
dtype
=
int
)
else
:
matches
=
np
.
concatenate
(
matches
,
axis
=
0
)
return
matches
,
np
.
array
(
unmatched_detections
),
np
.
array
(
unmatched_trackers
)
def
get_line_coordinates
(
self
):
"""
Get the line coordinates from the deployment JSON
"""
if
not
self
.
janus_metadata
.
get
(
'metadata'
):
self
.
janus_metadata
[
'metadata'
]
=
get_extra_fields
(
self
.
device_id
)
_coordinates
=
[
self
.
janus_metadata
[
'metadata'
]
.
get
(
coordinate_key
)
for
coordinate_key
in
JanusDeploymentConstants
.
LINE_COORDINATES
]
_alignment
=
self
.
janus_metadata
[
'metadata'
]
.
get
(
JanusDeploymentConstants
.
ALIGNMENT_KEY
)
return
_alignment
,
_coordinates
def
line_point_position
(
self
,
point
):
"""
Get the position of point w.r.t. the line
:param point: point to be compared
:return: boolean
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
len
(
line_coordinates
)
==
4
,
"Line coordinates variable is invalid"
assert
len
(
point
)
==
2
,
"Point variable is invalid"
_slope
=
(
line_coordinates
[
3
]
-
line_coordinates
[
1
])
/
(
line_coordinates
[
2
]
-
line_coordinates
[
0
])
_point_equation_value
=
point
[
1
]
-
line_coordinates
[
1
]
-
_slope
*
(
point
[
0
]
-
line_coordinates
[
0
])
if
_point_equation_value
>
0
:
return
True
else
:
return
False
def
validate_point_position
(
self
,
point
):
"""
Validate the position of the point w.r.t. the line
:param point: centroid
:return: bool
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
_alignment
in
[
JanusDeploymentConstants
.
VERTICAL
,
JanusDeploymentConstants
.
HORIZONTAL
],
\
"Invalid alignment variable"
if
_alignment
==
JanusDeploymentConstants
.
VERTICAL
:
line_y2
=
line_coordinates
[
3
]
line_y1
=
line_coordinates
[
1
]
if
line_y1
<
point
[
1
]
<
line_y2
or
line_y2
<
point
[
1
]
<
line_y1
:
return
True
else
:
return
False
else
:
line_x2
=
line_coordinates
[
2
]
line_x1
=
line_coordinates
[
0
]
if
line_x1
<
point
[
0
]
<
line_x2
or
line_x2
<
point
[
0
]
<
line_x1
:
return
True
else
:
return
False
def
update_bag_count
(
self
,
frame
,
detection_objects
,
classes
):
"""
Maintains the bag counts
:param frame: image
:param detection_objects: detection object having object id and centroids
"""
for
class_name
,
(
objectID
,
centroid
)
in
zip
(
classes
,
detection_objects
.
items
()):
if
self
.
validate_point_position
(
centroid
[
'centroid'
]):
logger
.
debug
(
"centroid detected"
)
if
not
isinstance
(
self
.
initial_object_position
,
bool
):
logger
.
debug
(
"Initializing the initial object position"
)
self
.
initial_object_position
=
self
.
line_point_position
(
point
=
centroid
[
'centroid'
])
Utilities
.
set_direction
(
self
.
device_id
,
self
.
initial_object_position
)
#self.initial_object_position = True
logger
.
debug
(
self
.
initial_object_position
)
_point_position
=
self
.
line_point_position
(
point
=
centroid
[
'centroid'
])
logger
.
debug
(
"object ID is : "
,
str
(
objectID
))
logger
.
debug
(
self
.
uncounted_objects
)
# Check point in the same side as the initial object
if
_point_position
==
self
.
initial_object_position
:
logger
.
debug
(
"same side only"
)
# Check the object is not already counted
if
objectID
not
in
self
.
uncounted_objects
:
self
.
uncounted_objects
[
objectID
]
=
centroid
[
'centroid'
]
frame
=
draw_circles_on_frame
(
frame
,
centroid
[
'centroid'
],
radius
=
10
,
color
=
(
0
,
0
,
255
),
thickness
=-
1
)
elif
objectID
in
self
.
uncounted_objects
:
logger
.
debug
(
"different side"
)
self
.
uncounted_objects
.
pop
(
objectID
,
None
)
if
(
class_name
==
"acc_gold"
):
self
.
count_gold
+=
1
logger
.
debug
(
self
.
count_gold
)
elif
(
class_name
==
"acc_suraksha_plus"
):
self
.
count_suraksha
+=
1
logger
.
debug
(
self
.
count_suraksha
)
elif
(
class_name
==
"ambuja_whitecem"
):
self
.
count_whitecem
+=
1
logger
.
debug
(
self
.
count_whitecem
)
frame
=
draw_circles_on_frame
(
frame
,
centroid
[
'centroid'
],
radius
=
10
,
color
=
(
0
,
255
,
0
),
thickness
=-
1
)
if
centroid
[
'has_print'
]:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'Print Detected!'
)
logger
.
info
(
f
"Count: {self.count}, Print Found: True"
)
else
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'Print Missing!'
)
logger
.
info
(
f
"Count: {self.count}, Print Found: False"
)
else
:
frame
=
draw_circles_on_frame
(
frame
,
centroid
[
'centroid'
],
radius
=
10
,
color
=
(
0
,
255
,
0
),
thickness
=-
1
)
count_text_gold
=
"ACC_GOLD: "
+
str
(
self
.
count_gold
)
count_text_suraksha
=
"ACC_SURAKSHA_P_PLUS: "
+
str
(
self
.
count_suraksha
)
count_text_whitecem
=
"PPC_WHITE: "
+
str
(
self
.
count_whitecem
)
cv2
.
putText
(
frame
,
count_text_gold
,
(
1300
,
200
),
cv2
.
FONT_HERSHEY_SIMPLEX
,
1
,
(
255
,
255
,
0
),
3
,
cv2
.
LINE_AA
)
cv2
.
putText
(
frame
,
count_text_suraksha
,
(
1300
,
400
),
cv2
.
FONT_HERSHEY_SIMPLEX
,
1
,
(
255
,
255
,
0
),
3
,
cv2
.
LINE_AA
)
cv2
.
putText
(
frame
,
count_text_whitecem
,
(
1300
,
600
),
cv2
.
FONT_HERSHEY_SIMPLEX
,
1
,
(
255
,
255
,
0
),
3
,
cv2
.
LINE_AA
)
return
frame
def
draw_line_over_image
(
self
,
frame
,
color
=
(
255
,
255
,
255
)):
"""
Draws line over the counting line
:param frame: frame for
:param color:
:return:
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
len
(
line_coordinates
)
==
4
,
"Line coordinates variable is invalid"
# return cv2.line(frame, (line_coordinates[0], line_coordinates[1]), (line_coordinates[2], line_coordinates[3]),
# color, 3)
self
.
drawline
(
frame
,
(
line_coordinates
[
0
],
line_coordinates
[
1
]),
(
line_coordinates
[
2
],
line_coordinates
[
3
]),
color
,
thickness
=
3
)
return
frame
@
staticmethod
def
drawline
(
img
,
pt1
,
pt2
,
color
,
thickness
=
1
,
style
=
'dotted'
,
gap
=
20
):
dist
=
((
pt1
[
0
]
-
pt2
[
0
])
**
2
+
(
pt1
[
1
]
-
pt2
[
1
])
**
2
)
**
.5
pts
=
[]
for
i
in
np
.
arange
(
0
,
dist
,
gap
):
r
=
i
/
dist
x
=
int
((
pt1
[
0
]
*
(
1
-
r
)
+
pt2
[
0
]
*
r
)
+
.5
)
y
=
int
((
pt1
[
1
]
*
(
1
-
r
)
+
pt2
[
1
]
*
r
)
+
.5
)
p
=
(
x
,
y
)
pts
.
append
(
p
)
if
style
==
'dotted'
:
for
p
in
pts
:
cv2
.
circle
(
img
,
p
,
thickness
,
color
,
-
1
)
else
:
s
=
pts
[
0
]
e
=
pts
[
0
]
i
=
0
for
p
in
pts
:
s
=
e
e
=
p
if
i
%
2
==
1
:
cv2
.
line
(
img
,
s
,
e
,
color
,
thickness
)
i
+=
1
def
distances
(
self
,
objs1
,
objs2
):
for
key1
,
val1
in
objs1
.
items
():
for
key2
,
val2
in
objs2
.
items
():
dst
=
distance
.
euclidean
(
val1
[
'centroid'
],
val2
[
'centroid'
])
if
objs1
[
key1
][
'has_print'
]:
self
.
mrp_counter
+=
1
if
(
self
.
mrp_counter
>=
5
):
#STOP THE RELAY
pass
continue
elif
dst
<
self
.
print_eu_dist
:
objs1
[
key1
][
'has_print'
]
=
True
self
.
mrp_counter
=
0
# def inference(
# self,
# frame,
# classes,
#
# ):
# dets = self.yp.predict(frame)
# class_name = list()
# bboxs = []
#
# if dets:
# for i in dets:
# if(i["class"] in classes):
# class_name.append(i["class"])
# bboxs.append([i["points"][1], i["points"][0], i["points"][3], i["points"][2]])
#
# print("#######")
# print(bboxs)
# #frame = cv2.rectangle(frame, (bboxs[0][0], bboxs[0][1]), (bboxs[0][2], bboxs[0][3]),(255, 255, 0) , 2)
# return bboxs, frame, dets, class_name
def
inference
(
self
,
frame
,
):
dets
=
self
.
yp
.
predict
(
frame
)
bboxs
=
[]
if
dets
:
for
i
in
dets
:
bboxs
.
append
([
i
[
"points"
][
1
],
i
[
"points"
][
0
],
i
[
"points"
][
3
],
i
[
"points"
][
2
]])
return
bboxs
,
frame
,
dets
def
_predict
(
self
,
obj
):
class_list
=
[
"acc_gold"
,
"acc_suraksha_plus"
,
"ambuja_buildcem"
]
try
:
frame
=
obj
[
'frame'
]
if
self
.
frame_skip
:
if
not
self
.
frame_skipping
[
"skip_current_frame"
]:
dets
=
self
.
yp
.
predict
(
frame
)
self
.
frame_skipping
[
"detection_value"
]
=
dets
self
.
frame_skipping
[
"skip_current_frame"
]
=
True
else
:
dets
=
self
.
frame_skipping
[
"detection_value"
]
self
.
frame_skipping
[
"skip_current_frame"
]
=
False
else
:
dets
,
frame
,
_dets
=
self
.
inference
(
frame
)
print
(
"PRINTING INFERENCE FUNCTION OUTPUT"
)
print
(
dets
)
print
(
_dets
)
#print(class_name)
#if dets:
frame
,
objects
,
boxs
=
self
.
kalman_tracker
(
dets
,
frame
)
print
(
"PRINTING KALMAN OUTPUTS"
)
print
(
objects
)
print
(
boxs
)
dets
=
self
.
yp
.
predict
(
frame
)
frame
=
self
.
draw_line_over_image
(
frame
)
# if [True for e in dets if e['class'] == 'cement_bag']:
#class_list = ["acc_gold", "acc_suraksha_plus", "ambuja_whitecem"]
mrp
=
[
"mrp"
]
objects
,
classes_cement
,
frame
=
self
.
track_bags
(
self
.
ct1
,
dets
,
frame
,
class_list
)
_
,
classes
,
frame
=
self
.
track_bags
(
self
.
ct2
,
dets
,
frame
,
mrp
)
frame
=
self
.
update_bag_count
(
frame
=
frame
,
detection_objects
=
objects
,
classes
=
classes_cement
)
cv2
.
imshow
(
"output is "
,
cv2
.
resize
(
frame
,
(
900
,
600
)))
cv2
.
waitKey
(
1
)
self
.
distances
(
objects
,
_
)
logger
.
debug
(
"self.uncounted_objects --> {}"
.
format
(
self
.
uncounted_objects
))
# for each in dets:
# color = (255, 255, 0)
# class_n = "Cement Bag"
#
# if each['class'] == 'label':
# color = (0, 255, 0)
# class_n = "Printing Detected!"
# cv2.rectangle(frame, (each['points'][0], each['points'][1]), (each['points'][2], each['points'][3]),
# color, 2)
# cv2.putText(frame, class_n, (each['points'][2], each['points'][1]), cv2.FONT_HERSHEY_SIMPLEX,
# 1, color, 2, cv2.LINE_AA)
obj
[
'frame'
]
=
cv2
.
resize
(
frame
,
(
self
.
config
.
get
(
'FRAME_WIDTH'
),
self
.
config
.
get
(
'FRAME_HEIGHT'
)))
except
Exception
as
e
:
logger
.
exception
(
f
"Error: {e}"
,
exc_info
=
True
)
obj
[
'frame'
]
=
cv2
.
resize
(
obj
[
'frame'
],
(
self
.
config
.
get
(
'FRAME_WIDTH'
),
self
.
config
.
get
(
'FRAME_HEIGHT'
)))
return
obj
scripts/cement_counter_for_acc_final - Copy.py
deleted
100644 → 0
View file @
9faa06a5
import
cv2
import
base64
import
numpy
as
np
from
scipy.spatial
import
distance
from
expiringdict
import
ExpiringDict
from
edge_engine.common.logsetup
import
logger
from
scripts.utils.infocenter
import
MongoLogger
from
yolov5processor.infer
import
ExecuteInference
from
scripts.utils.edge_utils
import
get_extra_fields
from
edge_engine.ai.model.modelwraper
import
ModelWrapper
from
scripts.utils.centroidtracker
import
CentroidTracker
from
scripts.common.constants
import
JanusDeploymentConstants
from
scripts.utils.image_utils
import
draw_circles_on_frame
,
resize_to_64_64
import
cv2
import
base64
import
datetime
import
numpy
as
np
from
collections
import
deque
from
expiringdict
import
ExpiringDict
from
sklearn.utils.linear_assignment_
import
linear_assignment
from
edge_engine.common.logsetup
import
logger
from
edge_engine.ai.model.modelwraper
import
ModelWrapper
from
scripts.utils.tracker
import
Tracker
from
scripts.utils.helpers
import
box_iou2
from
scripts.utils.edge_utils
import
Utilities
from
scripts.utils.infocenter
import
MongoLogger
from
scripts.utils.model_tracker
import
ModelCountTracker
from
scripts.common.constants
import
JanusDeploymentConstants
from
yolov5processor.infer
import
ExecuteInference
class
CementBagCounter
(
ModelWrapper
):
def
__init__
(
self
,
config
,
model_config
,
pubs
,
device_id
):
super
()
.
__init__
()
"""
init function
"""
self
.
config
=
config
[
"config"
]
self
.
device_id
=
device_id
self
.
rtp
=
pubs
.
rtp_write
self
.
mongo_logger
=
MongoLogger
()
self
.
frame_skip
=
self
.
config
.
get
(
'frame_skip'
,
False
)
model
=
"data/ACC_v3.pt"
self
.
yp
=
ExecuteInference
(
weight
=
model
,
gpu
=
model_config
.
get
(
"gpu"
,
False
),
agnostic_nms
=
model_config
.
get
(
"agnostic_nms"
,
True
),
iou
=
model_config
.
get
(
"iou"
,
0.2
),
confidence
=
model_config
.
get
(
"confidence"
,
0.4
))
self
.
print_eu_dist
=
model_config
.
get
(
'print_eu_dist'
,
200
)
self
.
ct1
=
CentroidTracker
(
maxDisappeared
=
5
)
self
.
ct2
=
CentroidTracker
(
maxDisappeared
=
5
)
self
.
frame_skipping
=
{
"skip_current_frame"
:
True
,
"detection_value"
:
None
}
self
.
count
=
0
self
.
cement_bag
=
0
self
.
count_suraksha
=
0
self
.
count_whitecem
=
0
self
.
count_gold
=
0
self
.
tracker_list
=
[]
self
.
max_age
=
3
self
.
min_hits
=
0
self
.
track_id_list
=
deque
([
str
(
i
)
for
i
in
range
(
1
,
50
)])
self
.
prev_annotation
=
[]
self
.
initial_object_position
=
None
self
.
uncounted_objects
=
ExpiringDict
(
max_len
=
model_config
.
get
(
"uncounted_obj_length"
,
50
),
max_age_seconds
=
model_config
.
get
(
"uncounted_obj_age"
,
60
))
self
.
janus_metadata
=
ExpiringDict
(
max_age_seconds
=
120
,
max_len
=
1
)
def
_pre_process
(
self
,
x
):
"""
Do preprocessing here, if any
:param x: payload
:return: payload
"""
return
x
def
_post_process
(
self
,
x
):
"""
Apply post processing here, if any
:param x: payload
:return: payload
"""
self
.
rtp
.
publish
(
x
)
# video stream
return
x
def
send_payload
(
self
,
frame
,
label
=
'CementBagDetected'
,
bg_color
=
"#474520"
,
font_color
=
"#FFFF00"
,
alert_sound
=
None
,
message
=
"Cement Bag Detected!"
):
"""
Insert event to Mongo
:param message:
:param frame:
:param label:
:param bg_color:
:param font_color:
:param alert_sound:
:return: None
"""
payload
=
{
"deviceId"
:
self
.
device_id
,
"message"
:
message
,
"frame"
:
'data:image/jpeg;base64,'
+
base64
.
b64encode
(
cv2
.
imencode
(
'.jpg'
,
frame
)[
1
]
.
tostring
())
.
decode
(
"utf-8"
),
"activity"
:
label
,
"bg_color"
:
bg_color
,
"font_color"
:
font_color
,
"alert_sound"
:
alert_sound
}
self
.
mongo_logger
.
insert_attendance_event_to_mongo
(
payload
)
def
kalman_tracker
(
self
,
bboxs
,
img
,
):
z_box
=
bboxs
x_box
=
[]
if
len
(
self
.
tracker_list
)
>
0
:
for
trk
in
self
.
tracker_list
:
x_box
.
append
(
trk
.
box
)
matched
,
unmatched_dets
,
unmatched_trks
=
self
.
assign_detections_to_trackers
(
x_box
,
z_box
,
iou_thrd
=
0.03
)
# Deal with matched detections
if
matched
.
size
>
0
:
for
trk_idx
,
det_idx
in
matched
:
z
=
z_box
[
det_idx
]
z
=
np
.
expand_dims
(
z
,
axis
=
0
)
.
T
tmp_trk
=
self
.
tracker_list
[
trk_idx
]
tmp_trk
.
kalman_filter
(
z
)
xx
=
tmp_trk
.
x_state
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
x_box
[
trk_idx
]
=
xx
tmp_trk
.
box
=
xx
tmp_trk
.
hits
+=
1
# Deal with unmatched detections
if
len
(
unmatched_dets
)
>
0
:
for
idx
in
unmatched_dets
:
z
=
z_box
[
idx
]
z
=
np
.
expand_dims
(
z
,
axis
=
0
)
.
T
tmp_trk
=
Tracker
()
# Create a new tracker
x
=
np
.
array
([[
z
[
0
],
0
,
z
[
1
],
0
,
z
[
2
],
0
,
z
[
3
],
0
]])
.
T
tmp_trk
.
x_state
=
x
tmp_trk
.
predict_only
()
xx
=
tmp_trk
.
x_state
xx
=
xx
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
tmp_trk
.
box
=
xx
tmp_trk
.
id
=
self
.
track_id_list
.
popleft
()
# assign an ID for the tracker
self
.
tracker_list
.
append
(
tmp_trk
)
x_box
.
append
(
xx
)
# Deal with unmatched tracks
if
len
(
unmatched_trks
)
>
0
:
for
trk_idx
in
unmatched_trks
:
tmp_trk
=
self
.
tracker_list
[
trk_idx
]
tmp_trk
.
no_losses
+=
1
tmp_trk
.
predict_only
()
xx
=
tmp_trk
.
x_state
xx
=
xx
.
T
[
0
]
.
tolist
()
xx
=
[
xx
[
0
],
xx
[
2
],
xx
[
4
],
xx
[
6
]]
tmp_trk
.
box
=
xx
x_box
[
trk_idx
]
=
xx
# The list of tracks to be annotated
good_tracker_list
=
[]
objects
=
[]
boxs
=
[]
for
trk
in
self
.
tracker_list
:
if
(
trk
.
hits
>=
self
.
min_hits
)
and
(
trk
.
no_losses
<=
self
.
max_age
):
good_tracker_list
.
append
(
trk
)
x_cv2
=
trk
.
box
left
,
top
,
right
,
bottom
=
x_cv2
[
1
],
x_cv2
[
0
],
x_cv2
[
3
],
x_cv2
[
2
]
centroid
=
[
int
(
left
+
((
right
-
left
)
/
2
)),
bottom
]
objects
.
append
([
int
(
trk
.
id
),
centroid
])
boxs
.
append
(
x_cv2
)
deleted_tracks
=
filter
(
lambda
_x
:
_x
.
no_losses
>
self
.
max_age
,
self
.
tracker_list
)
for
trk
in
deleted_tracks
:
self
.
track_id_list
.
append
(
trk
.
id
)
self
.
tracker_list
=
[
x
for
x
in
self
.
tracker_list
if
x
.
no_losses
<=
self
.
max_age
]
# print("object is ", str(objects))
return
img
,
objects
,
boxs
@
staticmethod
def
assign_detections_to_trackers
(
trackers
,
detections
,
iou_thrd
=
0.3
,
):
"""
From current list of trackers and new detections, output matched detections,
un matched trackers, unmatched detections.
"""
iou_mat
=
np
.
zeros
((
len
(
trackers
),
len
(
detections
)),
dtype
=
np
.
float32
)
for
t
,
trk
in
enumerate
(
trackers
):
for
d
,
det
in
enumerate
(
detections
):
iou_mat
[
t
,
d
]
=
box_iou2
(
trk
,
det
)
matched_idx
=
linear_assignment
(
-
iou_mat
)
unmatched_trackers
,
unmatched_detections
=
[],
[]
for
t
,
trk
in
enumerate
(
trackers
):
if
t
not
in
matched_idx
[:,
0
]:
unmatched_trackers
.
append
(
t
)
for
d
,
det
in
enumerate
(
detections
):
if
d
not
in
matched_idx
[:,
1
]:
unmatched_detections
.
append
(
d
)
matches
=
[]
for
m
in
matched_idx
:
if
iou_mat
[
m
[
0
],
m
[
1
]]
<
iou_thrd
:
unmatched_trackers
.
append
(
m
[
0
])
unmatched_detections
.
append
(
m
[
1
])
else
:
matches
.
append
(
m
.
reshape
(
1
,
2
))
if
len
(
matches
)
==
0
:
matches
=
np
.
empty
((
0
,
2
),
dtype
=
int
)
else
:
matches
=
np
.
concatenate
(
matches
,
axis
=
0
)
return
matches
,
np
.
array
(
unmatched_detections
),
np
.
array
(
unmatched_trackers
)
def
get_line_coordinates
(
self
):
"""
Get the line coordinates from the deployment JSON
"""
if
not
self
.
janus_metadata
.
get
(
'metadata'
):
self
.
janus_metadata
[
'metadata'
]
=
get_extra_fields
(
self
.
device_id
)
_coordinates
=
[
self
.
janus_metadata
[
'metadata'
]
.
get
(
coordinate_key
)
for
coordinate_key
in
JanusDeploymentConstants
.
LINE_COORDINATES
]
_alignment
=
self
.
janus_metadata
[
'metadata'
]
.
get
(
JanusDeploymentConstants
.
ALIGNMENT_KEY
)
\
# _coordinates = [550, 200, 555, 1100]
#
# _alignment = "vertical"
return
_alignment
,
_coordinates
def
line_point_position
(
self
,
point
):
"""
Get the position of point w.r.t. the line
:param point: point to be compared
:return: boolean
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
len
(
line_coordinates
)
==
4
,
"Line coordinates variable is invalid"
assert
len
(
point
)
==
2
,
"Point variable is invalid"
_slope
=
(
line_coordinates
[
3
]
-
line_coordinates
[
1
])
/
(
line_coordinates
[
2
]
-
line_coordinates
[
0
])
_point_equation_value
=
point
[
1
]
-
line_coordinates
[
1
]
-
_slope
*
(
point
[
0
]
-
line_coordinates
[
0
])
if
_point_equation_value
>
0
:
return
True
else
:
return
False
def
validate_point_position
(
self
,
point
):
"""
Validate the position of the point w.r.t. the line
:param point: centroid
:return: bool
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
_alignment
in
[
JanusDeploymentConstants
.
VERTICAL
,
JanusDeploymentConstants
.
HORIZONTAL
],
\
"Invalid alignment variable"
if
_alignment
==
JanusDeploymentConstants
.
VERTICAL
:
# _alignment, line_coordinates = self.get_line_coordinates()
# assert _alignment in ["horizontal", "vertical"], \
# "Invalid alignment variable"
# print(point)
# if _alignment == "vertical":
line_y2
=
line_coordinates
[
3
]
line_y1
=
line_coordinates
[
1
]
if
line_y1
<
point
[
1
]
<
line_y2
or
line_y2
<
point
[
1
]
<
line_y1
:
return
True
else
:
return
False
else
:
line_x2
=
line_coordinates
[
2
]
line_x1
=
line_coordinates
[
0
]
if
line_x1
<
point
[
0
]
<
line_x2
or
line_x2
<
point
[
0
]
<
line_x1
:
return
True
else
:
return
False
def
update_bag_count
(
self
,
frame
,
detection_objects
,
class_name
,
detections
):
"""
Maintains the bag counts
:param frame: image
:param detection_objects: detection object having object id and centroids
"""
#for class_name, (objectID, centroid) in zip(classes, detection_objects):
for
(
object_id
,
det
)
in
zip
(
detection_objects
,
detections
):
centroid
=
object_id
[
1
]
object_id
=
object_id
[
0
]
logger
.
debug
(
detections
)
#print(object_id)
frame
=
draw_circles_on_frame
(
frame
,
centroid
,
radius
=
10
,
color
=
(
0
,
0
,
255
),
thickness
=-
1
)
if
self
.
validate_point_position
(
centroid
):
logger
.
debug
(
"centroid detected"
)
#if self.validate_point_position(centroid):
# # if not isinstance(self.count, int):
# # logger.debug("Initializing the count variable")
# print("again entering")
# # Initializing the bag count
# if (class_name == "acc_gold"):
# if not isinstance(self.count_gold, int):
# logger.debug("Initializing the count variable")
# self.count_gold = 0
# elif (class_name == "acc_suraksha"):
# if not isinstance(self.count_suraksha, int):
# logger.debug("Initializing the count variable")
# self.count_suraksha = 0
# elif (class_name == "acc_buildcem"):
# if not isinstance(self.count_whitecem, int):
# logger.debug("Initializing the count variable")
# self.count_whitecem = 0
if
not
isinstance
(
self
.
initial_object_position
,
bool
):
logger
.
debug
(
"Initializing the initial object position"
)
#self.initial_object_position = self.line_point_position(point=centroid)
self
.
initial_object_position
=
True
logger
.
debug
(
self
.
initial_object_position
)
_point_position
=
self
.
line_point_position
(
point
=
centroid
)
#print("object ID is : ", str(objectID))
logger
.
debug
(
self
.
uncounted_objects
)
# Check point in the same side as the initial object
if
_point_position
==
self
.
initial_object_position
:
logger
.
debug
(
"same side only"
)
#print(class_name)
# Check the object is not already counted
if
object_id
not
in
self
.
uncounted_objects
:
self
.
uncounted_objects
[
object_id
]
=
centroid
elif
object_id
in
self
.
uncounted_objects
:
self
.
uncounted_objects
.
pop
(
object_id
,
None
)
if
(
"acc_gold"
in
class_name
):
self
.
count_gold
+=
1
mrp_result
=
self
.
distances
(
detections
)
if
mrp_result
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'ACC GOLD Bag Detected: Print Detected!'
)
logger
.
info
(
f
"Count: {self.count_gold}, Print Found: True"
)
else
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'ACC GOLD Bag Detected: Print Missing!'
)
logger
.
info
(
f
"Count: {self.count_gold}, Print Found: False"
)
elif
(
"acc_suraksha_plus"
in
class_name
):
self
.
count_suraksha
+=
1
logger
.
debug
(
self
.
count_suraksha
)
mrp_result
=
self
.
distances
(
detections
)
if
mrp_result
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'ACC SURAKSHA PLUS Bag Detected: Print Detected!'
)
logger
.
info
(
f
"Count: {self.count_suraksha}, Print Found: True"
)
else
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'ACC SURAKSHA PLUS Bag Detected: Print Missing!'
)
logger
.
info
(
f
"Count: {self.count_suraksha}, Print Found: False"
)
elif
(
"ambuja_whitecem"
in
class_name
):
self
.
count_whitecem
+=
1
mrp_result
=
self
.
distances
(
detections
)
if
mrp_result
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'PPC White Bag Detected: Print Detected!'
)
logger
.
info
(
f
"Count: {self.count_whitecem}, Print Found: True"
)
else
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'PPC White Bag Detected: Print Missing!'
)
logger
.
info
(
f
"Count: {self.count_whitecem}, Print Found: False"
)
frame
=
draw_circles_on_frame
(
frame
,
centroid
[
'centroid'
],
radius
=
10
,
color
=
(
0
,
255
,
0
),
thickness
=-
1
)
if
centroid
[
'has_print'
]:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'Print Detected!'
)
logger
.
info
(
f
"Count: {self.count}, Print Found: True"
)
else
:
self
.
send_payload
(
resize_to_64_64
(
frame
=
frame
),
message
=
'Print Missing!'
)
logger
.
info
(
f
"Count: {self.count}, Print Found: False"
)
else
:
frame
=
draw_circles_on_frame
(
frame
,
centroid
[
'centroid'
],
radius
=
10
,
color
=
(
0
,
255
,
0
),
thickness
=-
1
)
# count_text_gold = "ACC_GOLD: " + str(self.count_gold)
# count_text_suraksha = "ACC_SURAKSHA_PLUS: " + str(self.count_suraksha)
# count_text_whitecem = "ACC_WHITE_CEM: " + str(self.count_whitecem)
# cv2.putText(frame, count_text_gold, (1300, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_suraksha, (1300, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
# cv2.putText(frame, count_text_whitecem, (1300, 600), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 3,
# cv2.LINE_AA)
return
frame
def
draw_line_over_image
(
self
,
frame
,
color
=
(
255
,
255
,
255
)):
"""
Draws line over the counting line
:param frame: frame for
:param color:
:return:
"""
_alignment
,
line_coordinates
=
self
.
get_line_coordinates
()
assert
len
(
line_coordinates
)
==
4
,
"Line coordinates variable is invalid"
# return cv2.line(frame, (line_coordinates[0], line_coordinates[1]), (line_coordinates[2], line_coordinates[3]),
# color, 3)
self
.
drawline
(
frame
,
(
line_coordinates
[
0
],
line_coordinates
[
1
]),
(
line_coordinates
[
2
],
line_coordinates
[
3
]),
color
,
thickness
=
3
)
return
frame
@
staticmethod
def
drawline
(
img
,
pt1
,
pt2
,
color
,
thickness
=
1
,
style
=
'dotted'
,
gap
=
20
):
dist
=
((
pt1
[
0
]
-
pt2
[
0
])
**
2
+
(
pt1
[
1
]
-
pt2
[
1
])
**
2
)
**
.5
pts
=
[]
for
i
in
np
.
arange
(
0
,
dist
,
gap
):
r
=
i
/
dist
x
=
int
((
pt1
[
0
]
*
(
1
-
r
)
+
pt2
[
0
]
*
r
)
+
.5
)
y
=
int
((
pt1
[
1
]
*
(
1
-
r
)
+
pt2
[
1
]
*
r
)
+
.5
)
p
=
(
x
,
y
)
pts
.
append
(
p
)
if
style
==
'dotted'
:
for
p
in
pts
:
cv2
.
circle
(
img
,
p
,
thickness
,
color
,
-
1
)
else
:
s
=
pts
[
0
]
e
=
pts
[
0
]
i
=
0
for
p
in
pts
:
s
=
e
e
=
p
if
i
%
2
==
1
:
cv2
.
line
(
img
,
s
,
e
,
color
,
thickness
)
i
+=
1
def
distances
(
self
,
detections
):
mrp_cord
=
list
()
cem_bag_cord
=
list
()
for
det
in
detections
:
if
(
det
[
"class"
]
==
"mrp"
):
mrp_cord
.
append
(
det
[
"points"
])
else
:
cem_bag_cord
.
append
(
det
[
"points"
])
for
c_cord
in
cem_bag_cord
:
for
m_cord
in
mrp_cord
:
if
(
m_cord
[
0
]
>
c_cord
[
0
]
and
m_cord
[
0
]
<
c_cord
[
2
]
and
m_cord
[
1
]
>
c_cord
[
1
]
and
m_cord
[
1
]
<
c_cord
[
3
]):
logger
.
debug
(
"print is detected"
)
#cv2.waitKey(0)
return
True
else
:
return
False
def
inference
(
self
,
frame
,
classes
,
):
dets
=
self
.
yp
.
predict
(
frame
)
class_name
=
list
()
bboxs
=
[]
if
dets
:
for
i
in
dets
:
if
(
i
[
"class"
]
in
classes
):
class_name
.
append
(
i
[
"class"
])
#cv2.rectangle(frame, (i["points"][0], i["points"][1]), (i["points"][2], i["points"][3]), (255, 255, 0), 2)
bboxs
.
append
([
i
[
"points"
][
1
],
i
[
"points"
][
0
],
i
[
"points"
][
3
],
i
[
"points"
][
2
]])
# frame = cv2.rectangle(frame, (bboxs[0][0], bboxs[0][1]), (bboxs[0][2], bboxs[0][3]),(255, 255, 0) , 2)
return
bboxs
,
frame
,
dets
,
class_name
def
_predict
(
self
,
obj
):
class_list
=
[
"acc_gold"
,
"acc_suraksha_plus"
,
"ambuja_whitecem"
]
mrp
=
[
"mrp"
]
try
:
frame
=
obj
[
'frame'
]
if
self
.
frame_skip
:
if
not
self
.
frame_skipping
[
"skip_current_frame"
]:
dets
=
self
.
yp
.
predict
(
frame
)
self
.
frame_skipping
[
"detection_value"
]
=
dets
self
.
frame_skipping
[
"skip_current_frame"
]
=
True
else
:
dets
=
self
.
frame_skipping
[
"detection_value"
]
self
.
frame_skipping
[
"skip_current_frame"
]
=
False
else
:
dets
,
frame
,
_dets
,
class_name
=
self
.
inference
(
frame
,
class_list
)
#dets_mrp, frame_mrp, _dets_mrp, class_name_mrp = self.inference(frame, mrp)
frame
=
self
.
draw_line_over_image
(
frame
)
# if [True for e in dets if e['class'] == 'cement_bag']:
#if dets:
frame
,
objects
,
boxs
=
self
.
kalman_tracker
(
dets
,
frame
)
print
(
"PRINTING KALMAN OUTPUT"
)
print
(
objects
)
print
(
boxs
)
for
box
in
boxs
:
cv2
.
rectangle
(
frame
,
(
box
[
1
],
box
[
0
]),
(
box
[
3
],
box
[
2
]),
(
255
,
0
,
0
),
2
)
#objects,classes_cement, frame = self.track_bags(self.ct1, dets, frame, class_list)
#_,classes, frame = self.track_bags(self.ct2, _dets, frame, mrp)
#frame, _, box_mrp = self.kalman_tracker(dets_mrp, frame)
frame
=
self
.
update_bag_count
(
frame
=
frame
,
detection_objects
=
objects
,
class_name
=
class_name
,
detections
=
_dets
)
# cv2.imshow("output is ", cv2.resize(frame, (900, 600)))
# cv2.waitKey(1)
# print("******")
# print(objects)
# print(_)
# self.distances(objects, _)
logger
.
debug
(
"self.uncounted_objects --> {}"
.
format
(
self
.
uncounted_objects
))
# for each in dets:
# color = (255, 255, 0)
# class_n = "Cement Bag"
#
# if each['class'] == 'label':
# color = (0, 255, 0)
# class_n = "Printing Detected!"
# cv2.rectangle(frame, (each['points'][0], each['points'][1]), (each['points'][2], each['points'][3]),
# color, 2)
# cv2.putText(frame, class_n, (each['points'][2], each['points'][1]), cv2.FONT_HERSHEY_SIMPLEX,
# 1, color, 2, cv2.LINE_AA)
obj
[
'frame'
]
=
cv2
.
resize
(
frame
,
(
self
.
config
.
get
(
'FRAME_WIDTH'
),
self
.
config
.
get
(
'FRAME_HEIGHT'
)))
except
Exception
as
e
:
logger
.
exception
(
f
"Error: {e}"
,
exc_info
=
True
)
obj
[
'frame'
]
=
cv2
.
resize
(
obj
[
'frame'
],
(
self
.
config
.
get
(
'FRAME_WIDTH'
),
self
.
config
.
get
(
'FRAME_HEIGHT'
)))
return
obj
scripts/common/config.py
View file @
513178d5
...
...
@@ -2,7 +2,6 @@ import os
import
sys
import
json
from
pymongo
import
MongoClient
MAIN_OS_VARIABLE
=
json
.
loads
(
os
.
environ
.
get
(
'config'
))
if
MAIN_OS_VARIABLE
is
None
:
...
...
@@ -14,8 +13,3 @@ MONGO_URI = MAIN_OS_VARIABLE.get('MONGO_URI')
MONGO_SERVICE_DB
=
MAIN_OS_VARIABLE
.
get
(
'MONGO_DB'
)
MONGO_SERVICE_COLL
=
MAIN_OS_VARIABLE
.
get
(
'MONGO_COLL'
)
PASS_KEY
=
MAIN_OS_VARIABLE
.
get
(
'PASS_KEY'
)
MONGO_DB_OBJ
=
MongoClient
(
MONGO_URI
)[
MONGO_SERVICE_DB
]
HOST_CONFIG
=
MONGO_DB_OBJ
[
MONGO_SERVICE_COLL
]
.
find_one
({
'configId'
:
'hostConfig'
})
.
get
(
'config'
)
APP_MONGO_COLLECTION
=
MONGO_DB_OBJ
[
MONGO_SERVICE_COLL
]
.
find_one
({
'configId'
:
'appMongoConfig'
})
.
get
(
'config'
)
scripts/utils/edge_utils.py
View file @
513178d5
from
scripts.common.constants
import
JanusDeploymentConstants
from
scripts.common.config
import
MONGO_DB_OBJ
,
APP_MONGO_COLLECTION
import
cv2
from
edge_engine.common.logsetup
import
logger
#from scripts.common.config import MONGO_DB_OBJ, APP_MONGO_COLLECTION
#from scripts.common.constants import JanusDeploymentConstants
class
Utilities
:
@
classmethod
def
get_extra_fields
(
cls
,
device_id
,
):
_janus_deployment
=
MONGO_DB_OBJ
[
APP_MONGO_COLLECTION
.
get
(
JanusDeploymentConstants
.
JANUS_DEPLOYMENT_COLLECTION
)]
.
find_one
(
{
JanusDeploymentConstants
.
DEPLOYMENT_ID
:
device_id
})
.
get
(
JanusDeploymentConstants
.
EXTRA_FIELDS_KEY
)
if
_janus_deployment
is
None
:
raise
ValueError
(
"Janus deployment configuration is not found/corrupted"
)
_key_dictionary
=
dict
()
for
each_field
in
_janus_deployment
:
_key_dictionary
[
each_field
[
'key'
]]
=
each_field
[
'value'
]
return
_key_dictionary
@
classmethod
def
get_direction
(
cls
,
device_id
,
):
logger
.
debug
(
"Getting the direction from DB"
)
return
MONGO_DB_OBJ
[
APP_MONGO_COLLECTION
.
get
(
JanusDeploymentConstants
.
JANUS_DEPLOYMENT_COLLECTION
)]
.
find_one
(
{
JanusDeploymentConstants
.
DEPLOYMENT_ID
:
device_id
})
.
get
(
JanusDeploymentConstants
.
DIRECTION_KEY
)
@
classmethod
def
set_direction
(
cls
,
device_id
:
str
,
direction
:
bool
,
):
logger
.
debug
(
"Updating the direction in DB"
)
updated_values
=
{
"$set"
:
{
JanusDeploymentConstants
.
DIRECTION_KEY
:
direction
}}
MONGO_DB_OBJ
[
APP_MONGO_COLLECTION
.
get
(
JanusDeploymentConstants
.
JANUS_DEPLOYMENT_COLLECTION
)]
.
update_one
(
{
JanusDeploymentConstants
.
DEPLOYMENT_ID
:
device_id
},
updated_values
)
@
classmethod
def
draw_circles_on_frame
(
cls
,
frame
,
point
,
radius
=
3
,
color
=
(
255
,
255
,
255
),
thickness
=
1
,
):
"""
draw circle on the objects
:param radius: radius of the circle
:param frame: frame to draw on
:param point: co-ordinate to draw on
:param color: color of the circle
:param thickness: thickness of the circle
:return: frame
"""
return
cv2
.
circle
(
frame
,
tuple
(
point
),
radius
,
color
,
thickness
)
@
classmethod
def
resize_to_64_64
(
cls
,
frame
,
):
"""
resize the from
:param frame: frame
:return: frame
"""
return
cv2
.
resize
(
frame
,
(
64
,
64
))
def
get_extra_fields
(
device_id
):
# _janus_deployment = [
# {
# "type": "number",
# "key": "x1",
# "value": 1000
# },
# {
# "type": "number",
# "key": "y1",
# "value": 0
# },
# {
# "type": "number",
# "key": "x2",
# "value": 1001
# },
# {
# "type": "number",
# "key": "y2",
# "value": 720
# },
# {
# "type": "dropdown",
# "key": "alignment",
# "value": "vertical"
# }
# ]
_janus_deployment
=
MONGO_DB_OBJ
[
APP_MONGO_COLLECTION
.
get
(
JanusDeploymentConstants
.
JANUS_DEPLOYMENT_COLLECTION
)]
.
\
find_one
({
JanusDeploymentConstants
.
DEPLOYMENT_ID
:
device_id
})
.
get
(
JanusDeploymentConstants
.
EXTRA_FIELDS_KEY
)
if
_janus_deployment
is
None
:
raise
ValueError
(
"Janus deployment configuration is not found/corrupted"
)
_key_dictionary
=
dict
()
for
each_field
in
_janus_deployment
:
_key_dictionary
[
each_field
[
'key'
]]
=
each_field
[
'value'
]
return
_key_dictionary
# from scripts.common.constants import JanusDeploymentConstants
#
# import cv2
#
# from edge_engine.common.logsetup import logger
# #from scripts.common.config import MONGO_DB_OBJ, APP_MONGO_COLLECTION
# #from scripts.common.constants import JanusDeploymentConstants
#
#
# class Utilities:
#
# @classmethod
# def get_extra_fields(
# cls,
# device_id,
# ):
# _janus_deployment = MONGO_DB_OBJ[
# APP_MONGO_COLLECTION.get(JanusDeploymentConstants.JANUS_DEPLOYMENT_COLLECTION)].find_one(
# {JanusDeploymentConstants.DEPLOYMENT_ID: device_id}).get(
# JanusDeploymentConstants.EXTRA_FIELDS_KEY)
# if _janus_deployment is None:
# raise ValueError("Janus deployment configuration is not found/corrupted")
#
# _key_dictionary = dict()
# for each_field in _janus_deployment:
# _key_dictionary[each_field['key']] = each_field['value']
# return _key_dictionary
#
# @classmethod
# def get_direction(
# cls,
# device_id,
# ):
# logger.debug("Getting the direction from DB")
# return MONGO_DB_OBJ[APP_MONGO_COLLECTION.get(JanusDeploymentConstants.JANUS_DEPLOYMENT_COLLECTION)].find_one(
# {JanusDeploymentConstants.DEPLOYMENT_ID: device_id}).get(
# JanusDeploymentConstants.DIRECTION_KEY)
#
# @classmethod
# def set_direction(
# cls,
# device_id: str,
# direction: bool,
# ):
# logger.debug("Updating the direction in DB")
# updated_values = {"$set": {JanusDeploymentConstants.DIRECTION_KEY: direction}}
# MONGO_DB_OBJ[APP_MONGO_COLLECTION.get(JanusDeploymentConstants.JANUS_DEPLOYMENT_COLLECTION)].update_one(
# {JanusDeploymentConstants.DEPLOYMENT_ID: device_id}, updated_values)
#
# @classmethod
# def draw_circles_on_frame(
# cls,
# frame,
# point,
# radius=3,
# color=(255, 255, 255),
# thickness=1,
# ):
# """
# draw circle on the objects
# :param radius: radius of the circle
# :param frame: frame to draw on
# :param point: co-ordinate to draw on
# :param color: color of the circle
# :param thickness: thickness of the circle
# :return: frame
# """
# return cv2.circle(frame, tuple(point), radius, color, thickness)
#
# @classmethod
# def resize_to_64_64(
# cls,
# frame,
# ):
# """
# resize the from
# :param frame: frame
# :return: frame
# """
# return cv2.resize(frame, (64, 64))
#
#
# def get_extra_fields(device_id):
# # _janus_deployment = [
# # {
# # "type": "number",
# # "key": "x1",
# # "value": 1000
# # },
# # {
# # "type": "number",
# # "key": "y1",
# # "value": 0
# # },
# # {
# # "type": "number",
# # "key": "x2",
# # "value": 1001
# # },
# # {
# # "type": "number",
# # "key": "y2",
# # "value": 720
# # },
# # {
# # "type": "dropdown",
# # "key": "alignment",
# # "value": "vertical"
# # }
# # ]
# _janus_deployment = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get(JanusDeploymentConstants.JANUS_DEPLOYMENT_COLLECTION)]. \
# find_one({JanusDeploymentConstants.DEPLOYMENT_ID: device_id}).get(JanusDeploymentConstants.EXTRA_FIELDS_KEY)
#
# if _janus_deployment is None:
# raise ValueError("Janus deployment configuration is not found/corrupted")
#
# _key_dictionary = dict()
# for each_field in _janus_deployment:
# _key_dictionary[each_field['key']] = each_field['value']
# return _key_dictionary
scripts/utils/infocenter.py
View file @
513178d5
...
...
@@ -9,7 +9,7 @@ from scripts.utils.ilens_request_handler import post
from
uuid
import
uuid1
from
urllib.parse
import
urljoin
from
edge_engine.common.logsetup
import
logger
from
scripts.common.config
import
MONGO_DB_OBJ
,
APP_MONGO_COLLECTION
class
MongoLogger
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment