Commit 6449136a authored by harshavardhan.c's avatar harshavardhan.c

Dev: Energy and forecast changes.

#Ignore the logs directory
logs/
#Ignoring the password file
passwords.txt
#Ignoring git and cache folders
.git
.cache
#Ignoring all the markdown and class files
*.md
**/*.class
.env
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env
pip-log.txt
pip-delete-this-directory.txt
.tox
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.log
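# Values of the form $NAME are resolved from environment variables at startup by the
# EnvInterpolation helper in scripts/config/app_configurations.py; unset variables resolve to None.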
[SERVICE]
port=5121
host=0.0.0.0
version_no=1.0.0
workers=1
module_name=$APP_NAME
enable_traceback = True
[MONGO_DB]
uri = $MONGO_URI
[POSTGRES]
maintenance = $MAINTENANCE_URI
assistant = $ASSISTANT_URI
[DATABASES]
metadata_db=$METADATA_DB
ilens_assistant=$ILENS_ASSISTANT_DB
ilens_asset_model=$ILENS_ASSET_MODEL_DB
[PATH_TO_SERVICES]
scheduler_proxy = $SCHEDULER_PROXY
data_engine=$FORM_DE
form_mt = $FORM_MT
metadata_services=$METADATA_SERVICES
audit_proxy=$AUDIT_PROXY
[BACKFILL]
interval=$INTERVAL
[DIRECTORY]
base_path = $BASE_PATH
mount_dir = $MOUNT_DIR
keys_path = data/keys
[REDIS]
host=$REDIS_HOST
port=$REDIS_PORT
login_db = 9
project_tags_db = 18
[KAFKA]
host=$KAFKA_HOST
port=$KAFKA_PORT
topic=$KAFKA_TOPIC
audit_topic=$KAFKA_AUDIT_TOPIC
enable_sites_partition=$ENABLE_KAFKA_PARTITION
split_key=$KAFKA_PARTITION_KEY
round_robin_enable=$ROUND_ROBIN_PARTITION
[AUDITING]
periodic_entry_auditing=$PERIODIC_ENTRY_AUDITING
form_non_periodic_auditing=$FORM_NON_PERIODIC_AUDITING
form_periodic_auditing=$FORM_PERIODIC_AUDITING
[PATH_TO_OTHER_SERVICES]
email_service = $EMAIL_SERVICE_PROXY
[MQTT]
uri = $MQTT_URI
host = $MQTT_URL
port = $MQTT_PORT
publish_base_topic = ilens/notifications
"""
This file exposes configurations from config file and environments as Class Objects
"""
import shutil
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
import os.path
import sys
from configparser import ConfigParser, BasicInterpolation
class EnvInterpolation(BasicInterpolation):
"""
Interpolation which expands environment variables in values.
"""
    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        expanded = os.path.expandvars(value)
        # A value that still starts with '$' points at an unset environment variable;
        # return None so the guards below can detect the missing configuration.
        if not expanded.startswith('$'):
            return expanded
        return None
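# Illustrative behaviour (hypothetical value): with MONGO_URI=mongodb://localhost:27017 exported,
# "uri = $MONGO_URI" in application.conf resolves to that URI; when the variable is unset the
# expanded value still starts with "$", so before_get returns None and the guards below treat
# the option as missing.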
try:
    config = ConfigParser(interpolation=EnvInterpolation())
    config.read("conf/application.conf")
except Exception as e:
    print(f"Error while loading the config: {e}")
    print("Failed to Load Configuration. Exiting!!!")
    sys.stdout.flush()
    sys.exit(1)
class Service:
HOST = config.get("SERVICE", "host")
PORT = config.getint("SERVICE", "port")
class DBConf:
MONGO_URI = config.get('MONGO_DB', 'uri')
if not MONGO_URI:
print("Error, environment variable MONGO_URI not set")
sys.exit(1)
MAINTENANCE_DB_URI = config.get('POSTGRES', "maintenance")
if not MAINTENANCE_DB_URI:
print("MAINTENANCE_DB_URI env variables missing")
sys.exit(1)
ASSISTANT_DB_URI = config.get('POSTGRES', "assistant")
if not ASSISTANT_DB_URI:
print("ASSISTANT_DB_URI env variables missing")
sys.exit(1)
class KafkaConf:
host = config.get('KAFKA', 'host')
port = config.get('KAFKA', 'port')
topic = config.get('KAFKA', 'topic')
    if not all([topic, host, port]):
        print("KAFKA env variables missing, continuing without Kafka/Kairos support")
    audit_topic = config.get('KAFKA', 'audit_topic')
    # Option names below must match the keys defined in the [KAFKA] section of application.conf
    enable_sites_partition = config.getboolean("KAFKA", "enable_sites_partition", fallback=True)
    split_key = config.get("KAFKA", "split_key", fallback="site_id") or "site_id"
    round_robin_enable = config.getboolean("KAFKA", "round_robin_enable", fallback=True)
class StoragePaths:
module_name = config.get('SERVICE', 'module_name')
if not module_name:
module_name = "form_management"
base_path = os.path.join("data", module_name)
class DatabaseConstants:
metadata_db = config.get("DATABASES", "metadata_db")
if not bool(metadata_db):
metadata_db = "ilens_configuration"
ilens_assistant_db = config.get("DATABASES", "ilens_assistant")
if not bool(ilens_assistant_db):
ilens_assistant_db = "ilens_assistant"
ilens_asset_model_db = config.get("DATABASES", "ilens_asset_model")
if not bool(ilens_asset_model_db):
ilens_asset_model_db = "ilens_asset_model"
class PathToServices:
DATA_ENGINE = config.get("PATH_TO_SERVICES", "data_engine")
if not bool(DATA_ENGINE):
print("FORM_DE not set, proceeding without data engine support")
METADATA_SERVICES = config.get("PATH_TO_SERVICES", "metadata_services")
if not bool(METADATA_SERVICES):
print("METADATA_SERVICES not set, proceeding without metadata_services support")
AUDIT_PROXY = config.get("PATH_TO_SERVICES", "audit_proxy")
if not bool(AUDIT_PROXY):
print("AUDIT_PROXY not set, proceeding without audit_proxy support")
FORM_MT = config.get("PATH_TO_SERVICES", "form_mt")
if not bool(FORM_MT):
print("Error, environment variable FORM_MT not set")
sys.exit(1)
class PathToStorage:
BASE_PATH = config.get("DIRECTORY", "base_path")
if not BASE_PATH:
print("Error, environment variable BASE_PATH not set")
sys.exit(1)
MOUNT_DIR = config.get("DIRECTORY", "mount_dir")
if not MOUNT_DIR:
print("Error, environment variable MOUNT_DIR not set")
sys.exit(1)
MODULE_PATH = os.path.join(BASE_PATH, MOUNT_DIR.lstrip('/'))
FORM_IO_UPLOADS = os.path.join(MODULE_PATH, "form_io_uploads")
TEMPLATES_UPLOADS = os.path.join(MODULE_PATH, "templates_uploads")
LOGS_MODULE_PATH = f"{BASE_PATH}/logs{MOUNT_DIR}/"
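    # Example (hypothetical values): BASE_PATH="/data" and MOUNT_DIR="/form_management" give
    # MODULE_PATH="/data/form_management" and LOGS_MODULE_PATH="/data/logs/form_management/".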
class KeyPath:
keys_path = config['DIRECTORY']['keys_path']
if not os.path.isfile(os.path.join(keys_path, "public")) or not os.path.isfile(
os.path.join(keys_path, "private")):
if not os.path.exists(keys_path):
os.makedirs(keys_path)
shutil.copy(os.path.join("assets", "keys", "public"), os.path.join(keys_path, "public"))
shutil.copy(os.path.join("assets", "keys", "private"), os.path.join(keys_path, "private"))
public = os.path.join(keys_path, "public")
private = os.path.join(keys_path, "private")
class RedisConfig:
host = config["REDIS"]["host"]
port = int(config["REDIS"]["port"])
    login_db = config.getint("REDIS", "login_db")
project_tags_db = config.getint("REDIS", "project_tags_db")
class BackFill:
    interval_in_mins = int(config.get("BACKFILL", "interval", fallback=None) or 60)
class EnableAuditing:
periodic_entry_auditing = config.getboolean("AUDITING", "periodic_entry_auditing", fallback=False)
form_non_periodic_auditing = config.getboolean("AUDITING", "form_non_periodic_auditing", fallback=False)
form_periodic_auditing = config.getboolean("AUDITING", "form_periodic_auditing", fallback=False)
class OtherService:
EMAIL_URL = config["PATH_TO_OTHER_SERVICES"]["email_service"]
class MQTTConf:
uri = config["MQTT"]["uri"]
host = config["MQTT"]["host"]
port = int(config["MQTT"]["port"])
publish_base_topic = config["MQTT"]["publish_base_topic"]
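# Minimal usage sketch for the configuration classes above (mirrors the imports used by the
# other modules in this commit):
#   from scripts.config.app_configurations import KafkaConf, RedisConfig
#   broker = f"{KafkaConf.host}:{KafkaConf.port}"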
import os.path
import sys
from datetime import datetime
from dateutil import parser
from openpyxl import load_workbook
from pytz import timezone
from scripts.logging.logging import logger
from scripts.utils.excel_render import ExcelReportRender
from scripts.utils.ilens_publish_data import DataPush
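# Behaviour is controlled through environment variables: CURRENT_TIMESTAMP (default "true")
# allows falling back to midnight-of-today timestamps, TIMEZONE and PROJECT_ID tag the
# published data, and FILES_MAPPING is expected to describe the excel files to ingest
# (the script exits immediately when it is empty).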
enable_timestamp = os.environ.get("CURRENT_TIMESTAMP", default="true")
code_timezone = os.environ.get("TIMEZONE", default="Asia/Kolkata")
project_id = os.environ.get("PROJECT_ID", default="project_099")
files_mapping = os.environ.get("FILES_MAPPING", default="")
if not files_mapping:
print("Files Mapping Found empty, existing!!")
sys.exit(0)
class ExcelUploadHandler:
def __init__(self):
self.excel = ExcelReportRender()
self.tz = code_timezone
        self.enable_current_timestamp = enable_timestamp.lower() == 'true'
self.project_id = project_id
self.kafka_conn = DataPush()
def parse_excel_data(self, template_file_path, data_file_path, template_fill_range,
sheet_name="Sheet1"):
try:
            if not os.path.exists(template_file_path):
                logger.exception(f"file_path for the file {template_file_path} does not exist")
                raise FileNotFoundError(template_file_path)
            if not os.path.exists(data_file_path):
                logger.exception(f"file_path for the file {data_file_path} does not exist")
                raise FileNotFoundError(data_file_path)
template_parameters = self.get_file_object_data(file=template_file_path, fill_range=template_fill_range,
sheet_name=sheet_name)
data_parameters = self.get_file_object_data(file=data_file_path, fill_range=template_fill_range,
sheet_name=sheet_name)
self.execute_data_insertion(template_parameters=template_parameters, data_parameters=data_parameters,
fill_range=template_fill_range)
            logger.debug(data_parameters)
except Exception as e:
logger.exception(f"Exception occurred while parsing the excel data {e.args}")
def get_file_object_data(self, file, fill_range, sheet_name):
try:
file_obj = load_workbook(filename=file)
return_dict = {}
sheet_names = file_obj.sheetnames
            if sheet_name not in sheet_names:
                raise ValueError(f"Sheet '{sheet_name}' not found in {file}")
sheet_obj = file_obj[sheet_name]
__start_coords__, __end_coords__ = self.excel.get_row_column(fill_range)
start_row, start_column = __start_coords__
end_row, end_column = __end_coords__
for i in range(start_row, end_row + 1):
for j in range(start_column, end_column + 1):
cell_value = sheet_obj.cell(row=i, column=j).value
return_dict.update({f"{self.excel.num_to_coord(row=i, column=j, zero_indexed=False)}": cell_value})
return return_dict
except Exception as e:
logger.error(e.args)
raise
def execute_data_insertion(self, template_parameters: dict, data_parameters: dict, fill_range):
try:
__start_coords__, __end_coords__ = self.excel.get_row_column(fill_range)
start_row, start_column = __start_coords__
end_row, end_column = __end_coords__
columns_range = [self.excel.column_num_to_string(i) for i in range(start_column, end_column + 1)]
rows_range = [str(i) for i in range(start_row, end_row + 1)]
insert_data = dict()
insert_json = {'data': {},
'site_id': 'site_100', 'gw_id': 'gw_1', 'pd_id': '1',
'retain_flag': True, 'msg_id': 1, "p_id": ""}
key_mapping_json = dict(zip(template_parameters.values(), template_parameters.keys()))
date_keys = [item for item in list(key_mapping_json.keys()) if item and item.startswith("date")]
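            # template_parameters and data_parameters map cell coordinates to cell values
            # (assuming excel.num_to_coord emits Excel-style labels such as "A1").
            # key_mapping_json inverts the template sheet (cell value -> coordinate) so a tag
            # name can be traced back to its cell; date_keys holds the template values that
            # start with "date" and drive the timestamp lookup further below.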
for _row in rows_range:
res = [item for item in list(template_parameters.keys()) if item.endswith(_row)]
_dict = {template_parameters[_element]: data_parameters[_element] for _element in res if
template_parameters.get(_element) and data_parameters.get(
_element) and (
"$" in template_parameters.get(_element) or "date" in template_parameters.get(
_element))}
insert_data = self.prepare_data_processor_json(input_json=_dict, data_parameters=data_parameters,
key_mapping_json=key_mapping_json,
date_keys=date_keys, insert_data=insert_data)
for _column in columns_range:
res = [item for item in list(template_parameters.keys()) if item.startswith(_column)]
_dict = {template_parameters[_element]: data_parameters[_element] for _element in res if
template_parameters.get(_element) and data_parameters.get(_element) and
("$" in template_parameters.get(_element) or "date" in template_parameters.get(_element))}
insert_data = self.prepare_data_processor_json(input_json=_dict, data_parameters=data_parameters,
key_mapping_json=key_mapping_json,
date_keys=date_keys, insert_data=insert_data)
            msg_counter = 1
            for k, v in insert_data.items():
                timestamp = int(k)
                insert_json.update({
                    "data": v, "p_id": project_id, "msg_id": msg_counter, "timestamp": timestamp
                })
                logger.debug(f"Timestamp: {timestamp}, Values: {v}")
                self.kafka_conn.publish_message(msg=insert_json)
                msg_counter += 1
except Exception as e:
logger.exception(f"Exception occurred while parsing the excel data {e.args}")
def convert_date_string_to_timestamp(self, input_data):
timestamp = ""
if not input_data:
return timestamp
try:
            if isinstance(input_data, datetime):
timestamp = int(input_data.astimezone(timezone(self.tz)).timestamp()) * 1000
else:
timestamp = int(parser.parse(input_data).astimezone(timezone(self.tz)).timestamp()) * 1000
except Exception as e:
logger.exception(f"Exception Occurred while converting the date object {e.args}")
return timestamp
def prepare_data_processor_json(self, input_json, data_parameters: dict, key_mapping_json: dict, date_keys: list,
insert_data: dict):
        try:
            date_key = [item for item in input_json if item and item.startswith("date")]
            timestamp_value = self.convert_date_string_to_timestamp(
                input_data=input_json[date_key[0]]) if date_key else ""
            if not timestamp_value and date_keys:
                timestamp_value = data_parameters[key_mapping_json[date_keys[0]]]
                timestamp_value = self.convert_date_string_to_timestamp(input_data=timestamp_value)
            if not timestamp_value and self.enable_current_timestamp:
                timestamp_value = int(datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).astimezone(
                    timezone(self.tz)).timestamp()) * 1000
            timestamp_str = str(timestamp_value)
            if timestamp_str not in insert_data:
                insert_data[timestamp_str] = {}
            insert_data[timestamp_str].update(input_json)
except Exception as e:
logger.exception(f"Exception occurred while json creation {e.args}")
return insert_data
class InternalError(Exception):
pass
class UnauthorizedError(Exception):
pass
class ProjectIdError(Exception):
pass
class ILensPermissionError(Exception):
pass
class DuplicateTemplateNameError(Exception):
pass
class DuplicateWorkflowNameError(Exception):
pass
class ImplementationError(Exception):
pass
class StepsNotConfigured(Exception):
pass
class DuplicateLogbookNameError(Exception):
pass
class LeftNavigationNotPresent(Exception):
pass
class RequiredFieldMissing(Exception):
pass
class BulkUploadError(Exception):
pass
class ColumnsMisMatch(Exception):
pass
class InvalidValueFound(Exception):
pass
class QuantityGreaterThanException(Exception):
pass
class ILensErrors(Exception):
    """
    Base Error Class
    """
    def __init__(self, msg):
        super().__init__(msg)
class ErrorCodes:
ERR001 = "ERR001 - Operating Time is greater than Planned Time"
ERR002 = "ERR002 - Zero Values are not allowed"
ERR003 = "ERR003 - Operating Time is less than Productive Time"
ERR004 = "ERR004 - Rejected Units is greater than Total Units"
class DowntimeResponseError(ILensErrors):
"""
Error Occurred during fetch of downtime
"""
class AuthenticationError(ILensErrors):
"""
JWT Authentication Error
"""
class FileNotFoundException(ILensErrors):
"""
File Not Found Error
"""
class ErrorMessages:
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Signature Expired"
ERROR003 = "Signature Not Valid"
logger:
  name: form-management
  level: DEBUG
  handlers:
    - type: RotatingFileHandler
      max_bytes: 100000000
      back_up_count: 5
    - type: SocketHandler
      host: localhost
      port: 23582
    - type: StreamHandler
      name: ebpr-engine
import logging
import os
from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
from scripts.config.app_configurations import PathToStorage
# This function reads the logging configuration from the logger YAML file (logger_conf.yml)
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('ilens')
__logger__.setLevel(logging_config["level"].upper())
    log_formatter = ('%(asctime)s - %(levelname)-6s - '
                     '[%(threadName)5s:%(funcName)5s():%(lineno)s] - %(message)s')
time_format = "%Y-%m-%d %H:%M:%S"
file_path = PathToStorage.LOGS_MODULE_PATH
formatter = logging.Formatter(log_formatter, time_format)
    for each_handler in logging_config["handlers"]:
        if each_handler["type"] == "RotatingFileHandler":
            if not os.path.exists(file_path):
                os.makedirs(file_path)
            log_file = os.path.join(file_path, f"{logging_config['name']}.log")
            temp_handler = RotatingFileHandler(log_file,
                                               maxBytes=each_handler["max_bytes"],
                                               backupCount=each_handler["back_up_count"])
            temp_handler.setFormatter(formatter)
        elif each_handler["type"] == "SocketHandler":
            temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
        elif each_handler["type"] == "StreamHandler":
            temp_handler = StreamHandler()
            temp_handler.setFormatter(formatter)
        else:
            # Skip unknown handler types instead of registering a None handler
            continue
        __logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
import json
from contextlib import suppress
from ilens_kafka_publisher import KafkaPublisher
from kafka import KafkaProducer
from scripts.config.app_configurations import KafkaConf, RedisConfig
from scripts.logging.logging import logger
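# DataPush wraps the external ilens_kafka_publisher client for the regular data topic,
# KafkaProducerUtil is a plain kafka-python producer kept for the audit topic, and
# KairosWriter shapes form data into per-site messages before publishing.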
class DataPush:
def __init__(self):
try:
self.obj = KafkaPublisher(kafka_host=KafkaConf.host,
kafka_port=int(KafkaConf.port),
kafka_topic=KafkaConf.topic,
redis_host=RedisConfig.host,
redis_port=RedisConfig.port,
redis_db=RedisConfig.login_db,
enable_sites_partition=KafkaConf.enable_sites_partition,
split_key=KafkaConf.split_key,
round_robin_enable=KafkaConf.round_robin_enable)
except Exception as e:
logger.error(f"Could not connect to Kafka: {e}")
def publish_message(self, msg):
try:
self.obj.perform_task(msg)
        except Exception as e:
            logger.debug(f"Failed to publish message - {e}")
            logger.debug("Trying to reconnect")
class KafkaProducerUtil:
def __init__(self):
try:
self.host = KafkaConf.host
self.port = int(KafkaConf.port)
kafka_broker = f"{self.host}:{str(self.port)}"
self.producer = KafkaProducer(bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode('utf-8'),
api_version=(0, 10, 1))
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
class KairosWriter:
def write_data(self, data_json, topic, project_id):
kafka_conn = DataPush()
logger.debug(f"Data being pushed to kafka topic: {topic}")
msg_counter = 0
for k, v in data_json.items():
timestamp, data, site_ids = self.data_validator(k, v)
if not data:
continue
for each in site_ids:
values = {tag_id: value for tag_id, value in data.items() if tag_id.startswith(each)}
if not values:
continue
write_json = {
"data": values,
"site_id": each[:-1],
"gw_id": "",
"pd_id": "",
"p_id": project_id,
"timestamp": timestamp,
"msg_id": msg_counter,
"retain_flag": False
}
logger.debug(f"Timestamp: {timestamp}, Values: {data}")
kafka_conn.publish_message(msg=write_json)
msg_counter += 1
return msg_counter
@staticmethod
def audit_data(data_json, topic):
old_kafka_conn = KafkaProducerUtil()
logger.debug(f"Audit Data being pushed to kafka topic: {topic}")
msg_counter = len(data_json)
for each in data_json:
audit_json = dict(model="form_data_audits", record=each)
old_kafka_conn.publish(topic, json.dumps(audit_json))
return msg_counter
@staticmethod
def data_validator(timestamp, data):
__temp__ = {}
site_ids = set()
for k, v in data.items():
if not k.startswith("site"):
continue
if isinstance(v, int) or isinstance(v, float):
__temp__[k] = v
site_id = f"{k.split('$')[0]}$"
site_ids.add(site_id)
continue
with suppress(ValueError):
__temp__[k] = float(v)
site_id = f"{k.split('$')[0]}$"
site_ids.add(site_id)
return int(timestamp), __temp__, site_ids
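# Illustrative input for KairosWriter.write_data (shape inferred from data_validator above,
# values hypothetical):
#   {"1700000000000": {"site_100$energy_kwh": 12.5, "note": "dropped"}}
# Keys that do not start with "site" or hold non-numeric values are discarded, and one
# message is published per site prefix ("site_100$" -> site_id "site_100").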