Commit c72ad9c0 authored by dasharatha.vamshi

init

parent c8dd48f6
# Dockerfile
FROM python:3.9-buster
COPY . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
__version__ = "V0.0.1"
# app.py
from loguru import logger

from scripts.constants.app_configuration import METADATA
from scripts.core.utils.compliance_util import Compliance
from scripts.core.utils.timestamp_util import get_timestamps
if __name__ == '__main__':
    # Flatten the per-parameter tag metadata from conf/data.yml into
    # '<param>_<type>' keys, e.g. 'Reactor.TI-5126_live'.
    all_tags = METADATA['tags']
    tags_data = {}
    tag_types = ['upper', 'lower', 'live']
    for param, param_data in all_tags.items():
        for tag_type in tag_types:
            tags_data[f'{param}_{tag_type}'] = param_data[tag_type]
    # Reverse map: KairosDB tag ID -> readable column name.
    column_renamer = {v: k for k, v in tags_data.items()}
    payload = METADATA['query']
    start_date = METADATA['start_date']
    end_date = METADATA['end_date']
    logger.info(f"Compliance window: {start_date} to {end_date}")
    all_timestamps = get_timestamps(start_date, end_date)
    logger.debug(f"Query windows: {all_timestamps}")
    # Restrict the Kairos query to exactly the configured tag IDs.
    payload['metrics'][0]['tags']['c3'] = list(column_renamer.keys())
    obj = Compliance(payload, column_renamer, all_tags)
    obj.start_calculation(all_timestamps)
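# For reference, a sketch (not part of the commit) of what the loop above
# produces for the 'Reactor.TI-5126' entry in conf/data.yml:
#
#   tags_data['Reactor.TI-5126_upper'] == 'site_107$dept_113$line_198$equipment_2590$tag_4370'
#   tags_data['Reactor.TI-5126_lower'] == 'site_107$dept_113$line_198$equipment_2590$tag_4369'
#   tags_data['Reactor.TI-5126_live']  == 'site_107$dept_113$line_198$equipment_2590$tag_732'
#
# column_renamer inverts this mapping so that KairosDB tag IDs can be renamed
# back to '<parameter>_<type>' column names when the data frame is built.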
# conf/application.conf
[KAIROS_DB]
uri=$KAIROS_URI

[POSTGRES]
postgres_uri=$POSTGRES_URI
# conf/data.yml
metadata:
client: "Jubilant"
site: "PP2 R5 Reaction Section"
project_name: "Golden Batch for R5 Reaction"
compliance_percentage: 70
start_date: "2022-08-01"
end_date: "2022-12-31"
tags:
"Cooling tower.TI-5414":
"upper": site_107$dept_113$line_194$equipment_2632$tag_4352
"lower": site_107$dept_113$line_194$equipment_2632$tag_4351
"live": site_107$dept_113$line_194$equipment_2632$tag_743
"VAHP.TI-5415":
"upper": site_107$dept_113$line_194$equipment_2634$tag_4354
"lower": site_107$dept_113$line_194$equipment_2634$tag_4353
"live": site_107$dept_113$line_194$equipment_2634$tag_778
"Chiller.TI-5451":
"upper": site_107$dept_113$line_194$equipment_2635$tag_4356
"lower": site_107$dept_113$line_194$equipment_2635$tag_4355
"live": site_107$dept_113$line_194$equipment_2635$tag_775
"Feed Drum.FIC-5161":
"upper": site_107$dept_113$line_198$equipment_2588$tag_4358
"lower": site_107$dept_113$line_198$equipment_2588$tag_4357
"live": site_107$dept_113$line_198$equipment_2588$tag_1968
"Feed Drum.FIC-5111":
"upper": site_107$dept_113$line_198$equipment_2588$tag_4364
"lower": site_107$dept_113$line_198$equipment_2588$tag_4363
"live": site_107$dept_113$line_198$equipment_2588$tag_500
"Feed Drum.FIC-5102":
"upper": site_107$dept_113$line_198$equipment_2588$tag_4360
"lower": site_107$dept_113$line_198$equipment_2588$tag_4359
"live": site_107$dept_113$line_198$equipment_2588$tag_765
"Feed Drum.FIC-5103":
"upper": site_107$dept_113$line_198$equipment_2588$tag_4362
"lower": site_107$dept_113$line_198$equipment_2588$tag_4361
"live": site_107$dept_113$line_198$equipment_2588$tag_766
"Reactor.REACTINVENTORY":
"upper": site_107$dept_113$line_198$equipment_2590$tag_4366
"lower": site_107$dept_113$line_198$equipment_2590$tag_4365
"live": site_107$dept_113$line_198$equipment_2590$tag_2403
"Reactor.CATCIRCULATION":
"upper": site_107$dept_113$line_198$equipment_2590$tag_4368
"lower": site_107$dept_113$line_198$equipment_2590$tag_4367
"live": site_107$dept_113$line_198$equipment_2590$tag_2404
"Reactor.TI-5126":
"upper": site_107$dept_113$line_198$equipment_2590$tag_4370
"lower": site_107$dept_113$line_198$equipment_2590$tag_4369
"live": site_107$dept_113$line_198$equipment_2590$tag_732
"Reactor.TI-5108C":
"upper": site_107$dept_113$line_198$equipment_2590$tag_4372
"lower": site_107$dept_113$line_198$equipment_2590$tag_4371
"live": site_107$dept_113$line_198$equipment_2590$tag_785
"Regenerator.RGNINVENTORY":
"upper": site_107$dept_113$line_198$equipment_2592$tag_4374
"lower": site_107$dept_113$line_198$equipment_2592$tag_4373
"live": site_107$dept_113$line_198$equipment_2592$tag_2407
"Regenerator.TI-5113B":
"upper": site_107$dept_113$line_198$equipment_2592$tag_4376
"lower": site_107$dept_113$line_198$equipment_2592$tag_4375
"live": site_107$dept_113$line_198$equipment_2592$tag_512
"Venturi Scrubber.TI-5127":
"upper": site_107$dept_113$line_198$equipment_2594$tag_4378
"lower": site_107$dept_113$line_198$equipment_2594$tag_4377
"live": site_107$dept_113$line_198$equipment_2594$tag_744
"Venturi Scrubber.TI-5120":
"upper": site_107$dept_113$line_198$equipment_2594$tag_4380
"lower": site_107$dept_113$line_198$equipment_2594$tag_4379
"live": site_107$dept_113$line_198$equipment_2594$tag_745
"Venturi Scrubber.TI-5121":
"upper": site_107$dept_113$line_198$equipment_2594$tag_4382
"lower": site_107$dept_113$line_198$equipment_2594$tag_4381
"live": site_107$dept_113$line_198$equipment_2594$tag_774
"7302011030 Absorber.TI-5129":
"upper": site_107$dept_113$line_198$equipment_2595$tag_4384
"lower": site_107$dept_113$line_198$equipment_2595$tag_4383
"live": site_107$dept_113$line_198$equipment_2595$tag_748
"7302011030 Recovery Column-A.FI-5263A":
"upper": site_107$dept_113$line_198$equipment_2596$tag_4386
"lower": site_107$dept_113$line_198$equipment_2596$tag_4385
"live": site_107$dept_113$line_198$equipment_2596$tag_1988
query:
metrics:
- tags:
c3:
name: ilens.live_data.raw
group_by:
- name: tag
tags:
- c3
aggregators:
- name: avg
sampling:
value: '15'
unit: minutes
align_sampling: true
align_start_time: true
plugins: [ ]
cache_time: 0
time_zone: Asia/Calcutta
start_absolute: 0
end_absolute: 0
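# For context, a sketch (not part of the commit) of the body that ends up
# being POSTed to <KAIROS_DB_HOST>/api/v1/datapoints/query: app.py injects
# the configured tag IDs into metrics[0].tags.c3 and DataPuller fills the
# absolute window (the epoch-millisecond values below are illustrative):
#
#   {"metrics": [{"tags": {"c3": ["site_107$dept_113$line_194$equipment_2632$tag_4352", "..."]},
#                 "name": "ilens.live_data.raw",
#                 "group_by": [{"name": "tag", "tags": ["c3"]}],
#                 "aggregators": [{"name": "avg",
#                                  "sampling": {"value": "15", "unit": "minutes"},
#                                  "align_sampling": true,
#                                  "align_start_time": true}]}],
#    "plugins": [], "cache_time": 0, "time_zone": "Asia/Calcutta",
#    "start_absolute": 1659292200000, "end_absolute": 1661884200000}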
# config.env
KAIROS_URI=https://iLens:iLensJUB$456@jub-kairos.ilens.io/kairos
POSTGRES_URI=postgresql://iLens:iLensJUB$456@jubilant.ilens.io/kairos
# scripts/constants/app_configuration.py
import os
import sys
from configparser import BasicInterpolation, ConfigParser

import yaml
from dotenv import load_dotenv
# Configuration File Constants
_application_conf = "./conf/application.conf"
_default_conf = "./config.env"
data_conf = "./conf/data.yml"
load_dotenv(dotenv_path=_default_conf)
class EnvInterpolation(BasicInterpolation):
    """Interpolation which expands environment variables in values."""

    def before_get(self, parser, section, option, value, defaults):
        value = super().before_get(parser, section, option, value, defaults)
        expanded = os.path.expandvars(value)
        # An unset variable leaves the "$NAME" literal behind; treat that
        # as "no value" rather than passing the literal through.
        if not expanded.startswith("$"):
            return expanded
        return None
try:
    config = ConfigParser(interpolation=EnvInterpolation())
    config.read(_application_conf)
except Exception as e:
    print(f"Error while loading the config: {e}")
    print("Failed to load configuration. Exiting!")
    sys.exit(1)
class Logging:
level = config.get("LOGGING", "level", fallback="INFO")
level = level or "INFO"
tb_flag = config.getboolean("LOGGING", "traceback", fallback=True)
tb_flag = tb_flag if tb_flag is not None else True
# Configuration Variables
# Kairos Configuration Variables
KAIROS_DB_HOST = config["KAIROS_DB"]["uri"]
# Postgres Configuration Variables
POSTGRES_URI = config["POSTGRES"]["postgres_uri"]
# Read the configuration file
with open(data_conf, "r") as _cf:
    _config = yaml.safe_load(_cf)
METADATA = _config["metadata"]
# scripts/core/data/data_import.py
import json

import pandas as pd
import requests
from loguru import logger
class DataPuller(object):
def __init__(self, db_host, column_rename, payload, absolute_time=None, optional_payload=None):
self.optional_payload = optional_payload
self.db_host_url = db_host
self.request_url = "{kairos_host}/api/v1/datapoints/query".format(kairos_host=self.db_host_url)
self.payload = payload
self.column_rename = column_rename
if absolute_time is not None:
if "start_relative" in self.payload:
del self.payload["start_relative"]
if "end_relative" in self.payload:
del self.payload["end_relative"]
self.payload["start_absolute"] = absolute_time["start_absolute"]
self.payload["end_absolute"] = absolute_time["end_absolute"]
def get_data(self, start_timestamp, end_timestamp):
logger.info("Data for the parameters being pulled from Kairos Database")
self.payload['start_absolute'] = start_timestamp
self.payload['end_absolute'] = end_timestamp
        # Send the payload as JSON (sets the Content-Type header) and fail
        # fast on HTTP errors; the timeout value here is an assumption.
        response = requests.post(url=self.request_url, json=self.payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()
        output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
        for query_result in output_data:
            grouped_output_data = query_result["results"]
            for each_grouped_data in grouped_output_data:
                value = each_grouped_data["values"]
                tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug(f"Renamed {tag_id} to {self.column_rename[tag_id]} in Data")
column_name = self.column_rename[tag_id]
except KeyError as ke:
logger.debug(f"Column Renaming Logic not found for {tag_id} - {ke}")
column_name = tag_id
df_column_data = pd.DataFrame(data=value, columns=["timestamp", column_name])
if df_final.empty:
df_final = df_column_data
else:
                    df_final = df_final.merge(df_column_data, how="outer", on="timestamp")
df_final["epoch_time"] = df_final["timestamp"]
df_final["timestamp"] = pd.to_datetime(df_final['timestamp'], unit="ms").dt.tz_localize('UTC').dt.tz_convert(
'Asia/Kolkata')
logger.debug(f"Final number of columns : {len(list(df_final.columns))}")
return df_final
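# Usage sketch (illustration only): 'payload' and 'column_renamer' are built
# as in app.py, and the window values are arbitrary epoch milliseconds.
#
#   from scripts.constants.app_configuration import KAIROS_DB_HOST
#   puller = DataPuller(db_host=KAIROS_DB_HOST, column_rename=column_renamer, payload=payload)
#   df = puller.get_data(1659292200000, 1661884200000)
#   # one row per 15-minute bucket; a '<param>_<type>' column per configured
#   # tag, plus 'timestamp' (tz-aware Asia/Kolkata) and 'epoch_time'.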
# scripts/core/utils/compliance_util.py
from datetime import datetime

import pandas as pd
from loguru import logger

from scripts.constants.app_configuration import KAIROS_DB_HOST, METADATA
from scripts.core.data.data_import import DataPuller
class Compliance:
def __init__(self, payload, column_rename, tags_data):
self.payload = payload
self.column_rename = column_rename
self.tags_data = tags_data
self._dp_ = DataPuller(db_host=KAIROS_DB_HOST, payload=self.payload, column_rename=self.column_rename)
@staticmethod
def add_compliance_column(df, column, live_col, upper_col, lower_col):
try:
            # True only when the live value lies strictly inside the (lower, upper) band.
            df[f'{column}_compliance'] = (df[live_col] > df[lower_col]) & (df[live_col] < df[upper_col])
except Exception as e:
logger.warning(f"Error adding compliance column - {e}")
return df
@staticmethod
def create_compliance_sheet(final_data_dict, compliance_cols, total_columns_criteria):
logger.info("Calculating overall compliance...")
c_df = pd.DataFrame()
try:
c_df['client'] = [METADATA['client']] * len(final_data_dict)
c_df['site'] = [METADATA['site']] * len(final_data_dict)
c_df['project_name'] = [METADATA['project_name']] * len(final_data_dict)
final_compliance_list = []
timestamp_data = []
for idx, value in final_data_dict.items():
data_list = []
for k, v in value.items():
if k in compliance_cols:
data_list.append(v)
else:
timestamp_data.append(v)
count = data_list.count(True)
if count >= total_columns_criteria:
final_compliance_list.append(1)
else:
final_compliance_list.append(0)
c_df['time'] = timestamp_data
c_df['compliance'] = final_compliance_list
except Exception as e:
logger.warning(f'Error - {e}')
return c_df
def start_calculation(self, all_timestamps):
all_dfs = []
parameter_wise_dfs = []
for i in all_timestamps:
start_timestamp = i['start']
end_timestamp = i['end']
start_time = datetime.fromtimestamp(start_timestamp//1000)
end_time = datetime.fromtimestamp(end_timestamp//1000)
logger.info(f"Calculating for {start_time} to {end_time}")
df = self._dp_.get_data(start_timestamp, end_timestamp)
            # Two non-tag columns ('timestamp' and 'epoch_time') are always present.
            total_cols = len(df.columns) - 2
            required_total_cols = len(self.tags_data) * 3  # upper/lower/live per parameter
if total_cols != required_total_cols:
logger.warning(f"No Data for {start_time} to {end_time}")
else:
df.dropna(inplace=True)
compliance_cols = []
for column, column_data in self.tags_data.items():
df = self.add_compliance_column(df, column, f'{column}_live', f'{column}_upper', f'{column}_lower')
compliance_cols.append(f'{column}_compliance')
# df.to_csv('r5-parameter-wise-compliance.csv', index=False)
parameter_wise_dfs.append(df)
total_columns_criteria = int((METADATA['compliance_percentage'] / 100) * len(compliance_cols))
logger.info(f"Need {total_columns_criteria} from {len(compliance_cols)} columns to satisfy the "
f"compliance")
rq_cols = compliance_cols.copy()
rq_cols.append('timestamp')
df = df[rq_cols]
final_data_dict = df.to_dict(orient='index')
df_final = self.create_compliance_sheet(final_data_dict, compliance_cols, total_columns_criteria)
all_dfs.append(df_final)
if all_dfs:
logger.info("Combining the Data")
final_df = pd.concat(all_dfs)
final_df.to_csv('r5-overall-compliance.csv', index=False)
if parameter_wise_dfs:
params_df = pd.concat(parameter_wise_dfs)
params_df.to_csv('r5-parameter-wise-compliance.csv', index=False)
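# Worked example of the row criterion with the values shipped in conf/data.yml
# (18 parameters, compliance_percentage = 70):
#
#   total_columns_criteria = int((70 / 100) * 18) = int(12.6) = 12
#
# so a 15-minute row is marked compliant (1) when at least 12 of the 18
# '<param>_compliance' flags are True, and 0 otherwise.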
# scripts/core/utils/timestamp_util.py
from datetime import datetime, timedelta

from loguru import logger

from scripts.errors import DateError
def get_timestamps(start_date, end_date):
    logger.info(f"Getting start and end time from {start_date} and {end_date}")
    s_date = datetime.strptime(start_date, "%Y-%m-%d")
    e_date = datetime.strptime(end_date, "%Y-%m-%d")
    if s_date > e_date:
        raise DateError("Invalid dates: start_date is after end_date")
    # One naive datetime per day in the window, inclusive of both ends.
    my_dates = [s_date + timedelta(days=x) for x in range((e_date - s_date).days + 1)]
    logger.debug(f"{len(my_dates)} days in the window")
    step = 30  # query Kairos in 30-day chunks
    timestamps = []
    for i in range(0, len(my_dates), step):
        t_list = my_dates[i:i + step]
        chunk_start = t_list[0]
        # Cover the chunk's final day in full, but never run past e_date.
        chunk_end = min(t_list[-1] + timedelta(days=1), e_date)
        timestamps.append({
            "start": int(chunk_start.timestamp()) * 1000,
            "end": int(chunk_end.timestamp()) * 1000
        })
    return timestamps
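# Illustration of the chunking for the window configured in conf/data.yml
# (exact epoch values depend on the host timezone, since the datetimes are naive):
#
#   chunks = get_timestamps("2022-08-01", "2022-12-31")
#   len(chunks)   # 153 days -> 6 chunks of up to 30 days each
#   chunks[0]     # {'start': <epoch ms>, 'end': <epoch ms ~30 days later>}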
# scripts/errors (path inferred from "from scripts.errors import DateError")
class KairosDataPullError(Exception):
pass
class DataframeFormationError(Exception):
pass
class OperationError(Exception):
pass
class ForecastError(Exception):
pass
class KairosInsertionError(Exception):
pass
class DateError(Exception):
pass