Commit b60bfd58 authored by suryakant

Sterlite Custom Report

parent 320d7831
.git
.cache
*.md
!README*.md
README-secret.md
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# pycharm
.idea/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
#conf/*.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.idea
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# pycharm
.idea/
# logs
logs/
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.9-slim
COPY requirements.txt /app/requirements.txt
# --no-cache-dir keeps pip's wheel cache out of the image layer.
RUN pip install --no-cache-dir -r /app/requirements.txt
# --no-install-recommends and the apt-list cleanup keep the image slim;
# without the rm, the package index stays baked into this layer.
RUN apt-get update \
    && apt-get install -y --no-install-recommends tzdata ffmpeg libsm6 libxext6 \
    && rm -rf /var/lib/apt/lists/*
# COPY is preferred over ADD for plain directory copies (no archive/URL magic).
COPY . /app
WORKDIR /app/
# sterlite_custom_reports # Sterlite Custom Report
\ No newline at end of file
# Application semantic version; surfaced in the FastAPI metadata (see app.py).
version = "v1.0"
"""
app.py - Sterlite Vedanta: Refinery Custom Report
This script initializes and configures the Custom Report application
for generating custom report.
Usage:
Start the FastAPI application by running this script.
"""
import uvicorn
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from __version__ import version
from scripts.constants import MainConstants, CommonConstants
from scripts.configurations import service_details
from scripts.core.services import api_router, auth_obj
# FastAPI application instance for the Sterlite Vedanta custom-report service.
app = FastAPI(
    title="Sterlite Vedanta: Custom Report",
    version=version,
    description=MainConstants.DESCRIPTION_KEY,
    openapi_tags=[MainConstants.META_TAG_JSON],
)

# Cookie authentication is enforced everywhere except the dev environment.
if service_details.env.lower() == CommonConstants.DEV_KEY:
    app.include_router(router=api_router)
else:
    app.include_router(router=api_router, dependencies=[Depends(auth_obj)])

# Permissive CORS: any origin/method/header; the auth token header is exposed
# so browser clients can read the rotated login token.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=[MainConstants.TOKEN_KEY],
)

if __name__ == '__main__':
    uvicorn.run(
        app=app,
        host=service_details.host_name,
        port=int(service_details.port_name),
    )
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQClilTaeHq6Zc+kWHCNl1O0btGRm7ct3O5zqWx1mwwLUWH14eft
Hi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULfENhwd/D7P3mnoRlktPT2t+tt
RRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw2hcqOYe/NGTkmm1PswIDAQAB
AoGAZPARR1l5NBkKYGKQ1rU0E+wSmx+AtVVmjF39RUSyNmB8Q+poebwSgsr58IKt
T6Yq6Tjyl0UAZTGmferCK0xJJrqyP0hMn4nNNut+acWMKyt+9YrA2FO+r5Jb9JuT
SK35xXnM4aZLGppgWJxRzctpIz+qkf6oLRSZme0AuiqcwYECQQDY+QDL3wbWplRW
bze0DsZRMkDAkNY5OCydvjte4SR/mmAzsrpNrS5NztWbaaQrefoPbsdYBPbd8rS7
C/s/0L1zAkEAw1EC5zt2STuhkcKLa/tL+bk8WHHHtf19aC9kBj1TvWBFh+JojWCo
86iK5fLcHzhyQx5Qi3E9LG2HvOWhS1iUwQJAKbEHHyWW2c4SLJ2oVXf1UYrXeGkc
UNhjclgobl3StpZCYAy60cwyNo9E6l0NR7FjhG2j7lzd1t4ZLkvqFmQU0wJATLPe
yQIwBLh3Te+xoxlQD+Tvzuf3/v9qpWSfClhBL4jEJYYDeynvj6iry3whd91J+hPI
m8o/tNfay5L+UcGawQJAAtbqQc7qidFq+KQYLnv5gPRYlX/vNM+sWstUAqvWdMze
JYUoTHKgiXnSZ4mizI6/ovsBOMJTb6o1OJCKQtYylw==
-----END RSA PRIVATE KEY-----
\ No newline at end of file
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQClilTaeHq6Zc+kWHCNl1O0btGR
m7ct3O5zqWx1mwwLUWH14eftHi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULf
ENhwd/D7P3mnoRlktPT2t+ttRRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw
2hcqOYe/NGTkmm1PswIDAQAB
-----END PUBLIC KEY-----
\ No newline at end of file
# Service Details
HOST_NAME = 0.0.0.0
PORT_NAME = 8080
EXCEPTION_TRACE=True
ENV=dev
# PostgreSQL DB Details
# NOTE(review): plaintext credentials (and the security TOKEN below) are committed
# to version control — rotate them and move this file to an untracked secrets store.
POSTGRES_URI=postgresql://svc_sterlite:sterliteSvc#1247@20.207.116.89:5946/strelite-datalakedev
POSTGRES_HOST = 20.207.116.89
POSTGRES_PORT = 5946
POSTGRES_USERNAME = svc_sterlite
POSTGRES_PASSWORD = sterliteSvc#1247
POSTGRES_DATABASE = strelite-datalakedev
# Log Details
LOG_BASE_PATH=logs/
LOG_LEVEL=DEBUG
MAX_BYTES=10000000
HANDLER_TYPE=rotating_file_handler
LOG_FILE_NAME=event_processor
BACKUP_COUNT=10
# Redis DB Details
REDIS_URI=redis://192.168.0.220:6379
REDIS_LOGIN_DB=9
REDIS_DASHBOARD_DB=20
REDIS_LIVE_TAG_DB=2
REDIS_LIVE_PREVIOUS_TAG_DB=3
REDIS_PARTITION_DB=15
REDIS_PROJECT_TAGS_DB=18
# Security Token Details
SECURE_COOKIE=False
LOCK_OUT_TIME_MINS=30
LEEWAY_IN_MINS=10
TOKEN=8674cd1d-2578-4a62-8ab7-d3ee5f9a
ISSUER=ilens
ALG=RS256
PUBLIC_KEY_PATH=assets/keys/public
PRIVATE_KEY_PATH=assets/keys/private
version: "1.0"
services:
cement_bag_event_processor:
image: cement_bag_event_processor:v0.0.1
container_name: "" # NOTE(review): empty container_name — set it or remove the key; service/image names still say "cement_bag_event_processor", a leftover from another project
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/mnt/video/:/var/mnt/video/
restart:
always
ports:
- "8080:8080"
env_file:
- .env
logging:
driver: "json-file"
options:
max-file: "5"
max-size: "10m"
pydantic~=1.10.7
python-dotenv==0.19.2
fastapi~=0.95.0
uvicorn~=0.21.1
PyJWT==2.4.0
jwt-signature-validator~=0.0.1
psycopg2-binary==2.9.5
openpyxl~=3.1.2
pandas~=2.1.0
redis~=3.5.3
# Report-definition payloads for the custom-report endpoints.
# Currently a single empty placeholder entry — populate per report type.
custom_report_json = [
    {
    }
]
"""
This __init__ file contains the configuration settings for the application
"""
import os
import shutil
from pydantic import BaseSettings
from dotenv import load_dotenv
""" Loading dot env file"""
load_dotenv("dev-variables.env")
class SecurityDetails(BaseSettings):
    """
    Configuration class for security token details.

    Values are loaded from environment variables by pydantic BaseSettings
    (see the "Security Token Details" section of dev-variables.env).
    """

    secure_cookie: bool  # whether the login cookie is marked Secure
    lock_out_time_mins: int  # session/token lifetime, in minutes
    leeway_in_mins: int  # clock-skew allowance for JWT validation
    token: str  # shared secret compared against the JWT payload's "token" claim
    issuer: str  # expected JWT "iss" claim
    alg: str  # JWT signing algorithm (RS256 per the env file)
    public_key_path: str  # path to the key file used to verify JWTs
    private_key_path: str  # path to the key file used to sign JWTs
class ServiceDetails(BaseSettings):
    """
    Configuration class for application service details.

    Loaded from HOST_NAME / PORT_NAME / ENV / EXCEPTION_TRACE env variables.
    """

    host_name: str  # bind address for uvicorn
    port_name: str  # bind port; cast to int in app.py
    env: str  # deployment environment; "dev" disables cookie auth (see app.py)
    exception_trace: bool  # whether handlers log full tracebacks
class LoggingDetails(BaseSettings):
    """
    Configuration class for logging details.

    Consumed by scripts.core.logging.application_logging.get_logger().
    """

    log_base_path: str  # directory where log files are written
    log_level: str  # logging level name, e.g. DEBUG / INFO
    max_bytes: int  # rotation threshold for the rotating file handler
    handler_type: str  # "rotating_file_handler" enables the file handler
    log_file_name: str  # base file name (".log" suffix added by get_logger)
    backup_count: int  # number of rotated files to keep
class PostgresDetails(BaseSettings):
    """
    Configuration class for PostgreSQL database details.

    Config:
        env_prefix (str): The environment variable prefix for PostgreSQL
            settings — fields map to POSTGRES_URI, POSTGRES_HOST, etc.
    """

    uri: str  # full connection URI (POSTGRES_URI)
    host: str
    port: str
    username: str
    password: str
    database: str

    class Config:
        env_prefix = "POSTGRES_"
class RedisDetails(BaseSettings):
    """
    Configuration class for Redis database details.

    Config:
        env_prefix (str): The environment variable prefix for Redis settings
            — fields map to REDIS_URI, REDIS_LOGIN_DB, etc.
    """

    uri: str  # base Redis connection URI (REDIS_URI)
    login_db: int  # logical DB holding login sessions (see redis_connections)
    dashboard_db: int
    live_tag_db: int
    live_previous_tag_db: int
    partition_db: int
    project_tags_db: int  # logical DB holding per-project metadata

    class Config:
        env_prefix = "REDIS_"
# Singleton settings objects, loaded once at import time from the environment.
security_conf_details = SecurityDetails()
service_details = ServiceDetails()
logging_details = LoggingDetails()
postgres_details = PostgresDetails()
redis_details = RedisDetails()
"""
"""
class MainConstants:
    """
    Constants related to the main application (OpenAPI metadata, headers).
    """

    # OpenAPI tag metadata. Previously said "Cement Bag Event Processor" —
    # a copy-paste leftover from another service; this application is the
    # Sterlite custom-report service (see the FastAPI title in app.py).
    META_TAG_JSON = {
        "name": "Sterlite Vedanta: Custom Report",
        "description": "On Demand Service for generating custom reports",
    }
    DESCRIPTION_KEY = "Sterlite Vedanta: Custom Report"
    TOKEN_KEY = "token"  # response header exposed via CORS for the auth token
class APIConstants:
    """
    Constants related to API endpoints and lifecycle events.
    """

    STARTUP = "startup"  # FastAPI startup event name
    SHUTDOWN = "shutdown"  # FastAPI shutdown event name
    HEALTH_CHECK = "/healthcheck"  # liveness-probe route
    INIT_DB_ENDPOINT = "/"  # root POST route serving the custom report
class CommonConstants:
    """
    Common constants used in the application.
    """

    GET = "GET"  # HTTP method name
    POST = "POST"  # HTTP method name
    EVENT_HANDLING_ENDPOINT = "Event Handler Endpoints"  # OpenAPI tag for the event router
    EXCEPTION_RAISER = "Exception ->{}"  # log-message template for caught exceptions
    DEV_KEY = "dev"  # env value that disables cookie authentication (see app.py)
class PostgresConstants:
    """
    Constants related to PostgreSQL database.
    """
    # NOTE(review): currently empty — imported by event_handler.py; add
    # Postgres-related constants here as the report queries are implemented.
figlet = """
____ _ _ _ _
/ ___| | |_ ___ _ __ | | (_) | |_ ___
\___ \ | __| / _ \ | '__| | | | | | __| / _ /
___) | | |_ | __/ | | | | | | | |_ | __/
|____/ \__| \___| |_| |_| |_| \__| \___|
____ _
| _ \ ___ _ __ ___ _ __ | |_
| |_) | / _ \ | '_ \ / _ \ | '__| | __|
| _ < | __/ | |_) | | (_) | | | | |_
|_| \_\ \___| | .__/ \___/ |_| \__|
|_|
"""
class CustomReportQuery:
    """
    SQL query templates for the Sterlite custom reports.
    """

    # Placeholder — query text not yet implemented.
    ANODE_AVAILABILITY_QUERY_1 = ""
\ No newline at end of file
import redis
from scripts.configurations import redis_details
# Redis connection handles, one per logical database. All clients share the
# configured URI and decode byte responses to str.

def _client(db_index):
    """Build a Redis client bound to the given logical database index."""
    return redis.from_url(
        redis_details.uri, db=int(db_index), decode_responses=True
    )

login_db = _client(redis_details.login_db)
project_details_db = _client(redis_details.project_tags_db)
count_push_db = _client(redis_details.live_tag_db)
class AuthenticationError(Exception):
    """Raised when JWT/cookie authentication fails (see jwt_util.JWT.validate)."""

    pass


class GeneralException(Exception):
    """Generic application-level error for non-authentication failures."""

    pass
class ErrorMessages:
    """
    User-facing error-message constants raised by the auth/DB layers.
    """

    CONNECTION_EXCEPTION = ""
    COMMON_MESSAGE = ""
    # Referenced by scripts.core.security.jwt_util during token validation;
    # they were missing, so every auth failure raised AttributeError instead
    # of the intended AuthenticationError.
    ERROR002 = "Session expired or token invalid. Please log in again."
    ERROR003 = "Invalid token signature."
class PostgresDBError:
    """
    Namespace for PostgreSQL-specific error constants.
    """
    # NOTE(review): currently empty — populate as DB error handling is added.
from scripts.constants import PostgresConstants, CommonConstants
from scripts.configurations import postgres_details
from scripts.core.logging.application_logging import logger
from scripts.core.utilities.postgresql_db_utils import PostgresDBUtility
class CustomReport:
    """Handler that builds Sterlite custom reports from PostgreSQL data."""

    def __init__(self):
        # Shared PostgreSQL access utility for report queries.
        self.postgres_db_obj = PostgresDBUtility()
"""
logger utility
"""
import logging
import os
import time
from functools import wraps
from logging.handlers import RotatingFileHandler
from scripts.configurations import logging_details
# Full log-file path (without extension); ".log" is appended in get_logger().
complete_log_path = os.path.join(
    logging_details.log_base_path, logging_details.log_file_name
)
# makedirs(exist_ok=True) is race-free and creates nested directories;
# the previous exists()+mkdir() pair was TOCTOU-racy and failed on
# multi-level base paths.
os.makedirs(logging_details.log_base_path, exist_ok=True)
def timed(func):
    """Decorator that logs the wrapped function's execution time.

    Uses time.perf_counter(), a monotonic high-resolution clock, instead of
    time.time(), which can jump when the system clock is adjusted.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        logger.debug("{} ran in {}s".format(func.__name__, round(end - start, 5)))
        return result
    return wrapper
def get_logger(
    log_file_name=complete_log_path,
    log_level=logging_details.log_level,
    time_format="%Y-%m-%d %H:%M:%S",
    handler_type=logging_details.handler_type,
    max_bytes=logging_details.max_bytes,
    backup_count=logging_details.backup_count,
):
    """
    Create (or fetch) a logger with a console handler and, optionally,
    a rotating file handler.

    Args:
        log_file_name: base path of the log file ("." + "log" appended).
        log_level: level name, e.g. "DEBUG" or "INFO".
        time_format: strftime format for log timestamps.
        handler_type: "rotating_file_handler" enables file output.
        max_bytes / backup_count: rotation settings.

    Returns:
        logging.Logger: the configured logger.
    """
    log_file = log_file_name + ".log"  # one-arg os.path.join was a no-op
    level = log_level.strip().upper()
    __logger__ = logging.getLogger(log_file_name)
    __logger__.setLevel(level)
    debug_formatter = (
        "%(asctime)s - %(levelname)-6s - %(name)s - "
        "[%(threadName)5s:%(filename)5s:%(funcName)5s():"
        "%(lineno)s] - %(message)s"
    )
    formatter_string = (
        "%(asctime)s - %(levelname)-6s - %(name)s - %(levelname)3s - %(message)s"
    )
    # Use the verbose formatter only at DEBUG level. The previous check
    # (log_level.strip().upper() == log_level) enabled it for ANY level that
    # happened to be set in uppercase, e.g. "INFO".
    if level == "DEBUG":
        formatter_string = debug_formatter
    formatter = logging.Formatter(formatter_string, time_format)
    # Guard against stacking duplicate handlers (and duplicated log lines)
    # when get_logger() is called more than once for the same logger name.
    if not __logger__.handlers:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        __logger__.addHandler(console_handler)
        if str(handler_type).lower() == "rotating_file_handler":
            # Rotating File Handler
            handler = RotatingFileHandler(
                log_file, maxBytes=max_bytes, backupCount=backup_count
            )
            handler.setFormatter(formatter)
            __logger__.addHandler(handler)
    return __logger__


# Module-wide logger shared by the whole application.
logger = get_logger()
from .custom_report_model import ReportInput, ReportOutput

# Re-export the report I/O models under the names used by the service layer.
custom_report_input_model = ReportInput
custom_report_output_model = ReportOutput
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class ReportInput(BaseModel):
    """Request body for the custom-report endpoint: optional date range."""

    from_date: Optional[str]  # range start (string; format not enforced here)
    end_date: Optional[str]  # range end
class ReportOutput(BaseModel):
    """Response body: success flag, human-readable message, report payload."""

    status: bool
    message: str
    data: str  # serialized report content; required by pydantic (no default)
import uuid
from datetime import datetime, timedelta
from scripts.configurations import security_conf_details
from scripts.core.db.redis_connections import login_db
from scripts.core.security.jwt_util import JWT
# Module-level signer, reused across calls.
jwt = JWT()


def create_token(
    user_id,
    ip,
    token,
    age=security_conf_details.lock_out_time_mins,
    login_token=None,
    project_id=None,
):
    """
    Create (or refresh) a login session cookie value.

    Builds a JWT carrying the caller's identity, stores it in Redis keyed by
    a session uid, and returns that uid (the value placed in the cookie).

    Args:
        user_id: authenticated user identifier embedded in the payload.
        ip: client IP address bound into the token.
        token: shared application secret echoed into the payload.
        age: session lifetime in minutes (defaults to the lockout setting).
        login_token: existing session uid to refresh; a fresh uid is minted
            when not supplied.
        project_id: optional project scope added to the payload.

    Returns:
        str: the session uid stored in Redis.
    """
    uid = login_token
    if not uid:
        uid = str(uuid.uuid4()).replace("-", "")
    payload = {"ip": ip, "user_id": user_id, "token": token, "uid": uid, "age": age}
    if project_id:
        payload["project_id"] = project_id
    exp = datetime.utcnow() + timedelta(minutes=age)
    _extras = {"iss": security_conf_details.issuer, "exp": exp}
    _payload = {**payload, **_extras}
    new_token = jwt.encode(_payload)
    # Add session to redis. Setting value and TTL in one call is atomic; the
    # previous set()+expire() pair could leave a never-expiring session if
    # the process died between the two calls.
    login_db.set(uid, new_token, ex=timedelta(minutes=age))
    return uid
from secrets import compare_digest
from typing import Optional
from fastapi import HTTPException, Request, Response, status
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from pydantic import BaseModel, Field
from scripts.configurations import security_conf_details
from scripts.core.db.redis_connections import login_db
from scripts.core.logging.application_logging import logger
from scripts.core.security.apply_encryption_util import create_token
from scripts.core.security.jwt_util import JWT
class CookieAuthentication(APIKeyBase):
    """
    Authentication backend using a cookie.
    Internally, uses a JWT token to store the data.

    Per-request flow: read the "login-token" cookie (header fallback), fetch
    the stored JWT from Redis, validate it, cross-check the shared secret and
    session uid, then rotate the session — a fresh JWT is stored under the
    same uid and re-issued as a cookie with a renewed TTL. Returns the
    authenticated user_id.
    """

    scheme: APIKeyCookie
    cookie_name: str
    cookie_secure: bool

    def __init__(
        self,
        cookie_name: str = "login-token",
    ):
        super().__init__()
        # Register as an APIKey-in-cookie scheme so it shows up in OpenAPI.
        self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
        self.scheme_name = self.__class__.__name__
        self.cookie_name = cookie_name
        self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
        self.login_redis = login_db
        self.jwt = JWT()

    def __call__(self, request: Request, response: Response) -> str:
        cookies = request.cookies
        # Cookie first, then header, for clients that cannot send cookies.
        login_token = cookies.get("login-token")
        if not login_token:
            login_token = request.headers.get("login-token")
        if not login_token:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        # The cookie carries only a session uid; the JWT itself lives in Redis.
        jwt_token = self.login_redis.get(login_token)
        if not jwt_token:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        try:
            decoded_token = self.jwt.validate(token=jwt_token)
            if not decoded_token:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        except Exception as e:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=e.args)
        # Accept both snake_case and camelCase claim names (older tokens).
        user_id = decoded_token.get("user_id", decoded_token.get("userId"))
        project_id = decoded_token.get("project_id", decoded_token.get("projectId"))
        _token = decoded_token.get("token")
        _age = int(decoded_token.get("age", security_conf_details.lock_out_time_mins))
        # Reject when the shared secret or session uid does not match;
        # compare_digest gives a timing-safe comparison for the secret.
        if any(
            [
                not compare_digest(security_conf_details.token, _token),
                login_token != decoded_token.get("uid"),
            ]
        ):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        # Expose the resolved identity to downstream dependencies via cookies.
        request.cookies.update(
            {
                "user_id": user_id,
                "project_id": project_id,
                "projectId": project_id,
                "userId": user_id,
            }
        )
        try:
            # Rotate the session: new JWT under the same uid, TTL reset.
            new_token = create_token(
                user_id=user_id,
                ip=request.client.host,
                token=security_conf_details.token,
                age=_age,
                login_token=login_token,
                project_id=project_id,
            )
        except Exception as e:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=e.args)
        response.set_cookie(
            "login-token",
            new_token,
            samesite="strict",
            httponly=True,
            secure=security_conf_details.secure_cookie,
            max_age=security_conf_details.lock_out_time_mins * 60,
        )
        # If project ID is null, this is susceptible to 500 Status Code.
        # Ensure token formation has project ID in
        # login token
        if not project_id:
            logger.info(
                "Project ID not found in Old token. "
                "Soon to be deprecated. Proceeding for now"
            )
            response.headers.update(
                {"login-token": new_token, "userId": user_id, "user_id": user_id}
            )
            return user_id
        response.headers.update(
            {
                "login-token": new_token,
                "projectId": project_id,
                "project_id": project_id,
                "userId": user_id,
                "user_id": user_id,
            }
        )
        return user_id
class MetaInfoSchema(BaseModel):
    """Request metadata extracted from cookies/headers (see MetaInfoCookie)."""

    projectId: Optional[str] = ""  # camelCase duplicate of project_id for legacy callers
    project_id: Optional[str] = ""
    user_id: Optional[str] = ""
    language: Optional[str] = ""
    ip_address: Optional[str] = ""
    login_token: Optional[str] = Field(alias="login-token")  # session uid cookie

    class Config:
        # Allow construction by field name as well as by the "login-token" alias.
        allow_population_by_field_name = True
class MetaInfoCookie(APIKeyBase):
    """
    Extracts request metadata (project, user, language) from cookies, falling
    back to headers, and returns it as a MetaInfoSchema.
    """

    scheme: APIKeyCookie

    def __init__(self):
        super().__init__()
        self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name="meta")
        self.scheme_name = self.__class__.__name__

    def __call__(self, request: Request, response: Response):
        def _lookup(key):
            # Cookie value wins; header value is the fallback.
            return request.cookies.get(key, request.headers.get(key))

        project_value = _lookup("projectId")
        user_value = _lookup("userId")
        return MetaInfoSchema(
            project_id=project_value,
            user_id=user_value,
            projectId=project_value,
            language=_lookup("language"),
            ip_address=request.client.host,
            login_token=request.cookies.get("login-token"),
        )
class GetUserID(APIKeyBase):
    """
    Resolves the caller's user id from cookies (header fallback), or rejects
    the request with 401 when none is present.
    """

    scheme: APIKeyCookie

    def __init__(self):
        super().__init__()
        self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name="user_id")
        self.scheme_name = self.__class__.__name__

    def __call__(self, request: Request, response: Response):
        cookies = request.cookies
        # Precedence: user_id cookie, then userId cookie, then userId header.
        user_id = cookies.get(
            "user_id", cookies.get("userId", request.headers.get("userId"))
        )
        if user_id:
            return user_id
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
import jwt
from jwt.exceptions import (
ExpiredSignatureError,
InvalidSignatureError,
MissingRequiredClaimError,
)
from scripts.configurations import security_conf_details
from scripts.core.exception.app_exceptions import AuthenticationError, ErrorMessages
from scripts.core.logging.application_logging import logger
class JWT:
    """Thin wrapper around PyJWT for encode/validate using on-disk key files."""

    def __init__(self):
        self.max_login_age = security_conf_details.lock_out_time_mins
        self.issuer = security_conf_details.issuer
        self.alg = security_conf_details.alg  # e.g. RS256
        self.public = security_conf_details.public_key_path
        self.private = security_conf_details.private_key_path

    def encode(self, payload):
        """Sign *payload* with the private key and return the JWT string."""
        try:
            # `with` closes the file; the previous explicit f.close() in a
            # finally block raised NameError (masking the real error)
            # whenever open() itself failed.
            with open(self.private) as key_file:
                key = key_file.read()
            return jwt.encode(payload, key, algorithm=self.alg)
        except Exception as e:
            logger.exception(f"Exception while encoding JWT: {str(e)}")
            raise

    def validate(self, token):
        """Decode and verify *token*; raise AuthenticationError on failures."""
        try:
            with open(self.public) as key_file:
                key = key_file.read()
            payload = jwt.decode(
                token,
                key,
                algorithms=[self.alg],  # PyJWT expects a list of algorithms
                # PyJWT's leeway is in seconds; the setting is in minutes.
                leeway=security_conf_details.leeway_in_mins * 60,
                options={"require": ["exp", "iss"]},
            )
            return payload
        except InvalidSignatureError:
            raise AuthenticationError(ErrorMessages.ERROR003)
        except ExpiredSignatureError:
            raise AuthenticationError(ErrorMessages.ERROR002)
        except MissingRequiredClaimError:
            raise AuthenticationError(ErrorMessages.ERROR002)
        except Exception as e:
            logger.exception(f"Exception while validating JWT: {str(e)}")
            raise
from fastapi import APIRouter
from scripts.core.security.decorators import CookieAuthentication
from scripts.core.services.event_service import event_router
"""
Initializing all the service routers
"""
api_router = APIRouter()
api_router.include_router(event_router)
auth_obj = CookieAuthentication()
"""
event_service.py
This script initializes and configures the FastAPI
application for the Vision App: On Demand Service.
The application includes routes, middleware, and server config.
Usage:
from scripts.core.services.event_service import router
"""
from fastapi import APIRouter
from scripts.configurations import service_details
from scripts.core.handler.event_handler import CustomReport
from scripts.core.logging.application_logging import logger
from scripts.constants import APIConstants, CommonConstants, figlet
from scripts.core.schemas.api import (
custom_report_input_model,
custom_report_output_model
)
# Shared handler instance and the router exposing the event endpoints below.
event_handler_obj = CustomReport()
event_router = APIRouter(tags=[CommonConstants.EVENT_HANDLING_ENDPOINT])
@event_router.on_event(APIConstants.STARTUP)
async def startup_event():
    """
    Startup hook: prints the ASCII-art `figlet` banner to the console.
    """
    print(figlet)
@event_router.on_event(APIConstants.SHUTDOWN)
async def shutdown_event():
    """
    Shutdown hook: prints `Exiting!` to the console.
    """
    print("Exiting!")
@event_router.get(
    APIConstants.HEALTH_CHECK)
async def ping():
    """
    Health-check endpoint: returns a `200 status` payload for liveness probes.
    """
    return {"status": 200}
@event_router.post(
    APIConstants.INIT_DB_ENDPOINT, response_model=custom_report_output_model)
async def initialize_db(input_json: custom_report_input_model):
    """
    Generate the custom report for the requested date range.

    Args:
        input_json (ReportInput): optional from_date / end_date filters.

    Returns:
        dict matching ReportOutput: status flag, message, and report data.
    """
    try:
        # ReportOutput declares `data: str` as a required field; omitting it
        # made FastAPI's response-model validation fail even on success.
        return {"status": True, "message": "", "data": ""}
    except Exception as err:
        logger.exception(
            CommonConstants.EXCEPTION_RAISER.format(str(err)),
            exc_info=service_details.exception_trace,
        )
        return {"status": False, "message": str(err), "data": ""}
import json
import requests
from fastapi import Request, Response
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from scripts.configurations import security_conf_details
from scripts.core.logging.application_logging import logger
from scripts.core.security.apply_encryption_util import create_token
class CommonUtils:
    """Small shared helpers for the service layer."""

    @staticmethod
    def decode_params(params: str):
        """Decode a string-encoded parameter payload into Python objects.

        Strips stray surrounding single quotes / backslashes before parsing
        the remainder as JSON.
        """
        cleaned = params.strip("'\\")
        return json.loads(cleaned)
class AppName(APIKeyBase):
    """
    Propagates the "app" identifier from the request (cookie, or header as a
    fallback) back onto the response as a long-lived cookie and a header.
    """

    scheme: APIKeyCookie
    cookie_name: str
    cookie_secure: bool

    def __init__(
        self,
        cookie_name: str = "app",
    ):
        super().__init__()
        self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
        self.scheme_name = self.__class__.__name__
        self.cookie_name = cookie_name
        self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)

    async def __call__(self, request: Request, response: Response) -> str:
        # Cookie value takes precedence; fall back to the request header.
        app = request.cookies.get("app") or request.headers.get("app")
        if app:
            response.set_cookie(
                "app",
                app,
                samesite="strict",
                httponly=True,
                max_age=6000000000 * 60,
            )
            response.headers["app"] = app
        return app
def create_internal_token(
    host: str = "127.0.0.1", user_id=None, internal_token=security_conf_details.token
):
    """
    This method is to create a cookie for internal service-to-service calls.
    Check this to see the implementation: https://gitlab-pm.knowledgelens.com/KnowledgeLens/Products/iLens-2.0/core
    /server/ilens-assistant/workflow-management/-/blob/develop/scripts/core/handlers/task_handler.py#L365
    """
    try:
        # Default to the fixed internal service account when no user is given.
        effective_user = "user_099" if user_id is None else user_id
        return create_token(
            user_id=effective_user,
            ip=host,
            token=internal_token,
        )
    except Exception as e:
        logger.exception(str(e))
        raise
def hit_external_service(
    request_url: str, method: str, request_payload: dict, headers: dict, **files
):
    """
    Forward a request to an external HTTP service and return the raw response.

    Any keyword extras (e.g. file uploads) are passed through to requests.
    """
    try:
        logger.info(f"Hitting external service {request_url}")
        external_response = requests.request(
            method=method,
            url=request_url,
            data=request_payload,
            headers=headers,
            timeout=10,
            **files,
        )
        return external_response
    except Exception as e:
        logger.exception(f"Exception while hit_external_service {e}")
        raise
import json
from functools import lru_cache
# Bounded cache: the old unbounded lru_cache() grew forever with each new
# (client, project, database) combination and pinned every client alive.
@lru_cache(maxsize=1024)
def get_db_name(redis_client, project_id: str, database: str, delimiter="__"):
    """
    Resolve the physical database name for a project, applying an optional
    project prefix configured in Redis.

    Args:
        redis_client: Redis client whose .get(project_id) returns the JSON
            project config (must be hashable — cached by identity).
        project_id: project identifier; falsy values skip the lookup.
        database: logical database name.
        delimiter: separator between prefix and database name.

    Returns:
        str: the (possibly prefixed) database name.

    Raises:
        ValueError: when the project id is unknown to Redis.

    NOTE: results are cached for the process lifetime — Redis-side changes to
    a project's prefix config are not picked up until restart.
    """
    if not project_id:
        return database
    val = redis_client.get(project_id)
    if val is None:
        raise ValueError(f"Unknown Project, Project ID: {project_id} Not Found!!!")
    val = json.loads(val)
    if not val:
        return database
    # Get the prefix flag to apply project_id prefix to any db
    prefix_condition = bool(val.get("source_meta", {}).get("add_prefix_to_database"))
    if prefix_condition:
        # Get the prefix name from mongo or default to project_id
        prefix_name = val.get("source_meta", {}).get("prefix") or project_id
        return f"{prefix_name}{delimiter}{database}"
    return database
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment