Commit 4f23397a authored by mohammed.shibili's avatar mohammed.shibili

first

parents
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N801" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (DataQualityTask)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/DataQualityTask.iml" filepath="$PROJECT_DIR$/.idea/DataQualityTask.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
from script.service.receiveData import run
if __name__ == "__main__":
run()
\ No newline at end of file
[api]
api_port = 8000
[mqtt]
mqtt_broker=192.168.0.220
mqtt_port=1883
mqtt_time=10
mqtt_topic=quality/
[redis]
redis_host=127.0.0.1
redis_port=6379
redis_db1=3
redis_db2=4
redis_db3=5
[end_point]
post_end = /data
\ No newline at end of file
import uvicorn
from script.config.appconfig import api_port
if __name__ == "__main__":
uvicorn.run("script.service.GettingData:app", port=api_port, reload=True)
import configparser
config = configparser.RawConfigParser()
config.read("conf/application.conf")
api_port = int(config.get("api", "api_port"))
mqtt_broker = config.get("mqtt", "mqtt_broker")
mqtt_port = int(config.get("mqtt", "mqtt_port"))
mqtt_time = int(config.get("mqtt", "mqtt_time"))
mqtt_topic = config.get("mqtt", "mqtt_topic")
redis_host = config.get("redis", "redis_host")
redis_port = int(config.get("redis", "redis_port"))
redis_db = int(config.get("redis", "redis_db1"))
redis_db2 = int(config.get("redis", "redis_db2"))
redis_db3 = int(config.get("redis", "redis_db3"))
post_end = config.get("end_point", "post_end")
from paho.mqtt import client as mqtt
from script.config.appconfig import mqtt_broker, mqtt_port, mqtt_time
from script.logging.loggers import logger
def mqtt_connection():
try:
mqtt_ = mqtt.Client()
mqtt_.connect(mqtt_broker, mqtt_port, mqtt_time)
print("mqtt connected")
return mqtt_
except Exception as e:
logger.exception(e)
print("mqtt fail", e)
import json
from script.config.appconfig import mqtt_topic
from script.core.handlers.mqttConnection import mqtt_connection
from script.database.basemodel import DataEncoder
def mqtt_publish(data_format):
mqtt_ = mqtt_connection()
data_to_publish = json.dumps(data_format, cls=DataEncoder)
mqtt_.publish(mqtt_topic, data_to_publish)
return json.loads(data_to_publish)
def mqtt_consume():
mqtt_ = mqtt_connection()
return mqtt_
import redis as redis
from script.config.appconfig import redis_host, redis_port, redis_db, redis_db2, redis_db3
r1 = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
r2 = redis.Redis(host=redis_host, port=redis_port, db=redis_db2)
r3 = redis.Redis(host=redis_host, port=redis_port, db=redis_db3)
import json
from script.core.handlers.redisConnection import r1, r2, r3
def store_redis(data_received):
quality = data_received["data_quality"]
print(quality)
data_to_store = data_received["data"]
key = data_received["site_id"]
mapper = {0: QualityInsert.good,
1: QualityInsert.maintenance,
2: QualityInsert.error}
mapper.get(quality)(key, data_to_store)
class QualityInsert:
@staticmethod
def good(key, data):
r1.set(key, json.dumps(data))
@staticmethod
def maintenance(key, data):
r2.set(key, json.dumps(data))
@staticmethod
def error(key, data):
r3.set(key, json.dumps(data))
import json
from pydantic import BaseModel
class DataModel(BaseModel):
PM10: int
PM2_5: int
SO2: int
NO2: int
class DataEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, DataModel):
return obj.dict()
return super().default(obj)
import logging
import os
from logging.handlers import RotatingFileHandler
def get_logger():
# creating rotating logging
__logger__ = logging.getLogger('')
__logger__.setLevel(logging.INFO)
file_path = "script/logging/"
formatter = logging.Formatter('%(asctime)s - %(levelname)-6s - %(message)s', "%Y-%m-%d %H:%M:%S")
log_file = os.path.join(f"{file_path}exceptions.log")
temp_handler = RotatingFileHandler(log_file, maxBytes=1, backupCount=10)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
\ No newline at end of file
from fastapi import FastAPI
from script.config.appconfig import post_end
from script.core.handlers.publishing import mqtt_publish
from script.database.basemodel import DataModel
app = FastAPI()
@app.post(post_end)
async def get_data(data: DataModel, site_id: str, data_quality: int):
data_format = {"data": data, "site_id": site_id, "data_quality": data_quality}
response = mqtt_publish(data_format)
return response
import json
from script.config.appconfig import mqtt_topic
from script.core.handlers.publishing import mqtt_consume
from script.core.handlers.storeToRedis import store_redis
def subscribe(client):
def on_message(client, userdata, msg):
data_received = json.loads(msg.payload.decode())
print(data_received)
store_redis(data_received)
client.subscribe(mqtt_topic)
client.on_message = on_message
def run():
try:
client = mqtt_consume()
subscribe(client)
client.loop_forever()
except KeyboardInterrupt:
client.loop_stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment