Commit 3c064dd1 authored by arun.uday's avatar arun.uday

AssetManagerV1.0-Not Reviewed

Log files Updated.
All Files return statements are updated for exceptions.
Comments are updated
parent a0a786d6
......@@ -19,6 +19,7 @@ EMAIL_PORT=465
EMAIL_PASSWORD=gpphuiweedqukchf
HTML_LINK=scripts/utils/link_email.html
RESET_ENDPOINT=http://localhost:8671/asset_manager_api/v1/login/reset?token
REDIRECT_URL_RESET=http://192.168.2.102/iLens_UI/#/l/login
CLIENT_ID=1060631831358-a21djaa3hm165a8976fnmo1lerujs5p6.apps.googleusercontent.com
......
......@@ -38,5 +38,5 @@ if __name__ == "__main__":
# starting the app
uvicorn.run("main:app", host=arguments["bind"], port=int(arguments["port"]))
except Exception as e:
logger.exception(f'Services Failed with error {e}')
logger.exception(f'Services Failed with error from app {e}')
2023-04-04 20:12:33 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-05 11:00:52 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:08:30 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-05 10:58:48 - ERROR - [MainThread:reset_user_password(): 258] - Services Failed with error from reset user password 'ResetPassword' object has no attribute 'password'
2023-04-04 20:07:39 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-05 10:54:36 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:55 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-05 10:14:04 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:23 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:12:33 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:10 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:08:30 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:07:39 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:55 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:23 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:10 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
......@@ -37,4 +37,4 @@ if __name__ == "__main__":
allow_headers=ServiceConf.CORS_ALLOW_HEADERS,
)
except Exception as e:
logger.error(e)
logger.error(f'Services Failed with error from main {e}')
import pathlib
from typing import Literal
from pydantic import BaseSettings, Field
......@@ -24,6 +21,7 @@ class _Services(BaseSettings):
EMAIL_PASSWORD: str
HTML_LINK: str
RESET_ENDPOINT: str
REDIRECT_URL_RESET: str
DATE_TIME = '%Y-%m-%d %H:%M:%S'
......
......@@ -42,4 +42,5 @@ class DashboardManagement:
data=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from download header {e}')
return None
......@@ -33,88 +33,103 @@ class LoginHandlers:
self.login_type = ""
def general_login(self, user_data, request):
self.login_type = "general_login"
# decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
# validating the received inputs empty or not
# password decrypted form - token "password"
try:
responses = self.obj_login_handler.user_data_validation(
user_data.email,
decrypted_password.split("\"")[1])
except AttributeError:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
# Account is not registered
if responses is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=responses).dict(),
status_code=status.HTTP_200_OK)
# checking for the account and password matching
user_data_response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password.split("\"")[1])
# if the passwords doesn't match with the db data
if user_data_response is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=data).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
responses, exp = self.obj_login_handler.generate_cookie_tokens(data, request)
# token generation unsuccessful
if responses is None:
self.login_type = "general_login"
# decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
# validating the received inputs empty or not
# password decrypted form - token "password"
try:
responses = self.obj_login_handler.user_data_validation(
user_data.email,
decrypted_password.split("\"")[1])
except AttributeError:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
# Account is not registered
if responses is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=responses).dict(),
status_code=status.HTTP_200_OK)
# checking for the account and password matching
user_data_response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password.split("\"")[1])
# if the passwords doesn't match with the db data
if user_data_response is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=data).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
responses, exp = self.obj_login_handler.generate_cookie_tokens(data, request)
# token generation unsuccessful
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
except Exception as e:
logger.error(f'Services Failed with error from general_login {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
message=ErrorMessages.ERROR_IN_LOGIN).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
def google_login(self, user_data, request):
user_data_remove_none = {key: value for key, value in user_data if key != 'login_type' and value is not None}
req = requests.Request()
try:
id_info = id_token.verify_oauth2_token(
user_data_remove_none["id_token"], req, Secrets.CLIENT_ID)
except InvalidValue:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_EXPIRED).dict(),
status_code=status.HTTP_200_OK)
response, message = self.obj_login_handler.db_data_validation(user_data.login_type, id_info["email"])
# if the response is false then an error message is send back
if response is not None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=message).dict(),
status_code=status.HTTP_200_OK)
message.update({"name": id_info["name"], "pic_url": id_info["picture"]})
responses = self.obj_login_handler.update_pic(obj_mongo_user, id_info)
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
responses, exp = self.obj_login_handler.generate_cookie_tokens(message, request)
# token generation unsuccessful
if responses is None:
user_data_remove_none = {key: value for key, value in user_data if
key != 'login_type' and value is not None}
req = requests.Request()
try:
id_info = id_token.verify_oauth2_token(
user_data_remove_none["id_token"], req, Secrets.CLIENT_ID)
except InvalidValue:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_EXPIRED).dict(),
status_code=status.HTTP_200_OK)
response, message = self.obj_login_handler.db_data_validation(user_data.login_type, id_info["email"])
# if the response is false then an error message is send back
if response is not None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=message).dict(),
status_code=status.HTTP_200_OK)
message.update({"name": id_info["name"], "pic_url": id_info["picture"]})
responses = self.obj_login_handler.update_pic(obj_mongo_user, id_info)
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
responses, exp = self.obj_login_handler.generate_cookie_tokens(message, request)
# token generation unsuccessful
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=message).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
except Exception as e:
logger.error(f'Services Failed with error from google login {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
message=ErrorMessages.ERROR_IN_LOGIN).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=message).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
# v2
def microsoft_login(self, request):
......@@ -143,7 +158,6 @@ class LoginHandlers:
expire = datetime.utcnow() + timedelta(minutes=Secrets.TOKEN_EXPIRE_TIME)
to_encode.update({"exp": expire})
jwt_token = jwt.encode(to_encode)
html = ''
# Load the HTML file
try:
with open(Services.HTML_LINK, "r") as f:
......@@ -151,7 +165,9 @@ class LoginHandlers:
html = html.replace("{{ message }}", "Please click the link to reset your password:").replace(
"{{ link }}", Services.RESET_ENDPOINT + "=" + str(jwt_token))
except Exception as e:
logger.exception(e)
logger.exception(logger.error(f'Services Failed sending email {e}'))
return
html_body = MIMEText(html, "html")
mail.attach(html_body)
context = ssl.create_default_context()
......@@ -164,7 +180,11 @@ class LoginHandlers:
content=DefaultSuccessResponse(status="success", message="Email Send Successfully").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from forgot password handler {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_SENDING_MAIL).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def validate_jwt(request):
......@@ -182,12 +202,16 @@ class LoginHandlers:
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
jwt_token_new = jwt.encode({"email": decoded_token['email']})
return RedirectResponse('http://192.168.2.102/iLens_UI/#/l/login?user_id=' + jwt_token_new)
return RedirectResponse(Services.REDIRECT_URL_RESET + '?user_id=' + jwt_token_new)
except ExpiredSignatureError:
return RedirectResponse(
'http://192.168.2.102/iLens_UI/#/l/login?error=' + "true")
Services.REDIRECT_URL_RESET + '?error=' + "true")
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from validate jwt {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_RESETTING_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def reset_user_password(reset_data):
......@@ -210,13 +234,13 @@ class LoginHandlers:
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
response = EncryptDecryptPassword().password_encrypt(reset_data.password)
if not response:
password_encrypted = EncryptDecryptPassword().password_encrypt(reset_data.new_password)
if not password_encrypted:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_AUTH_FAILED).dict(),
status_code=status.HTTP_200_OK)
response = obj_mongo_user.update_user({"email": decoded_token['email']}, {"password": response})
response = obj_mongo_user.update_user({"email": decoded_token['email']}, {"password": password_encrypted})
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
......@@ -226,6 +250,13 @@ class LoginHandlers:
content=DefaultSuccessResponse(status="success", message="Reset Successful").dict(),
status_code=status.HTTP_200_OK)
except ExpiredSignatureError:
return "Password Reset Token Expired"
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_TOKEN).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from reset user password {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_RESETTING_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
......@@ -31,7 +31,8 @@ class NormalLogin:
return ErrorMessages.ERROR_INVALID_PASSWORD
return None
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from process login user data validation {e}')
return None
# db validation
def db_data_validation(self, login_type, email):
......@@ -55,7 +56,8 @@ class NormalLogin:
# if the user exist
return None, response
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from process login db data validation {e}')
return True, "Exception"
# matching the password
def db_password_matching(self, login_type, user_data, password):
......@@ -71,14 +73,19 @@ class NormalLogin:
# if the password is correct
return None, message
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from process login db password matching {e}')
return True, "Exception"
@staticmethod
def update_pic(obj_mongo_user, info_data):
if not obj_mongo_user.update_user({"email": info_data["email"]},
{"name": info_data["name"], "pic_url": info_data["picture"]}):
try:
if not obj_mongo_user.update_user({"email": info_data["email"]},
{"name": info_data["name"], "pic_url": info_data["picture"]}):
return None
return True
except Exception as e:
logger.error(f'Services Failed with error from process login update pic {e}')
return None
return True
# cookie and token creation
@staticmethod
......@@ -95,4 +102,5 @@ class NormalLogin:
else:
return None
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from process login cookie tokens {e}')
return None
......@@ -71,7 +71,11 @@ class UserManagement:
content=DefaultSuccessResponse(status="success", message="User Registration Successful").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from general register {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_REGISTERING).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
# for Google registration using gmail
......@@ -104,7 +108,11 @@ class UserManagement:
content=DefaultSuccessResponse(status="success", message="User Registration Successful").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from google register {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_REGISTERING).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
# for microsoft registration using microsoft account
......@@ -112,7 +120,11 @@ class UserManagement:
try:
return {"message": "Not available"}
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from microsoft register {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_REGISTERING).dict(),
status_code=status.HTTP_200_OK)
# update user details
def update_user_details(self, update_data):
......@@ -162,31 +174,38 @@ class UserManagement:
content=DefaultSuccessResponse(status="success", message="Updated Successfully").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from update user {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
# delete user
@staticmethod
def delete_user_details(user_id):
# fetching and validating the user id
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# generating the filter
filter_data_updated = {"user_id": user_id}
# deleting the user
response = obj_mongo_user.delete_user(filter_data_updated)
if not response:
try:
# fetching and validating the user id
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# generating the filter
filter_data_updated = {"user_id": user_id}
# deleting the user
response = obj_mongo_user.delete_user(filter_data_updated)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Deleted Successfully").dict(),
status_code=status.HTTP_200_OK)
content=DefaultSuccessResponse(status="success", message="Deleted Successfully").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(f'Services Failed with error from delete user {e}')
@staticmethod
def fetch_view_header():
......@@ -203,7 +222,11 @@ class UserManagement:
message=ErrorMessages.ERROR_IN_FETCHING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from fetch user header {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_FETCHING_HEADER).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def fetch_user_details():
......@@ -235,44 +258,55 @@ class UserManagement:
message=ErrorMessages.ERROR_IN_FETCHING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from fetch user details {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_FETCHING).dict(),
status_code=status.HTTP_200_OK)
# user change password
def reset_password(self, reset_data):
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": reset_data.user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
try:
decrypted_password = self.pass_decrypt.password_decrypt(reset_data.new_password)
except TypeError:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
if not self.pwd_context.verify(decrypted_password.split("\"")[1], db_user_data["password"]):
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_PASSWORD_MISMATCH).dict(),
status_code=status.HTTP_200_OK)
response = EncryptDecryptPassword().password_encrypt(reset_data.new_password)
if not response:
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": reset_data.user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
try:
decrypted_password = self.pass_decrypt.password_decrypt(reset_data.new_password)
except TypeError:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
if not self.pwd_context.verify(decrypted_password.split("\"")[1], db_user_data["password"]):
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_PASSWORD_MISMATCH).dict(),
status_code=status.HTTP_200_OK)
response = EncryptDecryptPassword().password_encrypt(reset_data.new_password)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_AUTH_FAILED).dict(),
status_code=status.HTTP_200_OK)
response = obj_mongo_user.update_user({"user_id": reset_data.user_id}, {"password": response})
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_AUTH_FAILED).dict(),
content=DefaultSuccessResponse(status="success", message="Password Changed Successfully").dict(),
status_code=status.HTTP_200_OK)
response = obj_mongo_user.update_user({"user_id": reset_data.user_id}, {"password": response})
if not response:
except Exception as e:
logger.exception(f'Services Failed with error from reset user password {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
message=ErrorMessages.ERROR_IN_RESETTING_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Password Changed Successfully").dict(),
status_code=status.HTTP_200_OK)
# user logout
@staticmethod
......@@ -295,4 +329,8 @@ class UserManagement:
response.delete_cookie("login-token")
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from user logout {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_LOGOUT).dict(),
status_code=status.HTTP_200_OK)
......@@ -58,9 +58,3 @@ class MongoUser(CollectionBaseClass):
if user := self.delete_one(query=query):
return user
return None
# for filtering
def filter_data_aggregate(self, pipeline):
if user := self.aggregate(pipelines=pipeline):
return user
return None
......@@ -6,6 +6,7 @@ class ErrorMessages:
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_ACCOUNT_DOESNT_EXIST = "Account Does Not Exist"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_IN_LOGIN = "Login Error"
ERROR_UNAUTHORIZED_ACCESS = "Your are not authorized to view this page"
ERROR_ACCESS_DENIED = "Access Denied!"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
......@@ -18,6 +19,12 @@ class ErrorMessages:
ERROR_INVALID_REQUEST = "Invalid Request"
ERROR_USER_SESSION = "Not The Users Session"
ERROR_TOKEN_EXPIRED = "Google Token Expired"
ERROR_IN_SENDING_MAIL = "Error In Sending Mail"
ERROR_IN_RESETTING_PASSWORD = "Password Reset Failed"
ERROR_IN_REGISTERING = "User Cannot Be Added"
ERROR_IN_DELETING = "User Deletion Error"
ERROR_IN_FETCHING_HEADER = "Header Fetch Failed"
ERROR_IN_LOGOUT = "Logout Failed"
# Data Validation
ERROR_INVALID_PASSWORD = "Invalid Password"
......@@ -28,3 +35,4 @@ class ErrorMessages:
ERROR_USER_ID_DOESNT_EXIST = "User Id doesn't exist"
ERROR_USER_ID = "User Id Not Required"
ERROR_INVALID_TOKEN = "Invalid Token"
ERROR_IN_VALIDATION = "Validation Failed"
......@@ -43,7 +43,7 @@ async def login_default_api(
detail=ErrorMessages.ERROR_INVALID_REQUEST)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from login_default_api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -58,9 +58,14 @@ async def forgot_password(
try:
# forgot password
response = obj_login_handler.forgot_password_handler(validation_data.email)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_SENDING_MAIL).dict(),
status_code=status.HTTP_200_OK)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from forgot password api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -75,9 +80,14 @@ async def reset_password(
try:
# Get the JWT token from the query parameters
response = obj_login_handler.validate_jwt(request)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_RESETTING_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from get reset password api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -92,9 +102,14 @@ async def reset_password(
try:
# Get the JWT token from the query parameters
response = obj_login_handler.reset_user_password(reset_data)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_RESETTING_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from post reset password api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -135,7 +150,7 @@ async def user_management(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_INVALID_REQUEST)
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from user management api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -163,7 +178,7 @@ async def user_view_header(
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from view users header api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -190,7 +205,7 @@ async def user_view_data(
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from view users data api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -218,7 +233,7 @@ async def user_change_password(
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(f"From user change password - {e}")
logger.exception(f'Services Failed with error from change password api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -245,7 +260,7 @@ async def dashboard_download_files(
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from download button api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......@@ -268,7 +283,7 @@ async def user_logout(
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error from logout api {e}')
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
......
......@@ -11,7 +11,8 @@ class RegexValidation:
regex = re.fullmatch(r'^([A-Za-z]+)*$', str(name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
logger.error(f'Error in name regex {str(e)}')
return None
@staticmethod
def phone_number_validation(phone_number):
......@@ -19,7 +20,8 @@ class RegexValidation:
regex = re.fullmatch('([0-9]{1,20})', str(phone_number))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
logger.error(f'An Error in phone number regex {str(e)}')
return None
@staticmethod
def password_validation(password):
......@@ -29,4 +31,5 @@ class RegexValidation:
return False
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
logger.error(f'Error in password regex {str(e)}')
return None
from typing import Optional
from scripts.database.mongo.mongo_db import MongoUser
from scripts.logging.logger import logger
from scripts.schemas.project_schema import ResetPassword
obj_mongo_user = MongoUser()
......@@ -18,13 +19,20 @@ class AuthorizeAccess:
return True
except TypeError:
return False
except Exception as e:
logger.error(f'Services Failed with error from authorize access {e}')
return False
@staticmethod
def login_authorize(request, reset_data: Optional[ResetPassword] = None):
if reset_data is not None:
if reset_data.user_id != request.user_id:
return False
return True
if request.user_id:
return True
return False
try:
if reset_data is not None:
if reset_data.user_id != request.user_id:
return False
return True
if request.user_id:
return True
return False
except Exception as e:
logger.error(f'Services Failed with error from login authorize {e}')
return False
......@@ -39,3 +39,4 @@ class JWT:
except Exception as e:
logging.exception(f"Exception while validating JWT: {str(e)}")
raise
......@@ -36,14 +36,19 @@ class EncryptDecryptPassword:
data = self.un_pad(data)
return data.decode(Services.ENCODING_TYPE)
except Exception as e:
logger.exception(e)
logger.error(f'Services Failed with error from password util password decrypt {e}')
return None
# encrypting the password
def password_encrypt(self, password):
# decrypting the UI password
decrypted_password = self.password_decrypt(password)
# hashing the decrypted password
if decrypted_password is None:
try:
# decrypting the UI password
decrypted_password = self.password_decrypt(password)
# hashing the decrypted password
if decrypted_password is None:
return None
hashed_password = self.pwd_context.hash(decrypted_password.split("\"")[1])
return hashed_password
except Exception as e:
logger.error(f'Services Failed with error from password util password encrypt {e}')
return None
hashed_password = self.pwd_context.hash(decrypted_password.split("\"")[1])
return hashed_password
......@@ -12,36 +12,33 @@ class UserDataValidations:
def register_data_validation(user_data, method, feature):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
return False, ErrorMessages.ERROR_INVALID_NAME
# checking for valid username
if user_data.email == "" or validate_email(
user_data.email) is not True:
return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email}
return False, ErrorMessages.ERROR_INVALID_EMAIL
# checking for valid password
if method == 'general':
if user_data.password == "" or user_data.password == "string":
return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password}
return False, ErrorMessages.ERROR_INVALID_PASSWORD
# Validate phone number
if user_data.phone_number == "":
return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER,
"data": user_data.phone_number}
return False, ErrorMessages.ERROR_INVALID_PHONE_NUMBER
if user_data.user_role == "" and method == 'general' and feature == 'register':
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return False, ErrorMessages.ERROR_INVALID_USER_ROLE
return True, None
except Exception as e:
logger.exception(e)
logger.error(f'Register data validation error {e}')
return False, ErrorMessages.ERROR_IN_VALIDATION
@staticmethod
def update_data_validation(user_data):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
return False, ErrorMessages.ERROR_INVALID_NAME
if user_data.user_role == "":
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return False, ErrorMessages.ERROR_INVALID_USER_ROLE
return True, None
except Exception as e:
logger.exception(e)
logger.error(f'Update data validation error {e}')
return False, ErrorMessages.ERROR_IN_VALIDATION
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment