Commit 5be3e234 authored by albin.thomas

#Added implementation for batch-360 app.

parent 41ad1f0e
MODULE_NAME=ut_module
SECURITY_IP_CHECK=False
BASE_PATH=data
MOUNT_DIR=ut_module
LOG_LEVEL=INFO
SECURE_ACCESS=false
CORS_URLS=*
SW_DOCS_URL=/docs
SW_OPENAPI_URL=/openapi.json
ENABLE_CORS=True
SECURE_COOKIE=False
MODULE_PROXY=/ut_module
#DEV
MONGO_URI=mongodb://ilens:ilens4321@192.168.0.220:2717/
REDIS_URI=redis://192.168.0.220:6379
MQTT_URI=mqtt://192.168.0.220:1883
MQTT_HOST=192.168.0.220
MQTT_PORT=1883
REDIS_LOGIN_DB=9
REDIS_PROJECT_DB=18
VERIFY_SIGNATURE=False
REDIS_USER_ROLE_DB=21
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
reports/*.pdf
reports/*.csv
reports/*.xlsx
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
.idea
logs
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
### VisualStudio template
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore
# User-specific files
*.rsuser
*.suo
*.user
*.userosscache
*.sln.docstates
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Mono auto generated files
mono_crash.*
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
[Ww][Ii][Nn]32/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/
[Ll]ogs/
# Visual Studio 2015/2017 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# Visual Studio 2017 auto generated files
Generated\ Files/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# NUnit
*.VisualState.xml
TestResult.xml
nunit-*.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
# Benchmark Results
BenchmarkDotNet.Artifacts/
# .NET Core
project.lock.json
project.fragment.lock.json
artifacts/
# ASP.NET Scaffolding
ScaffoldingReadMe.txt
# StyleCop
StyleCopReport.xml
# Files built by Visual Studio
*_i.c
*_p.c
*_h.h
*.ilk
*.meta
*.obj
*.iobj
*.pch
*.pdb
*.ipdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*_wpftmp.csproj
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opendb
*.opensdf
*.sdf
*.cachefile
*.VC.db
*.VC.VC.opendb
# Visual Studio profiler
*.psess
*.vsp
*.vspx
*.sap
# Visual Studio Trace Files
*.e2e
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# AxoCover is a Code Coverage Tool
.axoCover/*
!.axoCover/settings.json
# Coverlet is a free, cross platform Code Coverage Tool
coverage*.json
coverage*.xml
coverage*.info
# Visual Studio code coverage results
*.coverage
*.coveragexml
# NCrunch
_NCrunch_*
.*crunch*.local.xml
nCrunchTemp_*
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# Note: Comment the next line if you want to checkin your web deploy settings,
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
# Microsoft Azure Web App publish settings. Comment the next line if you want to
# checkin your Azure Web App publish settings, but sensitive information contained
# in these scripts will be unencrypted
PublishScripts/
# NuGet Packages
*.nupkg
# NuGet Symbol Packages
*.snupkg
# The packages folder can be ignored because of Package Restore
**/[Pp]ackages/*
# except build/, which is used as an MSBuild target.
!**/[Pp]ackages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/[Pp]ackages/repositories.config
# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets
# Microsoft Azure Build Output
csx/
*.build.csdef
# Microsoft Azure Emulator
ecf/
rcf/
# Windows Store app package directories and files
AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt
*.appx
*.appxbundle
*.appxupload
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
!?*.[Cc]ache/
# Others
ClientBin/
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.jfm
*.pfx
*.publishsettings
orleans.codegen.cs
# Including strong name files can present a security risk
# (https://github.com/github/gitignore/pull/2483#issue-259490424)
#*.snk
# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
ServiceFabricBackup/
*.rptproj.bak
# SQL Server files
*.mdf
*.ldf
*.ndf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
*.rptproj.rsuser
*- [Bb]ackup.rdl
*- [Bb]ackup ([0-9]).rdl
*- [Bb]ackup ([0-9][0-9]).rdl
# Microsoft Fakes
FakesAssemblies/
# GhostDoc plugin setting file
*.GhostDoc.xml
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
node_modules/
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
*.vbw
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
_Pvt_Extensions
# Paket dependency manager
.paket/paket.exe
paket-files/
# FAKE - F# Make
.fake/
# CodeRush personal settings
.cr/personal
# Python Tools for Visual Studio (PTVS)
*.pyc
# Cake - Uncomment if you are using it
# tools/**
# !tools/packages.config
# Tabs Studio
*.tss
# Telerik's JustMock configuration file
*.jmconfig
# BizTalk build output
*.btp.cs
*.btm.cs
*.odx.cs
*.xsd.cs
# OpenCover UI analysis results
OpenCover/
# Azure Stream Analytics local run output
ASALocalRun/
# MSBuild Binary and Structured Log
*.binlog
# NVidia Nsight GPU debugger configuration file
*.nvuser
# MFractors (Xamarin productivity tool) working folder
.mfractor/
# Local History for Visual Studio
.localhistory/
# BeatPulse healthcheck temp database
healthchecksdb
# Backup folder for Package Reference Convert tool in Visual Studio 2017
MigrationBackup/
# Ionide (cross platform F# VS Code tools) working folder
.ionide/
# Fody - auto-generated XML schema
FodyWeavers.xsd
### JupyterNotebooks template
# gitignore template for Jupyter Notebooks
# website: http://jupyter.org/
*/.ipynb_checkpoints/*
# Remove previous ipynb_checkpoints
# git rm -r .ipynb_checkpoints/
.temp/
#!/usr/bin/env bash
pip install ruff black isort --upgrade
ruff scripts
black scripts --check
isort scripts --check-only
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- id: requirements-txt-fixer
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
hooks:
- id: pyupgrade
args:
- --py3-plus
- --keep-runtime-typing
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.178
hooks:
- id: ruff
args:
- --fix
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
- id: isort
name: isort (cython)
types: [cython]
- id: isort
name: isort (pyi)
types: [pyi]
- repo: https://github.com/psf/black
rev: 22.12.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
# supported by your project here, or alternatively use
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.9
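# Hedged usage note (not part of the original config): with pre-commit installed
# (pip install pre-commit), "pre-commit install" registers the git hook and
# "pre-commit run --all-files" runs every hook above once against the repo.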
FROM python:3.9.7-slim
RUN apt update && sleep 10 && \
apt install build-essential libzbar-dev curl -y && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt /code/requirements.txt
WORKDIR /code
RUN pip install -r requirements.txt
COPY . /code
CMD [ "python", "app.py" ]
version = "v7.0"
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
from main import app
app.root_path = None
import argparse
import logging
import uvicorn
from scripts.config import Service
ap = argparse.ArgumentParser()
if __name__ == "__main__":
ap.add_argument(
"--port",
"-p",
required=False,
default=Service.PORT,
help="Port to start the application.",
)
ap.add_argument(
"--bind",
"-b",
required=False,
default=Service.HOST,
help="IF to start the application.",
)
arguments = vars(ap.parse_args())
logging.info(f"App Starting at {arguments['bind']}:{arguments['port']}")
uvicorn.run("main:app", host=arguments["bind"], port=int(arguments["port"]))
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
from main import app
app.root_path = None
import argparse
import gc
import logging
import uvicorn
from scripts.config import Service
gc.collect()
ap = argparse.ArgumentParser()
if __name__ == "__main__":
ap.add_argument(
"--port",
"-p",
required=False,
default=Service.PORT,
help="Port to start the application.",
)
ap.add_argument(
"--bind",
"-b",
required=False,
default=Service.HOST,
help="IF to start the application.",
)
arguments = vars(ap.parse_args())
logging.info(f"App Starting at {arguments['bind']}:{arguments['port']}")
uvicorn.run("main:app", host=arguments["bind"], port=int(arguments["port"]))
MODULE_NAME=ut_module
SECURITY_IP_CHECK=False
BASE_PATH=data
MOUNT_DIR=ut_module
LOG_LEVEL=INFO
SECURE_ACCESS=false
CORS_URLS=staging.ilens.io
SW_DOCS_URL=/docs
SW_OPENAPI_URL=/openapi.json
ENABLE_CORS=True
SECURE_COOKIE=False
MODULE_PROXY=/ut_module
#DEV
MONGO_URI=mongodb://ilens:ilens4321@192.168.0.220:2717/
REDIS_URI=redis://192.168.0.220:6379
MQTT_URI=mqtt://192.168.0.220:1883
MQTT_HOST=192.168.0.220
MQTT_PORT=1883
REDIS_LOGIN_DB=9
REDIS_PROJECT_DB=18
VERIFY_SIGNATURE=False
REDIS_USER_ROLE_DB=21
"""
This is the main script where the FastAPI application is initialized.
Avoid running this file directly; run app.py or debug.py instead.
"""
from dotenv import load_dotenv
from scripts.constants.secrets import Secrets
from scripts.services.ut_service import ut_router
load_dotenv()
import gc
import os
from __version__ import version
from fastapi import APIRouter, Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from jwt_signature_validator.encoded_payload import (
EncodedPayloadSignatureMiddleware as SignatureVerificationMiddleware,
)
from scripts.config import Service, Security
from scripts.utils.security_utils.decorators import CookieAuthentication
secure_access = os.environ.get("SECURE_ACCESS", default=False)
auth = CookieAuthentication()
gc.collect()
router = APIRouter(tags=["ping"])
@router.get("/api/ut_module/healthcheck")
async def ping():
return {"status": 200}
app = FastAPI(
title="UT",
version=version,
description="UT module",
root_path=Service.PROXY,
openapi_url=os.environ.get("SW_OPENAPI_URL"),
docs_url=os.environ.get("SW_DOCS_URL"),
redoc_url=None,
)
# Signature Verification
if Security.verify_signature:
app.add_middleware(
SignatureVerificationMiddleware,
jwt_secret=Secrets.signature_key,
jwt_algorithms=Secrets.signature_key_alg,
protect_hosts=Security.protected_hosts,
)
app.include_router(router)
app.include_router(ut_router)
if os.environ.get("ENABLE_CORS") in (True, "true", "True") and os.environ.get(
"CORS_URLS"
):
app.add_middleware(
CORSMiddleware,
allow_origins=os.environ.get("CORS_URLS").split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE", "PUT"],
allow_headers=["*"],
)
[tool.black]
line-length = 120
[tool.isort]
profile = "black"
[tool.ruff]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
# "I", # isort
"C", # flake8-comprehensions
"B", # flake8-bugbear
]
ignore = [
"E501", # line too long, handled by black
"B008", # do not perform function calls in argument defaults
"C901", # too complex
"E402",
"B904",
"B905",
"B009"
]
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"]
cryptography==39.0.0
fastapi~=0.89.1
jwt-signature-validator==0.0.1
orjson==3.8.1
paho-mqtt==1.5.0
pre-commit==2.20.0
pycryptodome==3.16.0
pydantic~=1.10.4
PyJWT==2.4.0
pymongo~=3.13.0
python-dotenv~=0.21.0
redis==3.5.3
uvicorn==0.17.5
class EndPoints:
# Base URLS
api_ut = "/ut"
# UT
api_get_sample = "/sample"
api_get_batch_app_data = "/RefreshBatchDetails"
api_get_batch_parameter_table_data = "/GetBatchParameterData"
api_get_raw_material_data = "/GetRawMaterialData"
api_get_non_complains_data = "/GetNonComplianceData"
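# Illustrative resolved paths (given the /ut router prefix, see
# scripts/services/ut_service.py): /ut/RefreshBatchDetails,
# /ut/GetBatchParameterData, /ut/GetRawMaterialData, /ut/GetNonComplianceData.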
import os.path
import sys
from pydantic import BaseSettings, Field
class _Service(BaseSettings):
PORT: int = 9876
HOST: str = "0.0.0.0"
PROXY: str = Field(..., env="MODULE_PROXY")
module_name: str = Field(default="ut_module", env="MODULE_NAME")
class _Security(BaseSettings):
secure_cookie: bool = os.environ.get("SECURE_COOKIE", default=True)
verify_signature: bool = os.environ.get("VERIFY_SIGNATURE", default=False)
protected_hosts: list = os.environ.get("PROTECTED_HOSTS", default="").split(",")
class _MongoConf(BaseSettings):
uri: str = Field(..., env="MONGO_URI")
class _RedisConf(BaseSettings):
uri: str = Field(..., env="REDIS_URI")
login_db: int = Field(default=9, env="REDIS_LOGIN_DB")
project_tags_db: int = Field(default=18, env="REDIS_PROJECT_DB")
user_role_permissions: int = Field(default=21, env="REDIS_USER_ROLE_DB")
class _MQTTConf(BaseSettings):
uri: str = Field(env="MQTT_URI")
host: str = Field(..., env="MQTT_HOST")
port: int = Field(..., env="MQTT_PORT")
publish_base_topic: str = Field(default="ilens/notifications")
class _PathToStorage(BaseSettings):
BASE_PATH: str = os.environ.get("BASE_PATH")
MOUNT_DIR: str = os.environ.get("MOUNT_DIR")
if not all([BASE_PATH, MOUNT_DIR]):
print("Error: environment variable BASE_PATH or MOUNT_DIR not set")
sys.exit(1)
MODULE_PATH: str = os.path.join(BASE_PATH, MOUNT_DIR)
IMAGES_PATH: str = os.path.join(MODULE_PATH, "images")
LOG_PATH: str = os.path.join(BASE_PATH, "logs", MOUNT_DIR)
class _LoggerConf(object):
logging_level: str = os.environ.get("LOG_LEVEL", default="DEBUG")
class _KeyPath(BaseSettings):
keys_path: str = "keys"
public: str = os.path.join(_PathToStorage().BASE_PATH, keys_path, "public")
private: str = os.path.join(_PathToStorage().BASE_PATH, keys_path, "private")
Service = _Service()
Security = _Security()
MongoConf = _MongoConf()
RedisConf = _RedisConf()
MQTTConf = _MQTTConf()
PathToStorage = _PathToStorage()
LoggerConf = _LoggerConf()
KeyPath = _KeyPath()
__all__ = [
"Service",
"Security",
"MongoConf",
"RedisConf",
"MQTTConf",
"PathToStorage",
"LoggerConf",
"KeyPath",
]
class STATUS:
SUCCESS = "success"
FAILED = "failed"
SUCCESS_CODES = [200, 201]
class BatchAppHeaderContentConstants:
B_BATCH_PARAMETER_HEADER = [
"Batch No",
"Product Name",
"Mfg. Date",
"Site",
"Plant",
"Mfg. Stage",
"Unit Operation",
"Parameter",
"Value",
"UOM",
"Source"
]
C_RAW_MATERIAL_DATA = [
"Batch No",
"Material No",
"Material Description",
"Mfg. Date",
"Expiration Date",
"Vendor",
"Parameter",
"Value",
"UOM",
]
D_NON_COMPLIANCE_DATA = [
"Batch No",
"NC ID",
"Classification",
"Description",
"Status",
"Observed Date",
"Closed Date",
"Due Date"
]
class DatabaseConstants:
# Databases
batch_360_app = "batch_360_app"
# Collections
collection_a_batch_info = "a_batch_info"
collection_b_batch_parameter_data = "b_batch_parameter_data"
collection_c_raw_material_data = "c_raw_material_data"
collection_d_non_compliance_data = "d_non_compliance_data"
class Secrets:
LOCK_OUT_TIME_MINS = 30
leeway_in_mins = 10
unique_key = "45c37939-0f75"
token = "8674cd1d-2578-4a62-8ab7-d3ee5f9a"
issuer = "ilens"
alg = "RS256"
signature_key = "kliLensKLiLensKL"
signature_key_alg = ["HS256"]
from scripts.db.mongo.ilens_configuration.collections.a_batch_info import \
ABatchInfoCollection
from scripts.db.mongo.ilens_configuration.collections.b_batch_parameter_data import \
BBatchParameterDataCollection
from scripts.db.mongo.ilens_configuration.collections.c_raw_material_data import \
CRawMaterialDataCollection
from scripts.db.mongo.ilens_configuration.collections.d_non_compliance_data import \
DNonComplianceDataCollection
from scripts.logging import logger
from scripts.schemas.ut_schema import GetSample, GetTableData
from scripts.constants import BatchAppHeaderContentConstants
from py2neo import Graph, Node, Relationship
graph = Graph("bolt://192.168.0.220:7687", auth=("neo4j", "root"))
def make_response_for_batch_info(data, parent_batch_no):
try:
batch_no_id_dict = {}
response = {"edges": [], "nodes": []}
parent_ids = []
for nodes in data:
batch_no_id_dict.update({nodes.get("Batch_No", ""): nodes.get("id", "")})
if nodes.get("Level") == -1:
parent_ids.append(nodes.get("id", ""))
edges_list = []
nodes_list = []
for each_nodes in data:
if each_nodes.get("Batch_No") == parent_batch_no:
nodes_list.append(
{"id": each_nodes.get("id", ""),
"label": each_nodes.get("Batch_No", ""),
"title": each_nodes
}
)
for each_parent in each_nodes.get("Parent", []):
parent_id = batch_no_id_dict.get(each_parent, "")
if any([parent_id not in parent_ids and each_nodes.get("Level") == -2, not parent_id]):
continue
if each_nodes.get("Level") == 0:
continue
each_nodes["Parent"] = ",".join(each_nodes.get("Parent", []))
nodes_list.append(
{"id": each_nodes.get("id", ""),
"label": each_nodes.get("Batch_No", ""),
"title": each_nodes
}
)
if each_nodes.get("id", "") == parent_id:
continue
edges_list.append({"to": each_nodes.get("id", ""), "from": parent_id})
logger.debug(f"edges: {edges_list}")
response["edges"] = edges_list
response["nodes"] = nodes_list
return response
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
def create_query_for_batch_details(direction, level, batch_no):
try:
query_direction = "<" if direction == "upstream" else ">"
sub_query = f"(source)<-[relationship:OCCURRED*0..{level}]-(target)" \
if query_direction == "<" else f"(source)-[relationship:OCCURRED*0..{level}]->(target)"
cypher_query = f"MATCH {sub_query} WHERE source.Batch_No = {batch_no} return source, relationship,target;"
return cypher_query
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise e
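# Illustrative output (assumed inputs): create_query_for_batch_details("upstream", 2, "B100")
# builds roughly:
#   MATCH (source)<-[relationship:OCCURRED*0..2]-(target)
#   WHERE source.Batch_No = 'B100' RETURN source, relationship, target;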
class UTHandler:
def __init__(self):
self.a_batch_info = ABatchInfoCollection()
self.b_batch_parameter_data = BBatchParameterDataCollection()
self.c_raw_material_data = CRawMaterialDataCollection()
self.d_non_compliance_data = DNonComplianceDataCollection()
# async def get_sample_data(self, request_data: GetSample):
# try:
# data = self.sample.find_one(
# {
# "project_id": request_data.project_id,
# "sample_id": request_data.sample_id,
# }
# )
# return data
# except Exception as e:
# logger.error(f"Error while getting sample data - {e}")
# raise
async def get_batch_info_neo_4j(self, request_data: GetSample):
try:
cypher_query = create_query_for_batch_details(batch_no=request_data.Batch_No, level=request_data.Batch_Level
, direction=request_data.Traverse.lower().replace(" ", ""))
logger.info("before first query")
logger.debug(cypher_query)
result = graph.run(cypher=cypher_query)
# logger.info("after first query")
# logger.info("before second query")
# result2 = graph.evaluate(cypher=cypher_query)
# logger.info("after second query")
response = {}
nodes_list = []
node_ids = []
edges_list = []
relation_mapping_ids = []
result_data = result.data()
logger.info("after all query")
logger.info("before lsting")
# result_data = list(result)
logger.info("after lsting")
for record in result_data:
source_node = record['source']
target_node = record['target']
if source_node.identity not in node_ids:
node_ids.append(source_node.identity)
# source_node["id"] = source_node.identity
source_node["Mfg_Date"] = source_node.pop("Mfg._Date")
source_node["Mfg_Stage"] = source_node.pop("Mfg._Stage")
nodes_list.append(
{"id": source_node.identity,
"label": source_node.get("Batch_No", ""),
"title": dict(source_node)}
)
if target_node.identity not in node_ids:
node_ids.append(target_node.identity)
target_node["Mfg_Date"] = target_node.pop("Mfg._Stage")
target_node["Mfg_Stage"] = target_node.pop("Mfg._Date")
nodes_list.append(
{"id": target_node.identity,
"label": target_node.get("Batch_No", ""),
"title": dict(target_node)}
)
relationship = record['relationship']
for each in relationship:
if f'{each.start_node.identity}_{each.end_node.identity}' in relation_mapping_ids:
continue
relation_mapping_ids.append(f'{each.start_node.identity}_{each.end_node.identity}')
edges_list.append({
"from": each.start_node.identity,
"to": each.end_node.identity,
})
# for each_node in nodes_list:
# each_node["Mfg_Date"] = each_node.get("Mfg._Date", "")
# each_node["Mfg_Stage"] = each_node.get("Mfg._Stage", "")
# each_id = each_node.get("id", "")
# each_node.pop("Mfg._Date")
# each_node.pop("Mfg._Stage")
# each_node.pop("id")
# response_node_list.append(
# {"id": each_id,
# "label": each_node.get("Batch_No", ""),
# "title": each_node
# }
# )
logger.info("END")
response["edges"] = edges_list
response["nodes"] = nodes_list
return response
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
async def get_batch_info(self, request_data: GetSample):
try:
data = list(self.a_batch_info.find(
{
"$or": [{"Parent": request_data.Batch_No, "Level": -1 if request_data.Batch_Level > 0 else 0},
{"Batch_No": request_data.Batch_No}]
}
))
if request_data.Batch_Level >= 2:
if request_data.Traverse.replace(" ", "").lower() == "downstream":
request_data.Batch_Level *= -1
levels = [x for x in range(request_data.Batch_Level, -1)]
multi_level_data = list(self.a_batch_info.find({"Level": {"$in": levels}}))
data.extend(multi_level_data)
node_edge_data = make_response_for_batch_info(data, request_data.Batch_No)
return node_edge_data
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
async def get_batch_parameter_table_data(self, request_data: GetTableData):
try:
# data = list(self.a_batch_info.find(
# query={
# "$or": [{"Parent": request_data.Batch_No},
# {"Batch_No": request_data.Batch_No}]
# },
# filter_dict={"_id": 0, "Batch_No": 1, "Parent": 1}
# ))
# batch_no_list = []
# for batch_data in data:
# if batch_data.get("Batch_No", "") == request_data.Batch_No:
# batch_no_list.extend(batch_data.get("Parent", []))
# batch_no_list.append(batch_data.get("Batch_No", ""))
# set(batch_no_list)
table_data = list(self.b_batch_parameter_data.find(
{
"Batch_No": request_data.Batch_No
}
))
tab_list = []
for tab_data in table_data:
dict_tab = {
"Batch No": tab_data.get("Batch_No"),
"Product Name": tab_data.get("Product_Name"),
"Mfg. Date": tab_data.get("Mfg_Date"),
"Site": tab_data.get("Site"),
"Plant": tab_data.get("Plant"),
"Mfg. Stage": tab_data.get("Mfg_Stage"),
"Unit Operation": tab_data.get("Unit_Operation"),
"Parameter": tab_data.get("Parameter"),
"Value": tab_data.get("Value"),
"UOM": tab_data.get("UOM"),
"Source": tab_data.get("Source")
}
tab_list.append(dict_tab)
table_data_response = {"message":
{"tableData":
{"bodyContent": tab_list,
"headerContent": BatchAppHeaderContentConstants.B_BATCH_PARAMETER_HEADER}},
"tableData": {"bodyContent": [],
"headerContent": BatchAppHeaderContentConstants.B_BATCH_PARAMETER_HEADER}}
return table_data_response
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
async def get_raw_material_table_data(self, request_data: GetTableData):
try:
# data = list(self.a_batch_info.find(
# query={
# "$or": [{"Parent": request_data.Batch_No},
# {"Batch_No": request_data.Batch_No}]
# },
# filter_dict={"_id": 0, "Batch_No": 1, "Parent": 1}
# ))
# batch_no_list = []
# for batch_data in data:
# if batch_data.get("Batch_No", "") == request_data.Batch_No:
# batch_no_list.extend(batch_data.get("Parent", []))
# batch_no_list.append(batch_data.get("Batch_No", ""))
# set(batch_no_list)
table_data = list(self.c_raw_material_data.find(
{
"Batch_No": request_data.Batch_No
}
))
tab_list = []
for tab_data in table_data:
dict_tab = {
"Batch No": tab_data.get("Batch_No"),
"Material No": tab_data.get("Material_No"),
"Material Description": tab_data.get("Material_Description"),
"Mfg. Date": tab_data.get("Mfg_Date"),
"Expiration Date": tab_data.get("Expiration_Date"),
"Vendor": tab_data.get("Vendor"),
"Parameter": tab_data.get("Parameter"),
"Value": tab_data.get("Value"),
"UOM": tab_data.get("UOM"),
}
tab_list.append(dict_tab)
table_data_response = {"message":
{"tableData":
{"bodyContent": tab_list,
"headerContent": BatchAppHeaderContentConstants.C_RAW_MATERIAL_DATA}}}
return table_data_response
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
async def get_non_compliance_table_data(self, request_data: GetTableData):
try:
# data = list(self.a_batch_info.find(
# query={
# "$or": [{"Parent": request_data.Batch_No},
# {"Batch_No": request_data.Batch_No}]
# },
# filter_dict={"_id": 0, "Batch_No": 1, "Parent": 1}
# ))
# batch_no_list = []
# for batch_data in data:
# if batch_data.get("Batch_No", "") == request_data.Batch_No:
# batch_no_list.extend(batch_data.get("Parent", []))
# batch_no_list.append(batch_data.get("Batch_No", ""))
# set(batch_no_list)
table_data = list(self.d_non_compliance_data.find(
{
"Batch_No": request_data.Batch_No
}
))
tab_list = []
for tab_data in table_data:
dict_tab = {
"Batch No": tab_data.get("Batch_No"),
"NC ID": tab_data.get("NC_ID"),
"Classification": tab_data.get("Classification"),
"Description": tab_data.get("Description"),
"Status": tab_data.get("Status"),
"Observed Date": tab_data.get("Observed_Date"),
"Closed Date": tab_data.get("Closed_Date"),
"Due Date": tab_data.get("Due_Date")
}
tab_list.append(dict_tab)
table_data_response = {"message":
{"tableData":
{"bodyContent": tab_list,
"headerContent": BatchAppHeaderContentConstants.D_NON_COMPLIANCE_DATA}}}
return table_data_response
except Exception as e:
logger.error(f"Error while getting sample data - {e}")
raise
from pydantic import BaseModel
from scripts.config import MongoConf
from scripts.utils.mongo_util import MongoCollectionBaseClass, MongoConnect
mongo_uri = MongoConf.uri
mongo_obj = MongoConnect(uri=mongo_uri)
mongo_client = mongo_obj()
CollectionBaseClass = MongoCollectionBaseClass
class MongoBaseSchema(BaseModel):
pass
from scripts.constants.db_constants import DatabaseConstants
database = DatabaseConstants.batch_360_app
"""
Sample is the collection given for reference
Update the collection details before using it
"""
from scripts.constants.db_constants import DatabaseConstants
from scripts.db.mongo import CollectionBaseClass, mongo_client
from scripts.db.mongo.ilens_configuration import database
collection_name = DatabaseConstants.collection_a_batch_info
class ABatchInfoCollection(CollectionBaseClass):
def __init__(self):
super().__init__(mongo_client, database=database, collection=collection_name)
# self.project_id = project_id
"""
Sample is the collection given for reference
Update the collection details before using it
"""
from scripts.constants.db_constants import DatabaseConstants
from scripts.db.mongo import CollectionBaseClass, mongo_client
from scripts.db.mongo.ilens_configuration import database
collection_name = DatabaseConstants.collection_b_batch_parameter_data
class BBatchParameterDataCollection(CollectionBaseClass):
def __init__(self):
super().__init__(mongo_client, database=database, collection=collection_name)
# self.project_id = project_id
"""
Sample is the collection given for reference
Update the collection details before using it
"""
from scripts.constants.db_constants import DatabaseConstants
from scripts.db.mongo import CollectionBaseClass, mongo_client
from scripts.db.mongo.ilens_configuration import database
collection_name = DatabaseConstants.collection_c_raw_material_data
class CRawMaterialDataCollection(CollectionBaseClass):
def __init__(self):
super().__init__(mongo_client, database=database, collection=collection_name)
# self.project_id = project_id
"""
Sample is the collection given for reference
Update the collection details before using it
"""
from scripts.constants.db_constants import DatabaseConstants
from scripts.db.mongo import CollectionBaseClass, mongo_client
from scripts.db.mongo.ilens_configuration import database
collection_name = DatabaseConstants.collection_d_non_compliance_data
class DNonComplianceDataCollection(CollectionBaseClass):
def __init__(self):
super().__init__(mongo_client, database=database, collection=collection_name)
# self.project_id = project_id
import redis
from scripts.config import RedisConf
redis_uri = RedisConf.uri
login_db = redis.from_url(redis_uri, db=int(RedisConf.login_db), decode_responses=True)
project_details_db = redis.from_url(
redis_uri, db=int(RedisConf.project_tags_db), decode_responses=True
)
user_role_permissions_redis = redis.from_url(
redis_uri, db=int(RedisConf.user_role_permissions), decode_responses=True
)
class ILensErrors(Exception):
"""Generic iLens Error"""
class KairosDBError(ILensErrors):
"""Kairos DB Error"""
class AuthenticationError(ILensErrors):
pass
class DataFrameFormationError(ILensErrors):
"""Raise when there is an error during dataframe formation"""
class ErrorMessages:
UNKNOWN_ERROR = "Error occurred while processing your request."
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Signature Expired"
ERROR003 = "Signature Not Valid"
ERROR004 = "No Data available to form dataframe"
ERROR005 = "Error occurred While forming chart"
ERROR006 = "Data Not Found"
class IllegalTimeSelectionError(ILensErrors):
pass
class TemplateFormationError(ILensErrors):
pass
class ProductsNotFoundError(ILensErrors):
"""Raise when products matching conditions are not found"""
pass
class DataNotFound(ILensErrors):
"""
Raise when data is not found
"""
class TagDetailsNotFound(Exception):
"""
Raise when tag details are crucial to proceed and meta service returns empty list
"""
class DuplicateName(ILensErrors):
pass
class InputRequestError(Exception):
pass
class JWTDecodingError(Exception):
pass
class DashboardNotFound(ILensErrors):
pass
class WidgetsNotFound(ILensErrors):
pass
class JobCreationError(Exception):
"""
Raised when a Job Creation throws an exception.
Job Creation happens by adding a record to Mongo.
"""
class TemplateNotFoundError(Exception):
"""
This error is raised when Dashboard/Widget Template is not found
"""
class ChartFormationError(Exception):
pass
class QueryFormationError(Exception):
pass
class UnauthorizedError(Exception):
pass
class PostgresDBError(Exception):
pass
class CustomError(Exception):
pass
import logging
import os
import pathlib
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
from scripts.config import LoggerConf, PathToStorage, Service
def read_configuration():
return {
"name": "ut_module",
"handlers": [
{
"type": "RotatingFileHandler",
"max_bytes": 100000000,
"back_up_count": 5,
"enable": False
if os.environ.get("ENABLE_FILE_LOG", False) in [False, "False", "false"]
else True,
},
{
"type": "StreamHandler",
"enable": False
if os.environ.get("ENABLE_CONSOLE_LOG", True)
in [False, "False", "false"]
else True,
},
],
}
logging_config = read_configuration()
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger("")
_level = LoggerConf.logging_level.upper()
if not _level:
_level = "INFO"
__logger__.setLevel(_level)
log_formatter = "%(asctime)s - %(levelname)-6s - " \
"[%(threadName)5s:%(funcName)5s(): %(lineno)s] - %(message)s"
time_format = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"] and each_handler.get(
"enable", False
):
pathlib.Path(PathToStorage.LOG_PATH).mkdir(parents=True, exist_ok=True)
log_file = pathlib.Path(
PathToStorage.LOG_PATH, f"{Service.module_name}.log"
)
__logger__.debug("File Logger Enabled")
temp_handler = RotatingFileHandler(
log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"],
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["StreamHandler"] and each_handler.get(
"enable", True
):
__logger__.debug("Console Logger Enabled")
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
else:
continue
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
logger:
name: ut_module
level: DEBUG
handlers:
- type: RotatingFileHandler
file_path: logs/
max_bytes: 100000000
back_up_count: 5
- type: SocketHandler
host: localhost
port: 23582
- type: StreamHandler
name: ut_module
from typing import Any, Optional
from pydantic import BaseModel
from scripts.constants import STATUS
class DefaultResponse(BaseModel):
status: str = STATUS.SUCCESS
message: Optional[str]
data: Optional[Any]
class DefaultFailureResponse(DefaultResponse):
status: str = STATUS.FAILED
error: Any
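# Illustrative serialized shape (assumed data):
#   DefaultResponse(message="Success", data={"k": 1}).dict()
#   -> {"status": "success", "message": "Success", "data": {"k": 1}}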
from typing import Optional
from pydantic import BaseModel
class GetSample(BaseModel):
Batch_No: str
Batch_Level: int
page_type: Optional[str]
Traverse: Optional[str]
class GetTableData(BaseModel):
Batch_No: str
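# Illustrative request payloads (assumed values):
#   GetSample: {"Batch_No": "B100", "Batch_Level": 2, "Traverse": "Upstream"}
#   GetTableData: {"Batch_No": "B100"}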
from fastapi import APIRouter, Depends
from scripts.api import EndPoints
from scripts.core.handlers.ut_handler import UTHandler
from scripts.logging import logger
from scripts.schemas.response_models import (DefaultFailureResponse,
DefaultResponse)
from scripts.schemas.ut_schema import GetSample, GetTableData
ut_router = APIRouter(prefix=EndPoints.api_ut, tags=["UT Sample Service"])
# @ut_router.post(EndPoints.api_get_sample)
# async def get_sample_data(
# request_data: GetSample, meta: MetaInfoSchema = Depends(get_cookies)
# ):
# """
# API to get sample data
# """
# try:
# ut_handler = UTHandler(project_id=meta.project_id)
# response = await ut_handler.get_sample_data(request_data)
# return DefaultResponse(message="Success", data=response)
# except Exception as e:
# logger.exception(f"Exception while getting sample data: {e}")
# return DefaultFailureResponse(
# message="Failed to get sample", error=str(e)
# ).dict()
@ut_router.post(EndPoints.api_get_batch_app_data)
async def get_batch_app_data(
request_data: GetSample
):
"""
API to get batch hierarchy data (nodes and edges) from Neo4j
"""
try:
# ut_handler = UTHandler(project_id=meta.project_id)
response = await UTHandler().get_batch_info_neo_4j(request_data)
return DefaultResponse(message="Success", data=response)
except Exception as e:
logger.exception(f"Exception while getting sample data: {e}")
return DefaultFailureResponse(
message="Failed to get sample", error=str(e)
).dict()
@ut_router.post(EndPoints.api_get_batch_parameter_table_data)
async def get_batch_table_data(
request_data: GetTableData
):
"""
API to get batch parameter table data
"""
try:
# ut_handler = UTHandler(project_id=meta.project_id)
response = await UTHandler().get_batch_parameter_table_data(request_data)
return DefaultResponse(message="Success", data=response)
except Exception as e:
logger.exception(f"Exception while getting sample data: {e}")
return DefaultFailureResponse(
message="Failed to get sample", error=str(e)
).dict()
@ut_router.post(EndPoints.api_get_raw_material_data)
async def get_raw_table_data(
request_data: GetTableData
):
"""
API to get raw material table data
"""
try:
# ut_handler = UTHandler(project_id=meta.project_id)
response = await UTHandler().get_raw_material_table_data(request_data)
return DefaultResponse(message="Success", data=response)
except Exception as e:
logger.exception(f"Exception while getting sample data: {e}")
return DefaultFailureResponse(
message="Failed to get sample", error=str(e)
).dict()
@ut_router.post(EndPoints.api_get_non_complains_data)
async def get_compliance_table_data(
request_data: GetTableData
):
"""
API to get non-compliance table data
"""
try:
# ut_handler = UTHandler(project_id=meta.project_id)
response = await UTHandler().get_non_compliance_table_data(request_data)
return DefaultResponse(message="Success", data=response)
except Exception as e:
logger.exception(f"Exception while getting sample data: {e}")
return DefaultFailureResponse(
message="Failed to get sample", error=str(e)
).dict()
from typing import Optional
from fastapi import Request, Response
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security.api_key import APIKeyBase, APIKeyCookie
from pydantic import BaseModel
class MetaInfoSchema(BaseModel):
project_id: Optional[str] = ""
user_id: Optional[str] = ""
language: Optional[str] = ""
class LookupTemplate:
TEMP_FILE_PATH = "temp/lookup_templates"
supported_mime_type = [
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.ms-excel",
]
class MetaInfoCookie(APIKeyBase):
"""
Project ID backend using a cookie.
"""
scheme: APIKeyCookie
cookie_name: str
def __init__(self, cookie_name: str = "projectId"):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.cookie_name = cookie_name
self.scheme_name = self.__class__.__name__
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
async def __call__(self, request: Request, response: Response):
cookies = request.cookies
cookie_json = {
"projectId": cookies.get(
"projectId", cookies.get("project_id", request.headers.get("projectId"))
),
"userId": cookies.get(
"user_id", cookies.get("userId", request.headers.get("userId"))
),
"language": cookies.get("language", request.headers.get("language")),
}
return MetaInfoSchema(
project_id=cookie_json["projectId"],
user_id=cookie_json["userId"],
language=cookie_json["language"],
)
@staticmethod
def set_response_info(cookie_name, cookie_value, response: Response):
response.set_cookie(cookie_name, cookie_value, samesite="strict", httponly=True)
response.headers[cookie_name] = cookie_value
import json
from functools import lru_cache
@lru_cache()
def get_db_name(redis_client, project_id: str, database: str, delimiter="__"):
if not project_id:
return database
val = redis_client.get(project_id)
if val is None:
raise ValueError(f"Unknown Project, Project ID: {project_id} Not Found!!!")
val = json.loads(val)
if not val:
return database
# Get the prefix flag to apply project_id prefix to any db
prefix_condition = bool(val.get("source_meta", {}).get("add_prefix_to_database"))
if prefix_condition:
# Get the prefix name from mongo or default to project_id
prefix_name = val.get("source_meta", {}).get("prefix") or project_id
return f"{prefix_name}{delimiter}{database}"
return database
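# Illustrative behaviour (assumed Redis value): if redis_client.get("p1") returns
# '{"source_meta": {"add_prefix_to_database": true, "prefix": "acme"}}', then
# get_db_name(redis_client, "p1", "batch_360_app") -> "acme__batch_360_app";
# without the prefix flag the plain database name is returned.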
@lru_cache()
def get_redis_db_prefix(redis_client, project_id: str, delimiter="__"):
if not project_id:
return False
val = redis_client.get(project_id)
if val is None:
return False
val = json.loads(val)
if not val:
return False
# Get the prefix flag to apply project_id prefix to any db
prefix_condition = bool(val.get("source_meta", {}).get("add_prefix_to_database"))
if prefix_condition:
# Get the prefix name from mongo or default to project_id
prefix_name = val.get("source_meta", {}).get("prefix") or project_id
return f"{prefix_name}{delimiter}"
return False
"""
Mongo Utility
Author: Irfanuddin Shafi Ahmed
Reference: Pymongo Documentation
"""
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
from scripts.db.redis_connections import project_details_db
from scripts.logging import logger
from scripts.utils.db_name_util import get_db_name
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception as e:
logger.error(f"Exception in connection {(str(e))}")
raise e
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
def __init__(self, mongo_client, database, collection):
self.client = mongo_client
self.database = database
self.collection = collection
# Variables to preserve the initiated database name and the applied
# project id (the database name can change during runtime)
self.__database = None
self.__project_id = None
def __repr__(self):
return f"{self.__class__.__name__}" \
f"(database={self.database}, collection={self.collection})"
@property
def project_id(self):
return self.__project_id
@project_id.setter
def project_id(self, project_id):
self.__project_id = project_id
if self.__database is None:
# storing original db name if None
self.__database = self.database
self.database = get_db_name(
redis_client=project_details_db,
project_id=project_id,
database=self.__database,
)
def insert_one(self, data: Dict):
"""
The function is used to insert a
document into a collection in a Mongo Database.
:param data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception as e:
logger.error(f"Error in inserting the data {str(e)}")
raise e
def insert_many(self, data: List):
"""
The function is used to insert documents into a collection in a Mongo Database.
:param data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception as e:
logger.error(f"Failed to insert many details {str(e)}")
raise e
def find(
self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
skip: Optional[int] = 0,
collation: Optional[bool] = False,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents
from a given collection in a Mongo Database
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:param collation:
:return: List of Documents
"""
if sort is None:
sort = list()
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
if collation:
cursor = cursor.collation({"locale": "en"})
return cursor
except Exception as e:
logger.error(f"Error in fetching {str(e)}")
raise e
def find_one(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
response = collection.find_one(query, filter_dict)
return response
except Exception as e:
logger.error(f"Failed to fetch {str(e)}")
raise e
def find_decrypted(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
mongo_response = collection.find_one(query, filter_dict)
if mongo_response:
mongo_response = [mongo_response]
mongo_response = self.fetch_records_from_object(body=mongo_response)
response = mongo_response[0]
return response
else:
return mongo_response
except Exception as e:
logger.error(f"Failed to find decrypted {str(e)}")
raise e
def update_one(
self,
query: Dict,
data: Dict,
upsert: bool = False,
):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception as e:
logger.error(f"Failed to update one doc {str(e)}")
raise e
def update_many(self, query: Dict, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_many(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception as e:
logger.error(f"Failed to update many {str(e)}")
raise e
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_many(query)
return response.deleted_count
except Exception as e:
logger.error(f"Failed to delete {str(e)}")
raise e
def delete_one(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_one(query)
return response.deleted_count
except Exception as e:
logger.error(f"Failed to delete {str(e)}")
raise e
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.distinct(query_key, filter_json)
return response
except Exception as e:
logger.error(f"Failed to distinct {str(e)}")
raise e
def aggregate(self, pipelines: List, allow_disk_use: Optional[bool] = False):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines, allowDiskUse=allow_disk_use)
return response
except Exception as e:
logger.error(f"Failed to aggregate {str(e)}")
raise e
def find_count(self, json_data, database_name, collection_name):
"""
:param json_data:
:param database_name: The database to which the
collection/documents belong.
:param collection_name: The collection to which the documents belong.
:return:
"""
try:
db = self.client[database_name]
mongo_response = db[collection_name].find(json_data).count()
return mongo_response
except Exception as e:
logger.error(f"Failed to find count {str(e)}")
raise e
def count_documents(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.count_documents(query)
return response
except Exception as e:
logger.error(f"Failed to count documents {str(e)}")
raise e
def bulk_write(self, operation):
try:
database_name = self.database
collection_name = self.collection
database_connection = self.client[database_name]
database_connection[collection_name].bulk_write(operation)
return "success"
except Exception as e:
logger.error(f"Failed to bulk write {str(e)}")
raise e
class MongoAggregateBaseClass:
def __init__(
self,
mongo_client,
database,
):
self.client = mongo_client
self.database = database
def aggregate(
self,
collection,
pipelines: List,
):
try:
database_name = self.database
collection_name = collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception as e:
logger.error(f"Failed to get the aggregate data {str(e)}")
raise e
import json
import paho.mqtt.client as mqtt
from scripts.config import MQTTConf
from scripts.logging import logger
def on_connect(client, userdata, flags, rc):
logger.debug("Publisher Connected with result code " + str(rc))
def push_notification(notification, user_id):
try:
client = mqtt.Client()
client.on_connect = on_connect
client.connect(MQTTConf.host, MQTTConf.port, 30)
topic = f"{MQTTConf.publish_base_topic}/{user_id}/reports"
client.publish(topic, json.dumps(notification), retain=False, qos=1)
logger.info(f"Notification message published to {topic}")
logger.debug(f"Notification: {notification}")
client.disconnect()
return True
except Exception as e:
logger.exception(f"Exception at MQTT Publish: {e}")
return False
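# Illustrative usage (assumed ids): push_notification({"msg": "report ready"}, "user_001")
# publishes the JSON payload to ilens/notifications/user_001/reports.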
import uuid
from datetime import datetime, timedelta
from scripts.constants.secrets import Secrets
from scripts.db.redis_connections import login_db
from scripts.errors import CustomError
from scripts.utils.security_utils.jwt_util import JWT
jwt = JWT()
def create_token(
user_id,
ip,
token,
age=Secrets.LOCK_OUT_TIME_MINS,
login_token=None,
project_id=None,
):
"""
This method is to create a cookie
"""
try:
uid = login_token
if not uid:
uid = str(uuid.uuid4()).replace("-", "")
payload = {"ip": ip, "user_id": user_id, "token": token, "uid": uid, "age": age}
if project_id:
payload["project_id"] = project_id
exp = datetime.utcnow() + timedelta(minutes=age)
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = {**payload, **_extras}
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(uid, new_token)
login_db.expire(uid, timedelta(minutes=age))
return uid
except Exception as e:
raise CustomError(f"{str(e)}")
from secrets import compare_digest
from fastapi import HTTPException, Request, Response
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security import APIKeyCookie
from fastapi.security.api_key import APIKeyBase
from scripts.config import Security
from scripts.constants.secrets import Secrets
from scripts.db.redis_connections import login_db
from scripts.utils.security_utils.apply_encryption_util import create_token
from scripts.utils.security_utils.jwt_util import JWT
class CookieAuthentication(APIKeyBase):
"""
Authentication backend using a cookie.
Internally, uses a JWT token to store the data.
"""
scheme: APIKeyCookie
cookie_name: str
cookie_secure: bool
def __init__(
self,
cookie_name: str = "login-token",
):
super().__init__()
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=cookie_name)
self.scheme_name = self.__class__.__name__
self.cookie_name = cookie_name
self.scheme = APIKeyCookie(name=self.cookie_name, auto_error=False)
self.login_redis = login_db
self.jwt = JWT()
async def __call__(self, request: Request, response: Response) -> str:
cookies = request.cookies
login_token = cookies.get("login-token")
if not login_token:
login_token = request.headers.get("login-token")
if not login_token:
raise HTTPException(status_code=401)
jwt_token = self.login_redis.get(login_token)
if not jwt_token:
raise HTTPException(status_code=401)
try:
decoded_token = self.jwt.validate(token=jwt_token)
if not decoded_token:
raise HTTPException(status_code=401)
except Exception as e:
raise HTTPException(status_code=401, detail=e.args)
user_id = decoded_token.get("user_id")
cookie_user_id = request.cookies.get(
"user_id",
request.cookies.get(
"userId", request.headers.get("userId", request.headers.get("user_id"))
),
)
project_id = decoded_token.get("project_id")
cookie_project_id = request.cookies.get(
"projectId", request.headers.get("projectId")
)
_token = decoded_token.get("token")
_age = int(decoded_token.get("age", Secrets.LOCK_OUT_TIME_MINS))
if any(
[
not compare_digest(Secrets.token, _token),
login_token != decoded_token.get("uid"),
cookie_user_id and not compare_digest(user_id, cookie_user_id),
project_id
and cookie_project_id
and not compare_digest(project_id, cookie_project_id),
]
):
raise HTTPException(status_code=401)
try:
new_token = create_token(
user_id=user_id,
ip=request.client.host,
token=Secrets.token,
age=_age,
login_token=login_token,
project_id=project_id,
)
except Exception as e:
raise HTTPException(status_code=401, detail=e.args)
response.set_cookie(
"login-token",
new_token,
samesite="strict",
httponly=True,
max_age=Secrets.LOCK_OUT_TIME_MINS * 60,
secure=Security.secure_cookie,
)
response.headers["login-token"] = new_token
return user_id
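# Illustrative FastAPI usage (assumed route; the dependency returns the user_id
# and refreshes the login-token cookie):
#   auth = CookieAuthentication()
#   @app.get("/secure")
#   async def secure(user_id: str = Depends(auth)):
#       return {"user_id": user_id}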
import jwt
from jwt.exceptions import (ExpiredSignatureError, InvalidSignatureError,
MissingRequiredClaimError)
from scripts.config import KeyPath
from scripts.constants.secrets import Secrets
from scripts.errors import AuthenticationError, ErrorMessages
from scripts.logging import logger
class JWT:
def __init__(self):
self.max_login_age = Secrets.LOCK_OUT_TIME_MINS
self.issuer = Secrets.issuer
self.alg = Secrets.alg
self.public = KeyPath.public
self.private = KeyPath.private
def encode(self, payload):
try:
with open(self.private, "r") as f:
key = f.read()
return jwt.encode(payload, key, algorithm=self.alg)
except Exception as e:
logger.exception(f"Exception while encoding JWT: {str(e)}")
raise
def validate(self, token):
try:
with open(self.public, "r") as f:
key = f.read()
payload = jwt.decode(
token,
key,
algorithms=self.alg,
leeway=Secrets.leeway_in_mins,
options={"require": ["exp", "iss"]},
)
return payload
except InvalidSignatureError:
raise AuthenticationError(ErrorMessages.ERROR003)
except ExpiredSignatureError:
raise AuthenticationError(ErrorMessages.ERROR002)
except MissingRequiredClaimError:
raise AuthenticationError(ErrorMessages.ERROR002)
except Exception as e:
logger.exception(f"Exception while validating JWT: {str(e)}")
raise
def decode(self, token):
try:
with open(self.public, "r") as f:
key = f.read()
return jwt.decode(token, key, algorithms=self.alg)
except Exception as e:
logger.exception(f"Exception while encoding JWT: {str(e)}")
raise
import logging
import os
from datetime import datetime, timedelta, timezone
from functools import lru_cache, wraps
import orjson as json
from fastapi import HTTPException, Request, status
from scripts.db.mongo.ilens_configuration.collections.user import User
from scripts.db.mongo.ilens_configuration.collections.user_project import \
UserProject
from scripts.db.redis_connections import user_role_permissions_redis
def timed_lru_cache(seconds: int = 10, maxsize: int = 128):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = timedelta(seconds=seconds)
func.expiration = datetime.now(timezone.utc) + func.lifetime
@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.now(timezone.utc) >= func.expiration:
logging.debug("Cache Expired")
func.cache_clear()
func.expiration = datetime.now(timezone.utc) + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
@timed_lru_cache(seconds=60, maxsize=1000)
def get_user_role_id(user_id, project_id):
logging.debug("Fetching user role from DB")
user_conn = User() # user collection from ilens_configuration DB
if user_role := user_conn.find_user_role_for_user_id(
user_id=user_id, project_id=project_id
):
return user_role["userrole"][0]
# if user not found in primary collection, check if user is in project collection
user_proj_conn = (
UserProject()
) # user_project collection from ilens_configuration DB
if user_role := user_proj_conn.find_user_role_for_user_id(
user_id=user_id, project_id=project_id
):
return user_role["userrole"][0]
class RBAC:
def __init__(self, entity_name: str, operation: list[str]):
self.entity_name = entity_name
self.operation = operation
def check_permissions(self, user_id: str, project_id: str) -> dict[str, bool]:
user_role_id = get_user_role_id(user_id, project_id)
if not user_role_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="User role not found!"
)
r_key = f"{project_id}__{user_role_id}" # eg: project_100__user_role_100
user_role_rec = user_role_permissions_redis.hget(r_key, self.entity_name)
if not user_role_rec:
return {}
user_role_rec = json.loads(user_role_rec)
if permission_dict := {i: True for i in self.operation if user_role_rec.get(i)}:
return permission_dict
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient Permission!"
)
def __call__(self, request: Request) -> dict[str, bool]:
if os.getenv("ENABLE_RBAC", "False") in (False, "False", "false"):
return {i: True for i in self.operation}
user_id = request.cookies.get("userId", request.headers.get("userId"))
project_id = request.cookies.get("projectId", request.headers.get("projectId"))
return self.check_permissions(user_id=user_id, project_id=project_id)
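# Illustrative FastAPI usage (assumed entity/operation names):
#   @router.get("/batches", dependencies=[Depends(RBAC("batch_360", ["view"]))])
#   async def list_batches(): ...
# When ENABLE_RBAC is not enabled, the dependency short-circuits and grants the
# requested operations without a Redis lookup.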
import json
import logging
import jwt
from fastapi import Request
from jwt.exceptions import (DecodeError, ExpiredSignatureError,
InvalidSignatureError, MissingRequiredClaimError)
from starlette.middleware.base import BaseHTTPMiddleware
from scripts.config import Security
from scripts.constants.secrets import Secrets
ENFORCE_DOMAIN_WILDCARD = "Domain wildcard patterns must be like '*.example.com'."
protect_hosts = Security.protected_hosts
if not protect_hosts:
protect_hosts = ["*"]
for _pattern in protect_hosts:
assert "*" not in _pattern[1:], ENFORCE_DOMAIN_WILDCARD
if _pattern.startswith("*") and _pattern != "*":
assert _pattern.startswith("*."), ENFORCE_DOMAIN_WILDCARD
class SignatureVerificationMiddleware(BaseHTTPMiddleware):
async def set_body(self, request: Request):
async def verify_signature():
receive_ = await request.receive()
signature = bytearray()
signature.extend(receive_.get("body"))
while receive_["more_body"]:
receive_ = await request.receive()
signature.extend(receive_["body"])
signature = bytes(signature)
try:
signature = jwt.decode(
signature.decode(), Secrets.signature_key, algorithms=["HS256"]
)
except (
InvalidSignatureError,
ExpiredSignatureError,
MissingRequiredClaimError,
DecodeError,
) as inv_exp:
logging.error(inv_exp)
signature = {}
signature = json.dumps(signature).encode()
return {"type": receive_["type"], "body": signature, "more_body": False}
if request.headers.get("Content-Type") == "application/json":
host = request.headers.get("host", "").split(":")[0]
is_protected_host = False
for pattern in protect_hosts:
if host == pattern or (
pattern.startswith("*") and host.endswith(pattern[1:])
):
is_protected_host = True
break
if is_protected_host:
return Request(request.scope, verify_signature, request._send)
return request
async def dispatch(self, request, call_next):
request = await self.set_body(request)
response = await call_next(request)
return response
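# Illustrative client-side flow for this middleware (assumed payload values):
# when signature verification is enabled, a protected host sends its JSON body
# encoded as a JWT, e.g.
#   body = jwt.encode({"Batch_No": "B100"}, Secrets.signature_key, algorithm="HS256")
# and the middleware decodes it back to plain JSON before the route handler runs.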
deployment:
environmentVar:
- name: MODULE_NAME
value: "ut_module"
- name: MONGO_URI
valueFrom:
secretKeyRef:
name: mongo-creds
key: MONGO_URI
- name: MQTT_HOST
value: "mqtt-service.ilens-infra"
- name: MQTT_URI
value: "mqtt://mqtt-service.ilens-infra:1883"
- name: MQTT_PORT
value: "1883"
- name: REDIS_URI
value: "redis://redis-db-service.ilens-infra:6379"
- name: BASE_PATH
value: "/code/data"
- name: MOUNT_DIR
value: "ut_module"
- name: LOG_LEVEL
value: "INFO"
- name: SECURE_ACCESS
value: "True"
- name: CORS_URLS
value: "https://qa.ilens.io,https://staging.ilens.io"
- name: SW_DOCS_URL
value: "/docs"
- name: SW_OPENAPI_URL
value: "/openapi.json"
- name: ENABLE_CORS
value: "True"
- name: SECURE_COOKIE
value: "True"
- name: VERIFY_SIGNATURE
value: "False"
- name: REDIS_LOGIN_DB
value: "9"
- name: REDIS_PROJECT_DB
value: "18"
- name: REDIS_USER_ROLE_DB
value: "21"