Commit a587db48 authored by dasharatha.vamshi

init

parent e5d3d702
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# pycharm
.idea/
# logs
logs/
FROM python:3.7-slim
WORKDIR /opt
COPY . /opt
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "main.py"]
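# The image copies the whole repo (main.py, scripts/, conf/) into /opt, so config_parser.py can
# read conf/configuration.yml from the working directory when the container starts.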
#---------------Service Configurations----------------#
SERVICE_CONFIG:
  LOG_LEVEL: info
  LOG_HANDLER_NAME: AddtoModelStore
  LOGSTASH_HOST: 192.168.1.47
  LOGSTASH_PORT: 5000
#--------------System Configurations--------------------#
# For local testing, set shared_mount_base_ai_job to temp/mnt/ilens/ai-jobs/
SYSTEM_CONFIG:
  shared_mount_base_ai_job: /mnt/ilens/ai-jobs/
  connection_string: DefaultEndpointsProtocol=https;AccountName=azrabsilensqa01;AccountKey=DN6q6kX98JM8yUwtuJh2bAaXUGFo1zRS5HJSsa/ZA+MlmctjC000eHP7bdXiQqkI/MVtADhS8c9E88LI5T4UHw==;EndpointSuffix=core.windows.net
  default_run_id: default_run_id
  prod_connection_string: DefaultEndpointsProtocol=https;AccountName=azrabsemshfe01;AccountKey=ui/sJShausvY8VCZzBP7E6BXJZR800s50rJ4XqhkRDYMJuiplFVjQjTVtz+kQ23cW7Itxv81bQedUvPQplzH3w==;EndpointSuffix=core.windows.net
  test_connection_string: DefaultEndpointsProtocol=https;AccountName=vamshistorage;AccountKey=tSvnCJx/uMMhRb4ErjJgaBfK63W1vqMC/ha8sM33FxphbtCaIm6PSxEW8u7Ir54jzAMqMJFSU5jd76seldB/oQ==;EndpointSuffix=core.windows.net
#----------------------If reading conf from mongo------------#
FOR_EACH_MONGO_CONFIG:
  READ_FROM_MONGO: true
  MONGO_URI: mongodb://192.168.0.210:27017
  MONGO_DB: iLensAiPipeline
  MONGO_RUN_COLL: runMetadata
  MONGO_SITE_COLL: siteMetadata
import json
import os.path
import pickle
import traceback
import uuid
from datetime import date
from os import path

from azure.storage.blob import BlobServiceClient
from sklearn.ensemble import RandomForestRegressor

from scripts.common.config_parser import *
from scripts.common.constants import AddtoModelStoreConstants, ComponentExceptions
from scripts.common.logsetup import logger
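# AddtoModelStore pushes the newly trained model file from the component input directory to the
# "current" artifact path in Azure Blob Storage, moves the previously current model to the
# "archived" path, and keeps metadata.json in sync with the current and archived model entries.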
class AddtoModelStore:
def __init__(self, query):
self.query = query
self.container = self.query["container_name"]
self.connection_string = self.query["connection_string"]
self.blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
self.container_client = self.blob_service_client.get_container_client(self.container)
self.ilens_tag_hierarchy = self.query['ilens_tag_hierarchy']
self.component_input_dir = self.query['component_input_dir']
self.artifact_current_path = self.query['artifact_current_path']
self.artifact_archived_path = self.query['artifact_archived_path']
        self.new_model_name = os.listdir(self.component_input_dir)[0]
        self.new_model_path = os.path.join(self.component_input_dir, self.new_model_name)
self.meta_data_file = self.query['metadata_file']
self.update_current_model = {
"model_name": "randomforest",
"model_params": None,
"training_date": str(date.today()),
"framework": None,
"serializedObjectType": "pkl",
"model_fl_name": self.new_model_name
}
self.first_run = [
{
'id': self.ilens_tag_hierarchy,
'current_model': {
'model_name': 'randomforest',
'model_params': None,
'training_date': str(date.today()),
'framework': None,
'serializedObjectType': 'pkl',
'model_fl_name': self.new_model_name
},
'archived': [
]
}
]
def move_files(self, file_name):
try:
logger.info("started moving files from current to archived folder......")
source_blob_path = os.path.join(self.artifact_current_path, file_name)
dest_blob_path = os.path.join(self.artifact_archived_path, file_name)
source_blob = self.blob_service_client.get_blob_client(self.container, source_blob_path)
dest_blob = self.blob_service_client.get_blob_client(self.container, dest_blob_path)
            dest_blob.start_copy_from_url(source_blob.url, requires_sync=True)
            copy_properties = dest_blob.get_blob_properties().copy
            if copy_properties.status != "success":
                dest_blob.abort_copy(copy_properties.id)
                raise Exception(
                    "Unable to copy blob %s with status %s"
                    % (source_blob_path, copy_properties.status)
                )
            # Delete the source blob only after the copy has completed successfully.
            source_blob.delete_blob()
            logger.info("File at " + source_blob_path + " is moved to " + dest_blob_path)
except Exception as e:
logger.info(traceback.format_exc())
raise Exception(e)
def check_meta_data(self):
return path.exists(self.query['metadata_file'])
def upload_to_blob(self):
try:
logger.info("started uploading data..............")
blob_client = self.blob_service_client.get_blob_client(container=self.container,
blob=os.path.join(self.artifact_current_path,
self.new_model_name))
with open(self.new_model_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)
logger.info("File at " + self.new_model_path + " is uploaded to " + os.path.join(self.artifact_current_path,
self.new_model_name))
return True
except Exception as e:
logger.info(traceback.format_exc())
raise Exception(e)
def update_meta_data(self, data):
try:
logger.info("Updating metadata json file...............")
for i in data:
if i['id'] == self.ilens_tag_hierarchy:
i['archived'].append(i['current_model'])
i['current_model'] = self.update_current_model
return data
except Exception as e:
logger.info(traceback.format_exc())
raise Exception(e)
def get_file_name_from_meta_data(self, data):
try:
logger.info("Reading metadata json file to get the old models name....")
for i in data:
if i['id'] == self.ilens_tag_hierarchy:
return i['current_model']['model_fl_name']
except Exception as e:
logger.info(traceback.format_exc())
raise Exception(e)
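    # run() flow: if metadata.json already exists, read it, archive the current model blob,
    # upload the new model, and rewrite metadata.json; on the first run, upload the new model
    # and create metadata.json from the first_run template defined in __init__.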
def run(self):
try:
            check_meta_file = self.check_meta_data()
            if check_meta_file:
                logger.info("metadata json file is present.........")
                try:
                    logger.info("Reading Json file")
                    with open(self.meta_data_file) as f:
                        data = json.load(f)
                except Exception:
                    logger.info("Failed reading Json File")
                    logger.info(traceback.format_exc())
                    raise
                old_model_name = self.get_file_name_from_meta_data(data)
self.move_files(old_model_name)
self.upload_to_blob()
data = self.update_meta_data(data)
json_object = json.dumps(data, indent=4)
with open(self.query['metadata_file'], "w") as outfile:
outfile.write(json_object)
            else:
                logger.info(
                    "metadata json file is not present, so creating it at " + self.query['metadata_file'])
if self.upload_to_blob():
json_object = json.dumps(self.first_run, indent=4)
with open(self.query['metadata_file'], "w") as outfile:
outfile.write(json_object)
else:
logger.info("AddToModelStore component failed...............")
return True
except Exception as e:
logger.info(traceback.format_exc())
raise Exception(e)
if __name__ == '__main__':
try:
obj = AddtoModelStore(config)
obj.run()
    except Exception:
        logger.info("AddtoModelStore component failed")
        logger.info(traceback.format_exc())
#!/usr/bin/env python
import os
import sys
import yaml
import json
from pymongo import MongoClient, DESCENDING
from scripts.common.constants import AddtoModelStoreConstants, ComponentExceptions
config_path = os.path.join(os.getcwd(), "conf", "configuration.yml")
if os.path.exists(config_path):
sys.stderr.write("Reading config from --> {}".format(config_path))
sys.stderr.write("\n")
with open(config_path, 'r') as stream:
_config = yaml.safe_load(stream)
else:
sys.stderr.write("Configuration not found...")
sys.stderr.write("Exiting....")
sys.exit(1)
# uncomment for Testing
# os.environ['pipeline_id'] = "pipe1"
# os.environ['artifact_base_path'] = "/data/ai-models"
# os.environ['ilens_tag_hierarchy'] = 'site1'
# os.environ['container_name'] = 'test'
# ------------------------ Configurations -----------------------------------------------------------------------------
pipeline_id = os.environ.get('PIPELINE_ID', default="pipeline_313")
shared_mount_base_ai_job = os.environ.get("shared_mount_base_ai_job",
_config.get("SYSTEM_CONFIG", {}).get('shared_mount_base_ai_job'))
# read from $shared_mount_base_ai_job/$pipeline_id/run.config
# run_id_path = shared_mount_base_ai_job + "/" + pipeline_id + "/run_config.json"
run_id_path = os.path.join(shared_mount_base_ai_job, pipeline_id, "run_config.json")
metadata_path = os.path.join(shared_mount_base_ai_job, "models")
try:
sys.stderr.write("Checking for run id parameters at path " + run_id_path + "\n")
with open(run_id_path) as f:
run_id_param = json.load(f)
run_id = run_id_param['run_id']
except Exception as e:
sys.stderr.write("No run_config.json file is there so keeping run id as default_run_id " + "\n")
run_id = _config.get("SYSTEM_CONFIG", {}).get('default_run_id')
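# A minimal, commented-out sketch of how the FOR_EACH_MONGO_CONFIG block in configuration.yml
# could be used to resolve the latest run metadata instead of run_config.json. The collection
# layout (a 'pipeline_id' filter field, a 'created_at' sort key, and a 'run_id' field) is an
# assumption for illustration, not something defined in this repo.
# if _config.get("FOR_EACH_MONGO_CONFIG", {}).get("READ_FROM_MONGO"):
#     mongo_conf = _config["FOR_EACH_MONGO_CONFIG"]
#     client = MongoClient(mongo_conf["MONGO_URI"])
#     run_coll = client[mongo_conf["MONGO_DB"]][mongo_conf["MONGO_RUN_COLL"]]
#     latest_run = run_coll.find_one({"pipeline_id": pipeline_id}, sort=[("created_at", DESCENDING)])
#     if latest_run:
#         run_id = latest_run.get("run_id", run_id)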
data_store = os.environ.get('data_store', default="Azure")
container_name = os.environ.get('container_name', default=AddtoModelStoreConstants.CONTAINER)
# connection_string defaults to test_connection_string; switch to prod_connection_string for production
connection_string = os.environ.get('connection_string', _config.get("SYSTEM_CONFIG", {}).get('test_connection_string'))
artifact_base_path = os.environ.get('artifact_base_path')
if not artifact_base_path:
    sys.stderr.write(ComponentExceptions.INVALID_ARTIFACT_BASE_PATH_EXCEPTION + "\n")
    sys.exit(1)
artifact_current_path = os.path.join(artifact_base_path, "current")
artifact_archived_path = os.path.join(artifact_base_path, "archived")
# Component Parameter:Read from $shared_mount_base_ai_job/$pipeline_id/$run_id/GetDataFromStore/param.json
# component_parameter_path = shared_mount_base_ai_job + "/" + pipeline_id + "/" + run_id + "/" +
# GetDataFromStoreConstants.COMPONENT_NAME + "/param.json"
component_parameter_path = os.path.join(shared_mount_base_ai_job, pipeline_id, run_id,
AddtoModelStoreConstants.COMPONENT_NAME, "param.json")
# shared_mount_base_ai_job/$pipeline_id/$run_id/AddtoModelStore/input/
component_input_dir = os.path.join(shared_mount_base_ai_job, pipeline_id, run_id,
AddtoModelStoreConstants.COMPONENT_NAME, "input")
try:
sys.stderr.write("Checking for component parameters at path " + component_parameter_path + "\n")
with open(component_parameter_path) as f:
component_parameter = json.load(f)
ilens_tag_hierarchy = component_parameter['ilens_tag_hierarchy']
except Exception:
    sys.stderr.write("No param.json file found, so trying to read ilens_tag_hierarchy from the environment" + "\n")
    ilens_tag_hierarchy = os.environ.get('ilens_tag_hierarchy')
    if ilens_tag_hierarchy is None:
        raise Exception("ilens_tag_hierarchy is not provided in param.json or as an environment variable")
BASE_LOG_PATH = os.path.join(os.getcwd(), "logs")
if not os.path.exists(BASE_LOG_PATH):
    os.mkdir(BASE_LOG_PATH)
LOG_LEVEL = os.environ.get("LOG_LEVEL", _config.get('SERVICE_CONFIG', {}).get("LOG_LEVEL", "INFO")).upper()
LOG_HANDLER_NAME = _config.get('SERVICE_CONFIG', {}).get("LOG_HANDLER_NAME", "AddtoModelStore")
ENABLE_LOGSTASH_LOG = os.environ.get("ENABLE_LOGSTASH_LOG", 'False').lower()
LOGSTASH_HOST = _config.get('SERVICE_CONFIG', {}).get('LOGSTASH_HOST')
LOGSTASH_PORT = str(_config.get('SERVICE_CONFIG', {}).get('LOGSTASH_PORT'))
# os.environ['azure_file_path'] = '/data/model/tested/test1.pkl'
# os.environ['local_file_path'] = r'E:\iLens-AI\azure-file-download\StandardScaler.pkl'
config = {
'pipeline_id': pipeline_id,
'run_id': run_id,
'shared_mount_base_ai_job': shared_mount_base_ai_job,
'data_store': data_store.lower(),
'container_name': container_name,
'artifact_base_path': artifact_base_path,
'ilens_tag_hierarchy': ilens_tag_hierarchy,
'connection_string': connection_string,
'component_input_dir': component_input_dir,
'artifact_current_path': artifact_current_path,
'artifact_archived_path': artifact_archived_path,
'metadata_path': metadata_path,
'metadata_file': os.path.join(metadata_path, 'metadata.json')
}
if not os.path.exists(config['shared_mount_base_ai_job']):
    sys.stderr.write("Shared path does not exist!" + "\n")
    sys.stderr.write("Creating path --> {}\n".format(config['shared_mount_base_ai_job']))
    os.makedirs(config['shared_mount_base_ai_job'])
if not os.path.exists(config['metadata_path']):
    sys.stderr.write("MetaData path does not exist!" + "\n")
    sys.stderr.write("Creating path --> {}\n".format(config['metadata_path']))
    os.makedirs(config['metadata_path'])
#!/usr/bin/env python
class AddtoModelStoreConstants:
COMPONENT_NAME = "AddtoModelStore"
CONTAINER = "ilensqa"
HTTP = "http://"
LOG_VAR_MESSAGE = "\n" + "#" * 25 + "\n" + "{}" + "\n" + "#" * 25 + "\n" + "{}\n"
class ComponentExceptions:
INVALID_AZURE_FILE_PATH_EXCEPTION = "AZURE PATH ERROR"
INVALID_LOCAL_FILE_PATH_EXCEPTION = "No File in the local path"
INVALID_ARTIFACT_BASE_PATH_EXCEPTION = "Artifact base path value is missing"
INVALID_AZURE_FILE_NAME_EXCEPTION = "Artifact name is missing"
INVALID_CONTAINER_NAME = "Container name is missing"
import os
import logging
from logging.handlers import RotatingFileHandler
from logstash_async.handler import AsynchronousLogstashHandler
from scripts.common.config_parser import LOG_LEVEL, LOG_HANDLER_NAME, BASE_LOG_PATH, LOGSTASH_HOST, LOGSTASH_PORT, ENABLE_LOGSTASH_LOG
DEFAULT_FORMAT = '%(asctime)s %(levelname)5s %(name)s %(message)s'
DEBUG_FORMAT = '%(asctime)s %(levelname)5s %(name)s [%(threadName)5s:%(filename)5s:%(funcName)5s():%(lineno)s] %(' \
'message)s '
EXTRA = {}
FORMATTER = DEFAULT_FORMAT
if LOG_LEVEL.strip() == "DEBUG":
FORMATTER = DEBUG_FORMAT
logging.trace = logging.DEBUG - 5
logging.addLevelName(logging.DEBUG - 5, 'TRACE')
class ILensLogger(logging.getLoggerClass()):
def __init__(self, name):
super().__init__(name)
def trace(self, msg, *args, **kwargs):
if self.isEnabledFor(logging.trace):
self._log(logging.trace, msg, args, **kwargs)
def get_logger(log_handler_name):
"""
Purpose : To create logger .
:param log_handler_name: Name of the log handler.
:return: logger object.
"""
log_path = os.path.join(BASE_LOG_PATH, log_handler_name + ".log")
logging.setLoggerClass(ILensLogger)
_logger = logging.getLogger(log_handler_name)
_logger.setLevel(LOG_LEVEL.strip().upper())
log_handler = logging.StreamHandler()
log_handler.setLevel(LOG_LEVEL)
formatter = logging.Formatter(FORMATTER)
log_handler.setFormatter(formatter)
handler = RotatingFileHandler(log_path, maxBytes=10485760,
backupCount=5)
handler.setFormatter(formatter)
_logger.addHandler(log_handler)
_logger.addHandler(handler)
if ENABLE_LOGSTASH_LOG == 'true' and LOGSTASH_PORT is not None and LOGSTASH_HOST is not None and LOGSTASH_PORT.isdigit():
_logger.addHandler(AsynchronousLogstashHandler(LOGSTASH_HOST, int(LOGSTASH_PORT), database_path=None))
return _logger
logger = get_logger(LOG_HANDLER_NAME)
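# Example usage of the configured logger, including the custom TRACE level registered above:
#   logger.info("AddtoModelStore started")
#   logger.trace("verbose diagnostics, emitted only when LOG_LEVEL is set to TRACE")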
[
{
"id": "site1",
"current_model": {
"model_name": "randomforest",
"model_params": null,
"training_date": "2021-03-10",
"framework": null,
"serializedObjectType": "pkl",
"model_fl_name": "model1.pkl"
},
"archived": [
{
"model_name": "randomforest",
"model_params": null,
"training_date": "2021-03-10",
"framework": null,
"serializedObjectType": "pkl",
"model_fl_name": "model.pkl"
},
{
"model_name": "randomforest",
"model_params": null,
"training_date": "2021-03-10",
"framework": null,
"serializedObjectType": "pkl",
"model_fl_name": "model1.pkl"
}
]
}
]