Commit cc9339fd authored by aakash.bedi

added data push function

parent 4c5662f8
Pipeline #58260 failed with stage
@@ -2,6 +2,7 @@ if __name__ == "__main__":
     from dotenv import load_dotenv
     load_dotenv(dotenv_path='config.env')
     import warnings
+    import numpy as np
     from loguru import logger
     from scripts.core.engine.mppt_data import GetData
     from scripts.utils.reading_tags import GetTags
@@ -9,6 +10,8 @@ from scripts.core.engine.tags_data import get_tags_data
     from scripts.utils.start_end_date import KairosStartEndDate
     from scripts.utils.preprocessing import DataPreprocessing
     from scripts.core.engine.inv_and_mppt_level import TrainingInference
+    from scripts.core.engine.final_tags import GetFinalDf
+    from scripts.core.data_puller_push.kafka_push import CalculatedDataPush
     warnings.filterwarnings("ignore")
@@ -19,7 +22,9 @@ start_date, end_date, start_timestamp, end_timestamp = KairosStartEndDate().star
 def get_tag_details():
     try:
         get_tags = GetTags(base_path=base_path)
+        get_final_df = GetFinalDf()
         tags_excel = get_tags.read_tag_excel()
+        df_final_tags = get_final_df.get_predicted_current_tags(tags_excel=tags_excel)
         mppt_tags = get_tags.get_mppt_tags(df=tags_excel, substrings='MPPT')
         df = get_tags_data(mppt_tags=mppt_tags,
                            start_timestamp=start_timestamp,
@@ -31,7 +36,8 @@ def get_tag_details():
         df_mppt = mppt_data.current_voltage_mppt_data(df=df)
         data_preprocessing = DataPreprocessing()
-        df_mppt = data_preprocessing.remove_outliers(df=df_mppt, param_list=['tilt_irradiance', 'voltage_mppt', 'current_mppt'])
+        df_mppt = data_preprocessing.remove_outliers(df=df_mppt, param_list=['tilt_irradiance', 'voltage_mppt',
+                                                                             'current_mppt'])
         df_mppt, df_train, df_test = data_preprocessing.train_test_split(df=df_mppt)
         unique_inv_id = list(df_train.inv_id.unique())
         unique_mppt_id = list(df_train.mppt_id.unique())
@@ -45,9 +51,17 @@ def get_tag_details():
                     model=model,
                     inv_id=inv_id,
                     mppt_id=mppt_id)
-                df_result = mppt_data.get_final_data(x_test=x_test, y_test=y_test, predictions=predictions)
+                df_result = get_final_df.get_final_data(x_test=x_test,
+                                                        y_test=y_test,
+                                                        predictions=predictions)
+                final_dict = get_final_df.get_final_predicted_tags(df_predicted_current_tags=df_final_tags,
+                                                                   inv_id=inv_id, mppt_id=mppt_id)
+                df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
+                df_result["timestamp"] = df_result["timestamp"].astype('int')
+                CalculatedDataPush(df_result=df_result, final_tags_dict=final_dict).kafka_data_push()
                 # function to push the data to kairos
-                logger.info(f'{df_result.shape}')
+                logger.info(f'{final_dict}')
             except Exception as e:
                 logger.exception(f'Exception - {e}')
     except Exception as e:
...
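The new block converts the prediction frame's datetimes to integer epoch seconds before pushing. A standalone sketch of that conversion, mirroring the two lines added above (the sample timestamps are illustrative):

```python
import numpy as np
import pandas as pd

df_result = pd.DataFrame({"datetime": pd.to_datetime(["2022-01-01 06:15:00",
                                                      "2022-01-01 06:30:00"])})
# datetime64[ns] values are nanoseconds since the Unix epoch: dividing by 1e9
# and casting to int yields whole epoch seconds
df_result["timestamp"] = df_result["datetime"].values.astype(np.int64) / 10 ** 9
df_result["timestamp"] = df_result["timestamp"].astype('int')
print(df_result["timestamp"].tolist())  # [1641017700, 1641018600]
```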
@@ -2,7 +2,7 @@ APP_NAME=dalmia-solar-degradation
 KAIROS_URI=https://iLens:iLensDAL$456@dalmia.ilens.io/kairos/
 KAIROS_METRIC=ilens.live_data.raw
 AGGREGATOR=max
-AGGREGATOR_VALUE=30
+AGGREGATOR_VALUE=15
 AGGREGATOR_UNIT=minutes
 KAFKA_HOST=192.168.0.220
 KAFKA_PORT=9092
...
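The aggregation window tightens from 30 to 15 minutes here. These settings are loaded through python-dotenv in the entry script; a minimal sketch of how they are typically read (variable names come from config.env above, and the wrapper classes in scripts.constants.app_configuration presumably do the equivalent):

```python
import os
from dotenv import load_dotenv

load_dotenv(dotenv_path='config.env')

# environment values arrive as strings; cast numeric settings explicitly
AGGREGATOR = os.getenv('AGGREGATOR', 'max')
AGGREGATOR_VALUE = int(os.getenv('AGGREGATOR_VALUE', '15'))
AGGREGATOR_UNIT = os.getenv('AGGREGATOR_UNIT', 'minutes')
```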
from loguru import logger
from scripts.core.data_puller_push.push_data import insert_values_3cp


class CalculatedDataPush:
    def __init__(self, df_calculated, all_cal_tags_dict):
        self.df_calculated = df_calculated
        self.all_cal_tags_dict = all_cal_tags_dict

    def kafka_data_push(self, df_calculated):
        try:
            logger.info(f"Calculated dict length = {len(self.all_cal_tags_dict)}")
            logger.info(f"Calculated df shape with Date and Timestamp column = {df_calculated.shape}")
            df_calculated_tags_dict = {col: self.all_cal_tags_dict[col] for col in df_calculated.columns
                                       if col not in ('Date', 'timestamp')}
            for i, j in df_calculated.iterrows():
                my_dict = {v: j[k] for k, v in df_calculated_tags_dict.items()}
                logger.info(f"{j['timestamp'], j['Date'], my_dict}")
                insert_values_3cp(j['timestamp'], my_dict)
        except Exception as e:
            logger.exception(f'Exception - {e}')
@@ -34,7 +34,9 @@ class KairosQuery:
                     "sampling": {
                         "value": KairosDb.aggregator_value,
                         "unit": KairosDb.aggregator_unit
-                    }
+                    },
+                    "align_sampling": True,
+                    "align_end_time": True
                 }
             ]
         }
...
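align_sampling and align_end_time are standard KairosDB aggregator options: the first aligns aggregation buckets to the sampling interval instead of to the query start time, and the second stamps each bucket with the end of its range rather than the start. A hedged sketch of the full query body this class appears to build (the metric name matches KAIROS_METRIC in config.env; the timestamps are placeholders):

```python
query = {
    "start_absolute": 1640995200000,  # epoch milliseconds
    "end_absolute": 1641081600000,
    "metrics": [
        {
            "name": "ilens.live_data.raw",
            "aggregators": [
                {
                    "name": "max",
                    "sampling": {"value": 15, "unit": "minutes"},
                    "align_sampling": True,
                    "align_end_time": True
                }
            ]
        }
    ]
}
```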
from loguru import logger
from scripts.core.data_puller_push.push_data import insert_values_dalmia


class CalculatedDataPush:
    def __init__(self, df_result, final_tags_dict):
        self.df_result = df_result
        self.final_tags_dict = final_tags_dict

    def kafka_data_push(self):
        try:
            logger.info(f"final_tags_dict = {self.final_tags_dict}")
            logger.info(f"df result shape = {self.df_result.shape}")
            for i, j in self.df_result.iterrows():
                my_dict = {v: j[k] for k, v in self.final_tags_dict.items()}
                logger.info(f"{j['timestamp'], j['datetime'], my_dict}")
                insert_values_dalmia(j['timestamp'], my_dict)
        except Exception as e:
            logger.exception(f'Exception - {e}')
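Usage follows the wiring in the entry-script diff above: df_result needs 'timestamp' and 'datetime' columns plus one column per entry in final_tags_dict. A minimal sketch (the tag id and values are placeholders, not from the repo):

```python
import pandas as pd
from scripts.core.data_puller_push.kafka_push import CalculatedDataPush

df_result = pd.DataFrame({'timestamp': [1641017700],
                          'datetime': [pd.Timestamp('2022-01-01 06:15:00')],
                          'predicted_current_mppt': [7.42]})
# each row becomes one {tag_id: value} message keyed by its epoch timestamp
CalculatedDataPush(df_result=df_result,
                   final_tags_dict={'predicted_current_mppt': 'tag_0001'}).kafka_data_push()
```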
 from json import dumps
 from kafka import KafkaProducer
 from loguru import logger
 from scripts.constants.app_configuration import Kafka

-def insert_values_3cp(ts, my_dict):
+def insert_values_dalmia(ts, my_dict):
     kairos_writer = KairosWriter()
     kairos_writer.write_data(
         {
@@ -46,7 +44,7 @@ class KafkaProducerUtil:
 class KairosWriter(KafkaProducerUtil):
     def write_data(self, data_json, topic):
-        site_id = "site_116"
+        site_id = "site_107"
         logger.debug(f"Data being pushed to kafka topic: {topic}")
         msg_counter = 0
         for k, v in data_json.items():
...
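The imports above imply a kafka-python producer that JSON-encodes each payload. A sketch of that pattern under stated assumptions (host and port taken from config.env; the repo's actual KafkaProducerUtil and topic handling may differ):

```python
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='192.168.0.220:9092',  # KAFKA_HOST:KAFKA_PORT from config.env
    value_serializer=lambda payload: dumps(payload).encode('utf-8'),
)
producer.send('some_topic', value={'tag_0001': 7.42})  # topic and payload are illustrative
producer.flush()
```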
import pandas as pd
from loguru import logger


class GetFinalDf:
    @staticmethod
    def get_final_data(x_test, y_test, predictions):
        try:
            df_result = pd.DataFrame(index=[i for i in range(len(y_test))])
            df_result['datetime'] = x_test['datetime']
            df_result['actual_current_mppt'] = y_test
            df_result['predicted_current_mppt'] = predictions
            df_result.drop(['actual_current_mppt'], axis=1, inplace=True)
            df_result = df_result.round(2)
            df_result.reset_index(drop=True, inplace=True)
            logger.info(f'{df_result.shape}')
            return df_result
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_predicted_current_tags(tags_excel):
        try:
            df = tags_excel.copy()
            df['parameter_name'] = df['parameter_name'].str.replace('Potential Current MPPT ',
                                                                    'potential_current_mppt_')
            df['inv_id'] = df['inv_id'].str.replace('INV ', 'inv_')
            df['mppt_id'] = df['parameter_name'].copy()
            df['mppt_id'] = df['mppt_id'].str.replace('potential_current_', '')
            req_substrings = 'mppt_'
            # keep only rows whose mppt_id actually carries an MPPT suffix
            data_with_substring = [data for data in df['mppt_id'] if req_substrings in data]
            df = df.loc[df['mppt_id'].isin(data_with_substring)]
            df = df.sort_values(['inv_id', 'mppt_id'])
            df.reset_index(drop=True, inplace=True)
            return df
        except Exception as e:
            logger.exception(f'Exception - {e}')

    @staticmethod
    def get_final_predicted_tags(df_predicted_current_tags, inv_id, mppt_id):
        try:
            df = df_predicted_current_tags[df_predicted_current_tags['inv_id'] == inv_id]
            df = df[df['mppt_id'] == mppt_id]
            df.reset_index(drop=True, inplace=True)
            final_dict = {}
            # after filtering, there is normally a single row per (inv_id, mppt_id)
            for index in range(df.shape[0]):
                tag_id = df.iloc[index, df.columns.get_loc('tag_id')]
                final_dict['predicted_current_mppt'] = tag_id
            return final_dict
        except Exception as e:
            logger.exception(f'Exception - {e}')
\ No newline at end of file
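A worked example of the renaming chain in get_predicted_current_tags, using one illustrative row of tag metadata:

```python
import pandas as pd

tags_excel = pd.DataFrame({'tag_id': ['t_001'],  # illustrative values
                           'inv_id': ['INV 1'],
                           'parameter_name': ['Potential Current MPPT 1']})
df = tags_excel.copy()
df['parameter_name'] = df['parameter_name'].str.replace('Potential Current MPPT ',
                                                        'potential_current_mppt_')
df['inv_id'] = df['inv_id'].str.replace('INV ', 'inv_')
df['mppt_id'] = df['parameter_name'].str.replace('potential_current_', '')
print(df.iloc[0].to_dict())
# {'tag_id': 't_001', 'inv_id': 'inv_1',
#  'parameter_name': 'potential_current_mppt_1', 'mppt_id': 'mppt_1'}
```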
@@ -30,29 +30,5 @@ class GetData:
         except Exception as e:
             logger.exception(f'Exception - {e}')
-
-    @staticmethod
-    def get_final_data(x_test, y_test, predictions):
-        try:
-            df_result = pd.DataFrame(index=[i for i in range(len(y_test))])
-            df_result['datetime'] = x_test['datetime']
-            df_result['actual_current_mppt'] = y_test
-            df_result['predicted_current_mppt'] = predictions
-            df_result['potential_current_mppt_loss'] = df_result['predicted_current_mppt'] - \
-                df_result['actual_current_mppt']
-            df_result.loc[df_result['potential_current_mppt_loss'] < 0, 'potential_current_mppt_loss'] = 0
-            df_result['hour'] = df_result['datetime'].dt.hour
-            df_result.loc[df_result['hour'].between(left=18, right=23, inclusive='both'),
-                          'potential_current_mppt_loss'] = 0
-            df_result.loc[df_result['hour'].between(left=0, right=6, inclusive='both'),
-                          'potential_current_mppt_loss'] = 0
-            df_result.drop(['hour'], axis=1, inplace=True)
-            df_result['total_potential_current_mppt_loss'] = df_result['potential_current_mppt_loss']. \
-                rolling(min_periods=1, window=len(df_result)).sum()
-            df_result.drop(['actual_current_mppt', 'predicted_current_mppt'], axis=1, inplace=True)
-            df_result = df_result.round(2)
-            df_result.reset_index(drop=True, inplace=True)
-            logger.info(f'{df_result.shape}')
-            return df_result
-        except Exception as e:
-            logger.exception(f'Exception - {e}')
@@ -9,7 +9,9 @@ class GetTags:
     def read_tag_excel(self):
        try:
             df = pd.read_excel(f'{self.base_path}/tags_download.xlsx')
-            df.drop(['Site', 'Plant', 'Line', 'Tag'], axis=1, inplace=True)
+            df.drop(['Site', 'Plant', 'Line', 'Tag', 'Unit', 'Tag Register', 'System Rules', 'Target',
+                     'Target limits', 'Deviation', 'Indicator', 'Lower Limit', 'Upper Limit'],
+                    axis=1, inplace=True)
             df.rename(columns={'Tag ID': 'tag_id', 'Tag Name': 'tag_name',
                                'Equipment': 'inv_id', 'Parameter Name': 'parameter_name'}, inplace=True)
             return df
...