Commit 24d0cc0c authored by arun.uday's avatar arun.uday

Merge branch '1-arun-uday-asset-manager-v1-0' into 'master'

1 arun uday asset manager v1 0

See merge request !4
parents a8343237 a0a786d6
......@@ -4,12 +4,25 @@ DB_NAME=userDB
REDIS_URI=redis://127.0.0.1:6379
REDIS_LOGIN_DB=10
SERVICE_HOST=127.0.0.1
SERVICE_HOST=0.0.0.0
SERVICE_PORT=8671
PROJECT_NAME=AssetManager
BASE_PATH=scripts/
SUB_PATH=log/
ENCODING_TYPE=utf-8
KEY_ENCRYPTION=kliLensKLiLensKL
SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
ENCODING_TYPE=utf-8
\ No newline at end of file
EMAIL_SENDER=arun.send.email@gmail.com
EMAIL_SMTP=smtp.gmail.com
EMAIL_PORT=465
EMAIL_PASSWORD=gpphuiweedqukchf
HTML_LINK=scripts/utils/link_email.html
RESET_ENDPOINT=http://localhost:8671/asset_manager_api/v1/login/reset?token
CLIENT_ID=1060631831358-a21djaa3hm165a8976fnmo1lerujs5p6.apps.googleusercontent.com
LOG_PATH=log
LOG_LEVEL=INFO
BACKUP_COUNT=100
MAX_BYTES=5
......@@ -38,5 +38,5 @@ if __name__ == "__main__":
# starting the app
uvicorn.run("main:app", host=arguments["bind"], port=int(arguments["port"]))
except Exception as e:
logger.exception(e)
logger.exception(f'Services Failed with error {e}')
[project_name]
PROJECT_NAME = AssetManager
2023-04-04 20:12:33 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:08:30 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:07:39 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:55 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:23 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-04-04 20:06:10 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
uvicorn==0.21.1
python-dotenv~=1.0.0
pydantic~=1.10.6
pydantic==1.10.7
fastapi==0.95.0
passlib~=1.7.4
pymongo~=4.3.3
......@@ -9,4 +9,7 @@ email-validator~=1.3.1
pycryptodomex~=3.17
PyJWT~=2.6.0
validate_email~=1.3
redis~=4.5.2
\ No newline at end of file
redis==4.5.4
google~=3.0.0
google-auth~=2.17.1
requests~=2.28.2
\ No newline at end of file
......@@ -14,8 +14,17 @@ class _Services(BaseSettings):
CORS_ALLOW_CREDENTIALS: bool = True
CORS_ALLOW_METHODS: list[str] = ["GET", "POST", "DELETE", "PUT"]
CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False
LOG_LEVEL: str
BACKUP_COUNT: int
MAX_BYTES: int
ENABLE_FILE_LOGGING: bool = True
EMAIL_SENDER: str
EMAIL_SMTP: str
EMAIL_PORT: int
EMAIL_PASSWORD: str
HTML_LINK: str
RESET_ENDPOINT: str
DATE_TIME = '%Y-%m-%d %H:%M:%S'
class _Databases(BaseSettings):
......@@ -25,21 +34,18 @@ class _Databases(BaseSettings):
REDIS_LOGIN_DB: int
class _BasePathConf(BaseSettings):
BASE_PATH: str = "scripts/"
class _PathConf:
BASE_PATH: pathlib.Path = pathlib.Path(_BasePathConf().BASE_PATH)
LOG_PATH: pathlib.Path = BASE_PATH / "log/"
class _PathConf(BaseSettings):
LOG_PATH: str
class _Secrets(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES = 30
ACCESS_TOKEN_EXPIRE_MINUTES = 480
TOKEN_EXPIRE_TIME = 5
leeway_in_minutes: int = 10
KEY_ENCRYPTION = "kliLensKLiLensKL"
KEY_ENCRYPTION: str
CLIENT_ID: str
issuer: str = "iotManager"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
SECRET_KEY: str
ALGORITHM = "HS256"
......
class ApiEndPoints:
# Main root
root = "/asset_manager_api"
# version
version = "/v1"
# common
submit: str = "/submit"
add: str = "/add"
view: str = "/view"
add: str = "/adduser"
view: str = "/table_body"
table_actions: str = "/table_actions"
update: str = "/update"
delete: str = "/delete"
header: str = "/table_header"
download: str = "/download"
search: str = "/search"
forgot: str = "/forgot"
reset: str = "/reset"
logout: str = "/logout"
# login-management
asset_manager_login: str = "/login"
asset_manager_submit: str = asset_manager_login + submit
asset_manager_forgot: str = asset_manager_login + forgot
asset_manager_reset: str = asset_manager_login + reset
# user-management
asset_manager_user_management: str = "/users"
asset_manager_user_add: str = asset_manager_user_management + add
asset_manager_user_view: str = asset_manager_user_management + view
asset_manager_user_header: str = asset_manager_user_management + header
asset_manager_user_table_actions: str = asset_manager_user_management + table_actions
asset_manager_user_update: str = asset_manager_user_management + update
asset_manager_user_delete: str = asset_manager_user_management + delete
asset_manager_user_search: str = asset_manager_user_management + search
asset_manager_user_logout: str = asset_manager_user_management + logout
asset_manager_user_reset: str = asset_manager_user_management + reset
# dashboard-management
asset_manager_dashboard: str = "/dashboard"
asset_manager_dashboard_download_header: str = asset_manager_dashboard + download + header
asset_manager_dashboard_download: str = asset_manager_dashboard + download
class Validations:
email = "user@example.com"
from fastapi.responses import JSONResponse
from fastapi import status
from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse
from scripts.utils.response_utils import ResponseData
obj_download_util = ResponseData()
# download header and row data
class DashboardManagement:
def __init__(self):
self.download_files = obj_download_util.download_file_data()
def download_header(self):
try:
# header contents
data = {
"actions": [
{
"class": "fa-download",
"action": "download",
"tooltip": "Download"
}
],
"columnDefs": [
{
"headerName": "File Name",
"field": "file_name",
"key": "file_name",
"flex": 0,
"width": 1010
}],
}
# column urls
column_urls = [{"file_name": key, "file_url": value} for key, value in self.download_files.items()]
data["rowData"] = column_urls
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
from scripts.core.handlers.normal_login import NormalLogin
from fastapi.responses import JSONResponse
import smtplib
import ssl
from datetime import timedelta, datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from jwt.exceptions import ExpiredSignatureError, DecodeError
from google.oauth2 import id_token
from google.auth.transport import requests
from google.auth.exceptions import InvalidValue
from fastapi import status
from fastapi.responses import JSONResponse, RedirectResponse
from scripts.config import Services, Secrets
from scripts.core.handlers.process_login_data import NormalLogin
from scripts.database.mongo.mongo_db import MongoUser
from scripts.errors import ErrorMessages
from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse
from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultFailureResponse, DefaultResponse, DefaultSuccessResponse
from scripts.utils.security.jwt_util import JWT
from scripts.utils.security.password_util import EncryptDecryptPassword
obj_mongo_user = MongoUser()
jwt = JWT()
class LoginHandlers:
def __init__(self):
......@@ -12,45 +32,200 @@ class LoginHandlers:
self.pass_decrypt = EncryptDecryptPassword()
self.login_type = ""
def normal_login(self, user_data, request):
self.login_type = "normal"
def general_login(self, user_data, request):
self.login_type = "general_login"
# decrypting the password from the UI
decrypted_password = self.pass_decrypt.password_decrypt(user_data.password)
# validating the received inputs empty or not
response = self.obj_login_handler.user_data_validation(
user_data.email,
decrypted_password)
# password decrypted form - token "password"
try:
responses = self.obj_login_handler.user_data_validation(
user_data.email,
decrypted_password.split("\"")[1])
except AttributeError:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
# Account is not registered
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=response).dict(),
status_code=status.HTTP_400_BAD_REQUEST)
if responses is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=responses).dict(),
status_code=status.HTTP_200_OK)
# checking for the account and password matching
response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password)
user_data_response, data = self.obj_login_handler.db_password_matching(self.login_type, user_data,
decrypted_password.split("\"")[1])
# if the passwords doesn't match with the db data
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
if user_data_response is not None:
return JSONResponse(content=DefaultFailureResponse(status="failed",
message=data).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
response = self.obj_login_handler.generate_cookie_tokens(user_data, request)
responses, exp = self.obj_login_handler.generate_cookie_tokens(data, request)
# token generation unsuccessful
if response is None:
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(message="Access Unsuccessful",
error=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_403_FORBIDDEN)
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", data=response).dict(),
status_code=status.HTTP_200_OK)
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=data).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
# v1
def google_login(self, request):
pass
def google_login(self, user_data, request):
user_data_remove_none = {key: value for key, value in user_data if key != 'login_type' and value is not None}
req = requests.Request()
try:
id_info = id_token.verify_oauth2_token(
user_data_remove_none["id_token"], req, Secrets.CLIENT_ID)
except InvalidValue:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_EXPIRED).dict(),
status_code=status.HTTP_200_OK)
response, message = self.obj_login_handler.db_data_validation(user_data.login_type, id_info["email"])
# if the response is false then an error message is send back
if response is not None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=message).dict(),
status_code=status.HTTP_200_OK)
message.update({"name": id_info["name"], "pic_url": id_info["picture"]})
responses = self.obj_login_handler.update_pic(obj_mongo_user, id_info)
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
# generating the access tokens
responses, exp = self.obj_login_handler.generate_cookie_tokens(message, request)
# token generation unsuccessful
if responses is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_TOKEN_GENERATION).dict(),
status_code=status.HTTP_200_OK)
# sending successful response to UI
response = JSONResponse(
content=DefaultResponse(status="success", message="Logged In Successfully",
data=message).dict(),
status_code=status.HTTP_200_OK, headers={"Content-Type": "application/json"})
response.set_cookie(key="login-token", value=responses, expires=exp)
return response
# v2
def microsoft_login(self, request):
pass
# forgot password handler
@staticmethod
def forgot_password_handler(email):
try:
# Check if user exists in database
# If user exists, send forgot password email with JWT token
# This should include email and expire time
# Send email using MIME
db_user_data = obj_mongo_user.fetch_one_user_details({"email": email})
# if the user is not available
if not db_user_data:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
mail = MIMEMultipart()
mail['From'] = Services.EMAIL_SENDER
mail['To'] = email
mail['Subject'] = "Link To Reset Password"
to_encode = {"email": email}
expire = datetime.utcnow() + timedelta(minutes=Secrets.TOKEN_EXPIRE_TIME)
to_encode.update({"exp": expire})
jwt_token = jwt.encode(to_encode)
html = ''
# Load the HTML file
try:
with open(Services.HTML_LINK, "r") as f:
html = f.read()
html = html.replace("{{ message }}", "Please click the link to reset your password:").replace(
"{{ link }}", Services.RESET_ENDPOINT + "=" + str(jwt_token))
except Exception as e:
logger.exception(e)
html_body = MIMEText(html, "html")
mail.attach(html_body)
context = ssl.create_default_context()
with smtplib.SMTP_SSL(Services.EMAIL_SMTP, Services.EMAIL_PORT, context=context) as smtp:
smtp.login(Services.EMAIL_SENDER, Services.EMAIL_PASSWORD)
# sending the mail
smtp.sendmail(Services.EMAIL_SENDER, email, mail.as_string())
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Email Send Successfully").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
@staticmethod
def validate_jwt(request):
try:
jwt_token = request.query_params['token']
jwt_token_encoded = jwt_token.encode('utf-8')
# Verify and decode the JWT token
try:
decoded_token = jwt.decode(jwt_token_encoded)
db_user_data = obj_mongo_user.fetch_one_user_details({"email": decoded_token['email']})
# if the user is not available
if not db_user_data:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
jwt_token_new = jwt.encode({"email": decoded_token['email']})
return RedirectResponse('http://192.168.2.102/iLens_UI/#/l/login?user_id=' + jwt_token_new)
except ExpiredSignatureError:
return RedirectResponse(
'http://192.168.2.102/iLens_UI/#/l/login?error=' + "true")
except Exception as e:
logger.exception(e)
@staticmethod
def reset_user_password(reset_data):
try:
user_id_token = reset_data.user_id
user_id = user_id_token.encode('utf-8')
# Verify and decode the JWT token
try:
try:
decoded_token = jwt.decode(user_id)
except DecodeError:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_TOKEN).dict(),
status_code=status.HTTP_200_OK)
db_user_data = obj_mongo_user.fetch_one_user_details({"email": decoded_token['email']})
# if the user is not available
if not db_user_data:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_200_OK)
response = EncryptDecryptPassword().password_encrypt(reset_data.password)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_AUTH_FAILED).dict(),
status_code=status.HTTP_200_OK)
response = obj_mongo_user.update_user({"email": decoded_token['email']}, {"password": response})
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Reset Successful").dict(),
status_code=status.HTTP_200_OK)
except ExpiredSignatureError:
return "Password Reset Token Expired"
except Exception as e:
logger.exception(e)
......@@ -19,37 +19,45 @@ class NormalLogin:
self.dt = datetime.now()
self.time_dt = datetime.now()
# user data validation
@staticmethod
def user_data_validation(email, password) -> dict | None:
def user_data_validation(email, password) -> dict | None:
try:
# checking for valid username
if email == "" or validate_email(email) is not True:
return {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": email}
return ErrorMessages.ERROR_INVALID_EMAIL
# checking for valid password
if password == "":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return ErrorMessages.ERROR_INVALID_PASSWORD
return None
except Exception as e:
logger.exception(e)
# db validation
def db_data_validation(self, login_type, email):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_one_user_details(email)
self.db_user_data = MongoUser().fetch_one_user_details({"email": email})
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": email}}
return False, ErrorMessages.ERROR_ACCOUNT_DOESNT_EXIST
# if the user is not registered through normal login
if self.db_user_data["login_type"] != login_type:
return False, {"message": ErrorMessages.ERROR_LOGIN_TYPE_INVALID,
"data": {"username": email, "Use Login": self.db_user_data["login_type"]}}
return False, ErrorMessages.ERROR_ACCESS_DENIED
try:
response = {"user_id": self.db_user_data["user_id"], "name": self.db_user_data["name"],
"email": email,
"user_role": self.db_user_data["user_role"]}
except KeyError:
response = {"user_id": self.db_user_data["user_id"],
"email": email,
"user_role": self.db_user_data["user_role"]}
# if the user exist
return None, {"message": True}
return None, response
except Exception as e:
logger.exception(e)
# matching the password
def db_password_matching(self, login_type, user_data, password):
try:
# getting the response after checking for the user data in db
......@@ -59,24 +67,31 @@ class NormalLogin:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": user_data.email}}
return False, ErrorMessages.ERROR_PASSWORD_MISMATCH
# if the password is correct
return None, {"username": user_data.email, "role": self.db_user_data["user_role"]}
return None, message
except Exception as e:
logger.exception(e)
@staticmethod
def update_pic(obj_mongo_user, info_data):
if not obj_mongo_user.update_user({"email": info_data["email"]},
{"name": info_data["name"], "pic_url": info_data["picture"]}):
return None
return True
# cookie and token creation
@staticmethod
def generate_cookie_tokens(user_data, request):
try:
# creating the access token
access_token = create_token(
user_id=user_data.email,
ip=request.ip_address
access_token, exp = create_token(
user_id=user_data["user_id"],
ip=request.client.host
)
# returning the login token
if access_token:
return {"user_id": access_token, "token_type": "bearer"}
return access_token, exp
else:
return None
except Exception as e:
......
import datetime
import uuid
from passlib.context import CryptContext
from scripts.database.mongo.mongo_db import MongoUser
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from fastapi.responses import JSONResponse
from fastapi import status
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse, DefaultSuccessResponse
from scripts.utils.mongo_utils import MongoStageCreator
from scripts.utils.response_utils import ResponseData
from scripts.utils.security.password_util import EncryptDecryptPassword
from scripts.utils.validations_util import UserDataValidations
obj_mongo_user = MongoUser()
obj_response_data = ResponseData()
obj_stage = MongoStageCreator()
# user management
class UserManagement:
def __init__(self):
self.method = "register"
self.pass_decrypt = EncryptDecryptPassword()
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# for normal registration using email and password
def normal_register(self, user_data):
def general_register(self, user_data):
try:
response, message = UserDataValidations.data_validation(user_data, 'normal', self.method)
response, message = UserDataValidations.register_data_validation(user_data, 'general', self.method)
if not response:
return message
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=message["message"]).dict(),
status_code=status.HTTP_200_OK)
# fetching the data based on the username
db_user_data = MongoUser().fetch_one_user_details(user_data.email)
db_user_data = obj_mongo_user.fetch_one_user_details({"email": user_data.email})
# if the user is not available
if db_user_data:
return {"message": ErrorMessages.ERROR_EMAIL_EXIST,
"data": {"username": user_data.email}}
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_EXIST).dict(),
status_code=status.HTTP_200_OK)
# creating a unique user id
uid = str(uuid.uuid4()).replace("-", "")
user_data.user_id = "user_" + uid
created_at = datetime.datetime.now()
updated_at = datetime.datetime.now()
reg_time = {"created_at": created_at, "updated_at": updated_at}
user_data_dict = {key: (EncryptDecryptPassword().password_encrypt(value)
if key == "password" else value) for key, value in user_data}
encrypted = EncryptDecryptPassword().password_encrypt(user_data.password)
if encrypted is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
# storing the user data in dict, encrypting the password
user_data_dict = {key: (encrypted
if key == "password" else value) for key, value in user_data if key != 'action'}
# adding the registration time to the user data
user_data_reg = user_data_dict | reg_time
# checking if the insertion is working
if not obj_mongo_user.insert_new_user(user_data_reg):
return {"message": ErrorMessages.ERROR_STORING_DATA,
"data": {"username": user_data_dict}}
return {"message": "New user registered",
"data": {"username": user_data_dict["email"]}}
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_STORING_DATA).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="User Registration Successful").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
@staticmethod
# for Google registration using gmail
def google_register():
def google_register(user_data):
try:
return {"message": "Not available now"}
# fetching the data based on the username
db_user_data = obj_mongo_user.fetch_one_user_details({"email": user_data.email})
# if the user is not available
if db_user_data:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_EXIST).dict(),
status_code=status.HTTP_200_OK)
# creating a unique user id
uid = str(uuid.uuid4()).replace("-", "")
user_data.user_id = "user_" + uid
created_at = datetime.datetime.now()
updated_at = datetime.datetime.now()
reg_time = {"created_at": created_at, "updated_at": updated_at}
# removing action and none values of the user data
user_data_dict = {key: value for key, value in user_data if key != 'action' and value is not None}
user_data_reg = user_data_dict | reg_time
# checking if the insertion is working
if not obj_mongo_user.insert_new_user(user_data_reg):
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_STORING_DATA).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="User Registration Successful").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
......@@ -55,10 +114,185 @@ class UserManagement:
except Exception as e:
logger.exception(e)
# update user details
def update_user_details(self, update_data):
try:
self.method = "update"
# fetching and validating the user details
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": update_data.user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# checking the user email and validating
if update_data.email is not None:
db_user_data = obj_mongo_user.fetch_one_user_details(update_data.email)
# if the user is not available
if db_user_data is not None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_EMAIL_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# creating the filter data
filter_data_updated = {"user_id": update_data.user_id}
encrypted = EncryptDecryptPassword().password_encrypt(update_data.password)
if encrypted is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
# encrypting the password
update_data_removed = {key: (encrypted
if key == "password" else value) for key, value in update_data if
key != 'action' and value is not None}
# validating the data
response, message = UserDataValidations.update_data_validation(update_data)
if not response:
return message
# updating the data
response = obj_mongo_user.update_user(filter_data_updated, update_data_removed)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Updated Successfully").dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
# delete user
@staticmethod
def delete_user_details(user_id):
# fetching and validating the user id
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# generating the filter
filter_data_updated = {"user_id": user_id}
# deleting the user
response = obj_mongo_user.delete_user(filter_data_updated)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Deleted Successfully").dict(),
status_code=status.HTTP_200_OK)
@staticmethod
def fetch_view_header():
try:
# fetching the table header for the user view
data_response = obj_response_data.user_view_header()
if data_response:
return JSONResponse(
content=DefaultResponse(status="success", message="Header Fetched Successfully",
data=data_response).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_FETCHING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.exception(e)
@staticmethod
def fetch_user_details():
cursor_data = MongoUser().fetch_all_user_details()
list_user_data = []
for users in cursor_data:
list_user_data.append(users)
return list_user_data
try:
# defining the filter values
filter_data = {'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}
# filtering the users and getting all the details
cursor_data = obj_mongo_user.fetch_all_user_details({}, filter_data)
cursor_data_count = cursor_data.explain()
# counting the total records in the query
if cursor_data_count["executionStats"]["nReturned"] <= 0:
return None
list_user_data = []
for users in cursor_data:
list_user_data.append(users)
# listing the user data
if list_user_data:
return JSONResponse(
content=DefaultResponse(status="success", message="Fetched Successfully",
data={"bodyContent": list_user_data}).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_FETCHING).dict(),
status_code=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.exception(e)
# user change password
def reset_password(self, reset_data):
db_user_data = obj_mongo_user.fetch_one_user_details({"user_id": reset_data.user_id})
# if the user is not available
if db_user_data is None:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_ID_DOESNT_EXIST).dict(),
status_code=status.HTTP_404_NOT_FOUND)
try:
decrypted_password = self.pass_decrypt.password_decrypt(reset_data.new_password)
except TypeError:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_INVALID_PASSWORD).dict(),
status_code=status.HTTP_200_OK)
if not self.pwd_context.verify(decrypted_password.split("\"")[1], db_user_data["password"]):
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_PASSWORD_MISMATCH).dict(),
status_code=status.HTTP_200_OK)
response = EncryptDecryptPassword().password_encrypt(reset_data.new_password)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_AUTH_FAILED).dict(),
status_code=status.HTTP_200_OK)
response = obj_mongo_user.update_user({"user_id": reset_data.user_id}, {"password": response})
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_IN_UPDATING).dict(),
status_code=status.HTTP_200_OK)
return JSONResponse(
content=DefaultSuccessResponse(status="success", message="Password Changed Successfully").dict(),
status_code=status.HTTP_200_OK)
# user logout
@staticmethod
def logout_user(user_id, request):
try:
# getting the cookie token
uid = request.login_token
# checking the user id with the user id stored in cookie
if user_id.user_id != request.user_id:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_USER_SESSION).dict(),
status_code=status.HTTP_404_NOT_FOUND)
# deleting the login token from redis
login_db.delete(uid)
response = JSONResponse(
content=DefaultSuccessResponse(status="success", message="Logged Out").dict(),
status_code=status.HTTP_200_OK)
# deleting the cookie
response.delete_cookie("login-token")
return response
except Exception as e:
logger.exception(e)
......@@ -6,6 +6,7 @@ collection_name = DatabaseConstants.collection_user_details
class UserDetailsKeys:
KEY_USER_ID = "user_id"
KEY_NAME = "name"
KEY_EMAIL = "email"
KEY_PASSWORD = "password"
......@@ -17,6 +18,7 @@ class UserDetailsKeys:
class MongoUser(CollectionBaseClass):
key_name = UserDetailsKeys.KEY_NAME
key_user_id = UserDetailsKeys.KEY_USER_ID
key_email = UserDetailsKeys.KEY_EMAIL
key_password = UserDetailsKeys.KEY_PASSWORD
key_user_role = UserDetailsKeys.KEY_USER_ROLE
......@@ -28,28 +30,37 @@ class MongoUser(CollectionBaseClass):
super().__init__(mongo_client, Databases.DB_NAME, collection_name)
# fetching the user details based on the email id
def fetch_one_user_details(self, email):
if user := self.find_one(query={self.key_email: email}):
def fetch_one_user_details(self, query):
if user := self.find_one(query=query):
return user
return None
def fetch_all_user_details(self):
if user := self.find(query={}, filter_dict={'_id': 0,
"login_type": 0,
"is_alive": 0,
"password": 0,
"created_at": 0,
"updated_at": 0}):
# fetching the user data
def fetch_all_user_details(self, query, filter_data):
if user := self.find(query=query, filter_dict=filter_data):
return user
return None
# inserting the user
def insert_new_user(self, data):
if user := self.insert_one(data=data):
return user
return None
# updating the login time
def update_user(self, update, query):
if user := self.update_one(query=update, data=query):
def update_user(self, query, update):
if user := self.update_one(query=query, data=update):
return user
return None
# deleting users
def delete_user(self, query):
if user := self.delete_one(query=query):
return user
return None
# for filtering
def filter_data_aggregate(self, pipeline):
if user := self.aggregate(pipelines=pipeline):
return user
return None
......@@ -4,14 +4,20 @@ class ErrorMessages:
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_ACCOUNT_DOESNT_EXIST = "Account Does Not Exist"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_UNAUTHORIZED_ACCESS = "Your are not authorized to view this page"
ERROR_LOGIN_TYPE_INVALID = "Invalid Login Method"
ERROR_ACCESS_DENIED = "Access Denied!"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
ERROR_PASSWORD_MISMATCH = "Please enter the correct password"
ERROR_TOKEN_GENERATION = "Unsuccessful token generation"
ERROR_STORING_DATA = "New user registration failed"
ERROR_EMAIL_EXIST = "Email Id exists"
ERROR_IN_FETCHING = "Details cannot be fetched"
ERROR_IN_UPDATING = "Error in Updating"
ERROR_INVALID_REQUEST = "Invalid Request"
ERROR_USER_SESSION = "Not The Users Session"
ERROR_TOKEN_EXPIRED = "Google Token Expired"
# Data Validation
ERROR_INVALID_PASSWORD = "Invalid Password"
......@@ -19,3 +25,6 @@ class ErrorMessages:
ERROR_INVALID_EMAIL = "Invalid Email Id"
ERROR_INVALID_PHONE_NUMBER = "Invalid Phone Number"
ERROR_INVALID_USER_ROLE = "Invalid User Role"
ERROR_USER_ID_DOESNT_EXIST = "User Id doesn't exist"
ERROR_USER_ID = "User Id Not Required"
ERROR_INVALID_TOKEN = "Invalid Token"
This source diff could not be displayed because it is too large. You can view the blob instead.
import logging
import os
from logging.handlers import RotatingFileHandler
import pathlib
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
from scripts.config import Services, PathConf
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
def read_configuration():
return {
"name": Services.PROJECT_NAME,
"handlers": [
{"type": "RotatingFileHandler", "max_bytes": Services.MAX_BYTES, "back_up_count": Services.BACKUP_COUNT},
{"type": "StreamHandler", "name": Services.PROJECT_NAME},
],
}
def init_logger():
logging_config = read_configuration()
# setting the logger level
"""
Creates a rotating log
"""
__logger__ = logging.getLogger(Services.PROJECT_NAME)
__logger__.setLevel(Services.LOG_LEVEL)
# creating the format for the log
log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s(): %(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = PathConf.LOG_PATH
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}/{Services.PROJECT_NAME}log.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
for each_handler in logging_config["handlers"]:
if (
each_handler["type"] in ["RotatingFileHandler"]
and Services.ENABLE_FILE_LOGGING
):
pathlib.Path(PathConf.LOG_PATH).mkdir(parents=True, exist_ok=True)
log_file = pathlib.Path(PathConf.LOG_PATH, f"{Services.PROJECT_NAME}.log")
temp_handler = RotatingFileHandler(
log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"],
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
logger = init_logger()
......@@ -5,12 +5,17 @@ from pydantic import BaseModel
# default responses
class DefaultResponse(BaseModel):
status: bool = True
message: Optional[str]
status: str
message: str
data: Optional[Any]
class DefaultSuccessResponse(BaseModel):
status: str
message: str
# default failure responses
class DefaultFailureResponse(DefaultResponse):
status: bool = False
error: Any
class DefaultFailureResponse(BaseModel):
status: str
message: Any
......@@ -5,8 +5,10 @@ from pydantic import BaseModel
# model for login request
class LoginRequest(BaseModel):
email: str
password: str
login_type: str
email: Optional[str] = None
password: Optional[str] = None
id_token: Optional[str] = None
class RegistrationData(BaseModel):
......@@ -16,3 +18,34 @@ class RegistrationData(BaseModel):
phone_number: Optional[str]
login_type: str
user_role: str
class UserActions(BaseModel):
action: str
user_id: Optional[str] = None
name: Optional[str] = None
email: Optional[str] = None
password: Optional[str]
phone_number: Optional[str] = None
login_type: Optional[str] = None
user_role: Optional[str] = None
class UsersFilter(BaseModel):
name: Optional[str] = None
email: Optional[str] = None
user_role: Optional[str] = None
class ResetPassword(BaseModel):
user_id: str
old_password: Optional[str] = None
new_password: str
class UserIDValidation(BaseModel):
user_id: str
class EmailValidation(BaseModel):
email: str
from fastapi import APIRouter
from scripts.constants.api import ApiEndPoints
from scripts.services import v1
router = APIRouter()
router = APIRouter(prefix=ApiEndPoints.root)
# routing to the V1 router
router.include_router(v1.router)
......@@ -3,6 +3,6 @@ from scripts.constants.api import ApiEndPoints
from scripts.services.v1 import iot_manager_services
# creating the api router with version as the prefix
router = APIRouter()
router = APIRouter(prefix=ApiEndPoints.version)
# routing to the service api
router.include_router(iot_manager_services.router)
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi import APIRouter, HTTPException, status, Depends, Request
from fastapi.responses import JSONResponse
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.dashboard_handler import DashboardManagement
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.core.handlers.user_management_handler import UserManagement
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.project_schema import LoginRequest, RegistrationData
from scripts.schemas.default_responses import DefaultFailureResponse
from scripts.schemas.project_schema import LoginRequest, UserActions, EmailValidation, UserIDValidation, ResetPassword
from scripts.utils.security.authorize_access import AuthorizeAccess
from scripts.utils.security.decorators import MetaInfoSchema, auth
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
router = APIRouter()
# initializing the handler
obj_login_handler = LoginHandlers()
obj_user_handler = UserManagement()
obj_dashboard_handler = DashboardManagement()
# login API
@router.post(ApiEndPoints.asset_manager_submit)
async def login_default(
login_type: str,
async def login_default_api(
user_data: LoginRequest,
request: MetaInfoSchema = Depends(auth)
request: Request,
):
try:
# mapper for login types
login_mapper = {
"normal": obj_login_handler.normal_login,
"general_login": obj_login_handler.general_login,
"google": obj_login_handler.google_login,
"microsoft": obj_login_handler.microsoft_login
}
# getting the functions based on the login types
if login_type in login_mapper:
return login_mapper[login_type](user_data, request)
if user_data.login_type in login_mapper:
return login_mapper[user_data.login_type](user_data, request)
else:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Request")
detail=ErrorMessages.ERROR_INVALID_REQUEST)
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Forgot Password API
@router.post(ApiEndPoints.asset_manager_forgot)
async def forgot_password(
validation_data: EmailValidation
):
try:
# forgot password
response = obj_login_handler.forgot_password_handler(validation_data.email)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Reset Password API for sending email
@router.get(ApiEndPoints.asset_manager_reset)
async def reset_password(
request: Request
):
try:
# Get the JWT token from the query parameters
response = obj_login_handler.validate_jwt(request)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Reset Password API
@router.post(ApiEndPoints.asset_manager_reset)
async def reset_password(
reset_data: ResetPassword
):
try:
# Get the JWT token from the query parameters
response = obj_login_handler.reset_user_password(reset_data)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Create new users API
@router.post(ApiEndPoints.asset_manager_user_add)
async def user_register(
user_data: RegistrationData,
# User Management API
@router.post(ApiEndPoints.asset_manager_user_table_actions)
async def user_management(
user_data: UserActions,
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
# mapper for login types
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
# mapper for registration types
register_mapper = {
"normal": obj_user_handler.normal_register,
"general_login": obj_user_handler.general_register,
"google": obj_user_handler.google_register,
"microsoft": obj_user_handler.microsoft_register
}
# getting the functions based on the login types
if user_data.login_type in register_mapper:
if user_data.action == "addnew" and user_data.login_type in register_mapper:
# registration
return register_mapper[user_data.login_type](user_data)
elif user_data.action == "edit":
# updating
return obj_user_handler.update_user_details(user_data)
elif user_data.action == "delete":
# deleting
return obj_user_handler.delete_user_details(user_data.user_id)
else:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Request")
detail=ErrorMessages.ERROR_INVALID_REQUEST)
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Update users API
@router.post(ApiEndPoints.asset_manager_user_update)
async def user_register(
# View users API Header
@router.post(ApiEndPoints.asset_manager_user_header)
async def user_view_header(
request: MetaInfoSchema = Depends(auth)
):
try:
# authorize the user
response = AuthorizeAccess().admin_authorize(request)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
# getting the header for the user table
response = obj_user_handler.fetch_view_header()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# Delete users API
@router.post(ApiEndPoints.asset_manager_user_delete)
async def user_register(
# View users API
@router.post(ApiEndPoints.asset_manager_user_view)
async def user_view_data(
request: MetaInfoSchema = Depends(auth)
):
try:
# authorize the user
response = AuthorizeAccess().admin_authorize(request)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_user_handler.fetch_user_details()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# View users API
@router.post(ApiEndPoints.asset_manager_user_view)
async def user_register(
# API for change password
@router.post(ApiEndPoints.asset_manager_user_reset)
async def user_change_password(
reset_data: ResetPassword,
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().admin_authorize(request)
# authorize the user
response = AuthorizeAccess().login_authorize(request, reset_data)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
response = obj_user_handler.reset_password(reset_data)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(f"From user change password - {e}")
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# download Button Dashboard
@router.post(ApiEndPoints.asset_manager_dashboard_download)
async def dashboard_download_files(
request: MetaInfoSchema = Depends(auth)
):
try:
response = AuthorizeAccess().login_authorize(request, None)
if not response:
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS).dict(),
status_code=status.HTTP_200_OK)
# getting the data for the download dashboard
response = obj_dashboard_handler.download_header()
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
# API for logout
@router.post(ApiEndPoints.asset_manager_user_logout)
async def user_logout(
user_id: UserIDValidation,
request: MetaInfoSchema = Depends(auth)
):
try:
# authorizing the user
# logging out
response = obj_user_handler.logout_user(user_id, request)
if not response:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ErrorMessages.ERROR_UNAUTHORIZED_ACCESS)
return obj_user_handler.fetch_user_details()
detail=ErrorMessages.ERROR_IN_FETCHING)
return response
except Exception as e:
logger.exception(e)
return JSONResponse(
content=DefaultFailureResponse(status="failed",
message=ErrorMessages.OP_FAILED).dict(),
status_code=status.HTTP_200_OK)
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
<meta charset="UTF-8" content="">
<meta content="width=device-width, initial-scale=1" name="viewport">
<meta name="x-apple-disable-message-reformatting" content="">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta content="telephone=no" name="format-detection">
<title></title>
<!--[if (mso 16)]>
<style type="text/css">a{text-decoration:none}</style>
<![endif]-->
<!--[if gte mso 9]>
<style>sup{font-size:100%!important}</style><![endif]-->
<!--[if !mso]> ln-->
<link href="https://fonts.googleapis.com/css?family=Roboto:400,400i,700,700i" rel="stylesheet">
<!--<![endif]-->
<style amp-custom type="">
@media only screen and (max-width:600px){.st-br{padding-left:10px;padding-right:10px}p,ul li,ol li,a{font-size:16px;line-height:150%}h1{font-size:30px;text-align:center;line-height:120%}h2{font-size:26px;text-align:center;line-height:120%}h3{font-size:20px;text-align:center;line-height:120%}h1 a{font-size:30px;text-align:center}h2 a{font-size:26px;text-align:center}h3 a{font-size:20px;text-align:center}.es-menu td a{font-size:14px}.es-header-body p,.es-header-body ul li,.es-header-body ol li,.es-header-body a{font-size:16px}.es-footer-body p,.es-footer-body ul li,.es-footer-body ol li,.es-footer-body a{font-size:14px}.es-infoblock p,.es-infoblock ul li,.es-infoblock ol li,.es-infoblock a{font-size:12px}*[class="gmail-fix"]{display:none}.es-m-txt-c,.es-m-txt-c h1,.es-m-txt-c h2,.es-m-txt-c h3{text-align:center}.es-m-txt-r,.es-m-txt-r h1,.es-m-txt-r h2,.es-m-txt-r h3{text-align:right}.es-m-txt-l,.es-m-txt-l h1,.es-m-txt-l h2,.es-m-txt-l h3{text-align:left}.es-m-txt-r amp-img{float:right}.es-m-txt-c amp-img{margin:0 auto}.es-m-txt-l amp-img{float:left}.es-button-border{display:block}a.es-button{font-size:16px;display:block;border-left-width:0;border-right-width:0}.es-btn-fw{border-width:10px 0;text-align:center}.es-adaptive table,.es-btn-fw,.es-btn-fw-brdr,.es-left,.es-right{width:100%}.es-content table,.es-header table,.es-footer table,.es-content,.es-footer,.es-header{width:100%;max-width:600px}.es-adapt-td{display:block;width:100%}.adapt-img{width:100%;height:auto}.es-m-p0{padding:0}.es-m-p0r{padding-right:0}.es-m-p0l{padding-left:0}.es-m-p0t{padding-top:0}.es-m-p0b{padding-bottom:0}.es-m-p20b{padding-bottom:20px}.es-mobile-hidden,.es-hidden{display:none}.es-desk-hidden{display:table-row;width:auto;overflow:visible;float:none;max-height:inherit;line-height:inherit}.es-desk-menu-hidden{display:table-cell}table.es-table-not-adapt,.esd-block-html table{width:auto}table.es-social{display:inline-block}table.es-social td{display:inline-block}}a[x-apple-data-detectors]{color:inherit;text-decoration:none;font-size:inherit;font-family:inherit;font-weight:inherit;line-height:inherit}.es-desk-hidden{display:none;float:left;overflow:hidden;width:0;max-height:0;line-height:0}.es-button-border:hover{border-style:solid solid solid solid;background:#d6a700;border-color:#42d159 #42D159 #42d159 #42D159}.es-button-border:hover a.es-button{background:#d6a700;border-color:#d6a700}s{text-decoration:line-through}body{width:100%;font-family:roboto,"helvetica neue",helvetica,arial,sans-serif}table{border-collapse:collapse;border-spacing:0}table td,html,body,.es-wrapper{padding:0;Margin:0}.es-content,.es-header,.es-footer{table-layout:fixed;width:100%}p,hr{Margin:0}h1,h2,h3,h4,h5{Margin:0;line-height:120%;font-family:tahoma,verdana,segoe,sans-serif}.es-left{float:left}.es-right{float:right}.es-p5{padding:5px}.es-p5t{padding-top:5px}.es-p5b{padding-bottom:5px}.es-p5l{padding-left:5px}.es-p5r{padding-right:5px}.es-p10{padding:10px}.es-p10t{padding-top:10px}.es-p10b{padding-bottom:10px}.es-p10l{padding-left:10px}.es-p10r{padding-right:10px}.es-p15{padding:15px}.es-p15t{padding-top:15px}.es-p15b{padding-bottom:15px}.es-p15l{padding-left:15px}.es-p15r{padding-right:15px}.es-p20{padding:20px}.es-p20t{padding-top:20px}.es-p20b{padding-bottom:20px}.es-p20l{padding-left:20px}.es-p20r{padding-right:20px}.es-p25{padding:25px}.es-p25t{padding-top:25px}.es-p25b{padding-bottom:25px}.es-p25l{padding-left:25px}.es-p25r{padding-right:25px}.es-p30{padding:30px}.es-p30t{padding-top:30px}.es-p30b{padding-bottom:30px}.es-p30l{padding-left:30px}.es-p30r{padding-right:30px}.es-p35{padding:35px}.es-p35t{padding-top:35px}.es-p35b{padding-bottom:35px}.es-p35l{padding-left:35px}.es-p35r{padding-right:35px}.es-p40{padding:40px}.es-p40t{padding-top:40px}.es-p40b{padding-bottom:40px}.es-p40l{padding-left:40px}.es-p40r{padding-right:40px}.es-menu td{border:0}a{font-family:roboto,"helvetica neue",helvetica,arial,sans-serif;font-size:16px;text-decoration:none}h1{font-size:30px;font-style:normal;font-weight:bold;color:#212121}h1 a{font-size:30px}h2{font-size:24px;font-style:normal;font-weight:bold;color:#212121}h2 a{font-size:24px}h3{font-size:20px;font-style:normal;font-weight:normal;color:#212121}h3 a{font-size:20px}p,ul li,ol li{font-size:16px;font-family:roboto,"helvetica neue",helvetica,arial,sans-serif;line-height:150%}ul li,ol li{Margin-bottom:15px}.es-menu td a{text-decoration:none;display:block}.es-wrapper{width:100%;height:100%}.es-wrapper-color{background-color:#f6f6f6}.es-content-body{background-color:transparent}.es-content-body p,.es-content-body ul li,.es-content-body ol li{color:#131313}.es-content-body a{color:#2cb543}.es-header{background-color:transparent}.es-header-body{background-color:#fff}.es-header-body p,.es-header-body ul li,.es-header-body ol li{color:#333;font-size:14px}.es-header-body a{color:#1376c8;font-size:14px}.es-footer{background-color:#f6f6f6}.es-footer-body{background-color:transparent}.es-footer-body p,.es-footer-body ul li,.es-footer-body ol li{color:#131313;font-size:16px}.es-footer-body a{color:#000;font-size:16px}.es-infoblock,.es-infoblock p,.es-infoblock ul li,.es-infoblock ol li{line-height:120%;font-size:12px;color:#fff}.es-infoblock a{font-size:12px;color:#fff}.es-button-border{border-style:solid solid solid solid;border-color:#2cb543 #2CB543 #2cb543 #2CB543;background:#ffc80a;border-width:0;display:inline-block;border-radius:3px;width:auto}.es-p-default{padding-top:0;padding-right:0;padding-bottom:0;padding-left:0}.es-p-all-default{padding:0}
</style>
</head>
<body class="es-wrapper-color">
<div class="es-wrapper-color">
<!--[if gte mso 9]>
<v:background xmlns:v="urn:schemas-microsoft-com:vml" fill="t">
<v:fill type="tile" color="#f6f6f6">.</v:fill>
</v:background>
<![endif]-->
<table class="es-wrapper" width="100%" cellspacing="0" cellpadding="0">
<tbody>
<tr>
<td class="esd-email-paddings st-br" valign="top">
<table cellpadding="0" cellspacing="0" class="es-header esd-header-popover" align="center">
<tbody>
<tr>
<td class="esd-stripe esd-checked" align="center">
<div>
<table bgcolor="transparent" class="es-header-body" align="center" cellpadding="0"
cellspacing="0" width="600" style="background-color:transparent">
<tbody>
<tr>
<td class="esd-structure es-p20t es-p20r es-p20l" align="left">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td width="560" class="esd-container-frame" align="center"
valign="top">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td align="center" class="esd-block-spacer"
height="65"></td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</div>
<!--[if gte mso 9]> </v:textbox> </v:rect> <![endif]-->
</td>
</tr>
</tbody>
</table>
<table cellpadding="0" cellspacing="0" class="es-content" align="center">
<tbody>
<tr>
<td class="esd-stripe" align="center" bgcolor="transparent"
style="background-color:transparent">
<table bgcolor="transparent" class="es-content-body" align="center" cellpadding="0"
cellspacing="0" width="600" style="background-color:transparent">
<tbody>
<tr>
<td class="esd-structure es-p30t es-p15b es-p30r es-p30l" align="left"
style="border-radius:10px 10px 0 0;background-color:#fff" bgcolor="#ffffff">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td width="540" class="esd-container-frame" align="center" valign="top">
<table cellpadding="0" cellspacing="0" width="100%"
style="background-position:left bottom">
<tbody>
<tr>
<td align="center" class="esd-block-text">
<h1><img width = "200" src='https://cloud.ilens.io/assets/images/logo.png' alt="iLens Logo"></h1>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
<table cellpadding="0" cellspacing="0" class="esd-footer-popover es-footer" align="center">
<tbody>
<tr>
<td class="esd-stripe esd-checked" align="center">
<table bgcolor="#31cb4b" class="es-footer-body" align="center" cellpadding="0"
cellspacing="0" width="600">
<tbody>
<tr>
<td class="esd-structure es-p20t es-p20b es-p30r es-p30l" align="left"
style="background-position:left top;background-color:#fafafa" bgcolor="#fafafa">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td width="540" class="esd-container-frame" align="center" valign="top">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td align="left" class="esd-block-text es-m-txt-c">
<p style="line-height:150%">Greetings<strong>,
</strong><br><br>{{ message }}</p>
<br><br/>
<a href="{{ link }}"
class="download-link"
style="color: blue"
target="_blank">Click here
to change password</a>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
<tr>
<td class="esd-structure es-p20t es-p20b es-p30r es-p30l" align="left"
style="background-color:#fafafa" bgcolor="#fafafa">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td width="540" class="esd-container-frame" align="center" valign="top">
</td>
</tr>
</tbody>
</table>
</td>
</tr>
<tr>
<td class="esd-structure es-p30t es-p30b es-p30r es-p30l"
style="border-radius:0 0 10px 10px;background-color:#efefef" align="left"
bgcolor="#efefef">
<table cellspacing="0" cellpadding="0" width="100%">
<tbody>
<tr>
<td class="es-m-p0r esd-container-frame" width="540" align="center">
<table width="100%" cellspacing="0" cellpadding="0">
<tbody>
<tr>
<td align="left" class="esd-block-text">
<p style="color:#010101; font-size:80%;">
Thanks &
Regards,<br>UnifyTwin
Team<br>
<br>This is an automatically generated email, do not reply.
<br>For any technical assistance
kindly contact support.<br>
<a style="color:#010101; font-size:90%;"
href="mailto:support@unifytwin.com">
Contact us at support@unifytwin.com
</a>&nbsp;
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
<tr>
<td class="esd-structure" align="left" style="background-position:left top">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td width="600" class="esd-container-frame" align="center" valign="top">
<table cellpadding="0" cellspacing="0" width="100%">
<tbody>
<tr>
<td align="center" class="esd-block-spacer"
height="40"></td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
</div>
</body>
</html>
......@@ -57,3 +57,6 @@ class MongoStageCreator:
def sort_stage(self, stage: dict) -> dict:
return self.add_stage("$sort", stage)
def regex_stage(self, stage: dict) -> dict:
return self.add_stage("$regex", stage)
......@@ -3,15 +3,8 @@ import re
from scripts.logging.logger import logger
# for data validations
class RegexValidation:
@staticmethod
def first_name_validation(first_name):
try:
regex = re.fullmatch('([A-Za-z ]{1,100})', str(first_name))
return regex
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def name_validation(name):
try:
......@@ -29,10 +22,11 @@ class RegexValidation:
logger.error(f'An Error While listing the home plans {str(e)}')
@staticmethod
def email_validation(email):
def password_validation(password):
try:
regex = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9.-]{5,20})+\.[A-Z|a-z]{3}\b'
if re.fullmatch(regex, email):
return regex
password_regex = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*[!@#$%^&*()_+={}[\]:;\"\'|,.<>\/?]).{8,15}$'
if re.search(password_regex, password):
return False
except Exception as e:
logger.error(f'An Error While listing the home plans {str(e)}')
\ No newline at end of file
logger.error(f'An Error While listing the home plans {str(e)}')
# response data utils
class ResponseData:
@staticmethod
def user_view_header():
header = {
"actions": [
{
"class": "fa-pencil",
"action": "edit",
"tooltip": "Edit"
},
{
"class": "fa-trash",
"action": "delete",
"tooltip": "Delete"
},
], "externalActions": [
{
"type": "button",
"action": "addnew",
"label": "Create User"
}
], "columnDefs": [
{
"headerName": "User Name",
"field": "email",
"key": "email"
},
{
"headerName": "Name",
"field": "name",
"key": "name"
},
{
"headerName": "Role",
"field": "user_role",
"key": "user_role",
"width": 150,
"flex": 0
}
],
}
return header
# download api util
@staticmethod
def download_file_data():
data = {
"SCN101-Manual (Local Config with IoTSetupUI)":
"https://ilens.io/DownloadFiles/SCN_Device_Configuration_Page_Updates_4_7.pdf",
"SCN101 Firmware 4.7 Updates":
"https://ilens.io/DownloadFiles/SCN101Manual.pdf",
"IoTSetupUI":
"https://ilens.io/DownloadFiles/IoTsetupUI-V1.6.zip",
"CP2102 SCN Windows USB Driver":
"https://ilens.io/DownloadFiles/CP2102_Windows.zip",
"SCNFirmwareBurner":
"https://ilens.io/DownloadFiles/flash_download_tool_v3.8.5.zip",
"SCN101A Firmware - V2.7 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_7_B1_6.bin",
"SCN101A Firmware - V2.8 (Board: 1.4, 1.6 and 4MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_Firmware_V2_8_B1_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.5 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_5.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.6 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_6.bin",
"SCN101A4G (SCN Relay, 4G, SCN201 and SCN101L) Firmware - V4.7 (16MB Part.) ":
"https://ilens.io/DownloadFiles/SCN101_R_A4G_SCN201_Firmware_V4_7.bin",
"P10_LED_Driver_V2.1 (Board 1.1)":
"https://ilens.io/DownloadFiles/P10_LED_Driver_V2.1.bin",
"SCN-LED Reset Firmware (Board 1.1)":
"https://ilens.io/DownloadFiles/SCN-LED_Reset.bin",
"SCN Reset Firmware (Partition: 4MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_4MB_Part.bin",
"SCN Reset Firmware (Partition: 16MB)":
"https://ilens.io/DownloadFiles/SCN_Reset_16MB_Part.bin"
}
return data
from datetime import datetime, timedelta
import uuid
from datetime import datetime, timedelta, timezone
from scripts.config import Secrets
from scripts.database.mongo.mongo_db import MongoUser
......@@ -12,15 +13,19 @@ mongo_user = MongoUser()
def create_token(
user_id,
ip,
login_token=None,
age=Secrets.ACCESS_TOKEN_EXPIRE_MINUTES,
):
"""
This method is to create a cookie
"""
uid = login_token
if not uid:
uid = str(uuid.uuid4()).replace("-", "")
# creating the payload
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "age": age}
payload = {"ip": ip, "user_id": user_id, "token": Secrets.SECRET_KEY, "uid": uid, "age": age}
# getting the current time
current_time = datetime.now()
current_time = datetime.now(timezone.utc)
# generating the expiry time of the token
exp = current_time + timedelta(minutes=age)
# creating the dictionary with issuer and expiry time
......@@ -28,12 +33,9 @@ def create_token(
_payload = payload | _extras
# encoding the token
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(user_id, new_token)
login_db.expire(user_id, timedelta(minutes=age))
login_db.set(uid, new_token)
login_db.expire(uid, timedelta(minutes=age))
# Add updated time to mongo db
mongo_user.update_user({"email": user_id}, {"updated_at": current_time})
return user_id
mongo_user.update_user({"user_id": user_id}, {"updated_at": current_time})
return uid, exp
from typing import Optional
from scripts.database.mongo.mongo_db import MongoUser
from scripts.schemas.project_schema import ResetPassword
obj_mongo_user = MongoUser()
class AuthorizeAccess:
@staticmethod
# authorize the user
def admin_authorize(request):
user_data = obj_mongo_user.fetch_one_user_details(request.login_token)
if user_data["user_role"] != "super admin":
try:
# returning the user details
user_data = obj_mongo_user.fetch_one_user_details({"user_id": request.user_id})
if user_data["user_role"] != "super_admin":
return False
return True
except TypeError:
return False
return True
@staticmethod
def login_authorize(request, reset_data: Optional[ResetPassword] = None):
if reset_data is not None:
if reset_data.user_id != request.user_id:
return False
return True
if request.user_id:
return True
return False
......@@ -4,7 +4,6 @@ from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from pydantic import BaseModel, Field
from scripts.config import Services
from scripts.database.redis.redis_conn import login_db
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
......@@ -50,16 +49,8 @@ class _CookieAuthentication(APIKeyBase):
login_token = cookies.get(self.cookie_name) or request.headers.get(
self.cookie_name
)
if not login_token or login_token != Services.PROJECT_NAME:
if not login_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
# if the cookie name is same as the service name
if login_token == Services.PROJECT_NAME:
return MetaInfoSchema(
ip_address=request.client.host, # type: ignore
login_token=cookies.get("login-token"),
)
# getting the token stored in redis based on the cookie value
if not login_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
......@@ -83,7 +74,6 @@ class _CookieAuthentication(APIKeyBase):
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token doesn't have required fields",
)
return MetaInfoSchema(
user_id=user_id,
ip_address=request.client.host, # type: ignore
......
......@@ -6,6 +6,7 @@ from scripts.config import Secrets, Services
from scripts.logging.logger import logger
# utility for the password
class EncryptDecryptPassword:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
......@@ -37,7 +38,12 @@ class EncryptDecryptPassword:
except Exception as e:
logger.exception(e)
# encrypting the password
def password_encrypt(self, password):
# decrypting the UI password
decrypted_password = self.password_decrypt(password)
hashed_password = self.pwd_context.hash(decrypted_password)
# hashing the decrypted password
if decrypted_password is None:
return None
hashed_password = self.pwd_context.hash(decrypted_password.split("\"")[1])
return hashed_password
......@@ -6,9 +6,10 @@ from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
# user data validations
class UserDataValidations:
@staticmethod
def data_validation(user_data, method, feature):
def register_data_validation(user_data, method, feature):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
......@@ -18,14 +19,27 @@ class UserDataValidations:
user_data.email) is not True:
return False, {"message": ErrorMessages.ERROR_INVALID_EMAIL, "data": user_data.email}
# checking for valid password
if method == 'normal':
if method == 'general':
if user_data.password == "" or user_data.password == "string":
return False, {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": user_data.password}
# Validate phone number
if user_data.phone_number == "":
return False, {"message": ErrorMessages.ERROR_INVALID_PHONE_NUMBER,
"data": user_data.phone_number}
if user_data.user_role == "" and method == 'normal' and feature == 'register':
if user_data.user_role == "" and method == 'general' and feature == 'register':
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return True, None
except Exception as e:
logger.exception(e)
@staticmethod
def update_data_validation(user_data):
try:
if user_data.name == "":
return False, {"message": ErrorMessages.ERROR_INVALID_NAME,
"data": user_data.phone_number}
if user_data.user_role == "":
return False, {"message": ErrorMessages.ERROR_INVALID_USER_ROLE,
"data": user_data.phone_number}
return True, None
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment