Commit 093b76aa authored by aakash.bedi

latest files

parent 9108c17e
JOB_ID=1011
HIRARCHY_TAGS=site_100$dept_100$line_100$equipment_106,site_100$dept_100$line_100$equipment_107,site_100$dept_100$line_100$equipment_108
INPUT_TAGS=tag_107,tag_107,tag_109,tag_109,tag_120,tag_120
OUTPUT_TAGS=tag_107,tag_108,tag_109,tag_110,tag_120,tag_121
START_TIMESTAMP=1642636800
NUMBER_FORCASTS=30
FREQUENCY=days
FILTER_TAG=tag_106,tag_106,tag_106
FILTER_VALUE=0,0,0
FILTER_CONDITION=lte,lte,lte
KAIROS_API_URL=http://ilens:iLens$456@qa.ilens.io/kairos
KAIROS_METRC_NAME=project_132__ilens.live_data.raw
RELATIVE_END=1
\ No newline at end of file
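
For reference, the comma-separated values above pair up positionally: the i-th HIRARCHY_TAGS entry goes with the i-th FILTER_TAG/FILTER_CONDITION/FILTER_VALUE entry, and each (INPUT_TAGS, OUTPUT_TAGS) pair is run against every hierarchy, exactly as the orchestrator in main.py below does. A minimal standalone sketch of the resulting job fan-out (the print statement is illustrative only):

import itertools

hierarchy_tags = ['site_100$dept_100$line_100$equipment_106',
                  'site_100$dept_100$line_100$equipment_107',
                  'site_100$dept_100$line_100$equipment_108']
filter_tags = ['tag_106', 'tag_106', 'tag_106']
filter_conditions = ['lte', 'lte', 'lte']
filter_values = ['0', '0', '0']
input_tags = ['tag_107', 'tag_107', 'tag_109', 'tag_109', 'tag_120', 'tag_120']
output_tags = ['tag_107', 'tag_108', 'tag_109', 'tag_110', 'tag_120', 'tag_121']

# Every (hierarchy, filter) tuple is crossed with every (input, output) pair,
# giving 3 x 6 = 18 forecast jobs for this configuration.
for (hierarchy, f_tag, f_op, f_val), (in_tag, out_tag) in itertools.product(
        zip(hierarchy_tags, filter_tags, filter_conditions, filter_values),
        zip(input_tags, output_tags)):
    print(f"forecast {hierarchy}${in_tag} -> {hierarchy}${out_tag} "
          f"(filter {hierarchy}${f_tag} {f_op} {f_val})")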
FROM python:3.7.12-slim
ADD . /code
WORKDIR /code
RUN mkdir -p /code/logs
RUN pip install -r requirements.txt
CMD ["python","main.py"]
\ No newline at end of file
from datetime import datetime
from dateutil.relativedelta import relativedelta
import requests
import json
import itertools
from typing import List
from scripts.utils.logsetup import logger
from write_kairos_functions import get_kairos_json, get_tags_json
from scripts.utils.config import IS_API

if IS_API == 'false':
    from scripts.utils.config import config as env_conf
    config = env_conf


class MovingAverageComponent:
    def __init__(self, api_config: dict = None):
        """
        Init function of the component that sets the config.
        param: api_config: the config dictionary with all parameters to run the module
        """
        global config
        if api_config and IS_API != 'false':
            config = api_config

    def set_config_and_run(self, api_config):
        """
        Set the config and run the module.
        param: api_config: the config dictionary with all parameters to run the module
        """
        global config
        if api_config and IS_API != 'false':
            config = api_config
        self.orchestrator()

    def read_kairos(self, tag: str, metric_name: str, frequency: str,
                    number_of_datapoints: int, end_datapoint: int, filter: bool,
                    filter_op: str = 'ne', threshold: float = 0) -> List[List]:
        """
        Read data from kairos for the given tag and specified configuration
        and return the datapoints.
        param: tag: kairos tag name for which the data is to be queried
        param: metric_name: name of the kairos metric from which the data is to be fetched
        param: frequency: frequency of the datapoint in kairos
                available values: [seconds, minutes, hours, days, weeks, months, years]
        param: number_of_datapoints: number of relative datapoints to look back to
        param: end_datapoint: number of the last datapoint to be considered
        param: filter: flag to specify if a filter is to be applied
        param: filter_op: condition of the filter to apply
                available values: [equal, ne, lt, lte, gt, gte]
        param: threshold: filter threshold to eliminate values based on the specified condition
        return: List[List]: historical data for the requested tag in the format [[timestamp, value]]
        """
        logger.info(f"Fetching data from kairos for {tag} tag for last {number_of_datapoints} datapoints")
        query_template = {
            "metrics": [
                {
                    "tags": {config['KAIROS_TAG_NAME']: [tag]},
                    "name": metric_name,
                    "aggregators": []
                }
            ],
            "start_relative": {
                "value": f"{number_of_datapoints}",
                "unit": frequency
            }
        }
        if end_datapoint > 0:
            query_template['end_relative'] = {
                "value": f"{end_datapoint}",
                "unit": frequency
            }
        if filter:
            query_template['metrics'][0]['aggregators'].append({
                "name": "filter",
                "filter_op": filter_op,
                "threshold": threshold
            })
        response = requests.post(config['KAIROS_API_URL'] + "/api/v1/datapoints/query",
                                 data=json.dumps(query_template))
        if response.status_code != 200:
            logger.exception(f"Unable to fetch data for {tag}")
            raise Exception(f"Unable to fetch data for {tag}")
        logger.info(f"Fetching data from kairos succeeded for {tag} tag for last {number_of_datapoints} datapoints")
        return response.json()['queries'][0]['results'][0]['values']
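
    # Illustrative example of the payload this method posts, using values from the
    # .env above: with tag='site_100$dept_100$line_100$equipment_106$tag_107',
    # metric_name='project_132__ilens.live_data.raw', frequency='days',
    # number_of_datapoints=45, end_datapoint=1, filter=True, filter_op='lte',
    # threshold=0, the body sent to /api/v1/datapoints/query is:
    # {
    #   "metrics": [{"tags": {"c3": ["site_100$dept_100$line_100$equipment_106$tag_107"]},
    #                "name": "project_132__ilens.live_data.raw",
    #                "aggregators": [{"name": "filter", "filter_op": "lte", "threshold": 0}]}],
    #   "start_relative": {"value": "45", "unit": "days"},
    #   "end_relative": {"value": "1", "unit": "days"}
    # }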

    def get_historical_data_forecast(self, tag: str, filter_tag: str, filter_op: str, threshold: float) -> List[List]:
        """
        Fetch historical data, applying the filtering condition if one is given.
        param: tag: kairos tag name for which the data is to be fetched
        param: filter_tag: kairos tag name on which the data is to be filtered
        param: filter_op: condition of the filter to apply
                available values: [equal, ne, lt, lte, gt, gte]
        param: threshold: filter threshold to eliminate values based on the specified condition
        return: List[List]: exactly the number of datapoints required, taken from the
                filtered datapoints, in the format [[timestamp, value]]
        """
        if filter_tag:  # full tag name with hierarchy on which datapoints are to be filtered
            limiter = 10  # used to stop the while loop from running indefinitely
            # Initially fetch 1.5x the required datapoints so enough survive the filtering.
            start_number = int(config['NUMBER_FORCASTS'] * 1.5) + config['RELATIVE_END']
            end_number = 0
            filtered_data = []
            while limiter > 0:
                kairos_data = self.read_kairos(filter_tag, config['KAIROS_METRC_NAME'], config['FREQUENCY'],
                                               start_number, end_number, True, filter_op, threshold)
                logger.info(f"fetched {len(kairos_data)} datapoints successfully for {tag}")
                filtered_data[0:0] = kairos_data
                if len(filtered_data) >= (config['NUMBER_FORCASTS'] + config['RELATIVE_END']):
                    # Enough datapoints survived the filter, so leave the loop.
                    limiter = 0
                    break
                end_number = start_number
                # Too few datapoints survived the filter; widen the look-back window
                # by 1.5x and try again, until the limiter runs out.
                start_number = int(start_number * 1.5)
                limiter -= 1
            else:
                logger.error(f"Requested number of valid datapoints {config['NUMBER_FORCASTS']} unavailable; looked back {start_number} datapoints")
                raise Exception(f"Requested number of valid datapoints {config['NUMBER_FORCASTS']} unavailable; looked back {start_number} datapoints")
            # Extract exactly the required number of timestamps from the filtered datapoints.
            if config['RELATIVE_END'] > 0:
                filtered_timestamps = list(zip(*filtered_data))[0][-(config['NUMBER_FORCASTS'] + config['RELATIVE_END']):-config['RELATIVE_END']]
            else:
                filtered_timestamps = list(zip(*filtered_data))[0][-config['NUMBER_FORCASTS']:]
            data = self.read_kairos(tag, config['KAIROS_METRC_NAME'], config['FREQUENCY'],
                                    start_number, 0, False, filter_op, threshold)
            # Keep only the datapoints whose timestamps survived the filtering.
            return list(filter(lambda x: x[0] in filtered_timestamps, data))
        # If no filtering condition is given, return the raw kairos data.
        return self.read_kairos(tag, config['KAIROS_METRC_NAME'], config['FREQUENCY'],
                                config['NUMBER_FORCASTS'], config['RELATIVE_END'], False)
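
    # Illustrative walk-through with NUMBER_FORCASTS=30 and RELATIVE_END=1 (the .env
    # above): the first pass looks back int(30 * 1.5) + 1 = 46 datapoints; if fewer
    # than 31 survive the filter, the look-back widens to 69, then 103, ... for at
    # most 10 passes before the method gives up and raises.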

    def write_to_kairos(self, forecast_data: List[List], tag: str, metric: str) -> None:
        """
        Write data to kairos for the given tag, forecast_data and metric name.
        param: forecast_data: data forecasted by the SMA function
        param: tag: kairos tag name under which the data is to be written to kairos
        param: metric: name of the metric
        """
        try:
            logger.info(f"pushing forecasted data of {len(forecast_data)} datapoints for {tag}")
            data_arr = []
            try:
                tags_json = get_tags_json(tag)
                tags_split_list = tag.split("$")[0:-1]
                # Add one l<i> tag per hierarchy level (site, dept, line, ...).
                for i in range(1, len(tags_split_list) + 1):
                    tags_json[f'l{str(i)}'] = tags_split_list[i - 1]
                data_arr.append(get_kairos_json(metric, forecast_data, tags_json))
            except Exception as e:
                logger.exception(f"Failed to iterate tags {e}")
            response = requests.post(config['KAIROS_API_URL'] + "/api/v1/datapoints",
                                     data=json.dumps(data_arr))
            if response.status_code != 204:
                logger.exception(f"Kairos insertion failed: {response.text}")
                raise Exception("Kairos insertion failed")
            logger.info(f"Forecasted data of {len(forecast_data)} datapoints for {tag} successfully pushed to Kairos: {response.status_code}")
        except Exception as e:
            logger.exception(f"Exception while writing data to KairosDB : {e}")

    def get_time_delta(self, freq: str, period: int) -> relativedelta:
        """
        Build the time step for the given frequency and period.
        param: freq: frequency of the datapoint in kairos
                available values: [seconds, minutes, hours, days, weeks, months, years]
        param: period: number of steps of freq to move; adding the returned relativedelta
                to a datetime advances it by period units of freq (it may be negative)
        return: relativedelta corresponding to freq and period
        """
        time_delta = {
            'years': relativedelta(years=+period),
            'months': relativedelta(months=+period),
            'weeks': relativedelta(weeks=+period),
            'days': relativedelta(days=+period),
            'hours': relativedelta(hours=+period),
            'minutes': relativedelta(minutes=+period),
            'seconds': relativedelta(seconds=+period),
        }
        return time_delta[freq]
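
    # Example: get_time_delta('days', 3) returns relativedelta(days=+3), so
    # start + self.get_time_delta('days', 3) moves start forward by three days.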

    def calculate_SMA(self, historical_data: List[List], kairos_input_tag: str) -> List[List]:
        """
        Calculate the simple moving average forecast.
        param: historical_data: list of [time index, data value] pairs
        param: kairos_input_tag: kairos tag name
        return: List[List]: list of [timestamp, forecast value] pairs
        """
        try:
            if config['START_TIMESTAMP']:
                if 'today' in config['START_TIMESTAMP'].lower():
                    # Relative form 'today+N'/'today-N': extract the day offset given by the user.
                    if '+' in config['START_TIMESTAMP']:
                        offset = int("".join(config['START_TIMESTAMP'].split()).lower().split("today+")[1])
                    else:
                        offset = -int("".join(config['START_TIMESTAMP'].split()).lower().split("today-")[1])
                    # Use a datetime (midnight today) rather than a date so .timestamp() works below.
                    start_timestamp = datetime.combine(datetime.now().date(), datetime.min.time()) + relativedelta(days=offset)
                elif 'last' in config['START_TIMESTAMP'].lower():
                    # Relative form 'last+N'/'last-N': offset from the last historical timestamp.
                    if '+' in config['START_TIMESTAMP']:
                        offset = int("".join(config['START_TIMESTAMP'].split()).lower().split("last+")[1])
                    else:
                        offset = -int("".join(config['START_TIMESTAMP'].split()).lower().split("last-")[1])
                    # Kairos timestamps are epoch milliseconds (note the *1000 when writing
                    # below), so convert to seconds before parsing.
                    start_timestamp = datetime.fromtimestamp(historical_data[-1][0] / 1000) + relativedelta(days=offset)
                else:
                    # Absolute form: an epoch timestamp in seconds.
                    start_timestamp = datetime.fromtimestamp(int(config['START_TIMESTAMP']))
            else:
                start_timestamp = datetime.now()
            # Build the forecast time index as epoch milliseconds, one step per forecast.
            time_index = [int((start_timestamp + self.get_time_delta(config['FREQUENCY'], x)).timestamp()) * 1000
                          for x in range(config['NUMBER_FORCASTS'])]
            end_timestamp = datetime.fromtimestamp(int(time_index[-1] / 1000))
        except Exception as e:
            logger.exception(f"Exception while creating time index : {e}")
        logger.info(f"calculating simple moving average for {kairos_input_tag} tag from {start_timestamp} to {end_timestamp}")
        data_value = list(zip(*historical_data))[1]  # extract only the data values
        # The n-th forecast is the mean of the last n historical values.
        preds = [sum(data_value[-n:]) / n for n in range(1, config['NUMBER_FORCASTS'] + 1)]
        preds_df = list(map(list, zip(time_index, preds)))  # [[timestamp, prediction], ...]
        return preds_df
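
    # Worked example: with historical values [2, 4, 6] and NUMBER_FORCASTS=3, the
    # forecasts are [6/1, (4+6)/2, (2+4+6)/3] = [6.0, 5.0, 4.0]: each forecast
    # averages one more of the trailing historical values than the last.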

    def orchestrator(self):
        logger.info("Moving Average Component Started")
        # Pair each hierarchy with its filter settings, and each input tag with its
        # output tag, then run every hierarchy/tag combination.
        combinations = itertools.product(
            zip(config['HIRARCHY_TAGS'], config['FILTER_TAG'], config['FILTER_CONDITION'], config['FILTER_VALUE']),
            zip(config['INPUT_TAGS'], config['OUTPUT_TAGS']))
        for i in combinations:
            logger.info(f"Moving Average Component calculating for {i[0]} hierarchy {i[1][0]} parameter")
            hierarchy = i[0][0].strip()
            filter_tag = i[0][1].strip()
            filter_op = i[0][2].strip()
            filter_thres = float(i[0][3].strip())
            input_tag = i[1][0].strip()
            output_tag = i[1][1].strip()
            kairos_input_tag = f'{hierarchy}${input_tag}'
            kairos_out_tag = f'{hierarchy}${output_tag}'
            kairos_filter_tag = f'{hierarchy}${filter_tag}'
            data = self.get_historical_data_forecast(kairos_input_tag, kairos_filter_tag, filter_op, filter_thres)
            preds = self.calculate_SMA(data, kairos_input_tag)
            self.write_to_kairos(preds, kairos_out_tag, config['KAIROS_METRC_NAME'])
        logger.info("Moving Average Component Completed")


if __name__ == "__main__":
    if IS_API == 'false':
        MovingAverageComponent().orchestrator()
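
When IS_API is anything other than 'false', the component is driven by passing a config dict instead of environment variables. A minimal sketch, assuming the module file is main.py (as the Dockerfile CMD suggests) and that the caller supplies the same keys scripts/utils/config builds below:

from main import MovingAverageComponent

api_config = {
    "KAIROS_API_URL": "http://qa.ilens.io/kairos",
    "KAIROS_METRC_NAME": "project_132__ilens.live_data.raw",
    "KAIROS_TAG_NAME": "c3",
    "HIRARCHY_TAGS": ["site_100$dept_100$line_100$equipment_106"],
    "INPUT_TAGS": ["tag_107"],
    "OUTPUT_TAGS": ["tag_108"],
    "START_TIMESTAMP": "1642636800",
    "NUMBER_FORCASTS": 30,
    "FREQUENCY": "days",
    "FILTER_TAG": ["tag_106"],
    "FILTER_VALUE": ["0"],
    "FILTER_CONDITION": ["lte"],
    "RELATIVE_END": 1,
}
MovingAverageComponent().set_config_and_run(api_config)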
import os
from dotenv import load_dotenv

load_dotenv()  # take environment variables from .env

JOB_ID = os.environ.get('JOB_ID')
LOG_LEVEL = os.environ.get("LOG_LEVEL", "DEBUG").upper()
LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", None)
LOGSTASH_PORT = os.environ.get("LOGSTASH_PORT", None)
LOG_HANDLER_NAME = os.environ.get("LOG_HANDLER_NAME", "MovingAverageComponent")
BASE_LOG_PATH = os.path.join(os.getcwd(), "logs")
KAIROS_API_URL = os.environ.get("KAIROS_API_URL").strip()
KAIROS_METRC_NAME = os.environ.get("KAIROS_METRC_NAME").strip()
KAIROS_TAG_NAME = os.environ.get('KAIROS_TAG_NAME', 'c3').strip()
IS_API = os.environ.get('IS_API', 'false').strip().lower()  # flag stating whether the module is run as a component or as an API

if IS_API == 'false':
    HIRARCHY_TAGS = os.environ.get('HIRARCHY_TAGS').strip().split(',')  # comma-separated ilens hierarchy tags
    INPUT_TAGS = os.environ.get('INPUT_TAGS').strip().split(',')  # comma-separated data tags for forecasting
    OUTPUT_TAGS = os.environ.get('OUTPUT_TAGS').strip().split(',')  # comma-separated data tags to push forecasted data to
    # Absolute or relative start: a UTC timestamp in seconds, 'today' plus/minus an
    # offset, or the last historical timestamp plus/minus an offset.
    # Ex: today+1, 1642703400 or last+1
    START_TIMESTAMP = os.environ.get('START_TIMESTAMP', '').strip()
    NUMBER_FORCASTS = int(os.environ.get('NUMBER_FORCASTS').strip())  # number of future datapoints to forecast
    FREQUENCY = os.environ.get('FREQUENCY').strip()  # frequency of the datapoint in kairos: [seconds, minutes, hours, days, weeks, months, years]
    FILTER_TAG = os.environ.get('FILTER_TAG', '').strip().split(',')  # comma-separated tag names on which datapoints are to be filtered
    FILTER_VALUE = os.environ.get('FILTER_VALUE', "0").strip().split(',')  # comma-separated values that need to be filtered out
    FILTER_CONDITION = os.environ.get('FILTER_CONDITION', "ne").strip().split(',')  # comma-separated filter conditions: [equal, ne, lt, lte, gt, gte]
    RELATIVE_END = int(os.environ.get('RELATIVE_END', '0').strip())  # relative end datapoint number to consider for historical data; default is 0
    config = {
        "KAIROS_API_URL": KAIROS_API_URL,
        "KAIROS_METRC_NAME": KAIROS_METRC_NAME,
        "KAIROS_TAG_NAME": KAIROS_TAG_NAME,
        "HIRARCHY_TAGS": HIRARCHY_TAGS,
        "INPUT_TAGS": INPUT_TAGS,
        "OUTPUT_TAGS": OUTPUT_TAGS,
        "START_TIMESTAMP": START_TIMESTAMP,
        "NUMBER_FORCASTS": NUMBER_FORCASTS,
        "FREQUENCY": FREQUENCY,
        "FILTER_TAG": FILTER_TAG,
        "FILTER_VALUE": FILTER_VALUE,
        "FILTER_CONDITION": FILTER_CONDITION,
        "RELATIVE_END": RELATIVE_END
    }
\ No newline at end of file
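
To illustrate the relative START_TIMESTAMP forms parsed in calculate_SMA above, here is a hypothetical stand-alone helper (parse_start is not part of the codebase) that mirrors those branches:

from datetime import datetime
from dateutil.relativedelta import relativedelta

def parse_start(value: str, last_historical: datetime) -> datetime:
    # 'today+N'/'today-N' offsets from now, 'last+N'/'last-N' offsets from the last
    # historical timestamp, otherwise the value is an epoch timestamp in seconds.
    v = "".join(value.split()).lower()
    if 'today' in v:
        offset = int(v.split('today+')[1]) if '+' in v else -int(v.split('today-')[1])
        return datetime.now() + relativedelta(days=offset)
    if 'last' in v:
        offset = int(v.split('last+')[1]) if '+' in v else -int(v.split('last-')[1])
        return last_historical + relativedelta(days=offset)
    return datetime.fromtimestamp(int(v))

parse_start('today+1', datetime.now())    # this time tomorrow
parse_start('1642636800', datetime.now())  # the given epoch second as a local datetime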
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from scripts.utils.config import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH
import logging
from logging.handlers import RotatingFileHandler
import os

DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s] %(message)s'
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
    FORMATTER = DEBUG_FORMAT


def get_logger(log_handler_name, extra=EXTRA):
    """
    Purpose: to create the logger.
    :param log_handler_name: name of the log handler
    :param extra: extra args for the logger
    :return: logger object
    """
    log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
    logger = logging.getLogger(log_handler_name)
    logger.setLevel(LOG_LEVEL.strip().upper())
    log_handler = logging.StreamHandler()
    log_handler.setLevel(LOG_LEVEL)
    formatter = logging.Formatter(FORMATTER)
    log_handler.setFormatter(formatter)
    handler = RotatingFileHandler(log_path, maxBytes=10485760, backupCount=5)
    handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    logger.addHandler(handler)
    logger = logging.LoggerAdapter(logger, extra)
    return logger


logger = get_logger(LOG_HANDLER_NAME)
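
With DEFAULT_FORMAT above, a call such as logger.info("Moving Average Component Started") from the MovingAverageComponent handler renders roughly as (timestamp illustrative):

2022-01-20 10:15:30,123  INFO MovingAverageComponent Moving Average Component Started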
from scripts.utils.logsetup import logger


def get_kairos_json(metric, forecast_data, tags_json):
    # Build one KairosDB datapoints entry for the given metric, values and tags.
    temp_json = {
        "name": metric,
        "datapoints": forecast_data,
        "tags": tags_json
    }
    return temp_json


def tag_na_value_check(tag_value, tag_id):
    # getattr guards against the logger adapter not exposing restrict_na_value_push.
    if getattr(logger, 'restrict_na_value_push', False) and tag_value in [None, 'NA', 'nan', 'Nan', 'NAN']:
        logger.info(f"Invalid tag value Tag:{tag_id} Value: {tag_value}")
        return True
    return False


def tag_none_value_check(tag_value, tag_id):
    if tag_value is None:
        logger.error(f"Invalid tag value found for the tag ID: {tag_id} and value: {tag_value}")
        return True
    return False


def get_tags_json(tag_id):
    # c3 carries the full tag, c1 the top hierarchy level, c5 the leaf tag name.
    return {
        "c3": tag_id,
        "c1": tag_id.split("$")[0],
        "c5": tag_id.split("$")[-1]
    }
\ No newline at end of file
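
For reference, given the full output tag built in orchestrator, these helpers produce the following (a worked example using values from the .env above):

from write_kairos_functions import get_kairos_json, get_tags_json

tags = get_tags_json("site_100$dept_100$line_100$equipment_106$tag_108")
# -> {"c3": "site_100$dept_100$line_100$equipment_106$tag_108",
#     "c1": "site_100", "c5": "tag_108"}
# write_to_kairos additionally adds l1..l4 keys for the hierarchy levels
# (site_100, dept_100, line_100, equipment_106) before posting.

body = get_kairos_json("project_132__ilens.live_data.raw", [[1642636800000, 3.5]], tags)
# -> {"name": "project_132__ilens.live_data.raw",
#     "datapoints": [[1642636800000, 3.5]], "tags": tags}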