Commit 8c663dfa authored by OWAIZ MUSTAFA KHAN's avatar OWAIZ MUSTAFA KHAN 😎

Completed update_all_index_from_db()

parent 195e58d4
......@@ -115,13 +115,13 @@ def delete_index(payload: DeleteIndex):
exit(0)
delete_index(DeleteIndex(
name='lookup_name_-1_lookup_id_-1',
collection_name='lookup_table',
metadata_collection_name='test_collection',
db_name='ilens_configuration',
metadata_db_name='__test'
))
# delete_index(DeleteIndex(
# name='lookup_name_-1_lookup_id_-1',
# collection_name='lookup_table',
# metadata_collection_name='test_collection',
# db_name='ilens_configuration',
# metadata_db_name='__test'
# ))
# delete_index(DeleteIndex(
# name='id_1',
......@@ -132,4 +132,4 @@ delete_index(DeleteIndex(
# ))
# update_all_index_from_db(UpdateIndexFromDB())
\ No newline at end of file
update_all_index_from_db(UpdateIndexFromDB(collection_name='index_info_v2'))
\ No newline at end of file
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
from pymongo.errors import OperationFailure
from scripts.schemas.mongo_schema import DeleteIndex, UpdateIndexFromDB
from scripts.utils.mongo_utils import get_collection, get_collection_info, get_index_info
from scripts.utils.mongo_utils_v2 import index_exists_v2, make_keys
def check_and_create_index(collection_info: dict):
index = collection_info.get('index')
collection, database = get_collection_info(collection_info)
for idx in index:
# if index_exists_v2(idx, {
# 'collection': collection_info.get('collection'),
# 'database': collection_info.get('database')
# }): # Checking if the index already exists
# continue
add_new_index(idx, collection, database) # Adding new index
def add_new_index(index_info: dict, collection: str, database: str):
try:
additional_properties = index_info.get('additional_properties')
fields = make_keys(index_info)
# return
collection = get_collection(collection, database)
if not additional_properties:
collection.create_index(keys=fields)
return
collection.create_index(
keys = fields,
unique=additional_properties.get('unique', False),
sparse=additional_properties.get('sparse', False),
hidden=additional_properties.get('hidden', False),
background=additional_properties.get('background', False)
)
except OperationFailure as of:
if 'Index with name: email_1 already exists with different options' in str(of):
print('Cannot add index as it already exists')
return
except Exception as e:
print(e)
return
def update_all_index_from_db(payload: UpdateIndexFromDB):
try:
collection = get_collection(payload.collection_name, payload.db_name)
# Get all index metadata stored in Mongo
collections_info = list(collection.find({}, {'_id': 0}))
# Checks that data is present
if not len(collections_info):
return {
'status': 'COMPLETED',
'message': 'No index metadata found to create indexes.'
}
# Iterate through data and add new index
for collection_info in collections_info:
check_and_create_index(collection_info)
except Exception as e:
print(f'Exception occurred during update: {e}')
exit(0)
def delete_index(payload: DeleteIndex):
try:
i_collection = get_collection(payload.collection_name, payload.db_name)
if not index_exists_v2(payload.model_dump()):
return
collection = get_collection(payload.metadata_collection_name, payload.metadata_db_name)
index_infos = get_index_info(payload.model_dump())
# Deleting metadata from mongo
docs = collection.find({"db": payload.db_name, "collection": payload.collection_name})
for doc in docs:
indexes = doc.get("indexes", [])
new_indexes = []
for index in indexes:
index_type = index.get("type")
keys = index.get("keys")
if isinstance(keys, list):
_ = list()
for key in keys:
_.append(key[0])
keys = _
for index_info in index_infos:
if ([keys] if index_type == 'simple' else keys) == index_info.get('fields'):
print(f"Deleting entire document: {doc['_id']}")
collection.delete_one({"_id": doc["_id"]})
break
else:
new_indexes.append(index)
else:
# If loop wasn't broken (i.e., not deleted), update or delete based on new index array
if not new_indexes:
collection.delete_one({"_id": doc["_id"]})
else:
collection.update_one(
{"_id": doc["_id"]},
{"$set": {"indexes": new_indexes}}
)
i_collection.drop_index(payload.name)
except Exception as e:
print(f'Some exception occurred while deleting the index: {e}')
exit(0)
# delete_index(DeleteIndex(
# name='lookup_name_-1_lookup_id_-1',
# collection_name='lookup_table',
# metadata_collection_name='test_collection',
# db_name='ilens_configuration',
# metadata_db_name='__test'
# ))
# delete_index(DeleteIndex(
# name='id_1',
# collection_name='design_tag_data',
# metadata_collection_name='test_collection',
# db_name='ilens_configuration',
# metadata_db_name='__test'
# ))
update_all_index_from_db(UpdateIndexFromDB(collection_name='index_info_v2'))
\ No newline at end of file
......@@ -39,11 +39,12 @@ except Exception as e:
class DBConf:
POSTGRES_URI = f'{config.get("POSTGRES_DB", "DB_URIS")}/project_216__ilens_alarms'
pass
POSTGRES_URI = f'{config.get("DB_URIS", "POSTGRES_URI")}/project_216__ilens_alarms'
if not POSTGRES_URI:
print("Error, environment variable POSTGRES_URI not set")
sys.exit(1)
MONGO_URI = config.get("MONGO_DB", "MONGO_URI")
MONGO_URI = config.get("DB_URIS", "MONGO_URI")
if not MONGO_URI:
print("Error, environment variable MONGO_URI not set")
sys.exit(1)
......
......@@ -55,7 +55,7 @@ def get_data(data: str):
def get_collection_info(index: dict):
collection_name = index.get('collection', index.get('collection_name'))
db_name = index.get('db', index.get('db_name'))
db_name = index.get('database', index.get('db', index.get('db_name')))
return collection_name, db_name
def get_index_info(index: dict) -> list[dict]:
......
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
import pymongo
from pymongo import MongoClient
from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
from scripts.config.app_configurations import DBConf
# Retrieval
mongo_client = MongoClient(DBConf.MONGO_URI)
def _get_database(name: str) -> Database:
"""
This function is used to get a specific Database from the MongoDB
:param name: Name of the database in which this collection is present
:return: database object of Database class
"""
database = mongo_client.get_database(name)
return database
def get_collection(collection_name: str, db_name: str) -> Collection:
"""
This function is used to get a specific collection from the MongoDB
:param collection_name: The name of collection you want to access
:param db_name: Name of the database in which this collection is present
:return: collection object of Collection class
"""
collection = _get_database(db_name).get_collection(collection_name)
return collection
def execute_aggregate(collection: Collection, query: list):
data = collection.aggregate(query)
data = list(data) if data else []
return data
# Other Utils
def get_data(data: str):
if data == 'ASC':
return pymongo.ASCENDING
elif data == 'DESC':
return pymongo.DESCENDING
elif data == 'GEO2D':
return pymongo.GEO2D
elif data == 'GEOSPHERE':
return pymongo.GEOSPHERE
elif data == 'HASHED':
return pymongo.HASHED
elif data == 'TEXT':
return pymongo.TEXT
def get_collection_info(index: dict):
collection_name = index.get('collection', index.get('collection_name'))
db_name = index.get('database', index.get('db', index.get('db_name')))
return collection_name, db_name
def get_index_info(index: dict) -> list[dict]:
collection_name, db_name = get_collection_info(index)
collection = get_collection(collection_name, db_name)
# Get all index from mongo
all_index = collection.list_indexes()
add_index_list = list(all_index)
# Format to readable data
index_infos = [
{"name": idx["name"], "fields": [[k, 1 if v == 'ASC' else 'DESC'] for k, v in idx["key"].items()]}
for idx in add_index_list
]
return index_infos
def index_exists(index: dict, by_name: bool = False) -> bool:
index_infos = get_index_info(index)
if by_name:
for i in index_infos:
if index.get('name') == i.get('name'):
return True
return False
# Compare if index already exists
for index_info in index_infos:
if index.get('fields') == index_info.get('fields'):
return True
return False
def index_exists_v2(index: dict, collection: dict) -> bool:
index_infos = get_index_info(collection)
keys = list()
if index.get('type', 'simple') == 'simple':
keys = 'something'
else:
keys = [key[0] for key in index.get('keys')]
for key in index.get('keys'):
for index_info in index_infos:
for field in index_info.get('fields'):
if key == field:
return True
return False
def make_keys(indexes: dict) -> list[tuple] | str:
result = list()
keys = indexes.get('keys')
if indexes.get('type') == 'simple':
return [(keys, get_data(indexes.get('additional_properties').get('sort')))]
for key in keys:
result.append((
key[0],
get_data(key[1])
))
return result
# Operations
def find_all(collection: Collection):
"""
This function is used to get all the records present in a collection
:param collection: Object of the collection you want to access
:return: A list of all records present in the collection or [] if there are no records in collections
"""
data = collection.aggregate([{'$project': {'_id': 0}}])
find_result = list(data) if data else []
return find_result
def find_index(collection: Collection,
include_database_name: bool = True,
include_schema_name: bool = True,
include_table_name: bool = True,
full_projection: bool = False,
index_name: str = None,
table_name: str = None,
schema_name: str = None,
database_name: str = None):
"""
This function is used to get all the records present in a collection
:param database_name: Name of the database in which you want to find the index
:param schema_name: Name of the schema in which you want to find the index
:param table_name: Name of the table in which you want to find the index
:param full_projection: True if you want the whole record else False [default: False]
:param index_name: Name of the index you want to find
:param include_table_name: True if you want to include table name in result else False [default: True]
:param include_schema_name: True if you want to include schema name in result else False [default: True]
:param include_database_name: True if you want to include databse name in result else False [default: True]
:param collection: Object of the collection you want to access
:return: A ``list`` of all records present in the collection or [] if there are no records in collections
"""
query = list()
projection = {'_id': 0}
if include_database_name:
projection.update({'database_name': 1})
if include_schema_name:
projection.update({"schema_name": "$schemas.schema_name"})
if include_table_name:
projection.update({"table_name": "$schemas.tables.table_name"})
query.append({"$unwind": "$schemas"})
query.append({"$unwind": "$schemas.tables"})
query.append({"$unwind": "$schemas.tables.indexes"})
projection.update({"index": "$schemas.tables.indexes"})
match = dict()
if database_name:
match.update({"database_name": database_name})
if schema_name:
match.update({"schemas.schema_name": schema_name})
if table_name:
match.update({"schemas.tables.table_name": table_name})
if index_name:
match.update({"schemas.tables.indexes.name": index_name})
if database_name or schema_name or table_name or index_name:
query.append({'$match': match})
if full_projection:
query.append({'$project': {'_id': 0}})
else:
query.append({'$project': projection})
result = execute_aggregate(collection, query)
return result
def add_index(collection: Collection, data: dict):
query = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name')
}
update = {
"$push": {
"schemas.$[s].tables.$[t].indexes": {
"name": data.get('index').get('name'),
"columns": data.get('index').get('columns'),
"unique": data.get('index').get('unique'),
"type": data.get('index').get('type')
}
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')}
]
# Perform the update
result = collection.update_one(query, update, array_filters=array_filters)
if result.modified_count > 0:
return True
else:
return False
def update_index(collection: Collection, data: dict):
query = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name'),
"schemas.tables.indexes.name": data.get('index').get('name')
}
update = {
"$set": {
"schemas.$[s].tables.$[t].indexes.$[i].name": data.get('new_index').get('name'),
"schemas.$[s].tables.$[t].indexes.$[i].columns": data.get('new_index').get('columns'),
# New columns for the index
"schemas.$[s].tables.$[t].indexes.$[i].unique": data.get('new_index').get('unique'), # Update uniqueness of the index
"schemas.$[s].tables.$[t].indexes.$[i].type": data.get('new_index').get('type') # Update index type
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')},
{"i.name": data.get('index').get('name')}
]
# Perform the update
result = collection.update_one(query, update, array_filters=array_filters)
# Print the result
if result.modified_count > 0:
print("Index Updated successfully.")
return True
else:
print("No matching index found or index was not updated.")
return False
def delete_index(collection: Collection, data: dict):
filter_criteria = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name'),
"schemas.tables.indexes.name": data.get('index').get('name')
}
update_criteria = {
"$pull": {
"schemas.$[s].tables.$[t].indexes": {
"name": data.get('index').get('name')
}
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')}
]
# Perform the update operation
result = collection.update_one(
filter_criteria,
update_criteria,
array_filters=array_filters
)
# Output the result
if result.modified_count > 0:
print("Index deleted successfully.")
return True
else:
print("No matching index found or index was not deleted.")
return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment