Commit 61a63dff authored by arun.uday's avatar arun.uday

updated including new file

parent 2b256755
...@@ -18,6 +18,9 @@ class _Services(BaseSettings): ...@@ -18,6 +18,9 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"] CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO" LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False ENABLE_FILE_LOGGING: bool = False
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class _Databases(BaseSettings): class _Databases(BaseSettings):
......
class DatabaseConstants: class DatabaseConstants:
collection_user_details = "userDetails" collection_user_details = "UserDetails"
collection_user_roles = "userRoles" collection_user_roles = "UserRoles"
collection_user_devices = "userDevices" collection_user_devices = "UserDevices"
from __future__ import annotations
from passlib.context import CryptContext
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages
class LoginHandlers:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.db_user_data = None
@staticmethod
def user_data_validation(login_data) -> dict | None:
if login_data.username == "" or login_data.username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME_PASSWORD, "data": login_data.username}
if login_data.password == "" or login_data.password == "string":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME_PASSWORD, "data": login_data.password}
return None
def db_data_validation(self, login_data):
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
if not self.db_user_data:
return {"message": ErrorMessages.ERROR_USER_NOT_REGISTERED, "data": login_data.username}
return None
def db_password_matching(self, login_data):
if response := self.db_data_validation(login_data):
return response
if not self.pwd_context.verify(login_data.password, self.db_user_data["password"]):
return {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH, "data": {login_data.username,
login_data.password}}
return None
import bcrypt
from scripts.database.mongo.mongo_login import MongoUser
from scripts.logging.logger import logger
class LoginManager:
@staticmethod
def verify_password(encrypted_password, password):
pwd = password
decoded_pwd = pwd.encode('utf-8')
print(encrypted_password, password)
if bcrypt.checkpw(encrypted_password, decoded_pwd):
return True
return False
@staticmethod
def hashed_password(password):
try:
pwd = password
encode_pwd = pwd.encode('utf-8')
# Generate salt
salted = bcrypt.gensalt()
# Hash password
pwd_hash = bcrypt.hashpw(encode_pwd, salted)
return pwd_hash
except Exception as e:
logger.exception(e)
@staticmethod
def get_data(username):
data = MongoUser().fetch_user_details(username)
return data
from scripts.logging.logger import logger
class ValidateUser:
@staticmethod
def valid_user(db_data, username, password):
try:
if db_data["email"] != username:
return False
if db_data["password"] != password:
print(db_data["password"], password)
return False
return True
except Exception as e:
logger.exception(e)
class ErrorMessages:
UNKNOWN_ERROR = "Unknown Error Occurred"
OP_FAILED = "Operation failed"
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_USERNAME_PASSWORD = "Invalid Username or Password."
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
from pydantic import BaseModel
from typing import Union
class UserDetails(BaseModel):
name: str
email: Union[str: None] = None
password: Union[str: None] = None
user_role: Union[str: None] = None
is_alive: Union[bool: None] = None
created_at: Union[str: None] = None
updated_at: Union[str: None] = None
from typing import Any, Optional
from pydantic import BaseModel
class DefaultResponse(BaseModel):
status: bool = True
message: Optional[str]
payload: Optional[Any]
class DefaultFailureResponse(DefaultResponse):
status: bool = False
error: Any
from typing import Union
from pydantic import BaseModel, EmailStr from pydantic import BaseModel, EmailStr
class NormalLogin(BaseModel): class NormalLogin(BaseModel):
username: EmailStr username: Union[EmailStr, None] = None
password: str password: Union[str, None] = None
project_id: Union[str, None] = None
from fastapi import APIRouter from fastapi import APIRouter
from scripts.constants.api import ApiEndPoints from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.loginmanager import LoginManager from scripts.core.handlers.login_handler import LoginHandlers
from scripts.logging.logger import logger from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse
from scripts.schemas.login_schema import NormalLogin from scripts.schemas.login_schema import NormalLogin
router = APIRouter(prefix=ApiEndPoints.version) router = APIRouter(prefix=ApiEndPoints.version)
obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit) @router.post(ApiEndPoints.asset_manager_submit)
def login_default(login_data: NormalLogin): def login_default(login_data: NormalLogin):
try: try:
obj_login_manager = LoginManager() response = obj_login_handler.user_data_validation(login_data)
password_hashed = obj_login_manager.hashed_password(login_data.password) if response is not None:
cursor_user_data = obj_login_manager.get_data(login_data.username) return DefaultFailureResponse(error=response["message"])
user_exists = obj_login_manager.verify_password(password_hashed, cursor_user_data["password"]) response = obj_login_handler.db_password_matching(login_data)
return user_exists if response is not None:
return DefaultFailureResponse(error=response["message"])
return DefaultResponse(message="Login Successful", payload={"username": login_data.username})
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment