Add new file

parent b3830203
import os.path
import psycopg2
import pandas as pd
import os
import re
from pathlib import Path
import webbrowser
from sqlalchemy import create_engine
import io
import logging
postgres_database = os.environ.get('postgres_database', default='ilens_jubilant')
postgre_user = os.environ.get('postgre_user', default='ilens')
postgre_password = os.environ.get('postgre_password', default='ilens#4321')
postgre_host = os.environ.get('postgre_host', default='192.168.0.207')
postgre_port = os.environ.get('postgre_port', default='5433')
job_id = os.environ.get('JOB_ID', 'job_1')
dir = os.environ.get('LOCAL_PATH', 'ilens/prod/jubilant')
directory_path = os.path.join(dir, job_id)
directory = directory_path
processed_dir = os.path.join(directory, 'processed')
if not os.path.exists(processed_dir):
os.makedirs(processed_dir)
# directory = Path(directory_path)
class PushDataToPostgres():
def __init__(self):
try:
print("Connecting to postgres .......")
self.engine_data = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(postgre_user, postgre_password,
postgre_host,
postgre_port, postgres_database)
self.engine = create_engine(self.engine_data)
self.conn = psycopg2.connect(
host=postgre_host,
database=postgres_database,
user=postgre_user,
password=postgre_password,
port=postgre_port)
logging.info("Connected to postgres successfully")
self.cur = self.conn.cursor()
except Exception as e:
print(f"Failed to connect to postgres : {str(e)}")
raise Exception(str(e))
def push_postgres(self):
try:
print(f"Directory content: {os.listdir(directory)}")
dir_list = []
for each_dir in os.listdir(directory):
if each_dir == 'processed':
continue
filename = each_dir.split('.')
if len(filename) == 1:
dir_list.append(each_dir)
dir_list.sort(key=lambda test_string: list(map(int, re.findall(r'\d+', test_string)))[0])
print(f"Directory after sorting : {dir_list}")
# Sort complete
for each_dir in dir_list:
dir_path = Path(os.path.join(directory, each_dir))
for each_file in os.listdir(dir_path):
file_path = os.path.join(dir_path, each_file)
print("Each file ---> ", file_path)
error = ''
if each_file.endswith('.csv'):
if os.path.isfile(file_path):
dir_path_value = Path(os.path.join(processed_dir, each_file))
# ------------------------------------------------
try:
logging.info("Reading the file from local path to data frame")
df = pd.read_csv(file_path, error_bad_lines=False)
df.to_csv(dir_path_value, index=False)
except Exception as file_read_error:
error = f"Failed to read the file"
print(f"Failed to read the file : {file_read_error}")
pass
# -------------------------------------------------
if os.path.isfile(dir_path_value) and os.path.exists(dir_path_value):
df = pd.read_csv(file_path, error_bad_lines=False)
columns_data = list(df.columns)
for data in columns_data:
data_date = data.split(' ')
if 'Date' in data_date:
final_date = ' '.join(map(str, data_date))
df[final_date] = pd.to_datetime(df[final_date])
filename_list = each_file.split('_')
filename = []
for i in filename_list:
m_list = i.split('-')
n_list = i.split('.')
if len(m_list) == 1 and len(n_list) == 1:
filename.append(i)
logging.info("Pushing the data to postgres")
filename = '_'.join(map(str, filename))
filename = filename.lower()
df.head(0).to_sql(filename, self.engine, if_exists='append', index=False)
conn = self.engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, filename, null="")
conn.commit()
print('Success')
split_by_hif_data = []
files_dataa = each_file.split('_')
for i in files_dataa:
s_data = i.split('-')
if len(s_data) > 1:
for j in s_data:
split_by_hif_data.append(j)
split_by_hif_data_final = ''.join(map(str, split_by_hif_data))
files_final_data = (files_dataa[0] + '_' + files_dataa[1]).lower()
final_files_dataa = files_final_data + '_' + split_by_hif_data_final
cur.execute("SELECT filename from audit_data_table")
row = cur.fetchall()
col_list_data = []
for i in row:
col_list_data.append(i[0])
if final_files_dataa in col_list_data:
cur.execute( "SELECT filename, error_message from audit_data_table where filename = '{}'".format(final_files_dataa))
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list, file_status_list))
for key, value in final_updated_dict.items():
if value == None:
logging.info("Updating the status of the file", final_files_dataa)
cur.execute('''UPDATE audit_data_table SET status = 'Success' WHERE filename = '{}';'''.format(final_files_dataa))
conn.commit()
else:
conn = self.engine.raw_connection()
cur = conn.cursor()
split_by_hif_data = []
files_dataa = each_file.split('_')
for i in files_dataa:
s_data = i.split('-')
if len(s_data) > 1:
for j in s_data:
split_by_hif_data.append(j)
split_by_hif_data_final = ''.join(map(str, split_by_hif_data))
files_final_data = (files_dataa[0] + '_' + files_dataa[1]).lower()
final_files_dataa = files_final_data + '_' + split_by_hif_data_final
cur.execute("SELECT filename, error_message from audit_data_table where filename = '{}'".format(final_files_dataa))
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list, file_status_list))
for key, value in final_updated_dict.items():
if value == None:
cur.execute('''UPDATE audit_data_table SET error_message = '{}' WHERE filename = '{}';'''.format(error, final_files_dataa))
conn.commit()
except Exception as e:
print(f"Failed to process files : {str(e)}")
push_data_postgres = PushDataToPostgres()
push_data_postgres.push_postgres()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment