Update read_azure_data
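Pass dt instead of date_ts in the audit_data_table inserts of AzureBlobDownloaderHistorical and AzureBlobDownloader, and rename the date_ts parameter of check_audit_postgres to dt to match.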

parent a10ff04f
@@ -146,13 +146,13 @@ class AzureBlobDownloaderHistorical:
             final_updated_dict = dict(zip(file_name_list,file_status_list))
             for key, value in final_updated_dict.items():
                 if value != 'Success' and value != 'Skipped':
-                    cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(date_ts, filename))
+                    cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(dt, filename))
                     conn.commit()
                     return True
                 elif value == 'Success' or value == 'Skipped':
                     status = 'Skipped'
                     cur.execute('insert into audit_data_table(start_date, filename, status) values(%s, %s, %s)',
-                                (date_ts, filename, status))
+                                (dt, filename, status))
                     conn.commit()
                 else:
                     return False
@@ -198,7 +198,7 @@ class AzureBlobDownloader:
             current_date = filename_new
             final_name = final_file_name+'_'+current_date
             if current_date == today_date:
-                validate_postgres = self.check_audit_postgres(date_ts, final_name)
+                validate_postgres = self.check_audit_postgres(dt, final_name)
                 if validate_postgres:
                     download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
                     os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
@@ -209,7 +209,7 @@ class AzureBlobDownloader:
         except Exception as e:
             logging.error(e)
 
-    def check_audit_postgres(self, date_ts, filename):
+    def check_audit_postgres(self, dt, filename):
         try:
             conn = psycopg2.connect(
                 host=postgre_host,
@@ -222,7 +222,7 @@ class AzureBlobDownloader:
             cur.execute("SELECT start_date, filename from audit_data_table")
             row = cur.fetchall()
             if not row:
-                cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (date_ts, filename))
+                cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (dt, filename))
                 conn.commit()
                 logging.info("Record Inserted Successfully")
                 return True
@@ -233,7 +233,7 @@ class AzureBlobDownloader:
                 for records in row:
                     column_list.append(records[1])
                 if filename not in column_list:
-                    cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (date_ts, filename))
+                    cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (dt, filename))
                     conn.commit()
                     logging.info("Record Inserted Successfully")
                     return True
@@ -249,12 +249,12 @@ class AzureBlobDownloader:
                 for key, value in final_updated_dict.items():
                     if key == filename:
                         if value != 'Success' and value != 'Skipped':
-                            cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(date_ts, filename))
+                            cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(dt, filename))
                             conn.commit()
                             return True
                         elif value == 'Success' or value == 'Skipped':
                             status = 'Skipped'
-                            cur.execute('insert into audit_data_table(start_date, filename, status) values(%s, %s, %s)',(date_ts, filename, status))
+                            cur.execute('insert into audit_data_table(start_date, filename, status) values(%s, %s, %s)',(dt, filename, status))
                             conn.commit()
                         else:
                             return False
@@ -285,3 +285,4 @@ if start_date != '' and end_date != '':
 else:
     azure_blob_file_downloader = AzureBlobDownloader()
     azure_blob_file_downloader.download_all_blobs_in_container()
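For context on the rows these hunks write, below is a minimal, self-contained sketch of the check-and-insert pattern that check_audit_postgres follows after this rename. It is an illustration, not the repository code: the conn_params argument, the status-based set lookup, and the placeholder credentials in the example call are assumptions added here; the audit_data_table name, its columns, and the psycopg2/logging usage come from the diff above.

# Sketch only: conn_params and the example values are placeholders, not
# values from the repository.
import logging

import psycopg2


def check_audit_postgres(dt, filename, conn_params):
    """Return True if `filename` still needs processing, recording it in the audit table."""
    conn = None
    try:
        conn = psycopg2.connect(**conn_params)
        cur = conn.cursor()
        # Collect filenames that already finished (or were skipped) on earlier runs.
        cur.execute("SELECT filename, status FROM audit_data_table")
        processed = {name for name, status in cur.fetchall()
                     if status in ('Success', 'Skipped')}
        if filename in processed:
            # Already handled: record the skip and tell the caller not to download again.
            cur.execute(
                'INSERT INTO audit_data_table(start_date, filename, status) VALUES (%s, %s, %s)',
                (dt, filename, 'Skipped'))
            conn.commit()
            return False
        # New file (or a previous attempt did not succeed): insert an audit row and download.
        cur.execute(
            'INSERT INTO audit_data_table(start_date, filename) VALUES (%s, %s)',
            (dt, filename))
        conn.commit()
        logging.info("Record Inserted Successfully")
        return True
    except Exception as e:
        logging.error(e)
        return False
    finally:
        if conn is not None:
            conn.close()


# Example call with placeholder credentials:
# check_audit_postgres('2024-01-01', 'report_2024-01-01',
#                      {'host': 'localhost', 'dbname': 'audit',
#                       'user': 'app', 'password': 'secret'})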