Commit 8d15b8cc authored by arun.uday's avatar arun.uday

AssetManager-V1.0- To be reviewed

Added jwt token generation using bearer
parent 1a9a31b5
......@@ -7,7 +7,7 @@ SERVICE_PORT=8671
PROJECT_NAME=AssetManager
PROJECT_ID=1256
base_path=scripts/
sub_path=log/
BASE_PATH=scripts/
SUB_PATH=log/
ENCODING_TYPE=utf-8
\ No newline at end of file
......@@ -4,4 +4,7 @@ pydantic~=1.10.6
fastapi~=0.94.1
passlib~=1.7.4
pymongo~=4.3.3
bcrypt~=4.0.1
\ No newline at end of file
bcrypt~=4.0.1
email-validator~=1.3.1
pycryptodomex~=3.17
PyJWT~=2.6.0
\ No newline at end of file
from __future__ import annotations
import base64
import datetime
from datetime import datetime, timedelta
import jwt
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
......@@ -20,8 +21,8 @@ class LoginHandlers:
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.db_user_data = None
self.db_user_data = None
self.dt = datetime.datetime.now()
self.time_dt = datetime.datetime.now()
self.dt = datetime.now()
self.time_dt = datetime.now()
@staticmethod
def un_pad(s):
......@@ -115,3 +116,36 @@ class LoginHandlers:
return None, {"username": login_data.username, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
@staticmethod
def create_access_token(data: dict, expires_delta):
try:
# creating a copy of the data
to_encode = data.copy()
# checking if the expires_delta is empty
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
# updating the data with the expiration time
to_encode.update({"expire": expire.isoformat()})
# creating the token
encoded_jwt = jwt.encode(to_encode, Services.SECRET_KEY, algorithm=Services.ALGORITHM)
return encoded_jwt
except Exception as e:
logger.exception(e)
def generate_tokens(self, login_data, data):
try:
# creating the expiration time
access_token_expires = timedelta(minutes=Services.ACCESS_TOKEN_EXPIRE_MINUTES)
# creating the access token
access_token = self.create_access_token(
data={"username": login_data.username, "role": data["role"]}, expires_delta=access_token_expires
)
if access_token:
return {"access_token": access_token, "token_type": "bearer"}
else:
return None
except Exception as e:
logger.exception(e)
......@@ -9,3 +9,4 @@ class ErrorMessages:
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
This diff is collapsed.
......@@ -15,7 +15,7 @@ obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit)
def login_default(login_data: NormalLogin):
async def login_default(login_data: NormalLogin):
try:
# decrypting the password from the UI
decrypted_password = obj_login_handler.password_decrypt(login_data.password)
......@@ -27,16 +27,25 @@ def login_default(login_data: NormalLogin):
status_code=status.HTTP_400_BAD_REQUEST)
# checking for the account and password matching
response, data = obj_login_handler.db_password_matching(login_data, decrypted_password)
# if the user is not valid
if response is not None and data["message"] == ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# if the passwords doesn't match with the db data
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# generating the access tokens
response = obj_login_handler.generate_tokens(login_data, data)
# token generation unsuccessful
if response is None:
return JSONResponse(
content=DefaultFailureResponse(message="Access Unsuccessful",
error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_403_FORBIDDEN)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", payload=data).dict(),
content=DefaultResponse(message="Login Successful", payload=response).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment