Commit 86779f3c authored by arun.uday's avatar arun.uday

AssetManager-V1.0- Not reviewed

Updated the code with cookie and token generation, added the redis db for token storage in server side, added fields for google and microsoft login, added updation of login time.
parent 75ec7a5c
......@@ -5,6 +5,6 @@ class ApiEndPoints:
# common
asset_manager_submit: str = "/submit"
asset_manager_user_registration: str = "/register"
# login-management
asset_manager_login: str = "/login"
......@@ -10,22 +10,28 @@ class LoginHandlers:
self.obj_login_handler = NormalLogin()
def normal_login(self, login_data, request):
# decrypting the password from the UI
decrypted_password = self.obj_login_handler.password_decrypt(login_data.payload["password"])
# validating the received inputs empty or not
response = self.obj_login_handler.user_data_validation(login_data.payload["username"], decrypted_password)
response = self.obj_login_handler.user_data_validation(
login_data.payload["username"],
login_data.project_id,
decrypted_password)
# Account is not registered
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=response).dict(),
status_code=status.HTTP_400_BAD_REQUEST)
# checking for the account and password matching
response, data = self.obj_login_handler.db_password_matching(login_data.project_id,
login_data.payload,
response, data = self.obj_login_handler.db_password_matching(login_data.payload,
decrypted_password)
# if the passwords doesn't match with the db data
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# generating the access tokens
response = self.obj_login_handler.generate_cookie_tokens(login_data.payload, request)
# token generation unsuccessful
......
......@@ -7,9 +7,8 @@ from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.config import *
from scripts.config import Services, Secrets
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.apply_encrytion_util import create_token
......@@ -52,7 +51,7 @@ class NormalLogin:
logger.exception(e)
@staticmethod
def user_data_validation(username, password) -> dict | None:
def user_data_validation(username, project_id, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com" or validate_email(username) is not True:
......@@ -60,39 +59,47 @@ class NormalLogin:
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
# check if the project id matches
if Services.PROJECT_ID != project_id:
return {"message": ErrorMessages.ERROR_INVALID_PROJECT_ID, "data": project_id}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, project_id, login_data):
def db_data_validation(self, username):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(login_data["username"])
self.db_user_data = MongoUser().fetch_user_details(username)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": login_data["username"]}}
"data": {"username": username}}
# if the user is not registered through normal login
if self.db_user_data["login_type"] != "normal":
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": username, "Use Login": self.db_user_data["login_type"]}}
# Check the project id from the request body
if self.db_user_data["project_id"] != Services.PROJECT_ID or Services.PROJECT_ID != project_id:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": login_data.username}
if self.db_user_data["project_id"] != Services.PROJECT_ID:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, project_id, login_data, password):
def db_password_matching(self, payload, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(project_id, login_data)
response, message = self.db_data_validation(payload["username"])
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": login_data["username"]}}
"data": {"username": payload["username"]}}
# if the password is correct
return None, {"username": login_data["username"], "role": self.db_user_data["user_role"]}
return None, {"username": payload["username"], "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
......@@ -105,6 +112,7 @@ class NormalLogin:
ip=request.ip_address,
project_id=Services.PROJECT_ID,
)
# returning the login token
if access_token:
return {"user_id": access_token, "token_type": "bearer"}
else:
......
from scripts.config import Databases
from scripts.utils.mongo_utils import MongoConnect
# creating the mongo connection
mongo_obj = MongoConnect(uri=Databases.MONGO_URI)
mongo_client = mongo_obj()
CollectionBaseClass = mongo_obj.get_base_class()
......@@ -27,12 +27,14 @@ class MongoUser(CollectionBaseClass):
def __init__(self):
super().__init__(mongo_client, Databases.DB_NAME, collection_name)
# fetching the user details based on the email id
def fetch_user_details(self, email):
if user := self.find_one(query={self.key_email: email}):
return user
return None
def insert_user(self, query):
if user := self.insert_one(data=query):
# updating the login time
def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
return user
return None
......@@ -2,8 +2,10 @@ import redis
from scripts.config import Databases
# creating the redis connection
redis_uri = Databases.REDIS_URI
# user login db
login_db = redis.from_url(
redis_uri, db=int(Databases.REDIS_LOGIN_DB), decode_responses=True
)
......@@ -6,7 +6,9 @@ class ErrorMessages:
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_INVALID_PROJECT_ID = "Invalid Project Id"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
This diff is collapsed.
from typing import Union
from fastapi import APIRouter, HTTPException, status, Depends
from scripts.constants.api import ApiEndPoints
......@@ -19,20 +17,22 @@ async def login_default(
user_data: LoginRequest, request: MetaInfoSchema = Depends(auth)
):
try:
# v1
if user_data.login_type == "normal":
return obj_login_handler.normal_login(user_data, request)
# v1
elif user_data.login_type == "google":
return obj_login_handler.google_login(user_data)
# v2
elif user_data.login_type == "microsoft":
return obj_login_handler.microsoft_login(user_data)
# mapper for login types
login_mapper = {
"normal": obj_login_handler.normal_login,
"google": obj_login_handler.google_login,
"microsoft": obj_login_handler.microsoft_login
}
# getting the functions based on the login types
if user_data.login_type in login_mapper:
return login_mapper[user_data.login_type](user_data, request)
else:
return HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid Request")
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Request")
except Exception as e:
logger.exception(e)
......
import uuid
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from scripts.config import Secrets
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.utils.security.jwt_util import JWT
jwt = JWT()
mongo_user = MongoUser()
def create_token(
......@@ -17,17 +18,25 @@ def create_token(
"""
This method is to create a cookie
"""
# creating the payload
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "age": age}
if project_id:
payload["project_id"] = project_id
exp = datetime.now() + timedelta(minutes=age)
# getting the current time
current_time = datetime.now()
# generating the expiry time of the token
exp = current_time + timedelta(minutes=age)
# creating the dictionary with issuer and expiry time
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = payload | _extras
# encoding the token
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(user_id, new_token)
login_db.expire(user_id, timedelta(minutes=age))
# Add updated time to mongo db
mongo_user.update_user({"email": user_id}, {"updated_at": current_time})
return user_id
......@@ -45,24 +45,30 @@ class _CookieAuthentication(APIKeyBase):
self.jwt = JWT()
def __call__(self, request: Request, response: Response) -> MetaInfoSchema:
# getting the cookie from the request
cookies = request.cookies
# checking if the request has valid cookie
login_token = cookies.get(self.cookie_name) or request.headers.get(
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
project_id=Services.PROJECT_ID,
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
try:
# validating the token
decoded_token = self.jwt.validate(token=jwt_token)
if not decoded_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
......@@ -73,6 +79,7 @@ class _CookieAuthentication(APIKeyBase):
detail=ErrorMessages.UNKNOWN_ERROR,
)
# checking if the token has necessary fields
user_id = decoded_token.get("user_id")
project_id = decoded_token.get("project_id")
if not user_id or not project_id:
......
......@@ -8,10 +8,11 @@ from scripts.config import Secrets
class JWT:
def __init__(self) -> None:
self.max_login_age: int = Secrets.ACCESS_TOKEN_EXPIRE_MINUTES
# self.issuer: str = Secrets.issuer
self.issuer: str = Secrets.issuer
self.alg: str = Secrets.ALGORITHM
self.key = Secrets.SECRET_KEY
# encoding the payload
def encode(self, payload) -> str:
try:
return jwt.encode(payload, self.key, algorithm=self.alg)
......@@ -19,6 +20,7 @@ class JWT:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
# decoding the payload
def decode(self, token):
try:
return jwt.decode(token, self.key, algorithms=self.alg)
......@@ -26,12 +28,15 @@ class JWT:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
# validate the payload
def validate(self, token):
try:
return jwt.decode(
token,
self.key,
algorithms=self.alg
algorithms=self.alg,
leeway=Secrets.leeway_in_minutes,
options={"require": ["exp", "iss"]},
)
except Exception as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment