Commit 568194e3 authored by kiran.ak

Initial Commit
# conf/application.conf
[LOG]
base_path = logs/
log_level = DEBUG
max_bytes = 100000000
back_up_count = 5
name = motor-controller

[MQTT]
host = iot-hub.ilens.io
port = 1883
topic = ilens/motor

[MOTOR]
pin = 16
status_file = motor_status.json

[SITE]
tag = site_175$dept_169$line_270$equipment_399$tag_412
# application entry point (file name assumed, e.g. app.py)
from threading import Thread
import time

from scripts.core.mqtt_handler import live_data
from scripts.core.GPIO_controller import turn_on
from scripts.util.logger import logger
from scripts.util.file_util import file_write


class motorThread(Thread):
    """Turns the motor on whenever the local calendar day rolls over."""

    def __init__(self):
        super().__init__()
        self.prev_time = int(time.time()) * 1000  # epoch milliseconds

    def run(self):
        while True:
            current_time = int(time.time()) * 1000
            # Compare the local calendar day of the previous and current tick;
            # a change in day-of-month means midnight was crossed.
            start = time.localtime(self.prev_time / 1000)
            end = time.localtime(current_time / 1000)
            if start.tm_mday != end.tm_mday:
                logger.info('Turning on motor at {}'.format(time.time()))
                turn_on()
            self.prev_time = current_time
            time.sleep(1)
class mqttThread(Thread):
    """Sets up the MQTT subscription."""

    def run(self):
        # live_data() only starts paho's network thread, so this thread
        # returns immediately after the subscription is in place.
        live_data()


if __name__ == '__main__':
    # Start with the motor marked "on"; the MQTT handler flips it off on demand.
    file_write(content={"status": "on"})
    motor = motorThread()
    motor.start()
    mqtt = mqttThread()
    mqtt.start()
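# Sketch, not part of the original file: a shutdown-friendly startup variant.
# motorThread.run() loops forever and the thread is non-daemon, so the
# process cannot exit on Ctrl+C. Marking the threads daemon (assumption: it
# is acceptable for the loop to die with the process) lets a
# KeyboardInterrupt in the main thread end everything.
def start_daemonized():
    motor = motorThread()
    motor.daemon = True
    motor.start()
    mqtt = mqttThread()
    mqtt.daemon = True
    mqtt.start()
    while True:  # keep the main thread alive so it can receive Ctrl+C
        time.sleep(1)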
{
"status": "on"
}
# scripts/config/config_reader.py
from configparser import ConfigParser

config_file_path = r"conf/application.conf"
parser = ConfigParser()
parser.read(config_file_path)


def read_config():
    """Read the [LOG] section into the dict consumed by the logger."""
    config_json = dict()
    config_json["file_path"] = parser.get("LOG", "base_path")
    config_json["level"] = parser.get("LOG", "log_level")
    config_json["name"] = parser.get("LOG", "name")
    config_json["max_bytes"] = int(parser.get("LOG", "max_bytes"))
    config_json["back_up_count"] = int(parser.get("LOG", "back_up_count"))
    return config_json


def read_application_config():
    """Read the MQTT, motor, and site settings."""
    config_json = dict()
    config_json['mqtt'] = {
        "host": parser.get("MQTT", "host"),
        "port": int(parser.get("MQTT", "port")),
        "topic": parser.get("MQTT", "topic"),
    }
    config_json['motor'] = {
        "pin": int(parser.get("MOTOR", "pin")),
        "status_file": parser.get("MOTOR", "status_file")
    }
    config_json['site'] = {
        "tag": parser.get("SITE", "tag")
    }
    return config_json


LOGGING_JSON = read_config()
application_config = read_application_config()
# scripts/core/GPIO_controller.py
import RPi.GPIO as GPIO

from scripts.config.config_reader import application_config
from scripts.util.file_util import file_write
from scripts.util.logger import logger


def turn_off():
    """Drive the relay pin HIGH (motor off) and persist the state."""
    pin = int(application_config['motor']['pin'])
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(pin, GPIO.OUT)
    GPIO.output(pin, GPIO.HIGH)
    file_write(content={"status": "off"})
    logger.info('json file updated with off')


def turn_on():
    """Drive the relay pin LOW (motor on) and persist the state."""
    pin = int(application_config['motor']['pin'])
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(pin, GPIO.OUT)
    GPIO.output(pin, GPIO.LOW)
    file_write(content={"status": "on"})
    logger.info('json file updated with on')
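# Sketch, not part of the original file: release the GPIO pin on shutdown.
# The functions above drive the pin LOW to turn the motor on, which implies
# an active-low relay board (an assumption worth verifying against the
# wiring). RPi.GPIO keeps channels configured until cleanup is called.
def release_pin():
    pin = int(application_config['motor']['pin'])
    GPIO.cleanup(pin)  # resets the channel to a safe input state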
# scripts/core/mqtt_handler.py
import time

import paho.mqtt.client as mqtt

from scripts.config.config_reader import application_config
from scripts.core.GPIO_controller import turn_off
from scripts.util.logger import logger
from scripts.util.file_util import file_read


def on_connect(client, userdata, flags, rc):
    logger.info("MQTT : Connected with result code " + str(rc))


def on_message(client, userdata, msg):
    motor_status = file_read()
    try:
        payload = msg.payload.decode('utf-8')
        logger.info('received {} at {}'.format(payload, time.time()))
        if 'TriggerMotor' in payload and motor_status['status'] == 'on':
            logger.info('Turning off motor at {}'.format(time.time()))
            turn_off()
    except Exception as es:
        logger.error('Exception while fetching data : {}'.format(es))


def live_data():
    try:
        client.connect(host=application_config['mqtt']['host'],
                       port=int(application_config['mqtt']['port']))
        client.subscribe(application_config['mqtt']['topic'])
        client.loop_start()  # spawns paho's network thread and returns
    except Exception as es:
        logger.error('exception while connecting to mqtt : {}'.format(es))


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
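# Sketch, not part of the original file: a one-shot publisher to exercise
# on_message(), which turns the motor off when the raw payload contains the
# substring 'TriggerMotor' and motor_status.json reads {"status": "on"}.
# The payload schema beyond that substring is an assumption.
def send_trigger(host='iot-hub.ilens.io', port=1883, topic='ilens/motor'):
    import paho.mqtt.publish as publish
    publish.single(topic, payload='TriggerMotor', hostname=host, port=port)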
# scripts/util/file_util.py
import json

from scripts.config.config_reader import application_config


def file_write(content):
    # 'with' ensures the status file is flushed and closed after each write
    with open(application_config['motor']['status_file'], 'w') as file_obj:
        file_obj.write(json.dumps(content))


def file_read():
    with open(application_config['motor']['status_file'], 'r') as file_obj:
        return json.loads(file_obj.read())
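# Sketch, not part of the original file: an atomic variant of file_write().
# The status file is written from one thread and read from the MQTT callback
# thread, so a reader could observe a half-written file. Writing to a temp
# file and renaming it over the target makes the swap atomic on POSIX.
import os
import tempfile


def file_write_atomic(content):
    path = application_config['motor']['status_file']
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    with os.fdopen(fd, 'w') as tmp:
        tmp.write(json.dumps(content))
    os.replace(tmp_path, path)  # atomic on the same filesystem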
# scripts/util/logger.py
import logging
import os
from logging.handlers import RotatingFileHandler

from scripts.config.config_reader import LOGGING_JSON


def get_logger():
    """
    Creates a rotating log
    """
    if not os.path.exists(LOGGING_JSON["file_path"]):
        os.makedirs(LOGGING_JSON["file_path"])
    __logger__ = logging.getLogger('')
    __logger__.setLevel(LOGGING_JSON["level"].upper())
    log_formatter = '%(asctime)s - %(levelname)-6s - %(message)s'
    time_format = "%Y-%m-%d %H:%M:%S"
    formatter = logging.Formatter(log_formatter, time_format)
    log_file = os.path.join(LOGGING_JSON["file_path"], LOGGING_JSON["name"] + '.log')
    temp_handler = RotatingFileHandler(log_file,
                                       maxBytes=LOGGING_JSON["max_bytes"],
                                       backupCount=LOGGING_JSON["back_up_count"])
    temp_handler.setFormatter(formatter)
    __logger__.addHandler(temp_handler)
    return __logger__
logger = get_logger()
Totalizer Calculator
# conf/application.conf
[LOG]
base_path = logs/
log_level = DEBUG
max_bytes = 100000000
back_up_count = 5
name = totalizer
{
"redis":{
"host": "192.168.0.209",
"port": 6379,
"tags_data": 2
},
"mqtt":{
"host": "192.168.0.209",
"port": 1883,
"topic1": "ilens/monitor/live/site_174",
"topic2": "ilens/monitor/live/site_168",
"publish_topic": "ilens/monitor/live/"
},
"kairos":{
"start_time": 1609459200,
"end_time": 0,
"reader_url": "http://192.168.0.217:8080/api/v1/datapoints/query",
"writer_url": "http://192.168.0.217:8080/api/v1/datapoints"
},
"kairos_tag": "true",
"redis_tag": "false"
}
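# Note on the kairos block above (sketch comment, not part of the repo):
# start_time = 1609459200 is epoch seconds for 2021-01-01 00:00:00 UTC; the
# calculator further below hardcodes the same start and substitutes the
# current time for end_time = 0. Quick check:
from datetime import datetime, timezone
print(datetime.fromtimestamp(1609459200, tz=timezone.utc))  # 2021-01-01 00:00:00+00:00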
{
"site_171$dept_164$line_265$equipment_348$tag_322": "site_171$dept_164$line_265$equipment_348$tag_390",
"site_171$dept_164$line_265$equipment_390$tag_322": "site_171$dept_164$line_265$equipment_390$tag_390",
"site_171$dept_164$line_265$equipment_347$tag_322": "site_171$dept_164$line_265$equipment_347$tag_390",
"site_171$dept_164$line_265$equipment_349$tag_322": "site_171$dept_164$line_265$equipment_349$tag_390",
"site_171$dept_164$line_265$equipment_360$tag_322": "site_171$dept_164$line_265$equipment_360$tag_390",
"site_171$dept_164$line_265$equipment_361$tag_322": "site_171$dept_164$line_265$equipment_361$tag_390",
"site_171$dept_164$line_265$equipment_362$tag_322": "site_171$dept_164$line_265$equipment_362$tag_390",
"site_171$dept_164$line_265$equipment_363$tag_322": "site_171$dept_164$line_265$equipment_363$tag_390",
"site_171$dept_164$line_265$equipment_364$tag_322": "site_171$dept_164$line_265$equipment_364$tag_390",
"site_171$dept_164$line_265$equipment_365$tag_322": "site_171$dept_164$line_265$equipment_365$tag_390",
"site_171$dept_164$line_265$equipment_366$tag_322": "site_171$dept_164$line_265$equipment_366$tag_390",
"site_171$dept_164$line_265$equipment_367$tag_322": "site_171$dept_164$line_265$equipment_367$tag_390",
"site_171$dept_164$line_265$equipment_368$tag_322": "site_171$dept_164$line_265$equipment_368$tag_390",
"site_171$dept_164$line_265$equipment_369$tag_322": "site_171$dept_164$line_265$equipment_369$tag_390",
"site_171$dept_164$line_265$equipment_370$tag_322": "site_171$dept_164$line_265$equipment_370$tag_390",
"site_171$dept_164$line_265$equipment_371$tag_322": "site_171$dept_164$line_265$equipment_371$tag_390",
"site_171$dept_164$line_265$equipment_372$tag_322": "site_171$dept_164$line_265$equipment_372$tag_390",
"site_171$dept_164$line_265$equipment_373$tag_322": "site_171$dept_164$line_265$equipment_373$tag_390",
"site_171$dept_164$line_265$equipment_374$tag_322": "site_171$dept_164$line_265$equipment_374$tag_390",
"site_171$dept_164$line_265$equipment_375$tag_322": "site_171$dept_164$line_265$equipment_375$tag_390",
"site_171$dept_164$line_265$equipment_376$tag_322": "site_171$dept_164$line_265$equipment_376$tag_390",
"site_171$dept_164$line_265$equipment_382$tag_322": "site_171$dept_164$line_265$equipment_382$tag_390",
"site_171$dept_164$line_265$equipment_383$tag_322": "site_171$dept_164$line_265$equipment_383$tag_390",
"site_171$dept_164$line_265$equipment_388$tag_322": "site_171$dept_164$line_265$equipment_388$tag_390",
"site_171$dept_164$line_265$equipment_389$tag_322": "site_171$dept_164$line_265$equipment_389$tag_390",
"site_171$dept_163$line_264$equipment_392$tag_322": "site_171$dept_163$line_264$equipment_392$tag_390",
"site_171$dept_163$line_264$equipment_393$tag_322": "site_171$dept_163$line_264$equipment_393$tag_390",
"site_171$dept_163$line_264$equipment_346$tag_322": "site_171$dept_163$line_264$equipment_346$tag_390",
"site_171$dept_163$line_264$equipment_345$tag_322": "site_171$dept_163$line_264$equipment_345$tag_390",
"site_171$dept_163$line_264$equipment_344$tag_322": "site_171$dept_163$line_264$equipment_344$tag_390",
"site_171$dept_163$line_264$equipment_381$tag_322": "site_171$dept_163$line_264$equipment_381$tag_390",
"site_171$dept_163$line_264$equipment_385$tag_322": "site_171$dept_163$line_264$equipment_385$tag_390",
"site_171$dept_163$line_264$equipment_384$tag_322": "site_171$dept_163$line_264$equipment_384$tag_390",
"site_171$dept_163$line_264$equipment_386$tag_322": "site_171$dept_163$line_264$equipment_386$tag_390",
"site_171$dept_163$line_264$equipment_387$tag_322": "site_171$dept_163$line_264$equipment_387$tag_390",
"site_171$dept_163$line_264$equipment_391$tag_322": "site_171$dept_163$line_264$equipment_391$tag_390",
"site_171$dept_165$line_266$equipment_350$tag_322": "site_171$dept_165$line_266$equipment_350$tag_390",
"site_171$dept_165$line_266$equipment_351$tag_322": "site_171$dept_165$line_266$equipment_351$tag_390"
}
{
"site_174$dept_168$line_269$equipment_377$tag_322": "site_174$dept_168$line_269$equipment_377$tag_390",
"site_174$dept_168$line_269$equipment_378$tag_322": "site_174$dept_168$line_269$equipment_378$tag_390"
}
{
"site_168$dept_162$line_262$equipment_342$tag_380": "site_168$dept_162$line_262$equipment_342$tag_390",
"site_168$dept_162$line_263$equipment_343$tag_380": "site_168$dept_162$line_263$equipment_343$tag_390",
"site_174$dept_168$line_269$equipment_377$tag_322": "site_174$dept_168$line_269$equipment_377$tag_390",
"site_174$dept_168$line_269$equipment_378$tag_322": "site_174$dept_168$line_269$equipment_378$tag_390"
}
{
"site_168$dept_162$line_262$equipment_342$tag_380": "site_168$dept_162$line_262$equipment_342$tag_390",
"site_168$dept_162$line_263$equipment_343$tag_380": "site_168$dept_162$line_263$equipment_343$tag_390"
}
# scripts/config/config_reader.py
from configparser import ConfigParser

config_file_path = r"conf/application.conf"
parser = ConfigParser()
parser.read(config_file_path)


def read_config():
    """Read the [LOG] section into the dict consumed by the logger."""
    config_json = dict()
    config_json["file_path"] = parser.get("LOG", "base_path")
    config_json["level"] = parser.get("LOG", "log_level")
    config_json["name"] = parser.get("LOG", "name")
    config_json["max_bytes"] = int(parser.get("LOG", "max_bytes"))
    config_json["back_up_count"] = int(parser.get("LOG", "back_up_count"))
    return config_json


LOGGING_JSON = read_config()
# scripts/config/json_reader.py
import json


def read_application_json():
    try:
        with open('conf/application.json') as app_obj:
            return json.loads(app_obj.read())
    except Exception as es:
        print(f'Exception in reading application.json : {es}')


def read_tags_json():
    try:
        with open('conf/tags_list.json') as tags_obj:
            return json.loads(tags_obj.read())
    except Exception as es:
        print(f'Exception in reading tags_list.json : {es}')


tags_list = read_tags_json()
application_config = read_application_json()
# scripts/constants/tag_constants.py
prev_tag_data = dict()  # per-tag {previous_value, previous_time, previous_totalized_value}
live_tag_data = []      # queue of raw MQTT payloads awaiting totalization
# scripts/core/kairos_calculator.py
import json
import time

import pandas as pd
import requests

from scripts.constants.tag_constants import prev_tag_data
from scripts.config.json_reader import tags_list, application_config
from scripts.util.logger import logger


class kairos_utils(object):
    def __init__(self):
        self.data_arr = []
        self.totaliser_tag = tags_list
        self.kairos_url = application_config['kairos']['reader_url']
        self.kairos_writer_url = application_config['kairos']['writer_url']

    @staticmethod
    def create_query_max_one_min(start_time, end_time, tag_list):
        """Build a KairosDB query: per-tag max, aligned to 1-minute buckets."""
        query = {
            "metrics": [
                {
                    "tags": {
                        "c3": tag_list
                    },
                    "name": "ilens.live_data.raw",
                    "group_by": [
                        {
                            "name": "tag",
                            "tags": [
                                "c3"
                            ]
                        }
                    ],
                    "aggregators": [
                        {
                            "name": "max",
                            "sampling": {
                                "value": "1",
                                "unit": "minutes"
                            },
                            "align_start_time": True
                        }
                    ]
                }
            ],
            "plugins": [],
            "cache_time": 0,
            "start_absolute": start_time,
            "end_absolute": end_time
        }
        return query

    def fetch_datapoints_from_kairos(self, query):
        response = requests.post(self.kairos_url, json=query, timeout=600)
        status_code = response.status_code
        logger.info(f"Received status code {status_code} from kairos")
        if status_code in [200, 204]:
            return response.json()
        else:
            return dict()

    @staticmethod
    def create_kairos_df(response_data):
        """
        Definition for creating kairos DataFrame
        """
        master_df = pd.DataFrame(columns=["timestamp"])
        for each_response in response_data["queries"]:
            if each_response["sample_size"] == 0:
                continue
            for each_result in each_response["results"]:
                if not each_result["tags"]:
                    # Empty result: merge an empty column so the tag still appears.
                    selected_hierarchy = each_result["group_by"][0]["group"]["c3"]
                    df = pd.DataFrame(columns=["timestamp", "value"])
                    df.rename(columns={"value": selected_hierarchy}, inplace=True)
                    master_df = master_df.merge(df, on='timestamp', how='outer')
                    continue
                kairos_data = each_result["values"]
                selected_hierarchy = each_result["group_by"][0]["group"]["c3"]
                df = pd.DataFrame(kairos_data, columns=["timestamp", "value"])
                df.rename(columns={"value": selected_hierarchy}, inplace=True)
                master_df = master_df.merge(df, on='timestamp', how='outer')
        master_df.set_index("timestamp", inplace=True)
        master_df.sort_index(inplace=True)
        return master_df

    def write_template(self, tag_id, value, timestamp):
        temp_json = {
            "name": "ilens.live_data.raw",
            "datapoints": [[int(timestamp), value]],
            "tags": {
                "c3": self.totaliser_tag[tag_id],
                "c1": self.totaliser_tag[tag_id].split("$")[0],
                "c5": self.totaliser_tag[tag_id].split("$")[-1]
            }
        }
        self.data_arr.append(temp_json)

    def kairos_write(self):
        response = requests.post(self.kairos_writer_url,
                                 data=json.dumps(self.data_arr),
                                 headers={'content-type': 'application/json'})
        self.data_arr = []
        logger.info("Pushed raw data to KairosDB with status code:" +
                    str(response.status_code))
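# Sketch, not part of the original file: how the pieces above compose for a
# single tag (assumes KairosDB is reachable at the configured reader_url).
def fetch_tag_df(tag, start_ms, end_ms):
    ko = kairos_utils()
    query = ko.create_query_max_one_min(start_ms, end_ms, tag)
    response = ko.fetch_datapoints_from_kairos(query)
    if not response:  # fetch failed or returned no body
        return pd.DataFrame()
    return ko.create_kairos_df(response)  # one column per tag, indexed by timestamp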
def totalizer_calculator():
    """Backfill per-tag totalized values from KairosDB, then seed prev_tag_data."""
    logger.info('KAIROS')
    kairos_obj = kairos_utils()
    start_time = 1609459200 * 1000  # 2021-01-01 00:00:00 UTC, in ms
    end_time = int(time.time()) * 1000
    for tag in tags_list:
        logger.info(f'TAG : {tag}')
        query = kairos_obj.create_query_max_one_min(start_time, end_time, tag)
        data = kairos_obj.fetch_datapoints_from_kairos(query)
        data_frame = kairos_obj.create_kairos_df(data)
        data_frame.dropna(inplace=True)
        data_frame.reset_index(inplace=True)
        vals = data_frame.to_dict(orient='records')
        if not vals:
            # No datapoints for this tag; skip to avoid referencing
            # timestamp/prev_value before assignment below.
            continue
        prev_totaliser_value = 0
        for index in range(len(vals)):
            if index == 0:
                timestamp = int(vals[index]['timestamp'])
                totalised_value = vals[index][tag]
            else:
                current_time = int(vals[index]['timestamp'])
                current_value = vals[index][tag]
                time_diff = ((current_time - prev_time) / 1000) / 60
                if time_diff <= 60:
                    # Trapezoidal integration over the gap, in hours.
                    totalised_value = (time_diff / 60) * \
                        ((prev_value + current_value) / 2)
                else:
                    # Gaps longer than an hour: assume flow restarted from 0.
                    totalised_value = (time_diff / 60) * \
                        ((0 + current_value) / 2)
                timestamp = current_time
                # TODO: TIME BASED LOGIC TO SET prev_totaliser_value = 0
                # Reset the running total when the local calendar day changes.
                start = time.localtime(prev_time / 1000)
                end = time.localtime(current_time / 1000)
                if start.tm_mday != end.tm_mday:
                    prev_totaliser_value = 0
            if totalised_value < 0:
                totalised_value = 0
            prev_totaliser_value += totalised_value
            prev_time = int(vals[index]['timestamp'])
            prev_value = vals[index][tag]
        kairos_obj.write_template(tag_id=tag,
                                  value=prev_totaliser_value,
                                  timestamp=timestamp)
        prev_tag_data[tag] = {
            'previous_value': prev_value,
            'previous_time': prev_time,
            'previous_totalized_value': prev_totaliser_value
        }
    kairos_obj.kairos_write()
    logger.info(f'tags_data_after_kairos : {prev_tag_data}')
    return True
# scripts/core/redis_calculator.py
import json
import time

from scripts.util.redis_db import previousTag
from scripts.config.json_reader import tags_list
from scripts.constants.tag_constants import prev_tag_data
from scripts.util.logger import logger

redis_db_obj = previousTag()


def get_previous_data():
    """Seed prev_tag_data from Redis, defaulting missing tags to zero."""
    logger.info('REDIS')
    current_time = int(time.time()) * 1000
    for tag in tags_list:
        tag_data = redis_db_obj.get_data().mget([tag, tags_list[tag]])
        logger.info(f"{tag} : {tag_data[0]}")
        logger.info(f"{tags_list[tag]} : {tag_data[1]}")
        if tag_data[0] is None:
            tag_data[0] = '{"timestamp": ' + str(current_time) + ', "value": 0.0}'
        if tag_data[1] is None:
            tag_data[1] = '{"timestamp": ' + str(current_time) + ', "value": 0.0}'
        tag_data[0] = json.loads(tag_data[0])
        tag_data[1] = json.loads(tag_data[1])
        prev_tag_data[tag] = {
            'previous_value': tag_data[0]['value'],
            'previous_time': tag_data[1]['timestamp'],
            'previous_totalized_value': tag_data[1]['value']
        }
# scripts/core/totaliser_calculator.py
import time


def get_totalized_value(current_value, current_time, prev_value, prev_time, prev_totalized_value):
    """Integrate flow between two samples and fold it into the running total."""
    time_diff = ((current_time - prev_time) / 1000) / 60  # minutes
    if time_diff <= 60:
        # Trapezoidal integration over the gap, expressed in hours.
        totalized_value = (time_diff / 60) * ((prev_value + current_value) / 2)
    else:
        # Gaps longer than an hour: assume flow restarted from 0.
        totalized_value = (time_diff / 60) * ((0 + current_value) / 2)
    if totalized_value < 0:
        totalized_value = 0
    # TODO: TIME BASED LOGIC TO SET prev_totalized_value = 0
    # Reset the running total when the local calendar day changes.
    start = time.localtime(prev_time / 1000)
    end = time.localtime(current_time / 1000)
    if start.tm_mday != end.tm_mday:
        prev_totalized_value = 0
    prev_totalized_value += totalized_value
    return prev_totalized_value
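# Worked example (sketch, not part of the original file): a flow reading of
# 60 units/hour held steady across a 1-minute gap contributes
# (1/60) * (60 + 60) / 2 = 1 unit -- the function trapezoid-integrates
# flow-per-hour over hours and folds it into the running total, which it
# resets when the two timestamps fall on different local calendar days.
if __name__ == '__main__':
    t0 = 1609502400000   # 2021-01-01 12:00:00 UTC, in ms
    t1 = t0 + 60 * 1000  # one minute later (same local day assumed)
    print(get_totalized_value(60.0, t1, 60.0, t0, 10.0))  # -> 11.0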
# scripts/util/logger.py
import logging
import os
from logging.handlers import RotatingFileHandler

from scripts.config.config_reader import LOGGING_JSON


def get_logger():
    """
    Creates a rotating log
    """
    if not os.path.exists(LOGGING_JSON["file_path"]):
        os.makedirs(LOGGING_JSON["file_path"])
    __logger__ = logging.getLogger('')
    __logger__.setLevel(LOGGING_JSON["level"].upper())
    log_formatter = '%(asctime)s - %(levelname)-6s - %(message)s'
    time_format = "%Y-%m-%d %H:%M:%S"
    formatter = logging.Formatter(log_formatter, time_format)
    log_file = os.path.join(LOGGING_JSON["file_path"], LOGGING_JSON["name"] + '.log')
    temp_handler = RotatingFileHandler(log_file,
                                       maxBytes=LOGGING_JSON["max_bytes"],
                                       backupCount=LOGGING_JSON["back_up_count"])
    temp_handler.setFormatter(formatter)
    __logger__.addHandler(temp_handler)
    return __logger__
logger = get_logger()
# scripts/util/mqtt_util.py
import json

import paho.mqtt.client as mqtt

from scripts.config.json_reader import application_config
from scripts.constants.tag_constants import live_tag_data
from scripts.util.logger import logger


def on_connect(client, userdata, flags, rc):
    logger.info("MQTT : Connected with result code " + str(rc))


def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload)
        live_tag_data.append(payload)  # consumed by calculate() in the main loop
    except Exception as es:
        logger.error(f'Exception while fetching data : {es}')


def live_data():
    try:
        client.connect(host=application_config['mqtt']['host'],
                       port=int(application_config['mqtt']['port']))
        client.subscribe([(application_config['mqtt']['topic1'], 1),
                          (application_config['mqtt']['topic2'], 1)])
        client.loop_start()  # spawns paho's network thread and returns
    except Exception as es:
        logger.error(f'exception while connecting to mqtt : {es}')


def send_data(message):
    try:
        _topic = f"{application_config['mqtt']['publish_topic']}{message['site_id']}"
        logger.info(f"Publishing Data :{message}")
        msg_info = client.publish(_topic, json.dumps(message), qos=1)
        if msg_info.rc == mqtt.MQTT_ERR_NO_CONN:
            logger.debug("mqtt connection lost")
        else:
            logger.info("MQTT message published successfully")
    except Exception as e:
        logger.exception(f"Exception at MQTT Publish: {e}")
        raise


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# scripts/util/redis_db.py
from redis import Redis

from scripts.config.json_reader import application_config
from scripts.util.logger import logger


class previousTag:
    def __init__(self):
        self.host = application_config['redis']['host']
        self.port = int(application_config['redis']['port'])
        self.db = application_config['redis']['tags_data']

    def get_data(self):
        """Return a Redis connection for the tags database."""
        try:
            live_tags_db_object = Redis(
                host=self.host,
                port=self.port,
                db=self.db,
                decode_responses=True,
            )
            return live_tags_db_object
        except Exception as es:
            logger.exception(f'Error in connecting to Redis : {es}')
            raise
# application entry point (file name assumed, e.g. app.py)
from scripts.config.json_reader import tags_list, application_config
from scripts.constants.tag_constants import live_tag_data, prev_tag_data
from scripts.core.totaliser_calculator import get_totalized_value
from scripts.core.redis_calculator import get_previous_data
from scripts.core.kairos_calculator import totalizer_calculator
from scripts.util.logger import logger
from scripts.util import mqtt_util

counter = 0


def payload_parser(payload):
    """Pick the flow tags we totalize out of a raw MQTT payload."""
    tags = dict()
    for tag in payload['data']:
        if tag in tags_list:
            tags[tag] = {
                'value': payload['data'][tag],
                'timestamp': payload['timestamp']
            }
    if bool(tags):
        logger.info(f'Live Flow Tags : {tags}')
    return tags


def calculate():
    global counter
    if len(live_tag_data) > 0:
        current_tags_data = payload_parser(live_tag_data.pop(0))
        temp_list = {
            'data': {}
        }
        for tag in current_tags_data:
            if current_tags_data.get(tag):
                current_value, current_time = current_tags_data[tag]['value'], current_tags_data[tag]['timestamp']
                logger.info(f"Previous Tag Data : {tag} : {prev_tag_data[tag]}")
                prev_value = prev_tag_data[tag]['previous_value']
                prev_time = prev_tag_data[tag]['previous_time']
                prev_totalized_value = prev_tag_data[tag]['previous_totalized_value']
                if current_time != prev_time:
                    totalized_value = get_totalized_value(current_value, current_time, prev_value, prev_time,
                                                          prev_totalized_value)
                    temp_list['data'][tags_list[tag]] = round(totalized_value, 5)
                    temp_list['site_id'] = tag.split('$')[0]
                    prev_tag_data[tag] = {
                        'previous_value': current_value,
                        'previous_time': current_time,
                        'previous_totalized_value': totalized_value
                    }
        if bool(temp_list['data']):
            counter += 1
            mqtt_util.send_data(message={
                "data": temp_list['data'],
                "site_id": temp_list['site_id'],
                "gw_id": "gw_111",
                "pd_id": "pd_130",
                "msg_id": counter,
                "retain_flag": True,
                "timestamp": current_time,
            })


if __name__ == "__main__":
    logger.info("*************** STARTING ***************")
    mqtt_util.live_data()
    # Seed prev_tag_data either by backfilling from KairosDB or from Redis,
    # then totalize live MQTT data forever.
    if application_config['kairos_tag'] == 'true':
        if totalizer_calculator():
            while True:
                calculate()
    elif application_config['redis_tag'] == 'true':
        get_previous_data()
        while True:
            calculate()