Commit a6140124 authored by arun.uday's avatar arun.uday

migrate to gitlab-pm

parents
# Default ignored files
/shelf/
/workspace.xml
task7_mqtt_sender
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (task7)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/task7_mqtt_receiver.iml" filepath="$PROJECT_DIR$/.idea/task7_mqtt_receiver.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
import uvicorn
from scripts.logging.loggers import logger
from scripts.config.application_config import uvicorn_port
# pip install python-multipart
if __name__ == "__main__":
try:
print("MQTT task")
uvicorn.run("main:app", port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
[path]
base_path = scripts/
sub_path = temp/
core_path = core/
handlers_path = handlers/
[file]
file_name_csv = data.csv
file_name_json = data.json
file_mode = wb
[mqtt]
topic = topic
mqtt_host = 192.168.0.220
port = 1883
requests = 60
[uvicorn]
uvicorn_port = 8000
[api_routes]
api_upload = /upload/
api_submit = /submit_file/
api_csv = /csv/
[dataframes]
orientation = records
[html]
html_template = upload_file.html
[log]
formatter_time = asctime
formatter_level = levelname
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.application_config import uvicorn_port
from scripts.logging.loggers import logger
from scripts.services.app_services_run import sender_tasks
app = FastAPI()
app.include_router(sender_tasks)
# starting the application
if __name__ == "__main__":
try:
print("Fast API task")
uvicorn.run(app, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
import configparser
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
base_path = config.get("path", 'base_path')
sub_path = config.get("path", "sub_path")
core_path = config.get("path", "core_path")
handlers_path = config.get("path", "handlers_path")
# file name
file_name_csv = config.get("file", "file_name_csv")
file_name_json = config.get("file", "file_name_json")
file_mode = config.get("file", "file_mode")
full_path = base_path + sub_path
full_path_csv = full_path + file_name_csv
full_path_json = full_path + file_name_json
# mqtt
topic_name = config.get("mqtt", "topic")
mqtt_host = config.get("mqtt", "mqtt_host")
mqtt_port = config.get("mqtt", "port")
request_no = config.get("mqtt", "requests")
# uvicorn
uvicorn_port = config.get("uvicorn", "uvicorn_port")
# log
formatter_time = config.get("log", "formatter_time")
formatter_level = config.get("log", "formatter_level")
# data frame
orientation = config.get("dataframes", "orientation")
# html template
html_template = config.get("html", "html_template")
import configparser
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
api_upload = config.get("api_routes", 'api_upload')
api_submit = config.get("api_routes", 'api_submit')
api_csv = config.get("api_routes", 'api_csv')
# dictionary to json
import pandas as pd
from scripts.config.application_config import orientation
from scripts.logging.loggers import logger
# extracting the csv to dictionary
def extract_data(csv_file):
try:
data_csv = pd.read_csv(csv_file)
json_data = data_csv.to_dict(orient=orientation)
return json_data
except Exception as e:
logger.error("Some exception occurred while reding the csv file: ", e)
import json
from paho.mqtt.client import Client
from scripts.config import application_config
from scripts.core.handlers.dict_to_json import extract_data
from scripts.core.handlers.remove_list_json import remove_list
from scripts.logging.loggers import logger
def extract_send_data():
try:
dict_data = extract_data(application_config.full_path_csv)
list_removed = remove_list(dict_data)
# dumps the list to json
json_data = json.dumps(list_removed)
# create the paho client
client = Client()
client.connect(application_config.mqtt_host, int(application_config.mqtt_port))
# publish the topic
client.publish(application_config.topic_name, json_data)
client.disconnect()
except Exception as e:
logger.error("Some exception occurred while sending file: ", e)
# remove list from json
from scripts.logging.loggers import logger
def remove_list(json_data_list):
try:
result = json_data_list
dict_cols = {}
for i in range(0, len(result)):
key_dict = i
val_dict = result[i]
dict_cols.update({key_dict: val_dict})
return dict_cols
except Exception as e:
logger.error("Some exception occurred while converting json to dictionary: ", e)
<!DOCTYPE HTML>
<html lang="en">
<title>File Upload</title>
<body>
<!-- form upload -->
<form enctype = "multipart/form-data" action = "/submit_file/" method = "post">
<p>Upload File: <input type = "file" name = "file_data" /></p>
<p><input type = "submit" value = "Upload" /></p>
</form>
</body>
</html>
# file upload to csv
from scripts.config.application_config import file_mode
from scripts.logging.loggers import logger
def extract_uploaded_data(full_path_csv, file_data):
try:
# reading the csv
with open(full_path_csv, file_mode) as file_object:
file_object.write(file_data.file.read())
except Exception as e:
logger.error("Some exception occurred while reading the file: ", e)
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import application_config
from scripts.config.application_config import formatter_time, formatter_level
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging.INFO)
log_formatter = f'%({formatter_time})s - %({formatter_level})-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
file_path = application_config.full_path
formatter = logging.Formatter(log_formatter, time_format)
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(f"{file_path}{application_config.topic_name}.log")
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
from fastapi import APIRouter, Request, UploadFile, File
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from scripts.constants.api_routes_config import api_upload, api_submit, api_csv
from scripts.config import application_config
from scripts.core.handlers.extracting_sending import extract_send_data
from scripts.core.handlers.uploaded_file_ import extract_uploaded_data
from scripts.logging.loggers import logger
sender_tasks = APIRouter()
templates = Jinja2Templates(application_config.base_path + application_config.core_path +
application_config.handlers_path)
# html form access using api
@sender_tasks.get(api_upload, response_class=HTMLResponse)
def template_render(request: Request):
try:
return templates.TemplateResponse(application_config.html_template, {"request": request})
except Exception as e:
logger.error("Some exception occurred : ", e)
# handling the form submit using api
@sender_tasks.post(api_submit)
async def submit_file_data(file_data: UploadFile = File(...)):
try:
extract_uploaded_data(application_config.full_path_csv, file_data)
except Exception as e:
logger.error("Some exception occurred while extracting the file: ", e)
else:
return {"msg": "file uploading successful..."}
# reading csv and loading the data to json finally publishing it to mqtt
@sender_tasks.get(api_csv)
def dict_to_json():
try:
extract_send_data()
except Exception as e:
logger.error("Some exception occurred: ", e)
else:
print("Data published")
return {"msg": "Data published"}
id,first_name,last_name
1,Marna,Pergens
2,Morie,Iddens
3,Ilysa,Thonger
4,Catherine,Dundendale
5,Randene,Brookz
6,Hester,Yeude
7,Philomena,Dyott
8,Sarene,Petren
9,Rosella,Kinavan
10,El,Creamer
id,first_name,last_name
1,Marna,Pergens
2,Morie,Iddens
3,Ilysa,Thonger
4,Catherine,Dundendale
5,Randene,Brookz
6,Hester,Yeude
7,Philomena,Dyott
8,Sarene,Petren
9,Rosella,Kinavan
10,El,Creamer
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment