Commit eb668bf5 authored by arun.uday's avatar arun.uday

AssetManager-V1.0 - To be reviewed

Added new requirements, removed the feature which allowed users with knowledgelens to login as guest in normal login, changed the structure the code, added space for new features for adding google and microsoft login.
parent 8d15b8cc
uvicorn~=0.21.0 uvicorn==0.21.1
python-dotenv~=1.0.0 python-dotenv~=1.0.0
pydantic~=1.10.6 pydantic~=1.10.6
fastapi~=0.94.1 fastapi==0.95.0
passlib~=1.7.4 passlib~=1.7.4
pymongo~=4.3.3 pymongo~=4.3.3
bcrypt~=4.0.1 bcrypt~=4.0.1
email-validator~=1.3.1 email-validator~=1.3.1
pycryptodomex~=3.17 pycryptodomex~=3.17
PyJWT~=2.6.0 PyJWT~=2.6.0
validate_email~=1.3
...@@ -4,6 +4,7 @@ class ApiEndPoints: ...@@ -4,6 +4,7 @@ class ApiEndPoints:
# common # common
asset_manager_submit: str = "/submit" asset_manager_submit: str = "/submit"
asset_manager_user_registration: str = "/register"
# login-management # login-management
asset_manager_login: str = "/login" asset_manager_login: str = "/login"
from __future__ import annotations from scripts.core.handlers.normal_login import NormalLogin
from fastapi.responses import JSONResponse
import base64 from fastapi import status
from datetime import datetime, timedelta
import jwt
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Services
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages from scripts.errors import ErrorMessages
from scripts.logging.logger import logger from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.utils.mongo_default_queries import MongoQueries
class LoginHandlers: class LoginHandlers:
def __init__(self): def __init__(self):
self.mongo_user = MongoUser() self.obj_login_handler = NormalLogin()
self.mongo_queries = MongoQueries()
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def normal_login(self, login_data):
self.db_user_data = None # decrypting the password from the UI
self.db_user_data = None decrypted_password = self.obj_login_handler.password_decrypt(login_data.payload["password"])
self.dt = datetime.now() # validating the received inputs empty or not
self.time_dt = datetime.now() response = self.obj_login_handler.user_data_validation(login_data.payload["username"], decrypted_password)
# Account is not registered
@staticmethod
def un_pad(s):
# un_padding the encrypted password
return s[:-ord(s[len(s) - 1:])]
def password_decrypt(self, password):
try:
# encoding the Key
key = Services.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
# decoding the received password
enc = base64.b64decode(password)
# mode for the decryption
mode = AES.MODE_CBC
# getting the initialization vector
iv = enc[:AES.block_size]
# decoding with AES
cipher = AES.new(key, mode, iv)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:])
if len(data) == 0:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode('utf-8')
except Exception as e:
logger.exception(e)
@staticmethod
def user_data_validation(username, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def new_user_login(self, login_data, password):
# check the domain of the email the user has entered
if login_data.username.split("@")[-1] == "knowledgelens.com":
# hash the password
hashed_password = self.pwd_context.hash(password)
# Enter the user as a guest user to the db
if self.mongo_user.insert_user(
self.mongo_queries.insert_user_query(
login_data,
Services.PROJECT_ID,
hashed_password,
self.time_dt)):
return True
return False
def db_data_validation(self, login_data, password):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
# if the user is not available
if not self.db_user_data:
# if the domain of the user email is knowledge lens create the user as a guest user
if self.new_user_login(login_data, password):
return True, {"message": "new_user", "username": login_data.username, "role": "guest"}
else:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": login_data.username}}
# Check the project id from the request body
if self.db_user_data["project_id"] != Services.PROJECT_ID or Services.PROJECT_ID != login_data.project_id:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": login_data.username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, login_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_data, password)
if response and message["message"] == "new_user":
return None, message
# if the response is false then an error message is send back
if response is not None: if response is not None:
return response, message return JSONResponse(content=DefaultFailureResponse(error=response).dict(),
# if the user exists in db then password is matched status_code=status.HTTP_400_BAD_REQUEST)
if not self.pwd_context.verify(password, self.db_user_data["password"]): # checking for the account and password matching
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH, response, data = self.obj_login_handler.db_password_matching(login_data.project_id,
"data": {"username": login_data.username}} login_data.payload,
# if the password is correct decrypted_password)
return None, {"username": login_data.username, "role": self.db_user_data["user_role"]} # if the passwords doesn't match with the db data
except Exception as e: if response is not None:
logger.exception(e) return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
@staticmethod # generating the access tokens
def create_access_token(data: dict, expires_delta): response = self.obj_login_handler.generate_tokens(login_data.payload, data)
try: # token generation unsuccessful
# creating a copy of the data if response is None:
to_encode = data.copy() return JSONResponse(
# checking if the expires_delta is empty content=DefaultFailureResponse(message="Access Unsuccessful",
if expires_delta: error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
expire = datetime.utcnow() + expires_delta status_code=status.HTTP_403_FORBIDDEN)
else: # sending successful response to UI
expire = datetime.utcnow() + timedelta(minutes=15) return JSONResponse(
# updating the data with the expiration time content=DefaultResponse(message="Login Successful", payload=response).dict(),
to_encode.update({"expire": expire.isoformat()}) status_code=status.HTTP_200_OK)
# creating the token
encoded_jwt = jwt.encode(to_encode, Services.SECRET_KEY, algorithm=Services.ALGORITHM) # v1
return encoded_jwt def google_login(self, request):
except Exception as e: pass
logger.exception(e)
# v2
def generate_tokens(self, login_data, data): def microsoft_login(self, request):
try: pass
# creating the expiration time
access_token_expires = timedelta(minutes=Services.ACCESS_TOKEN_EXPIRE_MINUTES)
# creating the access token
access_token = self.create_access_token(
data={"username": login_data.username, "role": data["role"]}, expires_delta=access_token_expires
)
if access_token:
return {"access_token": access_token, "token_type": "bearer"}
else:
return None
except Exception as e:
logger.exception(e)
from __future__ import annotations
import base64
from datetime import datetime, timedelta
import jwt
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.config import Services
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
class NormalLogin:
def __init__(self):
self.mongo_user = MongoUser()
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.db_user_data = None
self.db_user_data = None
self.dt = datetime.now()
self.time_dt = datetime.now()
@staticmethod
def un_pad(s):
# un_padding the encrypted password
return s[:-ord(s[len(s) - 1:])]
def password_decrypt(self, password):
try:
# encoding the Key
key = Services.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
# decoding the received password
enc = base64.b64decode(password)
# mode for the decryption
mode = AES.MODE_CBC
# getting the initialization vector
iv = enc[:AES.block_size]
# decoding with AES
cipher = AES.new(key, mode, iv)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:])
if len(data) == 0:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode('utf-8')
except Exception as e:
logger.exception(e)
@staticmethod
def user_data_validation(username, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com" or validate_email(username) is not True:
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, project_id, login_data):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(login_data["username"])
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": login_data["username"]}}
# Check the project id from the request body
if self.db_user_data["project_id"] != Services.PROJECT_ID or Services.PROJECT_ID != project_id:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": login_data.username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, project_id, login_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(project_id, login_data)
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": login_data["username"]}}
# if the password is correct
return None, {"username": login_data["username"], "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
@staticmethod
def create_access_token(data: dict, expires_delta):
try:
# creating a copy of the data
to_encode = data.copy()
# checking if the expires_delta is empty
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
# updating the data with the expiration time
to_encode.update({"expire": expire.isoformat()})
# creating the token
encoded_jwt = jwt.encode(to_encode, Services.SECRET_KEY, algorithm=Services.ALGORITHM)
return encoded_jwt
except Exception as e:
logger.exception(e)
def generate_tokens(self, login_data, data):
try:
# creating the expiration time
access_token_expires = timedelta(minutes=Services.ACCESS_TOKEN_EXPIRE_MINUTES)
# creating the access token
access_token = self.create_access_token(
data={"username": login_data["username"], "role": data["role"]}, expires_delta=access_token_expires
)
if access_token:
return {"access_token": access_token, "token_type": "bearer"}
else:
return None
except Exception as e:
logger.exception(e)
This diff is collapsed.
from typing import Union from typing import Union
from pydantic import BaseModel, EmailStr from pydantic import BaseModel
# model for normal login # model for login request
class NormalLogin(BaseModel): class LoginRequest(BaseModel):
username: Union[EmailStr, None] = None
password: Union[str, None] = None
project_id: Union[str, None] = None project_id: Union[str, None] = None
payload: Union[dict, None] = None
login_type: Union[str, None] = None
from fastapi import APIRouter, status from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from scripts.constants.api import ApiEndPoints from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers from scripts.core.handlers.login_handler import LoginHandlers
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse from scripts.schemas.login_schema import LoginRequest
from scripts.schemas.login_schema import NormalLogin
# creating the login api # creating the login api
router = APIRouter(prefix=ApiEndPoints.version) router = APIRouter(prefix=ApiEndPoints.version)
...@@ -14,38 +11,27 @@ router = APIRouter(prefix=ApiEndPoints.version) ...@@ -14,38 +11,27 @@ router = APIRouter(prefix=ApiEndPoints.version)
obj_login_handler = LoginHandlers() obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit) @router.routes(ApiEndPoints.asset_manager_submit)
async def login_default(login_data: NormalLogin): async def login_default(request: LoginRequest):
try: try:
# decrypting the password from the UI # v1
decrypted_password = obj_login_handler.password_decrypt(login_data.password) if request.login_type == "normal":
# validating the received inputs empty or not return obj_login_handler.normal_login(request)
response = obj_login_handler.user_data_validation(login_data.username, decrypted_password)
# Account is not registered # v1
if response is not None: elif request.login_type == "google":
return JSONResponse(content=DefaultFailureResponse(error=response["message"]).dict(), return obj_login_handler.google_login(request)
status_code=status.HTTP_400_BAD_REQUEST)
# checking for the account and password matching # v2
response, data = obj_login_handler.db_password_matching(login_data, decrypted_password) elif request.login_type == "microsoft":
# if the user is not valid return obj_login_handler.microsoft_login(request)
if response is not None and data["message"] == ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(), else:
status_code=status.HTTP_404_NOT_FOUND) return HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid Request")
# if the passwords doesn't match with the db data
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# generating the access tokens
response = obj_login_handler.generate_tokens(login_data, data)
# token generation unsuccessful
if response is None:
return JSONResponse(
content=DefaultFailureResponse(message="Access Unsuccessful",
error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_403_FORBIDDEN)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", payload=response).dict(),
status_code=status.HTTP_200_OK)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@router.routes(ApiEndPoints.asset_manager_user_registration)
async def user_register():
return {"message": "Available soon"}
class MongoQueries:
@staticmethod
def insert_user_query(login_data, project_id, hashed_password, time_dt):
query = {"project_id": project_id, "name": "", "email": login_data.username,
"password": hashed_password,
"user_role": "guest",
"is_alive": True, "created_at": time_dt,
"updated_at": time_dt}
return query
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment