Commit 16af59f4 authored by arun.uday's avatar arun.uday

Merge branch '1-arun-uday-asset-manager-v1-0' into 'master'

1 arun uday asset manager v1 0

See merge request !1
parents 61a63dff 92d0e6df
MONGO_URI=mongodb://localhost:27017
DB_NAME=userDB
SERVICE_HOST=0.0.0.0
SERVICE_PORT=8671
PROJECT_NAME=AssetManager
PROJECT_ID=1256
base_path=scripts/
sub_path=log/
ENCODING_TYPE=utf-8
\ No newline at end of file
# iot manager - Arun Uday
if __name__ == "__main__":
from dotenv import load_dotenv
# getting the env variables
load_dotenv()
import uvicorn
from scripts.config import PROJECT_NAME, Services
from scripts.config import Services
import argparse
from scripts.logging.logger import logger
# starting the application
ap = argparse.ArgumentParser()
if __name__ == "__main__":
try:
print("Api for " + PROJECT_NAME)
uvicorn.run("main:app", port=int(Services.PORT))
# creating the argument for the port
ap.add_argument(
"--port",
"-p",
required=False,
default=Services.PORT,
help="Port to start the application.",
)
# creating the argument for the host
ap.add_argument(
"--bind",
"-b",
required=False,
default=Services.HOST,
help="IF to start the application.",
)
arguments = vars(ap.parse_args())
# logging the info
logger.info(f"App Starting at {arguments['bind']}:{arguments['port']}")
# starting the app
uvicorn.run("main:app", host=arguments["bind"], port=int(arguments["port"]))
except Exception as e:
logger.error(e)
logger.exception(e)
......@@ -3,26 +3,38 @@
Author: Arun Uday
Email: arun.uday@knowledgelens.com
Asset Manager Login For Normal User
Asset Manager Login For Normal User Login
---------------------------------------------------------
For instructions on how to run, check README.md
"""
if __name__ == "__main__":
from dotenv import load_dotenv
# getting the env variables
load_dotenv()
import uvicorn
from fastapi import FastAPI
from scripts.services import router
from scripts.config import PROJECT_NAME, Services
from scripts.config import Services as ServiceConf
from fastapi.middleware.cors import CORSMiddleware
from scripts.logging.logger import logger
app = FastAPI()
# routing to the service
app.include_router(router)
# starting the application
if __name__ == "__main__":
try:
print("Api for " + PROJECT_NAME)
uvicorn.run(router, port=int(Services.PORT))
print("Api for " + ServiceConf.PROJECT_NAME)
# enabling cors for getting UI data
if ServiceConf.ENABLE_CORS:
app.add_middleware(
CORSMiddleware,
allow_origins=ServiceConf.CORS_URLS,
allow_credentials=ServiceConf.CORS_ALLOW_CREDENTIALS,
allow_methods=ServiceConf.CORS_ALLOW_METHODS,
allow_headers=ServiceConf.CORS_ALLOW_HEADERS,
)
except Exception as e:
logger.error(e)
import configparser
from typing import Optional, Literal
import pathlib
from typing import Literal
from pydantic import BaseSettings, Field
config = configparser.RawConfigParser()
config.read("conf/application.conf")
PROJECT_NAME = config.get("project_name", "PROJECT_NAME")
class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port")
PROJECT_NAME = Field(default="AssetManager", env="project_name")
PROJECT_ID = Field(default="1256", env="project_id")
ENCODING_TYPE = Field(default="utf-8", env="encoding_type")
ENABLE_CORS: bool = True
CORS_URLS: list[str] = ["*.ilens.io"]
CORS_ALLOW_CREDENTIALS: bool = True
......@@ -18,6 +17,7 @@ class _Services(BaseSettings):
CORS_ALLOW_HEADERS: list[str] = ["*"]
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False
KEY_ENCRYPTION = "kliLensKLiLensKL"
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
......@@ -28,11 +28,21 @@ class _Databases(BaseSettings):
DB_NAME: str
class _BasePathConf(BaseSettings):
BASE_PATH: str = "scripts/"
class _PathConf:
BASE_PATH: pathlib.Path = pathlib.Path(_BasePathConf().BASE_PATH)
LOG_PATH: pathlib.Path = BASE_PATH / "log/"
Services = _Services()
Databases = _Databases()
PathConf = _PathConf()
__all__ = [
"PROJECT_NAME",
"Services",
"Databases",
"PathConf",
]
from __future__ import annotations
import base64
import datetime
from Cryptodome.Cipher import AES
from passlib.context import CryptContext
from scripts.config import Services
from scripts.database.mongo.mongo_login import MongoUser
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.utils.mongo_default_queries import MongoQueries
class LoginHandlers:
def __init__(self):
self.mongo_user = MongoUser()
self.mongo_queries = MongoQueries()
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.db_user_data = None
self.db_user_data = None
self.dt = datetime.datetime.now()
self.time_dt = datetime.datetime.now()
@staticmethod
def user_data_validation(login_data) -> dict | None:
if login_data.username == "" or login_data.username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME_PASSWORD, "data": login_data.username}
if login_data.password == "" or login_data.password == "string":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME_PASSWORD, "data": login_data.password}
return None
def db_data_validation(self, login_data):
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
if not self.db_user_data:
return {"message": ErrorMessages.ERROR_USER_NOT_REGISTERED, "data": login_data.username}
return None
def db_password_matching(self, login_data):
if response := self.db_data_validation(login_data):
return response
if not self.pwd_context.verify(login_data.password, self.db_user_data["password"]):
return {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH, "data": {login_data.username,
login_data.password}}
return None
def un_pad(s):
# un_padding the encrypted password
return s[:-ord(s[len(s) - 1:])]
def password_decrypt(self, password):
try:
# encoding the Key
key = Services.KEY_ENCRYPTION.encode(Services.ENCODING_TYPE)
# decoding the received password
enc = base64.b64decode(password)
# mode for the decryption
mode = AES.MODE_CBC
# getting the initialization vector
iv = enc[:AES.block_size]
# decoding with AES
cipher = AES.new(key, mode, iv)
# decoding the password
data = cipher.decrypt(enc[AES.block_size:])
if len(data) == 0:
raise ValueError("Decrypted data is empty")
# removing the padding to get the password
data = self.un_pad(data)
return data.decode('utf-8')
except Exception as e:
logger.exception(e)
@staticmethod
def user_data_validation(username, password) -> dict | None:
try:
# checking for valid username
if username == "" or username == "user@example.com":
return {"message": ErrorMessages.ERROR_INVALID_USERNAME, "data": username}
# checking for valid password
if password == "" or password == "string":
return {"message": ErrorMessages.ERROR_INVALID_PASSWORD, "data": password}
return None
except Exception as e:
logger.exception(e)
def new_user_login(self, login_data, password):
# check the domain of the email the user has entered
if login_data.username.split("@")[-1] == "knowledgelens.com":
# hash the password
hashed_password = self.pwd_context.hash(password)
# Enter the user as a guest user to the db
if self.mongo_user.insert_user(
self.mongo_queries.insert_user_query(
login_data,
Services.PROJECT_ID,
hashed_password,
self.time_dt)):
return True
return False
def db_data_validation(self, login_data, password):
try:
# fetching the data based on the username
self.db_user_data = MongoUser().fetch_user_details(login_data.username)
# if the user is not available
if not self.db_user_data:
# if the domain of the user email is knowledge lens create the user as a guest user
if self.new_user_login(login_data, password):
return True, {"message": "new_user", "username": login_data.username, "role": "guest"}
else:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN,
"data": {"username": login_data.username}}
# Check the project id from the request body
if self.db_user_data["project_id"] != Services.PROJECT_ID or Services.PROJECT_ID != login_data.project_id:
return False, {"message": ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN, "data": login_data.username}
# if the user exist
return None, {"message": True}
except Exception as e:
logger.exception(e)
def db_password_matching(self, login_data, password):
try:
# getting the response after checking for the user data in db
response, message = self.db_data_validation(login_data, password)
if response and message["message"] == "new_user":
return None, message
# if the response is false then an error message is send back
if response is not None:
return response, message
# if the user exists in db then password is matched
if not self.pwd_context.verify(password, self.db_user_data["password"]):
return False, {"message": ErrorMessages.ERROR_PASSWORD_MISMATCH,
"data": {"username": login_data.username}}
# if the password is correct
return None, {"username": login_data.username, "role": self.db_user_data["user_role"]}
except Exception as e:
logger.exception(e)
......@@ -31,3 +31,8 @@ class MongoUser(CollectionBaseClass):
if user := self.find_one(query={self.key_email: email}):
return user
return None
def insert_user(self, query):
if user := self.insert_one(data=query):
return user
return None
......@@ -3,6 +3,9 @@ class ErrorMessages:
OP_FAILED = "Operation failed"
# Authorization Errors
ERROR_AUTH_FAILED = "Authentication Failed. Please verify token"
ERROR_INVALID_USERNAME_PASSWORD = "Invalid Username or Password."
ERROR_INVALID_LOGIN = "Your are not authorized to view this website."
ERROR_INVALID_USERNAME = "Invalid Username"
ERROR_INVALID_PASSWORD = "Invalid Password"
ERROR_UNAUTHORIZED_USER_LOGIN = "Account is not available"
ERROR_USER_NOT_REGISTERED = "Account is not registered in the portal."
ERROR_PASSWORD_MISMATCH = "Passwords Authentication Failed. Please enter the correct password"
2023-03-20 14:32:44 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 14:39:15 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:05:53 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:06:06 - ERROR - [AnyIO worker thread:login_default(): 41] - Object of type set is not JSON serializable
Traceback (most recent call last):
File "E:\Git\meta-services\scripts\services\v1\iot_manager_services.py", line 33, in login_default
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
File "E:\Git\meta-services\venv\lib\site-packages\starlette\responses.py", line 196, in __init__
super().__init__(content, status_code, headers, media_type, background)
File "E:\Git\meta-services\venv\lib\site-packages\starlette\responses.py", line 55, in __init__
self.body = self.render(content)
File "E:\Git\meta-services\venv\lib\site-packages\starlette\responses.py", line 199, in render
return json.dumps(
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\json\__init__.py", line 234, in dumps
return cls(
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "C:\Users\arun.uday\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type set is not JSON serializable
2023-03-20 15:08:19 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:08:44 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:09:21 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:09:36 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
2023-03-20 15:10:17 - INFO - [MainThread:<module>(): 37] - App Starting at 0.0.0.0:8671
import logging
import pathlib
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import os
from logging.handlers import RotatingFileHandler
from scripts.config import PROJECT_NAME, Services
from scripts.config import Services, PathConf
def read_configuration():
return {
"name": PROJECT_NAME,
"handlers": [
{"type": "RotatingFileHandler", "max_bytes": 100000000, "back_up_count": 5},
{"type": "StreamHandler", "name": PROJECT_NAME},
],
}
def init_logger():
logging_config = read_configuration()
"""
Creates a rotating log
def get_logger():
"""
__logger__ = logging.getLogger(PROJECT_NAME)
Creates a rotating log
"""
__logger__ = logging.getLogger('')
# setting the logger level
__logger__.setLevel(Services.LOG_LEVEL)
# creating the format for the log
log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s(): %(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = PathConf.LOG_PATH
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if (
each_handler["type"] in ["RotatingFileHandler"]
and Services.ENABLE_FILE_LOGGING
):
pathlib.Path("logs").mkdir(parents=True, exist_ok=True)
log_file = pathlib.Path("logs", f"{PROJECT_NAME}.log")
temp_handler = RotatingFileHandler(
log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"],
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}/{Services.PROJECT_NAME}log.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
return __logger__
logger = init_logger()
logger = get_logger()
......@@ -2,6 +2,7 @@ from pydantic import BaseModel
from typing import Union
# user details schema
class UserDetails(BaseModel):
name: str
email: Union[str: None] = None
......
......@@ -3,12 +3,14 @@ from typing import Any, Optional
from pydantic import BaseModel
# default responses
class DefaultResponse(BaseModel):
status: bool = True
message: Optional[str]
payload: Optional[Any]
# default failure responses
class DefaultFailureResponse(DefaultResponse):
status: bool = False
error: Any
......@@ -3,6 +3,7 @@ from typing import Union
from pydantic import BaseModel, EmailStr
# model for normal login
class NormalLogin(BaseModel):
username: Union[EmailStr, None] = None
password: Union[str, None] = None
......
from fastapi import FastAPI
from fastapi import APIRouter
from scripts.services import v1
router = FastAPI()
router = APIRouter()
# routing to the V1 router
router.include_router(v1.router)
......@@ -2,5 +2,7 @@ from fastapi import APIRouter
from scripts.constants.api import ApiEndPoints
from scripts.services.v1 import iot_manager_services
# creating the api router with version as the prefix
router = APIRouter(prefix=ApiEndPoints.asset_manager_login)
# routing to the service api
router.include_router(iot_manager_services.router)
from fastapi import APIRouter
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.login_handler import LoginHandlers
from scripts.errors import ErrorMessages
from scripts.logging.logger import logger
from scripts.schemas.default_responses import DefaultResponse, DefaultFailureResponse
from scripts.schemas.login_schema import NormalLogin
# creating the login api
router = APIRouter(prefix=ApiEndPoints.version)
# initializing the handler
obj_login_handler = LoginHandlers()
@router.post(ApiEndPoints.asset_manager_submit)
def login_default(login_data: NormalLogin):
try:
response = obj_login_handler.user_data_validation(login_data)
# decrypting the password from the UI
decrypted_password = obj_login_handler.password_decrypt(login_data.password)
# validating the received inputs empty or not
response = obj_login_handler.user_data_validation(login_data.username, decrypted_password)
# Account is not registered
if response is not None:
return DefaultFailureResponse(error=response["message"])
response = obj_login_handler.db_password_matching(login_data)
return JSONResponse(content=DefaultFailureResponse(error=response["message"]).dict(),
status_code=status.HTTP_400_BAD_REQUEST)
# checking for the account and password matching
response, data = obj_login_handler.db_password_matching(login_data, decrypted_password)
if response is not None and data["message"] == ErrorMessages.ERROR_UNAUTHORIZED_USER_LOGIN:
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_404_NOT_FOUND)
if response is not None:
return DefaultFailureResponse(error=response["message"])
return DefaultResponse(message="Login Successful", payload={"username": login_data.username})
return JSONResponse(content=DefaultFailureResponse(error=data).dict(),
status_code=status.HTTP_401_UNAUTHORIZED)
# sending successful response to UI
return JSONResponse(
content=DefaultResponse(message="Login Successful", payload=data).dict(),
status_code=status.HTTP_200_OK)
except Exception as e:
logger.exception(e)
class MongoQueries:
@staticmethod
def insert_user_query(login_data, project_id, hashed_password, time_dt):
query = {"project_id": project_id, "name": "", "email": login_data.username,
"password": hashed_password,
"user_role": "guest",
"is_alive": True, "created_at": time_dt,
"updated_at": time_dt}
return query
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment