Commit 03f4f9a7 authored by arun.uday's avatar arun.uday

AssetManager-V1.0- Not Reviewed

Created download api for the download button, changed the login api , added API's for forgot password and user data filter, changed response data for UI display.
parent 53073373
...@@ -13,4 +13,12 @@ BASE_PATH=scripts/ ...@@ -13,4 +13,12 @@ BASE_PATH=scripts/
SUB_PATH=log/ SUB_PATH=log/
ENCODING_TYPE=utf-8 ENCODING_TYPE=utf-8
KEY_ENCRYPTION=kliLensKLiLensKL KEY_ENCRYPTION=kliLensKLiLensKL
\ No newline at end of file SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
EMAIL_SENDER=arun.send.email@gmail.com
EMAIL_SMTP=smtp.gmail.com
EMAIL_PORT=465
EMAIL_PASSWORD=gpphuiweedqukchf
HTML_LINK=scripts/utils/link_email.html
RESET_ENDPOINT=http://localhost:8671/v1/login/reset?token
\ No newline at end of file
...@@ -16,6 +16,12 @@ class _Services(BaseSettings): ...@@ -16,6 +16,12 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"] CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO" LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False ENABLE_FILE_LOGGING: bool = False
EMAIL_SENDER: str
EMAIL_SMTP: str
EMAIL_PORT: int
EMAIL_PASSWORD: str
HTML_LINK: str
RESET_ENDPOINT: str
class _Databases(BaseSettings): class _Databases(BaseSettings):
...@@ -36,10 +42,11 @@ class _PathConf: ...@@ -36,10 +42,11 @@ class _PathConf:
class _Secrets(BaseSettings): class _Secrets(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES = 30 ACCESS_TOKEN_EXPIRE_MINUTES = 30
TOKEN_EXPIRE_TIME = 5
leeway_in_minutes: int = 10 leeway_in_minutes: int = 10
KEY_ENCRYPTION: str KEY_ENCRYPTION: str
issuer: str = "iotManager" issuer: str = "iotManager"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" SECRET_KEY: str
ALGORITHM = "HS256" ALGORITHM = "HS256"
......
...@@ -10,10 +10,15 @@ class ApiEndPoints: ...@@ -10,10 +10,15 @@ class ApiEndPoints:
delete: str = "/delete" delete: str = "/delete"
header: str = "/header" header: str = "/header"
download: str = "/download" download: str = "/download"
search: str = "/search"
forgot: str = "/forgot"
reset: str = "/reset"
# login-management # login-management
asset_manager_login: str = "/login" asset_manager_login: str = "/login"
asset_manager_submit: str = asset_manager_login + submit asset_manager_submit: str = asset_manager_login + submit
asset_manager_forgot: str = asset_manager_login + forgot
asset_manager_reset: str = asset_manager_login + reset
# user-management # user-management
asset_manager_user_management: str = "/users" asset_manager_user_management: str = "/users"
...@@ -21,6 +26,7 @@ class ApiEndPoints: ...@@ -21,6 +26,7 @@ class ApiEndPoints:
asset_manager_user_view: str = asset_manager_user_management + view asset_manager_user_view: str = asset_manager_user_management + view
asset_manager_user_update: str = asset_manager_user_management + update asset_manager_user_update: str = asset_manager_user_management + update
asset_manager_user_delete: str = asset_manager_user_management + delete asset_manager_user_delete: str = asset_manager_user_management + delete
asset_manager_user_search: str = asset_manager_user_management + search
# dashboard-management # dashboard-management
asset_manager_dashboard: str = "/dashboard" asset_manager_dashboard: str = "/dashboard"
......
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi import status from fastapi import status
from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse from scripts.schemas.default_responses import DefaultResponse
from scripts.utils.response_utils import ResponseData
obj_download_util = ResponseData()
class DashboardManagement: class DashboardManagement:
def __init__(self): def __init__(self):
self.download_files = { self.download_files = obj_download_util.download_file_data()
"SCN101-Manual (Local Config with IoTSetupUI)":
"https://ilens.io/DownloadFiles/SCN_Device_Configuration_Page_Updates_4_7.pdf",
"SCN101 Firmware 4.7 Updates":
"https://ilens.io/DownloadFiles/SCN101Manual.pdf",
"IoTSetupUI":
"https://ilens.io/DownloadFiles/IoTsetupUI-V1.6.zip",
"CP2102 SCN Windows USB Driver":
"https://ilens.io/DownloadFiles/CP2102_Windows.zip",
"SCNFirmwareBurner":
"https://ilens.io/DownloadFiles/flash_download_tool_v3.8.5.zip",
"SCN101A Firmware - V2.7 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_7_B1_6.bin",
"SCN101A Firmware - V2.8 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_8_B1_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.5 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_5.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.6 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.7 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_7.bin",
"P10_LED_Driver_V2.1 (Board 1.1)":
"https://ilens.io/DownloadFiles/P10_LED_Driver_V2.1.bin",
"SCN-LED Reset Firmware (Board 1.1)":
"https://ilens.io/DownloadFiles/SCN-LED_Reset.bin",
"SCN Reset Firmware (Partition: 4MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_4MB_Part.bin",
"SCN Reset Firmware (Partition: 16MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_16MB_Part.bin"
}
def download_header(self): def download_header(self):
data = { try:
"actions": [ data = {
{ "actions": [
"class": "fa-download", {
"action": "download", "class": "fa-download",
"tooltip": "Download" "action": "download",
} "tooltip": "Download"
], }
"column_defs": [] ],
} "columnDefs": [
print(data["column_defs"]) {
column_values = {{"header_name": key, "field": key, "key": "file_name"} for key in self.download_files} "headerName": "File Name",
print(column_values) "field": "file_name",
data["column_defs"].append(column_values) "key": "file_name"
print(data) }],
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK)
@staticmethod }
def download_details(): column_urls = [{"file_name": key, "file_url": value} for key, value in self.download_files.items()]
return JSONResponse( data["rowData"] = column_urls
content=DefaultResponse(status="success", message="Fetched Successfully", return JSONResponse(
data=download_files).dict(), content=DefaultResponse(status="success", message="Fetched Successfully",
status_code=status.HTTP_200_OK) data=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
from fastapi import Response import smtplib
import ssl
from datetime import timedelta, datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from scripts.core.handlers.normal_login import NormalLogin
from fastapi.responses import JSONResponse
from fastapi import status from fastapi import status
from fastapi.responses import JSONResponse
from scripts.config import Services, Secrets
from scripts.core.handlers.normal_login import NormalLogin
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.utils.security.jwt_util import JWT
from scripts.utils.security.password_util import EncryptDecryptPassword from scripts.utils.security.password_util import EncryptDecryptPassword
obj_mongo_user = MongoUser()
jwt = JWT()
class LoginHandlers: class LoginHandlers:
def __init__(self): def __init__(self):
...@@ -15,10 +27,11 @@ class LoginHandlers: ...@@ -15,10 +27,11 @@ class LoginHandlers:
self.login_type = "" self.login_type = ""
def normal_login(self, user_data, request): def normal_login(self, user_data, request):
self.login_type = "normal" self.login_type = "general_login"
# decrypting the password from the UI # decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password) decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
# validating the received inputs empty or not # validating the received inputs empty or not
# password decrypted form - token "password"
responses = self.obj_login_handler.user_data_validation( responses = self.obj_login_handler.user_data_validation(
user_data.email, user_data.email,
decrypted_password.split("\"")[1]) decrypted_password.split("\"")[1])
...@@ -58,3 +71,49 @@ class LoginHandlers: ...@@ -58,3 +71,49 @@ class LoginHandlers:
# v2 # v2
def microsoft_login(self, request): def microsoft_login(self, request):
pass pass
@staticmethod
def forgot_password_handler(email):
try:
# Check if user exists in database
# If user exists, send forgot password email with JWT token
# This should include email and expire time
# Send email using MIME
db_user_data = obj_mongo_user.fetch_one_user_details(email)
# if the user is not available
if not db_user_data:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
mail = MIMEMultipart()
mail['From'] = Services.EMAIL_SENDER
mail['To'] = email
mail['Subject'] = "Link TO Reset Password"
to_encode = {"email": email}
expire = datetime.utcnow() + timedelta(minutes=Secrets.TOKEN_EXPIRE_TIME)
to_encode.update({"exp": expire})
jwt_token = jwt.encode(to_encode)
html = ''
# Load the HTML file
try:
with open(Services.HTML_LINK, "r") as f:
html = f.read()
html = html.replace("{{ message }}", "Please click the link to reset your password:").replace(
"{{ link }}", Services.RESET_ENDPOINT + "=" + str(jwt_token))
except Exception as e:
logger.exception(e)
html_body = MIMEText(html, "html")
mail.attach(html_body)
context = ssl.create_default_context()
with smtplib.SMTP_SSL(Services.EMAIL_SMTP, Services.EMAIL_PORT, context=context) as smtp:
smtp.login(Services.EMAIL_SENDER, Services.EMAIL_PASSWORD)
# sending the mail
smtp.sendmail(Services.EMAIL_SENDER, email, mail.as_string())
return JSONResponse(
content=DefaultResponse(status="success", message="Email Send Successfully",
data={"username": email}).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
...@@ -40,7 +40,6 @@ class NormalLogin: ...@@ -40,7 +40,6 @@ class NormalLogin:
# if the user is not available # if the user is not available
if not self.db_user_data: if not self.db_user_data:
return False, ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN return False, ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN
# if the user is not registered through normal login # if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type: if self.db_user_data["login_type"] != login_type:
return False, ErrorMessages.ERROR_LOGIN_TYPE_INVALID return False, ErrorMessages.ERROR_LOGIN_TYPE_INVALID
......
...@@ -6,10 +6,12 @@ from scripts.logging.logger import logger ...@@ -6,10 +6,12 @@ from scripts.logging.logger import logger
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi import status from fastapi import status
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse
from scripts.utils.response_utils import ResponseData
from scripts.utils.security.password_util import EncryptDecryptPassword from scripts.utils.security.password_util import EncryptDecryptPassword
from scripts.utils.validations_util import UserDataValidations from scripts.utils.validations_util import UserDataValidations
obj_mongo_user = MongoUser() obj_mongo_user = MongoUser()
obj_response_data = ResponseData()
class UserManagement: class UserManagement:
...@@ -19,15 +21,20 @@ class UserManagement: ...@@ -19,15 +21,20 @@ class UserManagement:
# for normal registration using email and password # for normal registration using email and password
def normal_register(self, user_data): def normal_register(self, user_data):
try: try:
response, message = UserDataValidations.register_data_validation(user_data, 'normal', self.method) response, message = UserDataValidations.register_data_validation(user_data, 'general', self.method)
if not response: if not response:
return message return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=message["message"]).dict(),
status_code=status.HTTP_200_OK)
# fetching the data based on the username # fetching the data based on the username
db_user_data = obj_mongo_user.fetch_one_user_details(user_data.email) db_user_data = obj_mongo_user.fetch_one_user_details(user_data.email)
# if the user is not available # if the user is not available
if db_user_data: if db_user_data:
return {"message": ErrorMessages.ERROR_EMAIL_EXIST, return JSONResponse(
"data": {"username": user_data.email}} content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_EXIST).dict(),
status_code=status.HTTP_200_OK)
created_at = datetime.datetime.now() created_at = datetime.datetime.now()
updated_at = datetime.datetime.now() updated_at = datetime.datetime.now()
reg_time = {"created_at": created_at, "updated_at": updated_at} reg_time = {"created_at": created_at, "updated_at": updated_at}
...@@ -35,10 +42,14 @@ class UserManagement: ...@@ -35,10 +42,14 @@ class UserManagement:
if key == "password" else value) for key, value in user_data} if key == "password" else value) for key, value in user_data}
user_data_reg = user_data_dict | reg_time user_data_reg = user_data_dict | reg_time
if not obj_mongo_user.insert_new_user(user_data_reg): if not obj_mongo_user.insert_new_user(user_data_reg):
return {"message": ErrorMessages.ERROR_STORING_DATA, return JSONResponse(
"data": {"username": user_data_dict}} content=DefaultFailureResponse(status="failed",
return {"message": "New user registered", message=ErrorMessages.ERROR_STORING_DATA).dict(),
"data": {"username": user_data_dict["email"]}} status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultResponse(status="success", message="Inserted Successfully",
data={"username": user_data_dict["email"]}).dict(),
status_code=status.HTTP_200_OK)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
...@@ -133,6 +144,19 @@ class UserManagement: ...@@ -133,6 +144,19 @@ class UserManagement:
list_user_data = [] list_user_data = []
for users in cursor_data: for users in cursor_data:
list_user_data.append(users) list_user_data.append(users)
data_response = obj_response_data.user_view()
print(data_response.update({"rowData": list_user_data}))
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=data_response).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
@staticmethod
def user_filter_data(body_data):
try:
return JSONResponse( return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully", content=DefaultResponse(status="success", message="Fetched Successfully",
data=list_user_data).dict(), data=list_user_data).dict(),
......
This diff is collapsed.
...@@ -23,3 +23,7 @@ class UserUpdate(BaseModel): ...@@ -23,3 +23,7 @@ class UserUpdate(BaseModel):
email: Optional[str] = None email: Optional[str] = None
phone_number: Optional[str] = None phone_number: Optional[str] = None
user_role: Optional[str] = None user_role: Optional[str] = None
class EmailValidation(BaseModel):
email: str
...@@ -8,7 +8,7 @@ from scripts.core.handlers.user_management_handler import UserManagement ...@@ -8,7 +8,7 @@ from scripts.core.handlers.user_management_handler import UserManagement
from scripts.errors import ErrorMessages from scripts.errors import ErrorMessages
from scripts.logging.logger import logger from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultFailureResponse from scripts.schemas.default_responses import DefaultFailureResponse
from scripts.schemas.project_schema import LoginRequest, RegistrationData, UserUpdate from scripts.schemas.project_schema import LoginRequest, RegistrationData, UserUpdate, EmailValidation
from scripts.utils.security.authorize_access import AuthorizeAccess from scripts.utils.security.authorize_access import AuthorizeAccess
from scripts.utils.security.decorators import MetaInfoSchema, auth from scripts.utils.security.decorators import MetaInfoSchema, auth
...@@ -30,7 +30,7 @@ async def login_default( ...@@ -30,7 +30,7 @@ async def login_default(
try: try:
# mapper for login types # mapper for login types
login_mapper = { login_mapper = {
"normal": obj_login_handler.normal_login, "general_login": obj_login_handler.normal_login,
"google": obj_login_handler.google_login, "google": obj_login_handler.google_login,
"microsoft": obj_login_handler.microsoft_login "microsoft": obj_login_handler.microsoft_login
} }
...@@ -51,6 +51,39 @@ async def login_default( ...@@ -51,6 +51,39 @@ async def login_default(
status_code=status.HTTP_200_OK) status_code=status.HTTP_200_OK)
# Forgot Password API
@router.post(ApiEndPoints.asset_manager_forgot)
async def forgot_password(
validation_data: EmailValidation
):
try:
# forgot password
response = obj_login_handler.forgot_password_handler(validation_data.email)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Reset Password API
@router.get(ApiEndPoints.asset_manager_reset)
async def forgot_password(
token: str
):
try:
# forgot password
print(token)
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Create new users API # Create new users API
@router.post(ApiEndPoints.asset_manager_user_add) @router.post(ApiEndPoints.asset_manager_user_add)
async def user_register( async def user_register(
...@@ -60,12 +93,13 @@ async def user_register( ...@@ -60,12 +93,13 @@ async def user_register(
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
# mapper for login types # mapper for login types
register_mapper = { register_mapper = {
"normal": obj_user_handler.normal_register, "general": obj_user_handler.normal_register,
"google": obj_user_handler.google_register, "google": obj_user_handler.google_register,
"microsoft": obj_user_handler.microsoft_register "microsoft": obj_user_handler.microsoft_register
} }
...@@ -96,9 +130,10 @@ async def user_register( ...@@ -96,9 +130,10 @@ async def user_register(
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_user_handler.update_user_details(email, update_data) response = obj_user_handler.update_user_details(email, update_data)
return response return response
except Exception as e: except Exception as e:
...@@ -118,9 +153,10 @@ async def user_register( ...@@ -118,9 +153,10 @@ async def user_register(
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_user_handler.delete_user_details(email) response = obj_user_handler.delete_user_details(email)
return response return response
except Exception as e: except Exception as e:
...@@ -139,9 +175,10 @@ async def user_register( ...@@ -139,9 +175,10 @@ async def user_register(
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_user_handler.fetch_user_details() response = obj_user_handler.fetch_user_details()
if not response: if not response:
return HTTPException( return HTTPException(
...@@ -156,17 +193,18 @@ async def user_register( ...@@ -156,17 +193,18 @@ async def user_register(
status_code=status.HTTP_200_OK) status_code=status.HTTP_200_OK)
# download Button Dashboard header # download Button Dashboard
@router.post(ApiEndPoints.asset_manager_dashboard_download_header) @router.post(ApiEndPoints.asset_manager_dashboard_download)
async def dashboard_download( async def dashboard_download(
request: MetaInfoSchema = Depends(auth) request: MetaInfoSchema = Depends(auth)
): ):
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_dashboard_handler.download_header() response = obj_dashboard_handler.download_header()
if not response: if not response:
return HTTPException( return HTTPException(
...@@ -181,18 +219,21 @@ async def dashboard_download( ...@@ -181,18 +219,21 @@ async def dashboard_download(
status_code=status.HTTP_200_OK) status_code=status.HTTP_200_OK)
# download Button Dashboard # API for users search filter
@router.post(ApiEndPoints.asset_manager_dashboard_download) @router.post(ApiEndPoints.asset_manager_user_search)
async def dashboard_download( async def user_search_filter(
payload_data: Request,
request: MetaInfoSchema = Depends(auth) request: MetaInfoSchema = Depends(auth)
): ):
try: try:
response = AuthorizeAccess().admin_authorize(request) response = AuthorizeAccess().admin_authorize(request)
if not response: if not response:
return HTTPException( return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN, content=DefaultFailureResponse(status="failed",
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS) message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
response = obj_dashboard_handler.download_details() status_code=status.HTTP_200_OK)
request_body = await payload_data.json()
response = obj_user_handler.user_filter_data(request_body)
if not response: if not response:
return HTTPException( return HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
......
This diff is collapsed.
...@@ -4,14 +4,6 @@ from scripts.logging.logger import logger ...@@ -4,14 +4,6 @@ from scripts.logging.logger import logger
class RegexValidation: class RegexValidation:
@staticmethod
def first_name_validation(first_name):
try:
regex = re.fullmatch('([A-Za-z ]{1,100})', str(first_name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod @staticmethod
def name_validation(name): def name_validation(name):
try: try:
...@@ -28,15 +20,6 @@ class RegexValidation: ...@@ -28,15 +20,6 @@ class RegexValidation:
except Exception as e: except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}') logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def email_validation(email):
try:
regex = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9.-]{5,20})+\.[A-Z|a-z]{3}\b'
if re.fullmatch(regex, email):
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod @staticmethod
def password_validation(password): def password_validation(password):
try: try:
......
class ResponseData:
@staticmethod
def user_view():
header = {
"actions": [
{
"class": "fa-pencil",
"action": "edit",
"tooltip": "Edit"
},
{
"class": "fa-trash",
"action": "delete",
"tooltip": "Delete"
}
], "externalActions": [
{
"type": "button",
"action": "addnew",
"label": "Create User"
}
], "columnDefs": [
{
"headerName": "User Name",
"field": "userName",
"key": "userName"
},
{
"headerName": "Name",
"field": "name",
"key": "name"
},
{
"headerName": "Role",
"field": "role",
"key": "role"
}
],
}
return header
@staticmethod
def download_file_data():
data = {
"SCN101-Manual (Local Config with IoTSetupUI)":
"https://ilens.io/DownloadFiles/SCN_Device_Configuration_Page_Updates_4_7.pdf",
"SCN101 Firmware 4.7 Updates":
"https://ilens.io/DownloadFiles/SCN101Manual.pdf",
"IoTSetupUI":
"https://ilens.io/DownloadFiles/IoTsetupUI-V1.6.zip",
"CP2102 SCN Windows USB Driver":
"https://ilens.io/DownloadFiles/CP2102_Windows.zip",
"SCNFirmwareBurner":
"https://ilens.io/DownloadFiles/flash_download_tool_v3.8.5.zip",
"SCN101A Firmware - V2.7 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_7_B1_6.bin",
"SCN101A Firmware - V2.8 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_8_B1_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.5 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_5.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.6 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.7 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_7.bin",
"P10_LED_Driver_V2.1 (Board 1.1)":
"https://ilens.io/DownloadFiles/P10_LED_Driver_V2.1.bin",
"SCN-LED Reset Firmware (Board 1.1)":
"https://ilens.io/DownloadFiles/SCN-LED_Reset.bin",
"SCN Reset Firmware (Partition: 4MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_4MB_Part.bin",
"SCN Reset Firmware (Partition: 16MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_16MB_Part.bin"
}
return data
...@@ -6,7 +6,10 @@ obj_mongo_user = MongoUser() ...@@ -6,7 +6,10 @@ obj_mongo_user = MongoUser()
class AuthorizeAccess: class AuthorizeAccess:
@staticmethod @staticmethod
def admin_authorize(request): def admin_authorize(request):
user_data = obj_mongo_user.fetch_one_user_details(request.user_id) try:
if user_data["user_role"] != "super admin": user_data = obj_mongo_user.fetch_one_user_details(request.user_id)
if user_data["user_role"] != "super admin":
return False
return True
except TypeError:
return False return False
return True
...@@ -39,5 +39,5 @@ class EncryptDecryptPassword: ...@@ -39,5 +39,5 @@ class EncryptDecryptPassword:
def password_encrypt(self, password): def password_encrypt(self, password):
decrypted_password = self.password_decrypt(password) decrypted_password = self.password_decrypt(password)
hashed_password = self.pwd_context.hash(decrypted_password) hashed_password = self.pwd_context.hash(decrypted_password.split("\"")[1])
return hashed_password return hashed_password
...@@ -18,14 +18,14 @@ class UserDataValidations: ...@@ -18,14 +18,14 @@ class UserDataValidations:
user_data.email) is not True: user_data.email) is not True:
return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email} return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email}
# checking for valid password # checking for valid password
if method == 'normal': if method == 'general':
if user_data.password == "" or user_data.password == "string": if user_data.password == "" or user_data.password == "string":
return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password} return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password}
# Validate phone number # Validate phone number
if user_data.phone_number == "": if user_data.phone_number == "":
return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER, return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER,
"data": user_data.phone_number} "data": user_data.phone_number}
if user_data.user_role == "" and method == 'normal' and feature == 'register': if user_data.user_role == "" and method == 'general' and feature == 'register':
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE, return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number} "data": user_data.phone_number}
return True, None return True, None
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment