Commit 69edf522 authored by arun.uday's avatar arun.uday

migrate to gitlab-pm

parents
# Default ignored files
/shelf/
/workspace.xml
task7_mqtt_receiver
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/scripts/temp/user_details.db" charset="ISO-8859-1" />
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (task7subscribe)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/task7_mqtt_receiver.iml" filepath="$PROJECT_DIR$/.idea/task7_mqtt_receiver.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (task7subscribe)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import uvicorn
from scripts.config.application_config import uvicorn_host, uvicorn_port
from scripts.logging.loggers import logger
# starting the application
if __name__ == "__main__":
try:
print("MQTT task receiver")
uvicorn.run("main:app", host=uvicorn_host, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
[path]
base_path = scripts/
sub_path = temp/
[file]
file_name_csv = MOCK_DATA.csv
file_name_json = data.json
[mqtt]
topic = topic
mqtt_host = 192.168.0.220
port = 1883
requests = 60
[uvicorn]
uvicorn_host = 127.0.0.1
uvicorn_port = 8001
uvicorn_app = scripts.services.receiver_app_services_run:app
[db]
db_name = user_details.db
db_table = details
[encode]
encode = utf-8
[api_routes]
api_index = /index/
[log]
formatter_time = asctime
formatter_level = levelname
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.application_config import uvicorn_host, uvicorn_port, uvicorn_app
from scripts.logging.loggers import logger
from scripts.services.receiver_app_services_run import mqtt_receive
app = FastAPI()
app.include_router(mqtt_receive)
# starting the application
if __name__ == "__main__":
try:
print("MQTT task receiver")
uvicorn.run(app, host=uvicorn_host, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
import configparser
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
base_path = config.get("path", 'base_path')
sub_path = config.get("path", "sub_path")
full_path = base_path + sub_path
# mqtt
topic_name = config.get("mqtt", "topic")
mqtt_host = config.get("mqtt", "mqtt_host")
port = config.get("mqtt", "port")
request_no = config.get("mqtt", "requests")
# uvicorn
uvicorn_host = config.get("uvicorn", "uvicorn_host")
uvicorn_port = config.get("uvicorn", "uvicorn_port")
uvicorn_app = config.get("uvicorn", "uvicorn_app")
# file name
file_name_csv = config.get("file", "file_name_csv")
file_name_json = config.get("file", "file_name_json")
full_path_csv = base_path + sub_path + file_name_csv
full_path_json = base_path + sub_path + file_name_json
# db name
db_name = config.get("db", "db_name")
db_table = config.get("db", "db_table")
# encode
utf_encode = config.get("encode", "encode")
# log
formatter_time = config.get("log", "formatter_time")
formatter_level = config.get("log", "formatter_level")
import configparser
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
api_index = config.get("api_routes", "api_index")
from scripts.config.application_config import topic_name
from scripts.logging.loggers import logger
def connect_mqtt(client_name, userdata, flags, rc):
try:
print(userdata, " ", flags)
print("Connected with result code " + str(rc))
client_name.subscribe(topic_name)
except Exception as e:
logger.error("Exception occurred while connecting to mqtt: ", e)
import pandas as pd
from scripts.core.handlers.display_data import display_fetched_data
from scripts.core.handlers.operations_db import SqliteOperations
from scripts.logging.loggers import logger
def json_to_dictionary(json_data):
try:
data_list = list(json_data.values())
# list to dataframe
frames = pd.DataFrame(data_list, index=None)
obj_db = SqliteOperations()
# inserting the data to table
obj_db.insert_db(frames)
# fetching the data
data_fetched = obj_db.fetch_data()
display_fetched_data(data_fetched)
except Exception as e:
logger.error("Exception occurred while converting to data frame: ", e)
from scripts.logging.loggers import logger
def display_fetched_data(cursor):
try:
# Fetch all the rows as a list of tuples
rows = cursor.fetchall()
# Iterate through the rows and print the data
for row in rows:
print(row)
except Exception as e:
logger.error("Exception occurred while fetching the data from table: ", e)
import json
from scripts.config.application_config import utf_encode
from scripts.core.handlers.convert_to_dictionary import json_to_dictionary
from scripts.logging.loggers import logger
def message_mqtt(client_name, userdata, msg):
try:
print(client_name, " ", userdata)
# decoding the msg to string
payload_decoded = msg.payload.decode(utf_encode)
json_data = json.loads(payload_decoded)
# decoded msg to list
json_to_dictionary(json_data)
except Exception as e:
logger.error("Exception occurred while decoding the message: ", e)
from scripts.core.handlers.connection_mqtt_receiver import connect_mqtt
from scripts.core.handlers.message_mqtt_receive import message_mqtt
from scripts.logging.loggers import logger
def mqtt_receiver_call(mqtt, mqtt_host, port, request_no):
try:
client = mqtt.Client()
client.connect(mqtt_host, int(port), int(request_no))
client.on_connect = connect_mqtt
client.on_message = message_mqtt
client.loop_forever()
except Exception as e:
logger.error("Exception occurred while calling the mqtt: ", e)
import sqlite3
from scripts.config.application_config import db_name, db_table, full_path
from scripts.logging.loggers import logger
from scripts.utilis.db_queries import DbQueries
class SqliteOperations:
def __init__(self):
# connecting to the database
self.conn = sqlite3.connect(full_path + db_name)
def create_db(self):
try:
# creating the table
self.conn.execute(DbQueries().create_table)
self.conn.close()
except Exception as e:
logger.error("Exception occurred while creating table: ", e)
def insert_db(self, frames):
try:
# inserting data to table
frames.to_sql(db_table, self.conn, if_exists='replace')
self.conn.commit()
except Exception as e:
logger.error("Exception occurred while inserting data: ", e)
def fetch_data(self):
try:
# Create a cursor
cursor = self.conn.cursor()
# Execute a SELECT statement to fetch data from the "details" table
cursor.execute(DbQueries().select)
return cursor
except Exception as e:
logger.error("Exception occurred while fetching data: ", e)
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import application_config
from scripts.config.application_config import formatter_time, formatter_level
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging.INFO)
log_formatter = f'%({formatter_time})s - %({formatter_level})-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
file_path = application_config.full_path
formatter = logging.Formatter(log_formatter, time_format)
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(f"{file_path}{application_config.topic_name}.log")
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
import paho.mqtt.client as mqtt
from fastapi import APIRouter
from scripts.constants.api_routes_config import api_index
from scripts.config.application_config import mqtt_host, port, request_no
from scripts.core.handlers.mqtt_receive_data import mqtt_receiver_call
from scripts.logging.loggers import logger
# This is the Subscriber
mqtt_receive = APIRouter()
@mqtt_receive.get(api_index)
def receiver():
try:
mqtt_receiver_call(mqtt, mqtt_host, port, request_no)
except Exception as e:
logger.error("Exception occurred: ", e)
# database queries
from scripts.config.application_config import db_table
class DbQueries:
def __init__(self):
# table creation
self.create_table = '''CREATE TABLE IF NOT EXISTS details
(ID INT PRIMARY KEY NOT NULL,
FIRSTNAME TEXT NOT NULL,
LASTNAME TEXT NOT NULL);'''
# table selection
self.select = "SELECT * FROM "+db_table
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment