Commit 0e0ffdd2 authored by harshavardhan.c's avatar harshavardhan.c

Add: New Structure format.

parents
from scripts.utils.graph_utility import Neo4jConnection
conn = Neo4jConnection(uri="neo4j+s://932e0faee63c9427e82b046bc4ab6520.neo4jsandbox.com:7687",
user="madhuri",
pwd="Madhuri@71")
import os
import sys
from configparser import BasicInterpolation, ConfigParser
class EnvInterpolation(BasicInterpolation):
"""
Interpolation which expands environment variables in values.
"""
def before_get(self, parser, section, option, value, defaults):
value = super().before_get(parser, section, option, value, defaults)
if not os.path.expandvars(value).startswith('$'):
return os.path.expandvars(value)
else:
return
try:
config = ConfigParser(interpolation=EnvInterpolation())
config.read(f"conf/application.conf")
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.stdout.flush()
sys.exit()
class Service:
port = config["SERVICE"]["port"]
host = config["SERVICE"]["host"]
class DBConf:
DB_URI = config.get('SQL_DB', 'uri')
if not DB_URI:
print("Error, environment variable DB_URI not set")
sys.exit(1)
GRAPH_URI = config.get('GRAPH_DB', 'GRAPH_URI')
GRAPH_USERNAME = config.get('GRAPH_DB', 'GRAPH_USERNAME')
GRAPH_PASSWORD = config.get('GRAPH_CONN', 'GRAPH_PASSWORD')
class Logging:
level = config.get("LOGGING", "level", fallback="DEBUG")
level = level or "INFO"
tb_flag = config.getboolean("LOGGING", "traceback", fallback=True)
tb_flag = tb_flag if tb_flag is not None else True
from neo4j import GraphDatabase
from scripts.config import DBConf
uri = DBConf.GRAPH_URI
username = DBConf.GRAPH_USERNAME
password = DBConf.GRAPH_PASSWORD
neo4j_driver = GraphDatabase.driver(uri, auth=(username, password))
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
from scripts.config import Logging
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
logging_config["level"] = Logging.level
enable_traceback: bool = Logging.tb_flag
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging_config["level"].upper())
log_formatter = '%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():''' \
'%(lineno)s] - %(message)s'
time_format = "%Y-%m-%db %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"]:
if not os.path.exists("logs"):
os.makedirs("logs")
log_file = os.path.join("logs", f"{logging_config['name']}.log")
temp_handler = RotatingFileHandler(log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"])
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
logger:
name: helm-automation-script
level: DEBUG
handlers:
- type: RotatingFileHandler
file_path: logs/
max_bytes: 100000000
back_up_count: 5
- type: StreamHandler
name: helm-automation-script
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
from scripts.config import Logging
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, "r") as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
logging_config["level"] = Logging.level
enable_traceback: bool = Logging.tb_flag
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger("")
__logger__.setLevel(logging_config["level"].upper())
log_formatter = (
"%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():"
""
"%(lineno)s] - %(message)s"
)
time_format = "%Y-%m-%db %H:%M:%S"
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"]:
log_file = os.path.join(
each_handler["file_path"] + logging_config["name"] + ".log"
)
if not os.path.exists(each_handler["file_path"]):
os.makedirs(each_handler["file_path"])
temp_handler = RotatingFileHandler(
log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"],
)
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
from neo4j.exceptions import ServiceUnavailable
from scripts.db import neo4j_driver
from scripts.logging import logger
class Neo4jConnection:
def __init__(self):
try:
self.__driver = neo4j_driver
except Exception as e:
logger.exception(f"Failed to create the driver:{e}")
def close(self):
if self.__driver is not None:
self.__driver.close()
def query(self, query, parameters=None, db=None):
assert self.__driver is not None, "Driver not initialized!"
session = None
response = None
try:
session = self.__driver.session(database=db) if db is not None else self.__driver.session()
response = list(session.run(query, parameters))
except Exception as e:
logger.exception(f"Query failed:{e}")
finally:
if session is not None:
session.close()
return response
def create_event_relationship(self, query, db=None, **kwargs):
assert self.__driver is not None, "Driver not initialized!"
session = None
try:
session = self.__driver.session(database=db) if db is not None else self.__driver.session()
result = session.write_transaction(
self._execute_relationship, query, **kwargs)
# for record in result:
# print("Created friendship between: {p1}, {p2}".format(
# p1=record['p1'], p2=record['p2']))
except Exception as e:
logger.exception(f"Query failed:{e}")
finally:
if session is not None:
session.close()
@staticmethod
def _execute_relationship(tx, query, **kwargs):
# To learn more about the Cypher syntax,
# see https://neo4j.com/docs/cypher-manual/current/
# The Reference Card is also a good resource for keywords,
# see https://neo4j.com/docs/cypher-refcard/current/
result = tx.run(query, **kwargs)
try:
return result
# Capture any errors along with the query and data for traceability
except ServiceUnavailable as exception:
logger.exception("{query} raised an error: \n {exception}".format(
query=query, exception=exception))
raise
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment