Commit 2cb67346 authored by arun.uday's avatar arun.uday

migrate to gitlab-pm

parents
MONGO_URI = mongodb://localhost:27017
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (MQTT_FastApi)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="scripts.config.applications_config" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (MQTT_FastApi)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/MQTT_FastApi.iml" filepath="$PROJECT_DIR$/.idea/MQTT_FastApi.iml" />
</modules>
</component>
</project>
\ No newline at end of file
L
user_personal/script.py.mako,2\5\25f681fa8cfe6b65795cbc53d7303301e928b174
D
user_personal/env.py,c\8\c83348fb8c93bb139761daa30e0e964bf9a6a119
h
8user_personal/versions/7192fecebae7_migration_message.py,9\0\906d7154a0d3c7b0e658750d670f504dfddcacab
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = user_personal
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to user_personal/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:user_personal/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql://postgres:admin@localhost:5432/user_personal_details
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
import uvicorn
from fastapi import FastAPI
from scripts.config.applications_config import uvicorn_receiver_port
from scripts.logging.loggers import logger
from scripts.services.app_receiver_service import receiver_api
app = FastAPI()
app.include_router(receiver_api)

# starting the application
if __name__ == "__main__":
    try:
        print("MQTT receiver task")
        # receiver port comes from applications.conf [uvicorn] as a string
        uvicorn.run(app, port=int(uvicorn_receiver_port))
    except Exception as e:
        # lazy %s formatting so the exception text is actually rendered
        logger.error("Interruption occurred: %s", e)
[path]
base_path = scripts/
sub_path = external/
[file]
csv_file = user_data.csv
file_mode = wb
[uvicorn]
uvicorn_port = 8000
receiver_port = 8001
# database servers
[connection]
db_name = user_personal_details
collection = users
[api_routes]
route_index = /
route_insert_csv = /insert-csv/
route_insert_one = /insert-one/
route_view_all = /view-all/
route_view_one = /view-one/
route_update = /update/
route_delete = /delete/
route_receiver = /receiver/
route_receiver_alter = /alter/
[log]
formatter_time = asctime
formatter_level = levelname
[mqtt]
topic = topic
mqtt_host = 192.168.0.220
port = 1883
requests = 60
[db]
db_table = user_personal
[encode]
encode = utf-8
# NOTE: plaintext DB credentials; prefer environment variables (like MONGO_URI above)
[postgres]
hostname = localhost
port = 5432
user = postgres
password = admin
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.applications_config import uvicorn_port
from scripts.logging.loggers import logger
from scripts.services.app_service_run import mongo_api
app = FastAPI()
app.include_router(mongo_api)

# starting the application
if __name__ == "__main__":
    try:
        print("MQTT sender task")
        # sender port comes from applications.conf [uvicorn] as a string
        uvicorn.run(app, port=int(uvicorn_port))
    except Exception as e:
        # lazy %s formatting so the exception text is actually rendered
        logger.error("Interruption occurred: %s", e)
alabaster @ file:///home/ktietz/src/ci/alabaster_1611921544520/work
# reading from the applications.conf
import configparser
import os
import sys
from dotenv import load_dotenv
load_dotenv()
# Load all application settings once at import time; any failure aborts the
# process with sys.exit() after printing the error.
try:
    config = configparser.RawConfigParser()
    # relative path: the app must be started from the project root
    config.read("conf/applications.conf")
    # path
    base_path = config.get("path", 'base_path')
    sub_path = config.get("path", "sub_path")
    full_path = base_path + sub_path
    # file
    file_csv = config.get("file", "csv_file")
    csv_path = full_path + file_csv
    file_mode = config.get("file", "file_mode")
    # mongo server URI comes from the environment (.env via load_dotenv)
    uri = os.getenv("MONGO_URI")
    # log
    formatter_time = config.get("log", "formatter_time")
    formatter_level = config.get("log", "formatter_level")
    # uvicorn
    uvicorn_port = config.get("uvicorn", "uvicorn_port")
    uvicorn_receiver_port = config.get("uvicorn", "receiver_port")
    # mongodb conf
    db_name = config.get("connection", "db_name")
    collection = config.get("connection", "collection")
    # mqtt
    topic_name = config.get("mqtt", "topic")
    mqtt_host = config.get("mqtt", "mqtt_host")
    # NOTE: ports / counts stay strings here; callers int() them
    mqtt_port = config.get("mqtt", "port")
    request_no = config.get("mqtt", "requests")
    # db name
    db_table = config.get("db", "db_table")
    # encode
    utf_encode = config.get("encode", "encode")
    # postgres values
    postgres_host_name = config.get("postgres", "hostname")
    postgres_port = config.get("postgres", "port")
    postgres_user = config.get("postgres", "user")
    postgres_password = config.get("postgres", "password")
except Exception as e:
    # configuration is mandatory: print the problem and stop the process
    print(e)
    sys.stdout.flush()
    sys.exit()
# reading conf file
import configparser
import sys
# Load the API route paths once at import time; exits the process on failure.
try:
    config = configparser.RawConfigParser()
    # relative path: the app must be started from the project root
    config.read("conf/applications.conf")
    # api paths
    route_index = config.get("api_routes", "route_index")
    route_insert_csv = config.get("api_routes", "route_insert_csv")
    route_insert_one = config.get("api_routes", "route_insert_one")
    route_view_all = config.get("api_routes", "route_view_all")
    route_view_one = config.get("api_routes", "route_view_one")
    route_update = config.get("api_routes", "route_update")
    route_delete = config.get("api_routes", "route_delete")
    # receiver
    route_receiver = config.get("api_routes", "route_receiver")
    route_receiver_alter = config.get("api_routes", "route_receiver_alter")
except Exception as e:
    # route config is mandatory: print the problem and stop the process
    print(e)
    sys.stdout.flush()
    sys.exit()
from scripts.config.applications_config import topic_name
from scripts.logging.loggers import logger
def connect_mqtt(client_name, userdata, flags, rc):
    """paho on_connect callback: log the result and subscribe to the topic.

    Signature matches paho-mqtt's on_connect(client, userdata, flags, rc);
    the topic name comes from applications.conf [mqtt].
    """
    try:
        print(userdata, " ", flags)
        print("Connected with result code " + str(rc))
        # subscribing to the topic
        client_name.subscribe(topic_name)
    except Exception as e:
        # %s lazy formatting so the exception is rendered into the record
        logger.error("Exception occurred while connecting to mqtt: %s", e)
from scripts.logging.loggers import logger
def display_fetched_data(cursor):
    """Print every row produced by a DB cursor.

    cursor: any object exposing fetchall(); returns None.
    """
    try:
        # Fetch all the rows as a list of tuples
        rows = cursor.fetchall()
        # Iterate through the rows and print the data
        for row in rows:
            print(row)
    except Exception as e:
        # %s lazy formatting so the exception is rendered into the record
        logger.error("Exception occurred while fetching the data from table: %s", e)
import json
from scripts.config.applications_config import utf_encode
from scripts.core.handlers.receiver_mqtt.operations_postgres import operations_call_
from scripts.logging.loggers import logger
def message_mqtt(client_name, userdata, msg):
    """paho on_message callback: decode the payload JSON and dispatch it.

    The payload is expected to be a JSON-encoded list produced by the
    sender's create_message_* helpers.
    """
    try:
        print(client_name, " ", userdata)
        # decoding the msg to string (encoding from applications.conf [encode])
        payload_decoded = msg.payload.decode(utf_encode)
        # payload to json
        json_data = json.loads(payload_decoded)
        # hand the decoded message to the postgres operation dispatcher
        operations_call_(json_data)
    except Exception as e:
        # %s lazy formatting so the exception is rendered into the record
        logger.error("Exception occurred while decoding the message: %s", e)
from paho.mqtt.client import Client
from scripts.core.handlers.receiver_mqtt.connection_mqtt_receiver import connect_mqtt
from scripts.core.handlers.receiver_mqtt.message_mqtt_receive import message_mqtt
from scripts.logging.loggers import logger
def mqtt_receiver_call(mqtt_host, port, request_no):
    """Connect to the MQTT broker and block forever processing messages.

    mqtt_host: broker address; port / request_no (keepalive) arrive as
    strings from applications.conf and are converted here.
    """
    try:
        # client object creation
        client = Client(userdata="Receiver")
        # register callbacks BEFORE connecting so no event can be missed
        client.on_connect = connect_mqtt
        client.on_message = message_mqtt
        # connecting to broker
        client.connect(mqtt_host, int(port), int(request_no))
        # blocks until the process is stopped
        client.loop_forever()
    except Exception as e:
        # %s lazy formatting so the exception is rendered into the record
        logger.error("Exception occurred while calling the mqtt: %s", e)
from scripts.core.handlers.receiver_mqtt.display_data import display_fetched_data
from scripts.logging.loggers import logger
from scripts.utils.postgres.postgres_utils import PostgresOperations
# database operations
def operations_call_(json_data):
    """Dispatch a decoded MQTT message to the matching postgres operation.

    json_data: list shaped ["<operation>", payload, ...] as produced by the
    sender's create_message_* helpers. Fetch-style operations return a
    cursor whose rows are printed; others return nothing.
    """
    try:
        # creating a copy of the data
        data_list = json_data
        # calling the postgres operations class
        obj_db = PostgresOperations()
        # dictionary mapping operation tags to PostgresOperations methods
        operations = {
            "insert": obj_db.insert_db,
            "fetch": obj_db.fetch_data,
            "fetch_one": obj_db.fetch_one,
            "update": obj_db.update_db,
            "delete": obj_db.delete_db
        }
        # selecting the operation based on the message from the sender
        operation_type = data_list[0]
        operation = operations.get(operation_type)
        # determining the operation: arity depends on the operation tag
        if operation:
            if operation_type in ["insert", "delete", "fetch_one"]:
                data_fetched = operation(data_list[1])
            elif operation_type == "update":
                data_fetched = obj_db.update_db(data_list[1], data_list[2])
            else:
                data_fetched = operation()
        else:
            data_fetched = None
        # display the fetched rows, if any
        if data_fetched:
            display_fetched_data(data_fetched)
        else:
            print("")
    except Exception as e:
        # message corrected: this block dispatches DB operations (the old
        # text mentioned data frames); %s renders the exception
        logger.error("Exception occurred while running the database operation: %s", e)
from scripts.config.applications_config import db_table
from scripts.databases.postgres.postgres_psycopg import conn
from scripts.logging.loggers import logger
from scripts.utils.postgres.postgres_utils import PostgresOperations
def alter_table_sql(column_values):
    """Run an ALTER TABLE against the shared module-level psycopg2 connection.

    column_values: dict with keys 'column_name', 'data_type' and 'length'
    (shape of the NewColumn pydantic model used by the receiver API).
    """
    try:
        # getting the cursor
        cur = conn.cursor()
        # executing the alter query built by PostgresOperations().alter()
        # NOTE(review): the SQL text is assembled from user-supplied values —
        # verify PostgresOperations.alter() quotes/validates identifiers.
        cur.execute(PostgresOperations().alter(db_table, column_values))
        # committing to the table
        conn.commit()
    except Exception as e:
        logger.error("Exception occurred while altering table: ", e)
    else:
        # success summary printed to stdout (no return value)
        print({"message": "Update Table",
               "Status": "Table Updated",
               "Data": f'Column Name: {column_values["column_name"]},'
                       f'Datatype: {column_values["data_type"]},'
                       f'Length: {column_values["length"]}'})
# update the conditions for updating and deletion
from scripts.logging.loggers import logger
# updating the conditions for the query
def conditions_string(conditions):
    """Render a dict of column -> value as a comma-separated SQL equality list.

    NOTE(review): values are interpolated directly into the string — do not
    feed untrusted input (SQL injection risk); confirm upstream sanitization.
    """
    try:
        parts = []
        for column, value in conditions.items():
            parts.append(f"{column} = '{value}'")
        return ",".join(parts)
    except Exception as e:
        logger.error("Exception occurred while making conditions to string: ", e)
# updating the updates for the query
def updates_string(updates):
    """Render a dict of column -> new value as a comma-separated SET clause body.

    NOTE(review): values are interpolated directly into the string — do not
    feed untrusted input (SQL injection risk); confirm upstream sanitization.
    """
    try:
        rendered = (f"{column} = '{new_value}'" for column, new_value in updates.items())
        return ",".join(rendered)
    except Exception as e:
        logger.error("Exception occurred while making conditions to string: ", e)
# conditions to dict
from scripts.logging.loggers import logger
def remove_none(condition):
    """Drop unset/placeholder fields from an iterable of (key, value) pairs.

    Filters out None plus the Swagger example placeholders 'string' and
    'user@example.com'; returns the surviving pairs as a dict.
    """
    try:
        cleaned = {}
        for key, value in condition:
            if value is None or value == 'string' or value == 'user@example.com':
                continue
            cleaned[key] = value
        return cleaned
    except Exception as e:
        logger.error("Exception occurred while removing none and string: ", e)
# updating conditions and update conditions
from scripts.logging.loggers import logger
def remove_none_updates(condition, update):
    """Clean both the filter pairs and the update pairs in one call.

    Drops None plus the Swagger placeholders 'string' / 'user@example.com'
    from each iterable of (key, value) pairs; returns (condition_dict,
    update_dict).
    """
    try:
        def _clean(pairs):
            # shared filter applied to both inputs
            return {field: val for field, val in pairs
                    if val is not None and val != 'string' and val != 'user@example.com'}

        return _clean(condition), _clean(update)
    except Exception as e:
        logger.error("Exception occurred while removing none and string in update: ", e)
# convert csv to json
import pandas as pd
from scripts.logging.loggers import logger
# convert csv to json
def file_convert_dict(csv_path):
    """Read a CSV file and return its rows as a list of record dicts.

    csv_path: path (or buffer) accepted by pandas.read_csv.
    Returns None when reading fails (the error is logged).
    """
    try:
        # one record dict per CSV row
        records = pd.read_csv(csv_path).to_dict(orient="records")
    except Exception as e:
        logger.error("Some exception occurred while reading csv: ", e)
    else:
        return records
# creating the message for the receiver
from scripts.logging.loggers import logger
def create_message_one(msg, dict_data):
    """Build the two-element MQTT message [operation_tag, payload] for inserts."""
    try:
        message = [msg, dict_data]
        return message
    except Exception as e:
        logger.error("Exception occurred while creating insertion message: ", e)
def create_message_two(msg, condition, update):
    """Build the three-element MQTT message [tag, condition, update] for updates."""
    try:
        message = [msg, condition, update]
        return message
    except Exception as e:
        logger.error("Exception occurred while creating message for update: ", e)
def create_message_three(msg, condition):
    """Build the two-element MQTT message [tag, condition] for fetch-one/delete."""
    try:
        message = [msg, condition]
        return message
    except Exception as e:
        logger.error("Exception occurred while creating message for fetch: ", e)
# cursor handler
from scripts.logging.loggers import logger
def gen_list(dict_data):
    """Yield each record from the cursor/iterable lazily."""
    yield from dict_data


def cursor_to_list(cursor):
    """Wrap a cursor (or any iterable) in a lazy generator for API responses."""
    try:
        # defer iteration to the consumer
        return gen_list(cursor)
    except Exception as e:
        logger.error("Some exception occurred while trying to run the cursor: ", e)
import json
from paho.mqtt.client import Client
from scripts.config import applications_config
from scripts.logging.loggers import logger
def extract_send_data(dict_):
    """Serialize a payload to JSON and publish it once on the configured MQTT topic.

    dict_: any json-serializable payload (this project sends lists such as
    ["insert", [...]]). Broker host/port/topic come from applications.conf.
    """
    try:
        # dumps the list to json
        json_data = json.dumps(dict_)
        # create the paho client
        client = Client()
        # connect to the mqtt client
        client.connect(applications_config.mqtt_host, int(applications_config.mqtt_port))
        # publish the topic
        # NOTE(review): publish() followed immediately by disconnect() with no
        # network loop may drop the message before it is flushed — confirm
        # against paho-mqtt docs (loop_start/wait_for_publish).
        client.publish(applications_config.topic_name, json_data)
        # disconnecting from the mqtt
        client.disconnect()
    except Exception as e:
        logger.error("Some exception occurred while sending file: ", e)
# file upload to csv
import os
from scripts.config import applications_config
from scripts.config.applications_config import file_mode
from scripts.core.handlers.sender_mqtt.convert_to_dict import file_convert_dict
from scripts.core.handlers.sender_mqtt.create_message import create_message_one
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.logging.loggers import logger
def extract_uploaded_data(full_path_csv, file_data, db_obj, collection):
    """Persist an uploaded CSV, insert its rows into Mongo and forward them over MQTT.

    full_path_csv: destination path the upload is written to
    file_data: FastAPI UploadFile (only ".csv" extensions are accepted)
    db_obj: MongoDB operations helper
    collection: target mongo collection
    Returns the insert summary dict, or None for non-CSV input / on error.
    """
    try:
        # splitting the file to get the extension
        filename, file_extension = os.path.splitext(file_data.filename)
        # checking if the file is csv
        if file_extension == ".csv":
            # persisting the upload (file_mode comes from applications.conf, e.g. "wb")
            with open(full_path_csv, file_mode) as file_object:
                file_object.write(file_data.file.read())
            # the CSV is parsed twice into independent lists.
            # NOTE(review): this may be load-bearing — insert_many() mutates its
            # dicts by adding "_id", so the second copy keeps the MQTT payload
            # free of ObjectIds. Confirm before consolidating into one read.
            csv_dict_data = file_convert_dict(applications_config.csv_path)
            for_list = file_convert_dict(applications_config.csv_path)
            # convert to the ["insert", rows] message shape
            list_ = create_message_one("insert", for_list)
            # inserting to the collection
            data = db_obj.insert_data_many(collection, csv_dict_data)
            # extract and send the data
            extract_send_data(list_)
            return data
        else:
            logger.error("Invalid File format")
    except Exception as e:
        logger.error("Some exception occurred while extracting data: ", e)
from pydantic import BaseModel, EmailStr
# Optional belongs to the stdlib typing module; importing it from
# pydantic.class_validators relied on a private re-export.
from typing import Optional


# model for the insertion data: a complete user record
class User(BaseModel):
    id: int
    first_name: str
    last_name: str
    email: EmailStr
    address: str


# model for updating and getting conditions: every field optional so any
# subset can act as a filter or update
class UpdateUser(BaseModel):
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    email: Optional[EmailStr] = None
    address: Optional[str] = None


# payload for the ALTER TABLE endpoint
class NewColumn(BaseModel):
    column_name: str
    data_type: str
    length: int
from sqlalchemy import INTEGER, VARCHAR, Column
from sqlalchemy.ext.declarative import declarative_base
from scripts.config import applications_config
# declarative base shared by the ORM models / engine setup in this project
Base = declarative_base()


class Category(Base):
    """SQLAlchemy model for the user table (name from applications.conf [db])."""
    __tablename__ = applications_config.db_table
    id = Column('id', INTEGER, primary_key=True)
    first_name = Column('first_name', VARCHAR(255), nullable=False)
    last_name = Column('last_name', VARCHAR(255), nullable=False)
    email = Column('email', VARCHAR(255), nullable=False)
    address = Column('address', VARCHAR(255), nullable=False)
# connect to mongodb
from pymongo import MongoClient
from scripts.config import applications_config
from scripts.logging.loggers import logger
def connection_mongo_db():
    """Connect to Mongo (URI from the MONGO_URI env var) and return the collection.

    Returns None when the connection/setup fails; the error is logged with
    its traceback.
    """
    try:
        # connecting to the mongo
        mongo_client = MongoClient(applications_config.uri)
        # selecting the db
        db = mongo_client[applications_config.db_name]
        # selecting the collection
        collection = db[applications_config.collection]
        return collection
    except Exception as e:
        # logger.exception records the traceback; %s renders the message
        logger.exception("Some exception occurred while connecting to mongo: %s", e)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from scripts.config import applications_config
from scripts.databases.models.sqlalchemy_models.postgres_model import Base
from scripts.logging.loggers import logger
# Build the engine/session at import time from applications.conf values.
try:
    # postgres DSN assembled from [postgres] + [connection]
    db_url = f'postgresql://{applications_config.postgres_user}:{applications_config.postgres_password}' \
             f'@{applications_config.postgres_host_name}:{applications_config.postgres_port}/' \
             f'{applications_config.db_name}'
    engine = create_engine(db_url)
    connection = engine.connect()
    # create all declared tables if missing
    Base.metadata.create_all(engine)
    LocalSession = sessionmaker(bind=engine)
    session = LocalSession()

    # NOTE(review): defined inside the try, so this name only exists when the
    # engine setup above succeeded — importers get AttributeError otherwise.
    def commit_to_postgres(data):
        # persist a single ORM object using the module-level session
        session.add(data)
        session.commit()
except Exception as e:
    logger.error("Exception occurred: ", e)
import psycopg2
from scripts.logging.loggers import logger
# Module-level psycopg2 connection shared by the receiver-side SQL helpers.
try:
    # NOTE(review): credentials are hard-coded here but also exist in
    # applications.conf [postgres] — keep the two in sync (or read the conf).
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        database="user_personal_details",
        user="postgres",
        password="admin"
    )
except Exception as e:
    # on failure "conn" is left undefined; importers will hit NameError
    logger.error("Exception occurred: ", e)
id,first_name,last_name,email,address
1,Maurita,Helks,mhelks0@parallels.com,PO Box 3203
2,Prisca,Baxstar,pbaxstar1@whitehouse.gov,20th Floor
3,Camellia,Gallop,cgallop2@comsenz.com,5th Floor
4,Kingston,Keightley,kkeightley3@rediff.com,Suite 35
5,Suzie,Tredgold,stredgold4@myspace.com,PO Box 76402
6,Ade,Coonihan,acoonihan5@stanford.edu,Apt 376
7,Candida,Kobiela,ckobiela6@jiathis.com,10th Floor
8,Karee,Marty,kmarty7@auda.org.au,Suite 75
9,Granville,Weathey,gweathey8@bloglovin.com,2nd Floor
10,Letty,Brecher,lbrecher9@posterous.com,3rd Floor
11,Tam,Graal,tgraala@dmoz.org,Apt 525
12,Dusty,Causbey,dcausbeyb@drupal.org,Apt 1003
13,Kath,Chattey,kchatteyc@smugmug.com,Apt 1074
14,Rutter,Wattins,rwattinsd@hhs.gov,Room 1920
15,Simonette,Gledstane,sgledstanee@nytimes.com,20th Floor
16,Cassey,Gerrelt,cgerreltf@scribd.com,12th Floor
17,Paige,O'Henery,poheneryg@nature.com,Apt 1111
18,Tabbie,Trever,ttreverh@ask.com,20th Floor
19,Hoebart,Valder,hvalderi@tripod.com,5th Floor
20,Margie,Stollenbecker,mstollenbeckerj@fema.gov,Apt 531
21,Bobby,Beet,bbeetk@odnoklassniki.ru,20th Floor
22,Obie,Le Blanc,oleblancl@bravesites.com,Room 832
23,Alyosha,Ruffli,arufflim@bandcamp.com,Suite 14
24,Syman,Granger,sgrangern@bbb.org,PO Box 19521
25,Slade,Weller,swellero@livejournal.com,Room 1911
26,Robina,Rate,rratep@ask.com,Apt 917
27,Karon,Liveley,kliveleyq@tinypic.com,3rd Floor
28,Patty,McGeouch,pmcgeouchr@paginegialle.it,11th Floor
29,Osborn,Ferriday,oferridays@slashdot.org,Room 1474
30,Ellsworth,Dundin,edundint@wikispaces.com,PO Box 17827
31,Mitch,Stetson,mstetsonu@linkedin.com,Apt 603
32,Hardy,Swansbury,hswansburyv@google.it,Apt 1802
33,Nikolas,Telling,ntellingw@ocn.ne.jp,Room 148
34,Gladi,Florey,gfloreyx@dropbox.com,Suite 55
35,Adela,Overbury,aoverburyy@tinyurl.com,4th Floor
36,Evie,McFfaden,emcffadenz@scribd.com,Suite 88
37,Baldwin,Reaman,breaman10@booking.com,Suite 71
38,Beckie,Leadley,bleadley11@blogs.com,Apt 1592
39,Bendick,Kubes,bkubes12@latimes.com,Room 769
40,Huntlee,Mazzeo,hmazzeo13@columbia.edu,Apt 1564
41,Vally,Dzenisenka,vdzenisenka14@themeforest.net,PO Box 96195
42,Lethia,Bolden,lbolden15@yandex.ru,Suite 80
43,Odille,Chataignier,ochataignier16@slate.com,PO Box 73001
44,Cy,Wescott,cwescott17@reuters.com,17th Floor
45,Darnell,Jays,djays18@yellowbook.com,Room 1722
46,Mikael,Wickwar,mwickwar19@bloglines.com,Room 1780
47,Marguerite,McGahey,mmcgahey1a@1688.com,Room 243
48,Jessie,Gaskill,jgaskill1b@angelfire.com,Suite 98
49,Gardiner,Bestall,gbestall1c@cloudflare.com,17th Floor
50,Jo-ann,Rickett,jrickett1d@live.com,Room 903
51,Gavra,Moquin,gmoquin1e@quantcast.com,Room 1139
52,Dylan,Slewcock,dslewcock1f@trellian.com,Apt 1256
53,Danyette,Coolson,dcoolson1g@biglobe.ne.jp,Apt 452
54,Roderich,Ashworth,rashworth1h@163.com,Apt 1416
55,Hildagarde,Haucke,hhaucke1i@ox.ac.uk,Suite 7
56,Darlleen,Bearsmore,dbearsmore1j@google.com,Suite 43
57,Wilmette,Bedwell,wbedwell1k@weebly.com,Suite 65
58,Neville,Swapp,nswapp1l@ocn.ne.jp,Apt 1054
59,Norene,Kopacek,nkopacek1m@domainmarket.com,Suite 38
60,Aubert,Streeting,astreeting1n@cocolog-nifty.com,Suite 82
61,Katrine,Easson,keasson1o@bizjournals.com,Room 1306
62,Celle,Elgie,celgie1p@sciencedirect.com,16th Floor
63,Alvan,Curtin,acurtin1q@booking.com,16th Floor
64,Cornela,Roder,croder1r@google.pl,2nd Floor
65,Wanids,Dansie,wdansie1s@fc2.com,18th Floor
66,Kip,Fillon,kfillon1t@imgur.com,Suite 82
67,Holmes,Kidstoun,hkidstoun1u@usnews.com,Apt 17
68,Freddy,Featherstone,ffeatherstone1v@parallels.com,13th Floor
69,Gusella,Jodlkowski,gjodlkowski1w@chronoengine.com,Apt 515
70,Marietta,Circuit,mcircuit1x@nydailynews.com,18th Floor
71,Brander,Hackley,bhackley1y@drupal.org,Suite 91
72,Nikola,Hughes,nhughes1z@elegantthemes.com,Suite 34
73,Krisha,Aberkirder,kaberkirder20@blogs.com,Suite 70
74,Shina,Meriott,smeriott21@rakuten.co.jp,18th Floor
75,Debra,Sproat,dsproat22@wikia.com,Apt 1650
76,Dunc,Wallworke,dwallworke23@devhub.com,PO Box 58145
77,Rivalee,Gonnet,rgonnet24@oracle.com,Suite 97
78,Regine,Keling,rkeling25@edublogs.org,Room 1519
79,Ashla,Fair,afair26@ucla.edu,Apt 1213
80,Arabela,Theakston,atheakston27@nsw.gov.au,Apt 1277
81,Barney,Grimsdale,bgrimsdale28@reddit.com,PO Box 144
82,Lucho,Braunlein,lbraunlein29@cisco.com,PO Box 26380
83,Jania,Eldredge,jeldredge2a@bloglovin.com,Suite 3
84,Hector,Remer,hremer2b@va.gov,Suite 51
85,Hans,Cantrell,hcantrell2c@bbb.org,Apt 1834
86,Jacquelynn,Kuhne,jkuhne2d@unc.edu,PO Box 70594
87,Way,Moogan,wmoogan2e@chronoengine.com,PO Box 51805
88,Janenna,Costa,jcosta2f@symantec.com,Room 1115
89,Hermine,Bridywater,hbridywater2g@weibo.com,13th Floor
90,Claudette,Highman,chighman2h@ftc.gov,Room 1359
91,Reinhold,Bromehed,rbromehed2i@cargocollective.com,9th Floor
92,Katleen,Dohmer,kdohmer2j@naver.com,PO Box 73746
93,Kellen,Hull,khull2k@eepurl.com,Apt 1569
94,Chrotoem,Lorincz,clorincz2l@behance.net,PO Box 8905
95,Worth,Putton,wputton2m@qq.com,PO Box 91027
96,Luce,Lyford,llyford2n@nps.gov,Room 739
97,Van,Warre,vwarre2o@zimbio.com,Room 1771
98,Kikelia,Benardette,kbenardette2p@vkontakte.ru,PO Box 74859
99,Fabio,Grigoli,fgrigoli2q@plala.or.jp,Apt 1785
100,Rhodia,Batch,rbatch2r@alibaba.com,Room 1597
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import applications_config
from scripts.config.applications_config import formatter_time, formatter_level
def get_logger():
    """
    Creates a rotating log

    Ensures the log directory exists and returns the root logger with a
    RotatingFileHandler attached; the log file is named after the db_name
    setting.
    """
    __logger__ = logging.getLogger('')
    # setting the logger level
    __logger__.setLevel(logging.INFO)
    # creating the format for the log (field names come from applications.conf [log])
    log_formatter = f'%({formatter_time})s - %({formatter_level})-6s - %(message)s'
    time_format = "%Y-%m-%d %H:%M:%S"
    # getting the path for the logger
    file_path = applications_config.full_path
    # setting the format
    formatter = logging.Formatter(log_formatter, time_format)
    # creating the folder if not exist
    if not os.path.exists(file_path):
        os.makedirs(file_path)
    # join directory and file name properly (previously a single pre-concatenated
    # string was passed through a one-argument os.path.join, a no-op)
    log_file = os.path.join(file_path, f"{applications_config.db_name}.log")
    # creating rotating file handler
    # NOTE(review): maxBytes=1 rolls the file on nearly every record and no
    # backupCount is set — confirm this is intended.
    temp_handler = RotatingFileHandler(log_file, maxBytes=1)
    # setting the formatter
    temp_handler.setFormatter(formatter)
    # setting the handler
    __logger__.addHandler(temp_handler)
    return __logger__


logger = get_logger()
# receiver app api
from fastapi import APIRouter
from scripts.config import applications_config
from scripts.constants import api_path_config
from scripts.core.handlers.receiver_mqtt.mqtt_receive_data import mqtt_receiver_call
from scripts.core.handlers.receiver_mqtt.table_add_col import alter_table_sql
from scripts.databases.models.pydantic_model_ import NewColumn
from scripts.logging.loggers import logger
receiver_api = APIRouter()


@receiver_api.post(api_path_config.route_receiver)
def receiver():
    """Start listening on the configured MQTT topic (blocks in loop_forever)."""
    try:
        # fetching data from the topic
        mqtt_receiver_call(applications_config.mqtt_host, applications_config.mqtt_port,
                           applications_config.request_no)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Exception occurred: %s", e)


# alter the table
@receiver_api.post(api_path_config.route_receiver_alter)
def alter_table(column_values: NewColumn):
    """Add a column to the postgres table from the posted NewColumn payload."""
    try:
        # alter the table
        return alter_table_sql(dict(column_values))
    except Exception as e:
        logger.error("Exception occurred: %s", e)
# api for convert, read, insert, update and delete
from fastapi import UploadFile, File, APIRouter
from scripts.config import applications_config
from scripts.constants import api_path_config
from scripts.core.handlers.sender_mqtt.conditions_to_dictionary import remove_none
from scripts.core.handlers.sender_mqtt.create_message import create_message_one
from scripts.core.handlers.sender_mqtt.cursor_handle import cursor_to_list
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.core.handlers.sender_mqtt.uploaded_file_ import extract_uploaded_data
from scripts.databases.models.pydantic_model_ import User, UpdateUser
from scripts.databases.mongo.connect_mongo import connection_mongo_db
from scripts.logging.loggers import logger
from scripts.utils.mongo.mongo_utils import MongoDB
# mongo app api
# mongo app api
mongo_api = APIRouter()
# collections — NOTE(review): the Mongo connection is opened at import time;
# a failed connection leaves this as None
collection = connection_mongo_db()
# mongo class object
db_obj = MongoDB()


# index page
@mongo_api.get(api_path_config.route_index)
def start_app():
    """Health/landing endpoint."""
    return "Starting the app"
@mongo_api.post(api_path_config.route_insert_csv)
def csv_file(file_data: UploadFile = File(...)):
    """Accept a CSV upload, insert its rows into Mongo and relay them over MQTT."""
    try:
        # extracting and inserting the csv to mongo and sending mqtt messages
        return extract_uploaded_data(applications_config.csv_path, file_data, db_obj, collection)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Some exception occurred while running csv insertion: %s", e)
@mongo_api.post(api_path_config.route_insert_one)
def insert_one(user_data: User):
    """Insert a single user into Mongo and forward the record over MQTT."""
    try:
        # inserting one data and sending mqtt messages
        data = db_obj.insert_data_one(collection, user_data)
        # convert the data to dictionary
        dict_ = dict(user_data)
        # creating the message for the receiver
        list_ = create_message_one("insert", [dict_])
        extract_send_data(list_)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Some exception occurred while running insertion: %s", e)
    else:
        return data
@mongo_api.post(api_path_config.route_view_all)
def view_data_all():
    """Fetch every document and return it (also notifies the receiver via MQTT)."""
    try:
        # fetching data from the mongo and sending messages to mqtt
        cursor = db_obj.find_data_all(collection)
        # creating the data from the cursor
        # NOTE(review): cursor_to_list returns a generator — confirm the
        # response encoder consumes it as intended
        data_fetch = cursor_to_list(cursor)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Some exception occurred while running view all: %s", e)
    else:
        return {"message": "Fetch Data",
                "Status": "Data Fetched",
                "Data": data_fetch}
@mongo_api.post(api_path_config.route_view_one)
def view_data_one(condition: UpdateUser):
    """Fetch documents matching the posted condition and return them."""
    try:
        # fetching data from the mongo and sending messages to mqtt
        cursor = db_obj.find_data_one(collection, condition)
        # creating the data from the cursor
        data_fetch = cursor_to_list(cursor)
    except Exception as e:
        # log message corrected: this endpoint is "view one", not "view all";
        # %s renders the exception
        logger.error("Some exception occurred while running view one: %s", e)
    else:
        return {"message": f"Fetch Data {remove_none(condition)}",
                "Status": "Data Fetched",
                "Data": f'{data_fetch}'}
@mongo_api.put(api_path_config.route_update)
def update_data(condition: UpdateUser, update: UpdateUser):
    """Update matching documents in Mongo and forward the change over MQTT."""
    try:
        # updating data from the mongo and sending messages to mqtt
        list_ = db_obj.update_data(collection, condition, update)
        extract_send_data(list_)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Some exception occurred while running update: %s", e)
    else:
        return {"message": f"Update Data {remove_none(condition)} {remove_none(update)}",
                "Status": "Data Updated",
                "Data": f'{list_}'}
@mongo_api.delete(api_path_config.route_delete)
def delete_data(condition: UpdateUser):
    """Delete matching documents from Mongo and forward the event over MQTT."""
    try:
        list_ = db_obj.delete_data(collection, condition)
        # creating the message
        extract_send_data(list_)
    except Exception as e:
        # lazy %s formatting so the exception text reaches the log
        logger.error("Some exception occurred while running delete: %s", e)
    else:
        # response text corrected: this is the delete endpoint (was a
        # copy-paste of the update handler's "Update Data"/"Data Updated")
        return {"message": f"Delete Data {remove_none(condition)}",
                "Status": "Data Deleted",
                "Data": f'{list_}'}
# mongo operations
from scripts.core.handlers.sender_mqtt.conditions_to_dictionary import remove_none
from scripts.core.handlers.sender_mqtt.conditions_updates import remove_none_updates
from scripts.core.handlers.sender_mqtt.create_message import create_message_two, create_message_three
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.logging.loggers import logger
class MongoDB:
    """Static CRUD helpers around a Mongo collection.

    Read/update/delete operations also build a notification message
    (create_message_two / create_message_three) and publish it over MQTT
    via extract_send_data. All methods log-and-swallow exceptions, returning
    None on failure.
    """

    @staticmethod
    def insert_data_many(collections, csv_dict_data):
        """Insert a list of dicts (parsed CSV rows) into ``collections``.

        Returns the standard response envelope on success, None on failure.
        """
        try:
            collections.insert_many(csv_dict_data)
        except Exception as e:
            # %s lazy formatting so the exception text is actually interpolated into the log
            logger.error("Some exception occurred while running insertion of csv: %s", e)
        else:
            return {"message": "Insert Data",
                    "Status": "Data Inserted",
                    "Data": f'{csv_dict_data}'}

    @staticmethod
    def insert_data_one(collections, user_data):
        """Insert a single document (a mapping/model convertible via ``dict``)."""
        try:
            collections.insert_one(dict(user_data))
        except Exception as e:
            logger.error("Some exception occurred while running insertion: %s", e)
        else:
            return {"message": "Insert Data",
                    "Status": "Data Inserted",
                    "Data": f'{dict(user_data)}'}

    @staticmethod
    def find_data_all(collections):
        """Return a cursor over all documents (``_id`` excluded); publish a "fetch" MQTT message."""
        try:
            cursor = collections.find({}, {"_id": 0})
            # notify receivers that a full fetch happened
            list_ = create_message_two("fetch", "", "")
            extract_send_data(list_)
            return cursor
        except Exception as e:
            logger.error("Some exception occurred while fetching: %s", e)

    @staticmethod
    def find_data_one(collections, condition):
        """Return a cursor over documents matching ``condition`` (None fields stripped)."""
        try:
            # strip None-valued fields so they don't constrain the query
            new_conditions = remove_none(condition)
            cursor = collections.find(new_conditions, {"_id": 0})
            list_ = create_message_three("fetch_one", new_conditions)
            extract_send_data(list_)
            return cursor
        except Exception as e:
            logger.error("Some exception occurred while fetching one : %s", e)

    @staticmethod
    def update_data(collections, condition, update):
        """Update the first document matching ``condition``; return the MQTT payload."""
        try:
            # normalise both the filter and the update document
            new_conditions, update_ = remove_none_updates(condition, update)
            list_ = create_message_two("update", new_conditions, update_)
            collections.update_one(new_conditions, {"$set": update_})
            return list_
        except Exception as e:
            logger.error("Some exception occurred while updating: %s", e)

    @staticmethod
    def delete_data(collections, condition):
        """Delete the first document matching ``condition``; return the MQTT payload."""
        try:
            new_conditions = remove_none(condition)
            list_ = create_message_three("delete", new_conditions)
            collections.delete_one(new_conditions)
            return list_
        except Exception as e:
            logger.error("Some exception occurred while deleting: %s", e)
import pandas as pd
from sqlalchemy import text
from scripts.core.handlers.receiver_mqtt.updating_conditions import conditions_string
from scripts.databases.models.sqlalchemy_models.postgres_model import Category
from scripts.databases.postgres.postgres_connect import commit_to_postgres, session
from scripts.logging.loggers import logger
class PostgresOperations:
    """CRUD helpers for the ``Category`` table plus an ALTER TABLE query builder.

    All methods log-and-swallow exceptions and print the result envelope on
    success (mirroring the Mongo-side response format).
    """

    @staticmethod
    def insert_db(data):
        """Insert each dict in ``data`` as a Category row.

        Each element is expected to carry first_name/last_name/email/address keys.
        """
        try:
            for users in data:
                insert_data = Category(first_name=users["first_name"],
                                       last_name=users["last_name"],
                                       email=users["email"],
                                       address=users["address"])
                commit_to_postgres(insert_data)
        except Exception as e:
            # %s lazy formatting so the exception text is actually interpolated into the log
            logger.error("Exception occurred while inserting data: %s", e)
        else:
            print({"message": "Insert Data",
                   "Status": "Data Inserted",
                   "Data": data})

    @staticmethod
    def fetch_data():
        """Fetch every row and print it as a DataFrame table."""
        try:
            data_obj = session.query(Category.first_name, Category.last_name, Category.email, Category.address)
            # NOTE(review): session.query() never returns None, so this guard is
            # effectively dead; kept to preserve existing behavior.
            if data_obj is None:
                print("Empty table")
            data_list = [row for row in data_obj]
            data_frame = pd.DataFrame(data_list, index=None)
        except Exception as e:
            logger.error("Exception occurred while fetching data: %s", e)
        else:
            print({"message": "Fetch Data",
                   "Status": "Data Fetched",
                   "Data": f'{data_frame.to_string(index=False)}'})

    @staticmethod
    def fetch_one(conditions):
        """Fetch the rows matching ``conditions`` and print them as a DataFrame table."""
        try:
            # WARNING: conditions_string builds raw SQL interpolated via text();
            # do not feed it untrusted input (SQL injection risk).
            conditions_ = conditions_string(conditions)
            data_obj = session.query(Category.first_name, Category.last_name, Category.email, Category.address).filter(
                text(conditions_)
            )
            # NOTE(review): query objects are never None; guard kept for parity with fetch_data
            if data_obj is None:
                return
            data_list = [row for row in data_obj]
            data_frame = pd.DataFrame(data_list, index=None)
        except Exception as e:
            logger.error("Exception occurred while fetching data one: %s", e)
        else:
            print({"message": f"Fetch Data{conditions_}",
                   "Status": "Data Fetched",
                   "Data": f'{data_frame.to_string(index=False)}'})

    @staticmethod
    def update_db(conditions, updates):
        """Update the rows matching ``conditions`` with the ``updates`` mapping and commit."""
        try:
            # WARNING: raw SQL filter built from a dict — see fetch_one note
            conditions_ = conditions_string(conditions)
            session.query(Category).filter(
                text(conditions_)
            ).update(
                updates
            )
            session.commit()
        except Exception as e:
            logger.error("Exception occurred while updating data: %s", e)
        else:
            print({"message": "Update Data",
                   "Status": "Data Updated",
                   "Data": f'{conditions_} {updates}'})

    @staticmethod
    def delete_db(conditions):
        """Delete the rows matching ``conditions`` and commit."""
        try:
            # changing the conditions dict into a raw SQL filter string
            conditions_ = conditions_string(conditions)
            session.query(Category).filter(
                text(conditions_)
            ).delete()
            session.commit()
        except Exception as e:
            logger.error("Exception occurred while deleting data: %s", e)
        else:
            print({"message": "Delete Data",
                   "Status": "Data Deleted",
                   "Data": f'{conditions_}'})

    @staticmethod
    def alter(db_table, column_values):
        """Build an ``ALTER TABLE ... ADD COLUMN`` query string.

        ``column_values`` must carry 'column_name', 'data_type' and a 'length'
        convertible to int. WARNING: values are interpolated directly into the
        SQL text — never pass untrusted input.
        """
        try:
            query = f"ALTER TABLE {db_table} ADD COLUMN {column_values['column_name']} {column_values['data_type']} " \
                    f"({int(column_values['length'])})"
            return query
        except Exception as e:
            # fixed: message previously said "deleting data" (copy-paste from delete_db)
            logger.error("Exception occurred while altering table: %s", e)
Generic single-database configuration.
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment