Commit 5a7d4881 authored by arun.uday's avatar arun.uday

AssetManager-V1.0- Not Reviewed

updated files to provide a way to decrypt password from the UI
parent c2ba10de
......@@ -3,6 +3,7 @@ if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
import uvicorn
from scripts.config import PROJECT_NAME, Services
......@@ -12,6 +13,7 @@ from scripts.logging.logger import logger
if __name__ == "__main__":
try:
print("Api for " + PROJECT_NAME)
# uvicorn run
uvicorn.run("main:app", port=int(Services.PORT))
except Exception as e:
logger.error(e)
[project_name]
[PROJECT_NAME]
PROJECT_NAME = AssetManager
[PATH]
base_path = scripts/
sub_path = log/
......@@ -5,7 +5,6 @@ Email: arun.uday@knowledgelens.com
Asset Manager Login For Normal User
---------------------------------------------------------
For instructions on how to run, check README.md
"""
......@@ -27,6 +26,7 @@ app.include_router(router)
if __name__ == "__main__":
try:
print("Api for " + PROJECT_NAME)
# enabling cors for getting UI data
if ServiceConf.ENABLE_CORS:
app.add_middleware(
CORSMiddleware,
......
......@@ -5,12 +5,16 @@ from pydantic import BaseSettings, Field
config = configparser.RawConfigParser()
config.read("conf/application.conf")
PROJECT_NAME = config.get("project_name", "PROJECT_NAME")
PROJECT_NAME = config.get("PROJECT_NAME", "PROJECT_NAME")
class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port")
# path
BASE_PATH = config.get("PATH", 'base_path')
SUB_PATH = config.get("PATH", "sub_path")
FULL_PATH = BASE_PATH + SUB_PATH
ENABLE_CORS: bool = True
CORS_URLS: list[str] = ["*.ilens.io"]
CORS_ALLOW_CREDENTIALS: bool = True
......@@ -18,6 +22,7 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False
KEY_ENCRYPTION = "kliLensKLiLensKL"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
......
from pydantic import BaseSettings, Field
PROJECT_NAME = "IotManager"
class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port")
class _Databases(BaseSettings):
MONGO_URI: str
Services = _Services()
Databases = _Databases()
__all__ = [
"PROJECT_NAME",
"Services",
"Databases",
]
from __future__ import annotations
import base64
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Services
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
class LoginHandlers:
......@@ -12,23 +17,65 @@ class LoginHandlers:
self.db_user_data = None
@staticmethod
def user_data_validation(login_data) -> dict | None:
if login_data.username == "" or login_data.username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": login_data.username}
if login_data.password == "" or login_data.password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": login_data.password}
return None
def db_data_validation(self, login_data):
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
if not self.db_user_data:
return {"message": ErrorMessages.ERROR_USER_NOT_REGISTERED, "data": login_data.username}
return None
def db_password_matching(self, login_data):
if response := self.db_data_validation(login_data):
return response
if not self.pwd_context.verify(login_data.password, self.db_user_data["password"]):
return {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH, "data": {login_data.username,
login_data.password}}
return None
def un_pad(s):
return s[:-ord(s[len(s) - 1:])]
def password_decrypt(self, password):
try:
# encoding the Key
key = Services.KEY_ENCRYPTION.encode('utf-8')
# decoding the received password
enc = base64.b64decode(password)
# getting the initialization vector
iv = enc[:AES.block_size]
# decoding with AES
cipher = AES.new(key, AES.MODE_CBC, iv)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:])
if len(data) == 0:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode('utf-8')
except Exception as e:
logger.exception(e)
@staticmethod
def user_data_validation(username, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def db_data_validation(self, username):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(username)
# if the user is not available
if not self.db_user_data:
return False, {"message": ErrorMessages.ERROR_USER_NOT_REGISTERED, "data": username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, username, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(username)
# if the response is false then a error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH, "data": {"username": username}}
# if the password is correct
return None, {"username": username, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
......@@ -31,3 +31,8 @@ class MongoUser(CollectionBaseClass):
if user := self.find_one(query={self.key_email: email}):
return user
return None
def insert_user(self, query):
if user := self.insert_one(data=query):
return user
return None
......@@ -3,6 +3,7 @@ class ErrorMessages:
OP_FAILED = "Operation failed"
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
......
2023-03-17 18:08:46 - ERROR - [AnyIO worker thread:password_decrypt(): 35] - Incorrect padding
Traceback (most recent call last):
File "E:\Git\meta-services\scripts\core\handlers\login_handler.py", line 26, in password_decrypt
enc = base64.b64decode(password)
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
2023-03-17 18:08:46 - ERROR - [AnyIO worker thread:login_default(): 30] - secret must be unicode or bytes, not None
Traceback (most recent call last):
File "E:\Git\meta-services\scripts\services\v1\iot_manager_services.py", line 22, in login_default
response, data = obj_login_handler.db_password_matching(login_data.username, decrypted_password)
File "E:\Git\meta-services\scripts\core\handlers\login_handler.py", line 55, in db_password_matching
if not self.pwd_context.verify(password, self.db_user_data["password"]):
File "E:\Git\meta-services\venv\lib\site-packages\passlib\context.py", line 2347, in verify
return record.verify(secret, hash, **kwds)
File "E:\Git\meta-services\venv\lib\site-packages\passlib\utils\handlers.py", line 787, in verify
validate_secret(secret)
File "E:\Git\meta-services\venv\lib\site-packages\passlib\utils\handlers.py", line 122, in validate_secret
raise exc.ExpectedStringError(secret, "secret")
TypeError: secret must be unicode or bytes, not None
2023-03-17 18:09:51 - ERROR - [AnyIO worker thread:password_decrypt(): 35] - Incorrect padding
Traceback (most recent call last):
File "E:\Git\meta-services\scripts\core\handlers\login_handler.py", line 26, in password_decrypt
enc = base64.b64decode(password)
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
2023-03-17 18:09:51 - ERROR - [AnyIO worker thread:login_default(): 30] - secret must be unicode or bytes, not None
Traceback (most recent call last):
File "E:\Git\meta-services\scripts\services\v1\iot_manager_services.py", line 22, in login_default
response, data = obj_login_handler.db_password_matching(login_data.username, decrypted_password)
File "E:\Git\meta-services\scripts\core\handlers\login_handler.py", line 55, in db_password_matching
if not self.pwd_context.verify(password, self.db_user_data["password"]):
File "E:\Git\meta-services\venv\lib\site-packages\passlib\context.py", line 2347, in verify
return record.verify(secret, hash, **kwds)
File "E:\Git\meta-services\venv\lib\site-packages\passlib\utils\handlers.py", line 787, in verify
validate_secret(secret)
File "E:\Git\meta-services\venv\lib\site-packages\passlib\utils\handlers.py", line 122, in validate_secret
raise exc.ExpectedStringError(secret, "secret")
TypeError: secret must be unicode or bytes, not None
import logging
import pathlib
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import os
from logging.handlers import RotatingFileHandler
from scripts.config import PROJECT_NAME, Services
from scripts.config import Services, PROJECT_NAME
def read_configuration():
return {
"name": PROJECT_NAME,
"handlers": [
{"type": "RotatingFileHandler", "max_bytes": 100000000, "back_up_count": 5},
{"type": "StreamHandler", "name": PROJECT_NAME},
],
}
def init_logger():
logging_config = read_configuration()
"""
Creates a rotating log
def get_logger():
"""
__logger__ = logging.getLogger(PROJECT_NAME)
Creates a rotating log
"""
__logger__ = logging.getLogger('')
# setting the logger level
__logger__.setLevel(Services.LOG_LEVEL)
# creating the format for the log
log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s(): %(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = Services.FULL_PATH
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if (
each_handler["type"] in ["RotatingFileHandler"]
and Services.ENABLE_FILE_LOGGING
):
pathlib.Path("logs").mkdir(parents=True, exist_ok=True)
log_file = pathlib.Path("logs/", f"{PROJECT_NAME}.log")
temp_handler = RotatingFileHandler(
log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"],
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}{PROJECT_NAME}log.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
return __logger__
logger = init_logger()
logger = get_logger()
......@@ -2,6 +2,7 @@ from pydantic import BaseModel
from typing import Union
# user details schema
class UserDetails(BaseModel):
name: str
email: Union[str: None] = None
......
......@@ -3,12 +3,14 @@ from typing import Any, Optional
from pydantic import BaseModel
# default responses
class DefaultResponse(BaseModel):
status: bool = True
message: Optional[str]
payload: Optional[Any]
# default failure responses
class DefaultFailureResponse(DefaultResponse):
status: bool = False
error: Any
......@@ -3,6 +3,7 @@ from typing import Union
from pydantic import BaseModel, EmailStr
# model for normal login
class NormalLogin(BaseModel):
username: Union[EmailStr, None] = None
password: Union[str, None] = None
......
......@@ -2,4 +2,5 @@ from fastapi import APIRouter
from scripts.services import v1
router = APIRouter()
# routing to the V1 router
router.include_router(v1.router)
......@@ -2,5 +2,7 @@ from fastapi import APIRouter
from scripts.constants.api import ApiEndPoints
from scripts.services.v1 import iot_manager_services
# creating the api router with version as the prefix
router = APIRouter(prefix=ApiEndPoints.asset_manager_login)
# routing to the service api
router.include_router(iot_manager_services.router)
......@@ -7,23 +7,33 @@ from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse
from scripts.schemas.login_schema import NormalLogin
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
# initializing the handler
obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit)
def login_default(login_data: NormalLogin):
try:
response = obj_login_handler.user_data_validation(login_data)
# decrypting the password from the UI
decrypted_password = obj_login_handler.password_decrypt(login_data.password)
# validating the received inputs empty or not
response = obj_login_handler.user_data_validation(login_data.username, decrypted_password)
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=response["message"]).dict(),
status_code=status.HTTP_400_BAD_REQUEST)
response = obj_login_handler.db_password_matching(login_data)
# checking for the account and password matching
response, data = obj_login_handler.db_password_matching(login_data.username, decrypted_password)
if response is not None:
return JSONResponse(content=DefaultFailureResponse(error=response["message"]).dict(),
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", payload={"username": login_data.username}).dict(),
content=DefaultResponse(message="Login Successful", payload=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment