Commit 2c2d1df3 authored by dasharatha.vamshi's avatar dasharatha.vamshi

find/delete

parent bcc1acf4
# postgre_ftp_clean_up # postgre_ftp_clean_up
This utility displays or deletes the files on the FTP server that should be removed because they have already been processed.
How to Run it:
1. The command to list the files in the WPM, IO, or MODBUS folders on the FTP server that should be deleted is:
python clean_up.py find
2. The command to delete those files from the WPM, IO, or MODBUS folders on the FTP server is:
python clean_up.py delete
...@@ -8,6 +8,7 @@ from ftplib import FTP ...@@ -8,6 +8,7 @@ from ftplib import FTP
from logging import StreamHandler from logging import StreamHandler
import socket import socket
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
import re
logger = logging.getLogger("postgre_ftp_clean_up") logger = logging.getLogger("postgre_ftp_clean_up")
logger.setLevel(os.environ.get("logger", "DEBUG")) logger.setLevel(os.environ.get("logger", "DEBUG"))
...@@ -16,7 +17,7 @@ __console_handler = StreamHandler(sys.stdout) ...@@ -16,7 +17,7 @@ __console_handler = StreamHandler(sys.stdout)
__console_handler.setFormatter(__formatter) __console_handler.setFormatter(__formatter)
logger.addHandler(__console_handler) logger.addHandler(__console_handler)
check_for_n_hours = int(os.environ.get('check_for_n_hours', default='1')) check_for_n_hours = int(os.environ.get('check_for_n_hours', default='60'))
THREAD_POOL_LIMIT = os.environ.get("THREAD_POOL_LIMIT", default=10) THREAD_POOL_LIMIT = os.environ.get("THREAD_POOL_LIMIT", default=10)
FTP_HOST = os.environ.get("FTP_HOST", default="hfe-ftp.ilens.io") FTP_HOST = os.environ.get("FTP_HOST", default="hfe-ftp.ilens.io")
FTP_USER = os.environ.get("FTP_USER", default="ilenshero1") FTP_USER = os.environ.get("FTP_USER", default="ilenshero1")
...@@ -28,6 +29,13 @@ FTP_TARGET_DIRECTORY = os.environ.get("FTP_DIRECTORY") ...@@ -28,6 +29,13 @@ FTP_TARGET_DIRECTORY = os.environ.get("FTP_DIRECTORY")
NUMBER_OF_SPLITS = int(os.environ.get("NUMBER_OF_SPLITS", 20)) NUMBER_OF_SPLITS = int(os.environ.get("NUMBER_OF_SPLITS", 20))
enable_traceback = False enable_traceback = False
# The operating mode ("find" or "delete") is taken from the first
# command-line argument; later code compares it case-insensitively.
try:
    functionality = sys.argv[1]
except IndexError as e:
    # Indexing sys.argv can only fail with IndexError (no argument given),
    # so catch exactly that instead of a blanket Exception.
    logger.exception(f"find/delete functionality not found in command line arguments: {e}")
    # Same exception type and message as before; "from e" keeps the
    # original cause attached to the traceback.
    raise Exception(f"find/delete functionality not found in command line arguments: {e}") from e
class SmartFTP(FTP): class SmartFTP(FTP):
def makepasv(self): def makepasv(self):
...@@ -36,6 +44,21 @@ class SmartFTP(FTP): ...@@ -36,6 +44,21 @@ class SmartFTP(FTP):
return self.host, port return self.host, port
def file_copy_list(ftp_host, ftp_user, ftp_pass, source_directory):
    """Return the names of the ``.csv.gz`` entries in an FTP directory.

    Opens a connection with ``SmartFTP``, changes into ``source_directory``,
    fetches the name listing with ``nlst()`` and keeps only the names ending
    in ``.csv.gz``.

    Args:
        ftp_host: Host name of the FTP server.
        ftp_user: User name used to log in.
        ftp_pass: Password used to log in.
        source_directory: Remote directory whose entries are listed.

    Returns:
        A list of entry names ending in ``.csv.gz``, in the order the server
        returned them.

    Raises:
        Whatever the underlying FTP calls raise (connection, login, ``cwd``
        or ``nlst`` failures) -- nothing is caught here.
    """
    logger.info("Creating FTP connection to {}".format(ftp_host))
    # SmartFTP subclasses ftplib.FTP, which is a context manager: leaving the
    # block sends QUIT and closes the socket even if cwd()/nlst() raise.
    # Previously the connection was never closed.
    with SmartFTP(ftp_host, ftp_user, ftp_pass) as ftp:
        # NOTE(review): ftplib.FTP's constructor already logs in when a user
        # is passed, so this explicit login looks redundant -- kept to
        # preserve the existing behaviour; confirm before removing.
        ftp.login(ftp_user, ftp_pass)
        logger.info("changing directory to {}".format(source_directory))
        ftp.cwd(source_directory)
        logger.info("getting file list")
        filenames = ftp.nlst()
    return [name for name in filenames if name.endswith(".csv.gz")]
def remove(path, source_directory, ftp_host=FTP_HOST, ftp_user=FTP_USER, ftp_pass=FTP_PASS): def remove(path, source_directory, ftp_host=FTP_HOST, ftp_user=FTP_USER, ftp_pass=FTP_PASS):
try: try:
ftp = SmartFTP(ftp_host, ftp_user, ftp_pass) ftp = SmartFTP(ftp_host, ftp_user, ftp_pass)
...@@ -50,7 +73,20 @@ def remove(path, source_directory, ftp_host=FTP_HOST, ftp_user=FTP_USER, ftp_pas ...@@ -50,7 +73,20 @@ def remove(path, source_directory, ftp_host=FTP_HOST, ftp_user=FTP_USER, ftp_pas
if '550 Delete operation failed' in str(e): if '550 Delete operation failed' in str(e):
logger.info(f"Skipping {path}") logger.info(f"Skipping {path}")
else: else:
logger.error(f"Error when removing files from FTP location {source_directory} {e}", exc_info=enable_traceback) logger.error(f"Error when removing files from FTP location {source_directory} {e}",
exc_info=enable_traceback)
def get_files_from_ftp_to_delete(source_dir, postgre_list):
    """Return the entries of ``postgre_list`` that exist in an FTP directory.

    The FTP listing is obtained from ``file_copy_list`` using the module-level
    ``FTP_HOST``, ``FTP_USER`` and ``FTP_PASS`` settings.

    Args:
        source_dir: Remote FTP directory to list.
        postgre_list: Iterable of file names to look for in that directory.

    Returns:
        A list of the names from ``postgre_list`` that are also present in
        the FTP listing, in ``postgre_list`` order (duplicates are kept).
    """
    # Build the lookup set once: membership tests become O(1) instead of
    # scanning the whole FTP listing for every candidate name.
    ftp_files = set(
        file_copy_list(
            ftp_host=FTP_HOST,
            ftp_user=FTP_USER,
            ftp_pass=FTP_PASS,
            source_directory=source_dir,
        )
    )
    return [name for name in postgre_list if name in ftp_files]
if __name__ == "__main__": if __name__ == "__main__":
...@@ -58,9 +94,10 @@ if __name__ == "__main__": ...@@ -58,9 +94,10 @@ if __name__ == "__main__":
pg = Postgres() pg = Postgres()
time_stamp = datetime.datetime.utcnow() - datetime.timedelta(hours=check_for_n_hours) time_stamp = datetime.datetime.utcnow() - datetime.timedelta(hours=check_for_n_hours)
file_list = pg.get_file_names('hfe_master_file_status', time_stamp) file_list = pg.get_file_names('hfe_master_file_status', time_stamp)
file_list = list(set(file_list))
pg.close_connection() pg.close_connection()
final_file_list = [] final_file_list = []
WPM_list = [] WPM_list = list()
for i in range(0, len(file_list)): for i in range(0, len(file_list)):
if "WPM" in file_list[i]: if "WPM" in file_list[i]:
WPM_list.append(file_list[i]) WPM_list.append(file_list[i])
...@@ -75,30 +112,45 @@ if __name__ == "__main__": ...@@ -75,30 +112,45 @@ if __name__ == "__main__":
logger.debug("Closing Postgre Connection") logger.debug("Closing Postgre Connection")
THREAD_POOL_LIMIT = 20 THREAD_POOL_LIMIT = 20
pool = ThreadPool(int(THREAD_POOL_LIMIT)) pool = ThreadPool(int(THREAD_POOL_LIMIT))
if len(WPM_list) > 0:
try: final_WPM_list = get_files_from_ftp_to_delete(FTP_WPM_DIRECTORY,WPM_list)
logger.debug(f"Total files which should be deleted in ftp path {FTP_WPM_DIRECTORY} are {len(final_WPM_list)}")
final_IO_list = get_files_from_ftp_to_delete(FTP_IO_DIRECTORY,IO_list)
logger.debug(f"Total files which should be deleted in ftp path {FTP_IO_DIRECTORY} are {len(final_IO_list)}")
final_MODBUS_list = get_files_from_ftp_to_delete(FTP_MODBUS_DIRECTORY,MODBUS_list)
logger.debug(f"Total files which should be deleted in ftp path {FTP_MODBUS_DIRECTORY} are {len(final_MODBUS_list)}")
if len(final_WPM_list) > 0:
if functionality.lower() == 'find':
logger.info(f"files which should be deleted in FTP path {FTP_WPM_DIRECTORY} are {final_WPM_list}")
elif functionality.lower() == 'delete':
logger.info(f"files which should be deleted in ftp path {FTP_WPM_DIRECTORY} are {final_WPM_list}")
logger.info(f"Removing WPM files....") logger.info(f"Removing WPM files....")
prod_x = partial(remove, source_directory=FTP_WPM_DIRECTORY) prod_x = partial(remove, source_directory=FTP_WPM_DIRECTORY)
map_output = pool.map(prod_x, WPM_list) map_output = pool.map(prod_x, final_WPM_list)
except Exception as e:
logger.error(f"Error when removing WPM files from FTP {e}", exc_info=enable_traceback)
else: else:
logger.info(f"No WPM files after {time_stamp}") logger.info(f"No WPM files should be deleted after {time_stamp}")
if len(IO_list) > 0:
try: if len(final_IO_list) > 0:
if functionality.lower() == 'find':
logger.info(f"files which should be deleted in ftp path {FTP_IO_DIRECTORY} are {final_IO_list}")
elif functionality.lower() == 'delete':
logger.info(f"files which should be deleted in ftp path {FTP_IO_DIRECTORY} are {final_IO_list}")
logger.info(f"Removing IO files....") logger.info(f"Removing IO files....")
prod_x = partial(remove, source_directory=FTP_IO_DIRECTORY) prod_x = partial(remove, source_directory=FTP_IO_DIRECTORY)
map_output = pool.map(prod_x, IO_list) map_output = pool.map(prod_x, final_IO_list)
except Exception as e:
logger.error(f"Error when removing IO files from FTP {e}", exc_info=enable_traceback)
else: else:
logger.info(f"No IO files after {time_stamp}") logger.info(f"No IO files should be deleted after {time_stamp}")
if len(MODBUS_list) > 0:
try: if len(final_MODBUS_list) > 0:
if functionality.lower() == 'find':
logger.info(f"files which should be deleted in ftp path {FTP_MODBUS_DIRECTORY} are {final_MODBUS_list}")
elif functionality.lower() == 'delete':
logger.info(f"files which should be deleted in ftp path {FTP_MODBUS_DIRECTORY} are {final_MODBUS_list}")
logger.info(f"Removing MODBUS files....") logger.info(f"Removing MODBUS files....")
prod_x = partial(remove, source_directory=FTP_MODBUS_DIRECTORY) prod_x = partial(remove, source_directory=FTP_MODBUS_DIRECTORY)
map_output = pool.map(prod_x, MODBUS_list) map_output = pool.map(prod_x, final_MODBUS_list)
except Exception as e:
logger.error(f"Error when removing MODBUS files from FTP {e}", exc_info=enable_traceback)
else: else:
logger.info(f"No MODBUS files after {time_stamp}") logger.info(f"No MODBUS files should be deleted after {time_stamp}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment