Commit d345e786 authored by harshavardhan.c's avatar harshavardhan.c

Project structure for new maintenance logbook

parents
import os
def path():
return os.path.dirname(__file__)
version = "v5.9.1"
import platform
import argparse
from scripts.constants import Conf
if platform.system().lower() != 'windows':
from app.gunicorn_config import run_app
else:
from app.waitress_config import run_app
if __name__ == '__main__':
ap = argparse.ArgumentParser()
ap.add_argument("--host", "-H", required=False, default=Conf.HOST,
help="Port to start the application.")
ap.add_argument("--port", "-P", required=False, default=Conf.PORT,
help="Port to start the application.")
ap.add_argument("--threads", "-T", required=False, default=Conf.THREADS,
help="Number of threads the application.")
ap.add_argument("--workers", "-W", required=False, default=Conf.WORKERS,
help="Number of threads the application.")
arguments = vars(ap.parse_args())
host = arguments["host"]
user_selected_port = int(arguments["port"])
threads = int(arguments["threads"])
workers = int(arguments['workers'])
run_app(host=host, port=user_selected_port, workers=workers, threads=threads)
from abc import ABC
import gunicorn.app.base
from main import app
class StandaloneApplication(gunicorn.app.base.BaseApplication, ABC):
def __init__(self, application, options=None):
self.options = options or {}
self.application = application
super().__init__()
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
def run_app(host, port, workers, threads):
options = {
'bind': '%s:%s' % (host, port),
'workers': workers,
'threads': threads
}
StandaloneApplication(app, options).run()
#!/opt/miniconda3/bin/python3.7
"""
This file is the main flask app. Do not run this directly. Run app.py in the root directory
"""
import gc
from scripts.constants import Conf
from flask import Flask
# from flask_compress import Compress
from flask_cors import CORS
gc.collect()
app = Flask(__name__)
# Compress(app)
# Blueprints
# @app.after_request
# def apply_caching(response):
# response.headers["API-Version"] = Service.version_no
# return response
if Conf.ALLOW_CROSS_ORIGIN:
CORS(app, origins="*", allow_headers=["Content-Type", "Authorization", "Access-Control-Allow-Credentials"],
supports_credentials=True, intercept_exceptions=False)
from waitress import serve
from app.main import app
def run_app(host, port, workers, threads):
print(f"Can't use {workers} workers in Windows, defaulting to 1.\nWindows doesn't support forking!")
serve(app, host=host, port=port, threads=threads, ident="ilens")
from main import app
if __name__ == "__main__":
app.run()
[SERVICE]
host=0.0.0.0
port=9337
cors=True
apply_processor_count = False
workers = 3
threads = 6
enable_traceback = False
[MONGO_DB]
uri=mongodb://192.168.0.220:2717/
metadata_db=ilens_configuration
authSource=
authMechanism=
mongo_constants_file_path= conf/mongo_encryption_constants.json
[DB]
type = postgres
host = localhost
port = 2717
username =
password =
name = ilens_logbook
\ No newline at end of file
from __version__ import version
def get_version():
return version
from scripts.constants.configurations import Service, SqlDB, MongoDB, ApplicationKeys
from scripts.constants.mongoconstants import CommonConstants, MongoConstants
class Conf(SqlDB, Service, MongoDB, ApplicationKeys):
pass
class Constants(CommonConstants, MongoConstants):
pass
import os
import sys
from configparser import SafeConfigParser, ConfigParser
from pydantic import BaseSettings
from __root__ import path
config = ConfigParser()
db_folder_path = os.path.join(path(), 'db')
try:
_config = SafeConfigParser()
APP_ENV = os.environ.get('APP_ENV')
if not APP_ENV:
APP_ENV = 'dev'
_conf_path = os.path.join("conf", APP_ENV, "application.conf")
print(_conf_path)
_config.read(_conf_path)
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.exit()
class _Configuration(BaseSettings):
SERVICE_HOST: str = _config["SERVICE"]["host"]
SERVICE_PORT: int = int(_config["SERVICE"]["port"])
# ENABLE_CORS: bool = _config.getboolean("SERVICE", "cors",fallback=False)
SERVICE_WORKERS: int = _config["SERVICE"]["workers"]
SERVICE_THREADS: int = _config["SERVICE"]["threads"]
LOG_ENABLE_TRACEBACK: bool = _config.getboolean('SERVICE', 'enable_traceback', fallback=False)
MONGO_URI: str = _config.get("MONGO_DB", "uri")
MONGO_AUTH_SOURCE: str = _config.get("MONGO_DB", "authSource")
MONGO_AUTH_MECHANISM: str = _config.get("MONGO_DB", "authMechanism")
METADATA_DB: str = _config.get("MONGO_DB", "metadata_db", fallback="ilens_configuration")
MONGO_ENCRYPTION_FILE_PATH: str = _config.get("MONGO_DB", "mongo_constants_file_path",
fallback="conf/mongo_encryption_constants.json")
DB_TYPE: str = _config.get('DB', 'type', fallback='postgres')
DB_HOST: str = _config.get('DB', 'host', fallback='localhost')
DB_PORT: int = _config.get('DB', 'port', fallback=2717)
DB_USERNAME: str = _config.get('DB', 'username', fallback=None)
DB_PASSWORD: str = _config.get('DB', 'password', fallback=None)
DB_NAME: str = _config.get('DB', 'name', fallback='ilens_logbook')
SQLITE_DEFAULT_DB_PATH: str = os.path.join(db_folder_path, 'ilens_logbook.db')
class Config:
env_file = f'{APP_ENV}.env'
env_file_encoding = 'utf-8'
_conf = _Configuration()
class Service(object):
HOST = _conf.SERVICE_HOST
PORT = _conf.SERVICE_PORT
ALLOW_CROSS_ORIGIN = _config.getboolean("SERVICE", "cors", fallback=False)
WORKERS = _conf.SERVICE_WORKERS
THREADS = _conf.SERVICE_THREADS
LOG_ENABLE_TRACEBACK = _conf.LOG_ENABLE_TRACEBACK
class MongoDB(object):
URI = _conf.MONGO_URI
AUTH_SOURCE = _conf.MONGO_AUTH_SOURCE
AUTH_MECHANISM = _conf.MONGO_AUTH_MECHANISM
METADATA_DB = _conf.METADATA_DB
MONGO_ENCRYPTION_FILE_PATH = _conf.MONGO_ENCRYPTION_FILE_PATH
class ApplicationKeys(object):
username_decryption_key = "H)8A$R%^S~H12@A/"
password_decryption_key = 'QVY1bWdMQ0Zxc'
class SqlDB(object):
DB_TYPE = _conf.DB_TYPE
DB_HOST = _conf.DB_HOST
DB_PORT = _conf.DB_PORT
DB_USERNAME = _conf.DB_USERNAME
DB_PASSWORD = _conf.DB_PASSWORD
DB_NAME = _conf.DB_NAME
SQLITE_DEFAULT_DB_PATH = _conf.SQLITE_DEFAULT_DB_PATH
from scripts import version
class CommonConstants(object):
version = version
ui = 'ui_datetime_format'
utc = 'utc_datetime_format'
nsc = 'no_special_chars_datetime_format'
__utc_datetime_format__ = '%Y-%m-%dT%H:%M:%SZ'
__ui_datetime_format__ = '%Y-%m-%d %H:%M:%S'
__no_special_chars_datetime_format__ = '%Y%m%d%H%M%S'
class MongoConstants:
# mongo encryption keys
key_encrypt_keys = "encrypt_keys"
key_exclude_encryption = "exclude_encryption"
product_encrypted = "product_encrypted"
max_docs_per_batch = 5
# cipher_key = "a985195aaa464e61"
# Product based configurable constants
cipher_key = {
'k': '-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEArVED5cr+tMtFtVmXl2O0cvQbEgoYSIFd8yvkmm6z7'
'XAdX6Eg\nYkKez0ydTl26KOdJ18A7Kn8etWGe8nTkSGheJl9rn/J+lE1zpo4Zg/T3wDnM8FM3\nyuM26vpIb+0oJmNc9'
'DkFXo4WtxRFZDytdETg/YyI+eJYDRDrZSrlqAzIDpAdLpv9\nUhsMhYQ+2n3PcauLeJb0dKPVTc6kSvGCs3LZ0WyTbRnQ'
'yJMCWnaxzpSIUcH7qaqO\nKC/fBCKsZmRjRNSmQ3gepz4VnQKyJCm7CJk+cQiQMQzrspRPvhmGouHZUM36KjsG\n6ylx2'
'Bu6OYy/HbrdRkJKNlv3u6BBL6Pn/ZJZGQIDAQABAoIBABI8eMhESnYbm1RI\nW8S8YzeIO1Pz13hDku7cArcEKG72kcSm'
'58knAN5HjbK59nVI1tJ6gc84JnNH1Qlm\nZsG+p49qkWC4S3zPxHg1MfaaPzpM6qUr4G4656OkV5xdTBDz+gshd9Dp6vZ'
'zDdUc\n9FRMTg8nqx79461mRxpzP8xloaQ0NcKBzFK9e3g/4i72LwgNP3E6xmESiu7goqJ1\nGOAI2mJie3TTY1z8sf4u'
'iSFLMaFrExkq4z4KkwS7qF2nOJxhv8H/g9TGPNWrnzAw\nyBHwINBoUaJwiOT51xxIDLgNQiNoIFuaMKVu2l+rWtoQWKG'
'iOnw1ZhYxeJCXByXC\nQqpAfgECgYEAwpzSfyot3PAlxm9iVK5Zc6lRdBq7Jazt7t91U6zeY7C4xzNG1Tuf\ncSYK3qRwl'
'Mw2uXl9auxyV41rziX9sZhtFUnm4jcGv9MHeaAaSSPSsvrtZDFBS7ky\nl2Ixk1078LTZCLMYmAKCAr2XLmShBPSVcuaL'
'kDRX4rvw7scWmMb86wECgYEA4/yC\nEAjXlL0WlOYDJ3J//Pg4iBtIedHXmn30goNuCBBaoYygXapeytEmU2q5hybQTMTX'
'\nVl/vIAFiu0TX81VQ7LDLJaber/7GEsIT3x+xm0jFvOxFYVhT5b0s0z1CQolnRFsA\ndIwQ5u5GkP65hyJUa3ZMh+L6Vi'
'sSCTKpAco9ZhkCgYAKFZ5CwKjHvhn3AmaSBMbV\n23xBC/GOrjtWGXY288pCWDH7AIk3G3PTpSkDCHpc+4gKbGU3WTFDoC'
'xp7kYLId7l\nL4MrTban0gOdJdK234hXhfEvM+4yQlKAzbHL9RTaEET+0mj/14FtKu3elZBSdWoZ\nHiE1Q8EaGqsNdHuT'
'RxxsAQKBgQCqw7enyveusPL5FE/IfDrhgArX55ehp0uWrEE4\ngLkp0RYRawOzJKlbwMy0LnxIfwoGdmnUiIbTsjYBjs8'
'xs/WpU8LVsOeba3lxE21O\n8q5VYWy61T4ia9ZrjgbFMl0u+TwgNwlgQolmb5Lrh9/vGAejdjhcj+ZyJFCeExQE\nAzd6'
'AQKBgBhe+FwMhTus961jqEKXBXm0/OaOgzOdgl/asuC8oLU7cAVD7sS2LFcU\nu7ofIVIG6cRWRruajIuCdlIcLOedTE4'
'YL5jAuRL1TyVvxMm0FsrkWPABFrHWhsZs\nTSzpiOFJkLJTVnT7hlW/+m0qrKiW0zrFza0JaFwP/lj+hRrYGkOl\n'
'-----END RSA PRIVATE KEY-----'}
encrypt_collection_dict = {}
from scripts.exceptions.exception_codes import MongoExceptionCodes
class Exceptions(MongoExceptionCodes):
pass
class MongoExceptionCodes:
MONGO001 = "Error Code MONGO001: Server was unable to establish connection with MongoDB"
MONGO002 = "Error Code MONGO002: Server faced a problem when inserting document(s) into MongoDB"
MONGO003 = "Error Code MONGO003: Server faced a problem to find the document(s) with the given condition"
MONGO004 = "Error Code MONGO004: Server faced a problem to delete the document(s) with the given condition"
MONGO005 = "Error Code MONGO005: Server faced a problem to update the document(s) with the given condition and data"
MONGO006 = "Error Code MONGO006: Server faced a problem when aggregating the data"
MONGO007 = "Error Code MONGO007: Server faced a problem when closing MongoDB connection"
MONGO008 = "Error Code MONGO008: Found an existing record with the same ID in MongoDB"
MONGO009 = "Error Code MONGO009: Server faced a problem when fetching distinct documents from MongoDB"
MONGO010 = "Error Code MONGO010: Server faced a problem when performing a search and replace in MongoDB"
MONGO011 = "Error Code MONGO011: Server faced a problem when de-serializing MongoDB object"
class ILensException(Exception):
pass
class MongoException(ILensException):
pass
class MongoConnectionException(MongoException):
pass
class MongoQueryException(MongoException):
pass
class MongoEncryptionException(MongoException):
pass
class MongoRecordInsertionException(MongoQueryException):
pass
class MongoFindException(MongoQueryException):
pass
class MongoDeleteException(MongoQueryException):
pass
class MongoUpdateException(MongoQueryException):
pass
class MongoUnknownDatatypeException(MongoEncryptionException):
pass
class MongoDistictQueryException(MongoException):
pass
class MongoFindAndReplaceException(MongoException):
pass
class MongoObjectDeserializationException(MongoException):
pass
class MongoException(Exception):
pass
logger:
name: logbook
level: DEBUG
handlers:
- type: RotatingFileHandler
file_path: logs/
max_bytes: 100000000
back_up_count: 5
- type: SocketHandler
host: localhost
port: 23582
- type: StreamHandler
name: logbook
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging_config["level"].upper())
log_formatter = '%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():''' \
'%(lineno)s] - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"]:
log_file = os.path.join(each_handler["file_path"] + logging_config["name"] + '.log')
if not os.path.exists(each_handler["file_path"]):
os.makedirs(each_handler["file_path"])
temp_handler = RotatingFileHandler(log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"])
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
from scripts.utils.static_messages import StaticMessages
class Messages(StaticMessages):
pass
import os
import time
from datetime import datetime, timezone
from scripts.constants import Constants, Conf
from scripts.logging.logging import logger as LOG
from scripts.utils import Messages
from scripts.utils.encryption_utility import AESCipher
class CommonUtils(Constants):
def __init__(self):
pass
@staticmethod
def get_utc_datetime_now():
return datetime.utcnow()
def ui_datetime_format(self):
return self.__ui_datetime_format__
def utc_datetime_format(self):
return self.__utc_datetime_format__
def no_special_chars_datetime_format(self):
return self.__no_special_chars_datetime_format__
def get_datetime_str(self, dt=None, dt_format='utc_datetime_format'):
"""
Returns the datetime string.
:param dt: The datetime value to be converted as str. If not passes, Current UTC time will be returned.
:param dt_format: The format to which the date string should be converted. 'utc_datetime_format' is the default.
Supported values are:- 'ui_datetime_format', 'utc_datetime_format', 'no_special_chars_datetime_format'
:return: Datetime string
"""
if dt is None:
dt = self.get_utc_datetime_now()
return dt.strftime(getattr(self, dt_format)())
def get_datetime_dt(self, dt_str, dt_format='utc_datetime_format'):
return datetime.strptime(date_string=dt_str, format=getattr(self, dt_format)())
@staticmethod
def get_epoch_now(_round=True):
if _round:
return int(time.time())
else:
return time.time()
@staticmethod
def system_timezone():
return datetime.now(timezone.utc).astimezone().tzname()
@staticmethod
def local_time_offset(t=None):
if t is None:
t = time.time()
if time.localtime(t).tm_isdst and time.daylight:
return -time.altzone
else:
return -time.timezone
def tz_offset_hm(self):
seconds = self.local_time_offset()
hour = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return "%d:%02d:%02d" % (hour, minutes, seconds)
@staticmethod
def delete_file(file_name):
try:
os.remove(file_name)
return True
except OSError as e:
print(e)
return False
@staticmethod
def write_to_file(file_name, content, overwrite=True):
try:
if overwrite:
write_mode = 'w+'
else:
write_mode = 'a+'
f = open(file_name, write_mode)
f.write(content)
f.close()
return True
except Exception as e:
raise Exception(f'Failed to write contents to file: {e}')
@staticmethod
def read_from_file(file_name):
try:
f = open(file_name, 'r+')
content = f.read()
f.close()
return content
except Exception as e:
raise Exception(f'Failed to read contents from file: {e}')
@staticmethod
def db_host():
return Conf.DB_HOST
@staticmethod
def db_port():
return int(Conf.DB_PORT)
@staticmethod
def db_type():
return Conf.DB_TYPE
@staticmethod
def db_name():
return Conf.DB_NAME
@staticmethod
def db_username():
if 'ENC(' in Conf.DB_USERNAME:
aes_obj = AESCipher(key=Conf.username_decryption_key)
enc_username = Conf.DB_USERNAME.lstrip('ENC(').rstrip(')')
username = aes_obj.decrypt(enc=enc_username)
return username
else:
return Conf.DB_USERNAME
@staticmethod
def db_password():
if 'ENC(' in Conf.DB_PASSWORD:
aes_obj = AESCipher(key=Conf.password_decryption_key)
enc_password = Conf.DB_PASSWORD.lstrip('ENC(').rstrip(')')
password = aes_obj.decrypt(enc=enc_password)
return password
else:
return Conf.DB_PASSWORD
def sqlite_db_endpoint(self):
LOG.debug("DB type SQLite configured")
if self.db_host() is None:
LOG.debug(f"DB path/host not configured. Falling back to default path '{Conf.SQLITE_DEFAULT_DB_PATH}'")
return f'{self.db_type()}://{Conf.SQLITE_DEFAULT_DB_PATH}'
else:
LOG.debug(f"Using DB from user given path '{self.db_host()}'")
return f'{self.db_type()}://{self.db_host()}'
def mysql_db_endpoint(self):
if self.db_username() is None or self.db_password() is None:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_host()}/{self.db_name()}'
else:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_username()}:{self.db_password()}@{self.db_host()}/{self.db_name()}'
def postgresql_db_endpoint(self):
if self.db_username() is None or self.db_password() is None:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_host()}/{self.db_name()}'
else:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_username()}:{self.db_password()}@{self.db_host()}/{self.db_name()}'
def oracle_db_endpoint(self):
if self.db_username() is None or self.db_password() is None:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_host()}/{self.db_name()}'
else:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}://{self.db_username()}:' \
f'{self.db_password()}@{self.db_host()}/{self.db_name()}'
def mssql_db_endpoint(self):
if self.db_username() is None or self.db_password() is None:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}+pymssql://{self.db_host()}/{self.db_name()}'
else:
LOG.debug(Messages.MESSAGE01)
return f'{self.db_type()}+' \
f'pymssql://{self.db_username()}:{self.db_password()}@{self.db_host()}/{self.db_name()}'
def db_endpoint(self):
return getattr(self, f'{self.db_type()}_db_endpoint')()
@staticmethod
def sqlalchemy_echo():
if Conf.LOG_LEVEL.lower() == 'trace':
return True
else:
return False
@staticmethod
def enable_traceback():
return Conf.LOG_ENABLE_TRACEBACK
@staticmethod
def db_uri():
return Conf.URI
@staticmethod
def db_mongo_name():
return Conf.METADATA_DB
@staticmethod
def db_auth_mechanism():
return Conf.AUTH_MECHANISM
@staticmethod
def db_auth_source():
return Conf.AUTH_SOURCE
@staticmethod
def db_encryption_constants_file_path():
return Conf.MONGO_ENCRYPTION_FILE_PATH
@staticmethod
def server_host():
return Conf.HOST
@staticmethod
def server_port():
return Conf.PORT
@staticmethod
def cross_origin():
return Conf.ALLOW_CROSS_ORIGIN
@staticmethod
def server_threads():
return Conf.THREADS
@staticmethod
def server_workers():
return Conf.WORKERS
@staticmethod
def key_encrypt_keys():
return Constants.key_encrypt_keys
@staticmethod
def key_exclude_encryption():
return Constants.key_exclude_encryption
@staticmethod
def product_encrypted():
return Constants.product_encrypted
def max_docs_per_batch(self):
return Constants.max_docs_per_batch
import base64
from Cryptodome import Random
from Cryptodome.Cipher import AES
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from scripts.logging.logging import logger as LOG
class AESCipher(object):
"""
A classical AES Cipher. Can use any size of data and any size of password thanks to padding.
Also ensure the coherence and the type of the data with a unicode to byte converter.
"""
def __init__(self, key):
self.bs = AES.block_size
self.key = AESCipher.str_to_bytes(key)
@staticmethod
def str_to_bytes(data):
u_type = type(b''.decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
def _pad(self, s):
return s + (self.bs - len(s) % self.bs) * AESCipher.str_to_bytes(chr(self.bs - len(s) % self.bs))
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def encrypt(self, raw):
raw = self._pad(AESCipher.str_to_bytes(raw))
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return base64.b64encode(iv + cipher.encrypt(raw)).decode('utf-8')
def decrypt(self, enc):
# self.key = self.key.decode()
enc = base64.b64decode(enc)
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
data = self._unpad(cipher.decrypt(enc[AES.block_size:]))
return data.decode('utf-8')
class AsymmetricEncryption(object):
"""
This utility is used for :-
1.generating public and private key pairs
2.serialization of keys to strings
3.deserialization of strings to keys
4.Encryption and decryption of data
"""
def __init__(self):
pass
@staticmethod
def generate_private_key():
try:
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
return private_key
except Exception as e:
LOG.error("Exception in generating private key" + str(e))
return None
@staticmethod
def generate_public_key(private_key):
return private_key.public_key()
@staticmethod
def encrypt_data(public_key, message):
"""
param: message - -- string
param: public_key - --object
:returns string
"""
try:
encrypted_msg = public_key.encrypt(
message.encode('utf-8'),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return base64.b64encode(encrypted_msg).decode('utf-8')
except Exception as e:
LOG.error("Exception in encryption" + str(e))
raise e
@staticmethod
def decrypt_data(private_key, encrypted_data):
"""
:param encrypted_data - -- string
:param private_key - --object
:returns string
"""
try:
decrypted_data = private_key.decrypt(
base64.b64decode(encrypted_data.encode('utf-8')),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return decrypted_data.decode('utf-8')
except Exception as e:
LOG.error("Exception in decryption" + str(e))
return None
@staticmethod
def gen_signature(message, private_key):
"""
:param message--- string
:param private_key---object
:returns string
"""
try:
signature = private_key.sign(
data=message.encode('utf-8'),
padding=padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
algorithm=hashes.SHA256()
)
return base64.b64encode(signature).decode("utf-8")
except Exception as e:
LOG.error("Exception in signing" + str(e))
return None
@staticmethod
def verify_signature(signature, public_key, message):
"""
:param signature: str
:param public_key: str
:param message: str
"""
try:
public_key.verify(
signature=base64.b64decode(signature.encode("utf-8")),
data=message.encode("utf-8"),
padding=padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
algorithm=hashes.SHA256()
)
signature_valid = True
except InvalidSignature:
signature_valid = False
return signature_valid
@staticmethod
def serialize_public_key(public_key):
public_key_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode("utf-8")
return public_key_pem
@staticmethod
def serialize_private_key(private_key):
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode("utf-8")
return private_key_pem
@staticmethod
def deserialize_private_key(private_key_pem):
private_key = serialization.load_pem_private_key(
private_key_pem.encode("utf-8"),
password=None,
backend=default_backend()
)
return private_key
@staticmethod
def deserialize_public_key(public_key_pem):
public_key = serialization.load_pem_public_key(
public_key_pem.encode("utf-8"),
backend=default_backend()
)
return public_key
This diff is collapsed.
class StaticMessages:
MESSAGE01 = "DB Username/Password not Configured"
from scripts.constants import Conf
from scripts.utils.encryption_utility import AESCipher
# username = "ENC(Tptxz28wsDcTAd0CAya+klMAWrM6VqswgTYjNaX/Z3k=)"
username = "harsha"
if 'ENC(' in username:
aes_obj = AESCipher(key=Conf.username_decryption_key)
enc_username = username.lstrip('ENC(').rstrip(')')
print(enc_username)
username = aes_obj.decrypt(enc=enc_username)
print(username)
else:
print(username)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment