Commit a8343237 authored by arun.uday's avatar arun.uday

Merge branch '1-arun-uday-asset-manager-v1-0' into 'master'

AssetManager-V1.0- Not reveiwed

See merge request !3
parents 8f017c7c d68c6dda
......@@ -4,8 +4,8 @@ class ApiEndPoints:
# common
submit: str = "/submit"
create: str = "/create"
insert: str = "/insert"
add: str = "/add"
view: str = "/view"
update: str = "/update"
delete: str = "/delete"
......@@ -13,8 +13,9 @@ class ApiEndPoints:
asset_manager_login: str = "/login"
asset_manager_submit: str = asset_manager_login + submit
# user-registration
asset_manager_user_registration: str = "/register"
asset_manager_user_add: str = asset_manager_user_registration + create
asset_manager_user_update: str = asset_manager_user_registration + update
asset_manager_user_delete: str = asset_manager_user_registration + delete
# user-management
asset_manager_user_management: str = "/users"
asset_manager_user_add: str = asset_manager_user_management + add
asset_manager_user_view: str = asset_manager_user_management + view
asset_manager_user_update: str = asset_manager_user_management + update
asset_manager_user_delete: str = asset_manager_user_management + delete
class Validations:
email = "user@example.com"
......@@ -3,13 +3,13 @@ from fastapi.responses import JSONResponse
from fastapi import status
from scripts.errors import ErrorMessages
from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.utils.security.password_decryption_util import DecryptPassword
from scripts.utils.security.password_util import EncryptDecryptPassword
class LoginHandlers:
def __init__(self):
self.obj_login_handler = NormalLogin()
self.pass_decrypt = DecryptPassword()
self.pass_decrypt = EncryptDecryptPassword()
self.login_type = ""
def normal_login(self, user_data, request):
......@@ -19,7 +19,7 @@ class LoginHandlers:
# validating the received inputs empty or not
response = self.obj_login_handler.user_data_validation(
user_data.username,
user_data.email,
decrypted_password)
# Account is not registered
......
......@@ -5,7 +5,7 @@ from datetime import datetime
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.apply_encrytion_util import create_token
......@@ -20,31 +20,31 @@ class NormalLogin:
self.time_dt = datetime.now()
@staticmethod
def user_data_validation(username, password) -> dict | None:
def user_data_validation(email, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com" or validate_email(username) is not True:
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
if email == "" or validate_email(email) is not True:
return {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": email}
# checking for valid password
if password == "" or password == "string":
if password == "":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, login_type, username):
def db_data_validation(self, login_type, email):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(username)
self.db_user_data = MongoUser().fetch_one_user_details(email)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": username}}
"data": {"username": email}}
# if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type:
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": username, "Use Login": self.db_user_data["login_type"]}}
"data": {"username": email, "Use Login": self.db_user_data["login_type"]}}
# if the user exist
return None, {"message": True}
except Exception as e:
......@@ -53,16 +53,16 @@ class NormalLogin:
def db_password_matching(self, login_type, user_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_type, user_data.username)
response, message = self.db_data_validation(login_type, user_data.email)
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": user_data.username}}
"data": {"username": user_data.email}}
# if the password is correct
return None, {"username": user_data.username, "role": self.db_user_data["user_role"]}
return None, {"username": user_data.email, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
......@@ -71,7 +71,7 @@ class NormalLogin:
try:
# creating the access token
access_token = create_token(
user_id=user_data.username,
user_id=user_data.email,
ip=request.ip_address
)
# returning the login token
......
import datetime
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.password_util import EncryptDecryptPassword
from scripts.utils.validations_util import UserDataValidations
obj_mongo_user = MongoUser()
class UserManagement:
def __init__(self):
self.method = "register"
# for normal registration using email and password
def normal_register(self, user_data):
try:
response, message = UserDataValidations.data_validation(user_data, 'normal', self.method)
if not response:
return message
# fetching the data based on the username
db_user_data = MongoUser().fetch_one_user_details(user_data.email)
# if the user is not available
if db_user_data:
return {"message": ErrorMessages.ERROR_EMAIL_EXIST,
"data": {"username": user_data.email}}
created_at = datetime.datetime.now()
updated_at = datetime.datetime.now()
reg_time = {"created_at": created_at, "updated_at": updated_at}
user_data_dict = {key: (EncryptDecryptPassword().password_encrypt(value)
if key == "password" else value) for key, value in user_data}
user_data_reg = user_data_dict | reg_time
if not obj_mongo_user.insert_new_user(user_data_reg):
return {"message": ErrorMessages.ERROR_STORING_DATA,
"data": {"username": user_data_dict}}
return {"message": "New user registered",
"data": {"username": user_data_dict["email"]}}
except Exception as e:
logger.exception(e)
@staticmethod
# for Google registration using gmail
def google_register():
try:
return {"message": "Not available now"}
except Exception as e:
logger.exception(e)
@staticmethod
# for microsoft registration using microsoft account
def microsoft_register():
try:
return {"message": "Not available"}
except Exception as e:
logger.exception(e)
@staticmethod
def fetch_user_details():
cursor_data = MongoUser().fetch_all_user_details()
list_user_data = []
for users in cursor_data:
list_user_data.append(users)
return list_user_data
......@@ -28,11 +28,26 @@ class MongoUser(CollectionBaseClass):
super().__init__(mongo_client, Databases.DB_NAME, collection_name)
# fetching the user details based on the email id
def fetch_user_details(self, email):
def fetch_one_user_details(self, email):
if user := self.find_one(query={self.key_email: email}):
return user
return None
def fetch_all_user_details(self):
if user := self.find(query={}, filter_dict={'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}):
return user
return None
def insert_new_user(self, data):
if user := self.insert_one(data=data):
return user
return None
# updating the login time
def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
......
......@@ -4,10 +4,18 @@ class ErrorMessages:
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_UNAUTHORIZED_ACCESS = "Your are not authorized to view this page"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
ERROR_STORING_DATA = "New user registration failed"
ERROR_EMAIL_EXIST = "Email Id exists"
# Data Validation
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_INVALID_NAME = "Invalid Name"
ERROR_INVALID_EMAIL = "Invalid Email Id"
ERROR_INVALID_PHONE_NUMBER = "Invalid Phone Number"
ERROR_INVALID_USER_ROLE = "Invalid User Role"
This diff is collapsed.
from typing import Union
from typing import Optional
from pydantic import BaseModel
# model for login request
class LoginRequest(BaseModel):
username: str
email: str
password: str
class RegistrationData:
class RegistrationData(BaseModel):
name: str
email: str
password: Optional[str]
phone_number: Optional[str]
login_type: str
user_role: str
......@@ -2,16 +2,21 @@ from fastapi import APIRouter, HTTPException, status, Depends
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.core.handlers.user_management_handler import UserManagement
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.project_schema import LoginRequest
from scripts.schemas.project_schema import LoginRequest, RegistrationData
from scripts.utils.security.authorize_access import AuthorizeAccess
from scripts.utils.security.decorators import MetaInfoSchema, auth
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
# initializing the handler
obj_login_handler = LoginHandlers()
obj_user_handler = UserManagement()
# login API
@router.post(ApiEndPoints.asset_manager_submit)
async def login_default(
login_type: str,
......@@ -38,9 +43,78 @@ async def login_default(
logger.exception(e)
@router.post(ApiEndPoints.asset_manager_user_registration)
# Create new users API
@router.post(ApiEndPoints.asset_manager_user_add)
async def user_register(
user_data: LoginRequest,
user_data: RegistrationData,
request: MetaInfoSchema = Depends(auth)
):
print(user_data, request)
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
# mapper for login types
register_mapper = {
"normal": obj_user_handler.normal_register,
"google": obj_user_handler.google_register,
"microsoft": obj_user_handler.microsoft_register
}
# getting the functions based on the login types
if user_data.login_type in register_mapper:
return register_mapper[user_data.login_type](user_data)
else:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Request")
except Exception as e:
logger.exception(e)
# Update users API
@router.post(ApiEndPoints.asset_manager_user_update)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
except Exception as e:
logger.exception(e)
# Delete users API
@router.post(ApiEndPoints.asset_manager_user_delete)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
except Exception as e:
logger.exception(e)
# View users API
@router.post(ApiEndPoints.asset_manager_user_view)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
return obj_user_handler.fetch_user_details()
except Exception as e:
logger.exception(e)
import re
from scripts.logging.logger import logger
class RegexValidation:
@staticmethod
def first_name_validation(first_name):
try:
regex = re.fullmatch('([A-Za-z ]{1,100})', str(first_name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def name_validation(name):
try:
regex = re.fullmatch(r'^([A-Za-z]+)*$', str(name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def phone_number_validation(phone_number):
try:
regex = re.fullmatch('([0-9]{1,20})', str(phone_number))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def email_validation(email):
try:
regex = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9.-]{5,20})+\.[A-Z|a-z]{3}\b'
if re.fullmatch(regex, email):
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
\ No newline at end of file
from datetime import datetime, timedelta
from scripts.config import Secrets
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.mongo.mongo_db import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.utils.security.jwt_util import JWT
......
from scripts.database.mongo.mongo_db import MongoUser
obj_mongo_user = MongoUser()
class AuthorizeAccess:
@staticmethod
def admin_authorize(request):
user_data = obj_mongo_user.fetch_one_user_details(request.login_token)
if user_data["user_role"] != "super admin":
return False
return True
......@@ -51,15 +51,14 @@ class _CookieAuthentication(APIKeyBase):
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
if not login_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
......
import base64
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Secrets, Services
from scripts.logging.logger import logger
class DecryptPassword:
class EncryptDecryptPassword:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@staticmethod
def un_pad(s):
......@@ -32,3 +36,8 @@ class DecryptPassword:
return data.decode(Services.ENCODING_TYPE)
except Exception as e:
logger.exception(e)
def password_encrypt(self, password):
decrypted_password = self.password_decrypt(password)
hashed_password = self.pwd_context.hash(decrypted_password)
return hashed_password
from __future__ import annotations
from validate_email import validate_email
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
class UserDataValidations:
@staticmethod
def data_validation(user_data, method, feature):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
# checking for valid username
if user_data.email == "" or validate_email(
user_data.email) is not True:
return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email}
# checking for valid password
if method == 'normal':
if user_data.password == "" or user_data.password == "string":
return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password}
# Validate phone number
if user_data.phone_number == "":
return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER,
"data": user_data.phone_number}
if user_data.user_role == "" and method == 'normal' and feature == 'register':
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return True, None
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment