Commit dd8ffa4e authored by harshavardhan.c's avatar harshavardhan.c

Add: New Structure format.

parent 0e0ffdd2
GRAPH_HOST=192.168.0.220
GRAPH_PORT=7687
GRAPH_USERNAME=neo4j
GRAPH_PASSWORD=root
[SERVICE]
port = 3973
host = 0.0.0.0
[GRAPH_DB]
GRAPH_HOST=$GRAPH_HOST
GRAPH_PORT=$GRAPH_PORT
GRAPH_USERNAME=$GRAPH_USERNAME
GRAPH_PASSWORD=$GRAPH_PASSWORD
DB_TYPE=$DB_TYPE
[LOGGING]
level=$LOG_LEVEL
traceback=true
\ No newline at end of file
from scripts.utils.graph_utility import Neo4jConnection if __name__ == '__main__':
from dotenv import load_dotenv
conn = Neo4jConnection(uri="neo4j+s://932e0faee63c9427e82b046bc4ab6520.neo4jsandbox.com:7687", load_dotenv()
user="madhuri", from scripts.core.engine import GraphTraversal
pwd="Madhuri@71") from scripts.db import get_db
from scripts.schemas import GraphData
def ingest_data_handler(graph_data: GraphData):
try:
node_list = []
relations = []
for node_type, node_obj in graph_data.__root__.items():
db = get_db()
print(node_list)
except Exception as e:
print(e.args)
input_data = {
"Node1": {
"node_id": "270120022",
"action": "add",
"node_name": "Event Name",
"node_label": "Event Label",
"properties": {
"name": "Event 1",
"external_data_source": "mongo",
"external_data_id": "101",
"event_info": "847263",
"received_date": 643308200,
"completed_date": 1643394600
},
"edges": [],
},
"Node2": {
"node_id": "270120023",
"node_label": "Event Label",
"action": "add",
"node_name": "asjkfafjk",
"node_properties": {
"name": "Event 2",
"external_data_source": "mongo",
"external_data_id": "101",
"event_info": "847263",
"received_date": 643308200,
"completed_date": 1643394600
},
"edges": [{
"action": "add",
"rel_name": "causes",
"bind_to": "Node1",
"bind_id": "BSCH270120022"
}]
}
}
gt_obj = GraphTraversal(db=get_db())
GraphData(__root__=input_data)
gt_obj.ingest_data_handler(graph_data=GraphData(__root__=input_data))
...@@ -19,7 +19,7 @@ class EnvInterpolation(BasicInterpolation): ...@@ -19,7 +19,7 @@ class EnvInterpolation(BasicInterpolation):
try: try:
config = ConfigParser(interpolation=EnvInterpolation()) config = ConfigParser(interpolation=EnvInterpolation())
config.read(f"conf/application.conf") config.read("conf/application.conf")
except Exception as e: except Exception as e:
print(f"Error while loading the config: {e}") print(f"Error while loading the config: {e}")
print("Failed to Load Configuration. Exiting!!!") print("Failed to Load Configuration. Exiting!!!")
...@@ -33,13 +33,13 @@ class Service: ...@@ -33,13 +33,13 @@ class Service:
class DBConf: class DBConf:
DB_URI = config.get('SQL_DB', 'uri') GRAPH_HOST = config.get('GRAPH_DB', 'GRAPH_HOST')
if not DB_URI: GRAPH_PORT = config.get('GRAPH_DB', 'GRAPH_PORT')
print("Error, environment variable DB_URI not set")
sys.exit(1)
GRAPH_URI = config.get('GRAPH_DB', 'GRAPH_URI')
GRAPH_USERNAME = config.get('GRAPH_DB', 'GRAPH_USERNAME') GRAPH_USERNAME = config.get('GRAPH_DB', 'GRAPH_USERNAME')
GRAPH_PASSWORD = config.get('GRAPH_CONN', 'GRAPH_PASSWORD') GRAPH_PASSWORD = config.get('GRAPH_DB', 'GRAPH_PASSWORD')
GRAPH_DB_TYPE = config.get('GRAPH_DB', 'DB_TYPE')
if not GRAPH_DB_TYPE:
GRAPH_DB_TYPE = "neo4j"
class Logging: class Logging:
......
from typing import List
from scripts.db.models import NodeCreationSchema, RelationShipMapper
from scripts.logging import logger
from scripts.schemas import GraphData, NodeSchemaValidation
class GraphTraversal:
def __init__(self, db):
"""
Graph Tracking Handler class interacts with the Graph Traversal class
that fetches all the nodes and relations
based on the input received from the user
"""
self.db = db
def ingest_data_handler(self, graph_data: GraphData):
try:
node_list = []
relations = []
for node_type, node_obj in graph_data.__root__.items():
process_dict = NodeSchemaValidation(**node_obj.dict(), label=node_type)
# if node_obj.action == NodeActionOptions.delete:
# pass
# else:
self.save_single_node(
node_data=NodeCreationSchema(name=process_dict.name, id=f'{process_dict.node_id}',
properties=process_dict.properties,
labels=process_dict.label))
except Exception as e:
logger.exception(f'Exception occurred while performing data insert/delete functionality {e.args}')
raise
def save_single_node(self, node_data: NodeCreationSchema, upsert=False):
try:
res = self.load_data_from_graph({"id": node_data.id})
if upsert and existing_response.id:
res = self.db.save_node_with_id(node_data, _id=existing_response.id)
else:
res = self.db.save_node(node_data)
print(res)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def save_multiple_nodes(self, node_data: List[NodeCreationSchema]):
try:
res = self.db.save_nodes(nodes=node_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def save_relationship(self, rel_data: RelationShipMapper):
try:
res = self.db.save_relationship_with_id(rel_data)
if not res:
self.db.save_relationship(rel_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def save_multiple_relationship(self, rel_data: List[RelationShipMapper]):
try:
res = self.db.save_relationships(relationships=rel_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def load_data_from_graph(self, **kwargs):
try:
res = self.db.load_node_with_id(kwargs)
print(res)
except Exception as e:
logger.exception(f'Exception Occurred while fetching the node data by {key_name} & {key_value} {e.args}')
raise
from scripts.schemas import GraphData
class GraphTrackingHandler:
def __init__(self):
"""
Graph Tracking Handler class interacts with the Graph Traversal class
that fetches all the nodes and relations
based on the input received from the user
"""
...
def ingest_data_handler(self, graph_data: GraphData) -> str:
"""
"""
return self.graph_traversal.run_gremlin_queries(graph_data=graph_data)
def graph_traverse_handler(self, graph_data_request: GenealogyDataRequest) -> str:
"""
"""
return self.graph_traversal.get_genealogy(genealogy_data=graph_data_request)
from neo4j import GraphDatabase from gqlalchemy import Neo4j, Memgraph
from scripts.config import DBConf from scripts.config import DBConf
from scripts.logging import logger
uri = DBConf.GRAPH_URI host = DBConf.GRAPH_HOST
port = DBConf.GRAPH_PORT
username = DBConf.GRAPH_USERNAME username = DBConf.GRAPH_USERNAME
password = DBConf.GRAPH_PASSWORD password = DBConf.GRAPH_PASSWORD
neo4j_driver = GraphDatabase.driver(uri, auth=(username, password)) db_type = DBConf.GRAPH_DB_TYPE
def db_func_mapper(conn_type):
func_mapper = {
"neo4j": Neo4j,
"memgraph": Memgraph
}
return func_mapper.get(conn_type)
def get_db():
connector = db_func_mapper(db_type)
if connector is None:
logger.exception(f"Connection Details not found for the selected database type -- {db_type}")
return connector(host=host, username=username, password=password, port=port)
from typing import Optional
from gqlalchemy import Node, Relationship
from pydantic import Field
class NodeCreationSchema(Node):
id: str = Field()
name: str = Field()
class RelationShipMapper(Relationship, type="Relation_Mapper"):
date: Optional[str] = Field()
class Config:
allow_population_by_field_name = True
from enum import Enum
from typing import Dict, Optional, List
from pydantic import BaseModel, Field
class NodeActionOptions(str, Enum):
add = "add"
delete = "delete"
update = "update"
class Edges(BaseModel):
node_id: Optional[str]
action: NodeActionOptions
rel_name: str
bind_to: str
bind_id: str
class Config:
allow_population_by_field_name = True
class InputRequestSchema(BaseModel):
node_id: str
node_name: str
action: NodeActionOptions
node_properties: Optional[Dict] = {}
edges: Optional[List[Edges]]
class Config:
allow_population_by_field_name = True
class NodeSchemaValidation(BaseModel):
node_id: str = Field(alias="node_id")
name: str = Field(alias="node_name")
properties: Optional[Dict] = Field(alias="node_properties", default={})
label: str
class Config:
allow_population_by_field_name = True
class GraphData(BaseModel):
__root__: Dict[str, InputRequestSchema]
class Config:
allow_population_by_field_name = True
schema_extra = {
"example": {
"Node1": {
"id": "BSCH270120022",
"action": "add",
"name": "Event Name",
"properties": {
"name": "Event 1",
"external_data_source": "mongo",
"external_data_id": "101",
"event_info": "847263",
"received_date": 643308200,
"completed_date": 1643394600
},
"edges": [],
},
"Node2": {
"id": "BSCH270120023",
"action": "add",
"name": "Event Name",
"properties": {
"name": "Event 2",
"external_data_source": "mongo",
"external_data_id": "101",
"event_info": "847263",
"received_date": 643308200,
"completed_date": 1643394600
},
"edges": [{
"action": "add",
"rel_name": "causes",
"bind_to": "Node1",
"bind_id": "BSCH270120022"
}]
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment