Commit 1df069bc authored by arun.uday's avatar arun.uday

Merge branch '1-arun-uday-asset-manager-v1-0' into 'master'

1 arun uday asset manager v1 0

See merge request !2
parents 16af59f4 0f326899
MONGO_URI=mongodb://localhost:27017 MONGO_URI=mongodb://localhost:27017
DB_NAME=userDB DB_NAME=userDB
SERVICE_HOST=0.0.0.0 REDIS_URI=redis://127.0.0.1:6379
REDIS_LOGIN_DB=10
SERVICE_HOST=127.0.0.1
SERVICE_PORT=8671 SERVICE_PORT=8671
PROJECT_NAME=AssetManager PROJECT_NAME=AssetManager
PROJECT_ID=1256
base_path=scripts/ BASE_PATH=scripts/
sub_path=log/ SUB_PATH=log/
ENCODING_TYPE=utf-8 ENCODING_TYPE=utf-8
\ No newline at end of file
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
Author: Arun Uday Author: Arun Uday
Email: arun.uday@knowledgelens.com Email: arun.uday@knowledgelens.com
Asset Manager Login For Normal User Login Asset Manager Login / Registration
--------------------------------------------------------- ---------------------------------------------------------
""" """
......
uvicorn~=0.21.0 uvicorn==0.21.1
python-dotenv~=1.0.0 python-dotenv~=1.0.0
pydantic~=1.10.6 pydantic~=1.10.6
fastapi~=0.94.1 fastapi==0.95.0
passlib~=1.7.4 passlib~=1.7.4
pymongo~=4.3.3 pymongo~=4.3.3
bcrypt~=4.0.1 bcrypt~=4.0.1
\ No newline at end of file email-validator~=1.3.1
pycryptodomex~=3.17
PyJWT~=2.6.0
validate_email~=1.3
redis~=4.5.2
\ No newline at end of file
...@@ -8,7 +8,6 @@ class _Services(BaseSettings): ...@@ -8,7 +8,6 @@ class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host") HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port") PORT: int = Field(default=8000, env="service_port")
PROJECT_NAME = Field(default="AssetManager", env="project_name") PROJECT_NAME = Field(default="AssetManager", env="project_name")
PROJECT_ID = Field(default="1256", env="project_id")
ENCODING_TYPE = Field(default="utf-8", env="encoding_type") ENCODING_TYPE = Field(default="utf-8", env="encoding_type")
ENABLE_CORS: bool = True ENABLE_CORS: bool = True
CORS_URLS: list[str] = ["*.ilens.io"] CORS_URLS: list[str] = ["*.ilens.io"]
...@@ -17,15 +16,13 @@ class _Services(BaseSettings): ...@@ -17,15 +16,13 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"] CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO" LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False ENABLE_FILE_LOGGING: bool = False
KEY_ENCRYPTION = "kliLensKLiLensKL"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class _Databases(BaseSettings): class _Databases(BaseSettings):
MONGO_URI: str MONGO_URI: str
DB_NAME: str DB_NAME: str
REDIS_URI: str
REDIS_LOGIN_DB: int
class _BasePathConf(BaseSettings): class _BasePathConf(BaseSettings):
...@@ -37,12 +34,23 @@ class _PathConf: ...@@ -37,12 +34,23 @@ class _PathConf:
LOG_PATH: pathlib.Path = BASE_PATH / "log/" LOG_PATH: pathlib.Path = BASE_PATH / "log/"
class _Secrets(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES = 30
leeway_in_minutes: int = 10
KEY_ENCRYPTION = "kliLensKLiLensKL"
issuer: str = "iotManager"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
Services = _Services() Services = _Services()
Databases = _Databases() Databases = _Databases()
PathConf = _PathConf() PathConf = _PathConf()
Secrets = _Secrets()
__all__ = [ __all__ = [
"Services", "Services",
"Databases", "Databases",
"PathConf", "PathConf",
"Secrets"
] ]
from pydantic import BaseSettings, Field
PROJECT_NAME = "IotManager"
class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port")
class _Databases(BaseSettings):
MONGO_URI: str
Services = _Services()
Databases = _Databases()
__all__ = [
"PROJECT_NAME",
"Services",
"Databases",
]
...@@ -3,7 +3,19 @@ class ApiEndPoints: ...@@ -3,7 +3,19 @@ class ApiEndPoints:
version = "/v1" version = "/v1"
# common # common
asset_manager_submit: str = "/submit" submit: str = "/submit"
create: str = "/create"
insert: str = "/insert"
update: str = "/update"
delete: str = "/delete"
# login-management # login-management
asset_manager_login: str = "/login" asset_manager_login: str = "/login"
asset_manager_submit: str = asset_manager_login + submit
# user-registration
asset_manager_user_registration: str = "/register"
asset_manager_user_add: str = asset_manager_user_registration + create
asset_manager_user_update: str = asset_manager_user_registration + update
asset_manager_user_delete: str = asset_manager_user_registration + delete
from __future__ import annotations from scripts.core.handlers.normal_login import NormalLogin
from fastapi.responses import JSONResponse
import base64 from fastapi import status
import datetime
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Services
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages from scripts.errors import ErrorMessages
from scripts.logging.logger import logger from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.utils.mongo_default_queries import MongoQueries from scripts.utils.security.password_decryption_util import DecryptPassword
class LoginHandlers: class LoginHandlers:
def __init__(self): def __init__(self):
self.mongo_user = MongoUser() self.obj_login_handler = NormalLogin()
self.mongo_queries = MongoQueries() self.pass_decrypt = DecryptPassword()
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") self.login_type = ""
self.db_user_data = None
self.db_user_data = None def normal_login(self, user_data, request):
self.dt = datetime.datetime.now() self.login_type = "normal"
self.time_dt = datetime.datetime.now() # decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
@staticmethod
def un_pad(s): # validating the received inputs empty or not
# un_padding the encrypted password response = self.obj_login_handler.user_data_validation(
return s[:-ord(s[len(s) - 1:])] user_data.username,
decrypted_password)
def password_decrypt(self, password):
try: # Account is not registered
# encoding the Key if response is not None:
key = Services.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE) return JSONResponse(content=DefaultFailureResponse(error=response).dict(),
# decoding the received password status_code=status.HTTP_400_BAD_REQUEST)
enc = base64.b64decode(password) # checking for the account and password matching
# mode for the decryption response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
mode = AES.MODE_CBC decrypted_password)
# getting the initialization vector # if the passwords doesn't match with the db data
iv = enc[:AES.block_size] if response is not None:
# decoding with AES return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
cipher = AES.new(key, mode, iv) status_code=status.HTTP_401_UNAUTHORIZED)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:]) # generating the access tokens
if len(data) == 0: response = self.obj_login_handler.generate_cookie_tokens(user_data, request)
raise ValueError("Decrypted data is empty") # token generation unsuccessful
# removing the padding to get the password if response is None:
data = self.un_pad(data) return JSONResponse(
return data.decode('utf-8') content=DefaultFailureResponse(message="Access Unsuccessful",
except Exception as e: error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
logger.exception(e) status_code=status.HTTP_403_FORBIDDEN)
# sending successful response to UI
@staticmethod return JSONResponse(
def user_data_validation(username, password) -> dict | None: content=DefaultResponse(message="Login Successful", data=response).dict(),
try: status_code=status.HTTP_200_OK)
# checking for valid username
if username == "" or username == "user@example.com": # v1
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username} def google_login(self, request):
# checking for valid password pass
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password} # v2
return None def microsoft_login(self, request):
except Exception as e: pass
logger.exception(e)
def new_user_login(self, login_data, password):
# check the domain of the email the user has entered
if login_data.username.split("@")[-1] == "knowledgelens.com":
# hash the password
hashed_password = self.pwd_context.hash(password)
# Enter the user as a guest user to the db
if self.mongo_user.insert_user(
self.mongo_queries.insert_user_query(
login_data,
Services.PROJECT_ID,
hashed_password,
self.time_dt)):
return True
return False
def db_data_validation(self, login_data, password):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
# if the user is not available
if not self.db_user_data:
# if the domain of the user email is knowledge lens create the user as a guest user
if self.new_user_login(login_data, password):
return True, {"message": "new_user", "username": login_data.username, "role": "guest"}
else:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": login_data.username}}
# Check the project id from the request body
if self.db_user_data["project_id"] != Services.PROJECT_ID or Services.PROJECT_ID != login_data.project_id:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": login_data.username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, login_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_data, password)
if response and message["message"] == "new_user":
return None, message
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": login_data.username}}
# if the password is correct
return None, {"username": login_data.username, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
from __future__ import annotations
from datetime import datetime
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.apply_encrytion_util import create_token
class NormalLogin:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.db_user_data = None
self.db_user_data = None
self.dt = datetime.now()
self.time_dt = datetime.now()
@staticmethod
def user_data_validation(username, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com" or validate_email(username) is not True:
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, login_type, username):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(username)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": username}}
# if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type:
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": username, "Use Login": self.db_user_data["login_type"]}}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, login_type, user_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_type, user_data.username)
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": user_data.username}}
# if the password is correct
return None, {"username": user_data.username, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
@staticmethod
def generate_cookie_tokens(user_data, request):
try:
# creating the access token
access_token = create_token(
user_id=user_data.username,
ip=request.ip_address
)
# returning the login token
if access_token:
return {"user_id": access_token, "token_type": "bearer"}
else:
return None
except Exception as e:
logger.exception(e)
from scripts.config import Databases from scripts.config import Databases
from scripts.utils.mongo_utils import MongoConnect from scripts.utils.mongo_utils import MongoConnect
# creating the mongo connection
mongo_obj = MongoConnect(uri=Databases.MONGO_URI) mongo_obj = MongoConnect(uri=Databases.MONGO_URI)
mongo_client = mongo_obj() mongo_client = mongo_obj()
CollectionBaseClass = mongo_obj.get_base_class() CollectionBaseClass = mongo_obj.get_base_class()
...@@ -27,12 +27,14 @@ class MongoUser(CollectionBaseClass): ...@@ -27,12 +27,14 @@ class MongoUser(CollectionBaseClass):
def __init__(self): def __init__(self):
super().__init__(mongo_client, Databases.DB_NAME, collection_name) super().__init__(mongo_client, Databases.DB_NAME, collection_name)
# fetching the user details based on the email id
def fetch_user_details(self, email): def fetch_user_details(self, email):
if user := self.find_one(query={self.key_email: email}): if user := self.find_one(query={self.key_email: email}):
return user return user
return None return None
def insert_user(self, query): # updating the login time
if user := self.insert_one(data=query): def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
return user return user
return None return None
import redis
from scripts.config import Databases
# creating the redis connection
redis_uri = Databases.REDIS_URI
# user login db
login_db = redis.from_url(
redis_uri, db=int(Databases.REDIS_LOGIN_DB), decode_responses=True
)
...@@ -7,5 +7,7 @@ class ErrorMessages: ...@@ -7,5 +7,7 @@ class ErrorMessages:
ERROR_INVALID_USERNAME = "Invalid Username" ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password" ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available" ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal." ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password" ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
This diff is collapsed.
...@@ -7,7 +7,7 @@ from pydantic import BaseModel ...@@ -7,7 +7,7 @@ from pydantic import BaseModel
class DefaultResponse(BaseModel): class DefaultResponse(BaseModel):
status: bool = True status: bool = True
message: Optional[str] message: Optional[str]
payload: Optional[Any] data: Optional[Any]
# default failure responses # default failure responses
......
from typing import Union
from pydantic import BaseModel, EmailStr
# model for normal login
class NormalLogin(BaseModel):
username: Union[EmailStr, None] = None
password: Union[str, None] = None
project_id: Union[str, None] = None
from typing import Union
from pydantic import BaseModel
# model for login request
class LoginRequest(BaseModel):
username: Union[str, None] = None
password: Union[str, None] = None
class RegistrationData:
name: Union[str, None] = None
email: Union[str: None] = None
password: Union[str: None] = None
user_role: Union[str: None] = None
is_alive: Union[bool: None] = None
created_at: Union[str: None] = None
from fastapi import APIRouter, status from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.responses import JSONResponse
from scripts.constants.api import ApiEndPoints from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers from scripts.core.handlers.login_handler import LoginHandlers
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse from scripts.schemas.project_schema import LoginRequest
from scripts.schemas.login_schema import NormalLogin from scripts.utils.security.decorators import MetaInfoSchema, auth
# creating the login api # creating the login api
router = APIRouter(prefix=ApiEndPoints.version) router = APIRouter(prefix=ApiEndPoints.version)
...@@ -14,29 +12,35 @@ router = APIRouter(prefix=ApiEndPoints.version) ...@@ -14,29 +12,35 @@ router = APIRouter(prefix=ApiEndPoints.version)
obj_login_handler = LoginHandlers() obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit) @router.post(ApiEndPoints.asset_manager_login)
def login_default(login_data: NormalLogin): async def login_default(
login_type: str,
user_data: LoginRequest,
request: MetaInfoSchema = Depends(auth)
):
try: try:
# decrypting the password from the UI # mapper for login types
decrypted_password = obj_login_handler.password_decrypt(login_data.password) login_mapper = {
# validating the received inputs empty or not "normal": obj_login_handler.normal_login,
response = obj_login_handler.user_data_validation(login_data.username, decrypted_password) "google": obj_login_handler.google_login,
# Account is not registered "microsoft": obj_login_handler.microsoft_login
if response is not None: }
return JSONResponse(content=DefaultFailureResponse(error=response["message"]).dict(),
status_code=status.HTTP_400_BAD_REQUEST) # getting the functions based on the login types
# checking for the account and password matching if login_type in login_mapper:
response, data = obj_login_handler.db_password_matching(login_data, decrypted_password) return login_mapper[login_type](user_data, request)
if response is not None and data["message"] == ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN: else:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(), return HTTPException(
status_code=status.HTTP_404_NOT_FOUND) status_code=status.HTTP_403_FORBIDDEN,
if response is not None: detail="Invalid Request")
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", payload=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@router.post(ApiEndPoints.asset_manager_user_registration)
async def user_register(
user_data: LoginRequest,
request: MetaInfoSchema = Depends(auth)
):
print(user_data, request)
class MongoQueries:
@staticmethod
def insert_user_query(login_data, project_id, hashed_password, time_dt):
query = {"project_id": project_id, "name": "", "email": login_data.username,
"password": hashed_password,
"user_role": "guest",
"is_alive": True, "created_at": time_dt,
"updated_at": time_dt}
return query
from datetime import datetime, timedelta
from scripts.config import Secrets
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.utils.security.jwt_util import JWT
jwt = JWT()
mongo_user = MongoUser()
def create_token(
user_id,
ip,
age=Secrets.ACCESS_TOKEN_EXPIRE_MINUTES,
):
"""
This method is to create a cookie
"""
# creating the payload
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "age": age}
# getting the current time
current_time = datetime.now()
# generating the expiry time of the token
exp = current_time + timedelta(minutes=age)
# creating the dictionary with issuer and expiry time
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = payload | _extras
# encoding the token
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(user_id, new_token)
login_db.expire(user_id, timedelta(minutes=age))
# Add updated time to mongo db
mongo_user.update_user({"email": user_id}, {"updated_at": current_time})
return user_id
from fastapi import HTTPException, Request, Response, status
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from pydantic import BaseModel, Field
from scripts.config import Services
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.jwt_util import JWT
class MetaInfoSchema(BaseModel):
user_id: str = ""
ip_address: str = ""
login_type: str = ""
login_token: str = Field(alias="login-token")
class Config:
allow_population_by_field_name = True
class _CookieAuthentication(APIKeyBase):
"""
Authentication backend using a cookie.
Internally, uses a JWT token to store the data.
"""
scheme: APIKeyCookie
cookie_name: str
cookie_secure: bool
def __init__(
self,
cookie_name: str = "login-token",
):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.scheme_name = self.__class__.__name__
self.cookie_name = cookie_name
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
self.login_redis = login_db
self.jwt = JWT()
def __call__(self, request: Request, response: Response) -> MetaInfoSchema:
# getting the cookie from the request
cookies = request.cookies
# checking if the request has valid cookie
login_token = cookies.get(self.cookie_name) or request.headers.get(
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
try:
# validating the token
decoded_token = self.jwt.validate(token=jwt_token)
if not decoded_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
except Exception as e:
logger.exception(e)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ErrorMessages.UNKNOWN_ERROR,
)
# checking if the token has necessary fields
user_id = decoded_token.get("user_id")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token doesn't have required fields",
)
return MetaInfoSchema(
user_id=user_id,
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
CookieAuthentication = auth = _CookieAuthentication()
import logging
import jwt
from scripts.config import Secrets
class JWT:
def __init__(self) -> None:
self.max_login_age: int = Secrets.ACCESS_TOKEN_EXPIRE_MINUTES
self.issuer: str = Secrets.issuer
self.alg: str = Secrets.ALGORITHM
self.key = Secrets.SECRET_KEY
# encoding the payload
def encode(self, payload) -> str:
try:
return jwt.encode(payload, self.key, algorithm=self.alg)
except Exception as e:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
# decoding the payload
def decode(self, token):
try:
return jwt.decode(token, self.key, algorithms=self.alg)
except Exception as e:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
# validate the payload
def validate(self, token):
try:
return jwt.decode(
token,
self.key,
algorithms=self.alg,
leeway=Secrets.leeway_in_minutes,
options={"require": ["exp", "iss"]},
)
except Exception as e:
logging.exception(f"Exception while validating JWT: {str(e)}")
import base64
from Cryptodome.Cipher import AES
from scripts.config import Secrets, Services
from scripts.logging.logger import logger
class DecryptPassword:
@staticmethod
def un_pad(s):
# un_padding the encrypted password
return s[:-ord(s[len(s) - 1:])]
def password_decrypt(self, password):
try:
# encoding the Key
key = Secrets.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
# decoding the received password
enc = base64.b64decode(password)
# mode for the decryption
mode = AES.MODE_CBC
# getting the initialization vector
iv = enc[:AES.block_size]
# decoding with AES
cipher = AES.new(key, mode, iv)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:])
if len(data) == 0:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode(Services.ENCODING_TYPE)
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment