Commit 84e3ccd7 authored by Akshay G's avatar Akshay G

Added Kairos Aggregation functionality

parent f0fd658e
FROM python:3.7-slim
FROM azacrknowledgelens.azurecr.io/ai-forecasting/batch-multi-karios-read-component:v0.6
ADD . /opt
WORKDIR /opt
RUN pip install -r requirements.txt
CMD python main.py
\ No newline at end of file
CMD python main.py
FROM python:3.7-slim
ADD . /opt
WORKDIR /opt
RUN pip install -r requirements.txt
CMD python main.py
\ No newline at end of file
import requests
from pandas import DataFrame
from datetime import datetime
from scripts.common.config_parser import *
from scripts.common.logsetup import logger
from scripts.utils import get_aggregation_query
from scripts.common.constants import KariosConstants, ComponentExceptions
......@@ -66,7 +66,9 @@ def get_data(query):
query.pop(KariosConstants.END_ABSOLUTE_KEY)
query.pop(KariosConstants.END_RELATIVE_KEY)
if KariosConstants.AGGREGATORS_KEY not in config.keys():
if config.get(KariosConstants.AGGREGATION_OPS) != "None" or config.get(KariosConstants.AGGREGATION_OPS) is not None:
query[KariosConstants.METRICS_KEY][0][KariosConstants.AGGREGATORS_KEY].append(get_aggregation_query(config))
else:
query[KariosConstants.METRICS_KEY][0].pop(KariosConstants.AGGREGATORS_KEY)
if KariosConstants.GROUPBY_KEY not in config.keys():
query[KariosConstants.METRICS_KEY][0].pop(KariosConstants.GROUPBY_KEY)
......@@ -77,14 +79,13 @@ def get_data(query):
raise Exception(ComponentExceptions.MISSING_TAG_HIERARCHY_EXCEPTION)
logger.info("Querying Karios DB...")
logger.debug("Query --> {}".format(query))
response = requests.post(kairosdb_server + KariosConstants.KARIOS_API, data=json.dumps(query))
if response.status_code == KariosConstants.REQUEST_SUCCESS_CODE:
logger.info("Receiving data...")
data = response.json()[KariosConstants.QUERIES_KEY][0][KariosConstants.RESULTS_KEY][0] \
[KariosConstants.VALUES_KEY]
df = DataFrame.from_dict(data)
logger.info("Dataframe --> {}".format(df.head()))
if len(df) > 0:
df.columns = [KariosConstants.TIMESTAMP_COLUMN_NAME, list(config[KariosConstants.TAG_HIERARCHY_KEY].keys())
[0]]
......@@ -92,13 +93,15 @@ def get_data(query):
logger.warn("The dataframe is empty!")
df = DataFrame(columns=[KariosConstants.TIMESTAMP_COLUMN_NAME, list(config[KariosConstants.
TAG_HIERARCHY_KEY].keys())[0]])
logger.debug("Dataframe --> \n{}".format(df.head()))
return df
else:
raise Exception(response.json()[KariosConstants.ERRORS_KEY])
if __name__ == '__main__':
logger.debug("Config --> {}".format(config))
logger.debug(KariosConstants.LOG_VAR_MESSAGE.format("COMPONENT CONFIG", json.dumps(config, indent=1)))
data = get_data(KariosConstants.QUERY)
data.to_csv(os.path.join(config['shared_volume'], 'data.csv'), index=False)
logger.info("Data successfully written to --> {}".format(os.path.join(config['shared_volume'], 'data.csv')))
......@@ -4,6 +4,7 @@ import sys
import yaml
import json
from scripts.utils import str2bool
# os.environ['config'] = '{"kairosdb_url": "http://192.168.0.207:8080",' \
# '"metric_name": "ilens.live_data.raw",' \
# '"tag_hierarchy": "site_153$dept_1006$line_220$equipment_1258$tag_5790",' \
......@@ -37,9 +38,14 @@ config = {
"start_absolute": os.environ.get("start_absolute"),
"tag_hierarchy": json.loads(os.environ.get("tag_hierarchy")),
"metric_name": os.environ.get("metric_name"),
"kairosdb_url": os.environ.get("kairosdb_url")
"kairosdb_url": os.environ.get("kairosdb_url"),
"aggregation_ops": os.environ.get("aggregation_ops"),
"aggregation_sampling_value": os.environ.get("aggregation_sampling_value"),
"aggregation_sampling_unit": os.environ.get("aggregation_sampling_unit"),
"aggregation_alignment": os.environ.get("aggregation_alignment")
}
if not os.path.exists(config['shared_volume']):
sys.stderr.write("Shared path does not exist!")
sys.stderr.write("Creating path --> {}".format(config['shared_volume']))
os.makedirs(config['shared_volume'])
......@@ -9,6 +9,11 @@ class KariosConstants:
START_RELATIVE_KEY = "start_relative"
END_ABSOLUTE_KEY = "end_absolute"
END_RELATIVE_KEY = "end_relative"
APPLY_AGGREGATION_KEY = "apply_aggregation"
AGGREGATION_OPS = "aggregation_ops"
AGGREGATION_SAMPLING_VALUE = "aggregation_sampling_value"
AGGREGATION_SAMPLING_UNIT = "aggregation_sampling_unit"
AGGREGATION_ALIGNMENT = "aggregation_alignment"
VALUE_KEY = "value"
VALUES_KEY = "values"
METRICS_KEY = "metrics"
......@@ -27,6 +32,7 @@ class KariosConstants:
PLUGINS_KEY = "plugins"
CACHE_TIME_KEY = "cache_time"
TIMESTAMP_COLUMN_NAME = "timestamp"
SAMPLING_KEY = "sampling"
QUERY = {
METRICS_KEY: [
{
......@@ -34,7 +40,7 @@ class KariosConstants:
},
NAME_KEY: None,
AGGREGATORS_KEY: None,
AGGREGATORS_KEY: list(),
GROUPBY_KEY: None
}
......@@ -52,6 +58,13 @@ class KariosConstants:
UNIT_KEY: None
}
}
ALIGNMENT_MAPPING = {
"None": None,
"Sample": "align_sampling",
"Start Time": "align_start_time",
"End Time": "align_end_time"
}
LOG_VAR_MESSAGE = "\n"+"#"*25+"\n"+"{}"+"\n"+"#"*25+"\n"+"{}\n"
class ComponentExceptions:
......
from scripts.common.constants import KariosConstants
def str2bool(txt):
if str(txt).lower() in ['True', 'true', 1, '1', True, 'yes']:
return True
return False
def get_value_or_raise_exception(dictionary, key):
value = dictionary.get(key)
if value is None:
raise Exception("Invalid/Missing value for the key {} in {}".format(key, dictionary))
return value
def get_aggregation_query(config):
agg_dict = {
KariosConstants.NAME_KEY: get_value_or_raise_exception(config, KariosConstants.AGGREGATION_OPS).lower(),
KariosConstants.SAMPLING_KEY: {
KariosConstants.VALUE_KEY: get_value_or_raise_exception(config, KariosConstants.AGGREGATION_SAMPLING_VALUE),
KariosConstants.UNIT_KEY: get_value_or_raise_exception(config, KariosConstants.AGGREGATION_SAMPLING_UNIT).lower()
}
}
if KariosConstants.ALIGNMENT_MAPPING.get(KariosConstants.AGGREGATION_ALIGNMENT) is not None:
agg_dict[KariosConstants.ALIGNMENT_MAPPING.get(KariosConstants.AGGREGATION_ALIGNMENT)] = True
return agg_dict
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment