Update push_data_postgres

parent 10157df8
......@@ -4,18 +4,18 @@ import pandas as pd
import os
import re
from pathlib import Path
import webbrowser
# import webbrowser
from sqlalchemy import create_engine
import io
import logging
# Runtime configuration: each value is read from an environment variable and
# falls back to the in-code default when the variable is not set.
postgres_database = os.environ.get('postgres_database', default='ilens_jubilant')
postgre_user = os.environ.get('postgre_user', default='ilens')
# NOTE(review): the next five names are assigned again further down with
# different defaults. This listing looks like a diff whose +/- markers were
# stripped, so these are presumably the pre-change lines — confirm.
# NOTE(review): a password default is hard-coded in source; consider requiring
# the environment variable instead.
postgre_password = os.environ.get('postgre_password', default='ilens#4321')
postgre_host = os.environ.get('postgre_host', default='192.168.0.207')
postgre_port = os.environ.get('postgre_port', default='5433')
job_id = os.environ.get('JOB_ID', 'job_1')
# NOTE(review): `dir` shadows the Python builtin of the same name.
dir = os.environ.get('LOCAL_PATH', 'ilens/prod/jubilant')
# Second assignment of the same five names; as listed, these come later and so
# are the values that remain bound.
postgre_password = os.environ.get('postgre_password', default='iLens#4321')
postgre_host = os.environ.get('postgre_host', default='postgres-db-service.ilens-infra.svc.cluster.local')
postgre_port = os.environ.get('postgre_port', default='5432')
job_id = os.environ.get('JOB_ID', 'job_6')
dir = os.environ.get('LOCAL_PATH', '/mnt/ilens/prod/jubilant')
# Per-job working directory: LOCAL_PATH joined with JOB_ID.
directory_path = os.path.join(dir, job_id)
directory = directory_path
......@@ -23,6 +23,11 @@ processed_dir = os.path.join(directory, 'processed')
if not os.path.exists(processed_dir):
os.makedirs(processed_dir)
# Literal values written to audit_data_table by update_status().
# current_activity is stored in the column of the same name when no error
# message is supplied.
current_activity = 'ingested'
# Passed as `status` after a file has been pushed.
success_status = 'Success'
# Passed as `status` when handling a file raised an exception.
failed_status = 'Failed'
# NOTE(review): no use of skipped_status is visible in this listing.
skipped_status = 'Skipped'
# directory = Path(directory_path)
......@@ -47,6 +52,19 @@ class PushDataToPostgres():
print(f"Failed to connect to postgres : {str(e)}")
raise Exception(str(e))
def update_status(self, filename, status, error_message=None):
    """Record the outcome of one file in ``audit_data_table``.

    The row to update is matched on ``filename`` with everything from the
    first ``.csv`` onwards removed, together with the module-level ``job_id``.

    Args:
        filename: Name of the processed file.
        status: Value written to the ``status`` column.
        error_message: When given, it is stored in the ``error_message``
            column. When omitted, ``current_activity`` is set to the
            module-level ``current_activity`` value instead.

    Any failure is reported with ``print`` and not raised, so a problem with
    the audit table does not abort the caller.
    """
    try:
        filename = filename.split('.csv')[0]
        # Values are sent as bound parameters instead of being interpolated
        # into the SQL text, so a quote inside a file name or an error
        # message can neither break the statement nor inject SQL.
        # NOTE(review): assumes self.cur is a DB-API cursor that uses the
        # %s placeholder style (as psycopg2 does) — confirm against the
        # connection setup.
        if error_message is None:
            query = (
                'UPDATE audit_data_table '
                'SET status = %s, current_activity = %s '
                'WHERE filename = %s and job_id = %s;'
            )
            params = (status, current_activity, filename, job_id)
        else:
            query = (
                'UPDATE audit_data_table '
                'SET status = %s, error_message = %s '
                'WHERE filename = %s and job_id = %s;'
            )
            params = (status, error_message, filename, job_id)
        print(query, params)
        self.cur.execute(query, params)
        self.conn.commit()
    except Exception as e:
        print(f'Failed to update status:{str(e)}')
        # A failed statement leaves the open transaction aborted; without a
        # rollback every later statement on this connection fails as well.
        try:
            self.conn.rollback()
        except Exception as rollback_error:
            print(f'Failed to roll back status update:{str(rollback_error)}')
def push_postgres(self):
try:
print(f"Directory content: {os.listdir(directory)}")
......@@ -74,15 +92,6 @@ class PushDataToPostgres():
# ------------------------------------------------
try:
logging.info("Reading the file from local path to data frame")
df = pd.read_csv(file_path, error_bad_lines=False)
df.to_csv(dir_path_value, index=False)
except Exception as file_read_error:
error = f"Failed to read the file"
print(f"Failed to read the file : {file_read_error}")
pass
# -------------------------------------------------
if os.path.isfile(dir_path_value) and os.path.exists(dir_path_value):
df = pd.read_csv(file_path, error_bad_lines=False)
columns_data = list(df.columns)
for data in columns_data:
......@@ -100,73 +109,23 @@ class PushDataToPostgres():
filename.append(i)
logging.info("Pushing the data to postgres")
filename = '_'.join(map(str, filename))
filename = filename.lower()
df.head(0).to_sql(filename, self.engine, if_exists='append', index=False)
table_name = filename.lower()
df.head(0).to_sql(table_name, self.engine, if_exists='append', index=False)
conn = self.engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, filename, null="")
conn.commit()
print('Success')
split_by_hif_data = []
files_dataa = each_file.split('_')
for i in files_dataa:
s_data = i.split('-')
if len(s_data) > 1:
for j in s_data:
split_by_hif_data.append(j)
split_by_hif_data_final = ''.join(map(str, split_by_hif_data))
files_final_data = (files_dataa[0] + '_' + files_dataa[1]).lower()
final_files_dataa = files_final_data + '_' + split_by_hif_data_final
cur.execute("SELECT filename from audit_data_table")
row = cur.fetchall()
col_list_data = []
for i in row:
col_list_data.append(i[0])
if final_files_dataa in col_list_data:
cur.execute( "SELECT filename, error_message from audit_data_table where filename = '{}'".format(final_files_dataa))
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list, file_status_list))
for key, value in final_updated_dict.items():
if value == None:
logging.info("Updating the status of the file", final_files_dataa)
cur.execute('''UPDATE audit_data_table SET status = 'Success' WHERE filename = '{}';'''.format(final_files_dataa))
conn.commit()
else:
conn = self.engine.raw_connection()
cur = conn.cursor()
split_by_hif_data = []
files_dataa = each_file.split('_')
for i in files_dataa:
s_data = i.split('-')
if len(s_data) > 1:
for j in s_data:
split_by_hif_data.append(j)
split_by_hif_data_final = ''.join(map(str, split_by_hif_data))
files_final_data = (files_dataa[0] + '_' + files_dataa[1]).lower()
final_files_dataa = files_final_data + '_' + split_by_hif_data_final
cur.execute("SELECT filename, error_message from audit_data_table where filename = '{}'".format(final_files_dataa))
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list, file_status_list))
for key, value in final_updated_dict.items():
if value == None:
cur.execute('''UPDATE audit_data_table SET error_message = '{}' WHERE filename = '{}';'''.format(error, final_files_dataa))
conn.commit()
self.update_status(filename=each_file, status=success_status)
except Exception as file_read_error:
error = f"Failed to read the file"
print(f"Failed to read the file : {file_read_error}")
self.update_status(filename=each_file, status=failed_status, error_message=error)
pass
except Exception as e:
print(f"Failed to process files : {str(e)}")
print(f'Failed to ingest Data:{str(e)}')
push_data_postgres = PushDataToPostgres()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment