Commit af97b683 authored by kayef.ahamad's avatar kayef.ahamad

Merge branch 'development' into 'master'

Can Count

See merge request !1
parents e5227537 cb2b561e
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*pyc
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
.idea/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
FROM ailens-registry.knowledgelens.com:9084/repository/kl-docker-repo/ilens/ilens-openvino-base:v1
#FROM flare-detection-pytorch:v2.0
RUN rm -rf /var/lib/apt/lists/* && rm -rf /root/.cache/pip/
RUN apt-get update
RUN apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg
RUN pip3 install minio requests cachetools pymongo~=3.10.1 wincertstore==0.2 opencv-contrib-python==4.2.0.34 matplotlib certifi==2020.6.20 imutils
#RUN apt-get install tzdata
RUN pip3 install --upgrade pip
RUN pip3 install minio cachetools
RUN pip3 install expiringdict
RUN pip3 install torch==1.6.0
RUN pip3 install torchvision==0.7.0
RUN pip3 install yolov5processor==0.0.3
RUN pip3 install Cython pillow PyYAML scipy
RUN pip install numpy==1.18.5
RUN pip install msgpack==0.5.6
ADD . /app
WORKDIR /app
RUN pip3 install edge_engine-1.0.0a0-py3-none-any.whl
CMD ["bash","app.sh"]
\ No newline at end of file
{
"deploymentId" : "e6070f12",
"deviceId" : "e6070f12",
"pubConfigs" : [
{
"type" : "OPENCV",
"ffmpegCmd" : [
"ffmpeg",
"-y",
"-f",
"rawvideo",
"-vcodec",
"rawvideo",
"-pix_fmt",
"bgr24",
"-s",
"600x400",
"-i",
"-",
"-c:v",
"libvpx",
"-crf",
"30",
"-pix_fmt",
"yuv420p",
"-b:v",
"1M",
"-f",
"rtp"
],
"RTPEndpoint" : [
"rtp://192.168.3.220:10105?pkt_size=600"
]
}
],
"inputConf" : {
"sourceType" : "videofile",
"gStreamer" : false,
"uri" : "sample_videos/cans.mp4"
},
"modelConfig" : {"confidence": 0.10},
"config" : {
"TZ" : "Asia/Kolkata",
"FRAME_WIDTH" : 600,
"FRAME_HEIGHT" : 400,
"URI" : "intruder_demo.mp4"
}
}
\ No newline at end of file
# Can Count # srf-cyclinder-detection
Can Count
\ No newline at end of file
import os
# os.environ["state"] = "dev"
# os.environ["config"] = '{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://192.168.3.220:2717\", ' \
# '\"MONGO_DATABASE\": \"ilens_ai\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": ' \
# '\"deploymentId\", \"MONGO_VALUE\": \"Monitoring_Camera_7b861450\", \"MONGO_COLL\": ' \
# '\"serviceConfiguration\", \"MONGO_DB\": \"ilens_ai\"} '
from edge_engine.edge_processor import ExecutePipeline
from edge_engine.edge_processor import Pubs
from scripts.core.cans_model import SRF_Cans
from edge_engine.common.config import EDGE_CONFIG
if __name__ == '__main__':
pubs = Pubs()
mod = SRF_Cans(config=EDGE_CONFIG, pubs=pubs,
device_id=EDGE_CONFIG['deviceId'])
ex = ExecutePipeline(mod)
ex.run_model()
source /opt/intel/openvino/bin/setupvars.sh
python3 app.py
\ No newline at end of file
This diff is collapsed.
from edge_engine.ai.model.modelwraper import ModelWrapper
from abc import ABC, abstractmethod
class ModelWrapper(ABC):
def __init__(self, path=None):
"""Implement code to load mask_model here"""
pass
def _pre_process(self, x):
"""Implement code to process raw input into format required for mask_model inference here"""
return x
def _post_process(self, x):
"""Implement any code to post-process mask_model inference response here"""
return x
@abstractmethod
def _predict(self, x):
"""Implement core mask_model inference code here"""
pass
def predict(self, x):
pre_x = self._pre_process(x)
prediction = self._predict(pre_x)
result = self._post_process(prediction)
return result
# import the necessary packages
import cv2
import numpy as np
class GammaPreprocessor:
def __init__(self, gamma=1.0):
# creating Gamma table
self.invGamma = 1.0 / gamma
self.table = np.array([((i / 255.0) ** self.invGamma) * 255
for i in np.arange(0, 256)]).astype("uint8")
def preprocess(self, image):
return cv2.LUT(image, self.table)
# import the necessary packages
from keras.preprocessing.image import img_to_array
class ImageToArrayPreprocessor:
def __init__(self, dataFormat=None):
# store the image data format
self.dataFormat = dataFormat
def preprocess(self, image):
# apply the Keras utility function that correctly rearranges
# the dimensions of the image
return img_to_array(image, data_format=self.dataFormat)
# import the necessary packages
import cv2
class SimpleHistogramPreprocessor:
def __init__(self):
pass
def preprocess(self, image):
# Run Histogram simple Equalization
return cv2.equalizeHist(image)
# import the necessary packages
import cv2
class SimplePreprocessor:
def __init__(self, width, height, inter=cv2.INTER_AREA):
# store the target image width, height, and interpolation
# method used when resizing
self.width = width
self.height = height
self.inter = inter
def preprocess(self, image):
# resize the image to a fixed size, ignoring the aspect
# ratio
return cv2.resize(image, (self.width, self.height),
interpolation=self.inter)
import os
import sys
from edge_engine.common.constants import LicenseModule
from dateutil import parser
from datetime import datetime
from pymongo import MongoClient
from copy import deepcopy
import json
def licence_validator(payload):
try:
dt = parser.parse(payload['valid_till'])
now = datetime.now()
if (now > dt):
sys.stdout.write("Licence Expired \n".format())
sys.stdout.flush()
return False
return True
except KeyError as e:
sys.stderr.write("Error loading licence")
return False
def get_config_from_mongo(mongo_uri, dbname, basecollection,
key, value):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
config = db[basecollection].find_one({key: value}, {"_id": False})
return config
def load_conf(config, mongo_uri, dbname):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
pub_configs = []
for conf in config['pubConfigs']:
if conf["type"].lower() in ["mqtt", "mongo", ]:
key = conf["key"]
value = conf["value"]
collection = conf["conectionCollection"]
pub_conf = db[collection].find_one({key: value}, {"_id": False})
pub_conf.update(conf)
pub_configs.append(pub_conf)
else:
pub_configs.append(conf)
config['pubConfigs'] = pub_configs
return config
# """
# {
# "MONGO_URI": "mongodb://192.168.3.220:21017",
# "MONGO_DATABASE": "ilens_thermal_app",
# "MONGO_COLLECTION": "janus_deployment_details",
# "MONGO_KEY": "deploymentId",
# "MONGO_VALUE": "ddd"
# }
# """
LOG_LEVEL = os.environ.get("LOG_LEVEL", default="INFO").upper()
LOG_HANDLER_NAME = os.environ.get("LOG_HANDLER_NAME", default="ilens-edge_engine")
BASE_LOG_PATH = os.environ.get('BASE_LOG_PATH',
default=os.path.join(os.getcwd(), "logs".format()))
if not os.path.isdir(BASE_LOG_PATH):
os.mkdir(BASE_LOG_PATH)
CONFIG_ENV = json.loads(os.environ.get('config', default=None))
sys.stdout.write("config->{} \n".format(json.dumps(CONFIG_ENV)))
MONGO_URI = CONFIG_ENV.get('MONGO_URI', None)
MONGO_DATABASE = CONFIG_ENV.get('MONGO_DATABASE', None)
MONGO_COLLECTION = CONFIG_ENV.get('MONGO_COLLECTION', None)
MONGO_KEY = CONFIG_ENV.get('MONGO_KEY', None)
MONGO_VALUE = CONFIG_ENV.get('MONGO_VALUE', None)
if MONGO_URI == None \
or MONGO_DATABASE is None \
or MONGO_COLLECTION is None \
or MONGO_KEY is None \
or MONGO_VALUE is None:
sys.stderr.write("invalid mongo config \n")
sys.exit(1)
state = os.environ.get('state',default='prod')
if state == 'dev':
with open('Edge_Config.json') as f:
EDGE_CONFIG = json.load(f)
else:
EDGE_CONFIG = get_config_from_mongo(
mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE, basecollection=MONGO_COLLECTION,
key=MONGO_KEY, value=MONGO_VALUE
)
DEVICE_ID = EDGE_CONFIG["deviceId"]
if EDGE_CONFIG is None:
sys.stderr.write("invalid EDGE_CONFIG config \n")
sys.exit(1)
EDGE_CONFIG = load_conf(EDGE_CONFIG, mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE)
DATA_PATH = EDGE_CONFIG["inputConf"].get('dataPath', os.path.join(os.getcwd(), "data".format()))
sys.stderr.write("Loading data from {} \n".format(DATA_PATH))
This diff is collapsed.
class LicenseModule:
private_key = "3139343831323738414d47454e3936363538373136"
encoding_algorithm = "HS256"
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from edge_engine.common.config import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH
import logging
from logging.handlers import RotatingFileHandler
from logging import WARNING,INFO,DEBUG,ERROR
import os
DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s:%(filename)5s:%(funcName)5s():%(lineno)s] %(message)s'
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
FORMATTER = DEBUG_FORMAT
def get_logger(log_handler_name, extra=EXTRA):
"""
Purpose : To create logger .
:param log_handler_name: Name of the log handler.
:param extra: extra args for the logger
:return: logger object.
"""
log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
logstash_temp = os.path.join(BASE_LOG_PATH, log_handler_name + ".db")
logger = logging.getLogger(log_handler_name)
logger.setLevel(LOG_LEVEL.strip().upper())
log_handler = logging.StreamHandler()
log_handler.setLevel(LOG_LEVEL)
formatter = logging.Formatter(FORMATTER)
log_handler.setFormatter(formatter)
handler = RotatingFileHandler(log_path, maxBytes=10485760,
backupCount=5)
handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.addHandler(handler)
logger = logging.LoggerAdapter(logger, extra)
return logger
logger = get_logger(LOG_HANDLER_NAME)
import os, time
from minio import Minio
from minio.error import ResponseError
from edge_engine.common.logsetup import logger
class MinioClient:
def __init__(self ,SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP):
logger.info("Initalizing minioclient !!")
self.SECRET_KEY = SECRET_KEY
self.ACCESS_KEY = ACCESS_KEY
self.BUCKET_NAME = BUCKET_NAME
self.LOCAL_DATA_PATH = LOCAL_DATA_PATH
self.MINIO_IP = MINIO_IP
self.logfile = "./logs/videowrite.log"
self.minioClient = self.connect_to_minio()
self.create_bucket(self.BUCKET_NAME)
def connect_to_minio(self):
if self.SECRET_KEY is not None and self.ACCESS_KEY is not None:
logger.info("Connecting to Minio Service... !!! ")
minio_client = Minio(self.MINIO_IP, access_key = self.ACCESS_KEY, secret_key = self.SECRET_KEY,
region='us-east-1', secure=False)
return minio_client
else:
logger.info('Access Key and Secret Key String cannot be null')
raise Exception('Access Key and Secret Key String cannot be null')
def create_bucket(self, bucket_name):
try:
if bucket_name not in self.list_buckets():
logger.info("Creating bucket {}...".format(bucket_name))
self.minioClient.make_bucket(bucket_name, location="us-east-1")
else:
logger.info("Bucket already exists....")
except ResponseError as err:
logger.error(err)
def save_to_bucket(self, bucket_name, data_obj):
try:
with open(data_obj, 'rb') as file:
file_stat = os.stat(data_obj)
self.minioClient.put_object(bucket_name, data_obj.split(self.LOCAL_DATA_PATH)[1],
file, file_stat.st_size)
except ResponseError as err:
logger.error(err)
def list_buckets(self):
bucketobjects = self.minioClient.list_buckets()
bucketlist = []
for eachbucket in bucketobjects:
bucketlist.append(eachbucket.name)
return bucketlist
def read_write_logs(self):
try:
f = open(self.logfile)
except Exception as err:
print(err)
with open(self.logfile, "a") as startfile:
startfile.write("")
f = open(self.logfile)
return [line.split('\n')[0] for line in f]
def write_write_logs(self, log_str):
with open(self.logfile, "a") as my_file:
my_file.write(log_str + "\n")
def upload(self):
if self.LOCAL_DATA_PATH[-1]!='/':
self.LOCAL_DATA_PATH = self.LOCAL_DATA_PATH+"/"
while True:
listoffiles = [os.path.join(path, name) for path, subdirs, files in os.walk(self.LOCAL_DATA_PATH) for name in files]
listofwrittenfiles = self.read_write_logs()
listofnewfiles = list(set(listoffiles) - set(listofwrittenfiles))
for fileName in listofnewfiles:
try:
logger.info("Uploading {}..".format(fileName.split(self.LOCAL_DATA_PATH)[1]))
self.save_to_bucket(self.BUCKET_NAME, fileName)
self.write_write_logs(fileName)
except Exception as e:
logger.error(e)
time.sleep(5)
# if __name__=='__main__':
# SECRET_KEY = 'minioadmin'
# ACCESS_KEY = 'minioadmin'
# BUCKET_NAME = 'videobucket'
# MINIO_IP = '192.168.3.220:29000'
# LOCAL_DATA_PATH = "F:/GDrive Data/Downloads"
# obj = MinioClient(SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP)
# obj.upload()
\ No newline at end of file
from edge_engine.ai.model.modelwraper import ModelWrapper
from abc import ABC, abstractmethod
class ModelWrapper(ABC):
def __init__(self, path=None):
"""Implement code to load mask_model here"""
pass
def _pre_process(self, x):
"""Implement code to process raw input into format required for mask_model inference here"""
return x
def _post_process(self, x):
"""Implement any code to post-process mask_model inference response here"""
return x
@abstractmethod
def _predict(self, x):
"""Implement core mask_model inference code here"""
pass
def predict(self, x):
pre_x = self._pre_process(x)
prediction = self._predict(pre_x)
result = self._post_process(prediction)
return result
# import the necessary packages
import cv2
import numpy as np
class GammaPreprocessor:
def __init__(self, gamma=1.0):
# creating Gamma table
self.invGamma = 1.0 / gamma
self.table = np.array([((i / 255.0) ** self.invGamma) * 255
for i in np.arange(0, 256)]).astype("uint8")
def preprocess(self, image):
return cv2.LUT(image, self.table)
# import the necessary packages
from keras.preprocessing.image import img_to_array
class ImageToArrayPreprocessor:
def __init__(self, dataFormat=None):
# store the image data format
self.dataFormat = dataFormat
def preprocess(self, image):
# apply the Keras utility function that correctly rearranges
# the dimensions of the image
return img_to_array(image, data_format=self.dataFormat)
# import the necessary packages
import cv2
class SimpleHistogramPreprocessor:
def __init__(self):
pass
def preprocess(self, image):
# Run Histogram simple Equalization
return cv2.equalizeHist(image)
# import the necessary packages
import cv2
class SimplePreprocessor:
def __init__(self, width, height, inter=cv2.INTER_AREA):
# store the target image width, height, and interpolation
# method used when resizing
self.width = width
self.height = height
self.inter = inter
def preprocess(self, image):
# resize the image to a fixed size, ignoring the aspect
# ratio
return cv2.resize(image, (self.width, self.height),
interpolation=self.inter)
import os
import sys
from edge_engine.common.constants import LicenseModule
from dateutil import parser
from datetime import datetime
from pymongo import MongoClient
from copy import deepcopy
import json
def licence_validator(payload):
try:
dt = parser.parse(payload['valid_till'])
now = datetime.now()
if (now > dt):
sys.stdout.write("Licence Expired \n".format())
sys.stdout.flush()
return False
return True
except KeyError as e:
sys.stderr.write("Error loading licence")
return False
def get_config_from_mongo(mongo_uri, dbname, basecollection,
key, value):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
config = db[basecollection].find_one({key: value}, {"_id": False})
return config
def load_conf(config, mongo_uri, dbname):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
pub_configs = []
for conf in config['pubConfigs']:
if conf["type"].lower() in ["mqtt", "mongo", ]:
key = conf["key"]
value = conf["value"]
collection = conf["conectionCollection"]
pub_conf = db[collection].find_one({key: value}, {"_id": False})
pub_conf.update(conf)
pub_configs.append(pub_conf)
else:
pub_configs.append(conf)
config['pubConfigs'] = pub_configs
return config
# """
# {
# "MONGO_URI": "mongodb://192.168.3.220:21017",
# "MONGO_DATABASE": "ilens_thermal_app",
# "MONGO_COLLECTION": "janus_deployment_details",
# "MONGO_KEY": "deploymentId",
# "MONGO_VALUE": "ddd"
# }
# """
LOG_LEVEL = os.environ.get("LOG_LEVEL", default="INFO").upper()
LOG_HANDLER_NAME = os.environ.get("LOG_HANDLER_NAME", default="ilens-edge_engine")
BASE_LOG_PATH = os.environ.get('BASE_LOG_PATH',
default=os.path.join(os.getcwd(), "logs".format()))
if not os.path.isdir(BASE_LOG_PATH):
os.mkdir(BASE_LOG_PATH)
CONFIG_ENV = json.loads(os.environ.get('config', default=None))
sys.stdout.write("config->{} \n".format(json.dumps(CONFIG_ENV)))
MONGO_URI = CONFIG_ENV.get('MONGO_URI', None)
MONGO_DATABASE = CONFIG_ENV.get('MONGO_DATABASE', None)
MONGO_COLLECTION = CONFIG_ENV.get('MONGO_COLLECTION', None)
MONGO_KEY = CONFIG_ENV.get('MONGO_KEY', None)
MONGO_VALUE = CONFIG_ENV.get('MONGO_VALUE', None)
if MONGO_URI == None \
or MONGO_DATABASE is None \
or MONGO_COLLECTION is None \
or MONGO_KEY is None \
or MONGO_VALUE is None:
sys.stderr.write("invalid mongo config \n")
sys.exit(1)
state = os.environ.get('state',default='prod')
if state == 'dev':
with open('Edge_Config.json') as f:
EDGE_CONFIG = json.load(f)
else:
EDGE_CONFIG = get_config_from_mongo(
mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE, basecollection=MONGO_COLLECTION,
key=MONGO_KEY, value=MONGO_VALUE
)
DEVICE_ID = EDGE_CONFIG["deviceId"]
if EDGE_CONFIG is None:
sys.stderr.write("invalid EDGE_CONFIG config \n")
sys.exit(1)
EDGE_CONFIG = load_conf(EDGE_CONFIG, mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE)
DATA_PATH = EDGE_CONFIG["inputConf"].get('dataPath', os.path.join(os.getcwd(), "data".format()))
sys.stderr.write("Loading data from {} \n".format(DATA_PATH))
This diff is collapsed.
class LicenseModule:
private_key = "3139343831323738414d47454e3936363538373136"
encoding_algorithm = "HS256"
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from edge_engine.common.config import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH
import logging
from logging.handlers import RotatingFileHandler
from logging import WARNING,INFO,DEBUG,ERROR
import os
DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s:%(filename)5s:%(funcName)5s():%(lineno)s] %(message)s'
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
FORMATTER = DEBUG_FORMAT
def get_logger(log_handler_name, extra=EXTRA):
"""
Purpose : To create logger .
:param log_handler_name: Name of the log handler.
:param extra: extra args for the logger
:return: logger object.
"""
log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
logstash_temp = os.path.join(BASE_LOG_PATH, log_handler_name + ".db")
logger = logging.getLogger(log_handler_name)
logger.setLevel(LOG_LEVEL.strip().upper())
log_handler = logging.StreamHandler()
log_handler.setLevel(LOG_LEVEL)
formatter = logging.Formatter(FORMATTER)
log_handler.setFormatter(formatter)
handler = RotatingFileHandler(log_path, maxBytes=10485760,
backupCount=5)
handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.addHandler(handler)
logger = logging.LoggerAdapter(logger, extra)
return logger
logger = get_logger(LOG_HANDLER_NAME)
import os, time
from minio import Minio
from minio.error import ResponseError
from edge_engine.common.logsetup import logger
class MinioClient:
def __init__(self ,SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP):
logger.info("Initalizing minioclient !!")
self.SECRET_KEY = SECRET_KEY
self.ACCESS_KEY = ACCESS_KEY
self.BUCKET_NAME = BUCKET_NAME
self.LOCAL_DATA_PATH = LOCAL_DATA_PATH
self.MINIO_IP = MINIO_IP
self.logfile = "./logs/videowrite.log"
self.minioClient = self.connect_to_minio()
self.create_bucket(self.BUCKET_NAME)
def connect_to_minio(self):
if self.SECRET_KEY is not None and self.ACCESS_KEY is not None:
logger.info("Connecting to Minio Service... !!! ")
minio_client = Minio(self.MINIO_IP, access_key = self.ACCESS_KEY, secret_key = self.SECRET_KEY,
region='us-east-1', secure=False)
return minio_client
else:
logger.info('Access Key and Secret Key String cannot be null')
raise Exception('Access Key and Secret Key String cannot be null')
def create_bucket(self, bucket_name):
try:
if bucket_name not in self.list_buckets():
logger.info("Creating bucket {}...".format(bucket_name))
self.minioClient.make_bucket(bucket_name, location="us-east-1")
else:
logger.info("Bucket already exists....")
except ResponseError as err:
logger.error(err)
def save_to_bucket(self, bucket_name, data_obj):
try:
with open(data_obj, 'rb') as file:
file_stat = os.stat(data_obj)
self.minioClient.put_object(bucket_name, data_obj.split(self.LOCAL_DATA_PATH)[1],
file, file_stat.st_size)
except ResponseError as err:
logger.error(err)
def list_buckets(self):
bucketobjects = self.minioClient.list_buckets()
bucketlist = []
for eachbucket in bucketobjects:
bucketlist.append(eachbucket.name)
return bucketlist
def read_write_logs(self):
try:
f = open(self.logfile)
except Exception as err:
print(err)
with open(self.logfile, "a") as startfile:
startfile.write("")
f = open(self.logfile)
return [line.split('\n')[0] for line in f]
def write_write_logs(self, log_str):
with open(self.logfile, "a") as my_file:
my_file.write(log_str + "\n")
def upload(self):
if self.LOCAL_DATA_PATH[-1]!='/':
self.LOCAL_DATA_PATH = self.LOCAL_DATA_PATH+"/"
while True:
listoffiles = [os.path.join(path, name) for path, subdirs, files in os.walk(self.LOCAL_DATA_PATH) for name in files]
listofwrittenfiles = self.read_write_logs()
listofnewfiles = list(set(listoffiles) - set(listofwrittenfiles))
for fileName in listofnewfiles:
try:
logger.info("Uploading {}..".format(fileName.split(self.LOCAL_DATA_PATH)[1]))
self.save_to_bucket(self.BUCKET_NAME, fileName)
self.write_write_logs(fileName)
except Exception as e:
logger.error(e)
time.sleep(5)
# if __name__=='__main__':
# SECRET_KEY = 'minioadmin'
# ACCESS_KEY = 'minioadmin'
# BUCKET_NAME = 'videobucket'
# MINIO_IP = '192.168.3.220:29000'
# LOCAL_DATA_PATH = "F:/GDrive Data/Downloads"
# obj = MinioClient(SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP)
# obj.upload()
\ No newline at end of file
from edge_engine.common.logsetup import logger
from edge_engine.common.config import EDGE_CONFIG
from edge_engine.streamio.datastream import MQTT
from edge_engine.streamio.datastream import VideoOutputStream
from edge_engine.streamio.datastream import FrameOutputStream
from edge_engine.streamio.datastream import FFMPEGOutputStream, OPENCVOutputStream
from edge_engine.streamio.datastream import MongoDataStreamOut
from edge_engine.streamio.videostream import ThreadedVideoStream
from edge_engine.streamio.videostream import FileVideoStream
from edge_engine.streamio.datastream import DummyPublisher
from edge_engine.streamio.frameProcessor import FrameProcessor
from edge_engine.common.minio_server import MinioClient
import json
from threading import Thread
import os
class Pubs():
def __init__(self):
self.mqtt_pub = DummyPublisher(name="MQTT")
self.frame_write = DummyPublisher(name="FRAME")
self.video_write = DummyPublisher(name="VIDEO")
self.mongo_write = DummyPublisher(name="MONGO")
self.rtp_write = DummyPublisher(name="RTP")
self.build_pubs()
if 'minioConfig' in EDGE_CONFIG.keys() and \
isinstance(EDGE_CONFIG["minioConfig"], dict):
self.minio_thread = self.start_minio(EDGE_CONFIG["minioConfig"])
@staticmethod
def start_minio(minio_conf):
obj = MinioClient(minio_conf['secretKey'], minio_conf['accessKey'],
minio_conf['bucketName'], minio_conf['localDataPath'],
minio_conf['ip'])
t = Thread(target=obj.upload)
t.start()
return t
def build_pubs(self):
logger.info("building publishers ")
state = os.environ.get('state', 'prod')
for conf in EDGE_CONFIG["pubConfigs"]:
if conf["type"].upper() == "MQTT":
self.mqtt_pub = MQTT(broker=conf["broker"], topic=conf["topic"], port=conf["port"]
, publish_hook=json.dumps)
elif conf["type"].upper() == "FRAMEWRITE":
self.frame_write = FrameOutputStream(
basepath=conf["basepath"],
iformat=conf["iformat"],
filenameFormat=conf["filenameFormat"],
publish_hook=None)
elif conf["type"].upper() == "VIDEOWRITE":
self.video_write = VideoOutputStream(basepath=conf["basepath"],
dims=conf["dims"],
filenameFormat=conf["filenameFormat"],
fps=conf["fps"], publish_hook=None)
elif conf["type"].upper() == "MONGO":
self.mongo_write = MongoDataStreamOut(host=conf["host"],
port=conf["port"],
dbname=conf["dbname"],
collection=conf["collection"],
keys=conf["keys"],
authsource=conf["authSource"],
username=conf["username"],
password=conf["password"],
publish_hook=None)
elif conf["type"].upper() == "RTP":
self.rtp_write = FFMPEGOutputStream(conf["ffmpegCmd"], conf["RTPEndpoint"], publish_hook=None)
elif conf["type"].upper() == "OPENCV":
logger.info('Dev Testing, Video output will be loaded on new window')
self.rtp_write = OPENCVOutputStream()
else:
logger.error("Unsupported publisher {}".format(conf["type"]))
class ExecutePipeline:
def __init__(self, model):
self.model = model
def run_model(self):
if EDGE_CONFIG["inputConf"]["sourceType"].lower() in ["rtsp", "usbcam"]:
logger.info("Selected input stream as Direct cv input")
self.threadedVideoStream = ThreadedVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.threadedVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.threadedVideoStream,
model=self.model)
elif EDGE_CONFIG["inputConf"]["sourceType"].lower() == "videofile":
self.fileVideoStream = FileVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.fileVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.fileVideoStream, model=self.model)
else:
raise ValueError("unsupported source {}".format(EDGE_CONFIG["inputConf"]["sourceType"]))
self.start_model()
def start_model(self):
self.frameProcessor.run_model()
from edge_engine.streamio.frameProcessor import FrameProcessor
from .datastreamprocessor import DataStreamProcessor
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.streamio.datastream.mongodatastreamout import MongoDataStreamOut
from edge_engine.streamio.datastream.frameoutputstream import FrameOutputStream
from edge_engine.streamio.datastream.videooutputstream import VideoOutputStream
from edge_engine.streamio.datastream.mqttstream import MQTT
from edge_engine.streamio.datastream.ffmpegdata_streamout import FFMPEGOutputStream
from edge_engine.streamio.datastream.dummyouputstreamer import DummyPublisher
from edge_engine.streamio.datastream.opencvimshowout import OPENCVOutputStream
\ No newline at end of file
from abc import ABC, abstractmethod
class DataStreamWrapper(ABC):
def __init__(self):
"""Implement code to load mask_model here"""
pass
def publish(self, x):
"""Implement code to publish"""
return x
def subscribe(self,hook):
"""Implement code to subscribe"""
return None
\ No newline at end of file
from edge_engine.common.logsetup import logger
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
class DummyPublisher(DataStreamWrapper):
def __init__(self, name):
super().__init__()
self.name = name
def publish(self, x):
logger.warn(f"{self.name} Publisher is not configured but you are using it !!!!!!!!!")
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import subprocess as sp
class FFMPEGOutputStream(DataStreamWrapper):
def __init__(self, ffmpeg_cmd, rtp_endpoint, publish_hook=None):
super().__init__()
self.ffmpeg_cmd = ffmpeg_cmd
self.rtp_endpoint = rtp_endpoint
self.ffmpeg_cmd.append(self.rtp_endpoint[0])
self.proc = sp.Popen(self.ffmpeg_cmd, stdin=sp.PIPE, shell=False)
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
frame = x["frame"]
self.proc.stdin.write(frame.tostring())
self.proc.stdin.flush()
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class FrameOutputStream(DataStreamWrapper):
def __init__(self, basepath, iformat="jpg", filenameFormat="{deviceId}_{frameId}_{timestamp}", publish_hook=None):
super().__init__()
self.basepath = basepath
self.iformat = iformat
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x= self.publish_hook(x)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
cv2.imwrite("{path}.{iformat}".format(path=os.path.join(path, self.filenameFormat.format(**x)),
iformat=self.iformat), frame)
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from pymongo import MongoClient
class MongoDataStreamOut(DataStreamWrapper):
def __init__(self, host, port, dbname, collection, keys, authsource,username=None,password=None, publish_hook=None):
super().__init__()
self.host = host
self.port = port
self.dbname = dbname
self.username = username
self.password = password
self.collection = collection
self.publish_hook = publish_hook
self.mongo = MongoClient(host=host,
port=int(port),username=self.username,password=self.password)
self.db = self.mongo[dbname]
self.keys = keys
self.authsource = authsource
def subscribe(self, hook=None):
pass
def publish(self, data):
if self.publish_hook is not None:
data = self.publish_hook(data)
fin_dat = {}
for k, v in data.items():
if k in self.keys:
fin_dat[k] = v
self.db[self.collection].insert(fin_dat)
import paho.mqtt.client as paho
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.common.logsetup import logger
from uuid import uuid4
import traceback
class MQTT(DataStreamWrapper):
@staticmethod
def on_connect(client, userdata, flags, rc):
logger.info("Connection returned with result code:" + str(rc))
@staticmethod
def on_disconnect(client, userdata, rc):
logger.info("Disconnection returned result:" + str(rc))
@staticmethod
def on_subscribe(client, userdata, mid, granted_qos):
logger.debug("Subscribing MQTT {} {} {} {}".format(client, userdata, mid, granted_qos))
def on_message(self, client, userdata, msg):
logger.debug("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
if self.subscribe_hook is not None:
self.subscribe_hook(msg.payload.decode())
def __init__(self, broker, port, topic, qos=2, subscribe_hook=None, publish_hook=None):
super().__init__()
self.broker = broker
self.port = int(port)
self.topic = topic
self.client_name = "{}".format(uuid4())
self.client = paho.Client(self.client_name)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_subscribe = self.on_subscribe
self.client.on_message = self.on_message
self.client.connect(host=self.broker, port=self.port)
self.subscribe_hook = subscribe_hook
self.publish_hook = publish_hook
self.qos = qos
def subscribe(self, hook=None):
if hook is not None:
self.subscribe_hook =hook
self.client.subscribe((self.topic, self.qos))
self.client.loop_forever()
def publish(self, data):
try:
if self.publish_hook is not None:
data = self.publish_hook(data)
self.client.publish(self.topic, data)
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import subprocess as sp
import cv2
class OPENCVOutputStream(DataStreamWrapper):
def __init__(self):
super().__init__()
def publish(self, x):
cv2.imshow('Dev Test', x['frame'])
key = cv2.waitKey(1)
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class VideoOutputStream(DataStreamWrapper):
def __init__(self, basepath, dims, filenameFormat="{deviceId}_{timestamp}", fps=30, publish_hook=None):
super().__init__()
self.basepath = basepath
self.dims = (int(dims[0]),int(dims[1]))
self.fps = float(fps)
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
self.four_cc = cv2.VideoWriter_fourcc(*'mp4v')
self.out = None
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
if len(x["metric"]) > 0:
if self.out is None:
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
self.out = cv2.VideoWriter("{}.mp4".format(os.path.join(path, self.filenameFormat.format(**x))),
self.four_cc, self.fps, self.dims)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
self.out.write(frame)
else:
if self.out is not None:
self.out.release()
self.out = None
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.common.logsetup import logger
class DataStreamProcessor:
def __init__(self, model, subsciber, publishers=list()):
self.model = model
self.subsciber = subsciber
self.publishers = publishers
logger.info("Setting up frame processor !!")
def processstream(self, msg):
print(msg)
def run_model(self):
self.subsciber.subscribe(hook=self.processstream)
from edge_engine.common.logsetup import logger
from edge_engine.common.config import DEVICE_ID
from uuid import uuid4
import traceback
class FrameProcessor:
def __init__(self, stream, model):
self.model = model
self.stream = stream
logger.info("Setting up frame processor !!")
def run_model(self):
while self.stream.stream.isOpened():
try:
logger.debug("Getting frame mask_model")
frame = self.stream.read()
logger.debug("Running mask_model")
data = {"frame": frame[0], "frameId":frame[1] , "deviceId": "{}".format(DEVICE_ID)}
self.model.predict(data)
logger.debug("publishing mask_model output")
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
\ No newline at end of file
from edge_engine.streamio.videostream.fps import FPS
from edge_engine.streamio.videostream.nvgstreamer import NVGstreamer
from edge_engine.streamio.videostream.simplevideostream import SimpleVideoStream
from edge_engine.streamio.videostream.threadedvideostream import ThreadedVideoStream
from edge_engine.streamio.videostream.filevideostream import FileVideoStream
\ No newline at end of file
# import the necessary packages
from threading import Thread
import sys
import cv2
import time
# import the Queue class from Python 3
if sys.version_info >= (3, 0):
from queue import Queue
# otherwise, import the Queue class for Python 2.7
else:
from Queue import Queue
class FileVideoStream:
def __init__(self, stream_config, transform=None):
# initialize the file video stream along with the boolean
# used to indicate if the thread should be stopped or not
self.transform = transform
self.stream_config = stream_config
# initialize the queue used to store frames read from
# the video file
self.build_pipeline()
def start(self):
# start a thread to read frames from the file video stream
self.thread.start()
return self
def build_cv_obj(self):
self.stream = cv2.VideoCapture(self.stream_config["uri"])
self.stopped = False
def build_pipeline(self):
self.build_cv_obj()
if "queueSize" not in self.stream_config:
self.stream_config["queueSize"] = 128
self.Q = Queue(maxsize=int(self.stream_config["queueSize"]))
# intialize thread
self.thread = Thread(target=self.update, args=())
self.thread.daemon = True
def is_opened(self):
return self.stream.isOpened()
def update(self):
# keep looping infinitely
count = 0
while True:
# if the thread indicator variable is set, stop the
# thread
if self.stopped:
break
# otherwise, ensure the queue has room in it
if not self.Q.full():
# read the next frame from the file
(grabbed, frame) = self.stream.read()
# if the `grabbed` boolean is `False`, then we have
# reached the end of the video file
if grabbed is False or frame is None:
# self.stopped = True
count = 0
self.build_cv_obj()
continue
# if there are transforms to be done, might as well
# do them on producer thread before handing back to
# consumer thread. ie. Usually the producer is so far
# ahead of consumer that we have time to spare.
#
# Python is not parallel but the transform operations
# are usually OpenCV native so release the GIL.
#
# Really just trying to avoid spinning up additional
# native threads and overheads of additional
# producer/consumer queues since this one was generally
# idle grabbing frames.
if self.transform:
frame = self.transform(frame)
# add the frame to the queue
self.Q.put((frame, count))
count = count+1
else:
time.sleep(0.1) # Rest for 10ms, we have a full queue
self.stream.release()
def read(self):
# return next frame in the queue
return self.Q.get()
# Insufficient to have consumer use while(more()) which does
# not take into account if the producer has reached end of
# file stream.
def running(self):
return self.more() or not self.stopped
def more(self):
# return True if there are still frames in the queue. If stream is not stopped, try to wait a moment
tries = 0
while self.Q.qsize() == 0 and not self.stopped and tries < 5:
time.sleep(0.1)
tries += 1
return self.Q.qsize() > 0
def stop(self):
# indicate that the thread should be stopped
self.stopped = True
# wait until stream resources are released (producer thread might be still grabbing frame)
self.thread.join()
# import the necessary packages
import datetime
class FPS:
def __init__(self):
# store the start time, end time, and total number of frames
# that were examined between the start and end intervals
self._start = None
self._end = None
self._numFrames = 0
def start(self):
# start the timer
self._start = datetime.datetime.now()
return self
def stop(self):
# stop the timer
self._end = datetime.datetime.now()
def update(self):
# increment the total number of frames examined during the
# start and end intervals
self._numFrames += 1
def elapsed(self):
# return the total number of seconds between the start and
# end interval
return (self._end - self._start).total_seconds()
def fps(self):
# compute the (approximate) frames per second
return self._numFrames / self.elapsed()
import cv2
class NVGstreamer:
def __init__(self, buildconfig):
self.width = 480
self.height = 640
self.latency = 0
self.framerate = "30/1"
self.fformat = "BGRx"
self.BUILD_CONFIG = {
"width": self.width,
"height": self.height,
"latency": self.latency,
"framerate": self.framerate,
"format": self.fformat,
"gstreamer": True
}
self.BUILD_CONFIG.update(buildconfig)
def open_cam_rtsp(self):
gst_str = ('rtspsrc location={uri} latency={latency} ! '
'rtph264depay ! h264parse ! omxh264dec ! '
'nvvidconv ! videorate ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} ! '
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_usb(self):
# We want to set width and height here, otherwise we could just do:
# return cv2.VideoCapture(dev)
gst_str = ('v4l2src device=/dev/video{uri} ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} ! '
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_onboard(self):
# On versions of L4T prior to 28.1, add 'flip-method=2' into gst_str
gst_str = ('nvcamerasrc ! '
'video/x-raw(memory:NVMM), '
'width=(int)2592, height=(int)1458, '
'format=(string)I420 ! '
'nvvidconv ! videorate ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} !'
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def custom_pipeline(self):
gst_str = "{customGstPipelineString}".format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def build_pipeline(self):
if self.BUILD_CONFIG["gStreamer"]!=True:
if self.BUILD_CONFIG["sourceType"] == "usbcam":
self.cap = cv2.VideoCapture(int(self.BUILD_CONFIG["uri"]))
else:
self.cap = cv2.VideoCapture(self.BUILD_CONFIG["uri"])
elif self.BUILD_CONFIG["sourceType"] == "rtsp":
self.cap = self.open_cam_rtsp()
elif self.BUILD_CONFIG["sourceType"] == "usbcam":
self.cap = self.open_cam_usb()
elif self.BUILD_CONFIG["sourceType"] == "onboard":
self.cap = self.open_cam_onboard()
elif self.BUILD_CONFIG["sourceType"] == "customPipeline":
self.cap = self.custom_pipeline()
else:
raise ValueError("unimplemented source {}".format(self.BUILD_CONFIG["sourceType"]))
def get_stream(self):
return self.cap
from edge_engine.common.logsetup import logger
from edge_engine.streamio.videostream import NVGstreamer
class SimpleVideoStream:
def __init__(self, stream_config, name="SimpleVideoStream"):
self.stream_config = stream_config
self.build_pipeline()
(self.grabbed, self.frame) = self.stream.read()
self.name = name
def build_pipeline(self):
self.gstreamer = NVGstreamer(self.stream_config)
self.gstreamer.build_pipeline()
self.stream = self.gstreamer.get_stream()
def start(self):
logger.info("Starting video stream ")
if self.stream.isOpened():
self.grabbed, self.frame = self.stream.read()
if self.grabbed is False:
logger.error("Empty Frame !!!! ")
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
return self
else:
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
def is_opened(self):
return self.stream.isOpened()
def read(self):
# return the frame most recently read
if self.stream.isOpened():
self.grabbed, self.frame = self.stream.read()
if self.grabbed is False:
logger.error("Empty Frame !!!! ")
raise ValueError("Empty Frame !!!! ")
return self.frame
else:
logger.error("Error opening Capture !!!! ")
raise ValueError("Error opening Capture !!!! ")
def stop(self):
if self.stream.isOpened():
self.stream.release()
# import the necessary packages
from threading import Thread
import time
from edge_engine.streamio.videostream import NVGstreamer
from edge_engine.common.logsetup import logger
class ThreadedVideoStream:
def __init__(self, stream_config, name="ThreadedVideoStream"):
# initialize the video camera stream and read the first frame
# from the stream
self.stream_config = stream_config
self.build_pipeline()
# self.stream = stream
(self.grabbed, self.frame) = self.stream.read()
# initialize the thread name
self.name = name
# initialize the variable used to indicate if the thread should
# be stopped
self.stopped = False
def build_pipeline(self):
self.gstreamer = NVGstreamer(self.stream_config)
self.gstreamer.build_pipeline()
self.stream = self.gstreamer.get_stream()
def start(self):
# start the thread to read frames from the video stream
t = Thread(target=self.update, name=self.name, args=())
t.daemon = True
t.start()
return self
def update(self):
# keep looping infinitely until the thread is stopped
while True:
# if the thread indicator variable is set, stop the thread
if self.stopped:
return
# otherwise, read the next frame from the stream
(self.grabbed, self.frame) = self.stream.read()
if self.grabbed is False or self.frame is None:
logger.error("Empty Frame !!!! ")
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
def read(self):
# return the frame most recently read
return self.frame
def stop(self):
# indicate that the thread should be stopped
self.stopped = True
time.sleep(0.2)
self.stream.release()
# Copyright 2019 KnowledgeLens pvt Ltd.
VERSION = '1.0.0.alpha'
from edge_engine.common.logsetup import logger
from edge_engine.common.config import EDGE_CONFIG
from edge_engine.streamio.datastream import MQTT
from edge_engine.streamio.datastream import VideoOutputStream
from edge_engine.streamio.datastream import FrameOutputStream
from edge_engine.streamio.datastream import FFMPEGOutputStream, OPENCVOutputStream
from edge_engine.streamio.datastream import MongoDataStreamOut
from edge_engine.streamio.videostream import ThreadedVideoStream
from edge_engine.streamio.videostream import FileVideoStream
from edge_engine.streamio.datastream import DummyPublisher
from edge_engine.streamio.frameProcessor import FrameProcessor
from edge_engine.common.minio_server import MinioClient
import json
from threading import Thread
import os
class Pubs():
def __init__(self):
self.mqtt_pub = DummyPublisher(name="MQTT")
self.frame_write = DummyPublisher(name="FRAME")
self.video_write = DummyPublisher(name="VIDEO")
self.mongo_write = DummyPublisher(name="MONGO")
self.rtp_write = DummyPublisher(name="RTP")
self.build_pubs()
if 'minioConfig' in EDGE_CONFIG.keys() and \
isinstance(EDGE_CONFIG["minioConfig"], dict):
self.minio_thread = self.start_minio(EDGE_CONFIG["minioConfig"])
@staticmethod
def start_minio(minio_conf):
obj = MinioClient(minio_conf['secretKey'], minio_conf['accessKey'],
minio_conf['bucketName'], minio_conf['localDataPath'],
minio_conf['ip'])
t = Thread(target=obj.upload)
t.start()
return t
def build_pubs(self):
logger.info("building publishers ")
state = os.environ.get('state', 'prod')
for conf in EDGE_CONFIG["pubConfigs"]:
if conf["type"].upper() == "MQTT":
self.mqtt_pub = MQTT(broker=conf["broker"], topic=conf["topic"], port=conf["port"]
, publish_hook=json.dumps)
elif conf["type"].upper() == "FRAMEWRITE":
self.frame_write = FrameOutputStream(
basepath=conf["basepath"],
iformat=conf["iformat"],
filenameFormat=conf["filenameFormat"],
publish_hook=None)
elif conf["type"].upper() == "VIDEOWRITE":
self.video_write = VideoOutputStream(basepath=conf["basepath"],
dims=conf["dims"],
filenameFormat=conf["filenameFormat"],
fps=conf["fps"], publish_hook=None)
elif conf["type"].upper() == "MONGO":
self.mongo_write = MongoDataStreamOut(host=conf["host"],
port=conf["port"],
dbname=conf["dbname"],
collection=conf["collection"],
keys=conf["keys"],
authsource=conf["authSource"],
username=conf["username"],
password=conf["password"],
publish_hook=None)
elif conf["type"].upper() == "RTP":
self.rtp_write = FFMPEGOutputStream(conf["ffmpegCmd"], conf["RTPEndpoint"], publish_hook=None)
elif conf["type"].upper() == "OPENCV":
logger.info('Dev Testing, Video output will be loaded on new window')
self.rtp_write = OPENCVOutputStream()
else:
logger.error("Unsupported publisher {}".format(conf["type"]))
class ExecutePipeline:
def __init__(self, model):
self.model = model
def run_model(self):
if EDGE_CONFIG["inputConf"]["sourceType"].lower() in ["rtsp", "usbcam"]:
logger.info("Selected input stream as Direct cv input")
self.threadedVideoStream = ThreadedVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.threadedVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.threadedVideoStream,
model=self.model)
elif EDGE_CONFIG["inputConf"]["sourceType"].lower() == "videofile":
self.fileVideoStream = FileVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.fileVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.fileVideoStream, model=self.model)
else:
raise ValueError("unsupported source {}".format(EDGE_CONFIG["inputConf"]["sourceType"]))
self.start_model()
def start_model(self):
self.frameProcessor.run_model()
from edge_engine.streamio.frameProcessor import FrameProcessor
from .datastreamprocessor import DataStreamProcessor
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.streamio.datastream.mongodatastreamout import MongoDataStreamOut
from edge_engine.streamio.datastream.frameoutputstream import FrameOutputStream
from edge_engine.streamio.datastream.videooutputstream import VideoOutputStream
from edge_engine.streamio.datastream.mqttstream import MQTT
from edge_engine.streamio.datastream.ffmpegdata_streamout import FFMPEGOutputStream
from edge_engine.streamio.datastream.dummyouputstreamer import DummyPublisher
from edge_engine.streamio.datastream.opencvimshowout import OPENCVOutputStream
\ No newline at end of file
from abc import ABC, abstractmethod
class DataStreamWrapper(ABC):
def __init__(self):
"""Implement code to load mask_model here"""
pass
def publish(self, x):
"""Implement code to publish"""
return x
def subscribe(self,hook):
"""Implement code to subscribe"""
return None
\ No newline at end of file
from edge_engine.common.logsetup import logger
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
class DummyPublisher(DataStreamWrapper):
def __init__(self, name):
super().__init__()
self.name = name
def publish(self, x):
logger.warn(f"{self.name} Publisher is not configured but you are using it !!!!!!!!!")
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import subprocess as sp
class FFMPEGOutputStream(DataStreamWrapper):
def __init__(self, ffmpeg_cmd, rtp_endpoint, publish_hook=None):
super().__init__()
self.ffmpeg_cmd = ffmpeg_cmd
self.rtp_endpoint = rtp_endpoint
self.ffmpeg_cmd.append(self.rtp_endpoint[0])
self.proc = sp.Popen(self.ffmpeg_cmd, stdin=sp.PIPE, shell=False)
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
frame = x["frame"]
self.proc.stdin.write(frame.tostring())
self.proc.stdin.flush()
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class FrameOutputStream(DataStreamWrapper):
def __init__(self, basepath, iformat="jpg", filenameFormat="{deviceId}_{frameId}_{timestamp}", publish_hook=None):
super().__init__()
self.basepath = basepath
self.iformat = iformat
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x= self.publish_hook(x)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
cv2.imwrite("{path}.{iformat}".format(path=os.path.join(path, self.filenameFormat.format(**x)),
iformat=self.iformat), frame)
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from pymongo import MongoClient
class MongoDataStreamOut(DataStreamWrapper):
def __init__(self, host, port, dbname, collection, keys, authsource,username=None,password=None, publish_hook=None):
super().__init__()
self.host = host
self.port = port
self.dbname = dbname
self.username = username
self.password = password
self.collection = collection
self.publish_hook = publish_hook
self.mongo = MongoClient(host=host,
port=int(port),username=self.username,password=self.password)
self.db = self.mongo[dbname]
self.keys = keys
self.authsource = authsource
def subscribe(self, hook=None):
pass
def publish(self, data):
if self.publish_hook is not None:
data = self.publish_hook(data)
fin_dat = {}
for k, v in data.items():
if k in self.keys:
fin_dat[k] = v
self.db[self.collection].insert(fin_dat)
import paho.mqtt.client as paho
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.common.logsetup import logger
from uuid import uuid4
import traceback
class MQTT(DataStreamWrapper):
@staticmethod
def on_connect(client, userdata, flags, rc):
logger.info("Connection returned with result code:" + str(rc))
@staticmethod
def on_disconnect(client, userdata, rc):
logger.info("Disconnection returned result:" + str(rc))
@staticmethod
def on_subscribe(client, userdata, mid, granted_qos):
logger.debug("Subscribing MQTT {} {} {} {}".format(client, userdata, mid, granted_qos))
def on_message(self, client, userdata, msg):
logger.debug("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
if self.subscribe_hook is not None:
self.subscribe_hook(msg.payload.decode())
def __init__(self, broker, port, topic, qos=2, subscribe_hook=None, publish_hook=None):
super().__init__()
self.broker = broker
self.port = int(port)
self.topic = topic
self.client_name = "{}".format(uuid4())
self.client = paho.Client(self.client_name)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_subscribe = self.on_subscribe
self.client.on_message = self.on_message
self.client.connect(host=self.broker, port=self.port)
self.subscribe_hook = subscribe_hook
self.publish_hook = publish_hook
self.qos = qos
def subscribe(self, hook=None):
if hook is not None:
self.subscribe_hook =hook
self.client.subscribe((self.topic, self.qos))
self.client.loop_forever()
def publish(self, data):
try:
if self.publish_hook is not None:
data = self.publish_hook(data)
self.client.publish(self.topic, data)
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import subprocess as sp
import cv2
class OPENCVOutputStream(DataStreamWrapper):
def __init__(self):
super().__init__()
def publish(self, x):
cv2.imshow('Dev Test', x['frame'])
key = cv2.waitKey(1)
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class VideoOutputStream(DataStreamWrapper):
def __init__(self, basepath, dims, filenameFormat="{deviceId}_{timestamp}", fps=30, publish_hook=None):
super().__init__()
self.basepath = basepath
self.dims = (int(dims[0]),int(dims[1]))
self.fps = float(fps)
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
self.four_cc = cv2.VideoWriter_fourcc(*'mp4v')
self.out = None
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
if len(x["metric"]) > 0:
if self.out is None:
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
self.out = cv2.VideoWriter("{}.mp4".format(os.path.join(path, self.filenameFormat.format(**x))),
self.four_cc, self.fps, self.dims)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
self.out.write(frame)
else:
if self.out is not None:
self.out.release()
self.out = None
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.common.logsetup import logger
class DataStreamProcessor:
def __init__(self, model, subsciber, publishers=list()):
self.model = model
self.subsciber = subsciber
self.publishers = publishers
logger.info("Setting up frame processor !!")
def processstream(self, msg):
print(msg)
def run_model(self):
self.subsciber.subscribe(hook=self.processstream)
from edge_engine.common.logsetup import logger
from edge_engine.common.config import DEVICE_ID
from uuid import uuid4
import traceback
class FrameProcessor:
def __init__(self, stream, model):
self.model = model
self.stream = stream
logger.info("Setting up frame processor !!")
def run_model(self):
while self.stream.stream.isOpened():
try:
logger.debug("Getting frame mask_model")
frame = self.stream.read()
logger.debug("Running mask_model")
data = {"frame": frame[0], "frameId":frame[1] , "deviceId": "{}".format(DEVICE_ID)}
self.model.predict(data)
logger.debug("publishing mask_model output")
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
\ No newline at end of file
from edge_engine.streamio.videostream.fps import FPS
from edge_engine.streamio.videostream.nvgstreamer import NVGstreamer
from edge_engine.streamio.videostream.simplevideostream import SimpleVideoStream
from edge_engine.streamio.videostream.threadedvideostream import ThreadedVideoStream
from edge_engine.streamio.videostream.filevideostream import FileVideoStream
\ No newline at end of file
# import the necessary packages
from threading import Thread
import sys
import cv2
import time
# import the Queue class from Python 3
if sys.version_info >= (3, 0):
from queue import Queue
# otherwise, import the Queue class for Python 2.7
else:
from Queue import Queue
class FileVideoStream:
def __init__(self, stream_config, transform=None):
# initialize the file video stream along with the boolean
# used to indicate if the thread should be stopped or not
self.transform = transform
self.stream_config = stream_config
# initialize the queue used to store frames read from
# the video file
self.build_pipeline()
def start(self):
# start a thread to read frames from the file video stream
self.thread.start()
return self
def build_cv_obj(self):
self.stream = cv2.VideoCapture(self.stream_config["uri"])
self.stopped = False
def build_pipeline(self):
self.build_cv_obj()
if "queueSize" not in self.stream_config:
self.stream_config["queueSize"] = 128
self.Q = Queue(maxsize=int(self.stream_config["queueSize"]))
# intialize thread
self.thread = Thread(target=self.update, args=())
self.thread.daemon = True
def is_opened(self):
return self.stream.isOpened()
def update(self):
# keep looping infinitely
count = 0
while True:
# if the thread indicator variable is set, stop the
# thread
if self.stopped:
break
# otherwise, ensure the queue has room in it
if not self.Q.full():
# read the next frame from the file
(grabbed, frame) = self.stream.read()
# if the `grabbed` boolean is `False`, then we have
# reached the end of the video file
if grabbed is False or frame is None:
# self.stopped = True
count = 0
self.build_cv_obj()
continue
# if there are transforms to be done, might as well
# do them on producer thread before handing back to
# consumer thread. ie. Usually the producer is so far
# ahead of consumer that we have time to spare.
#
# Python is not parallel but the transform operations
# are usually OpenCV native so release the GIL.
#
# Really just trying to avoid spinning up additional
# native threads and overheads of additional
# producer/consumer queues since this one was generally
# idle grabbing frames.
if self.transform:
frame = self.transform(frame)
# add the frame to the queue
self.Q.put((frame, count))
count = count+1
else:
time.sleep(0.1) # Rest for 10ms, we have a full queue
self.stream.release()
def read(self):
# return next frame in the queue
return self.Q.get()
# Insufficient to have consumer use while(more()) which does
# not take into account if the producer has reached end of
# file stream.
def running(self):
return self.more() or not self.stopped
def more(self):
# return True if there are still frames in the queue. If stream is not stopped, try to wait a moment
tries = 0
while self.Q.qsize() == 0 and not self.stopped and tries < 5:
time.sleep(0.1)
tries += 1
return self.Q.qsize() > 0
def stop(self):
# indicate that the thread should be stopped
self.stopped = True
# wait until stream resources are released (producer thread might be still grabbing frame)
self.thread.join()
# import the necessary packages
import datetime
class FPS:
def __init__(self):
# store the start time, end time, and total number of frames
# that were examined between the start and end intervals
self._start = None
self._end = None
self._numFrames = 0
def start(self):
# start the timer
self._start = datetime.datetime.now()
return self
def stop(self):
# stop the timer
self._end = datetime.datetime.now()
def update(self):
# increment the total number of frames examined during the
# start and end intervals
self._numFrames += 1
def elapsed(self):
# return the total number of seconds between the start and
# end interval
return (self._end - self._start).total_seconds()
def fps(self):
# compute the (approximate) frames per second
return self._numFrames / self.elapsed()
import cv2
class NVGstreamer:
def __init__(self, buildconfig):
self.width = 480
self.height = 640
self.latency = 0
self.framerate = "30/1"
self.fformat = "BGRx"
self.BUILD_CONFIG = {
"width": self.width,
"height": self.height,
"latency": self.latency,
"framerate": self.framerate,
"format": self.fformat,
"gstreamer": True
}
self.BUILD_CONFIG.update(buildconfig)
def open_cam_rtsp(self):
gst_str = ('rtspsrc location={uri} latency={latency} ! '
'rtph264depay ! h264parse ! omxh264dec ! '
'nvvidconv ! videorate ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} ! '
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_usb(self):
# We want to set width and height here, otherwise we could just do:
# return cv2.VideoCapture(dev)
gst_str = ('v4l2src device=/dev/video{uri} ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} ! '
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_onboard(self):
# On versions of L4T prior to 28.1, add 'flip-method=2' into gst_str
gst_str = ('nvcamerasrc ! '
'video/x-raw(memory:NVMM), '
'width=(int)2592, height=(int)1458, '
'format=(string)I420 ! '
'nvvidconv ! videorate ! '
'video/x-raw, width=(int){width}, height=(int){height}, '
'format=(string){format}, framerate=(fraction){framerate} !'
'videoconvert ! appsink').format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def custom_pipeline(self):
gst_str = "{customGstPipelineString}".format(**self.BUILD_CONFIG)
print(gst_str)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def build_pipeline(self):
if self.BUILD_CONFIG["gStreamer"]!=True:
if self.BUILD_CONFIG["sourceType"] == "usbcam":
self.cap = cv2.VideoCapture(int(self.BUILD_CONFIG["uri"]))
else:
self.cap = cv2.VideoCapture(self.BUILD_CONFIG["uri"])
elif self.BUILD_CONFIG["sourceType"] == "rtsp":
self.cap = self.open_cam_rtsp()
elif self.BUILD_CONFIG["sourceType"] == "usbcam":
self.cap = self.open_cam_usb()
elif self.BUILD_CONFIG["sourceType"] == "onboard":
self.cap = self.open_cam_onboard()
elif self.BUILD_CONFIG["sourceType"] == "customPipeline":
self.cap = self.custom_pipeline()
else:
raise ValueError("unimplemented source {}".format(self.BUILD_CONFIG["sourceType"]))
def get_stream(self):
return self.cap
from edge_engine.common.logsetup import logger
from edge_engine.streamio.videostream import NVGstreamer
class SimpleVideoStream:
def __init__(self, stream_config, name="SimpleVideoStream"):
self.stream_config = stream_config
self.build_pipeline()
(self.grabbed, self.frame) = self.stream.read()
self.name = name
def build_pipeline(self):
self.gstreamer = NVGstreamer(self.stream_config)
self.gstreamer.build_pipeline()
self.stream = self.gstreamer.get_stream()
def start(self):
logger.info("Starting video stream ")
if self.stream.isOpened():
self.grabbed, self.frame = self.stream.read()
if self.grabbed is False:
logger.error("Empty Frame !!!! ")
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
return self
else:
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
def is_opened(self):
return self.stream.isOpened()
def read(self):
# return the frame most recently read
if self.stream.isOpened():
self.grabbed, self.frame = self.stream.read()
if self.grabbed is False:
logger.error("Empty Frame !!!! ")
raise ValueError("Empty Frame !!!! ")
return self.frame
else:
logger.error("Error opening Capture !!!! ")
raise ValueError("Error opening Capture !!!! ")
def stop(self):
if self.stream.isOpened():
self.stream.release()
# import the necessary packages
from threading import Thread
import time
from edge_engine.streamio.videostream import NVGstreamer
from edge_engine.common.logsetup import logger
class ThreadedVideoStream:
def __init__(self, stream_config, name="ThreadedVideoStream"):
# initialize the video camera stream and read the first frame
# from the stream
self.stream_config = stream_config
self.build_pipeline()
# self.stream = stream
(self.grabbed, self.frame) = self.stream.read()
# initialize the thread name
self.name = name
# initialize the variable used to indicate if the thread should
# be stopped
self.stopped = False
def build_pipeline(self):
self.gstreamer = NVGstreamer(self.stream_config)
self.gstreamer.build_pipeline()
self.stream = self.gstreamer.get_stream()
def start(self):
# start the thread to read frames from the video stream
t = Thread(target=self.update, name=self.name, args=())
t.daemon = True
t.start()
return self
def update(self):
# keep looping infinitely until the thread is stopped
while True:
# if the thread indicator variable is set, stop the thread
if self.stopped:
return
# otherwise, read the next frame from the stream
(self.grabbed, self.frame) = self.stream.read()
if self.grabbed is False or self.frame is None:
logger.error("Empty Frame !!!! ")
logger.error("Error opening Capture !!!! ")
self.build_pipeline()
def read(self):
# return the frame most recently read
return self.frame
def stop(self):
# indicate that the thread should be stopped
self.stopped = True
time.sleep(0.2)
self.stream.release()
# Copyright 2019 KnowledgeLens pvt Ltd.
VERSION = '1.0.0.alpha'
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
pymongo~=3.10.1
# opencv-python~=4.2.0.34
# requests~=2.23.0
cachetools~=4.1.0
wincertstore==0.2
opencv-contrib-python==4.2.0.34
matplotlib
imutils==0.5.3
certifi==2020.6.20
\ No newline at end of file
import os
import sys
import json
from pymongo import MongoClient
MAIN_OS_VARIABLE = json.loads(os.environ.get('config'))
if MAIN_OS_VARIABLE is None:
sys.stderr.write("Configuration not found...")
sys.stderr.write("Exiting....")
sys.exit(1)
MONGO_URI = MAIN_OS_VARIABLE.get('MONGO_URI')
MONGO_SERVICE_DB = MAIN_OS_VARIABLE.get('MONGO_DB')
MONGO_SERVICE_COLL = MAIN_OS_VARIABLE.get('MONGO_COLL')
MONGO_DB_OBJ = MongoClient(MONGO_URI)[MONGO_SERVICE_DB]
HOST_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'hostConfig'}).get('config')
APP_MONGO_COLLECTION = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'appMongoConfig'}).get('config')
JANUS_CONFIG = MONGO_DB_OBJ[MONGO_SERVICE_COLL].find_one({'configId': 'janusConfig'}).get('config')
class ModelConstants:
sink_layer = {'cls': 'cls_branch_concat_1/concat', 'bbox': 'loc_branch_concat_1/concat'}
model_detector_pth = "./data/model/face_mask_detection.xml"
id2class = {0: 'Mask', 1: 'No Mask'}
id2bool = {0: True, 1: False}
exec_net_device_name = 'CPU'
fallback_initial_temperature = 98.3
net_precision = 'FP32'
class EventConstants:
yellow_cylinder = 'yellowCylinder'
green_cylinder = 'greenCylinder'
import cv2
import base64
import traceback
from yolov5processor.infer import ExecuteInference
import time
import cv2
from edge_engine.common.logsetup import logger
from edge_engine.ai.model.modelwraper import ModelWrapper
import json
import os
from scripts.utils.infocenter import InfoCenter
from scripts.utils.cancounter import CanCounter
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
class SRF_Cans(ModelWrapper):
def __init__(self, config, pubs, device_id):
super().__init__()
"""
init function
"""
self.config = config["config"]
self.type = config['inputConf']['sourceType']
with open('assests/cans.json', 'r') as f:
self.dets = json.loads(f.read())
self.rtp = pubs.rtp_write
self.ic = InfoCenter(device_id=device_id)
logger.info("Loading model to the device")
self.can_counter_obj = CanCounter()
self.base_model_path = 'assests/'
print("[INFO] loading yolov5-Cans Detection Model")
self.model_detector_pth = os.path.join(self.base_model_path, "cans_yolov5s.pt")
self.yp = ExecuteInference(weight=self.model_detector_pth)
self.pre_time = time.time()
def _pre_process(self, x):
"""
Do preprocessing here, if any
:param x: payload
:return: payload
"""
return x
def _post_process(self, x):
"""
Apply post processing here, if any
:param x: payload
:return: payload
"""
self.rtp.publish(x) # video stream
return x
def _predict(self, x):
try:
time.sleep(0.02)
frame = x['frame']
frame_id = x['frameId']
frame = cv2.resize(frame, (640, 400))
lis = self._type(self.type, frame, frame_id)
frame = self.can_counter_obj.process_frame(frame, lis=lis, info_center=self.ic)
curr_time = time.time()
logger.info("Time for 1 frame: {}".format(curr_time - self.pre_time))
self.pre_time = curr_time
x['frame'] = cv2.resize(frame, (self.config.get('FRAME_WIDTH'), self.config.get('FRAME_HEIGHT')))
return x
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
x['frame'] = cv2.resize(x['frame'], (self.config.get('FRAME_WIDTH'), self.config.get('FRAME_HEIGHT')))
return x
def _type(self, video_type, frame, frame_id):
if video_type not in ['videofile']:
return self.yp.predict(frame)
print("Running without")
return self.dets[int(frame_id)][str(frame_id)]['detections']
# from.facenet_lite import FaceNetLite
import cv2
class CanCounter:
def __init__(self):
self.all_pred = []
self.count = 0
self.max_val = 0
self.length_list = []
self.frame_count = 0
self.send_frame = None
def process_frame(self, frame, lis, info_center):
pred = []
for i in lis:
conf = "{:.2f}".format(int(i['conf'] * 100))
if int(i['conf'] * 100) > 60:
label = 0
try:
label = int(i['class'].item())
except:
label = int(i['class'])
det = i['points']
if int(det[1]) > 208 and label == 0:
# print(i)
pred.append(i)
if not pred and len(self.all_pred) > 0:
self.all_pred.append(pred)
self.length_list.append(0)
for det in pred:
conf = "{:.2f}".format(int(det['conf'] * 100))
if int(det['conf'] * 100) > 60:
label = 0
try:
label = int(det['class'].item())
except:
label = int(det['class'])
det = det['points']
if pred:
self.all_pred.append(pred)
self.length_list.append(int(len(pred)))
if label == 0:
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (0, 0, 255), 2)
print("done")
elif label == 1:
cv2.rectangle(frame, (int(det[0]), int(det[1])), (int(det[2]), int(det[3])), (255, 255, 0), 2)
length = len(self.all_pred)
for i in range(len(self.all_pred)):
if len(self.all_pred[i]) > self.max_val:
self.send_frame = cv2.resize(frame, (64, 64))
self.max_val = max(self.max_val, len(self.all_pred[i]))
if length > 51:
res = sum(self.length_list[-20:])
if res == 0:
self.count = self.count + self.max_val
self.all_pred = []
if self.max_val in [9, 12, 15]:
box_count = info_center.get_box_count() + 1
info_center.send_payload(frame=self.send_frame,
message="Box: " + "BOX_" + str(box_count) + " | Cans: " + str(
self.max_val),
can_count=self.max_val, box_id="BOX_" + str(box_count),
eventtype="Completely Filled",
bg_color="#044b04", font_color="#24dc24")
else:
box_count = info_center.get_box_count() + 1
info_center.send_payload(frame=self.send_frame,
message="Box: " + "BOX_" + str(box_count) + " | Cans: " + str(
self.max_val),
can_count=self.max_val, box_id="BOX_" + str(box_count),
eventtype="Partially Filled",
bg_color="#472020", font_color="#ed2020", alert_sound="sound_1")
self.max_val = 0
break
return frame
import cv2
import time
import base64
import os
import json
from uuid import uuid1
from datetime import datetime
from cachetools import cached, TTLCache
from edge_engine.common.logsetup import logger
from scripts.common.config import MONGO_DB_OBJ, APP_MONGO_COLLECTION
TTL = 5
class InfoCenter:
def __init__(self, device_id):
self.camera_key = os.environ.get('app', None)
self.device_id = device_id
self.attendance_event_collection = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('eventLogCollection')]
self.camera_configuration = MONGO_DB_OBJ[APP_MONGO_COLLECTION.get('cameraConfigurationCollection')]
self.camera_mapping_json = self.get_all_cameras()
self.ttl_check = NotificationFilter()
def get_all_cameras(self):
camera_mapping_json = self.camera_configuration.find({'decommissioned': False}, {"_id": 0})
return {each['cameraId']: each['cameraName'] for each in camera_mapping_json}
def send_payload(self,
frame, can_count, eventtype, box_id,
label='Can_Counting_Solution',
bg_color="#474520",
font_color="#FFFF00",
alert_sound=None,
message=""):
frame = cv2.resize(frame, (64, 64))
payload = {"deviceId": self.device_id,
"message": message,
"frame": 'data:image/jpeg;base64,' + base64.b64encode(
cv2.imencode('.jpg', frame)[1].tostring()).decode("utf-8"),
"activity": label,
"bg_color": bg_color,
"font_color": font_color,
"alert_sound": alert_sound,
"can_count": can_count,
"eventtype": eventtype, "box_id": box_id}
self.insert_attendance_event_to_mongo(payload)
def get_box_count(self):
try:
return self.attendance_event_collection.find(
{
'app': 'can'
}
).count()
except Exception as e:
logger.info(e)
logger.error(traceback.format_exc())
def insert_attendance_event_to_mongo(self, data):
try:
input_data = {
"eventId": str(uuid1()).split('-')[0],
"cameraId": data['deviceId'],
"cameraName": self.camera_mapping_json.get(data['deviceId'], "iLens Camera"),
"timestamp": datetime.now(),
"frame": data['frame'],
"eventtype": data['eventtype'],
"bg_color": data["bg_color"],
"font_color": data["font_color"],
"intrusion_message": data["message"],
"alert_sound": data["alert_sound"],
"can_count": data["can_count"],
"box_id": data["box_id"],
"app": 'can'
}
logger.debug("Pushing to Mongo..")
self.attendance_event_collection.insert(input_data)
except Exception as e:
logger.error(f"Exception occurred: {e}", exc_info=True)
class NotificationFilter(object):
def __init__(self, ttl_value=3):
"""init function
"""
global TTL
TTL = ttl_value
self.TTL = ttl_value
self.face_id_cache = {}
def _update_cache(self, name):
"""updates the cache with a name
Args:
name (str): Name of the person
"""
self.face_id_cache[name] = time.time()
def check_rpt(self, name):
"""Returns a boolean if the notification center has to be notified
or not
Args:
name (str): name of person identified
Returns:
notify (bool): True if notification to be sent False if the
notification is not to be sent
"""
self._clean_up_cache()
if name is None:
notify = False
else:
is_present = self.face_id_cache.get(name, -1)
if is_present == -1:
notify = True
self._update_cache(name)
else:
notify = False
return notify
@cached(cache=TTLCache(maxsize=1024, ttl=TTL))
def _clean_up_cache(self):
"""Cleans up the cached name at regular interval
"""
key_to_delete = [
key
for key, value in self.face_id_cache.items()
if time.time() - value >= self.TTL
]
for key in key_to_delete:
del self.face_id_cache[key]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment