Commit 5c699869 authored by arun.uday's avatar arun.uday

project initial structure

parent 9232ec46
uvicorn~=0.21.0
python-dotenv~=1.0.0
pydantic~=1.10.6
fastapi~=0.94.1
\ No newline at end of file
fastapi~=0.94.1
passlib~=1.7.4
pymongo~=4.3.3
\ No newline at end of file
......@@ -19,6 +19,9 @@ class _Services(BaseSettings):
SELF_PROXY: Optional[str] = Field(None, env="DIGITAL_SIGNATURE_PROXY")
LOG_LEVEL: Literal["INFO", "DEBUG", "ERROR", "QTRACE"] = "INFO"
ENABLE_FILE_LOGGING: bool = False
SECRET_KEY = "a8f3e3ed5f0dd73bca711df807b1141e15a499bc5e555d44fcab501c4147ee23"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRES_MINUTES = 800
class _Databases(BaseSettings):
......
from scripts.config import PROJECT_NAME
class ApiEndPoints:
version = "/v1"
start_point = PROJECT_NAME
asset_manager_login = start_point + "/login"
asset_manager_submit = start_point + "/nor-login"
asset_manager_login = "/login"
asset_manager_submit = "/nor-login"
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
class LoginManager:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def hashed_password(self, password):
return self.pwd_context.hash(password)
# def get_data(self, username, hashed):
from pydantic import BaseModel
from pydantic import BaseModel, EmailStr
class NormalLogin(BaseModel):
username: str
username: EmailStr
password: str
from fastapi import APIRouter
from scripts.constants.api import ApiEndPoints
from scripts.core.handlers.loginmanager import LoginManager
from scripts.schemas.login_schema import NormalLogin
router = APIRouter(prefix=ApiEndPoints.version)
@router.get(ApiEndPoints.asset_manager_login)
def login_default():
return {"message": "here"}
@router.post(ApiEndPoints.asset_manager_login)
def login_default(login_data: NormalLogin):
obj_login_manager = LoginManager()
password_hashed = obj_login_manager.hashed_password(login_data.password)
return password_hashed
@router.post(ApiEndPoints.asset_manager_submit)
def submit_login_details(login_data: NormalLogin):
def submit_login_details():
return {"message": "its working"}
import logging
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from pymongo import MongoClient
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.results import (
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
from scripts.utils.db_name_util import get_db_name
class MongoCollectionBaseClass:
def __init__(
self,
mongo_client: MongoClient,
database: str,
collection: str,
project_id: Optional[str],
) -> None:
self.client = mongo_client
self.database = database
self.collection = collection
if project_id:
self.database = get_db_name(project_id=project_id, database=self.database)
def __repr__(self) -> str:
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
def insert_one(self, data: Dict) -> InsertOneResult:
"""
The function is used to inserting a document to a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.insert_one(data)
except Exception as e:
logging.exception(e)
raise
def insert_many(self, data: List) -> InsertManyResult:
"""
The function is used to inserting documents to a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.insert_many(data)
except Exception as e:
logging.exception(e)
raise
def find(
self,
query: dict,
filter_dict: Optional[dict] = None,
sort: Union[
None, str, Sequence[Tuple[str, Union[int, str, Mapping[str, Any]]]]
] = None,
skip: int = 0,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = []
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
return cursor
except Exception as e:
logging.exception(e)
raise
def find_one(self, query: dict, filter_dict: Optional[dict] = None) -> dict | None:
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
return collection.find_one(query, filter_dict)
except Exception as e:
logging.exception(e)
raise
def update_one(
self,
query: dict,
data: dict,
upsert: bool = False,
strategy: str = "$set",
) -> UpdateResult:
"""
:param strategy:
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.update_one(query, {strategy: data}, upsert=upsert)
except Exception as e:
logging.exception(e)
raise
def update_many(
self, query: dict, data: dict, upsert: bool = False
) -> UpdateResult:
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.update_many(query, {"$set": data}, upsert=upsert)
except Exception as e:
logging.exception(e)
raise
def delete_many(self, query: dict) -> DeleteResult:
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.delete_many(query)
except Exception as e:
logging.exception(e)
raise
def delete_one(self, query: dict) -> DeleteResult:
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.delete_one(query)
except Exception as e:
logging.exception(e)
raise
def distinct(self, query_key: str, filter_json: Optional[dict] = None) -> list:
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.distinct(query_key, filter_json)
except Exception as e:
logging.exception(e)
raise
def aggregate(
self,
pipelines: list,
) -> CommandCursor:
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.aggregate(pipelines)
except Exception as e:
logging.exception(e)
raise
""" Mongo DB utility
All definitions related to mongo db is defined in this module
"""
import logging
from pymongo import MongoClient
from .mongo_tools import mongo_sync
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(uri, connect=False)
except Exception as e:
logging.exception(e)
raise
def __call__(self, *args, **kwargs):
return self.client
def get_client(self):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
@staticmethod
def get_base_class():
return mongo_sync.MongoCollectionBaseClass
class MongoStageCreator:
@staticmethod
def add_stage(stage_name: str, stage: dict) -> dict:
return {stage_name: stage}
def projection_stage(self, stage: dict) -> dict:
return self.add_stage("$project", stage)
def match_stage(self, stage: dict) -> dict:
return self.add_stage("$match", stage)
def lookup_stage(self, stage: dict) -> dict:
return self.add_stage("$lookup", stage)
def unwind_stage(self, stage: dict) -> dict:
return self.add_stage("$unwind", stage)
def group_stage(self, stage: dict) -> dict:
return self.add_stage("$group", stage)
def add_fields(self, stage: dict) -> dict:
return self.add_stage("$addFields", stage)
def sort_stage(self, stage: dict) -> dict:
return self.add_stage("$sort", stage)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment