Commit 55925c98 authored by harshavardhan.c's avatar harshavardhan.c

Add: New Structure format.

parent dd8ffa4e
......@@ -24,9 +24,10 @@ input_data = {
"Node1": {
"node_id": "270120022",
"action": "add",
"node_name": "Event Name",
"node_name": "madhuri topu",
"project_id": "madhuri's project",
"node_label": "Event Label",
"properties": {
"node_properties": {
"name": "Event 1",
"external_data_source": "mongo",
"external_data_id": "101",
......@@ -35,10 +36,12 @@ input_data = {
"completed_date": 1643394600
},
"edges": [],
"tz": "Asia/Kolkata"
},
"Node2": {
"node_id": "270120023",
"node_label": "Event Label",
"project_id": "madhuri's project",
"action": "add",
"node_name": "asjkfafjk",
"node_properties": {
......@@ -51,7 +54,8 @@ input_data = {
},
"edges": [{
"action": "add",
"rel_name": "causes",
"rel_name": "madhuri_don_boscho",
"new_rel_name": "janu_s_UI",
"bind_to": "Node1",
"bind_id": "BSCH270120022"
}]
......
class APIEndPoints:
graph_base = "/graph"
api_get = "/get"
api_fetch = "/fetch"
api_update = '/update'
api_insert = '/insert'
api_graph_link = '/graph_link'
api_create = '/create'
graph_traverse = "/traverse"
ingest_graph_data = "/ingest"
from typing import List
from scripts.db.models import NodeCreationSchema, RelationShipMapper
from scripts.db.graphdb.neo4j import Neo4jHandler
from scripts.db.models import NodePropertiesSchema, RelationShipMapper
from scripts.logging import logger
from scripts.schemas import GraphData, NodeSchemaValidation
from scripts.schemas import GraphData, NodeSchemaValidation, NodeActionOptions
from scripts.utils.graph_utility import GraphUtility
class GraphTraversal:
......@@ -13,66 +13,53 @@ class GraphTraversal:
based on the input received from the user
"""
self.db = db
self.graph_util = GraphUtility(db=db)
def ingest_data_handler(self, graph_data: GraphData):
try:
node_list = []
relations = []
nodes_map = {}
edges_info = {}
for node_type, node_obj in graph_data.__root__.items():
process_dict = NodeSchemaValidation(**node_obj.dict(), label=node_type)
# if node_obj.action == NodeActionOptions.delete:
# pass
# else:
self.save_single_node(
node_data=NodeCreationSchema(name=process_dict.name, id=f'{process_dict.node_id}',
properties=process_dict.properties,
labels=process_dict.label))
except Exception as e:
logger.exception(f'Exception occurred while performing data insert/delete functionality {e.args}')
raise
def save_single_node(self, node_data: NodeCreationSchema, upsert=False):
try:
res = self.load_data_from_graph({"id": node_data.id})
if upsert and existing_response.id:
res = self.db.save_node_with_id(node_data, _id=existing_response.id)
process_obj = NodeSchemaValidation(**node_obj.dict(), label=node_type)
if node_obj.action == NodeActionOptions.delete:
...
else:
res = self.db.save_node(node_data)
print(res)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
nodes_map, edges_info = self.perform_save_node_action(node_obj=process_obj, nodes_map=nodes_map,
edges_info=edges_info, node_type=node_type)
if edges_info:
for des_edge, edges in edges_info.items():
for edge in edges:
if nodes_map.get(des_edge) and nodes_map.get(edge.bind_to):
self.graph_util.save_relationship(
rel_data=RelationShipMapper(_start_node_id=nodes_map[edge.bind_to]._id,
_end_node_id=nodes_map[des_edge]._id,
_type=edge.rel_name,
**nodes_map[des_edge].dict(),
), new_rel_name=edge.new_rel_name)
def save_multiple_nodes(self, node_data: List[NodeCreationSchema]):
try:
res = self.db.save_nodes(nodes=node_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def save_relationship(self, rel_data: RelationShipMapper):
try:
res = self.db.save_relationship_with_id(rel_data)
if not res:
self.db.save_relationship(rel_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
logger.exception(f'{e.args}')
def save_multiple_relationship(self, rel_data: List[RelationShipMapper]):
def perform_save_node_action(self, node_obj: NodeSchemaValidation, nodes_map: dict, edges_info: dict, node_type):
try:
res = self.db.save_relationships(relationships=rel_data)
res = self.graph_util.save_single_node(
node_data=NodePropertiesSchema(**node_obj.dict()))
nodes_map[node_type] = res
if node_obj.edges and isinstance(node_obj.edges, list):
if not edges_info.get(node_type):
edges_info[node_type] = []
edges_info[node_type].extend(node_obj.edges)
return nodes_map, edges_info
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
logger.exception(f'Exception occurred while updating node info {e.args}')
raise
def load_data_from_graph(self, **kwargs):
def perform_delete_node_action(self, graph_data: NodeSchemaValidation):
try:
res = self.db.load_node_with_id(kwargs)
print(res)
neo4j_handler = Neo4jHandler(self.db)
neo4j_handler.delete_node_by_id(node=NodePropertiesSchema(**graph_data.dict()))
return "Node Deleted Successfully"
except Exception as e:
logger.exception(f'Exception Occurred while fetching the node data by {key_name} & {key_value} {e.args}')
logger.exception(f'Exception Occurred while deleting node info {e.args}')
raise
from scripts.core.engine import GraphTraversal
from scripts.schemas import GraphData
class GraphTrackingHandler:
def __init__(self):
"""
Graph Tracking Handler class interacts with the Graph Traversal class
that fetches all the nodes and relations
based on the input received from the user
"""
...
def __init__(self, db):
self.graph_traversal = GraphTraversal(db)
def ingest_data_handler(self, graph_data: GraphData) -> str:
"""
"""
return self.graph_traversal.run_gremlin_queries(graph_data=graph_data)
return self.graph_traversal.ingest_data_handler(graph_data=graph_data)
def graph_traverse_handler(self, graph_data_request: GenealogyDataRequest) -> str:
"""
......
from typing import Optional
from gqlalchemy import Relationship, Node
class Neo4jHandler:
def __init__(self, db):
self.db = db
def delete_relationship_by_id(self, relationship: Relationship) -> Optional[Relationship]:
"""Saves a relationship to the database using the relationship._id."""
results = self.db.execute(
f"MATCH (start_node)-[relationship: {relationship._type}]->(end_node)"
f" WHERE id(start_node) = {relationship._start_node_id}"
f" AND id(end_node) = {relationship._end_node_id}"
f" AND id(relationship) = {relationship._id}"
f" DELETE relationship;"
)
return results
def delete_node_by_id(self, node: Node):
"""Saves a relationship to the database using the relationship._id."""
return self.db.execute(
f"MATCH (node: {node._label}) WHERE {node._get_cypher_fields_and_block('node')} DETACH DELETE node;")
from typing import Optional
from gqlalchemy import Node, Relationship
from pydantic import Field
......@@ -7,10 +5,16 @@ from pydantic import Field
class NodeCreationSchema(Node):
id: str = Field()
name: str = Field()
project_id: str = Field()
class NodePropertiesSchema(NodeCreationSchema):
# meta: dict = Field()
...
class RelationShipMapper(Relationship, type="Relation_Mapper"):
date: Optional[str] = Field()
project_id: str = Field()
class Config:
allow_population_by_field_name = True
......@@ -14,6 +14,7 @@ class Edges(BaseModel):
node_id: Optional[str]
action: NodeActionOptions
rel_name: str
new_rel_name: Optional[str]
bind_to: str
bind_id: str
......@@ -25,18 +26,23 @@ class InputRequestSchema(BaseModel):
node_id: str
node_name: str
action: NodeActionOptions
project_id: str
node_properties: Optional[Dict] = {}
edges: Optional[List[Edges]]
tz: Optional[str] = "Asia/Kolkata"
class Config:
allow_population_by_field_name = True
class NodeSchemaValidation(BaseModel):
node_id: str = Field(alias="node_id")
id: str = Field(alias="node_id")
name: str = Field(alias="node_name")
properties: Optional[Dict] = Field(alias="node_properties", default={})
project_id: str
properties: Optional[Dict] = Field(alias="node_properties")
label: str
edges: Optional[List[Edges]] = []
tz: str
class Config:
allow_population_by_field_name = True
......
from typing import Optional, Any
from pydantic import BaseModel
class DefaultResponse(BaseModel):
status: str = "Failed"
message: Optional[str]
data: Optional[Any]
class DefaultFailureResponse(DefaultResponse):
error: Any
message: Optional[Any]
import traceback
from fastapi import APIRouter, Depends
from scripts.constants import APIEndPoints
from scripts.core.handler import GraphTrackingHandler
from scripts.db import get_db
from scripts.logging import logger
from scripts.schemas import GraphData
from scripts.schemas.responses import DefaultFailureResponse, DefaultResponse
router = APIRouter(prefix=APIEndPoints.graph_base, tags=["Graph Traversal"])
genealogy_tracking_handler = GenealogyTrackingHandler()
@router.post(
APIEndPoints.ingest_graph_data
)
def ingest_data_service(request_data: GraphData, db=Depends(get_db)):
try:
gth_obj = GraphTrackingHandler(db=db)
data = gth_obj.ingest_data_handler(graph_data=request_data)
return DefaultResponse(status="success", message="success", data=data)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args).dict()
from neo4j.exceptions import ServiceUnavailable
from typing import List
from scripts.db import neo4j_driver
from gqlalchemy import GQLAlchemyError
from scripts.db.graphdb.neo4j import Neo4jHandler
from scripts.db.models import NodeCreationSchema, RelationShipMapper, NodePropertiesSchema
from scripts.logging import logger
class Neo4jConnection:
class GraphUtility:
def __init__(self, db):
self.db = db
def __init__(self):
def save_single_node(self, node_data: NodePropertiesSchema):
try:
self.__driver = neo4j_driver
if node_info := self.load_node_data_from_graph(node_data):
node_data._id = node_info._id
return self.db.save_node_with_id(node_data)
else:
return self.db.save_node(node_data)
except Exception as e:
logger.exception(f"Failed to create the driver:{e}")
def close(self):
if self.__driver is not None:
self.__driver.close()
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def query(self, query, parameters=None, db=None):
assert self.__driver is not None, "Driver not initialized!"
session = None
response = None
def save_multiple_nodes(self, node_data: List[NodePropertiesSchema]):
try:
session = self.__driver.session(database=db) if db is not None else self.__driver.session()
response = list(session.run(query, parameters))
res = self.db.save_nodes(nodes=node_data)
except Exception as e:
logger.exception(f"Query failed:{e}")
finally:
if session is not None:
session.close()
return response
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def create_event_relationship(self, query, db=None, **kwargs):
assert self.__driver is not None, "Driver not initialized!"
session = None
def save_relationship(self, rel_data: RelationShipMapper, new_rel_name=None):
try:
session = self.__driver.session(database=db) if db is not None else self.__driver.session()
result = session.write_transaction(
self._execute_relationship, query, **kwargs)
# for record in result:
# print("Created friendship between: {p1}, {p2}".format(
# p1=record['p1'], p2=record['p2']))
if not (rel_info := self.load_relationship_data_from_graph(rel_data)):
return self.db.save_relationship(rel_data)
neo4j_handler = Neo4jHandler(self.db)
neo4j_handler.delete_relationship_by_id(rel_info)
if new_rel_name:
rel_data._type = new_rel_name
return self.db.save_relationship(rel_data)
except Exception as e:
logger.exception(f"Query failed:{e}")
finally:
if session is not None:
session.close()
@staticmethod
def _execute_relationship(tx, query, **kwargs):
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
# To learn more about the Cypher syntax,
# see https://neo4j.com/docs/cypher-manual/current/
def save_multiple_relationship(self, rel_data: List[RelationShipMapper]):
try:
res = self.db.save_relationships(relationships=rel_data)
except Exception as e:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
# The Reference Card is also a good resource for keywords,
# see https://neo4j.com/docs/cypher-refcard/current/
def load_node_data_from_graph(self, node_data: NodePropertiesSchema):
try:
return self.db.load_node(node_data)
except GQLAlchemyError as e:
logger.debug(f'{e.args}')
return None
except Exception as e:
logger.exception(f'Exception Occurred while fetching the node details by {node_data.id} --> {e.args}')
raise
result = tx.run(query, **kwargs)
def load_relationship_data_from_graph(self, rel_data: RelationShipMapper):
try:
return result
# Capture any errors along with the query and data for traceability
except ServiceUnavailable as exception:
logger.exception("{query} raised an error: \n {exception}".format(
query=query, exception=exception))
return self.db.load_relationship(rel_data)
except GQLAlchemyError as e:
logger.debug(f'{e.args}')
return None
except Exception as e:
logger.exception(f'Exception Occurred while fetching the relation details -> {e.args}')
raise
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment