Commit 657a0323 authored by ajil.k's avatar ajil.k

added

parents
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="E712" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N801" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="dict.mobile" />
<option value="dict.dob" />
<option value="dict.gender" />
<option value="dict.name" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (data_quality_checker_subcriber)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/data_quality_checker_subcriber.iml" filepath="$PROJECT_DIR$/.idea/data_quality_checker_subcriber.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# importing libraries
import uvicorn
from scripts.constants.application_config import port_no
from scripts.logging.logger import logger
if __name__ == "__main__":
try:
uvicorn.run("main:app", port=int(port_no))
except Exception as e:
logger.error(e)
print("Error-", e)
[fastapi_connection]
port_no = 5000
[MQTT_Connection]
broker = 192.168.0.220
port = 1883
time_out = 60
[redis_connection]
host = 127.0.0.1
redis_port = 6379
good_data = 4
maintenance_data = 5
error_data = 6
\ No newline at end of file
# importing libraries
import uvicorn
from fastapi import FastAPI
from scripts.constants.application_config import port_no
from scripts.services.subscriber import subscriber
app = FastAPI()
app.include_router(subscriber)
if __name__ == "__main__":
try:
uvicorn.run(app, port=int(port_no))
except Exception as e:
print("Error-", e)
from configparser import ConfigParser
try:
config = ConfigParser()
config.read("conf/application.conf")
port_no = config.get("fastapi_connection", "port_no")
broker = config.get("MQTT_Connection", "broker")
mqtt_port = config.get("MQTT_Connection", "port")
time_out = config.get("MQTT_Connection", "time_out")
host = config.get("redis_connection", "host")
redis_port = config.get("redis_connection", "redis_port")
good_data = config.get("redis_connection", "good_data")
maintenance_data = config.get("redis_connection", "maintenance_data")
error_data = config.get("redis_connection", "error_data")
except Exception as e:
print("Exception-", e)
class EndPoints:
root = "/"
subscribe_data = "/subscribe_data/"
# MQTT client connection for publishing website information
import paho.mqtt.client as mqtt
from scripts.constants.application_config import broker, mqtt_port, time_out
from scripts.logging.logger import logger
def mqtt_client_connection():
try:
mqtt_client = mqtt.Client()
mqtt_client.connect(broker, int(mqtt_port), int(time_out))
return mqtt_client
except Exception as e:
logger.error(e)
# Importing redis library
import redis
from scripts.constants.application_config import host, redis_port, good_data, maintenance_data, error_data
from scripts.logging.logger import logger
try:
db_connection1 = redis.StrictRedis(host=host, port=int(redis_port), db=int(good_data))
db_connection2 = redis.StrictRedis(host=host, port=int(redis_port), db=int(maintenance_data))
db_connection3 = redis.StrictRedis(host=host, port=int(redis_port), db=int(error_data))
except Exception as e:
logger.error(e)
import json
from scripts.constants.mqtt_connection import mqtt_client_connection
from scripts.core.handlers.store_in_redis import store_data_in_db
from scripts.logging.logger import logger
def on_message(client, userdata, message):
data = message.payload.decode()
print(f"Message send using topic : {message.topic}, Data received : {data}")
json_data = json.loads(data)
store_data_in_db(json_data)
def start_subscribing():
try:
# MQTT client for subscribing
mqtt_client = mqtt_client_connection()
print("Waiting for messages")
topic = "Website/#"
mqtt_client.subscribe(topic)
mqtt_client.on_message = on_message
mqtt_client.loop_forever()
except Exception as e:
logger.error(e)
import json
from scripts.constants.redis_connection import db_connection1, db_connection2, db_connection3
from scripts.logging.logger import logger
def store_data_in_db(json_data):
try:
quality = json_data["data_quality"]
key = json_data["site_id"]
value = json.dumps(json_data["data"])
if quality == 0:
# if data quality is Good
db_connection1.set(key, value)
elif quality == 1:
# if data quality is Maintenance
db_connection2.set(key, value)
elif quality == 2:
# if data quality is Error
db_connection3.set(key, value)
except Exception as e:
logger.error(e)
import logging
import os
from logging.handlers import RotatingFileHandler
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel("ERROR")
log_formatter = '%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():%(lineno)s] - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
log_files = os.path.join("temp/ERROR.log")
temp_handler = RotatingFileHandler(log_files, maxBytes=100000000, backupCount=5)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
# importing libraries
from fastapi import APIRouter
from scripts.constants.endpoints import EndPoints
from scripts.core.handlers.handle_subcribed_data import start_subscribing
from scripts.logging.logger import logger
# Create FastAPI instance
subscriber = APIRouter()
@subscriber.get(EndPoints.root)
async def root():
return {"Message": "It's Working!"}
@subscriber.post(EndPoints.subscribe_data)
async def subscribing():
start_subscribing()
return "Started Subscribing"
2023-02-20 16:52:04 - ERROR - [MainThread:store_data_in_db():20] - Invalid input of type: 'dict'. Convert to a bytes, string, int or float first.
2023-02-20 16:55:18 - ERROR - [MainThread:store_data_in_db():20] - Invalid input of type: 'dict'. Convert to a bytes, string, int or float first.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment