Commit e5e4f7df authored by suryakant

Trends Dashboard

parent edfe1e7a
......@@ -13,7 +13,8 @@ class APIConstants:
SHUTDOWN = "shutdown"
HEALTH_CHECK = "/healthcheck"
TRENDS_ASYNC_SERVICE = "/trends_async"
DASHBOARD_METADATA_ENDPOINT = "/get_filter"
DASHBOARD_DATA_ENDPOINT = "/get_data"
class MainConstants:
......@@ -35,6 +36,13 @@ class Constants:
"""
EXCEPTION_RAISER = "Exception ->{}"
TRENDS_HANDLING_ENDPOINT = "Trends Handler Endpoints"
DATA = "data"
LINE = "line"
DEPARTMENT = "department"
FORM = "form"
FUNCTION = "function"
VALUES = "values"
FILTER_NAME = "filter_name"
class LoggerConstants:
......
......@@ -4,7 +4,8 @@ from sqlalchemy.pool import NullPool
from scripts.configurations import postgres_details
from scripts.core.logging.application_logging import logger
from sqlalchemy_utils import create_database, database_exists
from scripts.core.schemas.postgres.postgres_tables import TrendsMasterTable
from scripts.core.schemas.postgres.postgres_tables import TrendsMasterTable, \
TrendsFormTable
from scripts.core.exception.app_exceptions import ErrorMessages, GeneralException
......@@ -12,7 +13,6 @@ def database_init():
"""
:Objective: To create default postgres tables
"""
database_obj = None
try:
# Creating engine for postgres
engine = create_engine(url=postgres_details.uri, poolclass=NullPool)
......@@ -23,9 +23,11 @@ def database_init():
create_database(engine.url)
TrendsMasterTable.__table__.create(bind=engine, checkfirst=True)
TrendsFormTable.__table__.create(bind=engine, checkfirst=True)
logger.info("Tables initialization successful")
TrendsMasterTable.index_name.create(bind=engine, checkfirst=True)
# TrendsMasterTable.index_name.create(bind=engine, checkfirst=True)
# TrendsFormTable.index_name.create(bind=engine, checkfirst=True)
# Creating database object for CRUD operation
database_obj = session_local()
......
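For context, a minimal sketch of the bootstrap pattern `database_init` follows (placeholder connection URI; the real one comes from `postgres_details.uri`): create the database only when it does not already exist, then create the tables idempotently.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
from sqlalchemy_utils import create_database, database_exists

from scripts.core.schemas.postgres.postgres_tables import TrendsMasterTable, TrendsFormTable

# Placeholder URI for illustration; the service reads postgres_details.uri.
engine = create_engine("postgresql://user:pass@localhost/trends_db", poolclass=NullPool)

# Create the database only if it is missing, then create the tables.
if not database_exists(engine.url):
    create_database(engine.url)

# checkfirst=True makes the CREATE TABLE calls safe to repeat on every startup.
TrendsMasterTable.__table__.create(bind=engine, checkfirst=True)
TrendsFormTable.__table__.create(bind=engine, checkfirst=True)
```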
from sqlalchemy import desc, select, func
from scripts.constants import Constants
from sqlalchemy import select, and_, cast, Text
def get_unique_departments(table):
return select(
table.department.label(Constants.DATA)
).distinct()
def get_unique_function(table, department):
return select(
table.function.label(Constants.DATA)
).distinct().where(
table.department == department
)
def get_unique_form(table, department, function):
return select(
table.form.label(Constants.DATA)
).distinct().where(
and_(
table.department == department,
table.function == function
))
def get_unique_filters(table, department, function, form):
return select(
cast(table.filter, Text).label(Constants.DATA)
).distinct().where(
and_(
table.department == department,
table.function == function,
table.form == form
)
)
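A minimal usage sketch of these query builders (assumed session setup, placeholder URI, and hypothetical filter values; inside the service the statements are executed through `TableObject.execute_query`):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from scripts.core.db.postgres.psql_query import get_unique_form
from scripts.core.schemas.postgres.postgres_tables import TrendsMasterTable

# Placeholder URI; the service obtains its engine from postgres_details.uri.
engine = create_engine("postgresql://user:pass@localhost/trends_db")

stmt = get_unique_form(
    table=TrendsMasterTable,
    department="Manufacturing",  # hypothetical filter values
    function="Quality",
)

with Session(engine) as session:
    # Each row exposes the single column labelled "data" (Constants.DATA).
    forms = [row.data for row in session.execute(stmt)]
```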
......@@ -2,11 +2,15 @@ import copy
from datetime import datetime
from scripts.constants import Constants
from scripts.core.db.postgres import database_init
from scripts.core.db.postgres.psql_query import (
get_unique_departments, get_unique_function,
get_unique_form, get_unique_filters
)
from scripts.core.schemas.postgres import TableObject
from scripts.core.logging.application_logging import logger
from scripts.core.exception.app_exceptions import GeneralException
from scripts.core.schemas.postgres.postgres_tables import \
TrendsMasterTable
TrendsMasterTable, TrendsFormTable
class TrendsDashboardHandler:
......@@ -14,76 +18,75 @@ class TrendsDashboardHandler:
Class responsible for creating tables and performing calculations for
PepsiCo metrics.
"""
def test(self):
print("hello World")
def get_max_score_data(self, db_init):
def get_trends_metadata(self, request_data):
"""
Method to get max scores for every year
:param db_init:
:return:
Method to fetch dashboard filter metadata based on the supplied filters.
"""
max_score_json = {}
filter_flag = False
max_score_obj = TableObject(
db=db_init, table_name=TrendsMasterTable
final_metadata_json = dict(
values=[]
)
# max_score_query = fetch_max_score_downday_query(
# table=max_score_obj.table,
# )
max_score_query = ""
response = max_score_obj.execute_query(query=max_score_query)
if response:
for item in response:
max_score_json[item['year']] = {'max': str(item['max'])}
logger.info("Database initialization")
db_init = database_init()
return max_score_json
# Creating table object
trends_master_tbl_obj = TableObject(
db=db_init, table_name=TrendsMasterTable
)
def bulk_upsert(self, session, table, data_list, primary_key):
"""
Method to perform bulk upsert operation.
if request_data.department and request_data.function and request_data.form:
filter_flag = True
filter_name = Constants.LINE
filter_query = get_unique_filters(
table=TrendsMasterTable,
department=request_data.department,
function=request_data.function,
form=request_data.form
)
Args:
session (Session): Database session.
table (Table): SQLAlchemy Table object.
data_list (list): List of data to be upserted.
primary_key (str): Primary key of the table.
elif request_data.department and request_data.function:
filter_name = Constants.FORM
filter_query = get_unique_form(
table=TrendsMasterTable,
department=request_data.department,
function=request_data.function
)
Returns:
bool: True if upsert operation is successful, False otherwise.
"""
try:
existing_ids = session.query(table.date).filter(
table.date.in_(
[item[primary_key] for item in data_list])).all()
existing_ids = [id_[0] for id_ in existing_ids]
elif request_data.department:
filter_name = Constants.FUNCTION
filter_query = get_unique_function(
table=TrendsMasterTable,
department=request_data.department
)
# # Filter out existing IDs before inserting
# unique_items = [item for item in data_list if
# item[primary_key] not in existing_ids]
else:
filter_name = Constants.DEPARTMENT
filter_query = get_unique_departments(
table=TrendsMasterTable
)
# Filter out existing IDs before inserting
# Updating the unique_items list to replace empty strings with None
new_unique_items = []
for item in data_list:
new_item = {}
for key, value in item.items():
if value == '':
new_item[key] = None
else:
new_item[key] = value
if item[primary_key] not in existing_ids:
new_unique_items.append(new_item)
# Getting response from the Trends Master Table
response_data = trends_master_tbl_obj.execute_query(
query=filter_query
)
session.bulk_insert_mappings(table, new_unique_items)
if response_data and not filter_flag:
final_metadata_json[Constants.FILTER_NAME] = filter_name
for each_metadata in response_data:
final_metadata_json[Constants.VALUES].append(
dict(
key=each_metadata[Constants.DATA],
label=each_metadata[Constants.DATA],
)
)
instances = [table(**item) for item in new_unique_items]
session.add_all(instances)
session.commit()
session.close()
return final_metadata_json
return True
except Exception as e:
session.rollback()
raise RuntimeError(f"Error during upsert operation: {e}")
def get_trends_data(self, request_data):
"""
Method to fetch trends data for the selected filters (placeholder implementation).
"""
return True
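Illustrative request and response shapes for `get_trends_metadata` (hypothetical values; keys follow `DashboardFilterInput` and `DashboardFilterOutput`): each call returns the next level of the filter cascade.

```python
# No filters supplied -> unique departments are returned.
request_body = {"department": None, "function": None, "form": None}
expected_response = {
    "filter_name": "department",
    "values": [{"key": "Manufacturing", "label": "Manufacturing"}],
}

# Department and function supplied -> unique forms for that combination.
request_body = {"department": "Manufacturing", "function": "Quality", "form": None}
expected_response = {
    "filter_name": "form",
    "values": [{"key": "Daily Checklist", "label": "Daily Checklist"}],
}
```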
from .trends_schema import (
TrendsAsyncInput,
TrendsAsyncOutput,
DashboardFilterInput,
DashboardFilterOutput,
)
trends_async_request = TrendsAsyncInput
trends_async_response = TrendsAsyncOutput
trends_request = DashboardFilterInput
trends_response = DashboardFilterOutput
from __future__ import annotations
import json
from datetime import datetime
from typing import Optional, Any
from pydantic import BaseModel
class TrendsAsyncInput(BaseModel):
date: datetime
line: str
class DashboardFilterInput(BaseModel):
department: Optional[str] = None
function: Optional[str] = None
form: Optional[str] = None
class DashboardFilterOutput(BaseModel):
filter_name: str
values: list
class DashboardFormInput(BaseModel):
start_date: Optional[str] = None
end_date: Optional[str] = None
mapping_id: Optional[str] = None
department: Optional[str] = None
function: Optional[str] = None
form: Optional[str] = None
parameter: Optional[str] = None
class TrendsAsyncOutput(BaseModel):
status: bool
message: str
class DashboardFormOutput(BaseModel):
filter_name: str
values: list
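A quick sketch of how the filter input model behaves (hypothetical payload): every field is optional, so a partially filled request is still valid and the handler uses the missing fields to decide which level of the cascade to return.

```python
payload = {"department": "Manufacturing", "function": "Quality"}  # hypothetical values
filter_input = DashboardFilterInput(**payload)
assert filter_input.form is None  # unspecified filters default to None
```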
from sqlalchemy import VARCHAR, Column, Integer, Index, FLOAT, BOOLEAN
from datetime import datetime
from sqlalchemy import VARCHAR, Column, Integer, Index, FLOAT, BOOLEAN, TIMESTAMP, JSON
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TrendsMasterTable(Base):
__tablename__ = "trends_master_tbl"
__tablename__ = "trends_master_tbl_"
id = Column(Integer, primary_key=True, autoincrement=True)
mapping_id = Column(VARCHAR, nullable=True)
department = Column(VARCHAR, nullable=True)
sub_menu = Column(VARCHAR, nullable=True)
form_id = Column(VARCHAR, nullable=True)
line_id = Column(VARCHAR, nullable=True)
equipment_id = Column(VARCHAR, nullable=True)
function = Column(VARCHAR, nullable=True)
form = Column(VARCHAR, nullable=True)
filter = Column(JSON, nullable=True)
parameter = Column(VARCHAR, nullable=True)
trend_captured = Column(BOOLEAN, nullable=True)
min = Column(FLOAT, nullable=True)
max = Column(FLOAT, nullable=True)
actual_value = Column(FLOAT, nullable=True)
recheck_value = Column(FLOAT, nullable=True)
lower_limit = Column(FLOAT, nullable=True)
upper_value = Column(FLOAT, nullable=True)
index_name = Index('trends_master_indx', mapping_id, department,
sub_menu, form_id, line_id, equipment_id,
parameter)
# index_name = Index("trends_master_tbl_", mapping_id, department,
# function, form, filter, parameter)
class TrendsFormTable(Base):
__tablename__ = "trends_form_tbl"
__tablename__ = "trends_form_tbl_"
id = Column(Integer, primary_key=True, autoincrement=True)
mapping_id = Column(VARCHAR, nullable=True)
time_stamp = Column(TIMESTAMP)
value = Column(VARCHAR, nullable=True)
time = Column(datetime, nullable=True)
index_name = Index('trends_form_indx', mapping_id, time)
# index_name = Index("trends_form_tbl_", mapping_id, time_stamp)
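A minimal sketch (placeholder URI, hypothetical values) of writing a reading into the new `TrendsFormTable` model:

```python
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from scripts.core.schemas.postgres.postgres_tables import TrendsFormTable

engine = create_engine("postgresql://user:pass@localhost/trends_db")  # placeholder URI

with Session(engine) as session:
    session.add(TrendsFormTable(
        mapping_id="map_001",          # hypothetical mapping id
        time_stamp=datetime.utcnow(),
        value="42.5",
    ))
    session.commit()
```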
......@@ -10,12 +10,14 @@ Usage:
from scripts.core.services.event_service import router
"""
import json
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from scripts.constants import Constants, APIConstants
from scripts.core.logging.application_logging import logger
from scripts.core.schemas.api import (
trends_async_request, trends_async_response
trends_request, trends_response
)
from scripts.core.handler.trends_handler import TrendsDashboardHandler
......@@ -38,28 +40,48 @@ async def ping():
return dict(status=200)
@downday_router.post(APIConstants.TRENDS_ASYNC_SERVICE,
response_model=trends_async_response)
async def save_trends_data(
request_data: trends_async_request
@downday_router.post(APIConstants.DASHBOARD_METADATA_ENDPOINT,
response_model=trends_response)
async def dashboard_metadata(
request_data: trends_request
):
"""
Fetch dashboard filter metadata for the Trends dashboard.
Args:
request_data (DashboardFilterInput): Request body containing the
necessary form parameters.
Returns:
"""
try:
trends_obj = TrendsDashboardHandler()
return trends_obj.get_trends_metadata(request_data=request_data)
except Exception as err:
logger.exception(
Constants.EXCEPTION_RAISER.format(str(err)),
exc_info=True,
)
return JSONResponse(status_code=500, content=str(err))
@downday_router.post(APIConstants.DASHBOARD_DATA_ENDPOINT,
response_model=trends_response)
async def dashboard_data(
request_data: trends_request
):
"""
Fetch trends data for the selected dashboard filters.
Args:
content (TrendsAsyncInput): Request body containing the
request_data (DashboardFilterInput): Request body containing the
necessary form parameters.
Returns:
:param request_data:
"""
try:
downday_obj = TrendsDashboardHandler()
status, data = downday_obj.test()
if status:
return dict(status=True, message=data)
else:
return dict(status=False, message=data)
trends_obj = TrendsDashboardHandler()
return trends_obj.get_trends_data(request_data=request_data)
except Exception as err:
logger.exception(
Constants.EXCEPTION_RAISER.format(str(err)),
......
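A hedged example of calling the two new endpoints with an HTTP client (assumed host and port, and assuming the router is mounted without an extra prefix; filter values are hypothetical):

```python
import requests

BASE_URL = "http://localhost:8000"  # assumed host and port

# First level of the filter cascade: unique departments.
metadata = requests.post(
    f"{BASE_URL}/get_filter",
    json={"department": None, "function": None, "form": None},
).json()

# Trends data for a chosen filter combination.
data = requests.post(
    f"{BASE_URL}/get_data",
    json={"department": "Manufacturing", "function": "Quality", "form": "Daily Checklist"},
).json()
```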