Commit d3574d25 authored by ramya.r's avatar ramya.r

first

parents
MONGO_URI=mongodb://intern_23:intern%40123@192.168.0.220:2717/?authSource=interns_b2_23&authMechanism=SCRAM-SHA-256
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="C:\Users\ramya.r\Anaconda3 (2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/aggregation.iml" filepath="$PROJECT_DIR$/.idea/aggregation.iml" />
</modules>
</component>
</project>
\ No newline at end of file
[SERVICE]
port=8000
host=0.0.0.0
[MONGO-DB]
mongo_uri=$MONGO_URI
from dotenv import load_dotenv
load_dotenv()
import uvicorn
from fastapi import FastAPI
from script.config.app_config import Service
from script.service.employee import router
app = FastAPI(title="Employee")
app.include_router(router)
if __name__ == "__main__":
uvicorn.run("main:app", host=Service.host, port=Service.port)
import pymongo
client = pymongo.MongoClient(
"mongodb://intern_23:intern%40123@192.168.0.220:2717/?authSource=interns_b2_23&authMechanism=SCRAM-SHA-256")
db = client["interns_b2_23"]
col = db["employee"]
r = col.find()
print(list(r))
import os
from configparser import SafeConfigParser
config = SafeConfigParser()
config.read('conf/application.conf')
class Service:
port = int(config.getint("SERVICE", "port"))
host = config.get("SERVICE", "host")
class Mongo:
mongo_uri: str = os.environ.get("MONGO_URI")
print(mongo_uri)
class APIEndpoints:
aggregate = "/aggregate"
delete = "/delete"
save = "/save"
employee_base = "/employee"
insert = "/insert"
update = "/update"
find = "/find"
from script.config.app_config import Mongo
from script.utils.mongo_utils import MongoConnect
mongo_client = MongoConnect(uri=Mongo.mongo_uri)()
from script.utils.mongo_utils import MongoCollectionBaseClass
class EmployeeDetails(MongoCollectionBaseClass):
def __init__(self, mongo_client):
super().__init__(mongo_client=mongo_client, database="interns_b2_23", collection="emp_data")
def insert_item(self, data):
self.insert_one(data)
\ No newline at end of file
class NameDoesNotExist(Exception):
pass
\ No newline at end of file
from script.core.db.mongo.interns2023 import mongo_client
from script.core.db.mongo.interns2023.employee import EmployeeDetails
from script.core.schema.employee import Employee
class EmployeeData:
def __init__(self):
self.employee_col = EmployeeDetails(mongo_client=mongo_client)
def insert_data(self, request_data: Employee):
try:
d = {"emp_id": request_data.emp_id,
"emp_name": request_data.emp_name,
"emp_department": request_data.emp_department,
"emp_sal": request_data.emp_sal}
self.employee_col.insert_one(d)
except Exception as e:
print(e, "Error Detected in inserting")
def aggregate(self, emp_id):
try:
pipeline = [
{
"$match":
{
"emp_id": emp_id,
},
},
{
"$sort":
{
"emp_name": 1,
},
},
{
"$project":
{
"emp_": "$emp_name",
"_id": 0,
}
}
]
data=self.employee_col.aggregate(pipeline)
list_=[]
for i in data :
list_.append(i)
return list_
except Exception as e:
print(e, "Error Detected in inserting")
from typing import Optional
from pydantic import BaseModel
# define a model for Item
class Employee(BaseModel):
emp_id: Optional[int]
emp_name: str
emp_department: str
emp_sal: int
from typing import Any, Optional
from pydantic import BaseModel
class DefaultResponse(BaseModel):
status: str = "failed"
message: str
data: Optional[Any]
import logging
from fastapi.routing import APIRouter
from script.constants import APIEndpoints
from script.core.handlers.employee import EmployeeData
from script.core.schema.employee import Employee
from script.core.schema.response import DefaultResponse
router = APIRouter(prefix=APIEndpoints.employee_base)
handler = EmployeeData()
@router.post(APIEndpoints.insert)
def insert_item(request_data: Employee):
try:
handler.insert_data(request_data)
return DefaultResponse(message="Successfully Found", status="success")
except ValueError:
return DefaultResponse(message="Due to value error")
except Exception as e:
logging.exception(e)
return DefaultResponse(message="Finding Failed due to server error")
@router.post(APIEndpoints.aggregate)
def aggregate_data(emp_id: int):
try:
data= handler.aggregate(emp_id)
return DefaultResponse(message="Successfully Found", status="success",data=data)
except ValueError:
return DefaultResponse(message="Due to value error")
except Exception as e:
logging.exception(e)
return DefaultResponse(message="Finding Failed due to server error")
from fastapi import APIRouter
from pymongo import MongoClient
from script.config.app_config import Mongo
router = APIRouter()
# Create a MongoClient instance
client = MongoClient(Mongo.mongo_uri)
# Connect to a database
db = client.mydatabase
from typing import Dict, List
from pymongo import MongoClient
import logging
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception as e:
logging.error(f"Exception in connection {(str(e))}")
raise e
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
def __init__(self, mongo_client, database, collection):
self.client = mongo_client
self.database = database
self.collection = collection
def __repr__(self):
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection}"
def insert_one(self, data: Dict):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception as e:
logging.error(f"Error in inserting the data {str(e)}")
raise e
def aggregate(self, pipeline: List):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipeline)
return response
except Exception as e:
logging.error(f"Error in inserting the data {str(e)}")
raise e
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment