Commit 6f6d3f08 authored by Jithu Tagore's avatar Jithu Tagore

changed the cv2 text

parent 3d6b0537
File added
FROM python:3.7
RUN apt-get update
RUN apt-get install tzdata vim -y
RUN apt-get update && apt-get install tzdata ffmpeg libsm6 libxext6 -y
RUN pip3 install --upgrade pip
# RUN pip3 install matplotlib>=3.2.2 tensorboard>=2.4.1 numpy>=1.18.5 opencv-python>=4.1.2 Pillow>=7.1.2 PyYAML>=5.3.1 requests>=2.23.0 scipy>=1.4.1 torch>=1.7.0 torchvision>=0.8.1 tqdm>=4.41.0 pandas seaborn expiringdict minio cachetools
# RUN pip3 install pymongo Cython paho-mqtt==1.5.0 scikit-learn==0.22.2
RUN pip3 install absl-py==1.3.0 asttokens==2.2.1 backcall==0.2.0 cachetools==5.2.0 certifi==2022.12.7 charset-normalizer==2.1.1 colorama==0.4.6 contourpy==1.0.6
RUN pip3 install cycler==0.11.0 Cython==0.29.32 decorator==5.1.1 dnspython==2.2.1 executing==1.2.0 expiringdict==1.2.2 fonttools==4.38.0
RUN pip3 install google-auth==2.15.0 google-auth-oauthlib==0.4.6
RUN pip3 install grpcio==1.51.1 idna==3.4 importlib-metadata==5.1.0 ipython jedi==0.18.2 kiwisolver==1.4.4 Markdown==3.4.1 MarkupSafe==2.1.1 matplotlib matplotlib-inline==0.1.6 minio==7.1.12 numpy>=1.18.5
RUN pip3 install oauthlib==3.2.2 opencv-python==4.6.0.66 packaging==22.0 paho-mqtt==1.6.1 pandas parso==0.8.3 pickleshare==0.7.5 Pillow==9.3.0 prompt-toolkit==3.0.36 protobuf==3.20.3 pure-eval==0.2.2 pyasn1==0.4.8 pyasn1-modules==0.2.8 Pygments==2.13.0 pymongo==4.3.3 pyparsing==3.0.9 python-dateutil==2.8.2 pytz==2022.6 PyYAML==6.0 requests==2.28.1 requests-oauthlib==1.3.1 rsa==4.9 scipy torch==1.9.0 torchvision==0.10.0 tqdm==4.64.1 traitlets==5.7.0 typing_extensions==4.4.0 urllib3==1.26.13 wcwidth==0.2.5 Werkzeug==2.2.2 zipp==3.11.0
RUN pip3 install seaborn
ADD . /app
ADD Arial.ttf /root/.config/Ultralytics/
WORKDIR /app
CMD ["python3","app.py"]
import os
os.environ["config"]="{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://admin:iLens$1234@192.168.3.181:2717/admin\", \"MONGO_DATABASE\": \"ilens_ai\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": \"deploymentId\", \"MONGO_VALUE\": \"aarti_ppe\", \"MONGO_COLL\": \"serviceConfiguration\", \"MONGO_DB\": \"ilens_ai\"}"
# os.environ["config"]="{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://svc-ilens:svc2345@192.168.3.220:21017\", \"MONGO_DATABASE\": \"ilens_wps\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": \"deploymentId\", \"MONGO_VALUE\": \"rahul_12345\", \"MONGO_COLL\": \"serviceConfiguration\", \"MONGO_DB\": \"ilens_wps\"}"
# os.environ["config"]="{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://admin:iLens$HPCLv605@10.5.2.91:2717\", \"MONGO_DATABASE\": \"ilens_ai\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": \"deploymentId\", \"MONGO_VALUE\": \"hpcl_cctv_ppe\", \"MONGO_COLL\": \"serviceConfiguration\", \"MONGO_DB\": \"ilens_ai\"}"
# os.environ["config"]="{\"TZ\": \"Asia/Kolkata\", \"MONGO_URI\": \"mongodb://admin:iLens$1234@192.168.3.181:2717/admin\", \"MONGO_DATABASE\": \"ilens_ai\", \"MONGO_COLLECTION\": \"janusDeployment\", \"MONGO_KEY\": \"deploymentId\", \"MONGO_VALUE\": \"aarti_ppe\", \"MONGO_COLL\": \"serviceConfiguration\", \"MONGO_DB\": \"ilens_ai\"}"
from edge_engine.edge_processor import ExecutePipeline
from edge_engine.edge_processor import Pubs
from scripts import Ppe
from edge_engine.common.config import EDGE_CONFIG
if __name__ == '__main__':
pubs = Pubs()
mod = Ppe(config=EDGE_CONFIG,
model_config=EDGE_CONFIG["modelConfig"],
pubs=pubs,
device_id=EDGE_CONFIG['deviceId'])
ex = ExecutePipeline(mod)
ex.run_model()
source /opt/intel/openvino/bin/setupvars.sh
python3 app.py
\ No newline at end of file
File added
File added
from edge_engine.ai.model.modelwraper import ModelWrapper
from abc import ABC, abstractmethod
class ModelWrapper(ABC):
def __init__(self, path=None):
"""Implement code to load mask_model here"""
pass
def _pre_process(self, x):
"""Implement code to process raw input into format required for mask_model inference here"""
return x
def _post_process(self, x):
"""Implement any code to post-process mask_model inference response here"""
return x
@abstractmethod
def _predict(self, x):
"""Implement core mask_model inference code here"""
pass
def predict(self, x):
pre_x = self._pre_process(x)
prediction = self._predict(pre_x)
result = self._post_process(prediction)
return result
# import the necessary packages
import cv2
import numpy as np
class GammaPreprocessor:
def __init__(self, gamma=1.0):
# creating Gamma table
self.invGamma = 1.0 / gamma
self.table = np.array([((i / 255.0) ** self.invGamma) * 255
for i in np.arange(0, 256)]).astype("uint8")
def preprocess(self, image):
return cv2.LUT(image, self.table)
# import the necessary packages
from keras.preprocessing.image import img_to_array
class ImageToArrayPreprocessor:
def __init__(self, dataFormat=None):
# store the image data format
self.dataFormat = dataFormat
def preprocess(self, image):
# apply the Keras utility function that correctly rearranges
# the dimensions of the image
return img_to_array(image, data_format=self.dataFormat)
# import the necessary packages
import cv2
class SimpleHistogramPreprocessor:
def __init__(self):
pass
def preprocess(self, image):
# Run Histogram simple Equalization
return cv2.equalizeHist(image)
# import the necessary packages
import cv2
class SimplePreprocessor:
def __init__(self, width, height, inter=cv2.INTER_AREA):
# store the target image width, height, and interpolation
# method used when resizing
self.width = width
self.height = height
self.inter = inter
def preprocess(self, image):
# resize the image to a fixed size, ignoring the aspect
# ratio
return cv2.resize(image, (self.width, self.height),
interpolation=self.inter)
import os
import sys
from edge_engine.common.constants import LicenseModule
from dateutil import parser
from datetime import datetime
from pymongo import MongoClient
from copy import deepcopy
import json
def licence_validator(payload):
try:
dt = parser.parse(payload['valid_till'])
now = datetime.now()
if (now > dt):
sys.stdout.write("Licence Expired \n".format())
sys.stdout.flush()
return False
return True
except KeyError as e:
sys.stderr.write("Error loading licence")
return False
def get_config_from_mongo(mongo_uri, dbname, basecollection,
key, value):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
config = db[basecollection].find_one({key: value}, {"_id": False})
return config
def load_conf(config,mongo_uri, dbname):
mongo = MongoClient(mongo_uri)
db = mongo[dbname]
pub_configs = []
for conf in config['pubConfigs']:
if conf["type"].lower() in ["mqtt","mongo",]:
key= conf["key"]
value=conf["value"]
collection = conf["conectionCollection"]
pub_conf = db[collection].find_one({key: value}, {"_id": False})
pub_conf.update(conf)
pub_configs.append(pub_conf)
else :
pub_configs.append(conf)
config['pubConfigs'] = pub_configs
return config
# """
# {
# "MONGO_URI": "mongodb://192.168.3.220:21017",
# "MONGO_DATABASE": "ilens_thermal_app",
# "MONGO_COLLECTION": "janus_deployment_details",
# "MONGO_KEY": "deploymentId",
# "MONGO_VALUE": "ddd"
# }
# """
LOG_LEVEL = os.environ.get("LOG_LEVEL", default="INFO").upper()
LOG_HANDLER_NAME = os.environ.get("LOG_HANDLER_NAME", default="ilens-edge_engine")
BASE_LOG_PATH = os.environ.get('BASE_LOG_PATH',
default=os.path.join(os.getcwd(), "logs".format()))
if not os.path.isdir(BASE_LOG_PATH):
os.mkdir(BASE_LOG_PATH)
CONFIG_ENV = json.loads(os.environ.get('config', default=None))
sys.stdout.write("config->{} \n".format(json.dumps(CONFIG_ENV)))
MONGO_URI = CONFIG_ENV.get('MONGO_URI', None)
MONGO_DATABASE = CONFIG_ENV.get('MONGO_DATABASE', None)
MONGO_COLLECTION = CONFIG_ENV.get('MONGO_COLLECTION', None)
MONGO_KEY = CONFIG_ENV.get('MONGO_KEY', None)
MONGO_VALUE = CONFIG_ENV.get('MONGO_VALUE',None)
if MONGO_URI == None \
or MONGO_DATABASE is None \
or MONGO_COLLECTION is None \
or MONGO_KEY is None \
or MONGO_VALUE is None:
sys.stderr.write("invalid mongo config \n")
sys.exit(1)
EDGE_CONFIG = get_config_from_mongo(
mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE, basecollection=MONGO_COLLECTION,
key=MONGO_KEY, value=MONGO_VALUE
)
DEVICE_ID = EDGE_CONFIG["deviceId"]
if EDGE_CONFIG is None:
sys.stderr.write("invalid EDGE_CONFIG config \n")
sys.exit(1)
EDGE_CONFIG=load_conf(EDGE_CONFIG, mongo_uri=MONGO_URI,
dbname=MONGO_DATABASE)
DATA_PATH = EDGE_CONFIG["inputConf"].get('dataPath',os.path.join(os.getcwd(), "data".format()))
sys.stderr.write("Loading data from {} \n".format(DATA_PATH))
\ No newline at end of file
This diff is collapsed.
class LicenseModule:
private_key = "3139343831323738414d47454e3936363538373136"
encoding_algorithm = "HS256"
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from edge_engine.common.config import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH
import logging
from logging.handlers import RotatingFileHandler
from logging import WARNING,INFO,DEBUG,ERROR
import os
DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s:%(filename)5s:%(funcName)5s():%(lineno)s] %(message)s'
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
FORMATTER = DEBUG_FORMAT
def get_logger(log_handler_name, extra=EXTRA):
"""
Purpose : To create logger .
:param log_handler_name: Name of the log handler.
:param extra: extra args for the logger
:return: logger object.
"""
log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
logstash_temp = os.path.join(BASE_LOG_PATH, log_handler_name + ".db")
logger = logging.getLogger(log_handler_name)
logger.setLevel(LOG_LEVEL.strip().upper())
log_handler = logging.StreamHandler()
log_handler.setLevel(LOG_LEVEL)
formatter = logging.Formatter(FORMATTER)
log_handler.setFormatter(formatter)
handler = RotatingFileHandler(log_path, maxBytes=10485760,
backupCount=5)
handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.addHandler(handler)
logger = logging.LoggerAdapter(logger, extra)
return logger
logger = get_logger(LOG_HANDLER_NAME)
import os, time
from minio import Minio
from edge_engine.common.logsetup import logger
class MinioClient:
def __init__(self ,SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP):
logger.info("Initalizing minioclient !!")
self.SECRET_KEY = SECRET_KEY
self.ACCESS_KEY = ACCESS_KEY
self.BUCKET_NAME = BUCKET_NAME
self.LOCAL_DATA_PATH = LOCAL_DATA_PATH
self.MINIO_IP = MINIO_IP
self.logfile = "./logs/videowrite.log"
self.minioClient = self.connect_to_minio()
self.create_bucket(self.BUCKET_NAME)
def connect_to_minio(self):
if self.SECRET_KEY is not None and self.ACCESS_KEY is not None:
logger.info("Connecting to Minio Service... !!! ")
minio_client = Minio(self.MINIO_IP, access_key = self.ACCESS_KEY, secret_key = self.SECRET_KEY,
region='us-east-1', secure=False)
return minio_client
else:
logger.info('Access Key and Secret Key String cannot be null')
raise Exception('Access Key and Secret Key String cannot be null')
def create_bucket(self, bucket_name):
try:
if bucket_name not in self.list_buckets():
logger.info("Creating bucket {}...".format(bucket_name))
self.minioClient.make_bucket(bucket_name, location="us-east-1")
else:
logger.info("Bucket already exists....")
except Exception as err:
logger.error(err)
def save_to_bucket(self, bucket_name, data_obj):
try:
with open(data_obj, 'rb') as file:
file_stat = os.stat(data_obj)
self.minioClient.put_object(bucket_name, data_obj.split(self.LOCAL_DATA_PATH)[1],
file, file_stat.st_size)
except Exception as err:
logger.error(err)
def list_buckets(self):
bucketobjects = self.minioClient.list_buckets()
bucketlist = []
for eachbucket in bucketobjects:
bucketlist.append(eachbucket.name)
return bucketlist
def read_write_logs(self):
try:
f = open(self.logfile)
except Exception as err:
print(err)
with open(self.logfile, "a") as startfile:
startfile.write("")
f = open(self.logfile)
return [line.split('\n')[0] for line in f]
def write_write_logs(self, log_str):
with open(self.logfile, "a") as my_file:
my_file.write(log_str + "\n")
def upload(self):
if self.LOCAL_DATA_PATH[-1]!='/':
self.LOCAL_DATA_PATH = self.LOCAL_DATA_PATH+"/"
while True:
listoffiles = [os.path.join(path, name) for path, subdirs, files in os.walk(self.LOCAL_DATA_PATH) for name in files]
listofwrittenfiles = self.read_write_logs()
listofnewfiles = list(set(listoffiles) - set(listofwrittenfiles))
for fileName in listofnewfiles:
try:
logger.info("Uploading {}..".format(fileName.split(self.LOCAL_DATA_PATH)[1]))
self.save_to_bucket(self.BUCKET_NAME, fileName)
self.write_write_logs(fileName)
except Exception as e:
logger.error(e)
time.sleep(5)
# if __name__=='__main__':
# SECRET_KEY = 'minioadmin'
# ACCESS_KEY = 'minioadmin'
# BUCKET_NAME = 'videobucket'
# MINIO_IP = '192.168.3.220:29000'
# LOCAL_DATA_PATH = "F:/GDrive Data/Downloads"
# obj = MinioClient(SECRET_KEY, ACCESS_KEY, BUCKET_NAME, LOCAL_DATA_PATH, MINIO_IP)
# obj.upload()
from edge_engine.common.logsetup import logger
from edge_engine.common.config import EDGE_CONFIG
from edge_engine.streamio.datastream import MQTT
from edge_engine.streamio.datastream import VideoOutputStream
from edge_engine.streamio.datastream import FrameOutputStream
from edge_engine.streamio.datastream import FFMPEGOutputStream
from edge_engine.streamio.datastream import MongoDataStreamOut
from edge_engine.streamio.videostream import ThreadedVideoStream
from edge_engine.streamio.videostream import FileVideoStream
from edge_engine.streamio.frameProcessor import FrameProcessor
from edge_engine.common.minio_server import MinioClient
import json
from threading import Thread
class Pubs():
def __init__(self):
self.mqtt_pub = None
self.frame_write = None
self.video_write = None
self.mongo_write = None
self.rtp_write = None
self.build_pubs()
if 'minioConfig' in EDGE_CONFIG.keys() and \
isinstance(EDGE_CONFIG["minioConfig"],dict):
self.minio_thread = self.start_minio(EDGE_CONFIG["minioConfig"])
@staticmethod
def start_minio(minio_conf):
obj = MinioClient(minio_conf['secretKey'], minio_conf['accessKey'],
minio_conf['bucketName'], minio_conf['localDataPath'],
minio_conf['ip'])
t = Thread(target=obj.upload)
t.start()
return t
def build_pubs(self):
logger.info("building publishers ")
for conf in EDGE_CONFIG["pubConfigs"]:
if conf["type"].upper() == "MQTT":
self.mqtt_pub = MQTT(broker=conf["broker"], topic=conf["topic"], port=conf["port"]
, publish_hook=json.dumps)
elif conf["type"].upper() == "FRAMEWRITE":
self.frame_write = FrameOutputStream(
basepath=conf["basepath"],
iformat=conf["iformat"],
filenameFormat=conf["filenameFormat"],
publish_hook=None)
elif conf["type"].upper() == "VIDEOWRITE":
self.video_write = VideoOutputStream(basepath=conf["basepath"],
dims=conf["dims"],
filenameFormat=conf["filenameFormat"],
fps=conf["fps"], publish_hook=None)
elif conf["type"].upper() == "MONGO":
self.mongo_write = MongoDataStreamOut(host=conf["host"],
port=conf["port"],
dbname=conf["dbname"],
collection=conf["collection"],
keys=conf["keys"],
authsource=conf["authSource"],
username=conf["username"],
password=conf["password"],
publish_hook=None)
elif conf["type"].upper() == "RTP":
self.rtp_write = FFMPEGOutputStream(conf["ffmpegCmd"], conf["RTPEndpoint"]
, publish_hook=None)
else:
logger.error("Unsupported publisher {}".format(conf["type"]))
class ExecutePipeline:
def __init__(self, model):
self.model = model
def run_model(self):
if EDGE_CONFIG["inputConf"]["sourceType"].lower() in ["rtsp", "usbcam"]:
logger.info("Selected input stream as Direct cv input")
self.threadedVideoStream = ThreadedVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.threadedVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.threadedVideoStream,
model=self.model)
elif EDGE_CONFIG["inputConf"]["sourceType"].lower() == "videofile":
self.fileVideoStream = FileVideoStream(stream_config=EDGE_CONFIG["inputConf"])
self.fileVideoStream.start()
self.frameProcessor = FrameProcessor(stream=self.fileVideoStream, model=self.model)
else:
raise ValueError("unsupported source {}".format(EDGE_CONFIG["inputConf"]["sourceType"]))
self.start_model()
def start_model(self):
self.frameProcessor.run_model()
from edge_engine.streamio.frameProcessor import FrameProcessor
from .datastreamprocessor import DataStreamProcessor
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.streamio.datastream.mongodatastreamout import MongoDataStreamOut
from edge_engine.streamio.datastream.frameoutputstream import FrameOutputStream
from edge_engine.streamio.datastream.videooutputstream import VideoOutputStream
from edge_engine.streamio.datastream.mqttstream import MQTT
from edge_engine.streamio.datastream.ffmpegdata_streamout import FFMPEGOutputStream
\ No newline at end of file
from abc import ABC, abstractmethod
class DataStreamWrapper(ABC):
def __init__(self):
"""Implement code to load mask_model here"""
pass
def publish(self, x):
"""Implement code to publish"""
return x
def subscribe(self,hook):
"""Implement code to subscribe"""
return None
\ No newline at end of file
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import subprocess as sp
class FFMPEGOutputStream(DataStreamWrapper):
def __init__(self, ffmpeg_cmd, rtp_endpoint, publish_hook=None):
super().__init__()
self.ffmpeg_cmd = ffmpeg_cmd
self.rtp_endpoint = rtp_endpoint
self.ffmpeg_cmd.append(self.rtp_endpoint[0])
self.proc = sp.Popen(self.ffmpeg_cmd, stdin=sp.PIPE, shell=False)
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
frame = x["frame"]
self.proc.stdin.write(frame.tostring())
self.proc.stdin.flush()
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class FrameOutputStream(DataStreamWrapper):
def __init__(self, basepath, iformat="jpg", filenameFormat="{deviceId}_{frameId}_{timestamp}", publish_hook=None):
super().__init__()
self.basepath = basepath
self.iformat = iformat
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
def publish(self, x):
if self.publish_hook is not None:
x= self.publish_hook(x)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
cv2.imwrite("{path}.{iformat}".format(path=os.path.join(path, self.filenameFormat.format(**x)),
iformat=self.iformat), frame)
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from pymongo import MongoClient
class MongoDataStreamOut(DataStreamWrapper):
def __init__(self, host, port, dbname, collection, keys, authsource,username=None,password=None, publish_hook=None):
super().__init__()
self.host = host
self.port = port
self.dbname = dbname
self.username = username
self.password = password
self.collection = collection
self.publish_hook = publish_hook
self.mongo = MongoClient(host=host,
port=int(port),username=self.username,password=self.password)
self.db = self.mongo[dbname]
self.keys = keys
self.authsource = authsource
def subscribe(self, hook=None):
pass
def publish(self, data):
if self.publish_hook is not None:
data = self.publish_hook(data)
fin_dat = {}
for k, v in data.items():
if k in self.keys:
fin_dat[k] = v
self.db[self.collection].insert(fin_dat)
import paho.mqtt.client as paho
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
from edge_engine.common.logsetup import logger
from uuid import uuid4
import traceback
class MQTT(DataStreamWrapper):
@staticmethod
def on_connect(client, userdata, flags, rc):
logger.info("Connection returned with result code:" + str(rc))
@staticmethod
def on_disconnect(client, userdata, rc):
logger.info("Disconnection returned result:" + str(rc))
@staticmethod
def on_subscribe(client, userdata, mid, granted_qos):
logger.debug("Subscribing MQTT {} {} {} {}".format(client, userdata, mid, granted_qos))
def on_message(self, client, userdata, msg):
logger.debug("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
if self.subscribe_hook is not None:
self.subscribe_hook(msg.payload.decode())
def __init__(self, broker, port, topic, qos=2, subscribe_hook=None, publish_hook=None):
super().__init__()
self.broker = broker
self.port = int(port)
self.topic = topic
self.client_name = "{}".format(uuid4())
self.client = paho.Client(self.client_name)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_subscribe = self.on_subscribe
self.client.on_message = self.on_message
self.client.connect(host=self.broker, port=self.port)
self.subscribe_hook = subscribe_hook
self.publish_hook = publish_hook
self.qos = qos
def subscribe(self, hook=None):
if hook is not None:
self.subscribe_hook =hook
self.client.subscribe((self.topic, self.qos))
self.client.loop_forever()
def publish(self, data):
try:
if self.publish_hook is not None:
data = self.publish_hook(data)
self.client.publish(self.topic, data)
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
from edge_engine.streamio.datastream.datastreamwrapper import DataStreamWrapper
import cv2
import base64
import numpy as np
import os
from edge_engine.common.logsetup import logger
from datetime import datetime
class VideoOutputStream(DataStreamWrapper):
def __init__(self, basepath, dims, filenameFormat="{deviceId}_{timestamp}", fps=30, publish_hook=None):
super().__init__()
self.basepath = basepath
self.dims = (int(dims[0]),int(dims[1]))
self.fps = float(fps)
self.filenameFormat = filenameFormat
self.publish_hook = publish_hook
self.four_cc = cv2.VideoWriter_fourcc(*'mp4v')
self.out = None
def publish(self, x):
if self.publish_hook is not None:
x = self.publish_hook(x)
if len(x["metric"]) > 0:
if self.out is None:
path = os.path.join(self.basepath, datetime.now().date().isoformat())
if not os.path.isdir(path):
logger.info("Creating {} \n".format(path))
os.mkdir(path)
self.out = cv2.VideoWriter("{}.mp4".format(os.path.join(path, self.filenameFormat.format(**x))),
self.four_cc, self.fps, self.dims)
frame = x["frame"]
frame = base64.b64decode(frame.split("data:image/jpeg;base64,")[1])
frame = np.fromstring(frame, np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
self.out.write(frame)
else:
if self.out is not None:
self.out.release()
self.out = None
return True
def subscribe(self, hook):
super().subscribe(hook)
from edge_engine.common.logsetup import logger
class DataStreamProcessor:
def __init__(self, model, subsciber, publishers=list()):
self.model = model
self.subsciber = subsciber
self.publishers = publishers
logger.info("Setting up frame processor !!")
def processstream(self, msg):
print(msg)
def run_model(self):
self.subsciber.subscribe(hook=self.processstream)
from edge_engine.common.logsetup import logger
from edge_engine.common.config import DEVICE_ID
from uuid import uuid4
import traceback
class FrameProcessor:
def __init__(self, stream, model):
self.model = model
self.stream = stream
logger.info("Setting up frame processor !!")
def run_model(self):
while self.stream.stream.isOpened():
try:
logger.debug("Getting frame mask_model")
frame = self.stream.read()
logger.debug("Running mask_model")
data = {"frame": frame, "frameId": "{}".format(uuid4()), "deviceId": "{}".format(DEVICE_ID)}
self.model.predict(data)
logger.debug("publishing mask_model output")
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
\ No newline at end of file
from edge_engine.streamio.videostream.fps import FPS
from edge_engine.streamio.videostream.nvgstreamer import NVGstreamer
from edge_engine.streamio.videostream.simplevideostream import SimpleVideoStream
from edge_engine.streamio.videostream.threadedvideostream import ThreadedVideoStream
from edge_engine.streamio.videostream.filevideostream import FileVideoStream
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment