Commit f839f2a7 authored by arun.uday's avatar arun.uday

commit first

parents
.idea
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config import applications_config
from scripts.logging.loggers import logger
from scripts.services.task_main_service import receiver_app
app = FastAPI()
app.include_router(receiver_app)
# starting the application
if __name__ == "__main__":
try:
print("Task 15")
uvicorn.run(app, port=int(applications_config.uvicorn_receiver_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
[path]
base_path = scripts/
sub_path = temp/
[uvicorn]
uvicorn_port = 8000
receiver_port = 8001
[api_routes]
route_sender_index = /sender
route_receiver_index = /receiver
route_get = /get_data
[log]
formatter_time = asctime
formatter_level = levelname
log_name = log_task15
[mqtt]
topic = task14
mqtt_host = 192.168.0.220
port = 1883
requests = 60
[encode]
encode = utf-8
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config import applications_config
from scripts.logging.loggers import logger
from scripts.services.task_main_service import sender_app
app = FastAPI()
app.include_router(sender_app)
# starting the application
if __name__ == "__main__":
try:
print("Task 15")
uvicorn.run(app, port=int(applications_config.uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
Balabaster @ file:///home/ktietz/src/ci/alabaster_1611921544520/work Balabaster @ file:///home/ktietz/src/ci/alabaster_1611921544520/work
# reading from the applications.conf
import configparser
try:
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
base_path = config.get("path", 'base_path')
sub_path = config.get("path", "sub_path")
full_path = base_path + sub_path
# log
formatter_time = config.get("log", "formatter_time")
formatter_level = config.get("log", "formatter_level")
log_name = config.get("log", "log_name")
# uvicorn
uvicorn_port = config.get("uvicorn", "uvicorn_port")
uvicorn_receiver_port = config.get("uvicorn", "receiver_port")
# mqtt
topic_name = config.get("mqtt", "topic")
mqtt_host = config.get("mqtt", "mqtt_host")
mqtt_port = config.get("mqtt", "port")
request_no = config.get("mqtt", "requests")
# encode
utf_encode = config.get("encode", "encode")
except Exception as e:
print(e)
# reading conf file
import configparser
from scripts.logging.loggers import logger
try:
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# api paths
route_sender_index = config.get("api_routes", "route_sender_index")
route_receiver_index = config.get("api_routes", "route_receiver_index")
route_get = config.get("api_routes", "route_get")
except Exception as e:
logger.error("Interruption occurred: ", e)
from scripts.config import applications_config
from scripts.logging.loggers import logger
def connect_mqtt(client_name, userdata, flags, rc):
try:
print(userdata, " ", flags)
print("Connected with result code " + str(rc))
# subscribing to the topic
client_name.subscribe(applications_config.topic_name)
except Exception as e:
logger.error("Exception occurred while connecting to mqtt: ", e)
from scripts.logging.loggers import logger
def insert_data(conn, data):
try:
# check if the data is entered correctly
if conn.hmset(data['site_id'], data['data']):
logger.info("Data entered")
else:
logger.info("Insertion failed")
except Exception as e:
logger.exception("Redis Insertion Failed", e)
import json
from scripts.config.applications_config import utf_encode
from scripts.core.handlers.receiver_handlers.select_db_redis import operations_call_
from scripts.logging.loggers import logger
def message_mqtt(client_name, userdata, msg):
try:
print(client_name, " ", userdata)
# decoding the msg to string
payload_decoded = msg.payload.decode(utf_encode)
# payload to json
json_data = json.loads(payload_decoded)
# if type is list then data will get enter else message will be printed
operations_call_(json_data)
except Exception as e:
logger.error("Exception occurred while decoding the message: ", e)
from paho.mqtt.client import Client
from scripts.core.handlers.receiver_handlers.connection_mqtt_receiver import connect_mqtt
from scripts.core.handlers.receiver_handlers.message_mqtt_receive import message_mqtt
from scripts.logging.loggers import logger
def receive_mqtt(mqtt_host, port, request_no):
try:
# client object creation
client = Client(userdata="Receiver")
# connecting to broker
client.connect(mqtt_host, int(port), int(request_no))
client.on_connect = connect_mqtt
# listening to the topic
client.on_message = message_mqtt
client.loop_forever()
except Exception as e:
logger.error("Exception occurred while calling the mqtt: ", e)
from scripts.core.handlers.receiver_handlers.insert_data_redis import insert_data
from scripts.database import redis_db_connection
from scripts.logging.loggers import logger
def operations_call_(json_poll_data):
try:
# selecting the connection according to the quality
mapper = {
0: redis_db_connection.conn_good,
1: redis_db_connection.conn_maintenance,
2: redis_db_connection.conn_error
}
# insert the data to the db
insert_data(mapper.get(json_poll_data['data_quality']), json_poll_data)
except Exception as e:
logger.exception("Redis mapper Failed", e)
import json
import time
from paho.mqtt.client import Client
from scripts.config import applications_config
from scripts.database.models.models_sites import SitesEncoder
from scripts.logging.loggers import logger
def send_data_mqtt(dict_pollution_data):
try:
# dumps the list to json
json_data = json.dumps(dict_pollution_data, cls=SitesEncoder)
# create the paho client
client = Client()
# connect to the mqtt client
client.connect(applications_config.mqtt_host, int(applications_config.mqtt_port))
# publish the topic
client.publish(applications_config.topic_name, json_data)
# disconnecting from the mqtt
client.disconnect()
time.sleep(10)
return {"message": "Data published to receiver", "status": "message sent", "data": json_data}
except Exception as e:
logger.error("Some exception occurred while sending file: ", e)
import json
from pydantic import BaseModel
class Sites(BaseModel):
PM10: int
PM2_5: int
SO2: int
NO2: int
class Quality(BaseModel):
data: Sites
site_id: str
data_quality: int
class SitesEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Sites):
return obj.dict()
return super().default(obj)
import redis
from scripts.logging.loggers import logger
try:
# gold seats database on redis
conn_good = redis.Redis('127.0.0.1', db=0)
conn_maintenance = redis.Redis('127.0.0.1', db=1)
conn_error = redis.Redis('127.0.0.1', db=2)
except Exception as e:
logger.error("Exception occurred: ", e)
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import applications_config
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
# setting the logger level
__logger__.setLevel(logging.INFO)
# creating the format for the log
log_formatter = f'%({applications_config.formatter_time})s - %({applications_config.formatter_level})-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = applications_config.full_path
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}{applications_config.log_name}.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
from fastapi.routing import APIRouter
from scripts.config import applications_config
from scripts.constants import api_path_config
from scripts.core.handlers.receiver_handlers.receive_mqtt_data import receive_mqtt
from scripts.core.handlers.sender_handlers.mqtt_data_call import send_data_mqtt
from scripts.database.models.models_sites import Quality
from scripts.logging.loggers import logger
sender_app = APIRouter()
receiver_app = APIRouter()
@sender_app.post(api_path_config.route_sender_index + api_path_config.route_get)
def sender_get_data(pollution: Quality):
try:
status = send_data_mqtt(dict(pollution))
return status
except Exception as e:
logger.error("Some exception in sender api: ", e)
@receiver_app.post(api_path_config.route_receiver_index + api_path_config.route_get)
def receiver_get_data():
try:
receive_mqtt(applications_config.mqtt_host, applications_config.mqtt_port, applications_config.request_no)
except Exception as e:
logger.error("Some exception in receiver api: ", e)
2023-02-20 15:39:52 - INFO - Data entered
2023-02-20 15:40:12 - INFO - Data entered
2023-02-20 15:40:30 - INFO - Data entered
2023-02-20 16:17:18 - INFO - Data entered
2023-02-20 16:22:23 - INFO - Data entered
2023-02-20 16:23:45 - INFO - Data entered
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment