Commit 643d9f53 authored by dasharatha.vamshi's avatar dasharatha.vamshi

init

parent d36dd9fe
This diff is collapsed.
#!/usr/bin bash
pip install ruff black isort --upgrade
ruff scripts
black scripts --check
isort scripts --check-only
FROM python:3.8-buster
COPY . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD [ "python","app.py" ]
\ No newline at end of file
__version__ = "V0.0.1"
from loguru import logger
from scripts.utils.pipeline_util import Pipeline
class RawMaterialOptimization:
def __init__(self, model_id):
logger.info("Starting the Module")
self.pipeline_obj = Pipeline()
self.model_id = model_id
def build_model(self):
model_data = self.pipeline_obj.get_meta_data(self.model_id)
print(model_data)
return model_data
if __name__ == "__main__":
model_catalog_id = "01595870-f91a-4db0-abfa-28efe60b219c"
__rmo = RawMaterialOptimization(model_catalog_id)
__rmo.build_model()
[KAIROS_DB]
uri=$KAIROS_URI
[KAFKA]
kafka_host=$KAFKA_HOST
kafka_port=$KAFKA_PORT
kafka_topic=$KAFKA_TOPIC
[POSTGRES]
postgres_uri=$POSTGRES_URI
[MONGO]
mongo_uri = $MONGO_URI
[MLFLOW]
mlflow_tracking_uri = $MLFLOW_TRACKING_URI
[MQTT]
host=$MQTT_HOST
port=$MQTT_PORT
username=$MQTT_USERNAME
password=$MQTT_PASSWORD
ssl=$SSL
connection_type=$CONNECTION_TYPE
\ No newline at end of file
KAIROS_URI= https://ilens:iLens$456@qa.ilens.io/kairos
KAFKA_HOST=192.168.0.220
KAFKA_PORT=9092
KAFKA_TOPIC=ilens_dev
POSTGRES_URI = postgresql://iLens:iLensJUB$456@jubilant.ilens.io/kairos
MONGO_URI=mongodb://iLens:iLens#1234@192.168.0.217:30904/
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=azrmlilensqa006382180551;AccountKey=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==;EndpointSuffix=core.windows.net
AZURE_STORAGE_ACCESS_KEY=tDGOKfiZ2svfoMvVmS0Fbpf0FTHfTq4wKYuDX7cAxlhve/3991QuzdvJHm9vWc+lo6mtC+x9yPSghWNR4+gacg==
MLFLOW_TRACKING_URI=http://104.208.73.213/mlflow-v2/
MLFLOW_TRACKING_USERNAME=iLens
MLFLOW_TRACKING_PASSWORD=iLensMLFLow$12#
MQTT_HOST=192.168.0.207
MQTT_PORT=8082
SSL=false
CONNECTION_TYPE=tcp
\ No newline at end of file
pytz==2021.3
loguru==0.5.3
scipy==1.7.1
numpy==1.21.2
pandas==1.3.3
mlflow==1.20.2
sklearn
simplejson==3.17.5
requests==2.26.0
pydantic==1.8.2
python-dotenv==0.19.2
PyYAML==6.0
kafka-python==1.4.7
SQLAlchemy
sqlparse==0.4.2
psycopg2==2.9.1
azure-storage-blob==12.7.1
pymongo==3.12.0
\ No newline at end of file
import os
import sys
from configparser import BasicInterpolation, ConfigParser
from dotenv import load_dotenv
# Configuration File Constants
from pydantic import BaseModel
_application_conf = "./conf/application.conf"
_default_conf = "./config.env"
load_dotenv(dotenv_path=_default_conf)
class EnvInterpolation(BasicInterpolation):
"""
Interpolation which expands environment variables in values.
"""
def before_get(self, parser, section, option, value, defaults):
value = super().before_get(parser, section, option, value, defaults)
if not os.path.expandvars(value).startswith("$"):
return os.path.expandvars(value)
else:
return
try:
config = ConfigParser(interpolation=EnvInterpolation())
config.read(_application_conf)
except Exception as e:
print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!")
sys.exit()
class Logging:
level = config.get("LOGGING", "level", fallback="INFO")
level = level if level else "INFO"
tb_flag = config.getboolean("LOGGING", "traceback", fallback=True)
tb_flag = tb_flag if tb_flag is not None else True
# Configuration Variables
# Kairos Configuration Variables
KAIROS_DB_HOST = config["KAIROS_DB"]["uri"]
# Postgres Configuration Variables
POSTGRES_URI = config["POSTGRES"]["postgres_uri"]
# Kafka Configuration Variables
KAFKA_HOST = config["KAFKA"]["kafka_host"]
KAFKA_PORT = config["KAFKA"]["kafka_port"]
KAFKA_TOPIC = config["KAFKA"]["kafka_topic"]
MONGO_URI = config["MONGO"]["mongo_uri"]
MLFLOW_TRACKING_URI = config["MLFLOW"]["mlflow_tracking_uri"]
class MQTTConfig(BaseModel):
host: str = config.get("MQTT", "host")
port: int = config.getint("MQTT", "port")
username: str = config.get("MQTT", "username")
password: str = config.get("MQTT", "password")
ssl: bool = config.getboolean("MQTT", "ssl")
connection_type: str = config.get("MQTT", "connection_type")
class KairosConstants:
TRAINING_QUERY = {
"metrics": [],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": None,
"end_absolute": None,
}
LIVE_QUERY = {
"metrics": [],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_relative": {"value": "30", "unit": "days"},
}
TAGS = {"tags": {"c3": []}, "name": None}
GROUP_BY = {"group_by": [{"name": "tag", "tags": ["c3"]}]}
# aggregators supported avg,count,first,gaps,last,least_squares,max,min,sum. total 9
# aggregators not supported dev,diff,div,filter,percentile,rate,sampler,save_as
# ,scale,trim total 10
AGGREGATORS = {
"aggregators": [{"name": None, "sampling": {"value": None, "unit": None}}]
}
AGGREGATOR_KEYS_MAPPING = {
"AVG": "avg",
"COUNT": "count",
"FIRST": "first",
"GAPS": "gaps",
"LAST": "last",
"LEAST SQUARES": "least_squares",
"MAX": "max",
"MIN": "min",
"SUM": "sum",
}
ALIGNMENT_MAPPING = {
"None": None,
"Sample": "align_sampling",
"Start Time": "align_start_time",
"End Time": "align_end_time",
}
UNITS_MAPPING = {
"Years": "years",
"Months": "months",
"Weeks": "weeks",
"Days": "days",
"Hours": "hours",
"Minutes": "minutes",
"Seconds": "seconds",
"Milliseconds": "milliseconds",
}
TRAINING_INPUT_DICT = {
"metric_name": "ilens.live_data.raw",
"start_absolute": None,
"end_absolute": None,
"tag_details": [],
}
DEFAULT_TRAINING_INPUT_GROUP_BY = {
"tag": None,
"group_by": True,
"aggregators": {
"name": "AVG",
"sampling_value": "15",
"sampling_unit": "Minutes",
"align": "End Time",
},
}
class Constants:
FEATURE_IMPORTANCE_FILE_NAME = "feature_importance.csv"
PROFILE_FILE_NAME = "profile.csv"
TRAINING_FILE_NAME = "training.csv"
class DBConstants:
# Databases
db_ilens_yield_optimizer = "project_181__ilens_yield_optimizer"
# Collections
collection_yield_model_catalog = "yield_model_catalog"
collection_yield_profile_catalog = "yield_profile_catalog"
from kafka import KafkaProducer
from loguru import logger
from scripts.constants.app_configuration import KAFKA_HOST, KAFKA_PORT
class KafkaProducerUtil:
def __init__(self):
try:
self.host = KAFKA_HOST
self.port = KAFKA_PORT
logger.debug(f"Connecting to Kafka with details: {self.host}, {self.port}")
kafka_broker = [self.host + ":" + str(self.port)]
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: v.encode("utf-8"),
api_version=(0, 10, 1),
)
self.producer.flush()
except Exception as e:
logger.error(f"Kafka connection error: {e}")
def publish(self, topic, data):
try:
kafka_response = self.producer.send(topic, data)
self.producer.flush()
logger.debug(f" Message sent to kafka with response: {kafka_response}")
return True
except Exception as e:
logger.error(e)
return False
import copy
import json
import pandas as pd
import requests
from loguru import logger
from scripts.constants.app_constants import KairosConstants
class KairosQueryBuilder:
def __init__(self, inp_dict):
self.inp_dict = inp_dict
@staticmethod
def group_tags(tag_details):
logger.info("Grouping all the tags having same group_by and aggregators")
tags_dict = []
for i in tag_details:
if len(tag_details) == 0:
tags_dict.append(i)
else:
flag = False
for j in tags_dict:
if (
j["group_by"] == i["group_by"]
and j["aggregators"] == i["aggregators"]
):
j["tag"] = j["tag"] + i["tag"]
flag = True
if not flag:
tags_dict.append(i)
return tags_dict
@staticmethod
def check_group_by(temp_dict, k):
groupby_flag = True
if k["group_by"]:
temp_dict["group_by"] = copy.deepcopy(KairosConstants.GROUP_BY)["group_by"]
else:
groupby_flag = False
return temp_dict, groupby_flag
@staticmethod
def check_aggregation(temp_dict, k):
aggregation_flag = True
if len(k["aggregators"]) > 0:
temp_dict["aggregators"][0][
"name"
] = KairosConstants.AGGREGATOR_KEYS_MAPPING[k["aggregators"]["name"]]
temp_dict["aggregators"][0]["sampling"]["value"] = k["aggregators"][
"sampling_value"
]
temp_dict["aggregators"][0]["sampling"][
"unit"
] = KairosConstants.UNITS_MAPPING[k["aggregators"]["sampling_unit"]]
else:
aggregation_flag = False
return temp_dict, aggregation_flag
def get_query_type(self, train=True):
if train:
# getting the start and end absolute time
logger.info("Building the Training Query")
output_query = copy.deepcopy(KairosConstants.TRAINING_QUERY)
output_query["start_absolute"] = self.inp_dict["start_absolute"]
output_query["end_absolute"] = self.inp_dict["end_absolute"]
else:
logger.info("Building the Live Query")
output_query = copy.deepcopy(KairosConstants.LIVE_QUERY)
return output_query
def build_query(self, train=True):
output_query = self.get_query_type(train)
# getting the metric name
metric_name = self.inp_dict["metric_name"]
# getting all the tag details
tag_details = self.inp_dict["tag_details"]
if len(tag_details) > 0:
# grouping all the tags whose group_by and aggregators match
tags_dict = self.group_tags(tag_details)
for k in tags_dict:
tags_skeleton = copy.deepcopy(KairosConstants.TAGS)
tags_skeleton["name"] = metric_name
temp_dict = {
"c3": k["tag"],
"aggregators": copy.deepcopy(KairosConstants.AGGREGATORS)[
"aggregators"
],
}
# adding group by data
temp_dict, groupby_flag = self.check_group_by(temp_dict, k)
# adding aggregation data
temp_dict, aggregation_flag = self.check_aggregation(temp_dict, k)
if k["aggregators"]["align"] is None:
logger.info("No align needed")
elif k["aggregators"]["align"] in ["Sample", "Start Time", "End Time"]:
if k["aggregators"]["align"] in ["Start Time", "End Time"]:
temp_dict["aggregators"][0][
KairosConstants.ALIGNMENT_MAPPING["Sample"]
] = True
temp_dict["aggregators"][0][
KairosConstants.ALIGNMENT_MAPPING[k["aggregators"]["align"]]
] = True
tags_skeleton["tags"]["c3"] = temp_dict["c3"]
if groupby_flag:
tags_skeleton["group_by"] = temp_dict["group_by"]
if aggregation_flag:
tags_skeleton["aggregators"] = temp_dict["aggregators"]
output_query["metrics"].append(tags_skeleton)
else:
logger.info("tag details not found")
return output_query
class DataPuller(object):
def __init__(self, db_host, payload, absolute_time=None, optional_payload=None):
self.db_host_url = db_host
self.request_url = "{kairos_host}/api/v1/datapoints/query".format(
kairos_host=self.db_host_url
)
self.payload = payload
self.column_rename = {}
if absolute_time is not None:
if "start_relative" in self.payload:
del self.payload["start_relative"]
if "end_relative" in self.payload:
del self.payload["end_relative"]
self.payload["start_absolute"] = absolute_time["start_absolute"]
self.payload["end_absolute"] = absolute_time["end_absolute"]
self.payload = json.dumps(self.payload)
def get_data(self):
logger.info("Data for the parameters being pulled from Kairos Database")
response_data = requests.post(url=self.request_url, data=self.payload).json()
output_data = response_data["queries"]
logger.debug("Data pull complete")
df_final = pd.DataFrame()
for i in range(len(output_data)):
grouped_output_data = output_data[i]["results"]
for each_grouped_data in grouped_output_data:
value = each_grouped_data["values"]
tag_id = each_grouped_data["group_by"][0]["group"]["c3"]
try:
logger.debug(
"Renamed {} to {} in Data".format(
tag_id, self.column_rename[tag_id]
)
)
column_name = self.column_rename[tag_id]
except KeyError as ke:
logger.debug(f"Column Renaming Logic not found for {tag_id} - {ke}")
column_name = tag_id
df_column_data = pd.DataFrame(
data=value, columns=["timestamp", column_name]
)
if df_final.empty:
df_final = df_column_data
else:
df_final = df_final.merge(
df_column_data,
how="outer",
left_on="timestamp",
right_on="timestamp",
)
df_final["epochtime"] = df_final["timestamp"]
df_final["timestamp"] = (
pd.to_datetime(df_final["timestamp"], unit="ms")
.dt.tz_localize("UTC")
.dt.tz_convert("Asia/Kolkata")
)
df_final.to_csv("data-upload.csv", index=False)
df_final["shift"] = df_final.apply(self.shift_identifier, axis=1)
df_final["date"] = df_final.apply(self.shift_date_identifier, axis=1)
logger.debug(
"Final number of columns : {}".format(str(len(list(df_final.columns))))
)
df_final.to_csv("data-upload", index=False)
return df_final
@staticmethod
def shift_identifier(row):
# morning 6 am to afternoon 2 pm is shift A, afternoon 2 pm to evening
# 10 pm is shift B, evening 10 pm to night
# 6 am is shift C
if 6 <= row["timestamp"].hour < 14:
return "A"
elif 14 <= row["timestamp"].hour < 22:
return "B"
else:
return "C"
@staticmethod
def shift_date_identifier(row):
if 6 <= row["timestamp"].hour < 14:
return row["timestamp"].date()
elif 14 <= row["timestamp"].hour < 22:
return row["timestamp"].date()
elif 22 <= row["timestamp"].hour <= 23:
return row["timestamp"].date() + pd.Timedelta(days=0)
else:
return row["timestamp"].date() + pd.Timedelta(days=-1)
from uuid import uuid4
from scripts.constants.app_constants import DBConstants
from scripts.utils.mongo_util import MongoCollectionBaseClass
class ModelCatalogCollection(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(
mongo_client,
database=DBConstants.db_ilens_yield_optimizer,
collection=DBConstants.collection_yield_model_catalog,
)
self.project_id = project_id
def save_model(self, data: dict):
if data.get("id") is None:
data["id"] = str(uuid4())
self.update_one(query={"id": data.get("id")}, data=data, upsert=True)
return {"id": data.get("id")}
def partial_update_model(self, model_id, data: dict):
# todo update meta also.
self.update_one(query={"id": model_id}, data=data)
return id
def get_one_model_catalog(self, model_id):
return self.find_one(query={"id": model_id})
from uuid import uuid4
from scripts.constants.app_constants import DBConstants
from scripts.utils.mongo_util import MongoCollectionBaseClass
class ProfileCatalogCollection(MongoCollectionBaseClass):
def __init__(self, mongo_client, project_id=None):
super().__init__(
mongo_client,
database=DBConstants.db_ilens_yield_optimizer,
collection=DBConstants.collection_yield_profile_catalog,
)
self.project_id = project_id
def save_profile(self, data: dict):
if data.get("id") is None:
data["id"] = str(uuid4())
self.update_one(
query={"model_catalog_id": data.get("model_catalog_id")},
data=data,
upsert=True,
)
return {"id": data.get("id")}
def partial_update_profile(self, profile_id, data: dict):
# todo update meta also.
self.update_one(query={"id": profile_id}, data=data)
return id
def get_one_profile_catalog(self, model_catalog_id):
return self.find_one(query={"model_catalog_id": model_catalog_id})
from typing import Any, Dict
from pydantic import BaseModel
class KafkaDataModel(BaseModel):
data: Dict[str, Any]
site_id: str
gw_id: str = ""
pd_id: str = ""
timestamp: int
msg_id: int = 0
partition: str = ""
retain_flag: bool = False
from typing import Dict, List, Optional
from pymongo import MongoClient
from pymongo.cursor import Cursor
class MongoConnect:
def __init__(self, uri):
try:
self.uri = uri
self.client = MongoClient(self.uri, connect=False)
except Exception:
raise
def __call__(self, *args, **kwargs):
return self.client
def __repr__(self):
return f"Mongo Client(uri:{self.uri}, server_info={self.client.server_info()})"
class MongoCollectionBaseClass:
def __init__(self, mongo_client, database, collection):
self.client = mongo_client
self.database = database
self.collection = collection
# self.data_encryption = MongoDataEncryption()
# Variable to preserve initiated database
# (if database name changes during runtime)
self.__database = None
def __repr__(self):
return (
f"{self.__class__.__name__}"
f"(database={self.database}, collection={self.collection})"
)
def insert_one(self, data: Dict):
"""
The function is used to inserting a document to a collection in a Mongo DB.
:param: data: Data to be inserted
:return: Insert ID
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_one(data)
return response.inserted_id
except Exception:
raise
def insert_many(self, data: List):
"""
The function is used to inserting documents to a collection in a Mongo Database.
:param: data: List of Data to be inserted
:return: Insert IDs
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.insert_many(data)
return response.inserted_ids
except Exception:
raise
def find(
self,
query: Dict,
filter_dict: Optional[Dict] = None,
sort=None,
skip: Optional[int] = 0,
limit: Optional[int] = None,
) -> Cursor:
"""
The function is used to query documents from a given collection in a Mongo DB
:param query: Query Dictionary
:param filter_dict: Filter Dictionary
:param sort: List of tuple with key and direction. [(key, -1), ...]
:param skip: Skip Number
:param limit: Limit Number
:return: List of Documents
"""
if sort is None:
sort = list()
if filter_dict is None:
filter_dict = {"_id": 0}
database_name = self.database
collection_name = self.collection
try:
db = self.client[database_name]
collection = db[collection_name]
if len(sort) > 0:
cursor = (
collection.find(
query,
filter_dict,
)
.sort(sort)
.skip(skip)
)
else:
cursor = collection.find(
query,
filter_dict,
).skip(skip)
if limit:
cursor = cursor.limit(limit)
return cursor
except Exception:
raise
def find_one(self, query: Dict, filter_dict: Optional[Dict] = None):
try:
database_name = self.database
collection_name = self.collection
if filter_dict is None:
filter_dict = {"_id": 0}
db = self.client[database_name]
collection = db[collection_name]
response = collection.find_one(query, filter_dict)
return response
except Exception:
raise
def update_one(self, query: Dict, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception:
raise
def update_to_set(self, query: Dict, param: str, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param param:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_one(
query, {"$addToSet": {param: data}}, upsert=upsert
)
return response.modified_count
except Exception:
raise
def update_many(self, query: Dict, data: Dict, upsert: bool = False):
"""
:param upsert:
:param query:
:param data:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.update_many(query, {"$set": data}, upsert=upsert)
return response.modified_count
except Exception:
raise
def delete_many(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_many(query)
return response.deleted_count
except Exception:
raise
def delete_one(self, query: Dict):
"""
:param query:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.delete_one(query)
return response.deleted_count
except Exception:
raise
def distinct(self, query_key: str, filter_json: Optional[Dict] = None):
"""
:param query_key:
:param filter_json:
:return:
"""
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.distinct(query_key, filter_json)
return response
except Exception:
raise
def aggregate(
self,
pipelines: List,
):
try:
database_name = self.database
collection_name = self.collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception:
raise
class MongoAggregateBaseClass:
def __init__(
self,
mongo_client,
database,
):
self.client = mongo_client
self.database = database
def aggregate(
self,
collection,
pipelines: List,
):
try:
database_name = self.database
collection_name = collection
db = self.client[database_name]
collection = db[collection_name]
response = collection.aggregate(pipelines)
return response
except Exception:
raise
import json
from abc import abstractmethod
import paho.mqtt.client as mqtt
from loguru import logger
class BaseMQTTUtil(object):
def __init__(
self,
client_id="",
host="localhost",
port=1883,
username=None,
password=None,
ssl=False,
certificate_type=None,
connection_type="tcp",
) -> None:
super().__init__()
self.client_id = client_id
self.host = host
self.port = port
self.username = username
self.password = password
self.ssl = ssl
self.certificate_type = certificate_type
self.connection_type = connection_type
self.ca_cert = None
self.certfile = None
self.keyfile = None
self.connection_error_codes = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
self._client = self.create_client()
self.topiclist = []
self.func = None
self._client.connect(host=self.host, port=self.port, keepalive=60)
def create_client(self):
client = mqtt.Client(
client_id=self.client_id, clean_session=True, transport=self.connection_type
)
if self.ssl:
self.prepare_tls_set_args()
client.tls_set()
# raise NotImplementedError(
# "ssl based MQTT connection is not enabled")
if self.username is not None and self.password is not None:
logger.info("Configuring Credentials for MQTT")
client.username_pw_set(username=self.username, password=self.password)
return client
def executor_function(self, f):
"""
Function to be executed for the data after subscribing to topics
- This function should always be called before subscribing to a topic
"""
self.func = f
@property
def client(self):
return self._client
def disconnect(self):
self._client.disconnect()
def f(self):
...
def prepare_tls_set_args(self):
...
@abstractmethod
def on_message(self, client, userdata, msg):
...
@abstractmethod
def on_connect(self, client, userdata, flags, rc):
...
@abstractmethod
def on_disconnect(self, client, userdata, rc=0):
...
@abstractmethod
def publish(self, topic, payload=None, qos=0, retain=False):
...
@abstractmethod
def subscribe(self, topic, qos=0):
...
class MQTTUtil(BaseMQTTUtil):
"""
### Usage:
----------
#### Subscribing to a topic
>>> mqtt_obj = MQTTUtil(host='localhost', port=1883)
>>> mqtt_obj.executor_function(print)
>>> mqtt_obj.subscribe(topic="mqtt/topic", qos=0, return_type = "payload")
#### Publishing to a topic
>>> mqtt_obj = MQTTUtil(host='localhost', port=1883)
>>> mqtt_obj.publish(topic="mqtt/topic", payload="data", qos=0, retain = False)
"""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def on_message(self, client, userdata, msg):
logger.trace("Message received on high priority channel")
if self.return_type == "payload":
self.func(json.loads(msg.payload.decode("utf-8")))
elif self.return_type == "all":
self.func(
{
"data": msg.payload.decode("utf-8"),
"topic": msg.topic,
"qos": msg.qos,
"msg": msg,
}
)
else:
raise TypeError("Unsupported return type for the executor function")
def on_connect(self, client, userdata, flags, rc):
logger.info("Successfully connected to (MQTT)")
client.subscribe(self.topiclist)
logger.debug(
"Agent has subscribed to the MQTT topic '{}'".format(self.topiclist)
)
def on_disconnect(self, client, userdata, rc=0):
logger.warning(
"MQTT lost connection: {}".format(self.connection_error_codes[rc])
)
print(self.connection_error_codes[rc])
self._client.reconnect()
def publish(self, topic, payload=None, qos=0, retain=False):
if not self._client.is_connected():
self._client = self.create_client()
logger.info("client Not connected\nConnecting client to Host...")
self._client.connect(host=self.host, port=self.port, keepalive=60)
logger.info("client connected")
return self._client.publish(topic, payload=payload, qos=qos, retain=retain)
def subscribe(self, topic, qos=0, return_type="payload"):
"""
:param -> return_type = "payload" | "all"
payload: returns decoded subscribed mqtt message
all: returns a dict of all keys of mqtt message (with a mqtt msg object)
"""
self.return_type = return_type
if self.func is None:
raise ModuleNotFoundError(
"Executor Function is not set.\ncall executor function and then"
" pass the function to be executed for subscribed topic"
)
self.topiclist.append((topic, qos))
if self._client.is_connected():
self._client.disconnect()
self._client.reinitialise()
self._client.on_connect = self.on_connect
self._client.on_disconnect = self.on_disconnect
self._client.on_message = self.on_message
if not self._client.is_connected():
logger.info("client Not connected\nConnecting client to Host...")
self._client.connect(host=self.host, port=self.port, keepalive=60)
logger.info("client connected")
self._client.loop_start()
def prepare_tls_set_args(self):
...
from dateutil import parser
from loguru import logger
from scripts.constants.app_configuration import MONGO_URI
from scripts.core.db.model_catalog import ModelCatalogCollection
from scripts.utils.mongo_util import MongoConnect
class Pipeline:
def __init__(self, project_id=None):
self.mongo_client = MongoConnect(uri=MONGO_URI)()
self.project_id = project_id
def get_model_catalog_data(self, model_catalog_id):
logger.info(f"Getting Data for model catalog id: {model_catalog_id}")
model_catalog_collection = ModelCatalogCollection(
mongo_client=self.mongo_client, project_id=self.project_id
)
return model_catalog_collection.get_one_model_catalog(model_id=model_catalog_id)
def parse_model_data(self, data, model_catalog_id):
basic_info = data["basic_info"]
process_name = basic_info["process_name"]
stage_name = basic_info["stage_name"]
logger.info(f"Process Name: {process_name} and Stage Name: {stage_name}")
start_time = None
end_time = None
for i in data["criteria"]["args"]:
if i["key"] == "date_from":
start_time = i["value"]
elif i["key"] == "date_to":
end_time = i["value"]
logger.info(f"Start time: {start_time}, end time: {end_time}")
start_time = parser.parse(start_time).timestamp() * 1000
end_time = parser.parse(end_time).timestamp() * 1000
return {
"process_name": process_name,
"stage_name": stage_name,
"start_time": int(start_time),
"end_time": int(end_time),
}
def get_meta_data(self, model_id):
model_data = self.get_model_catalog_data(model_catalog_id=model_id)
model_data_dict = self.parse_model_data(model_data, model_id)
return model_data_dict
from scripts.constants.app_configuration import MQTTConfig
from scripts.utils.mqtt_util import MQTTUtil
class PushNotification:
def __init__(self) -> None:
self.mqtt_obj = MQTTUtil(**MQTTConfig().dict())
def send_notification(self, user_id, **payload):
...
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment