Commit 50154cc1 authored by arun.uday's avatar arun.uday

test Commit

parent eb668bf5
MONGO_URI=mongodb://localhost:27017
DB_NAME=userDB
REDIS_URI=redis://127.0.0.1:6379
REDIS_LOGIN_DB=10
SERVICE_HOST=0.0.0.0
SERVICE_PORT=8671
......
......@@ -3,7 +3,7 @@
Author: Arun Uday
Email: arun.uday@knowledgelens.com
Asset Manager Login For Normal User Login
Asset Manager Login / Registration
---------------------------------------------------------
"""
......
......@@ -9,3 +9,4 @@ email-validator~=1.3.1
pycryptodomex~=3.17
PyJWT~=2.6.0
validate_email~=1.3
redis~=4.5.2
\ No newline at end of file
......@@ -17,15 +17,13 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False
KEY_ENCRYPTION = "kliLensKLiLensKL"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class _Databases(BaseSettings):
MONGO_URI: str
DB_NAME: str
REDIS_URI: str
REDIS_LOGIN_DB: int
class _BasePathConf(BaseSettings):
......@@ -37,12 +35,23 @@ class _PathConf:
LOG_PATH: pathlib.Path = BASE_PATH / "log/"
class _Secrets(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES = 30
leeway_in_minutes: int = 10
KEY_ENCRYPTION = "kliLensKLiLensKL"
issuer: str = "iotManager"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
Services = _Services()
Databases = _Databases()
PathConf = _PathConf()
Secrets = _Secrets()
__all__ = [
"Services",
"Databases",
"PathConf",
"Secrets"
]
......@@ -9,7 +9,7 @@ class LoginHandlers:
def __init__(self):
self.obj_login_handler = NormalLogin()
def normal_login(self, login_data):
def normal_login(self, login_data, request):
# decrypting the password from the UI
decrypted_password = self.obj_login_handler.password_decrypt(login_data.payload["password"])
# validating the received inputs empty or not
......@@ -27,7 +27,7 @@ class LoginHandlers:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# generating the access tokens
response = self.obj_login_handler.generate_tokens(login_data.payload, data)
response = self.obj_login_handler.generate_cookie_tokens(login_data.payload, request)
# token generation unsuccessful
if response is None:
return JSONResponse(
......
from __future__ import annotations
import base64
from datetime import datetime, timedelta
from datetime import datetime
import jwt
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.config import Services
from scripts.config import *
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.apply_encrytion_util import create_token
class NormalLogin:
......@@ -31,7 +32,7 @@ class NormalLogin:
def password_decrypt(self, password):
try:
# encoding the Key
key = Services.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
key = Secrets.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
# decoding the received password
enc = base64.b64decode(password)
# mode for the decryption
......@@ -46,7 +47,7 @@ class NormalLogin:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode('utf-8')
return data.decode(Services.ENCODING_TYPE)
except Exception as e:
logger.exception(e)
......@@ -96,33 +97,16 @@ class NormalLogin:
logger.exception(e)
@staticmethod
def create_access_token(data: dict, expires_delta):
def generate_cookie_tokens(login_data, request):
try:
# creating a copy of the data
to_encode = data.copy()
# checking if the expires_delta is empty
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
# updating the data with the expiration time
to_encode.update({"expire": expire.isoformat()})
# creating the token
encoded_jwt = jwt.encode(to_encode, Services.SECRET_KEY, algorithm=Services.ALGORITHM)
return encoded_jwt
except Exception as e:
logger.exception(e)
def generate_tokens(self, login_data, data):
try:
# creating the expiration time
access_token_expires = timedelta(minutes=Services.ACCESS_TOKEN_EXPIRE_MINUTES)
# creating the access token
access_token = self.create_access_token(
data={"username": login_data["username"], "role": data["role"]}, expires_delta=access_token_expires
access_token = create_token(
user_id=login_data["username"],
ip=request.ip_address,
project_id=Services.PROJECT_ID,
)
if access_token:
return {"access_token": access_token, "token_type": "bearer"}
return {"user_id": access_token, "token_type": "bearer"}
else:
return None
except Exception as e:
......
import redis
from scripts.config import Databases
redis_uri = Databases.REDIS_URI
login_db = redis.from_url(
redis_uri, db=int(Databases.REDIS_LOGIN_DB), decode_responses=True
)
This diff is collapsed.
from fastapi import APIRouter, HTTPException, status
from typing import Union
from fastapi import APIRouter, HTTPException, status, Depends
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.logging.logger import logger
from scripts.schemas.login_schema import LoginRequest
from scripts.utils.security.decorators import MetaInfoSchema, auth
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
......@@ -11,20 +14,22 @@ router = APIRouter(prefix=ApiEndPoints.version)
obj_login_handler = LoginHandlers()
@router.routes(ApiEndPoints.asset_manager_submit)
async def login_default(request: LoginRequest):
@router.post(ApiEndPoints.asset_manager_submit)
async def login_default(
user_data: LoginRequest, request: MetaInfoSchema = Depends(auth)
):
try:
# v1
if request.login_type == "normal":
return obj_login_handler.normal_login(request)
if user_data.login_type == "normal":
return obj_login_handler.normal_login(user_data, request)
# v1
elif request.login_type == "google":
return obj_login_handler.google_login(request)
elif user_data.login_type == "google":
return obj_login_handler.google_login(user_data)
# v2
elif request.login_type == "microsoft":
return obj_login_handler.microsoft_login(request)
elif user_data.login_type == "microsoft":
return obj_login_handler.microsoft_login(user_data)
else:
return HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid Request")
......@@ -32,6 +37,7 @@ async def login_default(request: LoginRequest):
logger.exception(e)
@router.routes(ApiEndPoints.asset_manager_user_registration)
# TODO user register
@router.post(ApiEndPoints.asset_manager_user_registration)
async def user_register():
return {"message": "Available soon"}
import uuid
from datetime import datetime, timedelta, timezone
from scripts.config import Secrets
from scripts.database.redis.redis_conn import login_db
from scripts.utils.security.jwt_util import JWT
jwt = JWT()
def create_token(
user_id,
ip,
age=Secrets.ACCESS_TOKEN_EXPIRE_MINUTES,
project_id=None,
):
"""
This method is to create a cookie
"""
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "age": age}
if project_id:
payload["project_id"] = project_id
exp = datetime.now() + timedelta(minutes=age)
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = payload | _extras
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(user_id, new_token)
login_db.expire(user_id, timedelta(minutes=age))
return user_id
from fastapi import HTTPException, Request, Response, status
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from pydantic import BaseModel, Field
from scripts.config import Services
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.jwt_util import JWT
class MetaInfoSchema(BaseModel):
project_id: str = ""
user_id: str = ""
ip_address: str = ""
login_type: str = ""
login_token: str = Field(alias="login-token")
class Config:
allow_population_by_field_name = True
class _CookieAuthentication(APIKeyBase):
"""
Authentication backend using a cookie.
Internally, uses a JWT token to store the data.
"""
scheme: APIKeyCookie
cookie_name: str
cookie_secure: bool
def __init__(
self,
cookie_name: str = "login-token",
):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.scheme_name = self.__class__.__name__
self.cookie_name = cookie_name
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
self.login_redis = login_db
self.jwt = JWT()
def __call__(self, request: Request, response: Response) -> MetaInfoSchema:
cookies = request.cookies
login_token = cookies.get(self.cookie_name) or request.headers.get(
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
project_id=Services.PROJECT_ID,
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
try:
decoded_token = self.jwt.validate(token=jwt_token)
if not decoded_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
except Exception as e:
logger.exception(e)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ErrorMessages.UNKNOWN_ERROR,
)
user_id = decoded_token.get("user_id")
project_id = decoded_token.get("project_id")
if not user_id or not project_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token doesn't have required fields",
)
return MetaInfoSchema(
project_id=project_id,
user_id=user_id,
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
CookieAuthentication = auth = _CookieAuthentication()
import logging
import jwt
from scripts.config import Secrets
class JWT:
def __init__(self) -> None:
self.max_login_age: int = Secrets.ACCESS_TOKEN_EXPIRE_MINUTES
# self.issuer: str = Secrets.issuer
self.alg: str = Secrets.ALGORITHM
self.key = Secrets.SECRET_KEY
def encode(self, payload) -> str:
try:
return jwt.encode(payload, self.key, algorithm=self.alg)
except Exception as e:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
def decode(self, token):
try:
return jwt.decode(token, self.key, algorithms=self.alg)
except Exception as e:
logging.exception(f"Exception while encoding JWT: {str(e)}")
raise
def validate(self, token):
try:
return jwt.decode(
token,
self.key,
algorithms=self.alg
)
except Exception as e:
logging.exception(f"Exception while validating JWT: {str(e)}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment