Commit 9963760e authored by kavyashree.um's avatar kavyashree.um

Hi

parent 2390a9fb
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
#IDE
.idea/
idea/
\ No newline at end of file
epr-dev-deployment:
stage: deploy
script:
- DT=`date +%d-%m-%Y-%H%M`
- tar czvf $CI_PROJECT_NAME.tar.gz *
- echo "Deploying to the test server..."
# - sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo su" && $OFC_HOSTNAME
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "mkdir -p /tmp/$CI_PROJECT_NAME/tar/ /tmp/$CI_PROJECT_NAME/untar/"
- scp -r $CI_PROJECT_NAME.tar.gz $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME:/tmp/$CI_PROJECT_NAME/tar/
- sshpass -p $OFC_PASSWD scp -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME -r $CI_PROJECT_NAME.tar.gz $OFC_USERNAME@$OFC_HOSTNAME:/tmp/$CI_PROJECT_NAME/tar/
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "tar xzvf /tmp/$CI_PROJECT_NAME/tar/$CI_PROJECT_NAME.tar.gz -C /tmp/$CI_PROJECT_NAME/untar/"
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo cp -r /opt/services/epr/module6/Services-testing/ /opt/services/epr/module6/archive/epr-services-bkp-$DT"
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo rsync -r /tmp/$CI_PROJECT_NAME/untar/* /opt/services/epr/module6/Services-testing/"
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo /root/miniconda3/envs/epr-module6/bin/pip install -r /opt/services/epr/module6/Services/requirements.txt"
# - sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo chown -R svc-glens:svc-glens /home/svc-glens/.miniconda3/envs/supportlens/ /opt/services/glens/support_lens/"
- sshpass -p $OFC_PASSWD ssh -o "StrictHostKeyChecking no" $OFC_USERNAME@$OFC_HOSTNAME "sudo systemctl restart cpcb-epr-module-6-testing.service && sudo systemctl status cpcb-epr-module-6-testing.service"
after_script:
- ssh $OFC_HOSTNAME "rm -rf /tmp/$CI_PROJECT_NAME"
- rm -f $CI_PROJECT_NAME.tar.gz
only:
- dev
tags:
- shell
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.core.services import user_management_service, dashboard_service, leave_management_service, audit_service
from scripts.core.services.audit_service import audit_router
from scripts.logging.application_logging import logger
app = FastAPI(title="Epr 6")
app.include_router(user_management_service.router)
app.include_router(dashboard_service.router)
app.include_router(leave_management_service.router)
app.include_router(audit_router)
@app.get("/")
def read_root():
return "welcome to the application"
if __name__ == "__main__":
logger.info("EPR 6th module")
uvicorn.run(app, host="localhost", port=5050)
\ No newline at end of file
[LOG]
log_level=INFO
base_path=logs/
file_name=fast_api
handlers=console,file
logger_name=Fast API
[MYSQL]
host = 192.168.3.145
user=epr-testing
database= epr_6
password = epr@123
import configparser
config = configparser.ConfigParser()
config.read('conf/application.conf')
db_conf = configparser.ConfigParser()
db_conf.read('conf/db.conf')
api_base_service_url = "/fastapi/template"
LOG_LEVEL = config.get('LOG', 'log_level')
LOG_BASEPATH = config.get('LOG', 'base_path')
LOG_FILE_NAME = LOG_BASEPATH + config.get('LOG', 'file_name')
LOG_HANDLERS = config.get('LOG', 'handlers')
LOGGER_NAME = config.get('LOG', 'logger_name')
"""
mysql config info
"""
MYSQL_HOST = config.get("MYSQL", "host")
MYSQL_USER = config.get("MYSQL", "user")
MYSQL_DB_NAME = config.get("MYSQL", "database")
MYSQL_DB_PASSWORD = config.get("MYSQL", "password", fallback='')
class Base_url:
base_url = "/api/v1.0/epr_audit"
class Audit_user:
list_audit_user = '/list_audit_user'
login = '/login'
general_profile_info = '/general_profile_info'
change_password = '/change_password'
class Dashboard:
dashboard_count = '/list_dashboard_count'
fetch_status_details = '/list_audit_status'
schedule_audit = '/schedule_audit'
check_list_sidebar = '/check_list_sidebar'
audit_questions = '/audit_questions'
class Audit:
save_response = '/save_response'
submit_response = '/submit_response'
class Leave_management:
leave_dashboard = '/leave_dashboard'
audit_calender = '/audit_calender'
list_states = '/list_states'
list_holidays = '/list_holidays'
class Routes:
user = Base_url.base_url + '/user'
audit = Base_url.base_url + '/audit'
class Table_headers:
list_users_header = [
{
"key": 'first_name',
"value": 'First Name'
},
{
"key": 'last_name',
"value": 'Last Name'
},
{
"key": 'ph_number',
"value": 'Phone Number'
},
{
"key": 'email',
"value": 'Email Id'
},
{
"key": 'user_id',
"value": 'User Id'
}
]
status_details = [
{
"key": "audit_name",
"value": "Audit Name"
},
{
"key": "company",
"value": "Company"
},
{
"key": "address",
"value": "Address"
},
{
"key": "state",
"value": "State"
},
{
"key": "district",
"value": "District"
},
{
"key": "created_on",
"value": "Created on"
}
]
progress_header = [
{
"key": "progress",
"value": "Progress"
}
]
completed_header = [
{
"key": "status",
"value": "Status"
}
]
leave_management = [
{
"key": "from_date",
"value": "From"
},
{
"key": "to_date",
"value": "To"
},
{
"key": "Days",
"value": "no_of_days"
},
{
"key": "Applied on",
"value": "applied_date"
},
{
"key": "Reason",
"value": "reason"
},
{
"key": "Status",
"value": "status"
}
]
extra_headers = [{
"key": "scheduled_start_date",
"value": "Scheduled Start Date"
},
{
"key": "scheduled_end_date",
"value": "Scheduled End Date"
}]
holiday_table_header = [{
"key": "date",
"value": "Date"
},
{
"key": "state",
"value": "Location"
},
{
"key": "holiday_name",
"value": "Reason"
},
{
"key": "day",
"value": "Day"
}]
inbox_header = [
{
"key": "sender",
"value": "Sender"
},
{
"key": "Message",
"value": "message"
},
{
"key": "received_at",
"value": "Received Time"
}
]
class Table_constants:
table_data = [
{
"key": "open",
"label": "Open",
"listType": "Open Audits Requests",
"count": 0
},
{
"key": "scheduled",
"label": "Scheduled",
"listType": "Scheduled Audits",
"count": 0
},
{
"key": "in_progress",
"label": "In Progress",
"listType": "Audits In-Progress",
"count": 0
},
{
"key": "completed",
"label": "Completed",
"listType": "Completed Audits",
"count": 0
}
]
class Response_message:
data_fetch = "Data fetch Successful"
login_success = "Login Successful"
invalid_user = "Invalid Credentials"
scheduled_successful = "Scheduled Successfully"
schedule_exists = "Schedule Already Exists"
response_submitted = "Response Submitted Successfully"
class Api_status:
success = "Success"
status_error = "error"
warning = "warning"
class Boolean_keys:
keys = [
{
'key': "yes",
'label': "Yes"
},
{
'key': "no",
'label': "No"
}
]
class ResponseMessage:
@staticmethod
def final_json(status, message, data=None):
if data is not None:
json = {"status": status, "message": message, "data": data}
else:
json = {"status": status, "message": message}
return json
class Message:
success = "Success"
failure = "Failure"
\ No newline at end of file
import json
from scripts.config.app_constants import ResponseMessage, Message
from scripts.core.handlers.dashboard_handler import mysql_utility
from scripts.logging.application_logging import logger
class AuditHandler:
@staticmethod
def upload_response(final_json):
try:
audit_id = final_json.audit_id
section_id = final_json.section_id
data = final_json.data
for each in data:
question_id = each['question_id']
response = each['response']
status = each['status']
# notes = '{' + (', '.join('"' + item + '"' for item in each['notes'])) + '}'
if each['notes'] == []:
note = {}
else:
note = json.dumps(each['notes'])
update_query = f"""update
audit_question
set
answer = '{str(response).replace("'", "''")}',
note = '{note}',
status = '{status}'
where
audit_id = '{audit_id}'AND
section_id = '{section_id}'AND
question_id = '{question_id}'
"""
mysql_utility.update_mysql_table(query=update_query)
return ResponseMessage.final_json(Message.success, "Successfully Saved")
except Exception as e:
logger.error("Exception while Uploading question response :" + str(e))
return ResponseMessage.final_json(status=Message.failure, message="Error while Uploading question response")
@staticmethod
def submit_response(final_json):
try:
enable = final_json.enable
audit_id = final_json.audit_id
section_id = final_json.section_id
data = final_json.data
for each in data:
question_id = each['question_id']
response = each['response']
status = each['status']
# notes = '{' + (', '.join('"' + item + '"' for item in each['notes'])) + '}'
if each['notes'] == []:
note = {}
else:
note = json.dumps(each['notes'])
update_query = f"""update
audit_question
set
answer = '{str(response).replace("'", "''")}',
note = '{note}',
status = '{status}',
enable = '{enable}'
where
audit_id = '{audit_id}'AND
section_id = '{section_id}'AND
question_id = '{question_id}'
"""
print(update_query)
mysql_utility.update_mysql_table(query=update_query)
return ResponseMessage.final_json(Message.success, "Successfully Saved")
except Exception as e:
logger.error("Exception while Uploading question response :" + str(e))
return ResponseMessage.final_json(status=Message.failure, message="Error while Uploading question response")
This diff is collapsed.
from scripts.logging.application_logging import logger
from scripts.utilities.mysql_utility import Utility
from scripts.config.app_constants import Table_headers, Response_message, Api_status
mysql_utility = Utility()
from datetime import datetime
class Leave_management:
@staticmethod
def leave_management_dashboard(user_id):
"""
:param user_id:
:type user_id:
:return:
:rtype:
"""
try:
response = dict()
logger.info('Inside leave_management_dashboard handler')
fetch_query = f"""select from_date, to_date, no_of_days, reason, applied_date, status from
leave_management where user_id = '{user_id}' """
actual_data = mysql_utility.select_mysql_table(query=fetch_query)[1]
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data'] = dict()
response['data']['header'] = Table_headers.leave_management
response['data']['data'] = actual_data
return response
except Exception as e:
logger.error("Unable to list leave details" + str(e))
@staticmethod
def audit_calender(user_id, start_date, end_date):
"""
Function to show audit calender
:param user_id:
:type user_id:
:return:
:rtype:
"""
try:
response = dict()
new_start_date = datetime.strptime(str(start_date), "%d/%m/%Y").strftime("%Y-%m-%d")
new_end_date = datetime.strptime(str(end_date), "%d/%m/%Y").strftime("%Y-%m-%d")
logger.info("Inside audit_calender dashboard")
schedule_query = f"""select scheduled_start_date as start, scheduled_end_date as end, audit_name as title,
company as companyName FROM audit where acknowledged_by = '{user_id}' and status = 'scheduled'
and scheduled_start_date >= '{new_start_date}' and scheduled_end_date <= '{new_end_date}'"""
scheduled_data = mysql_utility.select_mysql_table(query=schedule_query)[1]
leave_query = f"""select from_date as start, to_date as end from leave_management
where user_id = '{user_id}'and from_date >= '{new_start_date}' and to_date <= '{new_end_date}' """
leave_data = mysql_utility.select_mysql_table(query=leave_query)[1]
scheduled = [dict(item, **{'color': '#B2EFD3', 'textColor': '#0D5130'}) for item in scheduled_data]
leave = [dict(item, **{"color": "#B6CFF5", "title": "Leave", 'textColor': '#0F3573'}) for item in
leave_data]
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data'] = dict()
response['data']['scheduled'] = scheduled
response['data']['leave'] = leave
return response
except Exception as e:
logger.error("Unable to fetch calender details" + str(e))
@staticmethod
def list_states(user_id):
"""
Function to list states
:return:
:rtype:
"""
try:
response = dict()
response['defaultState'] = ""
logger.info("Inside list_states handler ")
select_query = f"""select state_id as 'value', state_name as 'label' from states"""
if user_id:
default_state_query = f"""select location from audit_users where user_id = '{user_id}'"""
default_state_data = mysql_utility.select_mysql_fetchone(query=default_state_query)[1]
default_query = f"""select state_id from states where state_name = '{default_state_data.get('location', '')}'"""
default_data = mysql_utility.select_mysql_fetchone(query=default_query)[1]
if default_data is not None:
response['defaultState'] = default_data.get('state_id', '')
states_data = mysql_utility.select_mysql_table(query=select_query)[1]
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data'] = states_data
return response
except Exception as e:
logger.error("Unable to list states" + str(e))
@staticmethod
def list_holidays(req_json):
"""
Function to list holidays as per state
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
response = dict()
if req_json.state_id is None:
default_state_query = f"""select location from audit_users where user_id = '{req_json.user_id}'"""
default_state_data = mysql_utility.select_mysql_fetchone(query=default_state_query)[1]
default_state = default_state_data.get('location', '')
state_id_query = f"""select state_id from states where state_name = '{default_state}'"""
state_data = mysql_utility.select_mysql_fetchone(query=state_id_query)[1]
if state_data:
select_query = f"""select * from holiday where state_id = '{state_data.get('state_id', '')}'"""
else:
select_query = f"""select * from holiday where state_id = '{req_json.state_id}'"""
state_name_query = f"""select state_name from states where state_id = '{req_json.state_id}'"""
state_data = mysql_utility.select_mysql_fetchone(query=state_name_query)[1]
default_state = state_data.get('state_name', '')
actual_data = mysql_utility.select_mysql_table(query=select_query)[1]
for each_data in actual_data:
date = each_data.get('date', '')
x = datetime.strptime(str(date), '%Y-%m-%d')
new_date = x.strftime('%d-%b-%Y')
each_data.update({'date': new_date, 'state': default_state})
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data'] = dict()
response['data']['header'] = Table_headers.holiday_table_header
response['data']['data'] = actual_data
return response
except Exception as e:
logger.error("Unable to list holidays" + str(e))
import hashlib, uuid
from datetime import datetime, timedelta
import bcrypt
import re
from scripts.logging.application_logging import logger
from scripts.utilities.mysql_utility import Utility
from scripts.config.app_constants import Table_headers, Response_message, Api_status
mysql_utility = Utility()
class User_management:
@staticmethod
def list_audit_users():
"""
Function to fetch all the audit users
:return:
:rtype:
"""
try:
response = dict()
response['data'] = dict()
logger.info("Inside list_audit_user handler function")
fetch_query = f"""select first_name, last_name, user_id, ph_number, email from audit_users"""
query_data = mysql_utility.select_mysql_table(query=fetch_query)
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data']['header'] = Table_headers.list_users_header
response['data']['data'] = query_data[1]
return response
except Exception as e:
logger.error("Unable to list user" + str(e))
@staticmethod
def login_page(user_id, password):
"""
:param user_id:
:type user_id:
:param password:
:type password:
:return:
:rtype:
"""
try:
response = dict()
logger.info("Inside login_page function handler")
fetch_query = f"""select first_name, last_name, password from audit_users where user_id = '{user_id}'"""
actual_data = mysql_utility.select_mysql_table(query=fetch_query)[1]
if bcrypt.checkpw(password.encode('ascii'), actual_data[0]['password'].encode('ascii')):
response['status'] = Api_status.success
response['message'] = Response_message.login_success
response['user_id'] = user_id
response['user_name'] = actual_data[0].get('first_name', '') + ' ' + actual_data[0].get('last_name', '')
else:
response['status'] = Api_status.status_error
response['message'] = Response_message.invalid_user
return response
except Exception as e:
logger.info("Unable to login" + str(e))
@staticmethod
def general_profile_info(input_data):
"""
Function to fetch details of particular user
:return:
:rtype:
"""
try:
response = dict()
response['data'] = dict()
logger.info("Inside profile_details handler function")
fetch_query = f"""select first_name,last_name,email,ph_number from audit_users where user_id = '{input_data.user_id}'"""
query_data = mysql_utility.select_mysql_table(query=fetch_query)
response['status'] = Api_status.success
response['message'] = Response_message.data_fetch
response['data']['data'] = query_data[1]
return response
except Exception as e:
logger.error("Unable to list user" + str(e))
@staticmethod
def change_password(req_json):
try:
data = []
response = dict()
response['data'] = dict()
if req_json.old_password is not None and len(req_json.old_password) > 0:
select_query = f"""select first_name,password from audit_users where user_id = '{req_json.user_id}' """
user = mysql_utility.select_mysql_table(query=select_query)
print(user)
if user is not None:
print(req_json.old_password.encode('ascii'))
print(user[1][0]['password'].encode('ascii'))
if bcrypt.checkpw(req_json.old_password.encode('ascii'), user[1][0]['password'].encode('ascii')):
if req_json.old_password != req_json.new_password:
if validate_password(req_json.new_password):
hashed_password = bcrypt.hashpw(req_json.new_password.encode(), bcrypt.gensalt())
hashed_password = hashed_password.decode('ascii')
update_query = f"""update audit_users
set password = '{hashed_password}'
where user_id = '{req_json.user_id}'"""
status,result = mysql_utility.update_mysql_table(query=update_query)
if status:
response['status'] = Api_status.success
response['message'] = "The password is reset for " + str({req_json.user_id})
response['data'] = data
logger.info("Successfully updated the password")
return response
else:
logger.info("Failed to update the password in database")
else:
response['status'] = Api_status.status_error
response[
'message'] = "New password should contain atleast 1 uppercase,1 lower case," \
"1 special character, 1 number and length should be bettween 8 and 16 "
return response
elif req_json.old_password == req_json.new_password:
response['status'] = Api_status.status_error
response[
'message'] = "Both new password and old password cannot be the same"
return response
else:
response['status'] = Api_status.status_error
response[
'message'] = "Incorrect old password. Please enter the correct old password"
return response
except Exception as e:
logger.error('exception at the reset password handler', e)
raise Exception('Failed to reset password in handler', e)
def validate_password(password):
expression = '^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$%^&*-]).{8,}$'
match_this = re.compile(expression)
isvalid = re.search(match_this, password)
if isvalid:
return True
else:
return False
from typing import List
from pydantic.main import BaseModel
class SaveResponse(BaseModel):
audit_id: str
section_id: str
data: List[dict]
class SubmitResponse(BaseModel):
audit_id: str
section_id: str
enable: str
data :List[dict]
from pydantic import BaseModel
class Schedule_audit(BaseModel):
audit_id: str
user_id: str
start_date: str
end_date: str
class Dashboard_status_count(BaseModel):
user_id: str
class Dashboard_status_details(BaseModel):
user_id: str
status: str
class Check_list_sidebar(BaseModel):
audit_id: str
class Questions(BaseModel):
section_id: str
audit_id: str
class AddResponse(BaseModel):
audit_id: str
data: list
from pydantic import BaseModel
from typing import Optional
class Leave_dashboard(BaseModel):
user_id: str
class Audit_calender(BaseModel):
user_id: str
start: str
end: str
class Holiday_list(BaseModel):
state_id: Optional[str] = None
user_id: str
class State_list(BaseModel):
user_id: Optional[str] = None
from pydantic import BaseModel
class Login(BaseModel):
user_id: str
password: str
class GeneralProfileDetails(BaseModel):
user_id: str
class ChangePassword(BaseModel):
old_password: str
new_password: str
user_id: str
from typing import List
from fastapi import APIRouter, Form, UploadFile, File
from scripts.config.app_constants import Routes, Audit
from scripts.core.handlers.audit_handler import AuditHandler
from scripts.core.models.audit_model import SaveResponse, SubmitResponse
audit_router = APIRouter(prefix=Routes.audit)
@audit_router.post(Audit.save_response, tags=[Routes.audit])
def save_response(input_json:SaveResponse):
try:
final_json = AuditHandler().upload_response(input_json)
return final_json
except Exception as e:
return False
@audit_router.post(Audit.submit_response, tags=[Routes.audit])
def save_response(input_json:SubmitResponse):
try:
final_json = AuditHandler().submit_response(input_json)
return final_json
except Exception as e:
return False
from fastapi import APIRouter
from scripts.logging.application_logging import logger
from scripts.core.handlers.dashboard_handler import Dashboard as dashboard_obj
from scripts.config.app_constants import Dashboard, Routes
from scripts.core.models.dashboard_model import Schedule_audit, Dashboard_status_count, Dashboard_status_details, \
Check_list_sidebar, Questions
router = APIRouter(prefix=Routes.user)
@router.post(Dashboard.dashboard_count, tags=[Routes.user])
def list_dashboard_status_count(req_json: Dashboard_status_count):
"""
Function to list the count of dashboard
:return:
:rtype:
"""
try:
logger.info("Inside the list_dashboard_status_count service")
response = dashboard_obj.list_dashboard_count(req_json.user_id)
return response
except Exception as e:
logger.error("Unable to fetch the dashboard count" + str(e))
@router.post(Dashboard.fetch_status_details, tags=[Routes.user])
def fetch_dashboard_status_details(req_json: Dashboard_status_details):
"""
Function to fetch individual status details
:return:
:rtype:
"""
try:
logger.info("Inside the fetch_dashboard_status_details service")
response = dashboard_obj.fetch_status_details(req_json.user_id, req_json.status)
return response
except Exception as e:
logger.error("Unable to fetch the dashboard status details" + str(e))
@router.post(Dashboard.schedule_audit, tags=[Routes.user])
def schedule_audit(req_json: Schedule_audit):
"""
Function to Schedule Audit
:return:
:rtype:
"""
try:
logger.info("Inside the schedule_audit service")
response = dashboard_obj.schedule_audit(req_json)
return response
except Exception as e:
logger.error("Unable to schedule the audit" + str(e))
@router.post(Dashboard.check_list_sidebar, tags=[Routes.user])
def sidebar_check(req_json: Check_list_sidebar):
"""
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
logger.info("Inside the Start_audit Service")
response = dashboard_obj.sidebar_check(req_json.audit_id)
return response
except Exception as e:
logger.error("Unable to start the audit" + str(e))
@router.post(Dashboard.audit_questions, tags=[Routes.user])
def load_questions(req_json: Questions):
"""
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
logger.info("Inside the load_question Service")
response = dashboard_obj.load_questions(req_json.section_id, req_json.audit_id)
return response
except Exception as e:
logger.error("Unable to load the questions" + str(e))
from fastapi import APIRouter
from scripts.logging.application_logging import logger
from scripts.core.handlers.leave_management_handler import Leave_management as leave_management_obj
from scripts.config.app_constants import Leave_management, Routes
from scripts.core.models.leave_management_model import Leave_dashboard, Audit_calender, Holiday_list, State_list
router = APIRouter(prefix=Routes.user)
@router.post(Leave_management().leave_dashboard, tags=[Routes.user])
def leave_management_dashboard(req_json: Leave_dashboard):
"""
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
logger.info("Inside leave_management_dashboard service")
response = leave_management_obj.leave_management_dashboard(req_json.user_id)
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
@router.post(Leave_management().audit_calender, tags=[Routes.user])
def audit_calender(req_json: Audit_calender):
"""
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
logger.info("Inside audit_calender service")
response = leave_management_obj.audit_calender(req_json.user_id, req_json.start, req_json.end)
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
@router.post(Leave_management().list_states, tags=[Routes.user])
def list_states(req_json: State_list):
"""
Function to list states
:return:
:rtype:
"""
try:
logger.info("Inside list_states service")
response = leave_management_obj.list_states(req_json.user_id)
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
@router.post(Leave_management().list_holidays, tags=[Routes.user])
def list_holidays(req_json: Holiday_list):
"""
Function to list holidays
:return:
:rtype:
"""
try:
logger.info("Inside list_holidays service")
response = leave_management_obj.list_holidays(req_json)
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
from fastapi import APIRouter
from scripts.logging.application_logging import logger
from scripts.core.handlers.user_management_handler import User_management as user_management_obj
from scripts.config.app_constants import Audit_user, Routes
from scripts.core.models.user_management_model import Login, GeneralProfileDetails, ChangePassword
router = APIRouter(prefix=Routes.user)
@router.post(Audit_user().list_audit_user, tags=[Routes.user])
def list_audit_user():
try:
logger.info("Inside list_audit_user service layer")
response = user_management_obj.list_audit_users()
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
@router.post(Audit_user().login, tags=[Routes.user])
def login(req_json: Login):
"""
:param req_json:
:type req_json:
:return:
:rtype:
"""
try:
logger.info("inside login service layer")
response = user_management_obj.login_page(req_json.user_id, req_json.password)
return response
except Exception as e:
logger.error("Unable to login" + str(e))
@router.post(Audit_user().general_profile_info, tags=[Routes.user])
def general_profile_details(input_data: GeneralProfileDetails):
try:
logger.info("Inside profile details service layer")
response = user_management_obj.general_profile_info(input_data)
return response
except Exception as e:
logger.error("Unable to list User" + str(e))
@router.post(Audit_user().change_password, tags=[Routes.user])
def change_password(input_data: ChangePassword):
try:
logger.info("Inside change password service layer")
response = user_management_obj.change_password(input_data)
return response
except Exception as e:
logger.error("Unable to change password " + str(e))
import logging.handlers
import os
import sys
import time
from logging import StreamHandler
from scripts.config import app_configurations
LOG_HANDLERS = app_configurations.LOG_HANDLERS
log_level = app_configurations.LOG_LEVEL
log_file = os.path.join(app_configurations.LOG_FILE_NAME + "_" + time.strftime("%Y%m%d") + '.log')
logger = logging.getLogger(app_configurations.LOGGER_NAME)
logger.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(module)s: %(funcName)s: '
'%(lineno)d - %(message)s')
if 'console' in LOG_HANDLERS:
# Adding the log Console handler to the logger
console_handler = StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if 'file' in LOG_HANDLERS:
# Adding the log file handler to the logger
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
import pymysql
from scripts.logging.application_logging import logger
from scripts.config.app_configurations import MYSQL_HOST, MYSQL_USER, MYSQL_DB_NAME, MYSQL_DB_PASSWORD
class Utility(object):
"""
Description: Constructor for the utility to initialize the variables used by the class
Input: host of the RDBMS database instance
user of the RDBMS database
password of the RDBMS database
database name RDBMS database
"""
def __init__(self):
self.host = MYSQL_HOST
self.user = MYSQL_USER
self.password = MYSQL_DB_PASSWORD
self.db_name = MYSQL_DB_NAME
def update_mysql_table(self, query, data=[]):
"""
This method is used for updating tables.
:param query: The update query to be executed
:return: status: The status True on success and False on failure
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.execute(query,data)
connection.commit()
return True, "Success"
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def insert_mysql_table(self, query, data=[]):
"""
This method is used for inserting new records in tables.
:param query: The insert query to be executed
:return: status: The status True on success and False on failure
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.execute(query, data)
connection.commit()
return True, "Success"
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def insert_mysql_table_with_return(self, query, data=[]):
"""
This method is used for inserting new records in tables.
:param query: The insert query to be executed
:return: status: The status True on success and False on failure
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
# cursor.execute(query, data)
test = cursor.execute(query, data)
queryId = connection.insert_id()
connection.commit()
queryId1 = connection.insert_id()
return True,queryId,"Success"
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def insert_many_mysql_table(self, query, data=[]):
"""
This method is used for inserting new records in tables.
:param query: The insert query to be executed
:return: status: The status True on success and False on failure
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.executemany(query, data)
connection.commit()
return True, "Success"
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def delete_mysql_table(self, query):
"""
This method is used for updating tables.
:param query: The update query to be executed
:return: status: The status True on success and False on failure
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.execute(query)
connection.commit()
return True, "Success"
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def select_mysql_table(self, query):
"""
This method is used for selecting records from tables.
:param query: The select query to be executed
:return: status: The status True on success and False on failure and the list of rows
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return True, result
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
def select_mysql_fetchone(self, query):
"""
This method is used for selecting records from tables.
:param query: The select query to be executed
:return: status: The status True on success and False on failure and the list of rows
"""
connection = None
try:
connection = pymysql.connect(host=self.host, user=self.user, password=self.password, db=self.db_name,
charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchone()
return True, result
except Exception as e:
logger.error("Exception while updating: ", str(e))
return False, str(e)
finally:
try:
if connection is not None:
connection.close()
except Exception as e:
logger.error("Exception while closing connection: ", str(e))
return False, str(e)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment