Commit e4bc86c3 authored by Ajil K's avatar Ajil K

updated the function calls

parent fa24dbcd
...@@ -9,12 +9,12 @@ logger: ...@@ -9,12 +9,12 @@ logger:
- type: StreamHandler - type: StreamHandler
primary_device: primary_device:
resync_time: "19:47" resync_time: "00:15"
heartbeat_interval: 120 heartbeat_interval: 120
files_to_watch: C:\\iLens\\ilens-agent\\engine\\acquisition_engine\\conf,C:\\iLens\\ilens-agent\\conf files_to_watch: C:\\iLens\\ilens-agent\\engine\\acquisition_engine\\conf,C:\\iLens\\ilens-agent\\conf
secondary_device: secondary_device:
host_ip: 192.168.1.15 host_ip: 192.168.2.0
port: 8088 port: 8088
data_check_frequency: 5 data_check_frequency: 5
...@@ -22,28 +22,28 @@ quality_time_frequency: 2 ...@@ -22,28 +22,28 @@ quality_time_frequency: 2
acquisition_restart_frequency: 2 acquisition_restart_frequency: 2
local_uploader: local_uploader:
type: udp type: mqtt
host_ip: 127.0.0.1 host_ip: 192.168.0.220
port: 20015 port: 31767
username: username: admin
password: password: FtAdm#MqTt71513
event_uploader: event_uploader:
type: udp type: http
host_ip: 192.168.1.20 host_ip: http://dev.unifytwin.com
port: 20004 # port: 20004
start_delay_time: 0 start_delay_time: 0
data_source: False data_source: True
config: config:
channel: channel:
uploader_host: 192.168.1.20 uploader_host: 192.168.3.145
uploader_port: 20003 uploader_port: 502
agent: agent:
base_url: udp://192.168.1.20:20002/ base_url: http://dev.unifytwin.com/dcp_api
monitoring_engine: monitoring_engine:
host_ip: # host_ip:
port: # port:
url: url: http://dev.unifytwin.com
\ No newline at end of file \ No newline at end of file
...@@ -29,7 +29,7 @@ class DeviceHandler: ...@@ -29,7 +29,7 @@ class DeviceHandler:
else: else:
command = f"date -s {timestamp}" command = f"date -s {timestamp}"
subprocess.run(command, shell=True) subprocess.run(command, shell=True)
post_events(events_constants.time_updated) post_events(events_constants.time_updated, self.logger)
self.logger.info("System time updated and events send successfully.") self.logger.info("System time updated and events send successfully.")
except Exception as e: except Exception as e:
self.logger.exception(f"Exception occurred while checking system time - {e}.") self.logger.exception(f"Exception occurred while checking system time - {e}.")
...@@ -49,12 +49,12 @@ class DeviceHandler: ...@@ -49,12 +49,12 @@ class DeviceHandler:
if time_difference > waiting_time and app_config.is_data_source: if time_difference > waiting_time and app_config.is_data_source:
if not service_operations.check_ilens_client_status(self.logger): if not service_operations.check_ilens_client_status(self.logger):
time.sleep(app_config.start_delay_time) time.sleep(app_config.start_delay_time)
operation_status = service_operations.restart_service(services.acquisition_engine) operation_status = service_operations.restart_service(services.acquisition_engine, self.logger)
if operation_status: if operation_status:
post_events(events_constants.secondary_acq_restarted) post_events(events_constants.secondary_acq_restarted, self.logger)
self.logger.info("Acquisition Engine module started due to no response from primary device.") self.logger.info("Acquisition Engine module started due to no response from primary device.")
else: else:
post_events(events_constants.secondary_acq_restart_failed) post_events(events_constants.secondary_acq_restart_failed, self.logger)
self.logger.error("Failed to start the Acquisition Engine module.") self.logger.error("Failed to start the Acquisition Engine module.")
time.sleep(app_config.run_time) time.sleep(app_config.run_time)
self.logger.trace(f"TimeVariable.api_last_hit_time in check_last_hit exit - {TimeVariable.api_last_hit_time}") self.logger.trace(f"TimeVariable.api_last_hit_time in check_last_hit exit - {TimeVariable.api_last_hit_time}")
...@@ -79,20 +79,20 @@ class DeviceHandler: ...@@ -79,20 +79,20 @@ class DeviceHandler:
if time_difference > waiting_time and app_config.is_data_source: if time_difference > waiting_time and app_config.is_data_source:
if not service_operations.check_ilens_client_status(self.logger): if not service_operations.check_ilens_client_status(self.logger):
time.sleep(app_config.start_delay_time) time.sleep(app_config.start_delay_time)
operation_status = service_operations.restart_service(services.acquisition_engine) operation_status = service_operations.restart_service(services.acquisition_engine, self.logger)
if operation_status: if operation_status:
post_events(events_constants.secondary_acq_restarted_bad_data) post_events(events_constants.secondary_acq_restarted_bad_data, self.logger)
self.logger.info("Acquisition Engine module started due to bad quality data in primary device.") self.logger.info("Acquisition Engine module started due to bad quality data in primary device.")
else: else:
post_events(events_constants.secondary_acq_restart_failed) post_events(events_constants.secondary_acq_restart_failed, self.logger)
self.logger.error("Failed to start the Acquisition Engine module.") self.logger.error("Failed to start the Acquisition Engine module.")
elif app_config.is_data_source and service_operations.check_ilens_client_status(self.logger) and data_check: elif app_config.is_data_source and service_operations.check_ilens_client_status(self.logger) and data_check:
operation_status = service_operations.stop_service(services.acquisition_engine) operation_status = service_operations.stop_service(services.acquisition_engine, self.logger)
if operation_status: if operation_status:
post_events(events_constants.secondary_acq_stopped) post_events(events_constants.secondary_acq_stopped, self.logger)
self.logger.info("Acquisition Engine module stopped on secondary device.") self.logger.info("Acquisition Engine module stopped on secondary device.")
else: else:
post_events(events_constants.secondary_acq_stopping_failed) post_events(events_constants.secondary_acq_stopping_failed, self.logger)
self.logger.error("Failed to stop the Acquisition Engine module on secondary device.") self.logger.error("Failed to stop the Acquisition Engine module on secondary device.")
self.logger.trace( self.logger.trace(
f"data const var after checking in check_last_quality_data_time - {service_operations.check_ilens_client_status(self.logger)}") f"data const var after checking in check_last_quality_data_time - {service_operations.check_ilens_client_status(self.logger)}")
......
...@@ -52,7 +52,7 @@ class FilesHandler: ...@@ -52,7 +52,7 @@ class FilesHandler:
file_data = common_utilities.read_configs(file_name) file_data = common_utilities.read_configs(file_name)
response_config_data[file_name] = file_data response_config_data[file_name] = file_data
post_data(response_config_data, self.logger, endpoint="sync_config") post_data(response_config_data, self.logger, endpoint="sync_config")
post_events(event_code=events_constants.daily_sync) post_events(event_code=events_constants.daily_sync, logger=self.logger)
except Exception as e: except Exception as e:
self.logger.exception(f"Exception occurred while running daily sync - {e}.") self.logger.exception(f"Exception occurred while running daily sync - {e}.")
self.logger.trace("Exited from daily_sync") self.logger.trace("Exited from daily_sync")
...@@ -109,8 +109,8 @@ class FilesHandler: ...@@ -109,8 +109,8 @@ class FilesHandler:
file_data = yaml.dump(file_content, default_flow_style=False) file_data = yaml.dump(file_content, default_flow_style=False)
common_utilities.update_file(files_path, file_data) common_utilities.update_file(files_path, file_data)
event_code = events_constants.pipeline_updated if check_pipeline else events_constants.configuration_updated event_code = events_constants.pipeline_updated if check_pipeline else events_constants.configuration_updated
post_events(event_code=event_code) post_events(event_code=event_code, logger=self.logger)
service_name = service_operations.service_name_mapper(files_path) service_name = service_operations.service_name_mapper(files_path, self.logger)
if service_name != services.acquisition_engine: if service_name != services.acquisition_engine:
service_restart_thread = threading.Thread(target=self.restart_service_thread, args=(service_name,)) service_restart_thread = threading.Thread(target=self.restart_service_thread, args=(service_name,))
service_restart_thread.start() service_restart_thread.start()
...@@ -120,13 +120,13 @@ class FilesHandler: ...@@ -120,13 +120,13 @@ class FilesHandler:
def restart_service_thread(self, service_name): def restart_service_thread(self, service_name):
try: try:
service_status = service_operations.restart_service(service_name) service_status = service_operations.restart_service(service_name, self.logger)
threading.Thread() threading.Thread()
if service_status: if service_status:
post_events(event_code=events_constants.secondary_module_restarted, module_name=service_name) post_events(event_code=events_constants.secondary_module_restarted, module_name=service_name, logger=self.logger)
self.logger.info("Service restarted successfully.") self.logger.info("Service restarted successfully.")
else: else:
post_events(event_code=events_constants.secondary_module_restart_failed) post_events(event_code=events_constants.secondary_module_restart_failed, logger=self.logger)
self.logger.error(f"Failed to restart the module - {service_name}.") self.logger.error(f"Failed to restart the module - {service_name}.")
except Exception as e: except Exception as e:
self.logger.exception(f"Exception occurred while restarting {service_name} - {e}.") self.logger.exception(f"Exception occurred while restarting {service_name} - {e}.")
...@@ -160,8 +160,8 @@ class FileChangeHandler(FileSystemEventHandler): ...@@ -160,8 +160,8 @@ class FileChangeHandler(FileSystemEventHandler):
def on_created(self, event): def on_created(self, event):
transfer_config(event, self.logger) transfer_config(event, self.logger)
# def on_deleted(self, event): def on_deleted(self, event):
# self.logger.critical(f"File {event.src_path} has been deleted.") self.logger.critical(f"File {event.src_path} has been deleted.")
def on_moved(self, event): def on_moved(self, event):
if any(event.dest_path.endswith(extension) for extension in constants.file_names): if any(event.dest_path.endswith(extension) for extension in constants.file_names):
...@@ -185,7 +185,7 @@ def transfer_config(file_path, logger_obj, check_destination=False): ...@@ -185,7 +185,7 @@ def transfer_config(file_path, logger_obj, check_destination=False):
"data": file_data "data": file_data
} }
post_data(response_json, logger_obj) post_data(response_json, logger_obj)
post_events(event_code) post_events(event_code, logger_obj)
logger_obj.info(f"File {file_path.src_path} has been modified.") logger_obj.info(f"File {file_path.src_path} has been modified.")
elif check_destination: elif check_destination:
file_data = common_utilities.read_configs(file_path.src_path) file_data = common_utilities.read_configs(file_path.src_path)
...@@ -194,7 +194,7 @@ def transfer_config(file_path, logger_obj, check_destination=False): ...@@ -194,7 +194,7 @@ def transfer_config(file_path, logger_obj, check_destination=False):
"data": file_data "data": file_data
} }
post_data(response_json, logger_obj) post_data(response_json, logger_obj)
post_events(event_code) post_events(event_code, logger_obj)
logger_obj.info(f"File {file_path.dest_path} has been modified.") logger_obj.info(f"File {file_path.dest_path} has been modified.")
else: else:
logger_obj.error(f"file_path - {file_path} is not proper path to file.") logger_obj.error(f"file_path - {file_path} is not proper path to file.")
...@@ -213,7 +213,7 @@ def redundancy_initializer(logger_obj): ...@@ -213,7 +213,7 @@ def redundancy_initializer(logger_obj):
if app_config.is_data_source: if app_config.is_data_source:
client_status = service_operations.check_ilens_client_status(logger_obj) client_status = service_operations.check_ilens_client_status(logger_obj)
if not client_status: if not client_status:
if service_operations.restart_service(services.acquisition_engine): if service_operations.restart_service(services.acquisition_engine, logger_obj):
logger_obj.info("Acquisition Engine started on primary redundancy module.") logger_obj.info("Acquisition Engine started on primary redundancy module.")
if app_config.local_uploader_type.lower() == "mqtt": if app_config.local_uploader_type.lower() == "mqtt":
logger_obj.info("Using local MQTT Subscriber.") logger_obj.info("Using local MQTT Subscriber.")
......
...@@ -74,7 +74,7 @@ class RedundancyHandler: ...@@ -74,7 +74,7 @@ class RedundancyHandler:
data_check = True data_check = True
if len(data_quality_set) == 1 and list(data_quality_set)[0] == 2: if len(data_quality_set) == 1 and list(data_quality_set)[0] == 2:
data_check = False data_check = False
post_events(events_constants.bad_data_quality) post_events(events_constants.bad_data_quality, self.logger)
except Exception as e: except Exception as e:
self.logger.error(f"data_check exception: {e}") self.logger.error(f"data_check exception: {e}")
data_check = False data_check = False
......
...@@ -9,7 +9,6 @@ from scripts.constants.app_config import app_config ...@@ -9,7 +9,6 @@ from scripts.constants.app_config import app_config
from scripts.constants.app_constants import constants, services from scripts.constants.app_constants import constants, services
from scripts.constants.app_variables import Counter, DataPool, DeviceInfo from scripts.constants.app_variables import Counter, DataPool, DeviceInfo
from scripts.constants.events import events_constants from scripts.constants.events import events_constants
from scripts.logging.logger import logger
from scripts.utilities.common_util import common_utilities from scripts.utilities.common_util import common_utilities
from scripts.utilities.service_util import service_operations from scripts.utilities.service_util import service_operations
...@@ -35,15 +34,15 @@ def post_data(json_data, logger, endpoint="receive_data"): ...@@ -35,15 +34,15 @@ def post_data(json_data, logger, endpoint="receive_data"):
"data": file_data "data": file_data
} }
post_data(response_json, logger) post_data(response_json, logger)
post_events(event_code=events_constants.pipeline_mismatch) post_events(event_code=events_constants.pipeline_mismatch, logger=logger)
if response_json.get("acquisition_status"): if response_json.get("acquisition_status"):
Counter.stop_counter += 1 Counter.stop_counter += 1
if (app_config.is_data_source and Counter.stop_counter >= app_config.acquisition_restart_frequency if (app_config.is_data_source and Counter.stop_counter >= app_config.acquisition_restart_frequency
and service_operations.check_ilens_client_status(logger)): and service_operations.check_ilens_client_status(logger)):
if service_operations.stop_service(services.acquisition_engine): if service_operations.stop_service(services.acquisition_engine, logger):
post_events(event_code=events_constants.primary_acq_stopped) post_events(event_code=events_constants.primary_acq_stopped, logger=logger)
else: else:
post_events(event_code=events_constants.primary_acq_stopping_failed) post_events(event_code=events_constants.primary_acq_stopping_failed, logger=logger)
else: else:
Counter.stop_counter = 0 Counter.stop_counter = 0
except Exception as e: except Exception as e:
...@@ -51,7 +50,7 @@ def post_data(json_data, logger, endpoint="receive_data"): ...@@ -51,7 +50,7 @@ def post_data(json_data, logger, endpoint="receive_data"):
logger.trace("Exiting from post_data") logger.trace("Exiting from post_data")
def post_events(event_code, module_name=None): def post_events(event_code, logger, module_name=None):
try: try:
logger.trace("Entered in post_events") logger.trace("Entered in post_events")
headers = {'Content-Type': 'application/json'} headers = {'Content-Type': 'application/json'}
...@@ -63,6 +62,7 @@ def post_events(event_code, module_name=None): ...@@ -63,6 +62,7 @@ def post_events(event_code, module_name=None):
if module_name: if module_name:
event_json["event_description"] = f"{event_json['event_description']}{module_name}." event_json["event_description"] = f"{event_json['event_description']}{module_name}."
if app_config.event_uploader_type == "udp": if app_config.event_uploader_type == "udp":
logger.trace(f"Sending events using UDP.")
sender_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sender_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
final_message = {"topic": constants.event_topic, "data": event_json} final_message = {"topic": constants.event_topic, "data": event_json}
bytes_to_send = base64.b64encode(json.dumps(final_message).encode()) bytes_to_send = base64.b64encode(json.dumps(final_message).encode())
...@@ -72,6 +72,7 @@ def post_events(event_code, module_name=None): ...@@ -72,6 +72,7 @@ def post_events(event_code, module_name=None):
) )
sender_socket.close() sender_socket.close()
elif app_config.event_uploader_type == "http": elif app_config.event_uploader_type == "http":
logger.trace(f"Sending events using http.")
event_endpoint = f"{app_config.event_uploader_host_ip}{constants.event_topic}" event_endpoint = f"{app_config.event_uploader_host_ip}{constants.event_topic}"
response = httpx.post( response = httpx.post(
event_endpoint, event_endpoint,
...@@ -84,3 +85,4 @@ def post_events(event_code, module_name=None): ...@@ -84,3 +85,4 @@ def post_events(event_code, module_name=None):
logger.info(response.content.decode()) logger.info(response.content.decode())
except Exception as e: except Exception as e:
logger.exception(f"# Error sending Events: {str(e)}") logger.exception(f"# Error sending Events: {str(e)}")
logger.trace(f"Exiting from post_events.")
...@@ -21,15 +21,15 @@ class MQTTSub(object): ...@@ -21,15 +21,15 @@ class MQTTSub(object):
self.client.connect(host=host, port=int(port), keepalive=60) self.client.connect(host=host, port=int(port), keepalive=60)
self.client.reconnect_delay_set(min_delay=1, max_delay=120) self.client.reconnect_delay_set(min_delay=1, max_delay=120)
except Exception as es: except Exception as es:
print(f"Exception while connecting to MQTT: {es}") logger.exception(f"Exception while connecting to MQTT: {es}")
self.client.loop_forever() self.client.loop_forever()
def on_connect(self, client, userdata, flags, rc): def on_connect(self, client, userdata, flags, rc):
if rc == 0: if rc == 0:
print("MQTT: Connected successfully!") logger.info("MQTT: Connected successfully!")
self.subscribe() self.subscribe()
else: else:
print(f"MQTT: Failed to connect, return code {rc}") logger.error(f"MQTT: Failed to connect, return code {rc}")
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
logger.trace("Entering in mqtt - on_message") logger.trace("Entering in mqtt - on_message")
...@@ -43,7 +43,7 @@ class MQTTSub(object): ...@@ -43,7 +43,7 @@ class MQTTSub(object):
for key, value in data.items(): for key, value in data.items():
temp_data_pool.append(int(value.get('dq'))) temp_data_pool.append(int(value.get('dq')))
DataPool.data_pool = temp_data_pool DataPool.data_pool = temp_data_pool
logger.trace(f"data added - {DataPool.data_pool}") logger.trace(f"data added in data pool - {DataPool.data_pool}")
except Exception as es: except Exception as es:
logger.error(f'Exception while fetching data : {es}') logger.error(f'Exception while fetching data : {es}')
logger.trace("Exiting from mqtt - on_message") logger.trace("Exiting from mqtt - on_message")
......
...@@ -2,12 +2,11 @@ import subprocess ...@@ -2,12 +2,11 @@ import subprocess
import time import time
from scripts.constants.app_constants import constants, services from scripts.constants.app_constants import constants, services
from scripts.logging.logger import logger
class ServiceOperations: class ServiceOperations:
@staticmethod @staticmethod
def is_service_enabled(service_name): def is_service_enabled(service_name, logger):
try: try:
logger.trace("Entering in is_service_enabled") logger.trace("Entering in is_service_enabled")
output = subprocess.check_output(["sc", "query", service_name], universal_newlines=True) output = subprocess.check_output(["sc", "query", service_name], universal_newlines=True)
...@@ -27,11 +26,11 @@ class ServiceOperations: ...@@ -27,11 +26,11 @@ class ServiceOperations:
logger.trace("Exiting from is_service_enabled") logger.trace("Exiting from is_service_enabled")
return False return False
def restart_service(self, service_name): def restart_service(self, service_name, logger):
try: try:
logger.trace("Entering in restart_service") logger.trace("Entering in restart_service")
if constants.is_windows: if constants.is_windows:
if not self.is_service_enabled(service_name): if not self.is_service_enabled(service_name, logger):
subprocess.run(["sc", "config", service_name, "start=auto"], check=True) subprocess.run(["sc", "config", service_name, "start=auto"], check=True)
time.sleep(5) time.sleep(5)
current_status = subprocess.check_output(["sc", "query", service_name]).decode("utf-8") current_status = subprocess.check_output(["sc", "query", service_name]).decode("utf-8")
...@@ -58,12 +57,12 @@ class ServiceOperations: ...@@ -58,12 +57,12 @@ class ServiceOperations:
logger.trace("Exiting from restart_service") logger.trace("Exiting from restart_service")
return False return False
def stop_service(self, service_name): def stop_service(self, service_name, logger):
try: try:
logger.trace("Entering in stop_service") logger.trace("Entering in stop_service")
if constants.is_windows: if constants.is_windows:
subprocess.run(["sc", "stop", service_name], check=True) subprocess.run(["sc", "stop", service_name], check=True)
if self.is_service_enabled(service_name): if self.is_service_enabled(service_name, logger):
subprocess.run(["sc", "config", service_name, "start=disabled"], check=True) subprocess.run(["sc", "config", service_name, "start=disabled"], check=True)
time.sleep(5) time.sleep(5)
logger.trace("Exiting from stop_service in windows as true") logger.trace("Exiting from stop_service in windows as true")
...@@ -118,7 +117,7 @@ class ServiceOperations: ...@@ -118,7 +117,7 @@ class ServiceOperations:
logger.exception(f"Exception occurred while check_ilens_client_status - {e}") logger.exception(f"Exception occurred while check_ilens_client_status - {e}")
@staticmethod @staticmethod
def service_name_mapper(file_path): def service_name_mapper(file_path, logger):
try: try:
logger.trace(f"Entering in service_name_mapper with filepath - {file_path}.") logger.trace(f"Entering in service_name_mapper with filepath - {file_path}.")
service_modules = { service_modules = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment