Commit 2649adbb authored by arun.uday's avatar arun.uday

AssetManager-V1.0- Not reveiwed

Added user management API's, create users is completed but requires to add more data validations, update, delete and view API is created. Created the script for View but the response requires formatting.
parent 0f326899
......@@ -4,8 +4,8 @@ class ApiEndPoints:
# common
submit: str = "/submit"
create: str = "/create"
insert: str = "/insert"
add: str = "/add"
view: str = "/view"
update: str = "/update"
delete: str = "/delete"
......@@ -13,9 +13,9 @@ class ApiEndPoints:
asset_manager_login: str = "/login"
asset_manager_submit: str = asset_manager_login + submit
# user-registration
asset_manager_user_registration: str = "/register"
asset_manager_user_add: str = asset_manager_user_registration + create
asset_manager_user_update: str = asset_manager_user_registration + update
asset_manager_user_delete: str = asset_manager_user_registration + delete
# user-management
asset_manager_user_management: str = "/users"
asset_manager_user_add: str = asset_manager_user_management + add
asset_manager_user_view: str = asset_manager_user_management + view
asset_manager_user_update: str = asset_manager_user_management + update
asset_manager_user_delete: str = asset_manager_user_management + delete
class Validations:
email = "user@example.com"
......@@ -3,13 +3,13 @@ from fastapi.responses import JSONResponse
from fastapi import status
from scripts.errors import ErrorMessages
from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.utils.security.password_decryption_util import DecryptPassword
from scripts.utils.security.password_util import EncryptDecryptPassword
class LoginHandlers:
def __init__(self):
self.obj_login_handler = NormalLogin()
self.pass_decrypt = DecryptPassword()
self.pass_decrypt = EncryptDecryptPassword()
self.login_type = ""
def normal_login(self, user_data, request):
......@@ -19,7 +19,7 @@ class LoginHandlers:
# validating the received inputs empty or not
response = self.obj_login_handler.user_data_validation(
user_data.username,
user_data.email,
decrypted_password)
# Account is not registered
......
......@@ -5,7 +5,7 @@ from datetime import datetime
from passlib.context import CryptContext
from validate_email import validate_email
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.apply_encrytion_util import create_token
......@@ -20,31 +20,31 @@ class NormalLogin:
self.time_dt = datetime.now()
@staticmethod
def user_data_validation(username, password) -> dict | None:
def user_data_validation(email, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com" or validate_email(username) is not True:
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
if email == "" or validate_email(email) is not True:
return {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": email}
# checking for valid password
if password == "" or password == "string":
if password == "":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, login_type, username):
def db_data_validation(self, login_type, email):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(username)
self.db_user_data = MongoUser().fetch_one_user_details(email)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": username}}
"data": {"username": email}}
# if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type:
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": username, "Use Login": self.db_user_data["login_type"]}}
"data": {"username": email, "Use Login": self.db_user_data["login_type"]}}
# if the user exist
return None, {"message": True}
except Exception as e:
......@@ -53,16 +53,16 @@ class NormalLogin:
def db_password_matching(self, login_type, user_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_type, user_data.username)
response, message = self.db_data_validation(login_type, user_data.email)
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": user_data.username}}
"data": {"username": user_data.email}}
# if the password is correct
return None, {"username": user_data.username, "role": self.db_user_data["user_role"]}
return None, {"username": user_data.email, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
......@@ -71,7 +71,7 @@ class NormalLogin:
try:
# creating the access token
access_token = create_token(
user_id=user_data.username,
user_id=user_data.email,
ip=request.ip_address
)
# returning the login token
......
import datetime
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.security.password_util import EncryptDecryptPassword
from scripts.utils.validations_util import UserDataValidations
obj_mongo_user = MongoUser()
class UserManagement:
def __init__(self):
self.method = "register"
# for normal registration using email and password
def normal_register(self, user_data):
try:
response, message = UserDataValidations.data_validation(user_data, 'normal', self.method)
if not response:
return message
# fetching the data based on the username
db_user_data = MongoUser().fetch_one_user_details(user_data.email)
# if the user is not available
if db_user_data:
return {"message": ErrorMessages.ERROR_EMAIL_EXIST,
"data": {"username": user_data.email}}
created_at = datetime.datetime.now()
updated_at = datetime.datetime.now()
reg_time = {"created_at": created_at, "updated_at": updated_at}
user_data_dict = {key: (EncryptDecryptPassword().password_encrypt(value)
if key == "password" else value) for key, value in user_data}
user_data_reg = user_data_dict | reg_time
if not obj_mongo_user.insert_new_user(user_data_reg):
return {"message": ErrorMessages.ERROR_STORING_DATA,
"data": {"username": user_data_dict}}
return {"message": "New user registered",
"data": {"username": user_data_dict["email"]}}
except Exception as e:
logger.exception(e)
@staticmethod
# for Google registration using gmail
def google_register():
try:
return {"message": "Not available now"}
except Exception as e:
logger.exception(e)
@staticmethod
# for microsoft registration using microsoft account
def microsoft_register():
try:
return {"message": "Not available"}
except Exception as e:
logger.exception(e)
@staticmethod
def fetch_user_details():
cursor_data = MongoUser().fetch_all_user_details()
list_user_data = []
for users in cursor_data:
list_user_data.append(users)
return list_user_data
......@@ -28,11 +28,26 @@ class MongoUser(CollectionBaseClass):
super().__init__(mongo_client, Databases.DB_NAME, collection_name)
# fetching the user details based on the email id
def fetch_user_details(self, email):
def fetch_one_user_details(self, email):
if user := self.find_one(query={self.key_email: email}):
return user
return None
def fetch_all_user_details(self):
if user := self.find(query={}, filter_dict={'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}):
return user
return None
def insert_new_user(self, data):
if user := self.insert_one(data=data):
return user
return None
# updating the login time
def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
......
......@@ -4,10 +4,18 @@ class ErrorMessages:
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_UNAUTHORIZED_ACCESS = "Your are not authorized to view this page"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
ERROR_STORING_DATA = "New user registration failed"
ERROR_EMAIL_EXIST = "Email Id exists"
# Data Validation
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_INVALID_NAME = "Invalid Name"
ERROR_INVALID_EMAIL = "Invalid Email Id"
ERROR_INVALID_PHONE_NUMBER = "Invalid Phone Number"
ERROR_INVALID_USER_ROLE = "Invalid User Role"
This diff is collapsed.
from typing import Union
from typing import Optional
from pydantic import BaseModel
# model for login request
class LoginRequest(BaseModel):
username: Union[str, None] = None
password: Union[str, None] = None
email: str
password: str
class RegistrationData:
name: Union[str, None] = None
email: Union[str: None] = None
password: Union[str: None] = None
user_role: Union[str: None] = None
is_alive: Union[bool: None] = None
created_at: Union[str: None] = None
class RegistrationData(BaseModel):
name: str
email: str
password: Optional[str]
phone_number: Optional[str]
login_type: str
user_role: str
......@@ -3,6 +3,6 @@ from scripts.constants.api import ApiEndPoints
from scripts.services.v1 import iot_manager_services
# creating the api router with version as the prefix
router = APIRouter(prefix=ApiEndPoints.asset_manager_login)
router = APIRouter()
# routing to the service api
router.include_router(iot_manager_services.router)
......@@ -2,17 +2,22 @@ from fastapi import APIRouter, HTTPException, status, Depends
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.core.handlers.user_management_handler import UserManagement
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.project_schema import LoginRequest
from scripts.schemas.project_schema import LoginRequest, RegistrationData
from scripts.utils.security.authorize_access import AuthorizeAccess
from scripts.utils.security.decorators import MetaInfoSchema, auth
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
# initializing the handler
obj_login_handler = LoginHandlers()
obj_user_handler = UserManagement()
@router.post(ApiEndPoints.asset_manager_login)
# login API
@router.post(ApiEndPoints.asset_manager_submit)
async def login_default(
login_type: str,
user_data: LoginRequest,
......@@ -38,9 +43,78 @@ async def login_default(
logger.exception(e)
@router.post(ApiEndPoints.asset_manager_user_registration)
# Create new users API
@router.post(ApiEndPoints.asset_manager_user_add)
async def user_register(
user_data: LoginRequest,
user_data: RegistrationData,
request: MetaInfoSchema = Depends(auth)
):
print(user_data, request)
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
# mapper for login types
register_mapper = {
"normal": obj_user_handler.normal_register,
"google": obj_user_handler.google_register,
"microsoft": obj_user_handler.microsoft_register
}
# getting the functions based on the login types
if user_data.login_type in register_mapper:
return register_mapper[user_data.login_type](user_data)
else:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Request")
except Exception as e:
logger.exception(e)
# Update users API
@router.post(ApiEndPoints.asset_manager_user_update)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
except Exception as e:
logger.exception(e)
# Delete users API
@router.post(ApiEndPoints.asset_manager_user_delete)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
except Exception as e:
logger.exception(e)
# View users API
@router.post(ApiEndPoints.asset_manager_user_view)
async def user_register(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
return obj_user_handler.fetch_user_details()
except Exception as e:
logger.exception(e)
import re
from scripts.logging.logger import logger
class RegexValidation:
@staticmethod
def first_name_validation(first_name):
try:
regex = re.fullmatch('([A-Za-z ]{1,100})', str(first_name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def name_validation(name):
try:
regex = re.fullmatch(r'^([A-Za-z]+)*$', str(name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def phone_number_validation(phone_number):
try:
regex = re.fullmatch('([0-9]{1,20})', str(phone_number))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def email_validation(email):
try:
regex = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9.-]{5,20})+\.[A-Z|a-z]{3}\b'
if re.fullmatch(regex, email):
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
\ No newline at end of file
from datetime import datetime, timedelta
from scripts.config import Secrets
from scripts.database.mongo.mongo_login import MongoUser
from scripts.database.mongo.mongo_db import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.utils.security.jwt_util import JWT
......
from scripts.database.mongo.mongo_db import MongoUser
obj_mongo_user = MongoUser()
class AuthorizeAccess:
@staticmethod
def admin_authorize(request):
user_data = obj_mongo_user.fetch_one_user_details(request.login_token)
if user_data["user_role"] != "super admin":
return False
return True
......@@ -51,15 +51,14 @@ class _CookieAuthentication(APIKeyBase):
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
if not login_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
......
import base64
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Secrets, Services
from scripts.logging.logger import logger
class DecryptPassword:
class EncryptDecryptPassword:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@staticmethod
def un_pad(s):
......@@ -32,3 +36,8 @@ class DecryptPassword:
return data.decode(Services.ENCODING_TYPE)
except Exception as e:
logger.exception(e)
def password_encrypt(self, password):
decrypted_password = self.password_decrypt(password)
hashed_password = self.pwd_context.hash(decrypted_password)
return hashed_password
from __future__ import annotations
from validate_email import validate_email
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
class UserDataValidations:
@staticmethod
def data_validation(user_data, method, feature):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
# checking for valid username
if user_data.email == "" or validate_email(
user_data.email) is not True:
return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email}
# checking for valid password
if method == 'normal':
if user_data.password == "" or user_data.password == "string":
return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password}
# Validate phone number
if user_data.phone_number == "":
return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER,
"data": user_data.phone_number}
if user_data.user_role == "" and method == 'normal' and feature == 'register':
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return True, None
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment