Commit 53073373 authored by arun.uday's avatar arun.uday

AssetManager- V1.0- Not reviewed

Updated cookie storage and authentication, completed user management(create, update, view and delete), create download API for dashboard
parent 2649adbb
......@@ -4,7 +4,7 @@ DB_NAME=userDB
REDIS_URI=redis://127.0.0.1:6379
REDIS_LOGIN_DB=10
SERVICE_HOST=127.0.0.1
SERVICE_HOST=0.0.0.0
SERVICE_PORT=8671
PROJECT_NAME=AssetManager
......@@ -13,3 +13,4 @@ BASE_PATH=scripts/
SUB_PATH=log/
ENCODING_TYPE=utf-8
KEY_ENCRYPTION=kliLensKLiLensKL
\ No newline at end of file
......@@ -37,7 +37,7 @@ class _PathConf:
class _Secrets(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES = 30
leeway_in_minutes: int = 10
KEY_ENCRYPTION = "kliLensKLiLensKL"
KEY_ENCRYPTION: str
issuer: str = "iotManager"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
......
......@@ -8,6 +8,8 @@ class ApiEndPoints:
view: str = "/view"
update: str = "/update"
delete: str = "/delete"
header: str = "/header"
download: str = "/download"
# login-management
asset_manager_login: str = "/login"
......@@ -19,3 +21,8 @@ class ApiEndPoints:
asset_manager_user_view: str = asset_manager_user_management + view
asset_manager_user_update: str = asset_manager_user_management + update
asset_manager_user_delete: str = asset_manager_user_management + delete
# dashboard-management
asset_manager_dashboard: str = "/dashboard"
asset_manager_dashboard_download_header: str = asset_manager_dashboard + download + header
asset_manager_dashboard_download: str = asset_manager_dashboard + download
from fastapi.responses import JSONResponse
from fastapi import status
from scripts.schemas.default_responses import DefaultResponse
class DashboardManagement:
def __init__(self):
self.download_files = {
"SCN101-Manual (Local Config with IoTSetupUI)":
"https://ilens.io/DownloadFiles/SCN_Device_Configuration_Page_Updates_4_7.pdf",
"SCN101 Firmware 4.7 Updates":
"https://ilens.io/DownloadFiles/SCN101Manual.pdf",
"IoTSetupUI":
"https://ilens.io/DownloadFiles/IoTsetupUI-V1.6.zip",
"CP2102 SCN Windows USB Driver":
"https://ilens.io/DownloadFiles/CP2102_Windows.zip",
"SCNFirmwareBurner":
"https://ilens.io/DownloadFiles/flash_download_tool_v3.8.5.zip",
"SCN101A Firmware - V2.7 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_7_B1_6.bin",
"SCN101A Firmware - V2.8 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_8_B1_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.5 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_5.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.6 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.7 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_7.bin",
"P10_LED_Driver_V2.1 (Board 1.1)":
"https://ilens.io/DownloadFiles/P10_LED_Driver_V2.1.bin",
"SCN-LED Reset Firmware (Board 1.1)":
"https://ilens.io/DownloadFiles/SCN-LED_Reset.bin",
"SCN Reset Firmware (Partition: 4MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_4MB_Part.bin",
"SCN Reset Firmware (Partition: 16MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_16MB_Part.bin"
}
def download_header(self):
data = {
"actions": [
{
"class": "fa-download",
"action": "download",
"tooltip": "Download"
}
],
"column_defs": []
}
print(data["column_defs"])
column_values = {{"header_name": key, "field": key, "key": "file_name"} for key in self.download_files}
print(column_values)
data["column_defs"].append(column_values)
print(data)
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def download_details():
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=download_files).dict(),
status_code=status.HTTP_200_OK)
from fastapi import Response
from scripts.core.handlers.normal_login import NormalLogin
from fastapi.responses import JSONResponse
from fastapi import status
......@@ -16,36 +18,38 @@ class LoginHandlers:
self.login_type = "normal"
# decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
# validating the received inputs empty or not
response = self.obj_login_handler.user_data_validation(
responses = self.obj_login_handler.user_data_validation(
user_data.email,
decrypted_password)
decrypted_password.split("\"")[1])
# Account is not registered
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=response).dict(),
status_code=status.HTTP_400_BAD_REQUEST)
if responses is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=responses).dict(),
status_code=status.HTTP_200_OK)
# checking for the account and password matching
response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password)
user_data_response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password.split("\"")[1])
# if the passwords doesn't match with the db data
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
if user_data_response is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=data).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
response = self.obj_login_handler.generate_cookie_tokens(user_data, request)
responses, exp = self.obj_login_handler.generate_cookie_tokens(user_data, request)
# token generation unsuccessful
if response is None:
return JSONResponse(
content=DefaultFailureResponse(message="Access Unsuccessful",
error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_403_FORBIDDEN)
# sending successful response to UI
if responses is None:
return JSONResponse(
content=DefaultResponse(message="Login Successful", data=response).dict(),
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully", data=data).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
# v1
def google_login(self, request):
......
......@@ -5,6 +5,7 @@ from datetime import datetime
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.config import Secrets
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
......@@ -24,10 +25,10 @@ class NormalLogin:
try:
# checking for valid username
if email == "" or validate_email(email) is not True:
return {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": email}
return ErrorMessages.ERROR_INVALID_EMAIL
# checking for valid password
if password == "":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return ErrorMessages.ERROR_INVALID_PASSWORD
return None
except Exception as e:
logger.exception(e)
......@@ -38,13 +39,11 @@ class NormalLogin:
self.db_user_data = MongoUser().fetch_one_user_details(email)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": email}}
return False, ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN
# if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type:
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": email, "Use Login": self.db_user_data["login_type"]}}
return False, ErrorMessages.ERROR_LOGIN_TYPE_INVALID
# if the user exist
return None, {"message": True}
except Exception as e:
......@@ -59,8 +58,7 @@ class NormalLogin:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": user_data.email}}
return False, ErrorMessages.ERROR_PASSWORD_MISMATCH
# if the password is correct
return None, {"username": user_data.email, "role": self.db_user_data["user_role"]}
except Exception as e:
......@@ -70,13 +68,14 @@ class NormalLogin:
def generate_cookie_tokens(user_data, request):
try:
# creating the access token
access_token = create_token(
access_token, exp = create_token(
user_id=user_data.email,
ip=request.ip_address
login_token=Secrets.SECRET_KEY,
ip=request.client.host
)
# returning the login token
if access_token:
return {"user_id": access_token, "token_type": "bearer"}
return access_token, exp
else:
return None
except Exception as e:
......
......@@ -3,6 +3,9 @@ import datetime
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from fastapi.responses import JSONResponse
from fastapi import status
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse
from scripts.utils.security.password_util import EncryptDecryptPassword
from scripts.utils.validations_util import UserDataValidations
......@@ -16,11 +19,11 @@ class UserManagement:
# for normal registration using email and password
def normal_register(self, user_data):
try:
response, message = UserDataValidations.data_validation(user_data, 'normal', self.method)
response, message = UserDataValidations.register_data_validation(user_data, 'normal', self.method)
if not response:
return message
# fetching the data based on the username
db_user_data = MongoUser().fetch_one_user_details(user_data.email)
db_user_data = obj_mongo_user.fetch_one_user_details(user_data.email)
# if the user is not available
if db_user_data:
return {"message": ErrorMessages.ERROR_EMAIL_EXIST,
......@@ -55,10 +58,84 @@ class UserManagement:
except Exception as e:
logger.exception(e)
# update user details
def update_user_details(self, email, update_data):
try:
self.method = "update"
db_user_data = obj_mongo_user.fetch_one_user_details(email)
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
if update_data.email is not None:
db_user_data = obj_mongo_user.fetch_one_user_details(update_data.email)
# if the user is not available
if db_user_data is not None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
filter_data_updated = {"email": email}
update_data_removed = {key: value for key, value in update_data if value is not None}
response, message = UserDataValidations.update_data_validation(update_data)
if not response:
return message
response = obj_mongo_user.update_user(filter_data_updated, update_data_removed)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultResponse(status="success", message="Updated Successfully",
data=update_data_removed).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
# delete user
@staticmethod
def delete_user_details(email):
db_user_data = obj_mongo_user.fetch_one_user_details(email)
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
filter_data_updated = {"email": email}
response = obj_mongo_user.delete_user(filter_data_updated)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
return JSONResponse(
content=DefaultResponse(status="success", message="Deleted Successfully",
data=filter_data_updated).dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def fetch_user_details():
cursor_data = MongoUser().fetch_all_user_details()
try:
filter_data = {'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}
cursor_data = obj_mongo_user.fetch_all_user_details({}, filter_data)
cursor_data_count = cursor_data.explain()
if cursor_data_count["executionStats"]["nReturned"] <= 0:
return None
list_user_data = []
for users in cursor_data:
list_user_data.append(users)
return list_user_data
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=list_user_data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
......@@ -33,13 +33,8 @@ class MongoUser(CollectionBaseClass):
return user
return None
def fetch_all_user_details(self):
if user := self.find(query={}, filter_dict={'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}):
def fetch_all_user_details(self, query, filter_data):
if user := self.find(query=query, filter_dict=filter_data):
return user
return None
......@@ -49,7 +44,13 @@ class MongoUser(CollectionBaseClass):
return None
# updating the login time
def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
def update_user(self, query, update):
if user := self.update_one(query=query, data=update):
return user
return None
# deleting users
def delete_user(self, query):
if user := self.delete_one(query=query):
return user
return None
......@@ -8,10 +8,12 @@ class ErrorMessages:
ERROR_UNAUTHORIZED_ACCESS = "Your are not authorized to view this page"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_PASSWORD_MISMATCH = "Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
ERROR_STORING_DATA = "New user registration failed"
ERROR_EMAIL_EXIST = "Email Id exists"
ERROR_IN_FETCHING = "Details cannot be fetched"
ERROR_IN_UPDATING = "Error in Updating"
# Data Validation
ERROR_INVALID_PASSWORD = "Invalid Password"
......@@ -19,3 +21,4 @@ class ErrorMessages:
ERROR_INVALID_EMAIL = "Invalid Email Id"
ERROR_INVALID_PHONE_NUMBER = "Invalid Phone Number"
ERROR_INVALID_USER_ROLE = "Invalid User Role"
ERROR_EMAIL_ID_DOESNT_EXIST = "Email Id doesn't exist"
This diff is collapsed.
......@@ -5,12 +5,12 @@ from pydantic import BaseModel
# default responses
class DefaultResponse(BaseModel):
status: bool = True
message: Optional[str]
status: str
message: str
data: Optional[Any]
# default failure responses
class DefaultFailureResponse(DefaultResponse):
status: bool = False
error: Any
class DefaultFailureResponse(BaseModel):
status: str
message: Any
......@@ -16,3 +16,10 @@ class RegistrationData(BaseModel):
phone_number: Optional[str]
login_type: str
user_role: str
class UserUpdate(BaseModel):
name: Optional[str] = None
email: Optional[str] = None
phone_number: Optional[str] = None
user_role: Optional[str] = None
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi import APIRouter, HTTPException, status, Depends, Request
from fastapi.responses import JSONResponse
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.dashboard_handler import DashboardManagement
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.core.handlers.user_management_handler import UserManagement
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.project_schema import LoginRequest, RegistrationData
from scripts.schemas.default_responses import DefaultFailureResponse
from scripts.schemas.project_schema import LoginRequest, RegistrationData, UserUpdate
from scripts.utils.security.authorize_access import AuthorizeAccess
from scripts.utils.security.decorators import MetaInfoSchema, auth
......@@ -14,6 +17,7 @@ router = APIRouter(prefix=ApiEndPoints.version)
# initializing the handler
obj_login_handler = LoginHandlers()
obj_user_handler = UserManagement()
obj_dashboard_handler = DashboardManagement()
# login API
......@@ -21,7 +25,7 @@ obj_user_handler = UserManagement()
async def login_default(
login_type: str,
user_data: LoginRequest,
request: MetaInfoSchema = Depends(auth)
request: Request,
):
try:
# mapper for login types
......@@ -41,6 +45,10 @@ async def login_default(
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Create new users API
......@@ -72,11 +80,17 @@ async def user_register(
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Update users API
@router.post(ApiEndPoints.asset_manager_user_update)
async def user_register(
email: str,
update_data: UserUpdate,
request: MetaInfoSchema = Depends(auth)
):
try:
......@@ -85,13 +99,20 @@ async def user_register(
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
response = obj_user_handler.update_user_details(email, update_data)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Delete users API
@router.post(ApiEndPoints.asset_manager_user_delete)
async def user_register(
email: str,
request: MetaInfoSchema = Depends(auth)
):
try:
......@@ -100,8 +121,14 @@ async def user_register(
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
response = obj_user_handler.delete_user_details(email)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# View users API
......@@ -115,6 +142,65 @@ async def user_register(
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
return obj_user_handler.fetch_user_details()
response = obj_user_handler.fetch_user_details()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# download Button Dashboard header
@router.post(ApiEndPoints.asset_manager_dashboard_download_header)
async def dashboard_download(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
response = obj_dashboard_handler.download_header()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# download Button Dashboard
@router.post(ApiEndPoints.asset_manager_dashboard_download)
async def dashboard_download(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
response = obj_dashboard_handler.download_details()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
......@@ -36,3 +36,13 @@ class RegexValidation:
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def password_validation(password):
try:
password_regex = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*[!@#$%^&*()_+={}[\]:;\"\'|,.<>\/?]).{8,15}$'
if re.search(password_regex, password):
return False
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
from datetime import datetime, timedelta
import uuid
from datetime import datetime, timedelta, timezone
from scripts.config import Secrets
from scripts.database.mongo.mongo_db import MongoUser
......@@ -12,15 +13,19 @@ mongo_user = MongoUser()
def create_token(
user_id,
ip,
login_token=None,
age=Secrets.ACCESS_TOKEN_EXPIRE_MINUTES,
):
"""
This method is to create a cookie
"""
uid = login_token
if not uid:
uid = str(uuid.uuid4()).replace("-", "")
# creating the payload
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "age": age}
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "uid": uid, "age": age}
# getting the current time
current_time = datetime.now()
current_time = datetime.now(timezone.utc)
# generating the expiry time of the token
exp = current_time + timedelta(minutes=age)
# creating the dictionary with issuer and expiry time
......@@ -30,10 +35,8 @@ def create_token(
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(user_id, new_token)
login_db.expire(user_id, timedelta(minutes=age))
login_db.set(uid, new_token)
login_db.expire(uid, timedelta(minutes=age))
# Add updated time to mongo db
mongo_user.update_user({"email": user_id}, {"updated_at": current_time})
return user_id
return uid, exp
......@@ -6,7 +6,7 @@ obj_mongo_user = MongoUser()
class AuthorizeAccess:
@staticmethod
def admin_authorize(request):
user_data = obj_mongo_user.fetch_one_user_details(request.login_token)
user_data = obj_mongo_user.fetch_one_user_details(request.user_id)
if user_data["user_role"] != "super admin":
return False
return True
......@@ -4,7 +4,6 @@ from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from pydantic import BaseModel, Field
from scripts.config import Services
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
......@@ -50,16 +49,8 @@ class _CookieAuthentication(APIKeyBase):
login_token = cookies.get(self.cookie_name) or request.headers.get(
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
if not login_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
......@@ -83,7 +74,6 @@ class _CookieAuthentication(APIKeyBase):
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token doesn't have required fields",
)
return MetaInfoSchema(
user_id=user_id,
ip_address=request.client.host, # type: ignore
......
......@@ -8,7 +8,7 @@ from scripts.logging.logger import logger
class UserDataValidations:
@staticmethod
def data_validation(user_data, method, feature):
def register_data_validation(user_data, method, feature):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
......@@ -31,3 +31,16 @@ class UserDataValidations:
return True, None
except Exception as e:
logger.exception(e)
@staticmethod
def update_data_validation(user_data):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
if user_data.user_role == "":
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return True, None
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment