ajil.k / test · Commits · a2304e73

Commit a2304e73
authored Jan 06, 2025 by Ajil K

    updated structure and added trace mode in logger

parent 6496cab6

Showing 16 changed files with 520 additions and 283 deletions (+520 -283)
README.md                                      +86   -0
conf/config.yml                                +21   -11
primary_service.py                             +5    -5
requirements.txt                               +2    -2
scripts/constants/app_config.py                +15   -9
scripts/constants/app_variables.py             +4    -4
scripts/core/handlers/device_handler.py        +38   -42
scripts/core/handlers/files_handler.py         +85   -80
scripts/core/handlers/redundancy_handler.py    +23   -22
scripts/logging/logger.py                      +167  -28
scripts/utilities/common_util.py               +31   -33
scripts/utilities/communication_util.py        +12   -15
scripts/utilities/mqtt_subscription_util.py    +2    -2
scripts/utilities/service_util.py              +17   -17
scripts/utilities/udp_subscription_util.py     +2    -2
secondary_service.py                           +10   -11
README.md (new file, 0 → 100644)

primary_device_config

primary_device:
  resync_time: "00:15"
  heartbeat_interval: 120
  files_to_watch: <path_to_the_files_and_it_should_be_separated_by_a_comma>
secondary_device:
  host_ip: <secondary_device_ip>
  port: 8088
data_check_frequency: 5
quality_time_frequency: 2
acquisition_restart_frequency: 2
local_uploader:
  type: udp
  host_ip: 127.0.0.1
  port: 20015
event_uploader:
  type: udp
  host_ip: <primary_TX's_ip>
  port: 20004
start_delay_time: 0
data_source: True
config:
  channel:
    uploader_host: <primary_TX's_ip>
    uploader_port: 20003
  agent:
    base_url: udp://<primary_TX's_ip>:20002/
  monitoring_engine:
    host_ip:
    port:
    url:

secondary_device_config

primary_device:
  resync_time: "00:15"
  heartbeat_interval: 120
  files_to_watch: <path_to_the_files_and_it_should_be_separated_by_a_comma>
secondary_device:
  host_ip: <secondary_device_ip>
  port: 8088
data_check_frequency: 5
quality_time_frequency: 2
acquisition_restart_frequency: 2
local_uploader:
  type: udp
  host_ip: 127.0.0.1
  port: 20015
event_uploader:
  type: udp
  host_ip: <secondary_TX's_ip>
  port: 20004
start_delay_time: 0
data_source: True
config:
  channel:
    uploader_host: <secondary_TX's_ip>
    uploader_port: 20003
  agent:
    base_url: udp://<secondary_TX's_ip>:20002/
  monitoring_engine:
    host_ip:
    port:
    url:
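
Note (illustrative, not part of the commit): a minimal sketch of how a service consumes this file, mirroring the conf/config.yml loading and comma-splitting that the app_config.py and common_util.py diffs below perform.

    # Minimal sketch: load the YAML above and split the comma-separated
    # files_to_watch string into individual paths.
    from yaml import safe_load

    with open("conf/config.yml", "r") as file:
        config_data = safe_load(file)

    files_to_watch = config_data.get("primary_device", {}).get("files_to_watch", "")
    watch_paths = files_to_watch.split(",")  # same split as common_util.list_the_files
    print(watch_paths)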
conf/config.yml

+logger:
+  level: TRACE
+  handlers:
+    - back_up_count: 5
+      file_path: logs/
+      logg_mqtt: true
+      max_bytes: 100000000
+      type: RotatingFileHandler
+    - type: StreamHandler
 primary_device:
-  resync_time: "00:15"
+  resync_time: "19:47"
   heartbeat_interval: 120
-  files_to_watch: E:\\iLens\\ilens-agent\\engine\\acquisition_engine\\conf,E:\\iLens\\ilens-agent\\conf
+  files_to_watch: C:\\iLens\\ilens-agent\\engine\\acquisition_engine\\conf,C:\\iLens\\ilens-agent\\conf
 secondary_device:
-  host_ip: 192.168.1.36
+  host_ip: 192.168.1.15
   port: 8088
 data_check_frequency: 5
@@ -17,21 +27,21 @@ local_uploader:
   port: 20015
 event_uploader:
-  type: http
-  host_ip: 192.168.0.220
-  port:
+  type: udp
+  host_ip: 192.168.1.20
+  port: 20004
 start_delay_time: 0
-data_source: True
+data_source: False
 config:
   channel:
-    uploader_host: 2.2.2.29
-    uploader_port:
+    uploader_host: 192.168.1.20
+    uploader_port: 20003
   agent:
-    base_url: http://192.168.0.220/dcp_api
+    base_url: udp://192.168.1.20:20002/
   monitoring_engine:
     host_ip:
     port:
-    url:
+    url:
\ No newline at end of file
primary_service.py

-from scripts.handlers.files_handler import redundancy_initializer
-from scripts.logging.logger import logger
+from scripts.core.handlers.files_handler import redundancy_initializer
+from scripts.logging.logger import get_logger
+
+logger = get_logger("primary_redundancy")

 if __name__ == "__main__":
     try:
-        print("----------------------------------------Starting Service----------------------------------------")
         logger.info("----------------------------------------Starting Service----------------------------------------")
-        redundancy_initializer()
+        redundancy_initializer(logger)
     except KeyboardInterrupt:
-        print("-----------------------------------------Service Stopped-----------------------------------------")
         logger.info("---------------------------------------Service Stopped---------------------------------------")
requirements.txt

@@ -2,9 +2,9 @@ uvicorn==0.22.0
 fastapi==0.103.2
 schedule==1.2.1
 PyYAML==5.4.1
-requests==2.0.0
 watchdog==3.0.0
 paho-mqtt==1.5.0
 PyJWT==2.4.0
 ping3==4.0.4
-psutil==5.9.6
\ No newline at end of file
+psutil==5.9.6
+httpx~=0.28.1
\ No newline at end of file
scripts/constants/app_config.py

-from scripts.utilities.common_util import common_utilities
+from yaml import safe_load

-config_data = common_utilities.read_configs("conf/config.yml")
+with open("conf/config.yml", 'r') as file:
+    config_data = safe_load(file)

 class AppConfig:
@@ -8,14 +10,14 @@ class AppConfig:
     quality_time_frequency = int(config_data.get("quality_time_frequency"))
     acquisition_restart_frequency = int(config_data.get("acquisition_restart_frequency"))
-    event_uploader_type = config_data.get("event_uploader").get("type")
-    event_uploader_host_ip = config_data.get("event_uploader").get("host_ip")
-    event_uploader_port = config_data.get("event_uploader").get("port", None)
+    event_uploader_type = config_data.get("event_uploader", {}).get("type")
+    event_uploader_host_ip = config_data.get("event_uploader", {}).get("host_ip")
+    event_uploader_port = config_data.get("event_uploader", {}).get("port", None)
-    start_delay_time = int(config_data.get("start_delay_time"))
-    resync_time = config_data.get("primary_device").get("resync_time")
-    run_time = int(config_data.get("primary_device").get("heartbeat_interval"))
-    files_to_watch = common_utilities.list_the_files(config_data.get("primary_device").get("files_to_watch"))
+    start_delay_time = int(config_data.get("start_delay_time", 0))
+    resync_time = config_data.get("primary_device", {}).get("resync_time", "")
+    run_time = int(config_data.get("primary_device", {}).get("heartbeat_interval", ""))
+    files_to_watch = config_data.get("primary_device", {}).get("files_to_watch", "")
     host_ip = config_data.get("secondary_device").get("host_ip")
     port_no = int(config_data.get("secondary_device").get("port"))
@@ -37,4 +39,8 @@ class AppConfig:
     monitoring_engine_url = config_data.get("config").get("monitoring_engine").get("url")

+class LoggerConfigurations:
+    LEVEL: str = config_data.get("logger", {}).get("level", "INFO")
+    HANDLERS: str = config_data.get("logger", {}).get("handlers", "")

 app_config = AppConfig()
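
Note (illustrative, not part of the commit): the {} defaults added above matter when a section is missing from the YAML.

    config_data = {}  # e.g. "event_uploader" section absent from conf/config.yml

    # Old form: config_data.get("event_uploader").get("type") raises AttributeError,
    # because .get() returns None and None has no .get().
    uploader_type = config_data.get("event_uploader", {}).get("type")
    print(uploader_type)  # -> None instead of a crash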
scripts/constants/app_variables.py

@@ -2,11 +2,11 @@ from datetime import datetime
 class Counter:
-    stop_counter = 0
+    stop_counter: int = 0

 class DataPool:
-    data_pool = []
+    data_pool: list = []

 class TimeVariable:
@@ -15,5 +15,5 @@ class TimeVariable:
 class DeviceInfo:
-    device_id = ""
-    project_id = ""
+    device_id: str = ""
+    project_id: str = ""
scripts/handlers/device_handler.py → scripts/core/handlers/device_handler.py

@@ -6,16 +6,17 @@ from scripts.constants.app_config import app_config
 from scripts.constants.app_constants import constants, services
 from scripts.constants.app_variables import TimeVariable
 from scripts.constants.events import events_constants
-from scripts.logging.logger import logger
 from scripts.utilities.communication_util import post_events
 from scripts.utilities.service_util import service_operations

 class DeviceHandler:
-    @staticmethod
-    def check_system_time(timestamp_str):
+    def __init__(self, logger):
+        self.logger = logger
+
+    def check_system_time(self, timestamp_str):
+        self.logger.trace("Entered in check_system_time")
         try:
-            logger.info("Entered in check_system_time")
             current_time = datetime.now()
             timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
             time_difference = current_time - timestamp
@@ -29,48 +30,46 @@ class DeviceHandler:
             command = f"date -s {timestamp}"
             subprocess.run(command, shell=True)
             post_events(events_constants.time_updated)
-            logger.info("System time updated and events send successfully.")
+            self.logger.info("System time updated and events send successfully.")
         except Exception as e:
-            logger.exception(f"Exception occurred while checking system time - {e}.")
-        logger.info("Exiting from check_system_time")
+            self.logger.exception(f"Exception occurred while checking system time - {e}.")
+        self.logger.trace("Exiting from check_system_time")

-    @staticmethod
-    def check_last_hit():
+    def check_last_hit(self):
         try:
             while True:
-                logger.info("Entered check_last_hit")
-                logger.info(f"data const var before checking in check_last_hit entry - {service_operations.check_ilens_client_status()}")
-                logger.info(f"TimeVariable.api_last_hit_time in check_last_hit - {TimeVariable.api_last_hit_time}")
+                self.logger.trace("Entered check_last_hit")
+                self.logger.trace(f"data const var before checking in check_last_hit entry - {service_operations.check_ilens_client_status(self.logger)}")
+                self.logger.trace(f"TimeVariable.api_last_hit_time in check_last_hit - {TimeVariable.api_last_hit_time}")
                 current_time = datetime.now()
                 time_difference = current_time - TimeVariable.api_last_hit_time
-                logger.info(f"time_difference: {time_difference}")
+                self.logger.info(f"time_difference: {time_difference}")
                 waiting_time = timedelta(seconds=(app_config.data_check_frequency * app_config.run_time))
                 if time_difference > waiting_time and app_config.is_data_source:
-                    if not service_operations.check_ilens_client_status():
+                    if not service_operations.check_ilens_client_status(self.logger):
                         time.sleep(app_config.start_delay_time)
                         operation_status = service_operations.restart_service(services.acquisition_engine)
                         if operation_status:
                             post_events(events_constants.secondary_acq_restarted)
-                            logger.info("Acquisition Engine module started due to no response from primary device.")
+                            self.logger.info("Acquisition Engine module started due to no response from primary device.")
                         else:
                             post_events(events_constants.secondary_acq_restart_failed)
-                            logger.error("Failed to start the Acquisition Engine module.")
+                            self.logger.error("Failed to start the Acquisition Engine module.")
                 time.sleep(app_config.run_time)
-                logger.info(f"TimeVariable.api_last_hit_time in check_last_hit exit - {TimeVariable.api_last_hit_time}")
-                logger.info(f"data const var after checking in check_last_hit - {service_operations.check_ilens_client_status()}")
+                self.logger.trace(f"TimeVariable.api_last_hit_time in check_last_hit exit - {TimeVariable.api_last_hit_time}")
+                self.logger.trace(f"data const var after checking in check_last_hit - {service_operations.check_ilens_client_status(self.logger)}")
         except Exception as e:
-            logger.exception(f"Exception occurred while checking last hit - {e}.")
-        logger.info("Exiting from check_last_hit")
+            self.logger.exception(f"Exception occurred while checking last hit - {e}.")
+        self.logger.trace("Exiting from check_last_hit")

-    @staticmethod
-    def check_last_quality_data_time(data_check):
+    def check_last_quality_data_time(self, data_check):
+        self.logger.trace("Entered check_last_quality_data_time")
         try:
-            logger.info("Entered check_last_quality_data_time")
-            logger.info(f"data const var before checking in check_last_quality_data_time - {service_operations.check_ilens_client_status()}")
-            logger.info(
+            self.logger.trace(f"data const var before checking in check_last_quality_data_time - {service_operations.check_ilens_client_status(self.logger)}")
+            self.logger.trace(
                 f"TimeVariable.last_quality_data check_last_quality_data_time entry: {TimeVariable.last_quality_data}")
             if data_check:
                 TimeVariable.last_quality_data = datetime.now()
@@ -78,30 +77,27 @@ class DeviceHandler:
             time_difference = current_time - TimeVariable.last_quality_data
             waiting_time = timedelta(seconds=(app_config.quality_time_frequency * app_config.run_time))
             if time_difference > waiting_time and app_config.is_data_source:
-                if not service_operations.check_ilens_client_status():
+                if not service_operations.check_ilens_client_status(self.logger):
                     time.sleep(app_config.start_delay_time)
                     operation_status = service_operations.restart_service(services.acquisition_engine)
                     if operation_status:
                         post_events(events_constants.secondary_acq_restarted_bad_data)
-                        logger.info("Acquisition Engine module started due to bad quality data in primary device.")
+                        self.logger.info("Acquisition Engine module started due to bad quality data in primary device.")
                     else:
                         post_events(events_constants.secondary_acq_restart_failed)
-                        logger.error("Failed to start the Acquisition Engine module.")
-            elif app_config.is_data_source and service_operations.check_ilens_client_status() and data_check:
+                        self.logger.error("Failed to start the Acquisition Engine module.")
+            elif app_config.is_data_source and service_operations.check_ilens_client_status(self.logger) and data_check:
                 operation_status = service_operations.stop_service(services.acquisition_engine)
                 if operation_status:
                     post_events(events_constants.secondary_acq_stopped)
-                    logger.info("Acquisition Engine module stopped on secondary device.")
+                    self.logger.info("Acquisition Engine module stopped on secondary device.")
                 else:
                     post_events(events_constants.secondary_acq_stopping_failed)
-                    logger.error("Failed to stop the Acquisition Engine module on secondary device.")
-            logger.info(f"data const var after checking in check_last_quality_data_time - {service_operations.check_ilens_client_status()}")
+                    self.logger.error("Failed to stop the Acquisition Engine module on secondary device.")
+            self.logger.trace(f"data const var after checking in check_last_quality_data_time - {service_operations.check_ilens_client_status(self.logger)}")
         except Exception as e:
-            logger.exception(f"Exception occurred while checking last quality data time - {e}.")
-        logger.info(
+            self.logger.exception(f"Exception occurred while checking last quality data time - {e}.")
+        self.logger.trace(
             f"TimeVariable.last_quality_data check_last_quality_data_time exit: {TimeVariable.last_quality_data}")
-        logger.info("Exiting from check_last_quality_data_time")
-
-device_handler = DeviceHandler()
+        self.logger.trace("Exiting from check_last_quality_data_time")
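
Note (illustrative, not part of the commit): with the @staticmethod decorators gone, callers construct the handler with a logger instance, as the secondary_service.py diff below does.

    from scripts.core.handlers.device_handler import DeviceHandler
    from scripts.logging.logger import get_logger

    logger = get_logger("secondary_redundancy")
    device_handler = DeviceHandler(logger)       # logger injected instead of a module-global
    device_handler.check_system_time("2025-01-06 00:00:00")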
scripts/handlers/files_handler.py → scripts/core/handlers/files_handler.py

@@ -13,13 +13,13 @@ from scripts.constants.app_config import app_config
 from scripts.constants.app_constants import constants, services
 from scripts.constants.app_variables import DeviceInfo
 from scripts.constants.events import events_constants
-from scripts.handlers.redundancy_handler import RedundancyHandler
-from scripts.logging.logger import logger
+from scripts.core.handlers.redundancy_handler import RedundancyHandler
 from scripts.utilities.common_util import common_utilities
 from scripts.utilities.communication_util import post_events, post_data
-from scripts.utilities.mqtt_subscription import MQTTSub
+from scripts.utilities.mqtt_subscription_util import MQTTSub
 from scripts.utilities.service_util import service_operations
-from scripts.utilities.udp_subscription import UDPSub
+from scripts.utilities.udp_subscription_util import UDPSub

 conf_file_path = os.path.join(constants.ilens_agent_path, constants.conf_path, "application.conf")
 try:
@@ -37,29 +37,30 @@ except Exception as e:
 class FilesHandler:
-    @staticmethod
-    def daily_sync():
+    def __init__(self, logger_obj):
+        self.logger = logger_obj
+
+    def daily_sync(self):
+        self.logger.trace("Entered in daily_sync")
         response_config_data = {}
         try:
-            logger.info("Entered in daily_sync")
-            files_list = app_config.files_to_watch
+            files_list = common_utilities.list_the_files(app_config.files_to_watch)
             for each_file in files_list:
                 for filename in constants.file_names:
                     file_name = os.path.join(each_file, filename)
                     if os.path.isfile(file_name):
                         file_data = common_utilities.read_configs(file_name)
                         response_config_data[file_name] = file_data
-            post_data(response_config_data, endpoint="sync_config")
+            post_data(response_config_data, self.logger, endpoint="sync_config")
             post_events(event_code=events_constants.daily_sync)
-            logger.info("Exited from daily_sync")
         except Exception as e:
-            logger.exception(f"Exception occurred while running daily sync - {e}.")
+            self.logger.exception(f"Exception occurred while running daily sync - {e}.")
+        self.logger.trace("Exited from daily_sync")

-    @staticmethod
-    def sync_config(files_path, file_content):
+    def sync_config(self, files_path, file_content):
+        self.logger.trace("Entered in sync_config")
         try:
-            logger.info("Entered in sync_config")
-            logger.info(f"filepath - {files_path} and file_content - {file_content}")
+            self.logger.trace(f"filepath - {files_path} and file_content - {file_content}")
             check_pipeline = False
             if files_path.endswith(".conf"):
                 config = configparser.ConfigParser()
@@ -82,7 +83,7 @@ class FilesHandler:
             for uploader in uploader_list:
                 received_node_host = file_content["flow_data"]["nodes"][uploader]["node_configuration"]["host"]
                 if received_node_host not in ["localhost", "127.0.0.1"]:
-                    print(
+                    self.logger.trace(
                         f"received host: {file_content['flow_data']['nodes'][uploader]['node_configuration']['host']}")
                     file_content['flow_data']['nodes'][uploader]['node_configuration']['host'] = app_config.channel_uploader_host
@@ -96,14 +97,14 @@ class FilesHandler:
                 if app_config.monitoring_engine_url:
                     file_content["uploader"]["url"] = app_config.monitoring_engine_url
                 else:
-                    logger.critical("Error while updating monitoring engine's configuration file,"
+                    self.logger.critical("Error while updating monitoring engine's configuration file,"
                                     " check the field - url in the redundancy configuration.")
             elif (file_content["uploader"]["type"]).lower() == "udp":
                 if app_config.monitoring_engine_host and app_config.monitoring_engine_port:
                     file_content["uploader"]["host"] = app_config.monitoring_engine_host
                     file_content["uploader"]["port"] = app_config.monitoring_engine_port
                 else:
-                    logger.critical("Error while updating monitoring engine's configuration file,"
+                    self.logger.critical("Error while updating monitoring engine's configuration file,"
                                     " check the fields - host_ip and port in the redundancy configuration.")
             file_data = yaml.dump(file_content, default_flow_style=False)
             common_utilities.update_file(files_path, file_data)
@@ -114,99 +115,102 @@ class FilesHandler:
                 service_status = service_operations.restart_service(service_name)
                 if service_status:
                     post_events(event_code=events_constants.secondary_module_restarted, module_name=service_name)
-                    print("Service restarted successfully.")
-                    logger.info("Service restarted successfully.")
+                    self.logger.info("Service restarted successfully.")
                 else:
                     post_events(event_code=events_constants.secondary_module_restart_failed)
-                    print(f"Failed to restart the module - {service_name}.")
-                    logger.info(f"Failed to restart the module - {service_name}.")
-            logger.info("Exited from sync_config")
+                    self.logger.error(f"Failed to restart the module - {service_name}.")
         except Exception as e:
-            logger.exception(f"Exception occurred while syncing configuration file - {e}.")
+            self.logger.exception(f"Exception occurred while syncing configuration file - {e}.")
+        self.logger.trace("Exited from sync_config")

-    @staticmethod
-    def is_file_path_need(files_path):
+    def is_file_path_need(self, files_path):
+        self.logger.trace("Entered in is_file_path_need")
         try:
-            logger.info("Entered in is_file_path_need")
-            logger.info(f"filepath - {files_path}")
+            self.logger.info(f"filepath - {files_path}")
             if constants.file_path_changes:
                 from scripts.utilities.common_util import WindowsUtilities
                 modified_path = WindowsUtilities().modify_file_path(files_path)
-                logger.info("Exiting from is_file_path_need")
+                self.logger.trace("Exiting from is_file_path_need")
                 return modified_path
             else:
-                logger.info("Exiting from is_file_path_need")
+                self.logger.trace("Exiting from is_file_path_need")
                 return files_path
         except Exception as e:
-            logger.exception(f"Exception occurred - {e}.")
+            self.logger.exception(f"Exception occurred - {e}.")
             return None


 class FileChangeHandler(FileSystemEventHandler):
+    def __init__(self, logger_obj):
+        self.logger = logger_obj
+
     def on_modified(self, event):
-        transfer_config(event)
+        transfer_config(event, self.logger)

     def on_created(self, event):
-        transfer_config(event)
+        transfer_config(event, self.logger)

-    def on_deleted(self, event):
-        logger.critical(f"File {event.src_path} has been deleted.")
+    # def on_deleted(self, event):
+    #     self.logger.critical(f"File {event.src_path} has been deleted.")

     def on_moved(self, event):
         if any(event.dest_path.endswith(extension) for extension in constants.file_names):
-            transfer_config(event, check_destination=True)
+            transfer_config(event, self.logger, check_destination=True)


-def transfer_config(event, check_destination=False):
-    logger.info("Entered in transfer_config")
-    logger.info(f"{event} - {check_destination}")
-    if not event.is_directory:
-        event_code = (events_constants.pipeline_modification
-                      if event.src_path.endswith(constants.pipeline_file)
-                      else events_constants.configuration_modification)
-        if any(event.src_path.endswith(extension) for extension in constants.file_names):
-            file_data = common_utilities.read_configs(event.src_path)
-            response_json = {"file_name": event.src_path, "data": file_data}
-            post_data(response_json)
-            post_events(event_code)
-            logger.info(f"File {event.src_path} has been modified.")
-        elif check_destination:
-            file_data = common_utilities.read_configs(event.src_path)
-            response_json = {"file_name": event.dest_path, "data": file_data}
-            post_data(response_json)
-            post_events(event_code)
-            logger.info(f"File {event.dest_path} has been modified.")
-    logger.info("Exiting from transfer_config")
+def transfer_config(file_path, logger_obj, check_destination=False):
+    logger_obj.trace("Entered in transfer_config")
+    try:
+        logger_obj.trace(f"{file_path} - {check_destination}")
+        if not file_path.is_directory:
+            event_code = (events_constants.pipeline_modification
+                          if file_path.src_path.endswith(constants.pipeline_file)
+                          else events_constants.configuration_modification)
+            if any(file_path.src_path.endswith(extension) for extension in constants.file_names):
+                file_data = common_utilities.read_configs(file_path.src_path)
+                response_json = {"file_name": file_path.src_path, "data": file_data}
+                post_data(response_json, logger_obj)
+                post_events(event_code)
+                logger_obj.info(f"File {file_path.src_path} has been modified.")
+            elif check_destination:
+                file_data = common_utilities.read_configs(file_path.src_path)
+                response_json = {"file_name": file_path.dest_path, "data": file_data}
+                post_data(response_json, logger_obj)
+                post_events(event_code)
+                logger_obj.info(f"File {file_path.dest_path} has been modified.")
+        else:
+            logger_obj.error(f"file_path - {file_path} is not proper path to file.")
+    except Exception as e:
+        logger_obj.error(f"Exception occurred while transferring configurations - {file_path} - {e}.")
+    logger_obj.trace("Exiting from transfer_config")


-def redundancy_initializer():
+def redundancy_initializer(logger_obj):
     try:
-        logger.info("Entered in redundancy_initializer")
-        file_handler = FilesHandler()
-        event_handler = FileChangeHandler()
-        redundancy_handler = RedundancyHandler()
+        logger_obj.trace("Entered in redundancy_initializer")
+        file_handler = FilesHandler(logger_obj)
+        event_handler = FileChangeHandler(logger_obj)
+        redundancy_handler = RedundancyHandler(logger_obj)
         if app_config.is_data_source:
-            client_status = service_operations.check_ilens_client_status()
+            client_status = service_operations.check_ilens_client_status(logger_obj)
            if not client_status:
                 if service_operations.restart_service(services.acquisition_engine):
-                    logger.info("Acquisition Engine started on running primary redundancy module.")
+                    logger_obj.info("Acquisition Engine started on running primary redundancy module.")
         if app_config.local_uploader_type.lower() == "mqtt":
-            logger.info("Using local MQTT Subscriber.")
+            logger_obj.info("Using local MQTT Subscriber.")
             MQTTSub(app_config.local_uploader_ip, app_config.local_uploader_port, app_config.local_mqtt_topic)
         elif app_config.local_uploader_type.lower() == "udp":
-            logger.info("Using local UDP Subscriber.")
+            logger_obj.info("Using local UDP Subscriber.")
             local_udp_thread = threading.Thread(target=UDPSub,
                                                 args=(app_config.local_uploader_ip,
                                                       app_config.local_uploader_port,
                                                       1024000)
@@ -217,7 +221,7 @@ def redundancy_initializer():
     observer = Observer()
-    for files_path in app_config.files_to_watch:
+    for files_path in common_utilities.list_the_files(app_config.files_to_watch):
         observer.schedule(event_handler, files_path, recursive=True)
     observer.start()
@@ -225,10 +229,11 @@ def redundancy_initializer():
     try:
         while True:
             schedule.run_pending()
-            post_data(redundancy_handler.fetch_device_details(), endpoint="fetch_device_details")
+            post_data(redundancy_handler.fetch_device_details(), logger_obj, endpoint="fetch_device_details")
             time.sleep(app_config.run_time)
     except KeyboardInterrupt:
         observer.stop()
         observer.join()
     except Exception as e:
-        logger.exception(f"Exception occurred while monitoring files - {e}")
+        logger_obj.exception(f"Exception occurred while monitoring files - {e}")
+    logger_obj.trace(f"Exiting from redundancy_initializer")
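
Note (illustrative, not part of the commit): a standalone sketch of the watchdog pattern redundancy_initializer uses, with a trivial handler in place of FileChangeHandler.

    import time

    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer


    class PrintHandler(FileSystemEventHandler):
        def on_modified(self, event):
            print(f"modified: {event.src_path}")


    observer = Observer()
    observer.schedule(PrintHandler(), ".", recursive=True)  # one schedule() per watch path
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()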
scripts/handlers/redundancy_handler.py → scripts/core/handlers/redundancy_handler.py

@@ -11,16 +11,17 @@ from scripts.utilities.service_util import service_operations
 class RedundancyHandler:
-    @staticmethod
-    def fetch_pipeline_details(pipeline_data):
-        logger.info("Entered in fetch_pipeline_details")
+    def __init__(self, logger):
+        self.logger = logger
+
+    def fetch_pipeline_details(self, pipeline_data):
+        self.logger.trace("Entered in fetch_pipeline_details")
         pipeline_version = "upgrade"
         try:
             pipeline_version = pipeline_data.get("pipeline_version")
         except Exception as e:
-            logger.exception(f"No channel pipeline file found - {e}.")
-        logger.info(f"pipeline_version - {pipeline_version}")
-        logger.info("Exiting from fetch_pipeline_details")
+            self.logger.exception(f"No channel pipeline file found - {e}.")
+        self.logger.trace(f"Exiting from fetch_pipeline_details with pipeline_version - {pipeline_version}")
         return pipeline_version

     @staticmethod
@@ -28,11 +29,10 @@ class RedundancyHandler:
         date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         return date_time

-    @staticmethod
-    def check_system_architecture(pipeline_data):
+    def check_system_architecture(self, pipeline_data):
         connection_status = "NA"
+        self.logger.trace("Entered in check_system_architecture")
         try:
-            logger.info("Entered in check_system_architecture")
             flow_data_node_list = [node for node in pipeline_data.get("flow_data", {}).get("nodes", {}).keys()
                                    if not node.startswith("device_instance_")
                                    if not node.startswith("collector_")
@@ -41,7 +41,7 @@ class RedundancyHandler:
             for node in flow_data_node_list:
                 if node.startswith("udp") and pipeline_data.get("flow_data").get("nodes").get(node).get(
                         "node_configuration").get("host") not in ["127.0.0.1", "localhost"]:
-                    logger.info("Have diode dependency")
+                    self.logger.trace("Have diode dependency")
                     system_dependency = "Diode dependency"
             node_configuration_details = pipeline_data.get("flow_data").get("nodes").get(
                 flow_data_node_list[0]).get("node_configuration")
@@ -58,25 +58,25 @@ class RedundancyHandler:
             }
         except Exception as e:
-            logger.exception(f"Exception occurred while checking system architecture - {e}.")
-        logger.info("Exiting from check_system_architecture")
+            self.logger.exception(f"Exception occurred while checking system architecture - {e}.")
+        self.logger.trace("Exiting from check_system_architecture")
         return connection_status

     def fetch_device_details(self):
         response = None
+        self.logger.trace("Entered in fetch_device_details")
         try:
-            logger.info("Entered in fetch_device_details")
             try:
                 data_check = False
                 if DataPool.data_pool:
-                    logger.info(f"data pool: \'{DataPool.data_pool}\'")
+                    self.logger.trace(f"data pool: \'{DataPool.data_pool}\'")
                     data_quality_set = set(DataPool.data_pool)
                     data_check = True
                     if len(data_quality_set) == 1 and list(data_quality_set)[0] == 2:
                         data_check = False
                         post_events(events_constants.bad_data_quality)
             except Exception as e:
-                logger.error(f"data_check exception: {e}")
+                self.logger.error(f"data_check exception: {e}")
                 data_check = False
             pipeline_data = common_utilities.read_configs(constants.channel_pipeline_path)
             response = {
@@ -86,15 +86,15 @@ class RedundancyHandler:
                 if app_config.is_data_source else "NA"),
                 "data_check": data_check if app_config.is_data_source else True
             }
-            logger.info(f"Payload to secondary - {response}")
+            self.logger.info(f"Payload to secondary - {response}")
         except Exception as e:
-            logger.exception(f"Exception occurred while fetching primary device details - {e}.")
-        logger.info("Exiting from fetch_device_details")
+            self.logger.exception(f"Exception occurred while fetching primary device details - {e}.")
+        self.logger.trace("Exiting from fetch_device_details")
         return response

     def response_action(self, data_quality):
         try:
-            logger.info("Entered in response_action")
+            self.logger.trace("Entered in response_action")
             result = {
                 "date_time": self.read_current_time(),
                 "pipeline_version": "NA",
@@ -103,9 +103,10 @@ class RedundancyHandler:
             if app_config.is_data_source:
                 pipeline_data = common_utilities.read_configs(constants.channel_pipeline_path)
                 result["pipeline_version"] = self.fetch_pipeline_details(pipeline_data)
-            result["acquisition_status"] = (service_operations.check_ilens_client_status() and not data_quality)
+            result["acquisition_status"] = (service_operations.check_ilens_client_status(logger) and not data_quality)
             return result
         except Exception as e:
-            logger.exception(f"Exception occurred while getting response action details - {e}.")
-        logger.info("Exiting from response_action")
+            self.logger.exception(f"Exception occurred while getting response action details - {e}.")
+        self.logger.trace("Exiting from response_action")
         return None
\ No newline at end of file
scripts/logging/logger.py

Removed (the previous implementation, a single root logger at INFO writing logs/redundancy.log):

import logging
import os
from logging.handlers import RotatingFileHandler


def get_logger():
    """
    Creates a rotating log
    """
    __logger__ = logging.getLogger('')

    # setting the logger level
    __logger__.setLevel("INFO")

    # creating the format for the log
    log_formatter = "%(asctime)s-%(levelname)-s-[%(funcName)5s():%(lineno)s]-%(message)s"
    time_format = "%Y-%m-%d %H:%M:%S"

    # getting the path for the logger
    file_path = "logs"

    # setting the format
    formatter = logging.Formatter(log_formatter, time_format)

    # creating the folder if not exist
    if not os.path.exists(file_path):
        os.makedirs(file_path)

    # joining the path
    log_file = os.path.join(f"{file_path}/redundancy.log")

    # creating rotating file handler with max byte as 1
    temp_handler = RotatingFileHandler(log_file,
                                       maxBytes=1000000000,
                                       backupCount=5)

    # setting the formatter
    temp_handler.setFormatter(formatter)

    # setting the handler
    __logger__.addHandler(temp_handler)

    return __logger__


logger = get_logger()

Added (the new implementation, which keeps an earlier draft commented out at the top):

# import logging
# import os
# from logging.handlers import RotatingFileHandler
#
#
# logger_topic = None
# logging.trace = logging.DEBUG - 5
# logging.addLevelName(logging.DEBUG - 5, "TRACE")
#
# class EngineLoggerClass(logging.getLoggerClass()):
#     def __init__(self, name):
#         super().__init__(name)
#
#     def trace(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.trace):
#             self._log(logging.trace, msg, args, **kwargs)
#
#     def info(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.INFO):
#             self._log(logging.INFO, msg, args, **kwargs)
#
#     def debug(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.DEBUG):
#             self._log(logging.DEBUG, msg, args, **kwargs)
#
#     def warning(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.WARNING):
#             self._log(logging.WARNING, msg, args, **kwargs)
#
#     def error(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.ERROR):
#             self._log(logging.ERROR, msg, args, **kwargs, extra={"__logger_topic__": logger_topic})
#
#     def critical(self, msg, *args, **kwargs):
#         if self.isEnabledFor(logging.CRITICAL):
#             self._log(logging.CRITICAL, msg, args, **kwargs)
#
# def get_logger():
#     """
#     Creates a rotating log
#     """
#     logging.setLoggerClass(EngineLoggerClass)
#     __logger__ = logging.getLogger('')
#
#     # setting the logger level
#     __logger__.setLevel("INFO")
#
#     # creating the format for the log
#     log_formatter = "%(asctime)s-%(levelname)-s-[%(funcName)5s():%(lineno)s]-%(message)s"
#     time_format = "%Y-%m-%d %H:%M:%S"
#
#     # getting the path for the logger
#     file_path = "logs"
#
#     # setting the format
#     formatter = logging.Formatter(log_formatter, time_format)
#
#     # creating the folder if not exist
#     if not os.path.exists(file_path):
#         os.makedirs(file_path)
#
#     # joining the path
#     log_file = os.path.join(f"{file_path}/redundancy.log")
#
#     # creating rotating file handler with max byte as 1
#     temp_handler = RotatingFileHandler(log_file,
#                                        maxBytes=1000000000,
#                                        backupCount=5)
#
#     # setting the formatter
#     temp_handler.setFormatter(formatter)
#
#     # setting the handler
#     __logger__.addHandler(temp_handler)
#
#     return __logger__
#
#
# logger = get_logger()
import json
import logging
import os
from logging.handlers import RotatingFileHandler

from scripts.constants.app_config import LoggerConfigurations

logger_topic = None
logging.trace = logging.DEBUG - 5
logging.addLevelName(logging.DEBUG - 5, "TRACE")


class EngineLoggerClass(logging.getLoggerClass()):
    def __init__(self, name):
        super().__init__(name)

    def trace(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.trace):
            self._log(logging.trace, msg, args, **kwargs)

    def info(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.INFO):
            self._log(logging.INFO, msg, args, **kwargs)

    def debug(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.DEBUG):
            self._log(logging.DEBUG, msg, args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.WARNING):
            self._log(logging.WARNING, msg, args, **kwargs)

    def error(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.ERROR):
            self._log(logging.ERROR, msg, args, **kwargs, extra={"__logger_topic__": logger_topic})

    def critical(self, msg, *args, **kwargs):
        if self.isEnabledFor(logging.CRITICAL):
            self._log(logging.CRITICAL, msg, args, **kwargs)


class CustomRotatingFileHandler(RotatingFileHandler):
    def emit(self, record):
        msg = self.format(record)
        try:
            is_eventhandler_present = any(loggers.get("type") == "eventhandler"
                                          for loggers in LoggerConfigurations.HANDLERS)
            if "$" in msg and ("events_constants" in msg or "error_constants" in msg) and not is_eventhandler_present:
                error_json = {}
                msg_list = msg.split("$")
                error_json["Desc"] = msg_list[2].split(':')[0]
                try:
                    error_json["ErrorDetails"] = json.loads(msg_list[3].replace("'", '"'))
                    error_json["PythonDesc"] = msg_list[4]
                except:
                    pass
                error_return = f"{error_json.get('Desc', '')} {error_json.get('PythonDesc', '')}".strip()
                record.msg = error_return
                record.exc_text = None
                record.exc_info = None
            super().emit(record)
        except:
            pass


def get_logger(name):
    """
    Creates a rotating log
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    log_level = LoggerConfigurations.LEVEL
    logging.setLoggerClass(EngineLoggerClass)
    __logger__ = logging.getLogger(name)
    __logger__.setLevel(log_level.strip().upper())
    debug_formatter = ("%(asctime)s-%(levelname)-3s-"
                       "[%(filename)5s:"
                       "%(lineno)s] -> %(message)s")
    formatter_string = "%(asctime)s - %(levelname)-6s - %(name)s - %(levelname)3s - %(message)s"
    if log_level.strip().upper() == log_level:
        formatter_string = debug_formatter
    formatter = logging.Formatter(formatter_string, time_format)
    for handler in LoggerConfigurations.HANDLERS:
        if handler["type"].lower() == "rotatingfilehandler":
            os.makedirs(handler["file_path"], exist_ok=True)
            log_file = f'{handler["file_path"]}/{name.replace(".", "_").replace("/", "_").replace(" ", "_")}.log'
            temp_handler = CustomRotatingFileHandler(
                log_file,
                maxBytes=int(handler["max_bytes"]),
                backupCount=int(handler["back_up_count"]),
            )
            temp_handler.setFormatter(formatter)
        if handler["type"].lower() == "streamhandler" or os.environ.get('ENABLE_CONSOLE_LOG', default=False):
            temp_handler = logging.StreamHandler()
            temp_handler.setFormatter(formatter)
        __logger__.addHandler(temp_handler)
    return __logger__


logger = get_logger(name="application")
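
Note (illustrative, not part of the commit): how the new TRACE level behaves. logging.DEBUG is 10, so the custom level registers at numeric value 5; trace() calls only emit when the configured level in conf/config.yml is TRACE.

    # Sketch: TRACE sits below DEBUG, so a logger configured at INFO suppresses
    # the entry/exit traces while keeping info and error messages.
    from scripts.logging.logger import get_logger

    logger = get_logger("example")          # level and handlers come from conf/config.yml
    logger.trace("Entered in example_fn")   # emitted only when the level is TRACE
    logger.info("still logged at INFO and above")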
scripts/utilities/common_util.py

@@ -15,9 +15,8 @@ class CommonUtilities:
     @staticmethod
     def read_configs(file_path):
         file_data = None
+        logger.trace(f"Entered in read_configs with file_path: {file_path}")
         try:
-            logger.info("Entered in read_configs")
-            logger.info(f"file_path: {file_path}")
             with open(file_path, 'r') as file:
                 if file_path.endswith("channel.yml"):
                     file_data = json.load(file)
@@ -27,15 +26,15 @@ class CommonUtilities:
                     file_data = file.read()
         except Exception as e:
             logger.exception(f"Error while reading configuration - {e}.")
-        logger.info("Exiting from read_configs")
+        logger.trace("Exiting from read_configs")
         return file_data

     @staticmethod
     def ping_host(host):
         status = False
+        logger.trace("Entered in the ping_host ")
         try:
-            logger.info("Entered in the ping_host ")
-            logger.info(f"host - {host}")
+            logger.trace(f"host - {host}")
             response = ping3.ping(host, timeout=2)
             if response:
                 logger.info(f"Host {host} is reachable (Round-trip time: {response} ms)")
@@ -44,107 +43,106 @@ class CommonUtilities:
                 logger.error(f"Host {host} is not reachable")
         except socket.error as e:
             logger.exception(f"Error while trying to reach {host}: {e}")
-        logger.info("Exiting from ping_host")
+        logger.trace("Exiting from ping_host")
         return status

     @staticmethod
     def list_the_files(files_list):
+        logger.trace("Entered in list_the_files")
         try:
-            logger.info("Entered in list_the_files")
             return files_list.split(",")
         except Exception as e:
             logger.exception(f"Exception occurred while listing the files - {e}.")
-        logger.info("Exiting from list_the_files")
+        logger.trace("Exiting from list_the_files")
         return None

     @staticmethod
     def update_file(file_path, details):
-        logger.info("Entered in update_file")
+        logger.trace("Entered in update_file")
         try:
             with open(file_path, 'w') as file:
                 file.write(details)
         except Exception as e:
             logger.exception(f" - {e}.")
-        logger.info("Exiting from update_file")
+        logger.trace("Exiting from update_file")

     @staticmethod
     def jwt_decode(data, secret_key):
-        logger.info("Entering the jwt_decode")
+        logger.trace("Entering the jwt_decode")
         return jwt.decode(data, secret_key, algorithms=["HS256"])

     def fetch_device_details_from_lic(self, license_file_path, secret_key):
+        logger.trace("Entering in fetch_device_details_from_lic")
+        device_id, project_id = None, None
         try:
-            logger.info("Entering in fetch_device_details_from_lic")
             if os.path.isfile(license_file_path):
                 with open(license_file_path, 'r') as file:
                     encoded_data = file.read()
                 decoded_data = self.jwt_decode(encoded_data, secret_key)
-                logger.info("Exiting from fetch_device_details_from_lic")
-                logger.info(f"ilens_device_id - {decoded_data['ilens_device_id']}, project_id - {decoded_data['project_id']}")
-                return decoded_data["ilens_device_id"], decoded_data["project_id"]
+                device_id, project_id = decoded_data.get('ilens_device_id', ''), decoded_data.get('project_id', '')
         except Exception as e:
             logger.exception(f"Exception occurred while fetching device details - {e}.")
-            logger.info("Exiting from fetch_device_details_from_lic with exception")
-            logger.info(f"ilens_device_id - {None}, project_id - {None}")
-            return None, None
+        logger.info(f"ilens_device_id - {device_id}, project_id - {project_id}")
+        logger.trace("Exiting from fetch_device_details_from_lic")
+        return device_id, project_id

     @staticmethod
     def fetch_data_from_conf(conf_file_path):
+        logger.trace("Entering in fetch_data_from_conf")
+        device_id, project_id = None, None
         try:
-            logger.info("Entering in fetch_data_from_conf")
             config = ConfigParser()
             config.read(conf_file_path)
             device_id = config.get("AGENT", "agent_id")
             project_id = config.get("AGENT", "registration_project_id")
-            logger.info(f"ilens_device_id - {device_id}, project_id - {project_id}")
-            return device_id, project_id
         except Exception as e:
             logger.exception(f"Exception occurred while fetching device details - {e}.")
-            return None
+        logger.trace(f"ilens_device_id - {device_id}, project_id - {project_id}")
+        return device_id, project_id


 class WindowsUtilities:
     def find_ilens_folder(self):
+        logger.trace("Entering in find_ilens_folder")
         try:
-            logger.info("Entering in find_ilens_folder")
             for drive_letter in self.get_windows_drives():
                 folder_path = os.path.join(drive_letter, "iLens")
                 if os.path.exists(folder_path) and os.path.isdir(folder_path):
+                    logger.trace("Exiting from find_ilens_folder")
                     return folder_path
         except Exception as e:
             logger.exception(f"{e}.")
-        logger.info("Exiting from find_ilens_folder")
+        logger.trace("Exiting from find_ilens_folder")
         return None

     @staticmethod
     def get_windows_drives():
+        logger.trace("Entering in get_windows_drives")
         try:
-            logger.info("Entering in get_windows_drives")
             drives = []
             for partition in psutil.disk_partitions(all=True):
                 if "cdrom" not in partition.opts and partition.fstype != "":
                     drives.append(partition.device)
-            logger.info("Exiting from get_windows_drives")
+            logger.trace("Exiting from get_windows_drives")
             return drives
         except Exception as e:
             logger.exception(f"Exception occurred while getting windows drives - {e}.")
-        logger.info("Exiting from get_windows_drives")
+        logger.trace("Exiting from get_windows_drives")
         return None

     def modify_file_path(self, file_path):
+        logger.trace("Entering in modify_file_path")
+        modified_path = None
         try:
-            logger.info("Entering in modify_file_path")
             drive = (self.find_ilens_folder()).replace("iLens", "")
             ilens_path = [directory for directory in ((file_path.split(":"))[1]).split("\\") if directory]
             ilens_path.insert(0, drive)
             modified_path = os.path.join(*ilens_path)
-            logger.info("Exiting from modify_file_path")
-            return modified_path
         except Exception as e:
             logger.exception(f"Exception occurred while altering the file path - {e}.")
-            logger.info("Exiting from modify_file_path")
-            return None
+        logger.trace("Exiting from modify_file_path")
+        return modified_path


 common_utilities = CommonUtilities()
scripts/utilities/communication_util.py

@@ -3,7 +3,7 @@ import json
 import socket
 from datetime import datetime

-import requests
+import httpx

 from scripts.constants.app_config import app_config
 from scripts.constants.app_constants import constants, services
@@ -14,16 +14,15 @@ from scripts.utilities.common_util import common_utilities
 from scripts.utilities.service_util import service_operations


-def post_data(json_data, endpoint="receive_data"):
+def post_data(json_data, logger, endpoint="receive_data"):
     try:
-        logger.info("Entered in post_data")
+        logger.trace("Entered in post_data")
         monitoring_endpoint = f"http://{app_config.host_ip}:{app_config.port_no}/{endpoint}"
         headers = {'Content-Type': 'application/json'}
-        response = requests.post(monitoring_endpoint, data=json.dumps(json_data), headers=headers)
+        response = httpx.post(monitoring_endpoint, data=json.dumps(json_data), headers=headers)
         if response.status_code == 200:
             response_data = response.content.decode()
-            print(response_data)
-            logger.info(response_data)
+            logger.trace(f"response_data: {response_data}")
             if endpoint == "fetch_device_details":
                 DataPool.data_pool = []
                 response_json = (json.loads(response_data)).get("action", {})
@@ -35,26 +34,26 @@ def post_data(json_data, endpoint="receive_data"):
                     "file_name": file_name,
                     "data": file_data
                 }
-                post_data(response_json)
+                post_data(response_json, logger)
                 post_events(event_code=events_constants.pipeline_mismatch)
             if response_json.get("acquisition_status"):
                 Counter.stop_counter += 1
                 if (app_config.is_data_source
                         and Counter.stop_counter >= app_config.acquisition_restart_frequency
-                        and service_operations.check_ilens_client_status()):
+                        and service_operations.check_ilens_client_status(logger)):
                     if service_operations.stop_service(services.acquisition_engine):
                         post_events(event_code=events_constants.primary_acq_stopped)
                     else:
                         post_events(event_code=events_constants.primary_acq_stopping_failed)
             else:
                 Counter.stop_counter = 0
-    except requests.RequestException as e:
-        print(f"# Error sending JSON data: {str(e)}")
+    except Exception as e:
+        logger.exception(f"# Error sending JSON data: {str(e)}")
+    logger.trace("Exiting from post_data")


 def post_events(event_code, module_name=None):
     try:
-        logger.info("Entered in post_events")
+        logger.trace("Entered in post_events")
         headers = {'Content-Type': 'application/json'}
         event_json = events_constants.payload_template
         event_json["timestamp"] = int((datetime.now().timestamp()) * 1000)
@@ -74,7 +73,7 @@ def post_events(event_code, module_name=None):
             sender_socket.close()
         elif app_config.event_uploader_type == "http":
             event_endpoint = f"{app_config.event_uploader_host_ip}{constants.event_topic}"
-            response = requests.post(
+            response = httpx.post(
                 event_endpoint,
                 data=json.dumps(event_json),
                 headers=headers,
@@ -82,8 +81,6 @@ def post_events(event_code, module_name=None):
                 verify=False)
             if response.status_code == 200:
-                print(response.content.decode())
                 logger.info(response.content.decode())
-    except requests.RequestException as e:
-        print(f"# Error sending Events: {str(e)}")
+    except Exception as e:
        logger.exception(f"# Error sending Events: {str(e)}")
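
Note (illustrative, not part of the commit): with the requests → httpx swap, requests.RequestException no longer applies; httpx's own base class is httpx.HTTPError. The commit broadens the handlers to a bare Exception instead; a narrower variant might look like this.

    # Sketch of a narrower handler using httpx's exception hierarchy.
    import httpx

    try:
        response = httpx.post("http://127.0.0.1:8088/receive_data", json={"ping": True})
        response.raise_for_status()
    except httpx.HTTPError as e:  # base of RequestError and HTTPStatusError
        print(f"# Error sending JSON data: {e}")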
scripts/utilities/mqtt_subscription.py → scripts/utilities/mqtt_subscription_util.py

@@ -26,8 +26,8 @@ class MQTTSub(object):
         logger.info("MQTT : Connected with result code " + str(rc))

     def on_message(self, client, userdata, msg):
+        logger.trace("Entering in mqtt - on_message")
         try:
-            logger.info("Entering in mqtt - on_message")
             temp_data_pool = []
             message = json.loads(msg.payload.decode())
             logger.info(f"message - {message}")
@@ -37,6 +37,6 @@ class MQTTSub(object):
             for key, value in data.items():
                 temp_data_pool.append(int(value.get('dq')))
             DataPool.data_pool = temp_data_pool
-            logger.info("Exiting from mqtt - on_message")
         except Exception as es:
             logger.error(f'Exception while fetching data : {es}')
+        logger.trace("Exiting from mqtt - on_message")
scripts/utilities/service_util.py

@@ -9,7 +9,7 @@ class ServiceOperations:
     @staticmethod
     def is_service_enabled(service_name):
         try:
-            logger.info("Entering in is_service_enabled")
+            logger.trace("Entering in is_service_enabled")
             output = subprocess.check_output(["sc", "query", service_name], universal_newlines=True)
             if "STATE" in output and "START_TYPE" in output:
                 state_line = None
@@ -24,12 +24,12 @@ class ServiceOperations:
             return state == "4" and start_type == "2"
         except subprocess.CalledProcessError:
             pass
-        logger.info("Exiting from is_service_enabled")
+        logger.trace("Exiting from is_service_enabled")
         return False

     def restart_service(self, service_name):
         try:
-            logger.info("Entering in restart_service")
+            logger.trace("Entering in restart_service")
             if constants.is_windows:
                 if not self.is_service_enabled(service_name):
                     subprocess.run(["sc", "config", service_name, "start=auto"], check=True)
@@ -55,18 +55,18 @@ class ServiceOperations:
             return False
         except subprocess.CalledProcessError as e:
             logger.exception(f"Failed to restart the service '{service_name}'. Error: {e}")
-        logger.info("Exiting from restart_service")
+        logger.trace("Exiting from restart_service")
         return False

     def stop_service(self, service_name):
         try:
-            logger.info("Entering in stop_service")
+            logger.trace("Entering in stop_service")
             if constants.is_windows:
                 subprocess.run(["sc", "stop", service_name], check=True)
                 if self.is_service_enabled(service_name):
                     subprocess.run(["sc", "config", service_name, "start=disabled"], check=True)
                 time.sleep(5)
-                logger.info("Exiting from stop_service in windows as true")
+                logger.trace("Exiting from stop_service in windows as true")
                 return True
             else:
                 subprocess.run(["sudo", "systemctl", "disable", service_name], check=True)
@@ -77,20 +77,20 @@ class ServiceOperations:
                     text=True,
                 )
                 if result.returncode == 0:
-                    logger.info("Exiting from stop_service in linux as false - service doesn't stopped")
+                    logger.trace("Exiting from stop_service in linux as false - service doesn't stopped")
                     return False
                 else:
-                    logger.info("Exiting from stop_service in linux as true - service stopped")
+                    logger.trace("Exiting from stop_service in linux as true - service stopped")
                     return True
         except Exception as e:
             logger.exception(f"Exception occurred while stopping {service_name} - {e}.")
-        logger.info("Exiting from stop_service")
+        logger.trace("Exiting from stop_service")
         return False

     @staticmethod
-    def check_ilens_client_status():
+    def check_ilens_client_status(logger):
         try:
-            logger.info("Entering in check_ilens_client_status")
+            logger.trace("Entering in check_ilens_client_status")
             if constants.is_windows:
                 output = subprocess.check_output(
                     ["sc", "query", services.acquisition_engine],
                     universal_newlines=True
@@ -98,9 +98,9 @@ class ServiceOperations:
                 if "STATE" in output:
                     for line in output.splitlines():
                         if "STATE" in line and "RUNNING" in line:
-                            logger.info("Exiting from check_ilens_client_status in windows as true - service is active.")
+                            logger.trace("Exiting from check_ilens_client_status in windows as true - service is active.")
                             return True
-                logger.info("Exiting from check_ilens_client_status in windows as false - service is inactive")
+                logger.trace("Exiting from check_ilens_client_status in windows as false - service is inactive")
                 return False
             else:
                 result = subprocess.run(
@@ -109,10 +109,10 @@ class ServiceOperations:
                     text=True,
                 )
                 if result.returncode == 0:
-                    logger.info("Exiting from check_ilens_client_status in linux as true - service is active")
+                    logger.trace("Exiting from check_ilens_client_status in linux as true - service is active")
                     return True
                 else:
-                    logger.info("Exiting from check_ilens_client_status in linux as false - service is inactive")
+                    logger.trace("Exiting from check_ilens_client_status in linux as false - service is inactive")
                     return False
         except Exception as e:
             logger.exception(f"Exception occurred while check_ilens_client_status - {e}")
@@ -120,7 +120,7 @@ class ServiceOperations:
     @staticmethod
     def service_name_mapper(file_path):
         try:
-            logger.info(f"Entering in service_name_mapper with filepath - {file_path}.")
+            logger.trace(f"Entering in service_name_mapper with filepath - {file_path}.")
             service_modules = {
                 ".conf": services.ilens_agent,
                 "monitoring_engine": services.monitoring_engine,
@@ -132,7 +132,7 @@ class ServiceOperations:
             }
             for module_keyword, module_service in service_modules.items():
                 if module_keyword in file_path:
-                    logger.info(f"Exiting from service_name_mapper for {module_service} module.")
+                    logger.trace(f"Exiting from service_name_mapper for {module_service} module.")
                     return module_service
         except Exception as e:
             logger.exception(f"Exception occurred while mapping service name for module - {e}")
...
scripts/utilities/udp_subscription.py
→
scripts/utilities/udp_subscription
_util
.py
View file @
a2304e73
...
...
@@ -12,7 +12,7 @@ class UDPSub:
receiver_socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
receiver_socket
.
bind
((
host
,
int
(
port
)))
while
True
:
logger
.
info
(
"Entered in UDPSub"
)
logger
.
trace
(
"Entered in UDPSub"
)
temp_pool
=
[]
data_received
,
_
=
receiver_socket
.
recvfrom
(
buffer_size
)
decoded_message
=
json
.
loads
(
base64
.
b64decode
(
data_received
)
.
decode
())
...
...
@@ -23,6 +23,6 @@ class UDPSub:
logger
.
info
(
f
"temp_pool : {temp_pool}"
)
DataPool
.
data_pool
=
temp_pool
logger
.
info
(
f
"data added - {DataPool.data_pool}"
)
logger
.
info
(
"Exiting from UDPSub"
)
logger
.
trace
(
"Exiting from UDPSub"
)
except
Exception
as
e
:
logger
.
exception
(
f
"Exception occurred while subscribing local udp uploader - {e}."
)
secondary_service.py

@@ -7,14 +7,18 @@ from fastapi import FastAPI
 from scripts.constants.api_endpoints import api_endpoints
 from scripts.constants.app_variables import TimeVariable
 from scripts.constants.app_config import app_config
-from scripts.handlers.device_handler import device_handler
-from scripts.handlers.files_handler import FilesHandler
-from scripts.handlers.redundancy_handler import RedundancyHandler
-from scripts.logging.logger import logger
+from scripts.core.handlers.device_handler import DeviceHandler
+from scripts.core.handlers.files_handler import FilesHandler
+from scripts.core.handlers.redundancy_handler import RedundancyHandler
+from scripts.logging.logger import get_logger

+logger = get_logger("secondary_redundancy")
 app = FastAPI()
-files_handler = FilesHandler()
-redundancy_handler = RedundancyHandler()
+files_handler = FilesHandler(logger)
+redundancy_handler = RedundancyHandler(logger)
+device_handler = DeviceHandler(logger)

 api_hit_thread = threading.Thread(target=device_handler.check_last_hit)
@@ -32,7 +36,6 @@ async def receive_data(data: dict):
         file_path = files_handler.is_file_path_need(given_path)
         config_data = data.get("data")
         files_handler.sync_config(file_path, config_data)
-        print("File updated successfully.")
         logger.info("File updated successfully.")
         response["status"] = "Success"
         response["message"] = "File updating successful."
@@ -51,7 +54,6 @@ async def receive_config(data: dict):
     for given_path, config_data in data.items():
         file_path = files_handler.is_file_path_need(given_path)
         files_handler.sync_config(file_path, config_data)
-    print("Configuration synced successfully.")
     logger.info("Configuration synced successfully.")
     response["status"] = "Success"
     response["message"] = "Configuration syncing successful."
@@ -69,7 +71,6 @@ async def fetch_primary_device_details(data: dict):
     }
     try:
         response_data = f"Heartbeat message - {data}"
-        print(response_data)
         logger.info(response_data)
         logger.info(f"TimeVariable.api_last_hit_time in receiving hb entry- {TimeVariable.api_last_hit_time}")
         TimeVariable.api_last_hit_time = datetime.now()
@@ -87,9 +88,7 @@ async def fetch_primary_device_details(data: dict):
 if __name__ == "__main__":
     try:
-        print("-----------------------------------------Starting Service-----------------------------------------")
         logger.info("-----------------------------------------Starting Service-----------------------------------------")
         uvicorn.run(app, host=app_config.host_ip, port=app_config.port_no)
     except KeyboardInterrupt:
-        print("-----------------------------------------Service Stopped-----------------------------------------")
         logger.info("---------------------------------------Service Stopped---------------------------------------")