Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
H
historical_BAAN_data
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
vamsikrishna.vemulamanda
historical_BAAN_data
Commits
fa9cf4d4
Commit
fa9cf4d4
authored
Jul 03, 2021
by
vamsikrishna.vemulamanda
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add new file
parent
75a9b30e
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
122 additions
and
0 deletions
+122
-0
read_historical_azure_data/read_azure_data
read_historical_azure_data/read_azure_data
+122
-0
No files found.
read_historical_azure_data/read_azure_data
0 → 100644
View file @
fa9cf4d4
from azure.storage.blob import BlobServiceClient
import os.path
from datetime import datetime
from datetime import date
from pathlib import Path
import psycopg2
import os
import logging
postgres_database = os.environ.get('postgres_database', default='ilens_jubilant')
postgre_user = os.environ.get('postgre_user', default='ilens')
postgre_password = os.environ.get('postgre_password', default='ilens#4321')
postgre_host = os.environ.get('postgre_host', default='192.168.0.207')
postgre_port = os.environ.get('postgre_port', default='5433')
connection_string = os.environ.get('connection_string', default="DefaultEndpointsProtocol=https;AccountName=azrabsjililens01;AccountKey=OEsYFDThcDilcJSGO7dy8HhZwsuxTCmV231s2UKjAuKaQYClUwX4RH84J1FQM1bLlauS2tDqutyRzbaZsywN4w==;EndpointSuffix=core.windows.net")
blob_container = os.environ.get('blob_container', 'jubilant')
job_id = os.environ.get('JOB_ID', 'job_1')
LOCAL_BLOB_PATH = os.environ.get('LOCAL_PATH','ilens/prod/jubilant')
today = date.today()
today = str(today)
today_list = today.split('-')
today_list = today_list[::-1]
today_date = ''.join(map(str, today_list))
date_ts = today
class AzureBlobDownloader:
def __init__(self, my_connection_string=connection_string, my_blob_container=blob_container):
print("Intializing AzureBlobFileDownloader")
self.blob_service_client = BlobServiceClient.from_connection_string(my_connection_string)
self.my_container = self.blob_service_client.get_container_client(my_blob_container)
def save_blob(self, file_name, file_content):
try:
filename = file_name.split('/')
filename_val = filename[0]
file_value = filename_val.split('_')
filename_new = file_value[1]
file_name_data = filename[1]
file_name_data_val = file_name_data.split('.')
file_val = file_name_data_val[0]
final_file = file_val.split('_')
final_file_name = (final_file[0]+'_'+final_file[1]).lower()
current_date = filename_new
final_name = final_file_name+'_'+current_date
if current_date != today_date:
validate_postgres = self.check_audit_postgres(date_ts, final_name)
if validate_postgres:
download_file_path = os.path.join(LOCAL_BLOB_PATH, job_id, file_name)
os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
print('downloading', download_file_path)
logging.info("Downloading", file_name)
with open(download_file_path, "wb") as file:
file.write(file_content)
except Exception as e:
logging.error(e)
def check_audit_postgres(self, date_ts, filename):
try:
conn = psycopg2.connect(
host=postgre_host,
database=postgres_database,
user=postgre_user,
password=postgre_password,
port=postgre_port)
logging.info("Connected to postgres sucessfully")
cur = conn.cursor()
cur.execute("SELECT date, filename from audit_table")
row = cur.fetchall()
if not row:
cur.execute('insert into audit_table(date, filename) values(%s, %s)', (date_ts, filename))
conn.commit()
logging.info("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename from audit_table")
column_list = []
row = cur.fetchall()
for records in row:
column_list.append(records[0])
if filename not in column_list:
cur.execute('insert into audit_table(date, filename) values(%s, %s)', (date_ts, filename))
conn.commit()
logging.info("Record Inserted Successfully")
return True
else:
cur.execute("SELECT filename, status from audit_table")
row = cur.fetchall()
file_name_list = []
file_status_list = []
for i in row:
file_name_list.append(i[0])
file_status_list.append(i[1])
final_updated_dict = dict(zip(file_name_list,file_status_list))
for key, value in final_updated_dict.items():
if key == filename:
if value != 'Success':
return True
else:
return False
except Exception as e:
logging.error(e)
def download_all_blobs_in_container(self):
try:
my_blobs = self.my_container.list_blobs()
counter = 0
file_conmt = 'jubilant'
for blob in my_blobs:
blobby = blob.name
if blobby.endswith('.csv'):
counter += 1
blob_name = str(counter)+'_'+blobby
logging.info('blob_data', blob_name)
bytes = self.my_container.get_blob_client(blob).download_blob().readall()
self.save_blob(blob_name, bytes)
except Exception as e:
logging.error(e)
azure_blob_file_downloader = AzureBlobDownloader()
azure_blob_file_downloader.download_all_blobs_in_container()
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment