Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
P
postgre_ftp_clean_up
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dasharatha.vamshi
postgre_ftp_clean_up
Commits
9df85558
Commit
9df85558
authored
May 08, 2021
by
dasharatha.vamshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
cleanup files
parent
003ba5c6
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
393 additions
and
0 deletions
+393
-0
.gitignore
.gitignore
+144
-0
clean_up.py
clean_up.py
+102
-0
postgre.py
postgre.py
+147
-0
No files found.
.gitignore
0 → 100644
View file @
9df85558
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# pycharm
.idea/
# logs
logs/
\ No newline at end of file
clean_up.py
0 → 100644
View file @
9df85558
import
datetime
from
postgre
import
Postgres
import
logging
import
os
import
sys
from
functools
import
partial
from
ftplib
import
FTP
from
logging
import
StreamHandler
import
socket
from
multiprocessing.pool
import
ThreadPool
# Module-wide logger: level comes from the "logger" env var (default DEBUG),
# everything is echoed to stdout.
logger = logging.getLogger("postgre_ftp_clean_up")
logger.setLevel(os.environ.get("logger", "DEBUG"))
__formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s')
__console_handler = StreamHandler(sys.stdout)
__console_handler.setFormatter(__formatter)
logger.addHandler(__console_handler)

# How far back (in hours) to look for deletable files.
check_for_n_hours = int(os.environ.get('check_for_n_hours', default='3'))
# Coerced to int at parse time: the env var arrives as a string while the
# fallback was an int, so the constant previously had an inconsistent type.
THREAD_POOL_LIMIT = int(os.environ.get("THREAD_POOL_LIMIT", default=10))
# SECURITY NOTE(review): live FTP credentials are hard-coded as defaults
# below; they should come exclusively from the environment / a secret store.
FTP_HOST = os.environ.get("FTP_HOST", default="hfe-ftp.ilens.io")
FTP_USER = os.environ.get("FTP_USER", default="ilenshero1")
FTP_PASS = os.environ.get("FTP_PASS", default="ilenshfe123")
# FTP directories that hold each category of data file.
FTP_MODBUS_DIRECTORY = os.environ.get("FTP_MODBUS_DIRECTORY", default="/upload/DATA/MODBUS/")
FTP_IO_DIRECTORY = os.environ.get("FTP_IO_DIRECTORY", default="/upload/DATA/IO/")
FTP_WPM_DIRECTORY = os.environ.get("FTP_WPM_DIRECTORY", default="/upload/DATA/")
FTP_TARGET_DIRECTORY = os.environ.get("FTP_DIRECTORY")  # no default: may be None
NUMBER_OF_SPLITS = int(os.environ.get("NUMBER_OF_SPLITS", 20))
class SmartFTP(FTP):
    """FTP client that tolerates servers advertising a bad passive-mode address."""

    def makepasv(self):
        # Forcing the address family to IPv6 makes FTP.makepasv() negotiate
        # passive mode with EPSV instead of PASV.  NOTE(review): presumably a
        # workaround for this particular server's PASV replies — confirm.
        self.af = socket.AF_INET6
        # The host the server advertises in its reply may be unroutable
        # (e.g. a NAT-internal address), so discard it and keep only the
        # port, reconnecting to the original control-connection host.
        invalidhost, port = super(SmartFTP, self).makepasv()
        return self.host, port
def remove(path, source_directory, ftp_host=FTP_HOST, ftp_user=FTP_USER, ftp_pass=FTP_PASS):
    """Delete one file from the FTP server (best-effort).

    Connects to *ftp_host*, changes into *source_directory* and deletes the
    basename of *path*.  Returns *path* on success, None on any failure —
    callers treat a failed delete as skippable, not fatal.
    """
    try:
        # Passing user/password to the FTP constructor already performs a
        # login; the original then called login() a second time.  Connect
        # only, then log in exactly once.
        ftp = SmartFTP(ftp_host)
        try:
            ftp.login(ftp_user, ftp_pass)
            logger.info("changing directory to {}".format(source_directory))
            ftp.cwd(source_directory)
            logger.info("removing ftp file {}".format(path))
            ftp.delete(os.path.basename(path))
        finally:
            # Always release the control connection, even if cwd/delete fail.
            ftp.close()
        return path
    except Exception:
        # Best-effort sweep: a missing or locked file must not abort the run.
        logger.info(f"Skipping {path}")
if __name__ == "__main__":
    logger.info("making postgre connection")  # typo "conection" fixed
    pg = Postgres()
    # Only files whose start_date is newer than this cutoff are candidates.
    time_stamp = datetime.datetime.utcnow() - datetime.timedelta(hours=check_for_n_hours)
    file_list = pg.get_file_names('hfe_master_file_status', time_stamp)

    # Partition the candidates by the FTP directory that holds them.
    WPM_list = [name for name in file_list if "WPM" in name]
    final_file_list = [name for name in file_list if "WPM" not in name]
    print(len(file_list), file_list)
    IO_list = [name for name in final_file_list if 'IO' in name]
    MODBUS_list = [name for name in final_file_list if 'MODBUS' in name]
    print(len(final_file_list), final_file_list)
    print(len(WPM_list), WPM_list)
    print(len(IO_list), IO_list)
    print(len(MODBUS_list), MODBUS_list)

    logger.info("Closing Postgre Connection")
    pg.close_connection()

    # NOTE(review): this overrides any THREAD_POOL_LIMIT env setting; kept
    # for behavioural compatibility — confirm whether the env var should win.
    THREAD_POOL_LIMIT = 20
    pool = ThreadPool(int(THREAD_POOL_LIMIT))

    def _clean_directory(label, file_names, directory):
        # One pass of the sweep: delete every name in *file_names* from
        # *directory* on the FTP server, fanned out across the thread pool.
        if len(file_names) > 0:
            try:
                logger.info(f"Removing {label} files....")
                prod_x = partial(remove, source_directory=directory)
                pool.map(prod_x, file_names)
            except Exception as e:
                logger.info(e)
        else:
            logger.info(f"No {label} files after {time_stamp}")

    _clean_directory("WPM", WPM_list, FTP_WPM_DIRECTORY)
    _clean_directory("IO", IO_list, FTP_IO_DIRECTORY)
    _clean_directory("MODBUS", MODBUS_list, FTP_MODBUS_DIRECTORY)

    # Release the worker threads once all three passes are done (the
    # original leaked the pool).
    pool.close()
    pool.join()
\ No newline at end of file
postgre.py
0 → 100644
View file @
9df85558
import
logging
# from PostgreSQLV1_Constants import password
import
os
import
sys
from
logging
import
StreamHandler
import
psycopg2
# Module logger: fixed INFO level, formatted output to stdout.
logger = logging.getLogger("POSTGRE")
logger.setLevel('INFO')
__formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s')
__console_handler = StreamHandler(sys.stdout)
__console_handler.setFormatter(__formatter)
logger.addHandler(__console_handler)

password = ""

# SECURITY NOTE(review): live database credentials are hard-coded below;
# they belong in the deployment environment / a secret store, not in source.
# setdefault() (instead of the original plain assignment) lets an externally
# supplied environment override these fallbacks rather than being clobbered.
os.environ.setdefault('postgre_database', "ilens-hfae")
os.environ.setdefault('postgre_user', "hfae")
os.environ.setdefault('postgre_password', "iLens#4321")
os.environ.setdefault('postgre_host', "20.198.97.231")
os.environ.setdefault('postgre_port', "5432")
class Postgres():
    """Thin wrapper around a psycopg2 connection to the project's PostgreSQL DB.

    Connection parameters are read from the postgre_* environment variables.
    Data values are bound as query parameters; table names cannot be bound
    and are interpolated, so they must come from trusted code.
    """

    def __init__(self):
        # Default port guarded: int(None) raised TypeError when the env var
        # was unset in the original.
        self.conn = psycopg2.connect(
            database=os.environ.get("postgre_database", default=None),
            user=os.environ.get("postgre_user", default=None),
            password=os.environ.get("postgre_password", default=None),
            host=os.environ.get("postgre_host", default=None),
            port=int(os.environ.get("postgre_port", default="5432")))
        logger.info("Connected to PostgresSQL DB...")
        self.cur = self.conn.cursor()

    def create_table(self, table_name):
        """Create the file-status table *table_name* and commit."""
        self.cur.execute(
            "CREATE TABLE {table_name}(job_id INT NOT NULL, file_name TEXT NOT NULL,"
            "site_mac_id TEXT NOT NULL,"
            "start_date DATE NOT NULL,end_date DATE NOT NULL,status TEXT)"
            .format(table_name=table_name))
        logger.info("Table created successfully")
        self.conn.commit()

    def delete_records(self, table_name, pipeline_id):
        """Delete all rows matching *pipeline_id* (bound as a parameter)."""
        self.cur.execute(
            "DELETE FROM {table_name} WHERE pipeline_id=%s".format(table_name=table_name),
            (pipeline_id,))
        logger.info("Record deleted successfully")
        self.conn.commit()

    def insert_records(self, table_name, job_id, file_name, site_mac_id,
                       start_date, end_date, status):
        """Insert one status row; failures are logged, not raised."""
        try:
            # Values are bound as parameters (the original interpolated them
            # into the SQL string — injection-prone and quoting-fragile).
            self.cur.execute(
                "INSERT INTO {table_name} (job_id,file_name, site_mac_id, start_date, end_date, status)"
                "values(%s, %s, %s, %s, %s, %s)".format(table_name=table_name),
                (job_id, file_name, site_mac_id, start_date, end_date, status))
            self.conn.commit()
            logger.info('Records Inserted successfully')
        except Exception as e:
            logger.error(f'failed to upload status {file_name}')
            logger.error(e)

    def update_records(self, table_name, file_name, job_id, end_date, status):
        """Set status/end_date for the (job_id, file_name) row; log failures."""
        try:
            self.cur.execute(
                "UPDATE {table_name} set status = %s, end_date = %s where job_id = %s "
                "and file_name = %s ".format(table_name=table_name),
                (status, end_date, job_id, file_name))
            self.conn.commit()
            logger.info(f'Records Updated successfully for file {file_name}')
        except Exception as e:
            logger.error(f'failed to upload status {file_name}')
            logger.error(e)

    def show_records(self, table_name):
        """Log every row of *table_name* at INFO level."""
        self.cur.execute("SELECT * from {table_name}".format(table_name=table_name))
        rows = self.cur.fetchall()
        for row in rows:
            logger.info(row)

    def get_file_names(self, table_name, time_stamp):
        """Return file_names in ARCHIVE/UNPROCESSED state newer than *time_stamp*."""
        sql = ("SELECT file_name FROM {table_name} WHERE status in ('ARCHIVE','UNPROCESSED') "
               "and start_date >= %s".format(table_name=table_name))
        logger.info(sql)
        self.cur.execute(sql, (time_stamp,))
        return [row[0] for row in self.cur.fetchall()]

    def query_db(self, query, table_name, main_list, time_stamp):
        """Print file_names from *query* seen since *time_stamp*; return *main_list*.

        NOTE(review): *query* is a caller-built SQL tuple literal (e.g.
        "('A','B')") interpolated verbatim — it must come from trusted code.
        The de-duplication against main_list is commented out upstream, so
        the list is returned unchanged.
        """
        sql = ("SELECT file_name FROM {table_name} WHERE file_name in {query} "
               "and start_date >= %s".format(table_name=table_name, query=query))
        logger.info(sql)
        self.cur.execute(sql, (time_stamp,))
        for row in self.cur.fetchall():
            print(row)
        return main_list

    def query_file_name(self, file_name, table_name):
        """Return how many rows reference *file_name*, or 0 on any error."""
        try:
            self.cur.execute(
                "SELECT COUNT(file_name) FROM {table_name} WHERE file_name = %s"
                .format(table_name=table_name),
                (file_name,))
            return self.cur.fetchone()[0]
        except Exception as e:
            # Bare except replaced: still best-effort, but the failure is logged.
            logger.error(e)
            return 0

    def close_connection(self):
        """Close the cursor and the underlying connection."""
        self.cur.close()
        self.conn.close()
# pg = Postgres()
# import datetime
# check_for_n_hours = 1
# time_stamp = datetime.datetime.utcnow() - datetime.timedelta(hours=check_for_n_hours)
# pg.get_file_names('hfe_master_file_status',time_stamp)
# # pg.query_db("('WD008A67')",'hfe_master_file_status',[],time_stamp)
# # pg.insert_records(108, 'PreProcessing', 112, 1109, '2020-10-01', '2020-11-10', 'running')
# # pg.delete_records('DEVICEA',108)
# # pg.update_records('DEVICEA',103,'error')
# # pg.show_records('DEVICEA')
# pg.close_connection()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment