Add new file

parent 8e00d9f5
from azure.storage.blob import BlobServiceClient
import os.path
from datetime import datetime
from datetime import date
from pathlib import Path
import psycopg2
import os
import logging
postgres_database = os.environ.get('postgres_database', default='ilens_jubilant')
postgre_user = os.environ.get('postgre_user', default='ilens')
postgre_password = os.environ.get('postgre_password', default='ilens#4321')
postgre_host = os.environ.get('postgre_host', default='192.168.0.207')
postgre_port = os.environ.get('postgre_port', default='5433')
connection_string = os.environ.get('connection_string', default="DefaultEndpointsProtocol=https;AccountName=azrabsjililens01;AccountKey=OEsYFDThcDilcJSGO7dy8HhZwsuxTCmV231s2UKjAuKaQYClUwX4RH84J1FQM1bLlauS2tDqutyRzbaZsywN4w==;EndpointSuffix=core.windows.net")
blob_container = os.environ.get('blob_container', 'jubilant')
job_id = os.environ.get('JOB_ID', 'job_1')
LOCAL_BLOB_PATH = os.environ.get('LOCAL_PATH','ilens/prod/jubilant')
start_date = os.environ.get('start_date', '')
end_date = os.environ.get('end_date', '')
if start_date != '' and end_date != '':
start_dte = str(start_date)
end_dte = str(end_date)
start_ep = datetime.strptime(start_dte, '%Y/%m/%d')
end_ep = datetime.strptime(end_dte, '%Y/%m/%d')
step = timedelta(days=1)
x = []
while start_ep <= end_ep:
x.append(str(start_ep))
start_ep += step
y = []
for i in x:
c = i.split(' ')
y.append(c[0])
z = []
for i in y:
s = i.split('-')
s = s[::-1]
k = ''.join(map(str,s))
z.append(k)
start_date_list = start_date.split('/')
end_date_list = end_date.split('/')
start_list_dt = start_date_list[::-1]
end_list_dt = end_date_list[::-1]
start = ''.join(map(str, start_list_dt))
end = ''.join(map(str,end_list_dt))
today = date.today()
today = str(today)
today_list = today.split('-')
today_list = today_list[::-1]
today_date = ''.join(map(str, today_list))
date_ts = today
dt = datetime.now()
class AzureBlobDownloaderHistorical:
def __init__(self, my_connection_string=connection_string, my_blob_container=blob_container):
print("Intializing AzureBlobFileDownloader")
self.blob_service_client = BlobServiceClient.from_connection_string(my_connection_string)
self.my_container = self.blob_service_client.get_container_client(my_blob_container)
def save_blob(self, file_name, file_content):
try:
filename = file_name.split('/')
filename_val = filename[0]
file_value = filename_val.split('_')
filename_new = file_value[1]
file_name_data = filename[1]
file_name_data_val = file_name_data.split('.')
file_val = file_name_data_val[0]
final_file = file_val.split('_')
final_file_name = (final_file[0]+'_'+final_file[1]).lower()
current_date = filename_new
final_name = final_file_name+'_'+current_date
if start == end:
if start == current_date and end == current_date:
validate_postgres = self.check_audit_postgres(dt, final_name)
if validate_postgres:
download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
print('downloading', download_file_path)
logging.info("Downloading", file_name)
with open(download_file_path, "wb") as file:
file.write(file_content)
else:
file_start_val = file_name.split('/')
file_dat = file_start_val[0]
file_dat_val = file_dat.split('_')
if file_dat_val[1] in z:
validate_postgres = self.check_audit_postgres(dt, final_name)
if validate_postgres:
download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
print(file_name)
os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
print('downloading', download_file_path)
logging.info("Downloading", file_name)
with open(download_file_path, "wb") as file:
file.write(file_content)
except Exception as e:
print(e)
logging.error(e)
def check_audit_postgres(self, dt, filename):
try:
conn = psycopg2.connect(
host=postgre_host,
database=postgres_database,
user=postgre_user,
password=postgre_password,
port=postgre_port)
logging.info("Connected to postgres sucessfully")
cur = conn.cursor()
cur.execute("SELECT date, filename from audit_data_table")
row = cur.fetchall()
if not row:
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (dt, filename))
conn.commit()
print("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename from audit_data_table")
column_list = []
row = cur.fetchall()
for records in row:
column_list.append(records[0])
if filename not in column_list:
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (dt, filename))
conn.commit()
print("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename, status from audit_data_table")
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list,file_status_list))
for key, value in final_updated_dict.items():
if value != 'Success' or value != 'Skipped':
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(date_ts, filename))
conn.commit()
return True
elif value == 'Success' or value == 'Skipped':
status = 'Skipped'
cur.execute('insert into audit_data_table(start_date, filename, status) values(%s, %s, %s)',
(date_ts, filename, status))
conn.commit()
else:
return False
except Exception as e:
print(e)
logging.error(e)
def download_all_blobs_in_container(self):
try:
my_blobs = self.my_container.list_blobs()
counter = 0
file_conmt = 'jubilant'
for blob in my_blobs:
blobby = blob.name
if blobby.endswith('.csv'):
counter += 1
blob_name = str(counter)+'_'+blobby
logging.info('blob_data', blob_name)
bytes = self.my_container.get_blob_client(blob).download_blob().readall()
self.save_blob(blob_name, bytes)
except Exception as e:
print(e)
logging.error(e)
class AzureBlobDownloader:
def __init__(self, my_connection_string=connection_string, my_blob_container=blob_container):
print("Intializing AzureBlobFileDownloader")
self.blob_service_client = BlobServiceClient.from_connection_string(my_connection_string)
self.my_container = self.blob_service_client.get_container_client(my_blob_container)
def save_blob(self, file_name, file_content):
try:
filename = file_name.split('/')
filename_val = filename[0]
file_value = filename_val.split('_')
filename_new = file_value[1]
file_name_data = filename[1]
file_name_data_val = file_name_data.split('.')
file_val = file_name_data_val[0]
final_file = file_val.split('_')
final_file_name = (final_file[0]+'_'+final_file[1]).lower()
current_date = filename_new
final_name = final_file_name+'_'+current_date
if current_date == today_date:
validate_postgres = self.check_audit_postgres(date_ts, final_name)
if validate_postgres:
download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
print('downloading', download_file_path)
logging.info("Downloading", file_name)
with open(download_file_path, "wb") as file:
file.write(file_content)
except Exception as e:
logging.error(e)
def check_audit_postgres(self, date_ts, filename):
try:
conn = psycopg2.connect(
host=postgre_host,
database=postgres_database,
user=postgre_user,
password=postgre_password,
port=postgre_port)
logging.info("Connected to postgres sucessfully")
cur = conn.cursor()
cur.execute("SELECT start_date, filename from audit_data_table")
row = cur.fetchall()
if not row:
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (date_ts, filename))
conn.commit()
logging.info("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename from audit_data_table")
column_list = []
row = cur.fetchall()
for records in row:
column_list.append(records[0])
if filename not in column_list:
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)', (date_ts, filename))
conn.commit()
logging.info("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename, status from audit_data_table")
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list,file_status_list))
for key, value in final_updated_dict.items():
if key == filename:
if value != 'Success' and value != 'Skipped':
cur.execute('insert into audit_data_table(start_date, filename) values(%s, %s)',(date_ts, filename))
conn.commit()
return True
elif value == 'Success' or value == 'Skipped':
status = 'Skipped'
cur.execute('insert into audit_data_table(start_date, filename, status) values(%s, %s, %s)',(date_ts, filename, status))
conn.commit()
else:
return False
except Exception as e:
logging.error(e)
def download_all_blobs_in_container(self):
try:
my_blobs = self.my_container.list_blobs()
counter = 0
file_conmt = 'jubilant'
for blob in my_blobs:
blobby = blob.name
if blobby.endswith('.csv'):
counter += 1
blob_name = str(counter)+'_'+blobby
logging.info('blob_data', blob_name)
bytes = self.my_container.get_blob_client(blob).download_blob().readall()
self.save_blob(blob_name, bytes)
except Exception as e:
logging.error(e)
if start_date != '' and end_date != '':
azure_blob_file_downloader_historical = AzureBlobDownloaderHistorical()
azure_blob_file_downloader_historical.download_all_blobs_in_container()
else:
azure_blob_file_downloader = AzureBlobDownloader()
azure_blob_file_downloader.download_all_blobs_in_container()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment