Commit 64302ca5 authored by arjun.b's avatar arjun.b

first commit

parents
MONGO_URI=mongodb://localhost:27017
SERVICE_HOST=127.0.0.1
SERVICE_PORT=8000
PROJECT_NAME=AssetManagerV2
LOG_PATH=log
LOG_LEVEL=INFO
BACKUP_COUNT=5
MAX_BYTES=10000000
ENABLE_BACKUP=True
BACKUP_DB=iot_devices_backup
WORKERS=1
CONFIG_PATH=DATA/Config/
DEVICE_LIST_PATH=scripts/utils/device_list.json
YML_PATH=conf/config.yml
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N801" />
<option value="N803" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Asset Manager V2.iml" filepath="$PROJECT_DIR$/.idea/Asset Manager V2.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import uvicorn
from scripts.config import Services
from scripts.logging.logger import logger
if __name__ == "__main__":
try:
uvicorn.run("main:app", host=Services.HOST, port=Services.PORT)
except Exception as e:
logger.error(f"error from app {e}")
2023-05-12 10:58:24 - ERROR - [MainThread:<module>(): 10] - error from app post() missing 1 required positional argument: 'path'
2023-05-12 10:58:41 - ERROR - [MainThread:<module>(): 10] - error from app post() missing 1 required positional argument: 'path'
2023-05-12 11:14:43 - INFO - [MainThread:store_tag(): 12] - tag of a device is stored
2023-05-12 13:04:46 - ERROR - [MainThread:update_db(): 28] - error from asset management.py
2023-05-12 13:07:05 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 14:08:53 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 14:26:10 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 14:51:52 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 14:59:41 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 15:04:04 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 15:05:24 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 16:38:46 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 16:47:03 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 16:54:31 - ERROR - [MainThread:update_db(): 29] - error from asset management.py
2023-05-12 16:56:18 - INFO - [MainThread:update_db(): 26] - update method called
2023-05-12 16:56:18 - ERROR - [MainThread:update_db(): 30] - error from asset management.py
2023-05-12 16:57:14 - INFO - [MainThread:update_db(): 26] - update method called
2023-05-12 16:57:14 - ERROR - [MainThread:update_db(): 30] - error from asset management.py
2023-05-12 16:57:36 - INFO - [MainThread:update_db(): 26] - update method called
2023-05-12 16:57:36 - ERROR - [MainThread:update_db(): 30] - error from asset management.py update_name() missing 1 required positional argument: 'self'
2023-05-12 16:58:56 - INFO - [MainThread:update_db(): 26] - update method called
2023-05-12 16:58:56 - ERROR - [MainThread:update_one(): 147] - The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
Traceback (most recent call last):
File "E:\Asset Manager V2\scripts\utils\mongo_tools\mongo_sync.py", line 145, in update_one
return collection.update_one(query, {strategy: data}, upsert=upsert)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 1028, in update_one
self._update_retryable(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 877, in _update_retryable
return self.__database.client._retryable_write(
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1552, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1438, in _retry_with_session
return self._retry_internal(retryable, func, session, bulk)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1470, in _retry_internal
return func(session, sock_info, retryable)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 869, in _update
return self._update(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 846, in _update
_check_write_command_response(result)
File "E:\Anaconda\lib\site-packages\pymongo\helpers.py", line 229, in _check_write_command_response
_raise_last_write_error(write_errors)
File "E:\Anaconda\lib\site-packages\pymongo\helpers.py", line 211, in _raise_last_write_error
raise WriteError(error.get("errmsg"), error.get("code"), error)
pymongo.errors.WriteError: The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
2023-05-12 16:58:56 - ERROR - [MainThread:update_db(): 30] - error from asset management.py The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
2023-05-12 17:03:51 - INFO - [MainThread:update_db(): 26] - update method called
2023-05-12 17:03:51 - ERROR - [MainThread:update_db(): 30] - error from asset management.py update_name() missing 1 required positional argument: 'mac'
2023-05-12 17:06:55 - INFO - [MainThread:update_db(): 27] - update method called
2023-05-12 17:06:55 - ERROR - [MainThread:update_one(): 147] - cannot encode object: TestUpdate(mac='b8:27:eb:6c:67:9f'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
Traceback (most recent call last):
File "E:\Asset Manager V2\scripts\utils\mongo_tools\mongo_sync.py", line 145, in update_one
return collection.update_one(query, {strategy: data}, upsert=upsert)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 1028, in update_one
self._update_retryable(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 877, in _update_retryable
return self.__database.client._retryable_write(
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1552, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1438, in _retry_with_session
return self._retry_internal(retryable, func, session, bulk)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1470, in _retry_internal
return func(session, sock_info, retryable)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 869, in _update
return self._update(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 838, in _update
result = sock_info.command(
File "E:\Anaconda\lib\site-packages\pymongo\pool.py", line 726, in command
self._raise_connection_failure(error)
File "E:\Anaconda\lib\site-packages\pymongo\pool.py", line 710, in command
return command(self, dbname, spec, secondary_ok,
File "E:\Anaconda\lib\site-packages\pymongo\network.py", line 118, in command
request_id, msg, size, max_doc_size = message._op_msg(
File "E:\Anaconda\lib\site-packages\pymongo\message.py", line 742, in _op_msg
return _op_msg_uncompressed(
bson.errors.InvalidDocument: cannot encode object: TestUpdate(mac='b8:27:eb:6c:67:9f'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
2023-05-12 17:06:55 - ERROR - [MainThread:update_db(): 31] - error from asset management.py cannot encode object: TestUpdate(mac='b8:27:eb:6c:67:9f'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
2023-05-12 17:10:06 - INFO - [MainThread:update_db(): 27] - update method called
2023-05-12 17:10:06 - ERROR - [MainThread:update_one(): 147] - The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
Traceback (most recent call last):
File "E:\Asset Manager V2\scripts\utils\mongo_tools\mongo_sync.py", line 145, in update_one
return collection.update_one(query, {strategy: data}, upsert=upsert)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 1028, in update_one
self._update_retryable(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 877, in _update_retryable
return self.__database.client._retryable_write(
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1552, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1438, in _retry_with_session
return self._retry_internal(retryable, func, session, bulk)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1470, in _retry_internal
return func(session, sock_info, retryable)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 869, in _update
return self._update(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 846, in _update
_check_write_command_response(result)
File "E:\Anaconda\lib\site-packages\pymongo\helpers.py", line 229, in _check_write_command_response
_raise_last_write_error(write_errors)
File "E:\Anaconda\lib\site-packages\pymongo\helpers.py", line 211, in _raise_last_write_error
raise WriteError(error.get("errmsg"), error.get("code"), error)
pymongo.errors.WriteError: The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
2023-05-12 17:10:06 - ERROR - [MainThread:update_db(): 31] - error from asset management.py The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., full error: {'index': 0, 'code': 52, 'errmsg': "The dollar ($) prefixed field '$set' in '$set' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith."}
2023-05-12 18:48:17 - INFO - [MainThread:update_db(): 27] - update method called
2023-05-12 18:50:07 - INFO - [MainThread:update_db(): 27] - update method called
2023-05-12 18:51:58 - INFO - [MainThread:update_db(): 27] - update method called
2023-05-12 18:51:58 - ERROR - [MainThread:update_one(): 147] - cannot encode object: TestUpdate(mac='b8:27:eb:4f:5e:87'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
Traceback (most recent call last):
File "E:\Asset Manager V2\scripts\utils\mongo_tools\mongo_sync.py", line 145, in update_one
return collection.update_one(query, {strategy: data}, upsert=upsert)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 1028, in update_one
self._update_retryable(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 877, in _update_retryable
return self.__database.client._retryable_write(
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1552, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1438, in _retry_with_session
return self._retry_internal(retryable, func, session, bulk)
File "E:\Anaconda\lib\site-packages\pymongo\mongo_client.py", line 1470, in _retry_internal
return func(session, sock_info, retryable)
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 869, in _update
return self._update(
File "E:\Anaconda\lib\site-packages\pymongo\collection.py", line 838, in _update
result = sock_info.command(
File "E:\Anaconda\lib\site-packages\pymongo\pool.py", line 726, in command
self._raise_connection_failure(error)
File "E:\Anaconda\lib\site-packages\pymongo\pool.py", line 710, in command
return command(self, dbname, spec, secondary_ok,
File "E:\Anaconda\lib\site-packages\pymongo\network.py", line 118, in command
request_id, msg, size, max_doc_size = message._op_msg(
File "E:\Anaconda\lib\site-packages\pymongo\message.py", line 742, in _op_msg
return _op_msg_uncompressed(
bson.errors.InvalidDocument: cannot encode object: TestUpdate(mac='b8:27:eb:4f:5e:87'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
2023-05-12 18:51:58 - ERROR - [MainThread:update_db(): 31] - error from asset management.py cannot encode object: TestUpdate(mac='b8:27:eb:4f:5e:87'), of type: <class 'scripts.schemas.test_schema.TestUpdate'>
2023-05-12 18:58:27 - INFO - [MainThread:update_db(): 27] - update method called
import uvicorn
from fastapi import FastAPI
from scripts import services
from scripts.config import Services
from scripts.logging.logger import logger
app = FastAPI()
app.include_router(services.router)
# starting the application
if __name__ == "__main__":
try:
uvicorn.run("main:app", host=Services.HOST, port=Services.PORT)
except Exception as e:
logger.exception(e)
from pydantic import BaseSettings, Field
from dotenv import load_dotenv
load_dotenv()
PROJECT_NAME = "assets_manager_V2"
class _Services(BaseSettings):
HOST: str = Field(default="127.0.0.1", env="service_host")
PORT: int = Field(default=8000, env="service_port")
PROJECT_NAME = Field(default="AssetManager", env="project_name")
ENCODING_TYPE = Field(default="utf-8", env="encoding_type")
ENABLE_CORS: bool = True
CORS_URLS: list = ["*.ilens.io"]
CORS_ALLOW_CREDENTIALS: bool = True
CORS_ALLOW_METHODS: list = ["GET", "POST"]
CORS_ALLOW_HEADERS: list = ["*"]
LOG_LEVEL: str
BACKUP_COUNT: int
MAX_BYTES: int
ENABLE_FILE_LOGGING: bool = True
ENABLE_BACKUP_STORING: bool = Field(default=False, env="enable_backup")
WORKERS: int = Field(default=1, env="workers")
class _Databases(BaseSettings):
MONGO_URI: str
BACKUP_DB: str
class _BasePathConf(BaseSettings):
BASE_PATH: str = "/"
class _PathConf(BaseSettings):
LOG_PATH: str
CONFIG_PATH: str
DEVICE_LIST_PATH: str
YML_PATH: str
Services = _Services()
Databases = _Databases()
PathConf = _PathConf()
__all__ = [
"PROJECT_NAME",
"Services",
"Databases",
"PathConf",
]
class _APIEndPoints:
root: str = "/asset_manager_api"
api_create: str = "/create"
db_test: str = "/update"
APIEndPoints = _APIEndPoints()
from scripts.config import Databases
class DatabaseConstants:
# Databases
db_iot_manager = "asset_manager"
backup_db = Databases.BACKUP_DB
# Collections
collection = "test"
def db_test(self):
self.insert_one()
\ No newline at end of file
from pydantic import BaseModel
from scripts.config import Databases
from scripts.utils.mongo_utils import MongoConnect
mongo_obj = MongoConnect(uri=Databases.MONGO_URI)
mongo_client = mongo_obj()
CollectionBaseClass = mongo_obj.get_base_class()
class MongoBaseSchema(BaseModel):
pass
from scripts.constants.db_constants import DatabaseConstants
database = DatabaseConstants.db_iot_manager
from typing import Any
from scripts.constants.db_constants import DatabaseConstants
from scripts.database.mongodb import CollectionBaseClass, mongo_client
from scripts.database.mongodb.asset_manager import database
from scripts.logging.logger import logger
from scripts.schemas.test_schema import TestUpdate
from scripts.utils.mongo_utils import MongoStageCreator
collection_name = DatabaseConstants.collection
mongo_stage_creator = MongoStageCreator()
class Devices(CollectionBaseClass):
def __init__(self):
super().__init__(
mongo_client,
database=database,
collection=collection_name
)
def update_name(self, mac):
# Construct the filter to match the document with the given _id
filter = {"mac_address": mac}
# Construct the update document to set the customer field to "ARJUN B"
update = {"customer": "ARJUN B"}
# Call the update_one method to update the document
result = self.update_one(filter, update)
# Print the number of documents updated
print(result.modified_count)
# def update_db(self, data: TestUpdate) -> Any:
# """
# :param mac_address:
# :return:
# """
# try:
# logger.info("entered check_mac_address function. devices.py")
# # self.update_one({"mac_address": mac_address}, {"$set":
# # {"customer": "Arjun B"}})
# self.update_one(query={"mac_address": data.mac}, data={"customer": "Arjun B"})
# # Device with mac_address already exists
#
# except Exception as e:
# logger.error(f"error from check_mac_address() - {e}")
# return False
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import Services, PathConf, PROJECT_NAME
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
# setting the logger level
__logger__.setLevel(Services.LOG_LEVEL)
# creating the format for the log
log_formatter = "%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s(): %(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = PathConf.LOG_PATH
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}/{PROJECT_NAME}_log.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
from pydantic import BaseModel
class TestUpdate(BaseModel):
mac: str
from fastapi import APIRouter
from scripts.constants.api import APIEndPoints
from scripts.services import v2
router = APIRouter()
router.include_router(v2.router)
from fastapi import APIRouter
from scripts.constants.api import APIEndPoints
from scripts.services.v2 import asset_management_services
# creating the api router with version as the prefix
router = APIRouter(prefix=APIEndPoints.root)
# routing to the service api
router.include_router(asset_management_services.router)
import logging
from fastapi import APIRouter, Request
from scripts.constants.api import APIEndPoints
from scripts.database.mongodb.asset_manager.collections.device import Devices
from scripts.logging.logger import logger
from scripts.schemas.test_schema import TestUpdate
router = APIRouter()
obj = Devices()
@router.post(APIEndPoints.api_create, tags=["tag api"])
async def store_tag():
logger.info("tag of a device is stored")
try:
# payload=await response.body()
# status=
return {"data": "api running successfully"}
except Exception as e:
logger.error(f"error from asset_management")
@router.post(APIEndPoints.db_test, tags=["update test"])
async def update_db(data: TestUpdate):
try:
logger.info("update method called")
obj.update_name(data.mac)
return {"data": "updated"}
except Exception as e:
logger.error(f"error from asset management.py {e}")
from __future__ import annotations
import logging
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from pymongo import MongoClient
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.results import (
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
class MongoCollectionBaseClass:
def __init__(
self,
mongo_client: MongoClient,
database: str,
collection: str,
) -> None:
self.client = mongo_client
self.database = database
self.collection = collection
def __repr__(self) -> str:
return f"{self.__class__.__name__}(database={self.database}, collection={self.collection})"
def insert_one(self, data: Dict) -> InsertOneResult:
"""
The function is used to inserting a document to a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.insert_one(data)
except Exception as e:
logging.exception(e)
raise
def insert_many(self, data: List) -> InsertManyResult:
"""
The function is used to inserting documents to a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.insert_many(data)
except Exception as e:
logging.exception(e)
raise
def find(
self,
query: dict,
filter_dict: Optional[dict] = None,
sort: Union[
None, str, Sequence[Tuple[str, Union[int, str, Mapping[str, Any]]]]
] = None,
skip: int = 0,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = []
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
return cursor
except Exception as e:
logging.exception(e)
raise
def find_one(self, query: dict, filter_dict: Optional[dict] = None) -> dict | None:
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
return collection.find_one(query, filter_dict)
except Exception as e:
logging.exception(e)
raise
def update_one(
self,
query: dict,
data: dict,
upsert: bool = False,
strategy: str = "$set",
) -> UpdateResult:
"""
:param strategy:
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.update_one(query, {strategy: data}, upsert=upsert)
except Exception as e:
logging.exception(e)
raise
def update_many(
self, query: dict, data: dict, upsert: bool = False
) -> UpdateResult:
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.update_many(query, {"$set": data}, upsert=upsert)
except Exception as e:
logging.exception(e)
raise
def delete_many(self, query: dict) -> DeleteResult:
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.delete_many(query)
except Exception as e:
logging.exception(e)
raise
def delete_one(self, query: dict) -> DeleteResult:
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.delete_one(query)
except Exception as e:
logging.exception(e)
raise
def find_one_and_delete(self, query: dict) -> DeleteResult:
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.find_one_and_delete(query)
except Exception as e:
logging.exception(e)
raise
def distinct(self, query_key: str, filter_json: Optional[dict] = None) -> list:
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.distinct(query_key, filter_json)
except Exception as e:
logging.exception(e)
raise
def aggregate(
self,
pipelines: list,
) -> CommandCursor:
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
return collection.aggregate(pipelines)
except Exception as e:
logging.exception(e)
raise
""" Mongo DB utility
All definitions related to mongodb db is defined in this module
"""
import logging
from pymongo import MongoClient
from scripts.utils.mongo_tools import mongo_sync
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(uri, connect=False)
except Exception as e:
logging.exception(e)
raise
def __call__(self, *args, **kwargs):
return self.client
def get_client(self):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
@staticmethod
def get_base_class():
return mongo_sync.MongoCollectionBaseClass
class MongoStageCreator:
@staticmethod
def add_stage(stage_name: str, stage: dict) -> dict:
return {stage_name: stage}
def projection_stage(self, stage: dict) -> dict:
return self.add_stage("$project", stage)
def match_stage(self, stage: dict) -> dict:
return self.add_stage("$match", stage)
def lookup_stage(self, stage: dict) -> dict:
return self.add_stage("$lookup", stage)
def unwind_stage(self, stage: dict) -> dict:
return self.add_stage("$unwind", stage)
def group_stage(self, stage: dict) -> dict:
return self.add_stage("$group", stage)
def add_fields(self, stage: dict) -> dict:
return self.add_stage("$addFields", stage)
def sort_stage(self, stage: dict) -> dict:
return self.add_stage("$sort", stage)
from pymongo import MongoClient
# Connection URI
uri = "mongodb://ilens:ilens%401234@192.168.0.220:4720/"
# Create a new MongoClient
client = MongoClient(uri)
# Get a reference to the database and collection
db = client["Asset_test"]
collection = db["test"]
# Construct the filter to match the document with the given _id
filter = {"mac_address": "b8:27:eb:ca:99:f1"}
# Construct the update document to set the customer field to "ARJUN B"
update = {"$set": {"customer": "ARJUN B"}}
# Call the update_one method to update the document
result = collection.update_one(filter, update)
# Print the number of documents updated
print(result.modified_count)
# Close the MongoClient
client.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment