Commit df131074 authored by arun.uday's avatar arun.uday

migrate to gitlab-pm

parents
# Default ignored files
/shelf/
/workspace.xml
task4.1_mongo_post
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (task4.1mongo_post)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/task4.1_mongo_post.iml" filepath="$PROJECT_DIR$/.idea/task4.1_mongo_post.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (task4.1mongo_post)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# main application for database task
from scripts.services.access_functions import mongo_db_access_op, postgres_db_access_op
# mongodb
db_choice = input("Enter mongo or postgres")
if db_choice == 'mongo':
print("Mongo DB")
mongo_db_access_op()
elif db_choice == 'postgres':
print("PostgresSQL")
result = postgres_db_access_op()
if not result:
print("Lost in atoms")
else:
print("Successful")
else:
print("Lost in atoms")
# database servers
[connection]
mongodb = mongodb://localhost:27017
[postgres]
hostname = localhost
port = 5432
dbname = storemanager
user = postgres
password = admin
table_name = products
# accessing conf variables
import configparser
# creating configparser
configparse = configparser.ConfigParser()
configparse.read("conf/db.conf")
# db connection
client_connect = configparse.get("connection", "mongodb")
# postgres connect
import configparser
conf_postgres = configparser.ConfigParser()
conf_postgres.read("conf/db.conf")
# read values
host_name = conf_postgres.get("postgres", "hostname")
port = conf_postgres.get("postgres", "port")
dbname = conf_postgres.get("postgres", "dbname")
user = conf_postgres.get("postgres", "user")
password = conf_postgres.get("postgres", "password")
products = conf_postgres.get("postgres", "table_name")
class ConstantsValues:
# Exception constant
exception_msg = "Exception occurred"
# column names
columns_names = ['product_id', 'product_name', 'product_category', 'retail_price',
'discount_price', 'description', 'overall_rating']
# connecting to mongodb server
from pymongo import MongoClient
from scripts.config.mongo_connect_config import client_connect
# connect to server
client = MongoClient(client_connect)
class MongoDBQuery:
@staticmethod
def insert(db, p_id, p_name, p_category, retail_price, discount_price, description, over_rating):
cur = db.insert_one({"product_id": p_id, "product_name": p_name, "product_category": p_category,
"retail_price": retail_price, "discount_price": discount_price,
"description": description, "overall_rating": over_rating})
return cur
@staticmethod
def update(db, condition_values, update_data):
update_d = db.update_many(condition_values, update_data)
return update_d
@staticmethod
def view(db, field_filter):
view_data = db.find(field_filter, {'_id': 0})
return view_data
@staticmethod
def delete(db, condition_values):
delete_data = db.delete_one(condition_values)
return delete_data
# postgres db connection
import psycopg2 as py
from scripts.config.postgres_connect_config import host_name, port, dbname, user, password
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
try:
postgres_client = py.connect(host=host_name, port=port, user=user, password=password)
postgres_client.set_session(autocommit=True)
# creating cursor for the postgres
cur = postgres_client.cursor()
cur.execute(QueriesPost().databases())
# Fetch the result
result = cur.fetchall()
# check if database exist
if (dbname,) in result:
postgres_client = py.connect(host=host_name, port=port, dbname=dbname, user=user, password=password)
postgres_client.set_session(autocommit=True)
cur = postgres_client.cursor()
else:
# end current
postgres_client.commit()
cur.execute(f'CREATE DATABASE {dbname}')
except Exception as e:
print(f'{ConstantsValues().exception_msg},{e}')
from scripts.config.postgres_connect_config import products
class QueriesPost:
@staticmethod
def databases():
db = "SELECT datname FROM pg_database WHERE datistemplate = false;"
return db
@staticmethod
def create_table():
table = f"CREATE TABLE if not exists {products} " \
f"(product_id SERIAL PRIMARY KEY, product_name VARCHAR(255)," \
f"product_category VARCHAR(255)," \
f"retail_price INT," \
f"discounted_price INT," \
f"description VARCHAR(255)," \
f"overall_rating INT)"
return table
@staticmethod
def insert_data():
insert_data = f"INSERT INTO {products} (product_name, product_category, retail_price, discounted_price," \
f"description, overall_rating) VALUES (%s, %s, %s, %s, %s, %s)"
return insert_data
@staticmethod
def update_data(updates, conditions):
view_data = f"UPDATE {products} SET {updates} WHERE {conditions}"
return view_data
@staticmethod
def fetch_conditions(conditions):
fetch_data = f"SELECT * FROM {products} WHERE {conditions}"
return fetch_data
@staticmethod
def fetch_all():
fetch_data = f"SELECT * FROM {products} ORDER BY product_id ASC"
return fetch_data
@staticmethod
def delete_data(del_data):
del_val = f"DELETE FROM {products} WHERE {del_data}"
return del_val
# reading user inputs
from scripts.core.database.mongo.queries.mongodb_queires import MongoDBQuery
def read_inputs():
p_id = int(input("Enter the product id: "))
p_name = input("Enter the product name: ")
p_category = input("Enter the category: ")
retail_price = int(input("Enter the retail price: "))
discount_price = int(input("Enter the discounted price: "))
description = input("Enter the product description: ")
over_rating = int(input("Enter the overall rating"))
return p_id, p_name, p_category, retail_price, discount_price, description, over_rating
# inserting data to mongo
def insert_data_db(db, p_id, p_name, p_category, retail_price, discount_price, description, over_rating):
try:
insert_res = MongoDBQuery().insert(db, p_id, p_name, p_category, retail_price, discount_price, description,
over_rating)
return insert_res
except Exception as e:
print("Exception occurred :", e)
# deleting data
# getting user inputs
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.mongo.queries.mongodb_queires import MongoDBQuery
def delete_db_input():
columns_ = ConstantsValues().columns_names
integer_val = [0, 3, 4, 6]
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '' and (counter in integer_val):
condition_values.update({f'{columns_[counter]}': int(field_value)})
if field_value != '' and (counter not in integer_val):
condition_values.update({f'{columns_[counter]}': f'{field_value}'})
counter += 1
return condition_values
# deleting the record
def db_delete(db, condition_values):
cur = MongoDBQuery().delete(db, condition_values)
return cur
from scripts.core.database.postgres.postgres_connect import cur
from scripts.core.database.postgres.queries.post_queries import QueriesPost
def post_columns():
# Execute a SELECT statement to get the column names of a table
cur.execute(QueriesPost().fetch_all())
# Get the column names
field_names = [desc[0] for desc in cur.description]
return field_names
# class for mongodb operations
from scripts.config.postgres_connect_config import dbname, products
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.mongo.mongodb_connect import client
from scripts.core.handlers.data_insert import insert_data_db, read_inputs
from scripts.core.handlers.delete_db_data import delete_db_input, db_delete
from scripts.core.handlers.update_mongo_db import db_update, input_update_db
from scripts.core.handlers.view_data_db import view_fields, view_cond
class MongoDbStart:
# create mongo table
@staticmethod
def mongo_create():
try:
client_con = client
except Exception as e:
print("Connection failed: ", e)
else:
db = client_con[dbname]
store_manager = db[products]
return store_manager
# mongo data insert
@staticmethod
# getting input and insert into db
def mongo_insert(db):
try:
num_prod = int(input("Enter the number of products: "))
while num_prod > 0:
p_id, p_name, p_category, retail_price, discount_price, description, over_rating = read_inputs()
insert_check = insert_data_db(db, p_id, p_name,
p_category, retail_price, discount_price, description, over_rating)
if insert_check.acknowledged:
print("Data inserted....")
num_prod -= 1
else:
print("Something went wrong data cannot be inserted....")
except Exception as e:
print("Insertion failed", e)
# mongo update table
@staticmethod
def mongo_update(db):
try:
condition_values, update_values = input_update_db()
update_check = db_update(db, condition_values, update_values)
if update_check.matched_count > 0:
print("Data updated")
else:
print("Data not updated")
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
# mongo delete data
@staticmethod
def mongo_delete(db):
try:
condition_values = delete_db_input()
del_check = db_delete(db, condition_values)
if del_check.deleted_count > 0:
print("Data deleted")
else:
print("Data not deleted")
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
# mongo view data
@staticmethod
def mongo_view(db):
try:
field_filter = view_cond()
view_fields(db, field_filter)
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
# deleting data
# getting user inputs
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
# user input
def delete_db_post():
columns_ = ConstantsValues().columns_names
integer_val = [0, 3, 4, 6]
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '' and (counter in integer_val):
condition_values.update({f'{columns_[counter]}': int(field_value)})
if field_value != '' and (counter not in integer_val):
condition_values.update({f'{columns_[counter]}': f'{field_value}'})
counter += 1
return condition_values
# deleting the record
def db_delete_post(cur, condition_values):
conditions = [f"{field} = '{value}'" for field, value in condition_values.items()]
conditions = "and ".join(conditions)
if conditions:
cur = cur.execute(QueriesPost().delete_data(condition_values))
else:
cur = None
return cur
# reading user inputs
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
# user inputs
def read_inputs_post():
p_name = input("Enter the product name: ")
p_category = input("Enter the category: ")
retail_price = int(input("Enter the retail price: "))
discount_price = int(input("Enter the discounted price: "))
description = input("Enter the product description: ")
over_rating = input("Enter the overall rating")
return p_name, p_category, retail_price, discount_price, description, over_rating
# insert in to table
def insert_data_post(cur, p_name, p_category, retail_price, discount_price, description, over_rating):
try:
val_data = [p_name, p_category, retail_price, discount_price, description, over_rating]
cur.execute(QueriesPost().insert_data(), val_data)
except Exception as e:
print(f'{ConstantsValues().exception_msg},{e}')
return False
else:
return True
# updating the db
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
# user data
def input_update_post():
columns_ = ConstantsValues().columns_names
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '':
condition_values.update({columns_[counter]: field_value})
counter += 1
counter = 1
update_values = {}
print("New values")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '':
update_values.update({columns_[counter]: field_value})
counter += 1
return condition_values, update_values
# updating the database
def db_update_post(cur, condition_values, update_values):
conditions = [f"{field} = '{value}'" for field, value in condition_values.items()]
conditions = "and ".join(conditions)
updates = [f"{field} = '{value}'" for field, value in update_values.items()]
updates = ", ".join(updates)
if conditions and updates:
cur = cur.execute(QueriesPost().update_data(updates, conditions))
else:
cur = None
return cur
# view db data
# display conditions
import pandas as pd
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
from scripts.core.handlers.get_post_columns import post_columns
# user input for fetch
def input_view_post():
columns_ = post_columns()
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '':
condition_values.update({columns_[counter]: field_value})
counter += 1
return condition_values
# view the data
def view_fields_post(cur, condition_values):
conditions = [f"{field} = '{value}'" for field, value in condition_values.items()]
conditions = " and ".join(conditions)
if conditions:
print(conditions)
cur.execute(QueriesPost().fetch_conditions(conditions))
cursor_data = cur.fetchall()
if cursor_data:
print("Data collected....")
view_data = pd.DataFrame((view_data_record for view_data_record in cursor_data),
columns=ConstantsValues().columns_names)
print(view_data)
else:
print("Data fetching failed")
else:
cur.execute(QueriesPost().fetch_all())
cursor_data = cur.fetchall()
if cursor_data:
print("Data collected....")
view_data = pd.DataFrame((view_data_record for view_data_record in cursor_data),
columns=ConstantsValues().columns_names)
print(view_data)
else:
print("Data fetching failed")
# class for postgresSql
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.postgres_connect import cur, postgres_client
from scripts.core.handlers.postgres_data_del import delete_db_post, db_delete_post
from scripts.core.handlers.postgres_data_insert import read_inputs_post, insert_data_post
from scripts.core.handlers.postgres_data_update import input_update_post, db_update_post
from scripts.core.handlers.postgres_data_view import view_fields_post, input_view_post
from scripts.core.handlers.postgres_table_create import create_table
class PostgresStart:
@staticmethod
def post_create():
try:
client_cursor = cur
check_result = create_table(client_cursor)
if not check_result:
return False
else:
return check_result
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
@staticmethod
def post_insert():
try:
num_prod = int(input("Enter the number of products: "))
while num_prod > 0:
p_name, p_category, retail_price, discount_price, description, over_rating = read_inputs_post()
insert_check = insert_data_post(cur, p_name,
p_category, retail_price, discount_price, description, over_rating)
if insert_check:
print("Data inserted....")
num_prod -= 1
else:
print("Something went wrong data cannot be inserted....")
except Exception as e:
print("Insertion failed", e)
@staticmethod
def post_update():
try:
condition_values, update_values = input_update_post()
update_check = db_update_post(cur, condition_values, update_values)
if update_check is None:
print("Data updated")
else:
print("Data not updated")
except Exception as e:
print(f'{ConstantsValues().exception_msg},{e}')
@staticmethod
def post_delete():
try:
condition_values = delete_db_post()
del_check = db_delete_post(cur, condition_values)
if del_check is None:
print("Data deleted")
else:
print("Data not deleted")
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
@staticmethod
def post_view():
try:
condition_values = input_view_post()
view_fields_post(cur, condition_values)
except Exception as e:
print(f'{ConstantsValues().exception_msg}{e}')
@staticmethod
def post_quit():
# Close the cursor and connection
cur.close()
postgres_client.close()
# creating a table
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
# create table if exist or not
def create_table(cur):
try:
cursor_db = cur
cursor_db.execute(QueriesPost().create_table())
except Exception as e:
print(f'{ConstantsValues().exception_msg},{e}')
return False
else:
return cursor_db
# updating the db
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.mongo.queries.mongodb_queires import MongoDBQuery
# user data
def input_update_db():
columns_ = ConstantsValues().columns_names
integer_val = [0, 3, 4, 6]
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '' and (counter in integer_val):
condition_values.update({f'{columns_[counter]}': int(field_value)})
if field_value != '' and (counter not in integer_val):
condition_values.update({f'{columns_[counter]}': f'{field_value}'})
counter += 1
counter = 1
update_values = {}
print("New values")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '' and (counter in integer_val):
update_values.update({f'{columns_[counter]}': int(field_value)})
if field_value != '' and (counter not in integer_val):
update_values.update({f'{columns_[counter]}': f'{field_value}'})
counter += 1
return condition_values, update_values
# updating the database
def db_update(db, condition_values, update_values):
update_data = {"$set": update_values}
cur = MongoDBQuery().update(db, condition_values, update_data)
return cur
# view db data
# display conditions
import pandas as pd
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.mongo.queries.mongodb_queires import MongoDBQuery
# display conditions
def view_cond():
columns_ = ConstantsValues().columns_names
integer_val = [0, 3, 4, 6]
counter = 0
condition_values = {}
print("Conditions")
while counter < len(columns_):
field_value = input(f"Enter the {columns_[counter]} data: ")
if field_value != '' and (counter in integer_val):
condition_values.update({f'{columns_[counter]}': int(field_value)})
if field_value != '' and (counter not in integer_val):
condition_values.update({f'{columns_[counter]}': f'{field_value}'})
counter += 1
return condition_values
# view the data
def view_fields(db, field_filter):
view_data_cursor = MongoDBQuery().view(db, field_filter)
view_cursor_count = db.count_documents(field_filter)
if view_cursor_count > 0:
print("Data collected....")
view_data = pd.json_normalize([view_data_record for view_data_record in view_data_cursor])
print(view_data)
else:
print("Data fetching failed")
# access class functions
from scripts.core.handlers.mongo_db_operations import MongoDbStart
from scripts.core.handlers.postgres_db_operations import PostgresStart
# mongo access
def mongo_db_access_op():
db_class = MongoDbStart()
db_created = db_class.mongo_create()
while True:
try:
user_query = input("Enter insert/ update/ delete/ view/ quit : ")
if user_query == 'insert':
db_class.mongo_insert(db_created)
elif user_query == 'update':
db_class.mongo_update(db_created)
elif user_query == 'delete':
db_class.mongo_delete(db_created)
elif user_query == 'view':
db_class.mongo_view(db_created)
elif user_query == 'view-all':
db_class.mongo_view_all(db_created)
else:
print("Exiting....")
break
except Exception as e:
print("Exception occurred: ", e)
else:
choice = input("Do you want to continue (yes/no): ")
if choice == 'yes':
continue
else:
print("Thank you....")
break
# postgres access
def postgres_db_access_op():
db_postgres_class = PostgresStart()
create_result = db_postgres_class.post_create()
if not create_result:
print("Cannot create table")
return False
while True:
try:
user_query = input("Enter insert/ update/ delete/ view/ quit : ")
if user_query == 'insert':
db_postgres_class.post_insert()
elif user_query == 'update':
db_postgres_class.post_update()
elif user_query == 'delete':
db_postgres_class.post_delete()
elif user_query == 'view':
db_postgres_class.post_view()
else:
db_postgres_class.post_quit()
print("Exiting....")
break
except Exception as e:
print("Exception occurred: ", e)
else:
choice = input("Do you want to continue (yes/no): ")
if choice == 'yes':
continue
else:
print("Thank you....")
return True
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment