Update read_azure_data

parent 94da76cb
from azure.storage.blob import BlobServiceClient
import os.path
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine
from datetime import date
from pathlib import Path
import psycopg2
@@ -9,59 +11,43 @@ import logging
postgres_database = os.environ.get('postgres_database', default='ilens_jubilant')
postgre_user = os.environ.get('postgre_user', default='ilens')
postgre_password = os.environ.get('postgre_password', default='iLens@123')
postgre_host = os.environ.get('postgre_host', default='postgres-db-service.ilens-infra.svc.cluster.local')
postgre_port = os.environ.get('postgre_port', default='5432')
connection_string = os.environ.get('connection_string',
                                   default="DefaultEndpointsProtocol=https;AccountName=azrabsjililens01;AccountKey=OEsYFDThcDilcJSGO7dy8HhZwsuxTCmV231s2UKjAuKaQYClUwX4RH84J1FQM1bLlauS2tDqutyRzbaZsywN4w==;EndpointSuffix=core.windows.net")
blob_container = os.environ.get('blob_container', 'jubilant')
job_id = os.environ.get('JOB_ID', 'job_6')
LOCAL_BLOB_PATH = os.environ.get('LOCAL_PATH', '/mnt/ilens/prod/jubilant')
start_date = os.environ.get('start_date', '')
end_date = os.environ.get('end_date', '')
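# When both start_date and end_date are provided (format 'YYYY/MM/DD', e.g. '2021/07/06'), the
# dispatch at the bottom of the file runs the historical backfill path; when they are left empty
# it falls back to AzureBlobDownloader, which only pulls blobs dated today.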
date_lis = []
if start_date != '' and end_date != '':
    date_list = pd.date_range(start=start_date, end=end_date, freq='1d')
    date_range = [str(each_day).split(' ')[0] for each_day in date_list]
    for i in date_range:
        k = i.split('-')
        k_1 = k[::-1]
        p = ''.join(map(str, k_1))
        date_lis.append(p)
    start = datetime.strptime(date_range[0], "%Y-%m-%d").strftime('%d%m%Y')
    end = datetime.strptime(date_range[-1], "%Y-%m-%d").strftime('%d%m%Y')

today_date = datetime.now().strftime('%d%m%Y')
dt = datetime.now()
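# Worked example (hypothetical input): start_date='2021/07/06', end_date='2021/07/08' gives
# date_lis = ['06072021', '07072021', '08072021'] and start='06072021', end='08072021'. These
# DDMMYYYY strings are later compared against the date suffix in each blob's folder name.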
class AzureBlobDownloaderHistorical:
    def __init__(self, my_connection_string=connection_string, my_blob_container=blob_container):
        self.engine_data = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(postgre_user, postgre_password,
                                                                         postgre_host,
                                                                         postgre_port, postgres_database)
        self.engine = create_engine(self.engine_data)
        print("Initializing AzureBlobFileDownloader")
        self.blob_service_client = BlobServiceClient.from_connection_string(my_connection_string)
        self.my_container = self.blob_service_client.get_container_client(my_blob_container)
@@ -70,39 +56,57 @@ class AzureBlobDownloaderHistorical:
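        # Parses the blob path, checks the audit table, and writes the blob to local disk.
        # With a hypothetical blob path such as 'PLANT_06072021/Energy_Meter_01.csv' (the real
        # naming may differ):
        #   filename_val     -> 'PLANT_06072021'      (folder; its date suffix is compared to start/end)
        #   filename_for_tab -> 'Energy_Meter_01.csv' (basename; its stem is the audit-table key)
        #   final_file_name  -> 'energy_meter', final_name -> 'energy_meter_06072021'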
        try:
            filename = file_name.split('/')
            filename_val = filename[0]
            filename_for_tab = filename[1]
            file_value = filename_val.split('_')
            filename_new = file_value[1]
            file_name_data = filename[1]
            file_name_data_val = file_name_data.split('.')
            file_val = file_name_data_val[0]
            final_file = file_val.split('_')
            final_file_name = (final_file[0] + '_' + final_file[1]).lower()
            current_date = filename_new
            final_name = final_file_name + '_' + current_date
            if start == end:
                if start == current_date and end == current_date:
                    validate_postgres = self.check_audit_postgres(dt, filename_for_tab.split('.csv')[0])
                    if validate_postgres:
                        try:
                            download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
                            os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
                            print('downloading', download_file_path)
                            logging.info("Downloading %s", file_name)
                            with open(download_file_path, "wb") as file:
                                file.write(file_content)
                            conn = self.engine.raw_connection()
                            cur = conn.cursor()
                            curr_act = 'Downloaded'
                            cur.execute('''UPDATE audit_data_table SET current_activity = '{}' WHERE filename = '{}';'''.format(curr_act, filename_for_tab.split('.csv')[0]))
                            conn.commit()
                        except Exception as e:
                            print(e)
            else:
                file_start_val = file_name.split('/')
                file_dat = file_start_val[0]
                file_dat_val = file_dat.split('_')
                if file_dat_val[1] in date_lis:
                    validate_postgres = self.check_audit_postgres(dt, filename_for_tab.split('.csv')[0])
                    if validate_postgres:
                        try:
                            download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
                            print(file_name)
                            os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
                            print('downloading', download_file_path)
                            logging.info("Downloading %s", file_name)
                            with open(download_file_path, "wb") as file:
                                file.write(file_content)
                            conn = self.engine.raw_connection()
                            cur = conn.cursor()
                            curr_act = 'Downloaded'
                            cur.execute('''UPDATE audit_data_table SET current_activity = '{}' WHERE filename = '{}';'''.format(curr_act, filename_for_tab.split('.csv')[0]))
                            conn.commit()
                        except Exception as e:
                            print(e)
        except Exception as e:
            print(e)
            logging.error(e)
@@ -117,10 +121,10 @@ class AzureBlobDownloaderHistorical:
                                    port=postgre_port)
            logging.info("Connected to postgres successfully")
            cur = conn.cursor()
            cur.execute("SELECT start_date, filename from audit_data_table")
            row = cur.fetchall()
            if not row:
                cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)', (dt, filename, job_id))
                conn.commit()
                print("Record Inserted Successfully")
                return True
@@ -131,10 +135,10 @@ class AzureBlobDownloaderHistorical:
                for records in row:
                    column_list.append(records[0])
                if filename not in column_list:
                    cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)', (dt, filename, job_id))
                    conn.commit()
                    print("Record Inserted Successfully")
                    return True
                else:
                    cur.execute("SELECT filename, status from audit_data_table")
                    row = cur.fetchall()
@@ -143,19 +147,19 @@ class AzureBlobDownloaderHistorical:
                    for i in row:
                        file_name_list.append(i[0])
                        file_status_list.append(i[1])
                    final_updated_dict = dict(zip(file_name_list, file_status_list))
                    for key, value in final_updated_dict.items():
                        if key == filename:
                            if value != 'Success' and value != 'Skipped':
                                cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)', (dt, filename, job_id))
                                conn.commit()
                                return True
                            elif value == 'Success' or value == 'Skipped':
                                status = 'Skipped'
                                cur.execute('insert into audit_data_table(start_date, filename, status, job_id) values(%s, %s, %s, %s)', (dt, filename, status, job_id))
                                conn.commit()
                        else:
                            return False
        except Exception as e:
            print(e)
            logging.error(e)
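    # Audit flow above: a file is downloaded only when check_audit_postgres returns True, i.e. when
    # audit_data_table has no row for the file yet or its last status is neither 'Success' nor
    # 'Skipped'; otherwise a 'Skipped' row is inserted and the download is not repeated.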
@@ -169,7 +173,7 @@ class AzureBlobDownloaderHistorical:
            blobby = blob.name
            if blobby.endswith('.csv'):
                counter += 1
                blob_name = str(counter) + '_' + blobby
                logging.info('blob_data %s', blob_name)
                bytes = self.my_container.get_blob_client(blob).download_blob().readall()
                self.save_blob(blob_name, bytes)
@@ -180,6 +184,10 @@
class AzureBlobDownloader:
    def __init__(self, my_connection_string=connection_string, my_blob_container=blob_container):
        self.engine_data = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(postgre_user, postgre_password,
                                                                         postgre_host,
                                                                         postgre_port, postgres_database)
        self.engine = create_engine(self.engine_data)
        print("Initializing AzureBlobFileDownloader")
        self.blob_service_client = BlobServiceClient.from_connection_string(my_connection_string)
        self.my_container = self.blob_service_client.get_container_client(my_blob_container)
@@ -188,24 +196,33 @@ class AzureBlobDownloader:
        try:
            filename = file_name.split('/')
            filename_val = filename[0]
            filename_for_tab = filename[1]
            file_value = filename_val.split('_')
            filename_new = file_value[1]
            file_name_data = filename[1]
            file_name_data_val = file_name_data.split('.')
            file_val = file_name_data_val[0]
            final_file = file_val.split('_')
            final_file_name = (final_file[0] + '_' + final_file[1]).lower()
            current_date = filename_new
            final_name = final_file_name + '_' + current_date
            if current_date == today_date:
                validate_postgres = self.check_audit_postgres(dt, filename_for_tab.split('.csv')[0])
                if validate_postgres:
                    try:
                        download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
                        os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
                        print('downloading', download_file_path)
                        logging.info("Downloading %s", file_name)
                        with open(download_file_path, "wb") as file:
                            file.write(file_content)
                        conn = self.engine.raw_connection()
                        cur = conn.cursor()
                        curr_act = 'Downloaded'
                        cur.execute('''UPDATE audit_data_table SET current_activity = '{}' WHERE filename = '{}';'''.format(curr_act, filename_for_tab.split('.csv')[0]))
                        conn.commit()
                    except Exception as e:
                        print(e)
        except Exception as e:
            logging.error(e)
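    # Daily variant: unlike the historical downloader, save_blob here only accepts blobs whose
    # folder date equals today's DDMMYYYY (today_date); the audit bookkeeping is otherwise identical.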
@@ -217,12 +234,12 @@ class AzureBlobDownloader:
                                    user=postgre_user,
                                    password=postgre_password,
                                    port=postgre_port)
            logging.info("Connected to postgres successfully")
            cur = conn.cursor()
            cur.execute("SELECT start_date, filename from audit_data_table")
            row = cur.fetchall()
            if not row:
                cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)', (dt, filename, job_id))
                conn.commit()
                logging.info("Record Inserted Successfully")
                return True
@@ -233,10 +250,10 @@ class AzureBlobDownloader:
                for records in row:
                    column_list.append(records[0])
                if filename not in column_list:
                    cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)', (dt, filename, job_id))
                    conn.commit()
                    logging.info("Record Inserted Successfully")
                    return True
                else:
                    cur.execute("SELECT filename, status from audit_data_table")
                    row = cur.fetchall()
@@ -245,16 +262,19 @@ class AzureBlobDownloader:
                    for i in row:
                        file_name_list.append(i[0])
                        file_status_list.append(i[1])
                    final_updated_dict = dict(zip(file_name_list, file_status_list))
                    for key, value in final_updated_dict.items():
                        if key == filename:
                            if value != 'Success' and value != 'Skipped':
                                cur.execute('insert into audit_data_table(start_date, filename, job_id) values(%s, %s, %s)',
                                            (dt, filename, job_id))
                                conn.commit()
                                return True
                            elif value == 'Success' or value == 'Skipped':
                                status = 'Skipped'
                                cur.execute(
                                    'insert into audit_data_table(start_date, filename, status, job_id) values(%s, %s, %s, %s)',
                                    (dt, filename, status, job_id))
                                conn.commit()
                        else:
                            return False
@@ -270,7 +290,7 @@ class AzureBlobDownloader:
            blobby = blob.name
            if blobby.endswith('.csv'):
                counter += 1
                blob_name = str(counter) + '_' + blobby
                logging.info('blob_data %s', blob_name)
                bytes = self.my_container.get_blob_client(blob).download_blob().readall()
                self.save_blob(blob_name, bytes)
@@ -284,5 +304,4 @@ if start_date != '' and end_date != '':
else:
    azure_blob_file_downloader = AzureBlobDownloader()
    azure_blob_file_downloader.download_all_blobs_in_container()
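# Hypothetical invocation: export start_date='2021/07/06' and end_date='2021/07/09' before running
# this script to backfill that range; leave both unset (the defaults) to download only today's files.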