Commit 691d5279 authored by suryakant

Trends Dashboard

parent 5cb985a3
# PROJECT DETAILS
HOST = 0.0.0.0
PORT = 80
# POSTGRES DETAILS
POSTGRES_URI = postgresql://admin:UtAdm#Post265399@20.221.119.96:5335/project_101__ilens_assistant
# LOG DETAILS
LOG_BASE_PATH=logs/
LOG_LEVEL=DEBUG
MAX_BYTES=10000000
HANDLER_TYPE=rotating_file_handler
LOG_FILE_NAME=real-time-asset-tracking
BACKUP_COUNT=10
FROM python:3.11.1-slim
ADD . /root/demo/pepsico-view-services
WORKDIR /root/demo/pepsico-view-services
RUN mkdir ~/.pip
RUN pip install -r requirements.txt
ENTRYPOINT ["python3", "app.py"]
\ No newline at end of file
# trends_dashboard
Trends Dashboard
\ No newline at end of file
# Trends Dashboard
version = "v1.0"
"""
app.py - Trends Dashboard
This script initializes and configures the
PepsiCO: Trends Dashboard
Usage:
Start the FastAPI application by running this script.
Developer: str = 'Suryakant Soni - {H115-301}'
"""
import uvicorn
from fastapi import FastAPI
from __version__ import version
from scripts.core.service import api_router
from scripts.constants import MainConstants
from fastapi.middleware.cors import CORSMiddleware
from scripts.configurations import service_details
app = FastAPI(
title="PepsiCO: Trends Dashboard",
version=version,
description=MainConstants.DESCRIPTION_KEY,
openapi_tags=[MainConstants.META_TAG_JSON]
)
app.include_router(router=api_router)
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=[MainConstants.TOKEN_KEY]
)
if __name__ == '__main__':
uvicorn.run(app=app,
host=service_details.host,
port=int(service_details.port))
"""
This __init__ file contains the configuration settings for the application
"""
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
""" Loading dot env file"""
load_dotenv()
class ServiceDetails(BaseSettings):
"""
Configuration class for Project details
"""
host: str
port: str
class PostgresDetails(BaseSettings):
"""
Configuration class for PostgreSQL database details.
Config:
env_prefix (str): The environment variable prefix for PostgreSQL settings.
"""
uri: str
class Config:
env_prefix = "POSTGRES_"
class LoggingDetails(BaseSettings):
"""
Configuration class for logging database details
"""
log_base_path: str
log_level: str
max_bytes: int
handler_type: str
log_file_name: str
backup_count: int
service_details = ServiceDetails()
postgres_details = PostgresDetails()
logging_details = LoggingDetails()
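The three settings objects above are populated from the .env file shown at the top of this commit: HOST/PORT feed ServiceDetails, POSTGRES_URI feeds PostgresDetails through its POSTGRES_ env_prefix, and the LOG_* keys feed LoggingDetails. A minimal usage sketch from a consumer module, assuming that .env file has been loaded:
# Hedged usage sketch; the printed values assume the .env file above.
from scripts.configurations import (
    service_details,
    postgres_details,
    logging_details,
)

print(service_details.host, service_details.port)  # 0.0.0.0 80
print(postgres_details.uri)                        # value of POSTGRES_URI
print(logging_details.log_level)                   # DEBUG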
"""
constants.py - Application Constants
This module defines various constants used in the application.
"""
class APIConstants:
"""
Constants related to API endpoints
"""
SHUTDOWN = "shutdown"
HEALTH_CHECK = "/healthcheck"
TRENDS_ASYNC_SERVICE = "/trends_async"
class MainConstants:
"""
Constants related to the main application
"""
META_TAG_JSON = {
"name": "PepsiCO: Trends Dashboard",
"description": "Service for pushing data into the Postgres table",
}
DESCRIPTION_KEY = "Trends Dashboard"
TOKEN_KEY = "token"
class Constants:
"""
Constants related to the application
"""
EXCEPTION_RAISER = "Exception ->{}"
TRENDS_HANDLING_ENDPOINT = "Trends Handler Endpoints"
class LoggerConstants:
LOGGER_1 = ""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
from scripts.configurations import postgres_details
from scripts.core.logging.application_logging import logger
from sqlalchemy_utils import create_database, database_exists
from scripts.core.schemas.postgres.postgres_tables import TrendsAsyncTable
from scripts.core.exception.app_exceptions import ErrorMessages, GeneralException
def database_init():
"""
:Objective: To create default postgres tables
"""
database_obj = None
try:
# Creating engine for postgres
engine = create_engine(url=postgres_details.uri, poolclass=NullPool)
session_local = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        # Create the database if it does not exist
if not database_exists(engine.url):
create_database(engine.url)
TrendsAsyncTable.__table__.create(bind=engine, checkfirst=True)
logger.info("Tables initiation successful")
TrendsAsyncTable.index_name.create(bind=engine, checkfirst=True)
# Creating database object for CRUD operation
database_obj = session_local()
except Exception as err:
logger.error(ErrorMessages.ERROR002.format(err), exc_info=True)
raise GeneralException(ErrorMessages.ERROR002.format(err))
return database_obj
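database_init() hands back a live Session (or raises GeneralException), which the handler below wraps in TableObject together with a mapped table. A minimal sketch of that call pattern, reusing the import paths from this commit:
# Hedged sketch of the expected call pattern; table and column names come
# from postgres_tables.py later in this commit.
from scripts.core.db.postgres import database_init
from scripts.core.schemas.postgres import TableObject
from scripts.core.schemas.postgres.postgres_tables import TrendsAsyncTable

session = database_init()
table_obj = TableObject(db=session, table_name=TrendsAsyncTable)
rows = table_obj.execute_query(table=TrendsAsyncTable.__tablename__)
session.close()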
from sqlalchemy.orm import Session
from scripts.core.utilities.postgres_utility import SQLDBUtils
class TableObject(SQLDBUtils):
def __init__(self, session: Session, table_model):
super().__init__(session)
self.table = table_model
from sqlalchemy import desc, select, func
class AuthenticationError(Exception):
pass
class GeneralException(Exception):
pass
class ErrorMessages:
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Error occurred while creating Database or Tables: {}"
class PostgresMessages:
ERROR001 = "Exception in connecting Postgres DB"
import copy
from datetime import datetime
from scripts.constants import Constants
from scripts.core.db.postgres import database_init
from scripts.core.schemas.postgres import TableObject
from scripts.core.logging.application_logging import logger
from scripts.core.exception.app_exceptions import GeneralException
from scripts.core.schemas.postgres.postgres_tables import \
TrendsAsyncTable
class TrendsDashboardHandler:
"""
Class responsible for creating tables and performing calculations for
PepsiCo metrics.
"""
    def test(self):
        # Placeholder; returns the (status, message) pair expected by the endpoint
        return True, "hello World"
def get_max_score_data(self, db_init):
"""
Method to get max scores for every year
:param db_init:
:return:
"""
max_score_json = {}
max_score_obj = TableObject(
db=db_init, table_name=TrendsAsyncTable
)
# max_score_query = fetch_max_score_downday_query(
# table=max_score_obj.table,
# )
max_score_query = ""
response = max_score_obj.execute_query(query=max_score_query)
if response:
for item in response:
max_score_json[item['year']] = {'max': str(item['max'])}
return max_score_json
def bulk_upsert(self, session, table, data_list, primary_key):
"""
Method to perform bulk upsert operation.
Args:
session (Session): Database session.
table (Table): SQLAlchemy Table object.
            data_list (list): List of row dictionaries to be upserted.
primary_key (str): Primary key of the table.
Returns:
bool: True if upsert operation is successful, False otherwise.
"""
try:
            existing_ids = session.query(getattr(table, primary_key)).filter(
                getattr(table, primary_key).in_(
                    [item[primary_key] for item in data_list])).all()
            existing_ids = [id_[0] for id_ in existing_ids]
            # Filter out existing primary keys before inserting and replace
            # empty strings with None so they are stored as NULL
            new_unique_items = []
            for item in data_list:
                new_item = {
                    key: (None if value == '' else value)
                    for key, value in item.items()
                }
                if item[primary_key] not in existing_ids:
                    new_unique_items.append(new_item)
            session.bulk_insert_mappings(table, new_unique_items)
            session.commit()
            session.close()
return True
except Exception as e:
session.rollback()
raise RuntimeError(f"Error during upsert operation: {e}")
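Note that bulk_upsert only inserts rows whose primary-key value is not already present; existing rows are left untouched. A hedged example call, relying on the names already imported at the top of this module (the row contents are made up for illustration):
handler = TrendsDashboardHandler()
example_rows = [
    {"id": 1, "department": "Packaging", "parameter_name": "weight",
     "lower_limit": 10.0, "upper_limit": 12.0, "actual_value": ""},
]
handler.bulk_upsert(
    session=database_init(),
    table=TrendsAsyncTable,
    data_list=example_rows,
    primary_key="id",
)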
"""
logger utility
"""
import logging
import os
import time
from functools import wraps
from logging.handlers import RotatingFileHandler
from scripts.configurations import logging_details
complete_log_path = os.path.join(
logging_details.log_base_path, logging_details.log_file_name
)
if not os.path.exists(logging_details.log_base_path):
os.mkdir(logging_details.log_base_path)
def timed(func):
"""This decorator prints the execution time for the decorated function."""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
logger.debug("{} ran in {}s".format(func.__name__, round(end - start, 5)))
return result
return wrapper
def get_logger(
log_file_name=complete_log_path,
log_level=logging_details.log_level,
time_format="%Y-%m-%d %H:%M:%S",
handler_type=logging_details.handler_type,
max_bytes=logging_details.max_bytes,
backup_count=logging_details.backup_count,
):
"""
Creates a rotating log
"""
    log_file = log_file_name + ".log"
__logger__ = logging.getLogger(log_file_name)
__logger__.setLevel(log_level.strip().upper())
debug_formatter = (
"%(asctime)s - %(levelname)-6s - %(name)s - "
"[%(threadName)5s:%(filename)5s:%(funcName)5s():"
"%(lineno)s] - %(message)s"
)
    formatter_string = (
        "%(asctime)s - %(levelname)-6s - %(name)s - %(message)s"
    )
    if log_level.strip().upper() == "DEBUG":
        formatter_string = debug_formatter
formatter = logging.Formatter(formatter_string, time_format)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
__logger__.addHandler(console_handler)
if str(handler_type).lower() == "rotating_file_handler":
# Rotating File Handler
handler = RotatingFileHandler(
log_file, maxBytes=max_bytes, backupCount=backup_count
)
handler.setFormatter(formatter)
__logger__.addHandler(handler)
return __logger__
logger = get_logger()
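Both logger and the timed decorator are imported by the rest of the service. A minimal sketch of how they combine, assuming the LOG_* settings from the .env file above (compute_trend_delta is a hypothetical helper used only for this example):
from scripts.core.logging.application_logging import logger, timed

@timed
def compute_trend_delta(current: float, previous: float) -> float:
    # hypothetical helper, defined only for this example
    return current - previous

# logs to the console and, with HANDLER_TYPE=rotating_file_handler, to
# logs/real-time-asset-tracking.log
logger.info("delta=%s", compute_trend_delta(12.5, 11.0))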
from .trends_schema import (
TrendsAsyncInput,
TrendsAsyncOutput,
)
trends_async_request = TrendsAsyncInput
trends_async_response = TrendsAsyncOutput
from __future__ import annotations
from datetime import datetime
from typing import Optional, Any
from pydantic import BaseModel
import json
class TrendsAsyncInput(BaseModel):
date: datetime
line: str
class TrendsAsyncOutput(BaseModel):
status: bool
message: str
from sqlalchemy.orm import Session
from scripts.core.utilities.postgres_utility import SQLDBUtils
class TableObject(SQLDBUtils):
def __init__(self, db: Session, table_name):
super().__init__(db)
self.table = table_name
from sqlalchemy import VARCHAR, Column, Integer, Index, FLOAT
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TrendsAsyncTable(Base):
__tablename__ = "trends_dashboard_tbl"
id = Column(Integer, primary_key=True)
department = Column(VARCHAR, nullable=True)
sub_menu = Column(VARCHAR, nullable=True)
form = Column(VARCHAR, nullable=True)
filter = Column(VARCHAR, nullable=True)
sub_filter = Column(VARCHAR, nullable=True)
parameter_name = Column(VARCHAR, nullable=True)
lower_limit = Column(FLOAT, nullable=True)
upper_limit = Column(FLOAT, nullable=True)
actual_value = Column(FLOAT, nullable=True)
recheck_value = Column(FLOAT, nullable=True)
index_name = Index('trends_async_index', id, department,
sub_menu, form, filter, sub_filter,
parameter_name)
from fastapi import APIRouter
from scripts.core.service.trends_service import downday_router
"""
Initializing all the service routers
"""
api_router = APIRouter()
api_router.include_router(downday_router)
"""
event_service.py
This script initializes and configures the FastAPI
application for the Vision App: On Demand Service.
The application includes routes, middleware, and server config.
Usage:
from scripts.core.services.event_service import router
"""
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from scripts.constants import Constants, APIConstants
from scripts.core.logging.application_logging import logger
from scripts.core.schemas.api import (
trends_async_request, trends_async_response
)
from scripts.core.handler.trends_handler import TrendsDashboardHandler
downday_router = APIRouter(tags=[Constants.TRENDS_HANDLING_ENDPOINT])
@downday_router.on_event(APIConstants.SHUTDOWN)
async def shutdown_event():
"""
This function prints the `Exiting!` string to the console.
"""
print("Exiting!")
@downday_router.get(APIConstants.HEALTH_CHECK)
async def ping():
"""
This function returns the `200 status message`.
"""
return dict(status=200)
@downday_router.post(APIConstants.TRENDS_ASYNC_SERVICE,
response_model=trends_async_response)
async def save_trends_data(
request_data: trends_async_request
):
"""
Initiate postgres db and create tables
Args:
content (TrendsAsyncInput): Request body containing the
necessary form parameters.
Returns:
:param request_data:
"""
try:
downday_obj = TrendsDashboardHandler()
status, data = downday_obj.test()
if status:
return dict(status=True, message=data)
else:
return dict(status=False, message=data)
except Exception as err:
logger.exception(
Constants.EXCEPTION_RAISER.format(str(err)),
exc_info=True,
)
return JSONResponse(status_code=500, content=str(err))
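The /trends_async route expects a TrendsAsyncInput body (date and line) and returns a TrendsAsyncOutput-shaped response. A hedged client-side sketch using FastAPI's TestClient; the payload values are illustrative assumptions:
from fastapi.testclient import TestClient
from app import app

client = TestClient(app)
assert client.get("/healthcheck").json() == {"status": 200}
response = client.post(
    "/trends_async",
    json={"date": "2024-01-01T00:00:00", "line": "Line-1"},
)
print(response.status_code, response.json())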
# import time
# import requests
# from scripts.constants import Constants
# from scripts.core.logging.application_logging import logger
# from scripts.core.exception.app_exceptions import GeneralException
#
#
# class CommonUtility:
# """
# A utility class for common operations
# """
#
# def request_get(self, url, params, headers):
# """
# Sends an HTTP GET request and returns the JSON response.
#
# Parameters:
# - url (str): The URL for the GET request.
# - headers (dict): The headers to be included in the GET request.
#
# Returns:
# - dict or None: The JSON response data if successful, or None if there is an
# error.
# """
# response_data = None
# try:
# response_data = requests.get(
# url=url,
# params=params,
# headers=headers).json()
# except GeneralException as err:
# logger.error(f"Exception in GET API data: {err}")
# return response_data
#
# def request_post(self, url, headers):
# """
# Sends an HTTP POST request and returns the JSON response.
#
# Parameters:
# - url (str): The URL for the POST request.
# - headers (dict): The JSON payload to be included in the POST request.
#
# Returns:
# - dict or None: The JSON response data if successful, or None if there is an
# error.
# """
# response_data = None
# try:
# response_data = requests.post(
# url=url,
# json=headers).json()
# except GeneralException as err:
# logger.error(f"Exception in POST API data: {err}")
# return response_data
import logging
from fastapi.encoders import jsonable_encoder
from sqlalchemy import text
from sqlalchemy.orm import Session
class SQLDBUtils:
def __init__(self, db: Session):
self.session: Session = db
def close(self):
logging.debug("SQL Session closed")
self.session.close()
@property
def key_filter_expression(self):
return "expression"
@property
def key_filter_column(self):
return "column"
@property
def key_filter_value(self):
return "value"
def add_data(self, table):
self.session.add(table)
self.session.commit()
self.session.refresh(table)
return True
def bulk_insert(self, object_models):
self.session.bulk_save_objects(object_models)
self.session.commit()
return True
def filter_expression(self):
filter_expression = self.filter.get(self.key_filter_expression, "eq")
logging.debug(f"Filter expression: {filter_expression}")
return filter_expression
def filter_column(self):
column = self.filter.get(self.key_filter_column, None)
logging.debug(f"Filter column: {column}")
return column
def filter_value(self):
filter_value = self.filter.get(self.key_filter_value, None)
logging.debug(f"Filter value: {filter_value}")
return filter_value
def _filter(self, session_query, filters=None):
if filters is not None:
for _filter in filters:
self.filter = _filter
if self.filter_column() is None:
continue
session_query = self.get_session_query(session_query=session_query)
return session_query
def get_session_query(self, session_query):
try:
if self.filter_expression() == "eq":
session_query = session_query.filter(
self.filter_column() == self.filter_value()
)
elif self.filter_expression() == "le":
session_query = session_query.filter(
self.filter_column() < self.filter_value()
)
elif self.filter_expression() == "ge":
session_query = session_query.filter(
self.filter_column() > self.filter_value()
)
elif self.filter_expression() == "lte":
session_query = session_query.filter(
self.filter_column() <= self.filter_value()
)
elif self.filter_expression() == "gte":
session_query = session_query.filter(
self.filter_column() >= self.filter_value()
)
elif self.filter_expression() == "neq":
session_query = session_query.filter(
self.filter_column() != self.filter_value()
)
elif self.filter_expression() == "none":
session_query = session_query.filter(self.filter_column().is_(None))
elif self.filter_expression() == "is_none":
session_query = session_query.filter(self.filter_column().is_not(None))
elif self.filter_expression() == "in":
session_query = session_query.filter(
self.filter_column().in_(self.filter_value())
)
except Exception as e:
logging.error(f"Error occurred while filtering the session query {e}")
return session_query
def insert_one(self, table, insert_json):
try:
row = table()
for k in insert_json:
setattr(row, k, insert_json[k])
self.session.merge(row)
self.session.commit()
return True
except Exception as e:
logging.error(f"Error while inserting the record {e}")
raise
def update(
self,
table,
update_json: dict,
filters=None,
insert=False,
insert_id=None,
update_one=False,
):
try:
logging.debug(filters)
row = self.session.query(table)
filtered_row = self._filter(session_query=row, filters=filters)
if update_one:
filtered_row = filtered_row.first()
if filtered_row is None:
logging.debug("There are no rows meeting the given update criteria.")
if insert:
logging.debug("Trying to insert a new record")
if insert_id is None:
logging.warning(
"ID not provided to insert record. Skipping insert."
)
return False
else:
update_json.update(insert_id)
if self.insert_one(table=table, insert_json=update_json):
return True
else:
return False
else:
return False
            else:
                logging.debug("Record available to update")
                if update_one:
                    # filtered_row is an ORM instance, so update its
                    # attributes directly
                    for k, v in update_json.items():
                        if not v:
                            continue
                        setattr(filtered_row, k, v)
                else:
                    # filtered_row is still a Query, so issue a bulk update
                    filtered_row.update(values=update_json)
                self.session.commit()
                return True
except Exception as e:
logging.error(f"Error while updating the record {e}")
raise
def delete(self, table, filters=None):
try:
# logging.trace(filters)
row = self.session.query(table)
filtered_row = self._filter(session_query=row, filters=filters)
if filtered_row is None:
logging.debug("There were no records to be deleted")
else:
filtered_row.delete()
self.session.commit()
return True
except Exception as e:
logging.error(f"Failed to delete a record {e}")
raise
def distinct_values_by_column(self, table, column, filters=None):
query = self.session.query(getattr(table, column).distinct().label(column))
query = self._filter(session_query=query, filters=filters)
distinct_values = [getattr(row, column) for row in query.all()]
return distinct_values
    def select_from_table(self, table=None, query=None, find_one=False):
        if query is None:
            query = f"select * from {table}"
        if isinstance(query, str):
            # raw SQL strings must be wrapped with text() before execution
            query = text(query)
        result = self.session.execute(query)
        response = [dict(row._mapping) for row in result]
        if find_one and response:
            return response[0]
        return response
    def fetch_from_table(self, table, filter_text, limit_value, skip_value):
        logging.debug(filter_text)
        query = (
            self.session.query(table)
            .filter(text(filter_text))
            .limit(limit_value)
            .offset(skip_value)
        )
        result = self.session.execute(query)
        return [dict(row._mapping) for row in result]
    def execute_query(self, table=None, query=None):
        try:
            if query is None:
                query = f"select * from {table}"
            if isinstance(query, str):
                # raw SQL strings must be wrapped with text() before execution
                query = text(query)
            result = self.session.execute(query)
            # Get column names
            columns = result.keys()
            # Construct list of dictionaries
            output = [dict(zip(columns, row)) for row in result]
            self.session.close()
            return output
        except Exception as e:
            logging.error(f"Error occurred during execute_query: {e}")
def fetch_query(self, query):
try:
result = self.session.execute(query)
output = [jsonable_encoder(x) for x in result]
self.session.close()
return output
except Exception as e:
logging.error(f"Error occurred during execute_query: {e}")