Commit b135873f authored by dasharatha.vamshi

completed the flow

parent 647d193e
@@ -3,7 +3,7 @@ kind: Workflow
 metadata:
   annotations:
     pipelines.kubeflow.org/kfp_sdk_version: 1.8.18
-    pipelines.kubeflow.org/pipeline_compilation_time: '2023-09-18T22:25:44.367965'
+    pipelines.kubeflow.org/pipeline_compilation_time: '2023-09-18T23:21:16.312549'
     pipelines.kubeflow.org/pipeline_spec: '{"description": "All Components", "inputs":
       [{"description": "", "name": "pipeline_param", "type": "JsonObject"}, {"description":
       "", "name": "plant_info", "type": "JsonObject"}], "name": "Dalmia"}'
@@ -1040,8 +1040,8 @@ spec:
 \ api_version=(0, 10, 1))\n self.producer.flush()\n\
 \ except Exception as e:\n logger.error(f\"Kafka\
 \ connection error: {e}\")\n\n def publish(self, topic, data):\n \
-\ try:\n kafka_response = self.producer.send(topic,\
-\ data)\n self.producer.flush()\n logger.debug(f\"\
+\ try:\n # kafka_response = self.producer.send(topic,\
+\ data)\n # self.producer.flush()\n # logger.debug(f\"\
 \ Message sent to kafka with response: {kafka_response}\")\n \
 \ return True\n except Exception as e:\n logger.error(e)\n\
 \ return False\n\n class KairosWriter(KafkaProducerUtil):\n\
@@ -1326,9 +1326,9 @@ spec:
 \ panel_id=each_panel)\n\
 \ logger.debug(f'data push for {inv_id} has been completed\
 \ !')\n except Exception as e:\n logger.exception(f'\
-\ Exception - {e}')\n\n final_dict = {\"plant_efficiency_dict\": plant_efficiency_dict.to_dict(orient=\"\
-records\"),\n \"df_inv\": df_inv.to_dict(orient=\"records\"\
-)}\n with open(output_path, 'w') as f:\n json.dump(final_dict,\
+\ Exception - {e}')\n\n final_dict = {\"plant_efficiency_dict\": plant_efficiency_dict,\n\
+\ \"df_inv\": df_inv.to_dict(orient=\"records\")}\n \
+\ with open(output_path, 'w') as f:\n json.dump(final_dict,\
 \ f)\n print(final_dict)\n except Exception as e:\n logger.exception(f'Exception\
 \ - {e}')\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Inv\
 \ and mppt level efficiency', description='')\n_parser.add_argument(\"--get-tags-component-output\"\
@@ -1395,7 +1395,7 @@ spec:
         path: /tmp/inputs/get_tags_component_output/data
     metadata:
       annotations:
-        pipelines.kubeflow.org/component_ref: '{"digest": "76cdc18343fc8f2d2384dc338e572397ecf49f6825956085f266fff74b8284c3",
+        pipelines.kubeflow.org/component_ref: '{"digest": "5212de399f87607aa03889b9b29eceecf2208db04603ab23c09a7fe7c00ba189",
          "url": "transform_components/inv_and_mppt_level_efficiency/component.yml"}'
         pipelines.kubeflow.org/component_spec: '{"implementation": {"container": {"args":
           ["--get-tags-component-output", {"inputPath": "get_tags_component_output"},
@@ -1711,9 +1711,9 @@ spec:
 KafkaProducer(\n bootstrap_servers=kafka_broker,\n value_serializer=lambda
 v: v.encode(''utf-8''),\n api_version=(0, 10, 1))\n self.producer.flush()\n except
 Exception as e:\n logger.error(f\"Kafka connection error:
-{e}\")\n\n def publish(self, topic, data):\n try:\n kafka_response
-= self.producer.send(topic, data)\n self.producer.flush()\n logger.debug(f\"
-Message sent to kafka with response: {kafka_response}\")\n return
+{e}\")\n\n def publish(self, topic, data):\n try:\n #
+kafka_response = self.producer.send(topic, data)\n # self.producer.flush()\n #
+logger.debug(f\" Message sent to kafka with response: {kafka_response}\")\n return
 True\n except Exception as e:\n logger.error(e)\n return
 False\n\n class KairosWriter(KafkaProducerUtil):\n\n def write_data(self,
 data_json, topic):\n site_id = \"site_101\"\n logger.debug(f\"Data
@@ -1928,7 +1928,7 @@ spec:
 predictions=predictions,\n inv_id=inv_id,\n plant_efficiency_dict=plant_efficiency_dict,\n df_inv_tags=df_inv_tags,\n inv_level_efficiency_tags=inv_level_efficiency_tags,\n panel_id=each_panel)\n logger.debug(f''data
 push for {inv_id} has been completed !'')\n except Exception
 as e:\n logger.exception(f'' Exception - {e}'')\n\n final_dict
-= {\"plant_efficiency_dict\": plant_efficiency_dict.to_dict(orient=\"records\"),\n \"df_inv\":
+= {\"plant_efficiency_dict\": plant_efficiency_dict,\n \"df_inv\":
 df_inv.to_dict(orient=\"records\")}\n with open(output_path, ''w'')
 as f:\n json.dump(final_dict, f)\n print(final_dict)\n except
 Exception as e:\n logger.exception(f''Exception - {e}'')\n\nimport
...
@@ -662,9 +662,9 @@ implementation:
     def publish(self, topic, data):
         try:
-            kafka_response = self.producer.send(topic, data)
-            self.producer.flush()
-            logger.debug(f" Message sent to kafka with response: {kafka_response}")
+            # kafka_response = self.producer.send(topic, data)
+            # self.producer.flush()
+            # logger.debug(f" Message sent to kafka with response: {kafka_response}")
             return True
         except Exception as e:
             logger.error(e)
@@ -1067,7 +1067,7 @@ implementation:
         except Exception as e:
             logger.exception(f' Exception - {e}')
-        final_dict = {"plant_efficiency_dict": plant_efficiency_dict.to_dict(orient="records"),
+        final_dict = {"plant_efficiency_dict": plant_efficiency_dict,
                       "df_inv": df_inv.to_dict(orient="records")}
         with open(output_path, 'w') as f:
             json.dump(final_dict, f)
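For context on the change above: `json.dump` only handles plain Python types, so a DataFrame such as `df_inv` still has to be flattened with `to_dict(orient="records")` before it is written, while `plant_efficiency_dict` is now passed through untouched (presumably it is already a plain dict by this point). A minimal sketch of that distinction, with made-up values:

```python
import json
import pandas as pd

# A DataFrame is not JSON-serializable as-is; flatten it to a list of row dicts first.
df_inv = pd.DataFrame({"inv_id": ["inv_1", "inv_2"], "efficiency": [0.81, 0.79]})
records = df_inv.to_dict(orient="records")  # [{'inv_id': 'inv_1', 'efficiency': 0.81}, ...]

# A plain dict (what plant_efficiency_dict appears to be after this change) needs no conversion.
plant_efficiency_dict = {"plant_efficiency": 0.80}

final_dict = {"plant_efficiency_dict": plant_efficiency_dict, "df_inv": records}
print(json.dumps(final_dict))
```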
...
@@ -629,9 +629,9 @@ def inv_and_mppt_level_efficiency(get_tags_component_output: InputPath(),
     def publish(self, topic, data):
         try:
-            kafka_response = self.producer.send(topic, data)
-            self.producer.flush()
-            logger.debug(f" Message sent to kafka with response: {kafka_response}")
+            # kafka_response = self.producer.send(topic, data)
+            # self.producer.flush()
+            # logger.debug(f" Message sent to kafka with response: {kafka_response}")
             return True
         except Exception as e:
             logger.error(e)
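Note that with `producer.send` and `producer.flush` commented out, `publish()` becomes a no-op that still reports success (`return True`), so callers behave as if the write happened while nothing reaches Kafka. If the intent is to toggle publishing temporarily, one alternative, shown only as a hedged sketch and not part of this commit, is to gate the send behind a hypothetical `dry_run` flag; the `send`/`flush` calls are the same kafka-python `KafkaProducer` methods the original code uses:

```python
import logging

logger = logging.getLogger(__name__)

class KafkaProducerUtil:
    def __init__(self, producer, dry_run=False):
        # producer: an already-connected kafka-python KafkaProducer instance.
        # dry_run: hypothetical switch; when True, publish() skips the real send.
        self.producer = producer
        self.dry_run = dry_run

    def publish(self, topic, data):
        try:
            if self.dry_run:
                logger.debug(f"dry run enabled, skipping publish to {topic}")
                return True
            kafka_response = self.producer.send(topic, data)
            self.producer.flush()
            logger.debug(f"Message sent to kafka with response: {kafka_response}")
            return True
        except Exception as e:
            logger.error(e)
            return False
```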
@@ -1033,8 +1033,8 @@ def inv_and_mppt_level_efficiency(get_tags_component_output: InputPath(),
             logger.debug(f'data push for {inv_id} has been completed !')
         except Exception as e:
             logger.exception(f' Exception - {e}')
-        final_dict = {"plant_efficiency_dict": plant_efficiency_dict.to_dict(orient="records"),
+        df_inv['datetime'] = df_inv['datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        final_dict = {"plant_efficiency_dict": plant_efficiency_dict,
                       "df_inv": df_inv.to_dict(orient="records")}
         with open(output_path, 'w') as f:
             json.dump(final_dict, f)
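The newly added `df_inv['datetime'] = df_inv['datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')` line matters because `json.dump` raises `TypeError` on pandas `Timestamp` values; formatting the column as strings first lets the subsequent `json.dump(final_dict, f)` succeed. A small self-contained sketch of the same idea (column names and values here are illustrative, not taken from the pipeline):

```python
import json
import pandas as pd

# Illustrative frame standing in for df_inv; values are made up.
df = pd.DataFrame({
    "datetime": pd.to_datetime(["2023-09-18 22:00:00", "2023-09-18 23:00:00"]),
    "efficiency": [0.81, 0.79],
})

# json.dump cannot serialize pandas Timestamp objects, so render them as strings first.
df["datetime"] = df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S")

payload = {"df_inv": df.to_dict(orient="records")}
with open("/tmp/example_output.json", "w") as f:
    json.dump(payload, f)
print(payload)
```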
...