Commit be430a42 authored by tarun.madamanchi's avatar tarun.madamanchi

first commit

parent f8f8240a
[DB_URIS]
MONGO_URI=$MONGO_URI
[POSTGRES_DB]
POSTGRES_URI=$POSTGRES_URI
REDIS_URI=$REDIS_URI
...@@ -110,7 +110,6 @@ def main(): ...@@ -110,7 +110,6 @@ def main():
setup_logging() setup_logging()
# uri = "mongodb://dev-mongo-readwrite:dev-mongo-readwrite%40123@192.168.0.220:31589/?directConnection=true" # uri = "mongodb://dev-mongo-readwrite:dev-mongo-readwrite%40123@192.168.0.220:31589/?directConnection=true"
# uri="mongodb://admin:iLens%23JUBv705@192.168.0.207:3689/?authMechanism=DEFAULT&directConnection=true"
uri="mongodb://iLens:iLens%231234@192.168.0.207:3599/?authMechanism=DEFAULT&directConnection=true" uri="mongodb://iLens:iLens%231234@192.168.0.207:3599/?authMechanism=DEFAULT&directConnection=true"
client = connect_to_mongo(uri) client = connect_to_mongo(uri)
index_list, json_structure = get_user_defined_indexes(client) index_list, json_structure = get_user_defined_indexes(client)
......
...@@ -39,7 +39,7 @@ except Exception as e: ...@@ -39,7 +39,7 @@ except Exception as e:
class DBConf: class DBConf:
POSTGRES_URI = f'{config.get("POSTGRES_DB", "POSTGRES_URI")}/project_216__ilens_alarms' POSTGRES_URI = f'{config.get("POSTGRES_DB", "DB_URIS")}/project_216__ilens_alarms'
if not POSTGRES_URI: if not POSTGRES_URI:
print("Error, environment variable POSTGRES_URI not set") print("Error, environment variable POSTGRES_URI not set")
sys.exit(1) sys.exit(1)
...@@ -47,3 +47,4 @@ class DBConf: ...@@ -47,3 +47,4 @@ class DBConf:
if not MONGO_URI: if not MONGO_URI:
print("Error, environment variable MONGO_URI not set") print("Error, environment variable MONGO_URI not set")
sys.exit(1) sys.exit(1)
import datetime
from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from scripts.db.psql.databases import Base
class AlarmEventHistorySchema(Base):
    """ORM mapping for one alarm trigger occurrence (table ``alarm_event_history``).

    Each row records when an alarm fired, which definition it came from,
    and its lifecycle state (status, acknowledgement, end time, duration).
    """

    __tablename__ = "alarm_event_history"

    # Primary key for a single alarm event occurrence.
    alarm_event_id: Mapped[str] = mapped_column(primary_key=True)
    # BUG FIX: the original passed the bare name `alarmdefinition_id`, which is
    # undefined and raises NameError the moment this module is imported.
    # ForeignKey expects a "table.column" string.
    # NOTE(review): the target table/column below is inferred from the name —
    # confirm against the actual alarm-definition table before deploying.
    alarm_id: Mapped[str] = mapped_column(ForeignKey("alarm_definition.alarm_id"))
    # Moment the alarm condition was triggered.
    trigger_time: Mapped[datetime.datetime] = mapped_column()
    # Condition expression/description that caused the trigger.
    trigger_condition: Mapped[str]
    # Project the alarm belongs to.
    project_id: Mapped[str]
    # Current status/state; empty string when not yet set.
    alarm_status: Mapped[Optional[str]] = mapped_column(default="")
    alarm_state: Mapped[Optional[str]] = mapped_column(default="")
    # Whether an operator has acknowledged the event.
    ack_status: Mapped[Optional[bool]] = mapped_column(default=False)
    # When the alarm condition cleared; None while still active.
    end_time: Mapped[Optional[datetime.datetime]] = mapped_column(default=None)
    # Duration of the event; units not shown here — presumably seconds, TODO confirm.
    duration: Mapped[Optional[int]] = mapped_column(default=0)
    # Template used to render the alarm message, if any.
    alarm_template: Mapped[Optional[str]] = mapped_column(default="")
\ No newline at end of file
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
from pymongo import MongoClient
from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
from scripts.config.app_configurations import DBConf
# Retrieval
# Shared module-level MongoDB client: every database/collection lookup in
# this module goes through this single instance (created at import time from
# the configured URI).
mongo_client = MongoClient(DBConf.MONGO_URI)
def _get_database(name: str) -> Database:
    """Return the named database from the shared MongoDB client.

    :param name: name of the database to fetch
    :return: ``Database`` handle for *name*
    """
    return mongo_client.get_database(name)
def get_collection(collection_name: str, db_name: str) -> Collection:
    """Look up a collection inside the given database.

    :param collection_name: name of the collection to access
    :param db_name: name of the database that holds the collection
    :return: ``Collection`` handle for the requested collection
    """
    return _get_database(db_name).get_collection(collection_name)
def execute_aggregate(collection: Collection, query: list):
    """Run an aggregation pipeline and materialise the results.

    :param collection: collection to aggregate over
    :param query: list of aggregation pipeline stages
    :return: list of result documents, or [] when the cursor is falsy
    """
    cursor = collection.aggregate(query)
    return list(cursor) if cursor else []
# Operations
def find_all(collection: Collection):
    """Return every record in *collection* with the ``_id`` field stripped.

    :param collection: object of the collection you want to access
    :return: list of all documents, or [] when the collection is empty
    """
    cursor = collection.aggregate([{'$project': {'_id': 0}}])
    return list(cursor) if cursor else []
def find_index(collection: Collection,
               include_database_name: bool = True,
               include_schema_name: bool = True,
               include_table_name: bool = True,
               full_projection: bool = False,
               index_name: str = None,
               table_name: str = None,
               schema_name: str = None,
               database_name: str = None):
    """Find index records, unwinding the nested schemas/tables/indexes arrays.

    :param collection: object of the collection you want to access
    :param include_database_name: include the database name in each result
    :param include_schema_name: include the schema name in each result
    :param include_table_name: include the table name in each result
    :param full_projection: True to return whole records (minus ``_id``)
        instead of the slimmed projection [default: False]
    :param index_name: restrict results to this index name
    :param table_name: restrict results to this table
    :param schema_name: restrict results to this schema
    :param database_name: restrict results to this database
    :return: list of matching records, or [] when nothing matches
    """
    # Build the slim projection first; insertion order is preserved so the
    # document sent to the server matches the original implementation.
    projection = {'_id': 0}
    if include_database_name:
        projection['database_name'] = 1
    if include_schema_name:
        projection['schema_name'] = "$schemas.schema_name"
    if include_table_name:
        projection['table_name'] = "$schemas.tables.table_name"

    # One document per index: unwind each level of nesting.
    pipeline = [
        {"$unwind": "$schemas"},
        {"$unwind": "$schemas.tables"},
        {"$unwind": "$schemas.tables.indexes"},
    ]
    projection['index'] = "$schemas.tables.indexes"

    # Optional filters — only non-empty values become $match criteria.
    match = {}
    for field, value in (("database_name", database_name),
                         ("schemas.schema_name", schema_name),
                         ("schemas.tables.table_name", table_name),
                         ("schemas.tables.indexes.name", index_name)):
        if value:
            match[field] = value
    if match:
        pipeline.append({'$match': match})

    if full_projection:
        pipeline.append({'$project': {'_id': 0}})
    else:
        pipeline.append({'$project': projection})
    return execute_aggregate(collection, pipeline)
def add_index(collection: Collection, data: dict):
    """Push a new index entry onto the matching table document.

    :param collection: metadata collection to update
    :param data: dict with ``database_name``, optional ``schema_name``
        (defaults to 'public'), ``table_name`` and an ``index`` dict holding
        name/columns/unique/type
    :return: True when a document was modified, else False
    """
    schema = data.get('schema_name', 'public')
    table = data.get('table_name')
    index = data.get('index')

    selector = {
        "database_name": data.get('database_name'),
        "schemas.schema_name": schema,
        "schemas.tables.table_name": table,
    }
    new_entry = {
        "name": index.get('name'),
        "columns": index.get('columns'),
        "unique": index.get('unique'),
        "type": index.get('type'),
    }
    # arrayFilters pin the positional placeholders to the right schema/table.
    result = collection.update_one(
        selector,
        {"$push": {"schemas.$[s].tables.$[t].indexes": new_entry}},
        array_filters=[
            {"s.schema_name": schema},
            {"t.table_name": table},
        ],
    )
    return result.modified_count > 0
def update_index(collection: Collection, data: dict):
    """Overwrite one index entry's name/columns/unique/type in place.

    :param collection: metadata collection to update
    :param data: dict with ``database_name``, optional ``schema_name``
        (defaults to 'public'), ``table_name``, the current ``index`` (matched
        by name) and the replacement ``new_index`` definition
    :return: True when a document was modified, else False
    """
    schema = data.get('schema_name', 'public')
    table = data.get('table_name')
    old_name = data.get('index').get('name')
    new_index = data.get('new_index')

    selector = {
        "database_name": data.get('database_name'),
        "schemas.schema_name": schema,
        "schemas.tables.table_name": table,
        "schemas.tables.indexes.name": old_name,
    }
    # Target the exact nested index via the three arrayFilter placeholders.
    prefix = "schemas.$[s].tables.$[t].indexes.$[i]"
    changes = {
        f"{prefix}.name": new_index.get('name'),
        f"{prefix}.columns": new_index.get('columns'),
        f"{prefix}.unique": new_index.get('unique'),
        f"{prefix}.type": new_index.get('type'),
    }
    result = collection.update_one(
        selector,
        {"$set": changes},
        array_filters=[
            {"s.schema_name": schema},
            {"t.table_name": table},
            {"i.name": old_name},
        ],
    )
    if result.modified_count > 0:
        print("Index Updated successfully.")
        return True
    print("No matching index found or index was not updated.")
    return False
def delete_index(collection: Collection, data: dict):
    """Pull one index entry (matched by name) out of its table document.

    :param collection: metadata collection to update
    :param data: dict with ``database_name``, optional ``schema_name``
        (defaults to 'public'), ``table_name`` and an ``index`` dict whose
        ``name`` identifies the entry to remove
    :return: True when a document was modified, else False
    """
    schema = data.get('schema_name', 'public')
    table = data.get('table_name')
    index_name = data.get('index').get('name')

    selector = {
        "database_name": data.get('database_name'),
        "schemas.schema_name": schema,
        "schemas.tables.table_name": table,
        "schemas.tables.indexes.name": index_name,
    }
    removal = {
        "$pull": {
            "schemas.$[s].tables.$[t].indexes": {"name": index_name}
        }
    }
    result = collection.update_one(
        selector,
        removal,
        array_filters=[
            {"s.schema_name": schema},
            {"t.table_name": table},
        ],
    )
    if result.modified_count > 0:
        print("Index deleted successfully.")
        return True
    print("No matching index found or index was not deleted.")
    return False
"""
Author: Owaiz Mustafa Khan
Email: owaiz.mustafakhan@rockwellautomation.com
"""
import logging
from sqlalchemy import text
from sqlalchemy.engine import reflection
from sqlalchemy.orm import Session
from scripts.db.psql.databases import engine
def _table_has_column(table, column):
    """Return True if *table* has a column named *column*.

    :param table: table name to inspect
    :param column: column name to look for
    :return: bool
    """
    # NOTE(review): Inspector.from_engine is deprecated since SQLAlchemy 1.4;
    # prefer sqlalchemy.inspect(engine) when the import block can change.
    insp = reflection.Inspector.from_engine(engine)
    # BUG-ish: the original flag loop scanned every column even after a match;
    # any() short-circuits on the first hit and reads as the intent.
    return any(col["name"] == column for col in insp.get_columns(table))
def _table_has_index(table, index):
    """Return True if *table* has an index named *index*.

    :param table: table name to inspect
    :param index: index name to look for
    :return: bool
    """
    # NOTE(review): Inspector.from_engine is deprecated since SQLAlchemy 1.4;
    # prefer sqlalchemy.inspect(engine) when the import block can change.
    insp = reflection.Inspector.from_engine(engine)
    # BUG-ish: the original flag loop scanned every index even after a match;
    # any() short-circuits on the first hit and reads as the intent.
    return any(ind["name"] == index for ind in insp.get_indexes(table))
def create_index_from_data(index_data: dict):
    """Create a Postgres index described by *index_data*.

    :param index_data: dict with ``table_name``, optional ``schema_name``
        (defaults to 'public') and an ``index`` dict (name/columns and
        optional type/unique). When ``new_index`` is present it takes
        precedence — used by the update/recreate flows.
    """
    # FIX: the original called index_data.__contains__('new_index'); use the
    # idiomatic `in` operator (and avoid touching 'index' when it's unneeded).
    index = index_data['new_index'] if 'new_index' in index_data else index_data["index"]
    index_name = index["name"]
    columns = index["columns"]
    using = index.get("type", "btree").upper()
    unique = index.get("unique", False)
    schema = index_data.get("schema_name", "public")
    table_name = index_data["table_name"]

    # Identifiers cannot be bound as query parameters, so they are interpolated
    # double-quoted. NOTE(review): this does not escape embedded quotes — only
    # trusted metadata should pass through here.
    column_str = ", ".join(f'"{col}"' for col in columns)
    unique_str = "UNIQUE " if unique else ""
    create_sql = (
        f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" '
        f'ON "{schema}"."{table_name}" USING {using} ({column_str});'
    )
    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(text(create_sql))
    print(f"Created index: {index_name} on {schema}.{table_name}")
def drop_index_from_data(index_data: dict):
    """Drop the Postgres index named in ``index_data['index']``.

    :param index_data: dict with an ``index`` dict (holding ``name``) and an
        optional ``schema_name`` (falls back to 'public')
    """
    name = index_data["index"]["name"]
    target_schema = index_data.get("schema_name", "public")  # fallback to 'public'
    statement = f'DROP INDEX IF EXISTS "{target_schema}"."{name}";'
    with engine.begin() as conn:
        conn.execute(text(statement))
    print(f"Dropped index: {target_schema}.{name}")
def recreate_index(index_data: dict):
    """Drop the old index and recreate it from *index_data*.

    The old index is identified by ``index_data['index']['name']``; the new
    definition comes from ``new_index`` when present (see
    create_index_from_data).

    :param index_data: dict with ``index``, optional ``new_index``,
        ``table_name`` and optional ``schema_name``
    """
    drop_index_from_data(index_data)
    create_index_from_data(index_data)
    # BUG FIX: the original read index_data['new_index'] unconditionally and
    # raised KeyError when only 'index' was supplied, even though
    # create_index_from_data supports that case. Fall back to 'index'.
    final = index_data.get('new_index') or index_data.get('index', {})
    print(
        f"Updated index: {final.get('name')} on "
        f"{index_data.get('schema_name', 'public')}.{index_data.get('table_name')}"
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment