Commit 9fe429a7 authored by dasharatha.vamshi

az down

parent 55f86604
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
-  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
+  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (welspun-defects) (2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$" />
-    <orderEntry type="inheritedJdk" />
+    <orderEntry type="jdk" jdkName="Python 3.7 (welspun-defects) (2)" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
  <component name="TestRunnerService">
......
@@ -5,3 +5,10 @@ SERVICE_CONFIG:
  LOGSTASH_HOST: 192.168.1.47
  LOGSTASH_PORT: 5000
# ---------------------- If reading conf from Mongo ------------ #
FOR_EACH_MONGO_CONFIG:
  READ_FROM_MONGO: true
  MONGO_URI: mongodb://192.168.0.210:27017
  MONGO_DB: iLensAiPipeline
  MONGO_RUN_COLL: runMetadata
  MONGO_SITE_COLL: siteMetadata
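# The keys above are consumed by scripts/common/config_parser.py when READ_FROM_MONGO is true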
2021-02-25 15:00:45,275 INFO PreProccessComponent {'shared_volume': '/mnt/ilens', 'json_path': '/mnt/ilens/response.json', 'fillna_method': 'ffill', 'standard_scalar_path': '', 'preprocessed_pkl_filename': 'preprocessed_X.pkl', 'date_pkl_filename': 'date_X.pkl', 'forcast_data_csv_filename': 'forcast_data.csv'}
2021-02-25 15:00:45,277 INFO PreProccessComponent E:\iLens-AI\Git\preprocess-data
2021-02-25 15:00:45,277 INFO PreProccessComponent ['.git', '.idea', 'conf', 'Dockerfile', 'logs', 'main.py', 'README.md', 'requirements.txt', 'scripts', 'StandardScaler.pkl']
2021-02-25 15:00:45,277 INFO PreProccessComponent Reading Json file
2021-02-25 15:00:45,311 INFO PreProccessComponent Parsing the pickle file at location E:\iLens-AI\Git\preprocess-data\StandardScaler.pkl ......
2021-02-25 15:00:45,331 INFO PreProccessComponent Writing preprocessed data to pkl file at /mnt/ilens\preprocessed_X.pkl
2021-02-25 15:00:45,332 INFO PreProccessComponent Writing date data to pkl file at /mnt/ilens\date_X.pkl
2021-02-25 15:00:45,333 INFO PreProccessComponent Writing forcast data to csv file at /mnt/ilens\forcast_data.csv
2021-02-25 15:00:45,341 INFO PreProccessComponent Component executed Successfully
2021-02-25 15:02:43,749 INFO PreProccessComponent {'shared_volume': 'test', 'json_path': '/mnt/ilens/response.json', 'fillna_method': 'ffill', 'standard_scalar_path': '', 'preprocessed_pkl_filename': 'preprocessed_X.pkl', 'date_pkl_filename': 'date_X.pkl', 'forcast_data_csv_filename': 'forcast_data.csv'}
2021-02-25 15:02:43,750 INFO PreProccessComponent E:\iLens-AI\Git\preprocess-data
2021-02-25 15:02:43,751 INFO PreProccessComponent ['.git', '.idea', 'conf', 'Dockerfile', 'logs', 'main.py', 'README.md', 'requirements.txt', 'scripts', 'StandardScaler.pkl', 'test']
2021-02-25 15:02:43,751 INFO PreProccessComponent Reading Json file
2021-02-25 15:02:43,760 INFO PreProccessComponent Parsing the pickle file at location E:\iLens-AI\Git\preprocess-data\StandardScaler.pkl ......
2021-02-25 15:02:43,763 INFO PreProccessComponent Writing preprocessed data to pkl file at test\preprocessed_X.pkl
2021-02-25 15:02:43,764 INFO PreProccessComponent Writing date data to pkl file at test\date_X.pkl
2021-02-25 15:02:43,765 INFO PreProccessComponent Writing forcast data to csv file at test\forcast_data.csv
2021-02-25 15:02:43,770 INFO PreProccessComponent Component executed Successfully
@@ -6,6 +6,24 @@ from sklearn.preprocessing import StandardScaler
from scripts.common.config_parser import *
from scripts.common.constants import PreProcessConstants, ComponentExceptions
from scripts.common.logsetup import logger
import os
import traceback

from azure.storage.blob import BlobServiceClient


def azure_download(blob_path, download_file_path):
    """Download a blob from the configured Azure container to download_file_path."""
    try:
        blob_service_client = BlobServiceClient.from_connection_string(PreProcessConstants.AZURE_CONTAINER_STRING)
        blob_client = blob_service_client.get_blob_client(container=PreProcessConstants.AZURE_BLOB_CONTAINER,
                                                          blob=blob_path)
        logger.info("Downloading to path " + download_file_path)
        with open(download_file_path, "wb") as download_file:
            download_file.write(blob_client.download_blob().readall())
        return True
    except Exception:
        logger.error("Failed to download files from azure")
        logger.error(traceback.format_exc())
        return False


class PreProcessComponent:
@@ -55,7 +73,8 @@ class PreProcessComponent:
if __name__ == '__main__':
-    logger.info(config)
+    tag_dir = tag_path
+    # logger.info(config)
    # Checking shared Volume
    if PreProcessConstants.SHARED_VOLUME in config.keys():
        shared_volume = config[PreProcessConstants.SHARED_VOLUME]
@@ -71,10 +90,17 @@
        # Checking pickle path for standard scalar
        if PreProcessConstants.STANDARD_SCALAR_PATH in config.keys():
            standard_scalar_path = config[PreProcessConstants.STANDARD_SCALAR_PATH]
-            if len(standard_scalar_path) < 5:
-                logger.info(os.getcwd())
-                logger.info(os.listdir(os.getcwd()))
-                standard_scalar_path = os.path.join(os.getcwd(), 'StandardScaler.pkl')
+            try:
+                # tag_dir is the '$'-split hierarchy list from config_parser (set when
+                # READ_FROM_MONGO is enabled); join it into a blob path segment (assumed
+                # layout) before appending the pickle filename.
+                my_path = PreProcessConstants.AZURE_FILE_PATH + "/".join(tag_dir) + "/" + standard_scalar_path.split('/')[-1]
+                logger.info("Azure file path " + my_path)
+                downloaded = azure_download(my_path, standard_scalar_path)
+                if downloaded:
+                    logger.info("Downloaded pkl file from azure")
+                else:
+                    logger.error("Failed to download pkl file from azure")
+            except Exception:
+                logger.error("Failed to read pickle file")
+                logger.error(traceback.format_exc())
            else:
                pass
        else:
@@ -87,18 +113,20 @@
        raise Exception(ComponentExceptions.INVALID_Fillna_Method)
    obj = PreProcessComponent()
    data, date_pkl, forcast_df = obj.preprocess(json_path, standard_scalar_path, fillna_method)
-    logger.info("Got the data writing it to pickle files and csv files (preprocessed_X.pkl and date_X.pkl and forcast_data.csv)")
+    # logger.info("Got the data writing it to pickle files and csv files (preprocessed_X.pkl and date_X.pkl and forcast_data.csv)")
    try:
-        logger.info("Writing preprocessed data to pkl file at " + os.path.join(shared_volume, 'preprocessed_X.pkl'))
-        output = open(os.path.join(shared_volume, 'preprocessed_X.pkl'), 'wb')
+        logger.info("Writing preprocessed data to pkl file at " + os.path.join(shared_volume,
+                                                                               config['preprocessed_pkl_filename']))
+        output = open(os.path.join(shared_volume, config['preprocessed_pkl_filename']), 'wb')
        pickle.dump(data, output)
        output.close()
-        logger.info("Writing date data to pkl file at " + os.path.join(shared_volume, 'date_X.pkl'))
-        output1 = open(os.path.join(shared_volume, 'date_X.pkl'), 'wb')
+        logger.info("Writing date data to pkl file at " + os.path.join(shared_volume, config['date_pkl_filename']))
+        output1 = open(os.path.join(shared_volume, config['date_pkl_filename']), 'wb')
        pickle.dump(date_pkl, output1)
        output1.close()
-        logger.info("Writing forcast data to csv file at " + os.path.join(shared_volume, 'forcast_data.csv'))
-        output2 = os.path.join(shared_volume, 'forcast_data.csv')
+        logger.info(
+            "Writing forcast data to csv file at " + os.path.join(shared_volume, config['forcast_data_csv_filename']))
+        output2 = os.path.join(shared_volume, config['forcast_data_csv_filename'])
        forcast_df.to_csv(output2, index=False)
        logger.info("Component executed Successfully")
    except Exception as e:
......
@@ -3,3 +3,5 @@ pyyaml~=5.3.1
python-logstash-async
pandas~=1.1.1
sklearn
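# pymongo and azure-storage-blob (below) support the Mongo-driven config lookup and the Azure blob download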
pymongo
azure-storage-blob
@@ -3,6 +3,8 @@ import os
import sys
import yaml
import json
from pymongo import MongoClient, DESCENDING
from scripts.common.constants import PreProcessConstants
config_path = os.path.join(os.getcwd(), "conf", "configuration.yml")
if os.path.exists(config_path):
@@ -16,6 +18,65 @@ else:
    sys.stderr.write("Exiting....")
    sys.exit(1)
# ----------------- Mongo -----------------------------------------------------------------------
READ_FROM_MONGO = _config.get("FOR_EACH_MONGO_CONFIG", {}).get('READ_FROM_MONGO', False)
COMPONENT_NAME = os.environ.get("type", PreProcessConstants.COMPONENT_NAME)
pipeline_id = os.environ.get('PIPELINE_ID', default="pipeline_313")
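# When READ_FROM_MONGO is enabled, the latest run is looked up in Mongo, the in-progress tag
# hierarchy is resolved against the site metadata, and the matching component's env variables
# are exported to os.environ before the config dict below is built.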
if READ_FROM_MONGO:
    MONGO_URI = os.environ.get("MONGO_URI", _config.get("FOR_EACH_MONGO_CONFIG", {}).get('MONGO_URI'))
    MONGO_DB = os.environ.get("MONGO_DB", _config.get("FOR_EACH_MONGO_CONFIG", {}).get('MONGO_DB'))
    MONGO_RUN_COLL = _config.get("FOR_EACH_MONGO_CONFIG", {}).get('MONGO_RUN_COLL')
    MONGO_SITE_COLL = _config.get("FOR_EACH_MONGO_CONFIG", {}).get('MONGO_SITE_COLL')

    db = MongoClient(MONGO_URI)[MONGO_DB]
    # Pick up the most recent run and the tag hierarchy that is currently in progress
    get_run_info = db[MONGO_RUN_COLL].find_one({}, sort=[("run_start_time", DESCENDING)])
    if get_run_info is None:
        raise Exception('No run info found')
    if not get_run_info['job_metadata']['in_progress']:
        raise Exception('No job in progress')
    _tag_hierarchy = get_run_info['job_metadata']['in_progress'][0]
    sys.stdout.write(f"_tag_hierarchy --> {_tag_hierarchy}\n")
    # The hierarchy string is '$'-delimited: site$dept$line$equipment$tag
    _tag_hierarchy = _tag_hierarchy.split('$')
    tag_path = _tag_hierarchy
    site_id = _tag_hierarchy[0]
    dept_id = _tag_hierarchy[1]
    line_id = _tag_hierarchy[2]
    equipment_id = _tag_hierarchy[3]
    tag_id = _tag_hierarchy[4]

    # Walk the nested site metadata: dept -> line -> equipment -> tag
    get_conf = db[MONGO_SITE_COLL].find_one({'site_id': site_id})
    dept_dict = list(filter(lambda x: x['dept_id'] == dept_id, get_conf['dept']))
    line_dict = list(filter(lambda x: x['line_id'] == line_id, dept_dict[0]['line']))
    equipment_dict = list(filter(lambda x: x['equipment_id'] == equipment_id, line_dict[0]['equipment']))
    tag_dict = list(filter(lambda x: x['tag_id'] == tag_id, equipment_dict[0]['tag']))
    if len(tag_dict) != 1:
        raise Exception(f"Tag details not found for hierarchy {_tag_hierarchy}")
    pipeline_conf = list(filter(lambda x: x['pipeline_id'] == pipeline_id, tag_dict[0]['pipeline_config']))
    if len(pipeline_conf) != 1:
        raise Exception(
            f"Pipeline config for hierarchy {'$'.join(_tag_hierarchy)} not found for pipeline {pipeline_id}")
    component_conf = list(
        filter(lambda x: x['component_type'] == COMPONENT_NAME, pipeline_conf[0]['component_config']))
    if len(component_conf) == 0:
        raise Exception(f"Configuration for component {COMPONENT_NAME} not found")
    component_conf = component_conf[0]['env_variables']
    # Environment variables must be strings, so JSON-encode any non-string values before exporting
    for each_key, each_value in component_conf.items():
        if not isinstance(each_value, str):
            component_conf[each_key] = json.dumps(each_value)
    os.environ.update(component_conf)
# ---------------- END MONGO -----------------------------------------------------------------
BASE_LOG_PATH = os.path.join(os.getcwd(), "logs")
if not os.path.exists(os.path.join(os.getcwd(), 'logs')):
    os.mkdir(os.path.join(os.getcwd(), 'logs'))
@@ -37,6 +98,9 @@ config = {
    "json_path": os.environ.get("json_path"),
    "fillna_method": os.environ.get("fillna_method", default="ffill"),
    "standard_scalar_path": os.environ.get("standard_scalar_path", default=str(PKL_path)),
    "preprocessed_pkl_filename": os.environ.get("preprocessed_pkl_filename"),
    "date_pkl_filename": os.environ.get("date_pkl_filename"),
    "forcast_data_csv_filename": os.environ.get("forcast_data_csv_filename")
}
if not os.path.exists(config['shared_volume']):
    sys.stderr.write("Shared path does not exist!")
......
#!/usr/bin/env python
class PreProcessConstants:
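    # Azure blob settings and component name used by the Mongo-driven config lookup
    # and the azure_download() helper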
AZURE_CONTAINER_STRING = "DefaultEndpointsProtocol=https;AccountName=azrblobfslstg;AccountKey=GCLzpugR7VxyzN0HU2jf8V7QDNme1fECYURBlpfYWZJIRQqqXghcz3Gkq7X6lWSHA2OToMyMYEreCbqvTfKryA==;EndpointSuffix=core.windows.net"
AZURE_BLOB_CONTAINER = "jackieserverlogs"
AZURE_FILE_PATH = "/data/model/"
COMPONENT_NAME = "Preprocess_module_fe"
    SHARED_VOLUME = "shared_volume"
    JSON_PATH = "json_path"
    FILLNA_METHOD = "fillna_method"
......