Commit 2968047c authored by tarun.madamanchi's avatar tarun.madamanchi

first commit

parent be430a42
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
import platform
from fastapi import status, HTTPException
from psycopg2 import NotSupportedError
from scripts.constants.db import MongoConstants
from scripts.schemas.postgres_schema import GetTablesInfoResponse, AddIndex, \
UpdateIndexResponse, UpdateIndex, DeleteIndex, DeleteIndexResponse, AddIndexResponse
from scripts.schemas.util_schema import DetectOS
from scripts.utils.common.db.mongo import find_all, get_collection, find_index, delete_index, add_index, update_index
from scripts.utils.common.db.postgres import create_index_from_data, recreate_index, drop_index_from_data
class CommonUtils:
@staticmethod
def detect_os() -> DetectOS:
"""
Helper Function To Detect OS
:return: **DetectOS**
"""
result = DetectOS(
os_name = platform.system(),
os_version=platform.version(),
os_release= platform.release(),
message="Unknown or unsupported OS."
)
print(f"Operating System: {result.os_name}")
print(f"OS Release: {result.os_release}")
print(f"OS Version: {result.os_version}")
if result.os_name == "Windows":
result.message = "This is a Windows system."
elif result.os_name == "Linux":
result.message = "This is a Linux system."
elif result.os_name == "Darwin":
result.message = "This is macOS."
print(result.message)
return result
if __name__ == "__main__":
print(detect_os())
def get_tables_info(self):
"""
This function is used to get all the records from the collection 'postgres_default_schema_info'
:return: **GetTablesInfoResponse**: all the records of the collection or False if any error occurs
"""
try:
collection = get_collection(
MongoConstants.collection_postgres_default_schema_info,
MongoConstants.db_default_info
)
records = find_all(collection)
records = {'data': records}
return GetTablesInfoResponse(**records)
except Exception as e:
print(f'Exception occurred: {e}')
return False
def update_index_from_db(self) -> dict:
"""
This function applies all the indexes to the postgres tables on the basis of data in Mongo
:return: dict response {'status': 'SUCCESS', 'message': 'All the indexes from the db are applied to postgres'}
"""
try:
collection = get_collection(
MongoConstants.collection_postgres_default_schema_info,
MongoConstants.db_default_info
)
records = find_index(collection)
for record in records:
record = UpdateIndex(**dict(record))
create_index_from_data(record.model_dump())
return {
'status': 'SUCCESS',
'message': 'All the indexes from the db are applied to postgres'
}
except Exception as e:
print(f'Exception occurred: {e}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'''There was some technical error at our end we'll resolve it quickly. Thank you for your patience.'''
)
def add_index(self, payload: AddIndex):
"""
This is a dynamic functions that Adds Index if not present or Updates Index if present subsequently updating the metadata in Mongo
AddIndex:
database_name: Name of database you want the index to be added or updated on
schema_name: Name of schema you want the index to be added or updated on
table_name: Name of table you want the index to be added or updated on
index: Class: PostgresTableIndex
name: Name of the Index
columns: Array of columns you want to include in index
type: The Access Method of Index
unique: [true/false]
:param payload: Class: AddIndex(Information of AddIndex is Mentioned Above)
:return: **AddIndexResponse** or **UpdateIndexResponse** the newly added or updated index info record of Mongo
"""
try:
collection = get_collection(
MongoConstants.collection_postgres_default_schema_info,
MongoConstants.db_default_info
)
exists = True
records = find_index(
collection,
index_name=payload.index.name,
table_name=payload.table_name,
schema_name=payload.schema_name,
database_name=payload.database_name
)
if not len(records):
exists = False
# Till here index of same name doesn't exist
# Now will check for columns
records = find_index(
collection,
database_name=payload.database_name,
schema_name=payload.schema_name,
table_name=payload.table_name
)
for record in records:
record = UpdateIndex(**record)
if (
(payload.index.columns == record.index.columns)
and
(payload.index.type == record.index.type)
):
records = [record.model_dump()]
exists = True
break
if exists:
for record in records:
index_data = dict(record)
record = UpdateIndexResponse(**index_data)
index_data.update({'new_index': payload.index.model_dump()})
recreate_index(index_data)
if not update_index(collection, index_data):
return HTTPException(
status_code=status.HTTP_417_EXPECTATION_FAILED,
detail="""Unable to update index metadata in database"""
)
return record
create_index_from_data(payload.model_dump())
if not add_index(collection, payload.model_dump()):
return HTTPException(
status_code=status.HTTP_417_EXPECTATION_FAILED,
detail="""Unable to add metadata to the database"""
)
return AddIndexResponse(**payload.model_dump())
except NotSupportedError as e:
if 'access method "hash" does not support multicolumn indexes' in str(e):
raise HTTPException(
status_code=status.HTTP_406_NOT_ACCEPTABLE,
detail="HASH Doesn't Support Multiple Columns"
)
except Exception as e:
print(f'Exception occurred: {e}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'''There was some technical error at our end we'll resolve it quickly. Thank you for your patience.'''
)
def delete_index(self, payload: DeleteIndex) -> None | HTTPException | DeleteIndexResponse:
"""
This functions Deletes the index if it exists subsequently deleting the metadata in Mongo
DeleteIndex:
name: Name of the Index to delete
:param payload: Class: DeleteIndex(Information of DeleteIndex is Mentioned Above)
:return: **DeleteIndexResponse** the deleted index info record of Mongo or **HTTPException**
"""
try:
collection = get_collection(
MongoConstants.collection_postgres_default_schema_info,
MongoConstants.db_default_info
)
records = find_index(
collection,
index_name=payload.name
)
if not len(records):
return HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f'Failed To Delete.... No Index Like "{payload.name}" Exists'
)
for record in records:
index_data = dict(record)
record = DeleteIndexResponse(**record)
drop_index_from_data(index_data)
if not delete_index(collection, index_data):
return HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="""Unable to delete metadata for db"""
)
return record
except Exception as e:
print(f'Exception occurred: {e}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'''There was some technical error at our end we'll resolve it quickly. Thank you for your patience.'''
)
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
from pymongo import MongoClient
from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
from scripts.config.app_configurations import DBConf
# Retrieval
mongo_client = MongoClient(DBConf.MONGO_URI)
def _get_database(name: str) -> Database:
"""
This function is used to get a specific Database from the MongoDB
:param name: Name of the database in which this collection is present
:return: database object of Database class
"""
database = mongo_client.get_database(name)
return database
def get_collection(collection_name: str, db_name: str) -> Collection:
"""
This function is used to get a specific collection from the MongoDB
:param collection_name: The name of collection you want to access
:param db_name: Name of the database in which this collection is present
:return: collection object of Collection class
"""
collection = _get_database(db_name).get_collection(collection_name)
return collection
def execute_aggregate(collection: Collection, query: list):
data = collection.aggregate(query)
data = list(data) if data else []
return data
# Operations
def find_all(collection: Collection):
"""
This function is used to get all the records present in a collection
:param collection: Object of the collection you want to access
:return: A list of all records present in the collection or [] if there are no records in collections
"""
data = collection.aggregate([{'$project': {'_id': 0}}])
find_result = list(data) if data else []
return find_result
def find_index(collection: Collection,
include_database_name: bool = True,
include_schema_name: bool = True,
include_table_name: bool = True,
full_projection: bool = False,
index_name: str = None,
table_name: str = None,
schema_name: str = None,
database_name: str = None):
"""
This function is used to get all the records present in a collection
:param database_name: Name of the database in which you want to find the index
:param schema_name: Name of the schema in which you want to find the index
:param table_name: Name of the table in which you want to find the index
:param full_projection: True if you want the whole record else False [default: False]
:param index_name: Name of the index you want to find
:param include_table_name: True if you want to include table name in result else False [default: True]
:param include_schema_name: True if you want to include schema name in result else False [default: True]
:param include_database_name: True if you want to include databse name in result else False [default: True]
:param collection: Object of the collection you want to access
:return: A list of all records present in the collection or [] if there are no records in collections
"""
query = list()
projection = {'_id': 0}
if include_database_name:
projection.update({'database_name': 1})
if include_schema_name:
projection.update({"schema_name": "$schemas.schema_name"})
if include_table_name:
projection.update({"table_name": "$schemas.tables.table_name"})
query.append({"$unwind": "$schemas"})
query.append({"$unwind": "$schemas.tables"})
query.append({"$unwind": "$schemas.tables.indexes"})
projection.update({"index": "$schemas.tables.indexes"})
match = dict()
if database_name:
match.update({"database_name": database_name})
if schema_name:
match.update({"schemas.schema_name": schema_name})
if table_name:
match.update({"schemas.tables.table_name": table_name})
if index_name:
match.update({"schemas.tables.indexes.name": index_name})
if database_name or schema_name or table_name or index_name:
query.append({'$match': match})
if full_projection:
query.append({'$project': {'_id': 0}})
else:
query.append({'$project': projection})
result = execute_aggregate(collection, query)
return result
def add_index(collection: Collection, data: dict):
query = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name')
}
update = {
"$push": {
"schemas.$[s].tables.$[t].indexes": {
"name": data.get('index').get('name'),
"columns": data.get('index').get('columns'),
"unique": data.get('index').get('unique'),
"type": data.get('index').get('type')
}
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')}
]
# Perform the update
result = collection.update_one(query, update, array_filters=array_filters)
if result.modified_count > 0:
return True
else:
return False
def update_index(collection: Collection, data: dict):
query = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name'),
"schemas.tables.indexes.name": data.get('index').get('name')
}
update = {
"$set": {
"schemas.$[s].tables.$[t].indexes.$[i].name": data.get('new_index').get('name'),
"schemas.$[s].tables.$[t].indexes.$[i].columns": data.get('new_index').get('columns'),
# New columns for the index
"schemas.$[s].tables.$[t].indexes.$[i].unique": data.get('new_index').get('unique'), # Update uniqueness of the index
"schemas.$[s].tables.$[t].indexes.$[i].type": data.get('new_index').get('type') # Update index type
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')},
{"i.name": data.get('index').get('name')}
]
# Perform the update
result = collection.update_one(query, update, array_filters=array_filters)
# Print the result
if result.modified_count > 0:
print("Index Updated successfully.")
return True
else:
print("No matching index found or index was not updated.")
return False
def delete_index(collection: Collection, data: dict):
filter_criteria = {
"database_name": data.get('database_name'),
"schemas.schema_name": data.get('schema_name', 'public'),
"schemas.tables.table_name": data.get('table_name'),
"schemas.tables.indexes.name": data.get('index').get('name')
}
update_criteria = {
"$pull": {
"schemas.$[s].tables.$[t].indexes": {
"name": data.get('index').get('name')
}
}
}
array_filters = [
{"s.schema_name": data.get('schema_name', 'public')},
{"t.table_name": data.get('table_name')}
]
# Perform the update operation
result = collection.update_one(
filter_criteria,
update_criteria,
array_filters=array_filters
)
# Output the result
if result.modified_count > 0:
print("Index deleted successfully.")
return True
else:
print("No matching index found or index was not deleted.")
return False
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
import logging
from sqlalchemy import text
from sqlalchemy.engine import reflection
from sqlalchemy.orm import Session
from scripts.db.psql.databases import engine
def _table_has_column(table, column):
insp = reflection.Inspector.from_engine(engine)
has_column = False
for col in insp.get_columns(table):
if column != col["name"]:
continue
has_column = True
return has_column
def _table_has_index(table, index):
insp = reflection.Inspector.from_engine(engine)
has_index = False
for ind in insp.get_indexes(table):
if index != ind["name"]:
continue
has_index = True
return has_index
def create_index_from_data(index_data: dict):
index = index_data["index"]
if index_data.__contains__('new_index'):
index = index_data['new_index']
index_name = index["name"]
columns = index["columns"]
using = index.get("type", "btree").upper()
unique = index.get("unique", False)
schema = index_data.get("schema_name", "public")
table_name = index_data["table_name"]
column_str = ", ".join(f'"{col}"' for col in columns)
unique_str = "UNIQUE " if unique else ""
create_sql = (
f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" '
f'ON "{schema}"."{table_name}" USING {using} ({column_str});'
)
with engine.begin() as conn:
conn.execute(text(create_sql))
print(f"Created index: {index_name} on {schema}.{table_name}")
def drop_index_from_data(index_data: dict):
index_name = index_data["index"]["name"]
schema = index_data.get("schema_name", "public") # fallback to 'public'
drop_sql = f'DROP INDEX IF EXISTS "{schema}"."{index_name}";'
with engine.begin() as conn:
conn.execute(text(drop_sql))
print(f"Dropped index: {schema}.{index_name}")
def recreate_index(index_data: dict):
drop_index_from_data(index_data)
create_index_from_data(index_data)
print(f"Updated index: {index_data['new_index']['name']} on {index_data.get('schema_name', 'public')}.{index_data.get('table_name')}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment