Commit 195e58d4 authored by OWAIZ MUSTAFA KHAN's avatar OWAIZ MUSTAFA KHAN 😎

Setup: index_update

parent 2968047c
.venv
.idea
\ No newline at end of file
......@@ -2,9 +2,7 @@
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
import pymongo
from pymongo import MongoClient
from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
......@@ -40,6 +38,68 @@ def execute_aggregate(collection: Collection, query: list):
data = list(data) if data else []
return data
# Other Utils
def get_data(data: str):
if data == 'ASC':
return pymongo.ASCENDING
elif data == 'DESC':
return pymongo.DESCENDING
elif data == 'GEO2D':
return pymongo.GEO2D
elif data == 'GEOSPHERE':
return pymongo.GEOSPHERE
elif data == 'HASHED':
return pymongo.HASHED
elif data == 'TEXT':
return pymongo.TEXT
def get_collection_info(index: dict):
collection_name = index.get('collection', index.get('collection_name'))
db_name = index.get('db', index.get('db_name'))
return collection_name, db_name
def get_index_info(index: dict) -> list[dict]:
collection_name, db_name = get_collection_info(index)
collection = get_collection(collection_name, db_name)
# Get all index from mongo
all_index = collection.list_indexes()
# Format to readable data
index_infos = [
{"name": idx["name"], "fields": list(idx["key"].keys())}
for idx in all_index
]
return index_infos
def index_exists(index: dict, by_name: bool = False) -> bool:
index_infos = get_index_info(index)
if by_name:
for i in index_infos:
if index.get('name') == i.get('name'):
return True
return False
# Compare if index already exists
for index_info in index_infos:
if index.get('fields') == index_info.get('fields'):
return True
return False
def make_keys(indexes: list[dict]) -> list[tuple] | str:
result = list()
keys = indexes[0].get('keys')
if indexes[0].get('type') == 'simple':
return keys
for key in keys:
result.append((
key[0],
get_data(key[1])
))
return result
# Operations
def find_all(collection: Collection):
......@@ -72,7 +132,7 @@ def find_index(collection: Collection,
:param include_schema_name: True if you want to include schema name in result else False [default: True]
:param include_database_name: True if you want to include databse name in result else False [default: True]
:param collection: Object of the collection you want to access
:return: A list of all records present in the collection or [] if there are no records in collections
:return: A ``list`` of all records present in the collection or [] if there are no records in collections
"""
query = list()
......
......@@ -5,73 +5,11 @@ Email: owaiz.mustafakhan@rockwellautomation.com
import pymongo
from pymongo.errors import OperationFailure
from scripts.schemas.mongo_schema import DeleteIndex, UpdateIndexFromDB
from scripts.utils.common import get_collection
def get_data(data: str):
if data == 'ASC':
return pymongo.ASCENDING
elif data == 'DESC':
return pymongo.DESCENDING
elif data == 'GEO2D':
return pymongo.GEO2D
elif data == 'GEOSPHERE':
return pymongo.GEOSPHERE
elif data == 'HASHED':
return pymongo.HASHED
elif data == 'TEXT':
return pymongo.TEXT
def get_collection_info(index: dict):
collection_name = index.get('collection', index.get('collection_name'))
db_name = index.get('db', index.get('db_name'))
return collection_name, db_name
def get_index_info(index: dict) -> list[dict]:
collection_name, db_name = get_collection_info(index)
collection = get_collection(collection_name, db_name)
# Get all index from mongo
all_index = collection.list_indexes()
# Format to readable data
index_infos = [
{"name": idx["name"], "fields": list(idx["key"].keys())}
for idx in all_index
]
return index_infos
from scripts.utils.mongo_utils import get_collection, get_collection_info, make_keys, index_exists, get_index_info
def index_exists(index: dict, by_name: bool = False) -> bool:
index_infos = get_index_info(index)
if by_name:
for i in index_infos:
if index.get('name') == i.get('name'):
return True
return False
# Compare if index already exists
for index_info in index_infos:
if index.get('fields') == index_info.get('fields'):
return True
return False
def make_keys(indexes: list[dict]) -> list[tuple] | str:
result = list()
keys = indexes[0].get('keys')
if indexes[0].get('type') == 'simple':
return keys
for key in keys:
result.append((
key[0],
get_data(key[1])
))
return result
def add_new_index(index_info):
try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment