Commit 1b54f368 authored by aakash.bedi

neshap api

parent 3702f467
JOB_ID=1011
HIRARCHY_TAGS=site_100$dept_100$line_100$equipment_106
INPUT_TAGS=tag_107,tag_107,tag_109,tag_109,tag_120,tag_120
OUTPUT_TAGS=tag_107,tag_108,tag_109,tag_110,tag_120,tag_121
START_TIMESTAMP=1642636800
NUMBER_FORCASTS=30
FREQUENCY=days
FILTER_TAG=site_100$dept_100$line_100$equipment_106$tag_106
FILTER_VALUE=0
FILTER_CONDITION=lte
KAIROS_API_URL=http://ilens:iLens$456@qa.ilens.io/kairos/
KAIROS_METRC_NAME=project_132__ilens.live_data.raw
\ No newline at end of file
FROM python:3.7.12-slim
ADD . /code
WORKDIR /code
RUN mkdir -p /code/logs
RUN pip install -r requirements.txt
CMD ["python","main.py"]
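# Illustrative usage (the image tag "moving-average-component" is an assumption, not defined in this repo):
#   docker build -t moving-average-component .
#   docker run --env-file .env moving-average-component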
\ No newline at end of file
from datetime import datetime
from dateutil.relativedelta import relativedelta
import requests
import json
import itertools
from typing import List
import logging as logger
# from scripts.utils.logsetup import logger
# from write_kairos_functions import get_kairos_json, get_tags_json
from scripts.utils.config import *
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get('/read_kairos')
def read_kairos(tag: str, metric_name: str, frequency: str, number_of_datapoints: int, end_datapoint: int, filter: bool) -> List[List]:
"""
This function reads data from kairos for the given tag and specified configuration
and returns the datapoints.
param: tag: kairos tag name for which the data is to be quiried
param: metric_name: Name of the kairos metric from which the data is to be fetched
param: frequency: frequency of the datapoint in kairos
avalable values: [seconds, minutes, hours, days, weeks, months, years]
param: number_of_datapoints: number of relative datapoints to look back to
param: end_datapoint: number of the last datapoint to be considered
param: filter: flag to specify if filter is to be applied
return: List[List]: historical data for the requested tag which is in the below format
example: [[timestamp, value]]
"""
logger.info(f"Fetching data from kairos for {tag} tag for last {number_of_datapoints} datapoints")
query_template = {
"metrics": [
{
"tags": {KAIROS_TAG_NAME:[tag]},
"name": metric_name,
"aggregators": []
}
],
"start_relative": {
"value": f"{number_of_datapoints}",
"unit": frequency
},
"end_relative": {
"value": f"{end_datapoint}",
"unit": frequency
}
}
if filter:
query_template['metrics'][0]['aggregators'].append({
"name": "filter",
"filter_op": FILTER_CONDITION,
"threshold": FILTER_VALUE
})
    response = requests.post(KAIROS_API_URL.rstrip('/') + "/api/v1/datapoints/query", data=json.dumps(query_template))
    if response.status_code != 200:
        logger.error(f"Unable to fetch data for {tag}")  # logger.error: logger.exception is only meaningful inside an except block
        raise Exception(f"Unable to fetch data for {tag}")
    else:
        logger.info(f"Fetched data from kairos successfully for {tag} tag for last {number_of_datapoints} datapoints")
return response.json()['queries'][0]['results'][0]['values']
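# Illustrative example (tag/metric values from the sample .env; KAIROS_TAG_NAME defaults to 'c3'):
# read_kairos("site_100$dept_100$line_100$equipment_106$tag_107", KAIROS_METRC_NAME, "days", 30, 1, False)
# posts roughly this body to KairosDB:
#   {"metrics": [{"tags": {"c3": ["site_100$dept_100$line_100$equipment_106$tag_107"]},
#                 "name": "project_132__ilens.live_data.raw", "aggregators": []}],
#    "start_relative": {"value": "30", "unit": "days"},
#    "end_relative": {"value": "1", "unit": "days"}}
# The endpoint itself can be exercised the same way once uvicorn is running, e.g.:
#   requests.get("http://127.0.0.1:8000/read_kairos", params={"tag": "...", "metric_name": "...",
#                "frequency": "days", "number_of_datapoints": 30, "end_datapoint": 1, "filter": False})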
@app.get('/get_historical_data_forecast')
def get_historical_data_forecast(tag: str) -> List[List]:
"""
This function is applying the filtering condition if given.
param: tag: kairos tag name for which the data is to be filter
return: List[List] : # return exact no of datapoints we required from filtered datapoints
example: [[timestamp, value]]
"""
if FILTER_TAG: # full tag name with hirarchy on which datapoinst are to be filtered
limiter = 10 # use to avoid running the while loop infinete time
start_number = int(NUMBER_FORCASTS*1.5) # Intially we the taking x 1.5 times datapoints to maintain min no of datapoint in case of filtering
end_number = RELATIVE_END
filtered_data = []
        while limiter > 0:
            kairos_data = read_kairos(FILTER_TAG, KAIROS_METRC_NAME, FREQUENCY, start_number, end_number, True)
            logger.info(f"fetched {len(kairos_data)} datapoints successfully for {tag}")
            filtered_data[0:0] = kairos_data  # prepend the older datapoints
            if len(filtered_data) >= NUMBER_FORCASTS:
                # after filtering we have at least the minimum number of datapoints required, so leave the loop
                break
            # not enough datapoints survived the filter: widen the look-back window by 1.5x and try again
            end_number = start_number
            start_number = int(start_number*1.5)
            limiter -= 1  # limiter starts at 10; each retry widens the window until enough datapoints
                          # are collected or the retries are exhausted
        else:
            logger.error(f"Requested number of valid datapoints {NUMBER_FORCASTS} unavailable; looked back {start_number} datapoints")
            raise Exception(f"Requested number of valid datapoints {NUMBER_FORCASTS} unavailable; looked back {start_number} datapoints")
        filtered_timestamps = list(zip(*filtered_data))[0][-NUMBER_FORCASTS:]  # timestamps of exactly the required number of filtered datapoints
        data = read_kairos(tag, KAIROS_METRC_NAME, FREQUENCY, start_number, RELATIVE_END, False)
        return list(filter(lambda x: x[0] in filtered_timestamps, data))  # keep only the datapoints whose timestamps passed the filter
    else:
        # no filtering condition is configured, so return the raw kairos data
        return read_kairos(tag, KAIROS_METRC_NAME, FREQUENCY, NUMBER_FORCASTS, RELATIVE_END, False)
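# Worked example of the widening look-back (with NUMBER_FORCASTS=30 from the sample .env):
# iteration 1 queries the window [45, 1] datapoints back; if fewer than 30 survive the filter,
# iteration 2 queries [67, 45], iteration 3 queries [100, 67], and so on, each retry widening
# the window by 1.5x, until 30 filtered datapoints are collected or the 10 retries are exhausted.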
# def write_to_kairos(forecast_data: List[List], tag: str, metric: str) -> None:
# """
# This function writes data to kairos for the given tag, forecast_data and metric name
# param: forecast_data: data forecasted by the SMA function
# param: tag: kairos tag name for which the data is to be written to kairos
# param: metric: name of the metric
# """
# try:
# logger.info(f"pushing forecasted data of {len(forecast_data)} datapoints for {tag}")
# data_arr = []
# try:
# tags_json = get_tags_json(tag)
# tags_split_list = tag.split("$")[0:-1]
# for i in range(1, len(tags_split_list) + 1):
# tags_json[f'l{str(i)}'] = tags_split_list[i - 1]
# data_arr.append(get_kairos_json(metric, forecast_data, tags_json))
# except Exception as e:
# logger.exception(f" Failed to iterate tags {e}")
# response = requests.post(KAIROS_API_URL + "/api/v1/datapoints",
# data=json.dumps(data_arr))
# logger.info(f"Forecasted data of {len(forecast_data)} datapoints for {tag} successfully pushed to Kairos: {response.status_code}")
# if response.status_code != 204:
# logger.exception(f"Kairos insertion failed: {response.text}")
# raise Exception(f"Kairos Insertion Failed")
# except Exception as e:
# logger.exception(f"Exception while writing data to KairosDB : {e}")
@app.get('/get_time_delta')
def get_time_delta(freq: str, period: int) -> relativedelta:
    """
    Builds the time delta used to step the forecast time index, based on frequency and period.
    param: freq: frequency of the datapoint in kairos
           Available Values: [seconds, minutes, hours, days, weeks, months, years]
           --> relativedelta holds relative information (the arguments are plural and may be negative);
           adding or subtracting it performs the corresponding arithmetic on the original datetime value
    param: period: number of frequency units to step by at each forecast step
    return: the relativedelta corresponding to the given freq and period
"""
time_delta = {
'years':relativedelta(years=+period),
'months':relativedelta(months=+period),
'weeks':relativedelta(weeks=+period),
'days':relativedelta(days=+period),
'hours':relativedelta(hours=+period),
'minutes':relativedelta(minutes=+period),
'seconds':relativedelta(seconds=+period),
}
return time_delta[freq]
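# Illustrative example: get_time_delta('days', 3) returns relativedelta(days=+3), so
# datetime(2022, 1, 20) + get_time_delta('days', 3) == datetime(2022, 1, 23, 0, 0)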
@app.post('/predict')
def calculate_SMA(historical_data: List[List], kairos_input_tag: str) -> JSONResponse:
    """
    Function to calculate a simple moving average forecast.
    param: historical_data: list of [time index, data value] pairs
    param: kairos_input_tag: kairos tag name
    return: JSONResponse wrapping a list of [timestamp, value] pairs
            example: [[timestamp, value]]
    """
    try:
        if 'today' in START_TIMESTAMP.lower():  # START_TIMESTAMP is relative, e.g. "today+1" or "today-1"
            if '+' in START_TIMESTAMP:  # the user supplied a positive day offset
                offset = int("".join(START_TIMESTAMP.split()).lower().split("today+")[1])
            else:  # the user supplied a negative day offset
                offset = -int("".join(START_TIMESTAMP.split()).lower().split("today-")[1])
            # date objects have no .timestamp(), so build a datetime at midnight of the offset day
            start_timestamp = datetime.combine(datetime.now().date() + relativedelta(days=offset), datetime.min.time())
        else:
            start_timestamp = datetime.fromtimestamp(int(START_TIMESTAMP))  # START_TIMESTAMP is an absolute epoch timestamp in seconds
        # build the forecast time index: datetimes stepped by FREQUENCY, converted to epoch milliseconds
        time_index = [int((start_timestamp + get_time_delta(FREQUENCY, x)).timestamp())*1000 for x in range(NUMBER_FORCASTS)]
        end_timestamp = datetime.fromtimestamp(int(time_index[-1]/1000))
    except Exception as e:
        logger.exception(f"Exception while creating time index : {e}")
        raise  # re-raise: the names below would be undefined if time index creation failed
    logger.info(f"calculating simple moving average for {kairos_input_tag} tag from {start_timestamp} to {end_timestamp}")
    data_value = list(zip(*historical_data))[1]  # extract only the data values from the [timestamp, value] pairs
    preds = [round(sum(data_value[-n:])/n, 2) for n in range(1, NUMBER_FORCASTS+1)]  # average of the last n datapoints for each forecast step
    preds_df = list(map(list, zip(time_index, preds)))  # list of [timestamp, prediction] pairs
    return JSONResponse(content={'Prediction': preds_df})  # pass a dict: JSONResponse serializes it; json.dumps here would double-encode
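# Worked example of the expanding-window average: with data_value = (10, 20, 30) and
# NUMBER_FORCASTS = 3, preds = [30.0, 25.0, 20.0] -- the n-th forecast step averages the
# last n historical values (n=1: 30/1, n=2: (20+30)/2, n=3: (10+20+30)/3).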
def orchestrator():
logger.info("Moving Average Component Started")
for i in list(itertools.product(HIRARCHY_TAGS, list(zip(INPUT_TAGS, OUTPUT_TAGS)))):
logger.info(f"Moving Average Component calculating for {i[0]} hirarchy {i[1][0]} parameter")
kairos_input_tag = f'{i[0].strip()}${i[1][0].strip()}'
kairos_out_tag = f'{i[0].strip()}${i[1][1].strip()}'
data = get_historical_data_forecast(kairos_input_tag)
preds = calculate_SMA(data, kairos_input_tag)
# write_to_kairos(preds, kairos_out_tag, KAIROS_METRC_NAME)
logger.info("Moving Average Component Completed")
# if __name__ == "__main__":
# orchestrator()
if __name__ == '__main__':
orchestrator()
uvicorn.run("main:app", host='127.0.0.1', port=8000)
\ No newline at end of file
from datetime import datetime
from dateutil.relativedelta import relativedelta
import requests
import json
import itertools
from typing import List
import logging as logger
# from scripts.utils.logsetup import logger
# from write_kairos_functions import get_kairos_json, get_tags_json
from scripts.utils.config import *
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get('/read_kairos')
def read_kairos(tag: str, metric_name: str, frequency: str, number_of_datapoints: int, end_datapoint: int, filter: bool) -> List[List]:
"""
This function reads data from kairos for the given tag and specified configuration
and returns the datapoints.
param: tag: kairos tag name for which the data is to be quiried
param: metric_name: Name of the kairos metric from which the data is to be fetched
param: frequency: frequency of the datapoint in kairos
avalable values: [seconds, minutes, hours, days, weeks, months, years]
param: number_of_datapoints: number of relative datapoints to look back to
param: end_datapoint: number of the last datapoint to be considered
param: filter: flag to specify if filter is to be applied
return: List[List]: historical data for the requested tag which is in the below format
example: [[timestamp, value]]
"""
logger.info(f"Fetching data from kairos for {tag} tag for last {number_of_datapoints} datapoints")
query_template = {
"metrics": [
{
"tags": {KAIROS_TAG_NAME:[tag]},
"name": metric_name,
"aggregators": []
}
],
"start_relative": {
"value": f"{number_of_datapoints}",
"unit": frequency
},
"end_relative": {
"value": f"{end_datapoint}",
"unit": frequency
}
}
if filter:
query_template['metrics'][0]['aggregators'].append({
"name": "filter",
"filter_op": FILTER_CONDITION,
"threshold": FILTER_VALUE
})
    response = requests.post(KAIROS_API_URL.rstrip('/') + "/api/v1/datapoints/query", data=json.dumps(query_template))
    if response.status_code != 200:
        logger.error(f"Unable to fetch data for {tag}")  # logger.error: logger.exception is only meaningful inside an except block
        raise Exception(f"Unable to fetch data for {tag}")
    else:
        logger.info(f"Fetched data from kairos successfully for {tag} tag for last {number_of_datapoints} datapoints")
return response.json()['queries'][0]['results'][0]['values']
@app.get('/get_historical_data_forecast')
def get_historical_data_forecast(tag: str) -> List[List]:
"""
This function is applying the filtering condition if given.
param: tag: kairos tag name for which the data is to be filter
return: List[List] : # return exact no of datapoints we required from filtered datapoints
example: [[timestamp, value]]
"""
if FILTER_TAG: # full tag name with hirarchy on which datapoinst are to be filtered
limiter = 10 # use to avoid running the while loop infinete time
start_number = int(NUMBER_FORCASTS*1.5) # Intially we the taking x 1.5 times datapoints to maintain min no of datapoint in case of filtering
end_number = RELATIVE_END
filtered_data = []
        while limiter > 0:
            kairos_data = read_kairos(FILTER_TAG, KAIROS_METRC_NAME, FREQUENCY, start_number, end_number, True)
            logger.info(f"fetched {len(kairos_data)} datapoints successfully for {tag}")
            filtered_data[0:0] = kairos_data  # prepend the older datapoints
            if len(filtered_data) >= NUMBER_FORCASTS:
                # after filtering we have at least the minimum number of datapoints required, so leave the loop
                break
            # not enough datapoints survived the filter: widen the look-back window by 1.5x and try again
            end_number = start_number
            start_number = int(start_number*1.5)
            limiter -= 1  # limiter starts at 10; each retry widens the window until enough datapoints
                          # are collected or the retries are exhausted
        else:
            logger.error(f"Requested number of valid datapoints {NUMBER_FORCASTS} unavailable; looked back {start_number} datapoints")
            raise Exception(f"Requested number of valid datapoints {NUMBER_FORCASTS} unavailable; looked back {start_number} datapoints")
        filtered_timestamps = list(zip(*filtered_data))[0][-NUMBER_FORCASTS:]  # timestamps of exactly the required number of filtered datapoints
        data = read_kairos(tag, KAIROS_METRC_NAME, FREQUENCY, start_number, RELATIVE_END, False)
        return list(filter(lambda x: x[0] in filtered_timestamps, data))  # keep only the datapoints whose timestamps passed the filter
    else:
        # no filtering condition configured: return the raw kairos data
        # (note: this branch wraps the data in a JSONResponse while the filtered branch returns a plain
        # list, so orchestrator(), which feeds the result into calculate_SMA, only works with FILTER_TAG set)
        return JSONResponse(content={'Get historical data forecast': read_kairos(tag, KAIROS_METRC_NAME, FREQUENCY, NUMBER_FORCASTS, RELATIVE_END, False)})  # dict, not json.dumps: JSONResponse serializes it
# def write_to_kairos(forecast_data: List[List], tag: str, metric: str) -> None:
# """
# This function writes data to kairos for the given tag, forecast_data and metric name
# param: forecast_data: data forecasted by the SMA function
# param: tag: kairos tag name for which the data is to be written to kairos
# param: metric: name of the metric
# """
# try:
# logger.info(f"pushing forecasted data of {len(forecast_data)} datapoints for {tag}")
# data_arr = []
# try:
# tags_json = get_tags_json(tag)
# tags_split_list = tag.split("$")[0:-1]
# for i in range(1, len(tags_split_list) + 1):
# tags_json[f'l{str(i)}'] = tags_split_list[i - 1]
# data_arr.append(get_kairos_json(metric, forecast_data, tags_json))
# except Exception as e:
# logger.exception(f" Failed to iterate tags {e}")
# response = requests.post(KAIROS_API_URL + "/api/v1/datapoints",
# data=json.dumps(data_arr))
# logger.info(f"Forecasted data of {len(forecast_data)} datapoints for {tag} successfully pushed to Kairos: {response.status_code}")
# if response.status_code != 204:
# logger.exception(f"Kairos insertion failed: {response.text}")
# raise Exception(f"Kairos Insertion Failed")
# except Exception as e:
# logger.exception(f"Exception while writing data to KairosDB : {e}")
@app.get('/get_time_delta')
def get_time_delta(freq: str, period: int) -> relativedelta:
    """
    Builds the time delta used to step the forecast time index, based on frequency and period.
    param: freq: frequency of the datapoint in kairos
           Available Values: [seconds, minutes, hours, days, weeks, months, years]
           --> relativedelta holds relative information (the arguments are plural and may be negative);
           adding or subtracting it performs the corresponding arithmetic on the original datetime value
    param: period: number of frequency units to step by at each forecast step
    return: the relativedelta corresponding to the given freq and period
"""
time_delta = {
'years':relativedelta(years=+period),
'months':relativedelta(months=+period),
'weeks':relativedelta(weeks=+period),
'days':relativedelta(days=+period),
'hours':relativedelta(hours=+period),
'minutes':relativedelta(minutes=+period),
'seconds':relativedelta(seconds=+period),
}
return time_delta[freq]
@app.post('/predict')
def calculate_SMA(historical_data: List[List], kairos_input_tag: str) -> JSONResponse:
    """
    Function to calculate a simple moving average forecast.
    param: historical_data: list of [time index, data value] pairs
    param: kairos_input_tag: kairos tag name
    return: JSONResponse wrapping a list of [timestamp, value] pairs
            example: [[timestamp, value]]
    """
    try:
        if 'today' in START_TIMESTAMP.lower():  # START_TIMESTAMP is relative, e.g. "today+1" or "today-1"
            if '+' in START_TIMESTAMP:  # the user supplied a positive day offset
                offset = int("".join(START_TIMESTAMP.split()).lower().split("today+")[1])
            else:  # the user supplied a negative day offset
                offset = -int("".join(START_TIMESTAMP.split()).lower().split("today-")[1])
            # date objects have no .timestamp(), so build a datetime at midnight of the offset day
            start_timestamp = datetime.combine(datetime.now().date() + relativedelta(days=offset), datetime.min.time())
        else:
            start_timestamp = datetime.fromtimestamp(int(START_TIMESTAMP))  # START_TIMESTAMP is an absolute epoch timestamp in seconds
        # build the forecast time index: datetimes stepped by FREQUENCY, converted to epoch milliseconds
        time_index = [int((start_timestamp + get_time_delta(FREQUENCY, x)).timestamp())*1000 for x in range(NUMBER_FORCASTS)]
        end_timestamp = datetime.fromtimestamp(int(time_index[-1]/1000))
    except Exception as e:
        logger.exception(f"Exception while creating time index : {e}")
        raise  # re-raise: the names below would be undefined if time index creation failed
    logger.info(f"calculating simple moving average for {kairos_input_tag} tag from {start_timestamp} to {end_timestamp}")
    data_value = list(zip(*historical_data))[1]  # extract only the data values from the [timestamp, value] pairs
    preds = [round(sum(data_value[-n:])/n, 2) for n in range(1, NUMBER_FORCASTS+1)]  # average of the last n datapoints for each forecast step
    preds_df = list(map(list, zip(time_index, preds)))  # list of [timestamp, prediction] pairs
    return JSONResponse(content={'Prediction': preds_df})  # pass a dict: JSONResponse serializes it; json.dumps here would double-encode
def orchestrator():
logger.info("Moving Average Component Started")
for i in list(itertools.product(HIRARCHY_TAGS, list(zip(INPUT_TAGS, OUTPUT_TAGS)))):
logger.info(f"Moving Average Component calculating for {i[0]} hirarchy {i[1][0]} parameter")
kairos_input_tag = f'{i[0].strip()}${i[1][0].strip()}'
kairos_out_tag = f'{i[0].strip()}${i[1][1].strip()}'
data = get_historical_data_forecast(kairos_input_tag)
        preds = calculate_SMA(historical_data=data, kairos_input_tag=kairos_input_tag)
# write_to_kairos(preds, kairos_out_tag, KAIROS_METRC_NAME)
logger.info("Moving Average Component Completed")
# if __name__ == "__main__":
# orchestrator()
if __name__ == '__main__':
orchestrator()
uvicorn.run("main:app", host='127.0.0.1', port=8000)
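# Illustrative call against the running service (host/port from uvicorn.run above); the body is the
# historical data list and kairos_input_tag goes in the query string:
#   requests.post("http://127.0.0.1:8000/predict",
#                 params={"kairos_input_tag": "site_100$dept_100$line_100$equipment_106$tag_107"},
#                 json=[[1642636800000, 10.0], [1642723200000, 20.0]])
# Note: inside the Docker image above, host='127.0.0.1' makes the API unreachable from outside the
# container; '0.0.0.0' is the usual choice for containerized services.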
\ No newline at end of file
import os
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
JOB_ID = os.environ.get('JOB_ID')
LOG_LEVEL = os.environ.get("LOG_LEVEL", "DEBUG").upper()
LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", None)
LOGSTASH_PORT = os.environ.get("LOGSTASH_PORT", None)
LOG_HANDLER_NAME = os.environ.get("LOG_HANDLER_NAME", "MovingAverageComponent")
BASE_LOG_PATH = os.path.join(os.getcwd(), "logs")
KAIROS_API_URL = os.environ.get("KAIROS_API_URL").strip()
KAIROS_METRC_NAME = os.environ.get("KAIROS_METRC_NAME").strip()
KAIROS_TAG_NAME = os.environ.get('KAIROS_TAG_NAME', 'c3').strip()
HIRARCHY_TAGS = os.environ.get('HIRARCHY_TAGS').strip().split(',')  # comma separated ilens hierarchy tags
INPUT_TAGS = os.environ.get('INPUT_TAGS').strip().split(',')  # comma separated data tags for forecasting
OUTPUT_TAGS = os.environ.get('OUTPUT_TAGS').strip().split(',')  # comma separated data tags to push forecasted data to
START_TIMESTAMP = os.environ.get('START_TIMESTAMP').strip()  # relative timestamp with respect to today, or an exact UTC timestamp in seconds, e.g. today+1 or 1642703400
NUMBER_FORCASTS = int(os.environ.get('NUMBER_FORCASTS').strip())  # number of future datapoints to forecast
FREQUENCY = os.environ.get('FREQUENCY').strip()  # frequency of the datapoint in kairos : Available Values [seconds, minutes, hours, days, weeks, months, years]
FILTER_TAG = os.environ.get('FILTER_TAG', '').strip()  # full tag name with hierarchy on which datapoints are to be filtered
FILTER_VALUE = float(os.environ.get('FILTER_VALUE', "0").strip())  # value that needs to be filtered out
FILTER_CONDITION = os.environ.get('FILTER_CONDITION', "ne").strip()  # condition of the filter to be applied : Available Values [equal, ne, lt, lte, gt, gte]
RELATIVE_END = int(os.environ.get('RELATIVE_END', '1').strip())  # relative end datapoint number to consider for historical data; default is 1
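# Worked example with the sample .env above: HIRARCHY_TAGS has one element and INPUT_TAGS/OUTPUT_TAGS
# have six each, so itertools.product(HIRARCHY_TAGS, zip(INPUT_TAGS, OUTPUT_TAGS)) in orchestrator()
# yields 6 (hierarchy, (input, output)) combinations, e.g.
# ('site_100$dept_100$line_100$equipment_106', ('tag_107', 'tag_107')) for the first pair.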
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from scripts.utils.config import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH
import logging
from logging.handlers import RotatingFileHandler
import os
DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s] %(message)s'
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
FORMATTER = DEBUG_FORMAT
def get_logger(log_handler_name, extra=EXTRA):
"""
Purpose : To create logger .
:param log_handler_name: Name of the log handler.
:param extra: extra args for the logger
:return: logger object.
"""
log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
logstash_temp = os.path.join(BASE_LOG_PATH, log_handler_name + ".db")
logger = logging.getLogger(log_handler_name)
logger.setLevel(LOG_LEVEL.strip().upper())
log_handler = logging.StreamHandler()
log_handler.setLevel(LOG_LEVEL)
formatter = logging.Formatter(FORMATTER)
log_handler.setFormatter(formatter)
handler = RotatingFileHandler(log_path, maxBytes=10485760,
backupCount=5)
handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.addHandler(handler)
logger = logging.LoggerAdapter(logger, extra)
return logger
logger = get_logger(LOG_HANDLER_NAME)
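# Usage note: importing this module yields a ready LoggerAdapter that logs to stderr and to
# logs/<LOG_HANDLER_NAME>.log (rotated at 10 MB, 5 backups). Calling get_logger() again with the
# same name would attach duplicate handlers, so the module-level `logger` above should be reused.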
from scripts.utils.logsetup import logger
def get_kairos_json(metric, forecast_data, tags_json):
temp_json = {
"name": metric,
"datapoints": forecast_data,
"tags": tags_json
}
return temp_json
def tag_na_value_check(tag_value, tag_id):
    # `restrict_na_value_push` is assumed to be set on the logger elsewhere; default to False if absent
    if getattr(logger, 'restrict_na_value_push', False) and tag_value in [None, 'NA', 'nan', 'Nan', 'NAN']:
        logger.info(f"Invalid tag value Tag:{tag_id} Value: {tag_value}")
        return True
    return False
def tag_none_value_check(tag_value, tag_id):
if tag_value is None:
logger.error(f" Invalid tag value found for the tag ID: {tag_id} and value: {tag_value}")
return True
return False
def get_tags_json(tag_id):
return {
"c3": tag_id,
"c1": tag_id.split("$")[0],
"c5": tag_id.split("$")[-1]
}
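# Worked example: get_tags_json("site_100$dept_100$line_100$equipment_106$tag_107") returns
# {"c3": "site_100$dept_100$line_100$equipment_106$tag_107", "c1": "site_100", "c5": "tag_107"}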
\ No newline at end of file