Commit fa24dbcd authored by Ajil K's avatar Ajil K

updated mqtt subscriber and few other changes

parent a2304e73
.idea
dist
build
*.spec
......@@ -25,6 +25,8 @@ local_uploader:
type: udp
host_ip: 127.0.0.1
port: 20015
username:
password:
event_uploader:
type: udp
......
......@@ -6,12 +6,12 @@ with open("conf/config.yml", 'r') as file:
class AppConfig:
data_check_frequency = int(config_data.get("data_check_frequency"))
quality_time_frequency = int(config_data.get("quality_time_frequency"))
acquisition_restart_frequency = int(config_data.get("acquisition_restart_frequency"))
data_check_frequency = int(config_data.get("data_check_frequency", 120))
quality_time_frequency = int(config_data.get("quality_time_frequency", 5))
acquisition_restart_frequency = int(config_data.get("acquisition_restart_frequency", 2))
event_uploader_type = config_data.get("event_uploader", {}).get("type")
event_uploader_host_ip = config_data.get("event_uploader", {}).get("host_ip")
event_uploader_type = config_data.get("event_uploader", {}).get("type", "")
event_uploader_host_ip = config_data.get("event_uploader", {}).get("host_ip", "")
event_uploader_port = config_data.get("event_uploader", {}).get("port", None)
start_delay_time = int(config_data.get("start_delay_time", 0))
......@@ -19,24 +19,26 @@ class AppConfig:
run_time = int(config_data.get("primary_device", {}).get("heartbeat_interval", ""))
files_to_watch = config_data.get("primary_device", {}).get("files_to_watch", "")
host_ip = config_data.get("secondary_device").get("host_ip")
port_no = int(config_data.get("secondary_device").get("port"))
host_ip = config_data.get("secondary_device", {}).get("host_ip", "")
port_no = int(config_data.get("secondary_device", {}).get("port", 8088))
is_data_source = config_data.get("data_source", False)
local_uploader_type = config_data.get("local_uploader").get("type")
local_uploader_ip = config_data.get("local_uploader").get("host_ip")
local_uploader_port = int(config_data.get("local_uploader").get("port"))
local_uploader_type = config_data.get("local_uploader", {}).get("type", "")
local_uploader_ip = config_data.get("local_uploader", {}).get("host_ip", "")
local_uploader_port = int(config_data.get("local_uploader", {}).get("port", 20015))
local_uploader_username = config_data.get("local_uploader", {}).get("username", "")
local_uploader_password = config_data.get("local_uploader", {}).get("password", "")
channel_uploader_host = config_data.get("config").get("channel").get("uploader_host")
channel_uploader_port = config_data.get("config").get("channel").get("uploader_port")
channel_uploader_host = config_data.get("config", {}).get("channel", {}).get("uploader_host", "")
channel_uploader_port = config_data.get("config", {}).get("channel", {}).get("uploader_port", 9876)
agent_base_url = config_data.get("config").get("agent").get("base_url")
agent_project_id = config_data.get("config").get("agent").get("project_id")
agent_device_id = config_data.get("config").get("agent").get("device_id")
agent_base_url = config_data.get("config", {}).get("agent", {}).get("base_url", "")
agent_project_id = config_data.get("config", {}).get("agent", {}).get("project_id", "")
agent_device_id = config_data.get("config", {}).get("agent", {}).get("device_id", "")
monitoring_engine_host = config_data.get("config").get("monitoring_engine").get("host_ip")
monitoring_engine_port = config_data.get("config").get("monitoring_engine").get("port")
monitoring_engine_url = config_data.get("config").get("monitoring_engine").get("url")
monitoring_engine_host = config_data.get("config", {}).get("monitoring_engine", {}).get("host_ip", "")
monitoring_engine_port = config_data.get("config", {}).get("monitoring_engine", {}).get("port", 9876)
monitoring_engine_url = config_data.get("config", {}).get("monitoring_engine", {}).get("url", "")
class LoggerConfigurations:
......
......@@ -112,7 +112,16 @@ class FilesHandler:
post_events(event_code=event_code)
service_name = service_operations.service_name_mapper(files_path)
if service_name != services.acquisition_engine:
service_restart_thread = threading.Thread(target=self.restart_service_thread, args=(service_name,))
service_restart_thread.start()
except Exception as e:
self.logger.exception(f"Exception occurred while syncing configuration file - {e}.")
self.logger.trace("Exited from sync_config")
def restart_service_thread(self, service_name):
try:
service_status = service_operations.restart_service(service_name)
threading.Thread()
if service_status:
post_events(event_code=events_constants.secondary_module_restarted, module_name=service_name)
self.logger.info("Service restarted successfully.")
......@@ -120,8 +129,7 @@ class FilesHandler:
post_events(event_code=events_constants.secondary_module_restart_failed)
self.logger.error(f"Failed to restart the module - {service_name}.")
except Exception as e:
self.logger.exception(f"Exception occurred while syncing configuration file - {e}.")
self.logger.trace("Exited from sync_config")
self.logger.exception(f"Exception occurred while restarting {service_name} - {e}.")
def is_file_path_need(self, files_path):
self.logger.trace("Entered in is_file_path_need")
......@@ -145,6 +153,7 @@ class FilesHandler:
class FileChangeHandler(FileSystemEventHandler):
def __init__(self, logger_obj):
self.logger = logger_obj
def on_modified(self, event):
transfer_config(event, self.logger)
......@@ -205,10 +214,16 @@ def redundancy_initializer(logger_obj):
client_status = service_operations.check_ilens_client_status(logger_obj)
if not client_status:
if service_operations.restart_service(services.acquisition_engine):
logger_obj.info("Acquisition Engine started on running primary redundancy module.")
logger_obj.info("Acquisition Engine started on primary redundancy module.")
if app_config.local_uploader_type.lower() == "mqtt":
logger_obj.info("Using local MQTT Subscriber.")
MQTTSub(app_config.local_uploader_ip, app_config.local_uploader_port, app_config.local_mqtt_topic)
local_mqtt_thread = threading.Thread(
target=MQTTSub,
args=(app_config.local_uploader_ip, app_config.local_uploader_port,
constants.local_mqtt_topic, app_config.local_uploader_username,
app_config.local_uploader_password)
)
local_mqtt_thread.start()
elif app_config.local_uploader_type.lower() == "udp":
logger_obj.info("Using local UDP Subscriber.")
local_udp_thread = threading.Thread(
......
......@@ -17,10 +17,10 @@ class RedundancyHandler:
def fetch_pipeline_details(self, pipeline_data):
self.logger.trace("Entered in fetch_pipeline_details")
pipeline_version = "upgrade"
try:
pipeline_version = pipeline_data.get("pipeline_version")
except Exception as e:
self.logger.exception(f"No channel pipeline file found - {e}.")
if pipeline_data:
pipeline_version = pipeline_data.get("pipeline_version", "NA")
else:
self.logger.exception(f"No channel pipeline file found.")
self.logger.trace(f"Exiting from fetch_pipeline_details with pipeline_version - {pipeline_version}")
return pipeline_version
......
......@@ -17,6 +17,7 @@ class CommonUtilities:
file_data = None
logger.trace(f"Entered in read_configs with file_path: {file_path}")
try:
if os.path.exists(file_path):
with open(file_path, 'r') as file:
if file_path.endswith("channel.yml"):
file_data = json.load(file)
......@@ -24,6 +25,8 @@ class CommonUtilities:
file_data = yaml.safe_load(file)
elif file_path.endswith(".conf"):
file_data = file.read()
else:
logger.error(f"No such file - {file_path}.")
except Exception as e:
logger.exception(f"Error while reading configuration - {e}.")
logger.trace("Exiting from read_configs")
......
......@@ -8,22 +8,28 @@ from scripts.logging.logger import logger
class MQTTSub(object):
def __init__(self, host, port, topic):
def __init__(self, host, port, topic, username, password):
DataPool.data_pool = []
self.topic = topic
self.client = mqtt.Client()
logger.debug(f"Listener MQTT Details {host}:{port}")
try:
self.client.connect(host=host, port=int(port))
self.client.subscribe(topic)
self.client.loop_start()
except Exception as es:
logger.error(f'exception while connection to mqtt : {es}')
if username and password:
self.client.username_pw_set(username, password)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
try:
self.client.connect(host=host, port=int(port), keepalive=60)
self.client.reconnect_delay_set(min_delay=1, max_delay=120)
except Exception as es:
print(f"Exception while connecting to MQTT: {es}")
self.client.loop_forever()
def on_connect(self, client, userdata, flags, rc):
logger.info("MQTT : Connected with result code " + str(rc))
if rc == 0:
print("MQTT: Connected successfully!")
self.subscribe()
else:
print(f"MQTT: Failed to connect, return code {rc}")
def on_message(self, client, userdata, msg):
logger.trace("Entering in mqtt - on_message")
......@@ -37,6 +43,14 @@ class MQTTSub(object):
for key, value in data.items():
temp_data_pool.append(int(value.get('dq')))
DataPool.data_pool = temp_data_pool
logger.trace(f"data added - {DataPool.data_pool}")
except Exception as es:
logger.error(f'Exception while fetching data : {es}')
logger.trace("Exiting from mqtt - on_message")
def subscribe(self):
try:
logger.info(f"Subscribing to topic: {self.topic}")
self.client.subscribe(self.topic)
except Exception as es:
logger.exception(f"Error subscribing to topic {self.topic}: {es}")
......@@ -123,6 +123,7 @@ class ServiceOperations:
logger.trace(f"Entering in service_name_mapper with filepath - {file_path}.")
service_modules = {
".conf": services.ilens_agent,
"acquisition_engine": services.acquisition_engine,
"monitoring_engine": services.monitoring_engine,
"ilens_heartbeat": services.ilens_heartbeat,
"ilens_data_receiver": services.ilens_data_receiver,
......
......@@ -16,11 +16,11 @@ class UDPSub:
temp_pool = []
data_received, _ = receiver_socket.recvfrom(buffer_size)
decoded_message = json.loads(base64.b64decode(data_received).decode())
logger.info(f"decoded_message:{decoded_message}")
logger.trace(f"decoded_message:{decoded_message}")
data = decoded_message.get("data").get("data")
for key, value in data.items():
temp_pool.append(int(value.get('dq')))
logger.info(f"temp_pool : {temp_pool}")
logger.trace(f"temp_pool : {temp_pool}")
DataPool.data_pool = temp_pool
logger.info(f"data added - {DataPool.data_pool}")
logger.trace("Exiting from UDPSub")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment