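# ---- conf/config.yml ----
# (Editorial file markers: the commit view ran the files together, so the
#  boundaries and paths below are inferred from the imports and loaders.)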
primary_device:
  resync_time: "00:15"
  heartbeat_interval: 120
  files_to_watch: E:\\iLens\\ilens-agent\\engine\\acquisition_engine\\conf,E:\\iLens\\ilens-agent\\conf
secondary_device:
  host_ip: 192.168.1.36
  port: 8088
data_check_frequency: 5
quality_time_frequency: 2
acquisition_restart_frequency: 2
local_uploader:
  type: udp
  host_ip: 127.0.0.1
  port: 20015
event_uploader:
  type: http
  host_ip: 192.168.0.220
  port:
start_delay_time: 0
data_source: True
config:
  channel:
    uploader_host: 2.2.2.29
    uploader_port:
  agent:
    base_url: http://192.168.0.220/dcp_api
  monitoring_engine:
    host_ip:
    port:
    url:
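# ---- service entry point (file name not shown in the commit) ----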
from scripts.handlers.files_handler import redundancy_initializer
from scripts.logging.logger import logger

if __name__ == "__main__":
    try:
        print("----------------------------------------Starting Service----------------------------------------")
        logger.info("----------------------------------------Starting Service----------------------------------------")
        redundancy_initializer()
    except KeyboardInterrupt:
        print("-----------------------------------------Service Stopped-----------------------------------------")
        logger.info("-----------------------------------------Service Stopped-----------------------------------------")
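# ---- requirements.txt (inferred from the pinned dependency list) ----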
uvicorn==0.22.0
fastapi==0.103.2
schedule==1.2.1
PyYAML==5.4.1
requests==2.0.0
watchdog==3.0.0
paho-mqtt==1.5.0
PyJWT==2.4.0
ping3==4.0.4
psutil==5.9.6
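# ---- scripts/constants/api_endpoints.py ----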
class APIEndpoints:
    receive_data = "/receive_data"
    config_sync = "/sync_config"
    fetch_details = "/fetch_device_details"


api_endpoints = APIEndpoints()
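# ---- scripts/constants/app_config.py ----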
from scripts.utilities.common_util import common_utilities

config_data = common_utilities.read_configs("conf/config.yml")


class AppConfig:
    data_check_frequency = int(config_data.get("data_check_frequency"))
    quality_time_frequency = int(config_data.get("quality_time_frequency"))
    acquisition_restart_frequency = int(config_data.get("acquisition_restart_frequency"))
    event_uploader_type = config_data.get("event_uploader").get("type")
    event_uploader_host_ip = config_data.get("event_uploader").get("host_ip")
    event_uploader_port = config_data.get("event_uploader").get("port", None)
    start_delay_time = int(config_data.get("start_delay_time"))
    resync_time = config_data.get("primary_device").get("resync_time")
    run_time = int(config_data.get("primary_device").get("heartbeat_interval"))
    files_to_watch = common_utilities.list_the_files(config_data.get("primary_device").get("files_to_watch"))
    host_ip = config_data.get("secondary_device").get("host_ip")
    port_no = int(config_data.get("secondary_device").get("port"))
    is_data_source = config_data.get("data_source", False)
    local_uploader_type = config_data.get("local_uploader").get("type")
    local_uploader_ip = config_data.get("local_uploader").get("host_ip")
    local_uploader_port = int(config_data.get("local_uploader").get("port"))
    channel_uploader_host = config_data.get("config").get("channel").get("uploader_host")
    channel_uploader_port = config_data.get("config").get("channel").get("uploader_port")
    agent_base_url = config_data.get("config").get("agent").get("base_url")
    agent_project_id = config_data.get("config").get("agent").get("project_id")
    agent_device_id = config_data.get("config").get("agent").get("device_id")
    monitoring_engine_host = config_data.get("config").get("monitoring_engine").get("host_ip")
    monitoring_engine_port = config_data.get("config").get("monitoring_engine").get("port")
    monitoring_engine_url = config_data.get("config").get("monitoring_engine").get("url")


app_config = AppConfig()
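# ---- scripts/constants/app_constants.py ----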
import os
import platform


class Constants:
    ilens_path = None
    file_path_changes = False
    is_windows = False
    if platform.system().lower() == "windows":
        from scripts.utilities.common_util import WindowsUtilities
        windows_utils = WindowsUtilities()
        file_path_changes = True
        is_windows = True
        ilens_path = windows_utils.find_ilens_folder()
    elif platform.system().lower() == "linux":
        ilens_path = "/iLens"
    conf_path = "conf"
    pipeline_file = "channel.yml"
    file_names = ["channel.yml", "config.yml", "application.conf"]
    local_mqtt_topic = "ilens/monitor/live/#"
    event_topic = "/csv_reader/api/gateway/create_event"
    ilens_agent_path = os.path.join(ilens_path, "ilens-agent")
    engine_path = os.path.join(ilens_agent_path, "engine")
    ilens_client_path = os.path.join(engine_path, "acquisition_engine")
    license_file_path = os.path.join(ilens_agent_path, "license", "license.lic")
    channel_pipeline_path = os.path.join(ilens_client_path, conf_path, pipeline_file)
    secret_key = "RGF0YUFjcXVpc2l0aW9uU3lzdGVtcyBURUFN"


class Services:
    ilens_agent = "ilens-agent"
    acquisition_engine = "ilens_client"
    monitoring_engine = "ilens-clientmonitor"
    ilens_heartbeat = "ilens_heartbeat"
    ilens_data_receiver = "ilens_data_receiver"
    ilens_data_transmitter = "ilens_data_transmitter"
    ilens_events_transmitter = "ilens_events_transmitter"
    ilens_file_transmitter = "ilens_file_transmitter"


services = Services()
constants = Constants()
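# ---- scripts/constants/app_variables.py ----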
from datetime import datetime


class Counter:
    stop_counter = 0


class DataPool:
    data_pool = []


class TimeVariable:
    api_last_hit_time = datetime.now()
    last_quality_data = datetime.now()


class DeviceInfo:
    device_id = ""
    project_id = ""
class EventConstants:
    payload_template = {
        "timestamp": "",
        "project_id": "",
        "logged_by": "iLens Redundancy",
        "event_code": "",
        "event_category": "",
        "event_type": "",
        "event_description": "",
        "source_description": "iLens Redundancy Module",
        "hierarchy": "",
        "level": "",
        "ilens_device_id": "",
        "event_details": {}
    }
    daily_sync = "IR-PI001"
    pipeline_modification = "IR-PI002"
    configuration_modification = "IR-PI003"
    pipeline_mismatch = "IR-PI004"
    primary_acq_stopped = "IR-PI005"
    configuration_sync = "IR-SI001"
    pipeline_updated = "IR-SI002"
    configuration_updated = "IR-SI003"
    time_updated = "IR-SI004"
    secondary_acq_restarted = "IR-SI005"
    secondary_acq_restarted_bad_data = "IR-SI006"
    secondary_acq_stopped = "IR-SI007"
    secondary_module_restarted = "IR-SI008"
    bad_data_quality = "IR-PE001"
    primary_acq_stopping_failed = "IR-PE002"
    secondary_acq_restart_failed = "IR-SE001"
    secondary_acq_stopping_failed = "IR-SE002"
    secondary_module_restart_failed = "IR-SE003"

    @staticmethod
    def get_event_description(event_code):
        event_description = {
            "IR-PI001": {
                "event_code": "IR-PI001",
                "event_category": "Daily Sync",
                "event_type": "Daily Configuration sync",
                "event_description": "All the configuration files are transferred to the secondary device.",
                "level": "Information",
            },
            "IR-PI002": {
                "event_code": "IR-PI002",
                "event_category": "Channel Pipeline",
                "event_type": "Pipeline updated",
                "event_description": "Updated changes are transferred to the secondary device.",
                "level": "Information",
            },
            "IR-PI003": {
                "event_code": "IR-PI003",
                "event_category": "Configuration Update",
                "event_type": "Configuration Updated",
                "event_description": "Updated changes are transferred to the secondary device.",
                "level": "Information",
            },
            "IR-PI004": {
                "event_code": "IR-PI004",
                "event_category": "Acquisition Pipeline",
                "event_type": "Pipeline Version Mismatch",
                "event_description": "Sending pipeline to secondary device.",
                "level": "Information",
            },
            "IR-PI005": {
                "event_code": "IR-PI005",
                "event_category": "Acquisition Engine",
                "event_type": "Stopped Acquisition Engine",
                "event_description": "Acquisition engine is stopped on primary device.",
                "level": "Information",
            },
            "IR-SI001": {
                "event_code": "IR-SI001",
                "event_category": "Daily Sync",
                "event_type": "Daily Configuration sync",
                "event_description": "All the configuration files are updated in the secondary device.",
                "level": "Information",
            },
            "IR-SI002": {
                "event_code": "IR-SI002",
                "event_category": "Channel Pipeline",
                "event_type": "Pipeline updated",
                "event_description": "Pipeline changes are updated in the secondary device.",
                "level": "Information",
            },
            "IR-SI003": {
                "event_code": "IR-SI003",
                "event_category": "Configuration Update",
                "event_type": "Configuration Updated",
                "event_description": "Configuration changes are updated in the secondary device.",
                "level": "Information",
            },
            "IR-SI004": {
                "event_code": "IR-SI004",
                "event_category": "Date-Time Update",
                "event_type": "System time updated",
                "event_description": "System Date-time updated in the secondary device.",
                "level": "Information",
            },
            "IR-SI005": {
                "event_code": "IR-SI005",
                "event_category": "Acquisition Engine",
                "event_type": "Module restarted",
                "event_description": "Acquisition engine module restarted due to no response from primary device.",
                "level": "Information",
            },
            "IR-SI006": {
                "event_code": "IR-SI006",
                "event_category": "Acquisition Engine",
                "event_type": "Module restarted",
                "event_description": "Acquisition engine module restarted due to bad quality data in primary device.",
                "level": "Information",
            },
            "IR-SI007": {
                "event_code": "IR-SI007",
                "event_category": "Acquisition Engine",
                "event_type": "Module stopped",
                "event_description": "Acquisition engine module stopped because the primary device came back online with proper data.",
                "level": "Information",
            },
            "IR-SI008": {
                "event_code": "IR-SI008",
                "event_category": "Module Restarted",
                "event_type": "Module restarted on secondary",
                "event_description": "Successfully restarted the module - ",
                "level": "Information",
            },
            "IR-SE001": {
                "event_code": "IR-SE001",
                "event_category": "Acquisition Engine Error",
                "event_type": "Failed to restart the module.",
                "event_description": "Failed to restart acquisition engine on the secondary device.",
                "level": "Error",
            },
            "IR-SE002": {
                "event_code": "IR-SE002",
                "event_category": "Acquisition Engine Error",
                "event_type": "Failed to stop the module.",
                "event_description": "Failed to stop acquisition engine on the secondary device.",
                "level": "Error",
            },
            "IR-SE003": {
                "event_code": "IR-SE003",
                "event_category": "Module Restarting Error",
                "event_type": "Failed to restart the module.",
                "event_description": "Failed to restart the module - ",
                "level": "Error",
            },
            "IR-PE001": {
                "event_code": "IR-PE001",
                "event_category": "Data Error",
                "event_type": "Bad Quality Data",
                "event_description": "Bad quality data is coming from the acquisition engine.",
                "level": "Error",
            },
            "IR-PE002": {
                "event_code": "IR-PE002",
                "event_category": "Acquisition Engine Error",
                "event_type": "Failed to stop the module.",
                "event_description": "Failed to stop acquisition engine on the primary device.",
                "level": "Error",
            }
        }
        return event_description[event_code]


events_constants = EventConstants()
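# ---- scripts/handlers/device_handler.py ----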
import subprocess
import time
from datetime import datetime, timedelta

from scripts.constants.app_config import app_config
from scripts.constants.app_constants import constants, services
from scripts.constants.app_variables import TimeVariable
from scripts.constants.events import events_constants
from scripts.logging.logger import logger
from scripts.utilities.communication_util import post_events
from scripts.utilities.service_util import service_operations


class DeviceHandler:
    @staticmethod
    def check_system_time(timestamp_str):
        try:
            logger.info("Entered in check_system_time")
            current_time = datetime.now()
            timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
            time_difference = current_time - timestamp
            five_minutes = timedelta(minutes=5)
            if time_difference > five_minutes:
                if constants.is_windows:
                    formatted_date = timestamp.strftime("%d-%m-%Y")
                    formatted_time = timestamp.strftime("%H:%M:%S")
                    command = f"date {formatted_date} && time {formatted_time}"
                else:
                    # Quote the timestamp: it contains a space, so an unquoted
                    # value would be split into two shell arguments.
                    command = f'date -s "{timestamp}"'
                subprocess.run(command, shell=True)
                post_events(events_constants.time_updated)
                logger.info("System time updated and event sent successfully.")
        except Exception as e:
            logger.exception(f"Exception occurred while checking system time - {e}.")
        logger.info("Exiting from check_system_time")

    @staticmethod
    def check_last_hit():
        try:
            while True:
                logger.info("Entered check_last_hit")
                logger.info(
                    f"data const var before checking in check_last_hit entry - {service_operations.check_ilens_client_status()}")
                logger.info(f"TimeVariable.api_last_hit_time in check_last_hit - {TimeVariable.api_last_hit_time}")
                current_time = datetime.now()
                time_difference = current_time - TimeVariable.api_last_hit_time
                logger.info(f"time_difference: {time_difference}")
                waiting_time = timedelta(seconds=(app_config.data_check_frequency * app_config.run_time))
                if time_difference > waiting_time and app_config.is_data_source:
                    if not service_operations.check_ilens_client_status():
                        time.sleep(app_config.start_delay_time)
                        operation_status = service_operations.restart_service(services.acquisition_engine)
                        if operation_status:
                            post_events(events_constants.secondary_acq_restarted)
                            logger.info("Acquisition Engine module started due to no response from primary device.")
                        else:
                            post_events(events_constants.secondary_acq_restart_failed)
                            logger.error("Failed to start the Acquisition Engine module.")
                time.sleep(app_config.run_time)
                logger.info(f"TimeVariable.api_last_hit_time in check_last_hit exit - {TimeVariable.api_last_hit_time}")
                logger.info(
                    f"data const var after checking in check_last_hit - {service_operations.check_ilens_client_status()}")
        except Exception as e:
            logger.exception(f"Exception occurred while checking last hit - {e}.")
        logger.info("Exiting from check_last_hit")

    @staticmethod
    def check_last_quality_data_time(data_check):
        try:
            logger.info("Entered check_last_quality_data_time")
            logger.info(
                f"data const var before checking in check_last_quality_data_time - {service_operations.check_ilens_client_status()}")
            logger.info(
                f"TimeVariable.last_quality_data check_last_quality_data_time entry: {TimeVariable.last_quality_data}")
            if data_check:
                TimeVariable.last_quality_data = datetime.now()
            current_time = datetime.now()
            time_difference = current_time - TimeVariable.last_quality_data
            waiting_time = timedelta(seconds=(app_config.quality_time_frequency * app_config.run_time))
            if time_difference > waiting_time and app_config.is_data_source:
                if not service_operations.check_ilens_client_status():
                    time.sleep(app_config.start_delay_time)
                    operation_status = service_operations.restart_service(services.acquisition_engine)
                    if operation_status:
                        post_events(events_constants.secondary_acq_restarted_bad_data)
                        logger.info("Acquisition Engine module started due to bad quality data in primary device.")
                    else:
                        post_events(events_constants.secondary_acq_restart_failed)
                        logger.error("Failed to start the Acquisition Engine module.")
            elif app_config.is_data_source and service_operations.check_ilens_client_status() and data_check:
                operation_status = service_operations.stop_service(services.acquisition_engine)
                if operation_status:
                    post_events(events_constants.secondary_acq_stopped)
                    logger.info("Acquisition Engine module stopped on secondary device.")
                else:
                    post_events(events_constants.secondary_acq_stopping_failed)
                    logger.error("Failed to stop the Acquisition Engine module on secondary device.")
            logger.info(
                f"data const var after checking in check_last_quality_data_time - {service_operations.check_ilens_client_status()}")
        except Exception as e:
            logger.exception(f"Exception occurred while checking last quality data time - {e}.")
        logger.info(
            f"TimeVariable.last_quality_data check_last_quality_data_time exit: {TimeVariable.last_quality_data}")
        logger.info("Exiting from check_last_quality_data_time")


device_handler = DeviceHandler()
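# ---- scripts/handlers/files_handler.py ----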
import configparser
import json
import os
import threading
import time

import schedule
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from scripts.constants.app_config import app_config
from scripts.constants.app_constants import constants, services
from scripts.constants.app_variables import DeviceInfo
from scripts.constants.events import events_constants
from scripts.handlers.redundancy_handler import RedundancyHandler
from scripts.logging.logger import logger
from scripts.utilities.common_util import common_utilities
from scripts.utilities.communication_util import post_events, post_data
from scripts.utilities.mqtt_subscription import MQTTSub
from scripts.utilities.service_util import service_operations
from scripts.utilities.udp_subscription import UDPSub

conf_file_path = os.path.join(constants.ilens_agent_path, constants.conf_path, "application.conf")
try:
    DeviceInfo.device_id, DeviceInfo.project_id = common_utilities.fetch_device_details_from_lic(
        constants.license_file_path, constants.secret_key
    )
    logger.info(f"device_id: '{DeviceInfo.device_id}' and project_id: '{DeviceInfo.project_id}'")
    if not DeviceInfo.device_id or not DeviceInfo.project_id:
        DeviceInfo.device_id, DeviceInfo.project_id = common_utilities.fetch_data_from_conf(conf_file_path)
        logger.info("Reading 'device_id' and 'project_id' from the application conf"
                    " file due to an error in loading data from the license file.")
except Exception as e:
    DeviceInfo.device_id, DeviceInfo.project_id = common_utilities.fetch_data_from_conf(conf_file_path)
    logger.info(f"device_id: '{DeviceInfo.device_id}' and project_id: '{DeviceInfo.project_id}' - {e}")


class FilesHandler:
    @staticmethod
    def daily_sync():
        response_config_data = {}
        try:
            logger.info("Entered in daily_sync")
            files_list = app_config.files_to_watch
            for each_file in files_list:
                for filename in constants.file_names:
                    file_name = os.path.join(each_file, filename)
                    if os.path.isfile(file_name):
                        file_data = common_utilities.read_configs(file_name)
                        response_config_data[file_name] = file_data
            post_data(response_config_data, endpoint="sync_config")
            post_events(event_code=events_constants.daily_sync)
            logger.info("Exited from daily_sync")
        except Exception as e:
            logger.exception(f"Exception occurred while running daily sync - {e}.")

    @staticmethod
    def sync_config(files_path, file_content):
        try:
            logger.info("Entered in sync_config")
            logger.info(f"filepath - {files_path} and file_content - {file_content}")
            check_pipeline = False
            if files_path.endswith(".conf"):
                config = configparser.ConfigParser()
                config.read_string(file_content)
                config['AGENT']['agent_id'] = DeviceInfo.device_id
                config['AGENT']['registration_project_id'] = DeviceInfo.project_id
                config['MANAGER']['base_url'] = app_config.agent_base_url
                updated_config_string = ''
                for section in config.sections():
                    updated_config_string += f'[{section}]\n'
                    for key, value in config.items(section):
                        updated_config_string += f'{key} = {value}\n'
                    updated_config_string += '\n'
                file_data = updated_config_string
            elif files_path.endswith("channel.yml"):
                uploader_list = [
                    node for node in file_content.get("flow_data", {}).get("nodes", {}).keys()
                    if not node.startswith("device_instance_") and not node.startswith("collector_")
                ]
                for uploader in uploader_list:
                    received_node_host = file_content["flow_data"]["nodes"][uploader]["node_configuration"]["host"]
                    if received_node_host not in ["localhost", "127.0.0.1"]:
                        print(
                            f"received host: {file_content['flow_data']['nodes'][uploader]['node_configuration']['host']}")
                        file_content['flow_data']['nodes'][
                            uploader]['node_configuration']['host'] = app_config.channel_uploader_host
                        file_content['flow_data']['nodes'][
                            uploader]['node_configuration']['port'] = app_config.channel_uploader_port
                file_data = json.dumps(file_content)
                check_pipeline = True
            else:
                if "monitoring_engine" in files_path:
                    if (file_content["uploader"]["type"]).lower() == "http":
                        if app_config.monitoring_engine_url:
                            file_content["uploader"]["url"] = app_config.monitoring_engine_url
                        else:
                            logger.critical("Error while updating monitoring engine's configuration file,"
                                            " check the field - url in the redundancy configuration.")
                    elif (file_content["uploader"]["type"]).lower() == "udp":
                        if app_config.monitoring_engine_host and app_config.monitoring_engine_port:
                            file_content["uploader"]["host"] = app_config.monitoring_engine_host
                            file_content["uploader"]["port"] = app_config.monitoring_engine_port
                        else:
                            logger.critical("Error while updating monitoring engine's configuration file,"
                                            " check the fields - host_ip and port in the redundancy configuration.")
                file_data = yaml.dump(file_content, default_flow_style=False)
            common_utilities.update_file(files_path, file_data)
            event_code = events_constants.pipeline_updated if check_pipeline else events_constants.configuration_updated
            post_events(event_code=event_code)
            service_name = service_operations.service_name_mapper(files_path)
            if service_name != services.acquisition_engine:
                service_status = service_operations.restart_service(service_name)
                if service_status:
                    post_events(event_code=events_constants.secondary_module_restarted, module_name=service_name)
                    print("Service restarted successfully.")
                    logger.info("Service restarted successfully.")
                else:
                    post_events(event_code=events_constants.secondary_module_restart_failed)
                    print(f"Failed to restart the module - {service_name}.")
                    logger.info(f"Failed to restart the module - {service_name}.")
            logger.info("Exited from sync_config")
        except Exception as e:
            logger.exception(f"Exception occurred while syncing configuration file - {e}.")

    @staticmethod
    def is_file_path_need(files_path):
        try:
            logger.info("Entered in is_file_path_need")
            logger.info(f"filepath - {files_path}")
            if constants.file_path_changes:
                from scripts.utilities.common_util import WindowsUtilities
                modified_path = WindowsUtilities().modify_file_path(files_path)
                logger.info("Exiting from is_file_path_need")
                return modified_path
            else:
                logger.info("Exiting from is_file_path_need")
                return files_path
        except Exception as e:
            logger.exception(f"Exception occurred - {e}.")
            return None


class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        transfer_config(event)

    def on_created(self, event):
        transfer_config(event)

    def on_deleted(self, event):
        logger.critical(f"File {event.src_path} has been deleted.")

    def on_moved(self, event):
        if any(event.dest_path.endswith(extension) for extension in constants.file_names):
            transfer_config(event, check_destination=True)


def transfer_config(event, check_destination=False):
    logger.info("Entered in transfer_config")
    logger.info(f"{event} - {check_destination}")
    if not event.is_directory:
        event_code = (
            events_constants.pipeline_modification
            if event.src_path.endswith(constants.pipeline_file)
            else events_constants.configuration_modification
        )
        if any(event.src_path.endswith(extension) for extension in constants.file_names):
            file_data = common_utilities.read_configs(event.src_path)
            response_json = {
                "file_name": event.src_path,
                "data": file_data
            }
            post_data(response_json)
            post_events(event_code)
            logger.info(f"File {event.src_path} has been modified.")
        elif check_destination:
            file_data = common_utilities.read_configs(event.src_path)
            response_json = {
                "file_name": event.dest_path,
                "data": file_data
            }
            post_data(response_json)
            post_events(event_code)
            logger.info(f"File {event.dest_path} has been modified.")
    logger.info("Exiting from transfer_config")


def redundancy_initializer():
    try:
        logger.info("Entered in redundancy_initializer")
        file_handler = FilesHandler()
        event_handler = FileChangeHandler()
        redundancy_handler = RedundancyHandler()
        if app_config.is_data_source:
            client_status = service_operations.check_ilens_client_status()
            if not client_status:
                if service_operations.restart_service(services.acquisition_engine):
                    logger.info("Acquisition Engine started while bringing up the primary redundancy module.")
        if app_config.local_uploader_type.lower() == "mqtt":
            logger.info("Using local MQTT Subscriber.")
            # The MQTT topic is defined on Constants, not AppConfig; referencing
            # app_config.local_mqtt_topic here would raise an AttributeError.
            MQTTSub(app_config.local_uploader_ip, app_config.local_uploader_port, constants.local_mqtt_topic)
        elif app_config.local_uploader_type.lower() == "udp":
            logger.info("Using local UDP Subscriber.")
            local_udp_thread = threading.Thread(
                target=UDPSub,
                args=(app_config.local_uploader_ip, app_config.local_uploader_port, 1024000)
            )
            local_udp_thread.start()
        schedule.every().day.at(app_config.resync_time).do(file_handler.daily_sync)
        observer = Observer()
        for files_path in app_config.files_to_watch:
            observer.schedule(event_handler, files_path, recursive=True)
        observer.start()
        try:
            while True:
                schedule.run_pending()
                post_data(redundancy_handler.fetch_device_details(), endpoint="fetch_device_details")
                time.sleep(app_config.run_time)
        except KeyboardInterrupt:
            observer.stop()
            observer.join()
    except Exception as e:
        logger.exception(f"Exception occurred while monitoring files - {e}")
from datetime import datetime

from scripts.constants.app_constants import constants
from scripts.constants.app_variables import DataPool
from scripts.constants.events import events_constants
from scripts.constants.app_config import app_config
from scripts.logging.logger import logger
from scripts.utilities.common_util import common_utilities
from scripts.utilities.communication_util import post_events
from scripts.utilities.service_util import service_operations


class RedundancyHandler:
    @staticmethod
    def fetch_pipeline_details(pipeline_data):
        logger.info("Entered in fetch_pipeline_details")
        pipeline_version = "upgrade"
        try:
            pipeline_version = pipeline_data.get("pipeline_version")
        except Exception as e:
            logger.exception(f"No channel pipeline file found - {e}.")
        logger.info(f"pipeline_version - {pipeline_version}")
        logger.info("Exiting from fetch_pipeline_details")
        return pipeline_version

    @staticmethod
    def read_current_time():
        date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return date_time

    @staticmethod
    def check_system_architecture(pipeline_data):
        connection_status = "NA"
        try:
            logger.info("Entered in check_system_architecture")
            flow_data_node_list = [
                node for node in pipeline_data.get("flow_data", {}).get("nodes", {}).keys()
                if not node.startswith("device_instance_") and not node.startswith("collector_")
            ]
            system_dependency = "No dependency"
            for node in flow_data_node_list:
                if node.startswith("udp") and pipeline_data.get("flow_data").get("nodes").get(node).get(
                        "node_configuration").get("host") not in ["127.0.0.1", "localhost"]:
                    logger.info("Pipeline has a diode dependency.")
                    system_dependency = "Diode dependency"
            node_configuration_details = pipeline_data.get("flow_data").get(
                "nodes").get(flow_data_node_list[0]).get("node_configuration")
            uploader_name = node_configuration_details.get("name")
            host_ip = node_configuration_details.get("host")
            host_port = node_configuration_details.get("port")
            host_status = common_utilities.ping_host(host_ip)
            connection_status = {
                "system_dependency": system_dependency,
                "uploader": uploader_name,
                "host_ip": host_ip,
                "host_port": host_port,
                "host_status": "Active" if host_status else "Inactive"
            }
        except Exception as e:
            logger.exception(f"Exception occurred while checking system architecture - {e}.")
        logger.info("Exiting from check_system_architecture")
        return connection_status

    def fetch_device_details(self):
        response = None
        try:
            logger.info("Entered in fetch_device_details")
            try:
                data_check = False
                if DataPool.data_pool:
                    logger.info(f"data pool: '{DataPool.data_pool}'")
                    data_quality_set = set(DataPool.data_pool)
                    data_check = True
                    # A pool where every reading carries quality code 2 is treated as bad data.
                    if len(data_quality_set) == 1 and list(data_quality_set)[0] == 2:
                        data_check = False
                        post_events(events_constants.bad_data_quality)
            except Exception as e:
                logger.error(f"data_check exception: {e}")
                data_check = False
            pipeline_data = common_utilities.read_configs(constants.channel_pipeline_path)
            response = {
                "pipeline_version": self.fetch_pipeline_details(pipeline_data) if app_config.is_data_source else "NA",
                "date_time": self.read_current_time(),
                "connection_status": (self.check_system_architecture(pipeline_data)
                                      if app_config.is_data_source else "NA"),
                "data_check": data_check if app_config.is_data_source else True
            }
            logger.info(f"Payload to secondary - {response}")
        except Exception as e:
            logger.exception(f"Exception occurred while fetching primary device details - {e}.")
        logger.info("Exiting from fetch_device_details")
        return response

    def response_action(self, data_quality):
        try:
            logger.info("Entered in response_action")
            result = {
                "date_time": self.read_current_time(),
                "pipeline_version": "NA",
                "acquisition_status": False
            }
            if app_config.is_data_source:
                pipeline_data = common_utilities.read_configs(constants.channel_pipeline_path)
                result["pipeline_version"] = self.fetch_pipeline_details(pipeline_data)
                result["acquisition_status"] = (service_operations.check_ilens_client_status() and not data_quality)
            return result
        except Exception as e:
            logger.exception(f"Exception occurred while getting response action details - {e}.")
            logger.info("Exiting from response_action")
            return None
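# ---- scripts/logging/logger.py ----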
import logging
import os
from logging.handlers import RotatingFileHandler


def get_logger():
    """
    Creates a rotating log
    """
    __logger__ = logging.getLogger('')
    # setting the logger level
    __logger__.setLevel("INFO")
    # creating the format for the log
    log_formatter = "%(asctime)s-%(levelname)s-[%(funcName)5s():%(lineno)s]-%(message)s"
    time_format = "%Y-%m-%d %H:%M:%S"
    # getting the path for the logger
    file_path = "logs"
    # setting the format
    formatter = logging.Formatter(log_formatter, time_format)
    # creating the folder if it does not exist
    if not os.path.exists(file_path):
        os.makedirs(file_path)
    # joining the path
    log_file = os.path.join(file_path, "redundancy.log")
    # creating a rotating file handler capped at ~1 GB per file with 5 backups
    temp_handler = RotatingFileHandler(log_file,
                                       maxBytes=1000000000,
                                       backupCount=5)
    # setting the formatter
    temp_handler.setFormatter(formatter)
    # adding the handler
    __logger__.addHandler(temp_handler)
    return __logger__


logger = get_logger()
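# ---- scripts/utilities/common_util.py ----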
import json
import os
import socket
from configparser import ConfigParser

import jwt
import ping3
import psutil
import yaml

from scripts.logging.logger import logger


class CommonUtilities:
    @staticmethod
    def read_configs(file_path):
        file_data = None
        try:
            logger.info("Entered in read_configs")
            logger.info(f"file_path: {file_path}")
            with open(file_path, 'r') as file:
                if file_path.endswith("channel.yml"):
                    file_data = json.load(file)
                elif file_path.endswith(".yml") or file_path.endswith(".yaml"):
                    file_data = yaml.safe_load(file)
                elif file_path.endswith(".conf"):
                    file_data = file.read()
        except Exception as e:
            logger.exception(f"Error while reading configuration - {e}.")
        logger.info("Exiting from read_configs")
        return file_data

    @staticmethod
    def ping_host(host):
        status = False
        try:
            logger.info("Entered in the ping_host")
            logger.info(f"host - {host}")
            response = ping3.ping(host, timeout=2)
            if response:
                logger.info(f"Host {host} is reachable (Round-trip time: {response} ms)")
                status = True
            else:
                logger.error(f"Host {host} is not reachable")
        except socket.error as e:
            logger.exception(f"Error while trying to reach {host}: {e}")
        logger.info("Exiting from ping_host")
        return status

    @staticmethod
    def list_the_files(files_list):
        try:
            logger.info("Entered in list_the_files")
            return files_list.split(",")
        except Exception as e:
            logger.exception(f"Exception occurred while listing the files - {e}.")
            logger.info("Exiting from list_the_files")
            return None

    @staticmethod
    def update_file(file_path, details):
        logger.info("Entered in update_file")
        try:
            with open(file_path, 'w') as file:
                file.write(details)
        except Exception as e:
            logger.exception(f"Exception occurred while updating the file - {e}.")
        logger.info("Exiting from update_file")

    @staticmethod
    def jwt_decode(data, secret_key):
        logger.info("Entering the jwt_decode")
        return jwt.decode(data, secret_key, algorithms=["HS256"])

    def fetch_device_details_from_lic(self, license_file_path, secret_key):
        try:
            logger.info("Entering in fetch_device_details_from_lic")
            if os.path.isfile(license_file_path):
                with open(license_file_path, 'r') as file:
                    encoded_data = file.read()
                decoded_data = self.jwt_decode(encoded_data, secret_key)
                logger.info("Exiting from fetch_device_details_from_lic")
                logger.info(
                    f"ilens_device_id - {decoded_data['ilens_device_id']}, project_id - {decoded_data['project_id']}")
                return decoded_data["ilens_device_id"], decoded_data["project_id"]
        except Exception as e:
            logger.exception(f"Exception occurred while fetching device details - {e}.")
        logger.info("Exiting from fetch_device_details_from_lic with exception")
        logger.info(f"ilens_device_id - {None}, project_id - {None}")
        return None, None

    @staticmethod
    def fetch_data_from_conf(conf_file_path):
        try:
            logger.info("Entering in fetch_data_from_conf")
            config = ConfigParser()
            config.read(conf_file_path)
            device_id = config.get("AGENT", "agent_id")
            project_id = config.get("AGENT", "registration_project_id")
            logger.info(f"ilens_device_id - {device_id}, project_id - {project_id}")
            return device_id, project_id
        except Exception as e:
            logger.exception(f"Exception occurred while fetching device details - {e}.")
            # Callers unpack two values, so the failure path must also return a pair.
            return None, None


class WindowsUtilities:
    def find_ilens_folder(self):
        try:
            logger.info("Entering in find_ilens_folder")
            for drive_letter in self.get_windows_drives():
                folder_path = os.path.join(drive_letter, "iLens")
                if os.path.exists(folder_path) and os.path.isdir(folder_path):
                    return folder_path
        except Exception as e:
            logger.exception(f"{e}.")
        logger.info("Exiting from find_ilens_folder")
        return None

    @staticmethod
    def get_windows_drives():
        try:
            logger.info("Entering in get_windows_drives")
            drives = []
            for partition in psutil.disk_partitions(all=True):
                if "cdrom" not in partition.opts and partition.fstype != "":
                    drives.append(partition.device)
            logger.info("Exiting from get_windows_drives")
            return drives
        except Exception as e:
            logger.exception(f"Exception occurred while getting windows drives - {e}.")
            logger.info("Exiting from get_windows_drives")
            return None

    def modify_file_path(self, file_path):
        try:
            logger.info("Entering in modify_file_path")
            drive = (self.find_ilens_folder()).replace("iLens", "")
            ilens_path = [directory for directory in ((file_path.split(":"))[1]).split("\\") if directory]
            ilens_path.insert(0, drive)
            modified_path = os.path.join(*ilens_path)
            logger.info("Exiting from modify_file_path")
            return modified_path
        except Exception as e:
            logger.exception(f"Exception occurred while altering the file path - {e}.")
            logger.info("Exiting from modify_file_path")
            return None


common_utilities = CommonUtilities()
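# ---- scripts/utilities/communication_util.py ----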
import base64
import json
import socket
from datetime import datetime

import requests

from scripts.constants.app_config import app_config
from scripts.constants.app_constants import constants, services
from scripts.constants.app_variables import Counter, DataPool, DeviceInfo
from scripts.constants.events import events_constants
from scripts.logging.logger import logger
from scripts.utilities.common_util import common_utilities
from scripts.utilities.service_util import service_operations


def post_data(json_data, endpoint="receive_data"):
    try:
        logger.info("Entered in post_data")
        monitoring_endpoint = f"http://{app_config.host_ip}:{app_config.port_no}/{endpoint}"
        headers = {'Content-Type': 'application/json'}
        response = requests.post(monitoring_endpoint, data=json.dumps(json_data), headers=headers)
        if response.status_code == 200:
            response_data = response.content.decode()
            print(response_data)
            logger.info(response_data)
            if endpoint == "fetch_device_details":
                DataPool.data_pool = []
                response_json = (json.loads(response_data)).get("action", {})
                if (response_json.get("pipeline_version") != json_data.get("pipeline_version")
                        and response_json.get("pipeline_version") == "upgrade"):
                    file_name = constants.channel_pipeline_path
                    file_data = common_utilities.read_configs(file_name)
                    # Use a separate name so the "action" payload is not shadowed
                    # before the acquisition_status check below.
                    pipeline_payload = {
                        "file_name": file_name,
                        "data": file_data
                    }
                    post_data(pipeline_payload)
                    post_events(event_code=events_constants.pipeline_mismatch)
                if response_json.get("acquisition_status"):
                    Counter.stop_counter += 1
                    if (app_config.is_data_source and Counter.stop_counter >= app_config.acquisition_restart_frequency
                            and service_operations.check_ilens_client_status()):
                        if service_operations.stop_service(services.acquisition_engine):
                            post_events(event_code=events_constants.primary_acq_stopped)
                        else:
                            post_events(event_code=events_constants.primary_acq_stopping_failed)
                else:
                    Counter.stop_counter = 0
    except requests.RequestException as e:
        print(f"# Error sending JSON data: {str(e)}")
        logger.exception(f"# Error sending JSON data: {str(e)}")


def post_events(event_code, module_name=None):
    try:
        logger.info("Entered in post_events")
        headers = {'Content-Type': 'application/json'}
        # Copy the template so repeated calls do not mutate the shared class attribute.
        event_json = dict(events_constants.payload_template)
        event_json["timestamp"] = int((datetime.now().timestamp()) * 1000)
        event_json["ilens_device_id"] = DeviceInfo.device_id
        event_json["project_id"] = DeviceInfo.project_id
        event_json.update(events_constants.get_event_description(event_code=event_code))
        if module_name:
            event_json["event_description"] = f"{event_json['event_description']}{module_name}."
        if app_config.event_uploader_type == "udp":
            sender_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            final_message = {"topic": constants.event_topic, "data": event_json}
            bytes_to_send = base64.b64encode(json.dumps(final_message).encode())
            sender_socket.sendto(
                bytes_to_send,
                (app_config.event_uploader_host_ip, int(app_config.event_uploader_port))
            )
            sender_socket.close()
        elif app_config.event_uploader_type == "http":
            event_endpoint = f"{app_config.event_uploader_host_ip}{constants.event_topic}"
            response = requests.post(
                event_endpoint,
                data=json.dumps(event_json),
                headers=headers,
                timeout=10,
                verify=False
            )
            if response.status_code == 200:
                print(response.content.decode())
                logger.info(response.content.decode())
    except requests.RequestException as e:
        print(f"# Error sending Events: {str(e)}")
        logger.exception(f"# Error sending Events: {str(e)}")
import json

import paho.mqtt.client as mqtt

from scripts.constants.app_variables import DataPool, DeviceInfo
from scripts.logging.logger import logger


class MQTTSub(object):
    def __init__(self, host, port, topic):
        DataPool.data_pool = []
        self.client = mqtt.Client()
        # Register the callbacks before connecting so early broker messages are not missed.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        logger.debug(f"Listener MQTT Details {host}:{port}")
        try:
            self.client.connect(host=host, port=int(port))
            self.client.subscribe(topic)
            self.client.loop_start()
        except Exception as es:
            logger.error(f'Exception while connecting to MQTT: {es}')

    def on_connect(self, client, userdata, flags, rc):
        logger.info("MQTT : Connected with result code " + str(rc))

    def on_message(self, client, userdata, msg):
        try:
            logger.info("Entering in mqtt - on_message")
            temp_data_pool = []
            message = json.loads(msg.payload.decode())
            logger.info(f"message - {message}")
            device_id = message.get("a_id")
            if device_id == DeviceInfo.device_id:
                data = message.get("data")
                for key, value in data.items():
                    temp_data_pool.append(int(value.get('dq')))
                DataPool.data_pool = temp_data_pool
            logger.info("Exiting from mqtt - on_message")
        except Exception as es:
            logger.error(f'Exception while fetching data : {es}')
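# ---- scripts/utilities/service_util.py ----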
import subprocess
import time

from scripts.constants.app_constants import constants, services
from scripts.logging.logger import logger


class ServiceOperations:
    @staticmethod
    def is_service_enabled(service_name):
        try:
            logger.info("Entering in is_service_enabled")
            # Note: START_TYPE is reported by "sc qc", not "sc query", so this
            # branch only matches if the query output actually contains it.
            output = subprocess.check_output(["sc", "query", service_name], universal_newlines=True)
            if "STATE" in output and "START_TYPE" in output:
                state_line = None
                start_type_line = None
                for line in output.splitlines():
                    if "STATE" in line:
                        state_line = line
                    elif "START_TYPE" in line:
                        start_type_line = line
                # Lines look like "STATE : 4  RUNNING", so take only the first
                # token after the colon rather than the whole remainder.
                state = state_line.split(":", 1)[1].split()[0]
                start_type = start_type_line.split(":", 1)[1].split()[0]
                return state == "4" and start_type == "2"
        except subprocess.CalledProcessError:
            pass
        logger.info("Exiting from is_service_enabled")
        return False

    def restart_service(self, service_name):
        try:
            logger.info("Entering in restart_service")
            if constants.is_windows:
                if not self.is_service_enabled(service_name):
                    subprocess.run(["sc", "config", service_name, "start=auto"], check=True)
                    time.sleep(5)
                current_status = subprocess.check_output(["sc", "query", service_name]).decode("utf-8")
                time.sleep(5)
                if "RUNNING" in current_status:
                    subprocess.run(["sc", "stop", service_name], check=True)
                    time.sleep(5)
                subprocess.run(["sc", "start", service_name], check=True)
                time.sleep(5)
                return True
            else:
                subprocess.run(["sudo", "systemctl", "restart", service_name], check=True)
                result = subprocess.run(
                    ["sudo", "systemctl", "is-active", service_name],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    return True
                else:
                    return False
        except subprocess.CalledProcessError as e:
            logger.exception(f"Failed to restart the service '{service_name}'. Error: {e}")
        logger.info("Exiting from restart_service")
        return False

    def stop_service(self, service_name):
        try:
            logger.info("Entering in stop_service")
            if constants.is_windows:
                subprocess.run(["sc", "stop", service_name], check=True)
                if self.is_service_enabled(service_name):
                    subprocess.run(["sc", "config", service_name, "start=disabled"], check=True)
                    time.sleep(5)
                logger.info("Exiting from stop_service in windows as true")
                return True
            else:
                subprocess.run(["sudo", "systemctl", "disable", service_name], check=True)
                subprocess.run(["sudo", "systemctl", "stop", service_name], check=True)
                result = subprocess.run(
                    ["sudo", "systemctl", "is-active", service_name],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    logger.info("Exiting from stop_service in linux as false - service did not stop")
                    return False
                else:
                    logger.info("Exiting from stop_service in linux as true - service stopped")
                    return True
        except Exception as e:
            logger.exception(f"Exception occurred while stopping {service_name} - {e}.")
            logger.info("Exiting from stop_service")
            return False

    @staticmethod
    def check_ilens_client_status():
        try:
            logger.info("Entering in check_ilens_client_status")
            if constants.is_windows:
                output = subprocess.check_output(
                    ["sc", "query", services.acquisition_engine], universal_newlines=True
                )
                if "STATE" in output:
                    for line in output.splitlines():
                        if "STATE" in line and "RUNNING" in line:
                            logger.info("Exiting from check_ilens_client_status in windows as true - service is active.")
                            return True
                logger.info("Exiting from check_ilens_client_status in windows as false - service is inactive")
                return False
            else:
                result = subprocess.run(
                    ["sudo", "systemctl", "is-active", services.acquisition_engine],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    logger.info("Exiting from check_ilens_client_status in linux as true - service is active")
                    return True
                else:
                    logger.info("Exiting from check_ilens_client_status in linux as false - service is inactive")
                    return False
        except Exception as e:
            logger.exception(f"Exception occurred while check_ilens_client_status - {e}")
            # Treat a failed status check as "not running" rather than returning None.
            return False

    @staticmethod
    def service_name_mapper(file_path):
        try:
            logger.info(f"Entering in service_name_mapper with filepath - {file_path}.")
            service_modules = {
                ".conf": services.ilens_agent,
                "monitoring_engine": services.monitoring_engine,
                "ilens_heartbeat": services.ilens_heartbeat,
                "ilens_data_receiver": services.ilens_data_receiver,
                "ilens_data_transmitter": services.ilens_data_transmitter,
                "ilens_events_transmitter": services.ilens_events_transmitter,
                "ilens_file_transmitter": services.ilens_file_transmitter
            }
            for module_keyword, module_service in service_modules.items():
                if module_keyword in file_path:
                    logger.info(f"Exiting from service_name_mapper for {module_service} module.")
                    return module_service
        except Exception as e:
            logger.exception(f"Exception occurred while mapping service name for module - {e}")
        return None


service_operations = ServiceOperations()
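# ---- scripts/utilities/udp_subscription.py ----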
import base64
import json
import socket

from scripts.constants.app_variables import DataPool
from scripts.logging.logger import logger


class UDPSub:
    def __init__(self, host, port, buffer_size):
        try:
            receiver_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            receiver_socket.bind((host, int(port)))
            while True:
                logger.info("Entered in UDPSub")
                temp_pool = []
                data_received, _ = receiver_socket.recvfrom(buffer_size)
                decoded_message = json.loads(base64.b64decode(data_received).decode())
                logger.info(f"decoded_message:{decoded_message}")
                data = decoded_message.get("data").get("data")
                for key, value in data.items():
                    temp_pool.append(int(value.get('dq')))
                logger.info(f"temp_pool : {temp_pool}")
                DataPool.data_pool = temp_pool
                logger.info(f"data added - {DataPool.data_pool}")
                logger.info("Exiting from UDPSub")
        except Exception as e:
            logger.exception(f"Exception occurred while subscribing local udp uploader - {e}.")
import threading
from datetime import datetime

import uvicorn
from fastapi import FastAPI

from scripts.constants.api_endpoints import api_endpoints
from scripts.constants.app_variables import TimeVariable
from scripts.constants.app_config import app_config
from scripts.handlers.device_handler import device_handler
from scripts.handlers.files_handler import FilesHandler
from scripts.handlers.redundancy_handler import RedundancyHandler
from scripts.logging.logger import logger

app = FastAPI()
files_handler = FilesHandler()
redundancy_handler = RedundancyHandler()
api_hit_thread = threading.Thread(target=device_handler.check_last_hit)
api_hit_thread.start()


@app.post(api_endpoints.receive_data)
async def receive_data(data: dict):
    response = {
        "status": "Failed",
        "message": "File updating failed."
    }
    try:
        given_path = data.get("file_name")
        file_path = files_handler.is_file_path_need(given_path)
        config_data = data.get("data")
        files_handler.sync_config(file_path, config_data)
        print("File updated successfully.")
        logger.info("File updated successfully.")
        response["status"] = "Success"
        response["message"] = "File updating successful."
    except Exception as e:
        logger.error(f"Exception occurred while updating file - {e}.")
    return response


@app.post(api_endpoints.config_sync)
async def receive_config(data: dict):
    response = {
        "status": "Failed",
        "message": "Configuration syncing failed."
    }
    try:
        for given_path, config_data in data.items():
            file_path = files_handler.is_file_path_need(given_path)
            files_handler.sync_config(file_path, config_data)
        print("Configuration synced successfully.")
        logger.info("Configuration synced successfully.")
        response["status"] = "Success"
        response["message"] = "Configuration syncing successful."
    except Exception as e:
        logger.error(f"Exception occurred while performing configuration syncing - {e}.")
    return response


@app.post(api_endpoints.fetch_details)
async def fetch_primary_device_details(data: dict):
    response = {
        "status": "Failed",
        "message": "Primary device details fetching failed.",
        "action": None
    }
    try:
        response_data = f"Heartbeat message - {data}"
        print(response_data)
        logger.info(response_data)
        logger.info(f"TimeVariable.api_last_hit_time in receiving hb entry- {TimeVariable.api_last_hit_time}")
        TimeVariable.api_last_hit_time = datetime.now()
        logger.info(
            f"TimeVariable.api_last_hit_time in receiving hb after updating time- {TimeVariable.api_last_hit_time}")
        device_handler.check_system_time(data.get("date_time"))
        device_handler.check_last_quality_data_time(data.get("data_check"))
        response["status"] = "Success"
        response["message"] = "Primary device details fetching successful."
        response["action"] = redundancy_handler.response_action(data.get("data_check"))
    except Exception as e:
        logger.error(f"Exception occurred while fetching primary device details - {e}.")
    return response


if __name__ == "__main__":
    logger.info("------------------------------------------Starting Service------------------------------------------")
    # The commit view cuts off here; presumably the service is started with
    # uvicorn on the secondary device's configured port, e.g.:
    uvicorn.run(app, host="0.0.0.0", port=app_config.port_no)