Commit 0753042f authored by kiran.ak's avatar kiran.ak

initial commit

parents
[LOG]
base_path = logs/
log_level = INFO
max_bytes = 100000000
back_up_count = 5
name = MqttTransmitter
source:
host: "192.168.19.87"
port: 1883
topic: "ilens/monitor/live/site_143"
tag_old: "site_214$line_490$equipment_301$tag_572"
tag: "site_214$line_414$tag_155"
destination:
host: "iot-hub.ilens.io"
port: 1883
topic: "ilens/monitor/live/site_203"
tag_old: "site_143$line_144$equipment_1050$tag_317"
tag: "site_143$line_152$tag_5790"
site_id: "site_143"
#To send the recieved json as it is just mapping the mapping as empty
mapping:
site_143$line_140$equipment_181$tag_198: site_203$line_282$equipment_444$tag_423
site_143$line_140$equipment_178$tag_198: site_203$line_282$equipment_438$tag_423
site_143$line_141$equipment_182$tag_198: site_203$line_283$equipment_445$tag_423
site_143$line_141$equipment_183$tag_198: site_203$line_283$equipment_446$tag_423
site_143$line_164$equipment_230$tag_198: site_203$line_288$equipment_449$tag_423
\ No newline at end of file
{
"source": {
"ip": "192.168.0.220",
"port": 1883,
"topic": "ilens/monitor/live/site_143"
},
"destination": {
"ip": "iot-hub.ilens.io",
"port": 1883,
"topic": "ilens/monitor/live/site_203"
},
"mapper": {
"site_143$line_140$equipment_181$tag_198": "site_203$line_282$equipment_444$tag_423",
"site_143$line_140$equipment_178$tag_198": "site_203$line_282$equipment_438$tag_423",
"site_143$line_141$equipment_182$tag_198": "site_203$line_283$equipment_445$tag_423",
"site_143$line_141$equipment_183$tag_198": "site_203$line_283$equipment_446$tag_423",
"site_143$line_164$equipment_230$tag_198": "site_203$line_288$equipment_449$tag_423"
}
}
\ No newline at end of file
import threading
from scripts.mqtt_subscriber import MQTTSubscriber
from scripts.mqtt_publisher import MQTTPublisher
from scripts.config_reader import getConfig
from scripts.app_constants import DATA_LIST
class myThread(threading.Thread):
def __init__(self, threadID, path_to_config):
threading.Thread.__init__(self)
self.threadID = threadID
self.thread_config = getConfig(path_to_config)
self.mapper = self.thread_config['mapper']
self.mqtt_subs = MQTTSubscriber(self.thread_config['source'])
self.mqtt_pub = MQTTPublisher(self.thread_config['destination'])
def run(self):
self.mqtt_subs.connect_to_source()
self.mqtt_pub.connect_to_destination()
while True:
if len(DATA_LIST) > 0:
payload_data = DATA_LIST.pop()
new_data = {}
for tag in payload_data['data']:
if tag in self.mapper:
new_data[self.mapper[tag]] = payload_data['data'][tag]
payload_data["data"] = new_data
self.mqtt_pub.publish_data(payload_data)
if __name__ == '__main__':
t = myThread(1, 'conf/site_203.json')
t.start()
paho-mqtt==1.5.0
\ No newline at end of file
DATA_LIST = []
\ No newline at end of file
import json
from configparser import ConfigParser
def getConfig(path_to_config):
with open(path_to_config, 'r') as config_file:
config_json = json.loads(config_file.read())
return config_json
config_file_path = r"conf/application.conf"
parser = ConfigParser()
parser.read(config_file_path)
def read_config():
config_json = dict()
config_json["file_path"] = parser.get("LOG", "base_path")
config_json["level"] = parser.get("LOG", "log_level")
config_json["name"] = parser.get("LOG", "name")
config_json["max_bytes"] = int(parser.get("LOG", "max_bytes"))
config_json["back_up_count"] = int(parser.get("LOG", "back_up_count"))
return config_json
LOGGING_JSON = read_config()
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config_reader import LOGGING_JSON
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(LOGGING_JSON["level"].upper())
log_formatter = '%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():''' \
'%(lineno)s] - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
log_file = os.path.join(LOGGING_JSON["file_path"] + LOGGING_JSON["name"] + '.log')
if not os.path.exists(LOGGING_JSON["file_path"]):
os.makedirs(LOGGING_JSON["file_path"])
temp_handler = RotatingFileHandler(log_file,
maxBytes=LOGGING_JSON["max_bytes"],
backupCount=LOGGING_JSON["back_up_count"])
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
import json
import paho.mqtt.client as paho
from scripts.logger import logger
class MQTTPublisher(object):
def __init__(self, dest_config):
"""
Initializer
"""
self.destination_ip = dest_config["ip"]
self.destination_port = dest_config["port"]
self.destination_topic = dest_config["topic"]
def on_connect(self, rc):
logger.info("Connected to destination with result code {}".format(str(rc)))
def connect_to_destination(self):
try:
self.mqtt_client = paho.Client() # create client object
self.mqtt_client.on_connect = self.on_connect
logger.debug("Connecting to {}:{}".format(self.destination_ip, str(self.destination_port)))
res = self.mqtt_client.connect(self.destination_ip, self.destination_port)
if res != 0:
logger.exception("MQTT Connection Exception")
except Exception as e:
logger.exception("Exception while connecting to MQTT:" + str(e))
def publish_data(self, message):
try:
logger.debug("MQTT Publish to :{}".format(self.destination_topic))
logger.debug("MQTT DATA : {}".format(str(message)))
msg_info = self.mqtt_client.publish(self.destination_topic, json.dumps(message), qos=1)
if msg_info[0] == 4:
logger.debug("mqtt connection lost")
return False
else:
logger.info("MQTT Message published successfully")
return True
except Exception as e:
logger.exception("Exception while publishing the message to topic:" + str(e))
return False
import json
import time
import paho.mqtt.client as mqtt
from scripts.app_constants import DATA_LIST
from scripts.logger import logger
class MQTTSubscriber:
def __init__(self, source_config):
self.source_ip = source_config['ip']
self.source_port = source_config['port']
self.source_topic = source_config['topic']
def on_connect(self, rc):
logger.info("Connected to source with result code {}".format(str(rc)))
def on_message(self, client, userdata, message):
try:
logger.info("data arrived : {}".format(str(time.time())))
DATA_LIST.append(json.loads(message.payload))
except Exception as es:
logger.error("Exception in received data : {}".format(str(es)))
def connect_to_source(self):
try:
logger.info("Connecting to {}:{}".format(str(self.source_ip), str(self.source_port)))
client = mqtt.Client()
client.on_message = self.on_message
client.on_connect = self.on_connect
client.connect(self.source_ip, port=int(self.source_port))
client.subscribe(self.source_topic)
client.loop_start()
except Exception as e:
logger.exception("Exception when connecting to MQTT:{}".format(str(e)))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment