Commit e45d7531 authored by harshavardhan.c

dev: new changes for graph db.

parent 0bdf4dee
......@@ -22,12 +22,12 @@ def ingest_data_handler(graph_data: GraphData):
input_data = {
"Node1": {
"node_id": "270120022",
"node_id": "event_001",
"action": "add",
"node_name": "madhuri topu",
"project_id": "madhuri's project",
"node_label": "Event Label",
"node_properties": {
"node_name": "Event 20",
"project_id": "project_099",
"node_type": "Events",
"properties": {
"name": "Event 1",
"external_data_source": "mongo",
"external_data_id": "101",
......@@ -38,13 +38,13 @@ input_data = {
"edges": [],
"tz": "Asia/Kolkata"
},
"Node2": {
"node_id": "270120023",
"node_label": "Event Label",
"project_id": "madhuri's project",
"Node10": {
"node_id": "event_10",
"project_id": "project_099",
"action": "add",
"node_name": "asjkfafjk",
"node_properties": {
"node_name": "Event Name",
"node_type": "Events,Harsha",
"properties": {
"name": "Event 2",
"external_data_source": "mongo",
"external_data_id": "101",
......
python-dotenv~=0.19.2
SQLAlchemy==1.4.35
GQLAlchemy
psycopg2-binary==2.9.3
fastapi~=0.74.1
pytz~=2021.3
PyYAML~=6.0
redis~=4.1.4
PyJWT~=2.3.0
pymongo==3.7.2
ilens-kafka-publisher==0.4.2
kafka-python==1.4.7
faust==1.10.4
SQLAlchemy-Utils==0.38.2
pendulum==2.1.2
from scripts.db.graphdb.neo4j import Neo4jHandler
from scripts.db.models import NodePropertiesSchema, RelationShipMapper, NodeFetchSchema
from scripts.db.models import NodePropertiesSchema, RelationShipMapper
from scripts.logging import logger
from scripts.schemas import GraphData, NodeSchemaValidation, NodeActionOptions
from scripts.schemas import GraphData, InputRequestSchema, NodeActionOptions, GetNodeInfo, ResponseModelSchema
from scripts.utils.graph_utility import GraphUtility
......@@ -20,11 +20,10 @@ class GraphTraversal:
nodes_map = {}
edges_info = {}
for node_type, node_obj in graph_data.__root__.items():
process_obj = NodeSchemaValidation(**node_obj.dict(), label=node_type)
if node_obj.action == NodeActionOptions.delete:
self.perform_delete_node_action(graph_data=process_obj)
self.perform_delete_node_action(graph_data=node_obj)
else:
nodes_map, edges_info = self.perform_save_node_action(node_obj=process_obj, nodes_map=nodes_map,
nodes_map, edges_info = self.perform_save_node_action(node_obj=node_obj, nodes_map=nodes_map,
edges_info=edges_info, node_type=node_type)
if edges_info:
for des_edge, edges in edges_info.items():
......@@ -39,10 +38,10 @@ class GraphTraversal:
except Exception as e:
logger.exception(f'{e.args}')
def perform_save_node_action(self, node_obj: NodeSchemaValidation, nodes_map: dict, edges_info: dict, node_type):
def perform_save_node_action(self, node_obj: InputRequestSchema, nodes_map: dict, edges_info: dict, node_type):
try:
res = self.graph_util.save_single_node(
node_data=NodePropertiesSchema(**node_obj.dict()))
node_data=NodePropertiesSchema(_labels=node_obj.node_type.split(","), **node_obj.dict()))
nodes_map[node_type] = res
if node_obj.edges and isinstance(node_obj.edges, list):
if not edges_info.get(node_type):
......@@ -53,7 +52,7 @@ class GraphTraversal:
logger.exception(f'Exception occurred while updating node info {e.args}')
raise
def perform_delete_node_action(self, graph_data: NodeSchemaValidation):
def perform_delete_node_action(self, graph_data: InputRequestSchema):
try:
neo4j_handler = Neo4jHandler(self.db)
neo4j_handler.delete_node_by_id(node=NodePropertiesSchema(**graph_data.dict()))
......@@ -62,9 +61,29 @@ class GraphTraversal:
logger.exception(f'Exception Occurred while deleting node info {e.args}')
raise
def fetch_node_data(self, graph_request: NodeFetchSchema):
def fetch_node_data(self, graph_request: GetNodeInfo):
return_data = ResponseModelSchema(nodes=[], links=[])
try:
existing_data = self.graph_util.get_connecting_nodes_info(input_data=graph_request)
existing_data = self.graph_util.get_connecting_nodes_info(input_data=graph_request.dict())
existing_node_info = []
for k, v in existing_data.items():
for _item in v:
node_info = _item.dict()
ui_dict = _item.dict(exclude_none=True)
if k.lower() == "r":
ui_dict["source"] = ui_dict.pop("_start_node_id")
ui_dict["target"] = ui_dict.pop("_end_node_id")
ui_dict["linkName"] = ui_dict.pop("_type")
return_data.series_data["links"].append(ui_dict)
continue
node_id = node_info.get("id")
unique_id = node_info.get("_id")
if not node_id or node_id in unique_id:
continue
existing_node_info.append(unique_id)
ui_dict.update({"x": '', "y": ''})
return_data.series_data["nodes"].append(ui_dict)
return return_data
except Exception as e:
logger.exception(f"Exception Occurred while fetching data from node - {e.args}")
raise
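A minimal standalone sketch of the reshaping the new fetch_node_data performs, with assumed records standing in for the output of graph_util.get_connecting_nodes_info() and the duplicate check read as "skip nodes whose internal _id has already been emitted":

```python
# Assumed sample data, keyed by the Cypher return aliases r/a/b.
existing_data = {
    "r": [{"_start_node_id": 11, "_end_node_id": 12, "_type": "Relation_Mapper"}],
    "a": [{"_id": 11, "id": "event_001", "name": "Event 1", "project_id": "project_099"}],
    "b": [{"_id": 12, "id": "event_10", "name": "Event 2", "project_id": "project_099"}],
}

series_data = {"nodes": [], "links": []}
seen_node_ids = []

for alias, records in existing_data.items():
    for record in records:
        ui_dict = {k: v for k, v in record.items() if v is not None}
        if alias.lower() == "r":
            # Relationship records become UI links: rename the internal ids and type.
            ui_dict["source"] = ui_dict.pop("_start_node_id")
            ui_dict["target"] = ui_dict.pop("_end_node_id")
            ui_dict["linkName"] = ui_dict.pop("_type")
            series_data["links"].append(ui_dict)
            continue
        # Node records are de-duplicated on _id and given placeholder coordinates.
        if not record.get("id") or record["_id"] in seen_node_ids:
            continue
        seen_node_ids.append(record["_id"])
        ui_dict.update({"x": "", "y": ""})
        series_data["nodes"].append(ui_dict)

print(series_data)
```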
from scripts.core.engine import GraphTraversal
from scripts.logging import logger
from scripts.schemas import GraphData, GetNodeInfo
......@@ -11,7 +12,11 @@ class GraphTrackingHandler:
"""
return self.graph_traversal.ingest_data_handler(graph_data=graph_data)
def graph_traverse_handler(self, graph_data_request: GetNodeInfo) -> str:
def graph_traverse_handler(self, graph_data_request: GetNodeInfo):
"""
"""
return self.graph_traversal.get_genealogy(genealogy_data=graph_data_request)
try:
return self.graph_traversal.fetch_node_data(graph_data_request)
except Exception as e:
logger.exception(f"Exception Occurred while fetching the connection details for selected node {e.args}")
raise
from gqlalchemy import Neo4j, Memgraph
from gqlalchemy.connection import Neo4jConnection
from scripts.config import DBConf
from scripts.logging import logger
......@@ -7,20 +8,19 @@ host = DBConf.GRAPH_HOST
port = DBConf.GRAPH_PORT
username = DBConf.GRAPH_USERNAME
password = DBConf.GRAPH_PASSWORD
db_type = DBConf.GRAPH_DB_TYPE
def db_func_mapper(conn_type):
func_mapper = {
"neo4j": Neo4j,
"Neo4jConnection": Neo4jConnection,
"memgraph": Memgraph
}
return func_mapper.get(conn_type)
def get_db():
def get_db(db_type=DBConf.GRAPH_DB_TYPE):
connector = db_func_mapper(db_type)
if connector is None:
logger.exception(f"Connection Details not found for the selected database type -- {db_type}")
return connector(host=host, username=username, password=password, port=port)
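Only the keys in func_mapper ("neo4j", "Neo4jConnection", "memgraph") are valid db_type values; anything else is logged and then fails when connector(...) is called on None. A usage sketch for the parameterised get_db, assuming DBConf is populated from the environment:

```python
# Usage sketch; assumes GRAPH_HOST/GRAPH_PORT/GRAPH_USERNAME/GRAPH_PASSWORD/GRAPH_DB_TYPE are set.
from scripts.db import get_db

db = get_db()                              # falls back to DBConf.GRAPH_DB_TYPE
memgraph_db = get_db(db_type="memgraph")   # explicit override; must match a func_mapper key

# Both connectors expose gqlalchemy's execute_and_fetch, which yields dicts per row.
for row in memgraph_db.execute_and_fetch("MATCH (n) RETURN n LIMIT 5;"):
    print(row["n"])
```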
from scripts.db.models import NodePropertiesSchema
class QueryFormation:
@staticmethod
def get_data_by_project_id_and_node_id(node: NodePropertiesSchema):
return f"MATCH (node: {node._label}) WHERE node.id='{node.node_id}' AND node.project_id='{node.project_id}' RETURN node;"
......@@ -8,7 +8,6 @@ class Neo4jHandler:
self.db = db
def delete_relationship_by_id(self, relationship: Relationship) -> Optional[Relationship]:
"""Saves a relationship to the database using the relationship._id."""
results = self.db.execute(
f"MATCH (start_node)-[relationship: {relationship._type}]->(end_node)"
f" WHERE id(start_node) = {relationship._start_node_id}"
......@@ -20,6 +19,8 @@ class Neo4jHandler:
return results
def delete_node_by_id(self, node: Node):
"""Saves a relationship to the database using the relationship._id."""
return self.db.execute(
f"MATCH (node: {node._label}) WHERE {node._get_cypher_fields_and_block('node')} DETACH DELETE node;")
def execute_custom_query(self, node):
connection = self.db._get_cached_connection()
......@@ -5,9 +5,10 @@ from pydantic import Field
class NodeCreationSchema(Node):
id: str = Field()
name: str = Field()
node_id: str = Field()
node_name: str = Field()
project_id: str = Field()
node_properties: Optional[str]
class NodePropertiesSchema(NodeCreationSchema):
......@@ -23,6 +24,5 @@ class RelationShipMapper(Relationship, type="Relation_Mapper"):
class NodeFetchSchema(Node):
id: Optional[str]
name: Optional[str]
id: str
project_id: str
import json
from enum import Enum
from typing import Dict, Optional, List
from pydantic import BaseModel, Field
from pydantic import BaseModel, root_validator
class NodeActionOptions(str, Enum):
......@@ -26,25 +27,19 @@ class InputRequestSchema(BaseModel):
node_name: str
action: NodeActionOptions
project_id: str
node_properties: Optional[Dict] = {}
node_type: Optional[str] = "Events"
properties: Optional[Dict] = {}
node_properties: Optional[str]
edges: Optional[List[Edges]]
tz: Optional[str] = "Asia/Kolkata"
class Config:
allow_population_by_field_name = True
class NodeSchemaValidation(BaseModel):
id: str = Field(alias="node_id")
name: str = Field(alias="node_name")
project_id: str
properties: Optional[Dict] = Field(alias="node_properties")
label: str
edges: Optional[List[Edges]] = []
tz: str
class Config:
allow_population_by_field_name = True
@root_validator
def date_range_validator(cls, values):
if values['properties']:
values['node_properties'] = json.dumps(values['properties'])
return values
class GraphData(BaseModel):
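The root_validator added above folds the incoming properties dict into the node_properties string field; a trimmed, standalone pydantic sketch of that behaviour (the model below is a stand-in, not the full InputRequestSchema):

```python
import json
from typing import Dict, Optional

from pydantic import BaseModel, root_validator


class TrimmedRequest(BaseModel):
    node_id: str
    node_type: Optional[str] = "Events"
    properties: Optional[Dict] = {}
    node_properties: Optional[str]

    @root_validator
    def serialise_properties(cls, values):
        # Mirrors the new validator: serialise the dict into the string field.
        if values["properties"]:
            values["node_properties"] = json.dumps(values["properties"])
        return values


req = TrimmedRequest(node_id="event_001", properties={"name": "Event 1", "external_data_id": "101"})
print(req.node_properties)   # '{"name": "Event 1", "external_data_id": "101"}'
```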
......@@ -93,5 +88,8 @@ class GraphData(BaseModel):
class GetNodeInfo(BaseModel):
project_id: str
event_id: str
id: str
class ResponseModelSchema(BaseModel):
series_data: Optional[dict]
......@@ -6,11 +6,10 @@ from scripts.constants import APIEndPoints
from scripts.core.handler import GraphTrackingHandler
from scripts.db import get_db
from scripts.logging import logger
from scripts.schemas import GraphData
from scripts.schemas import GraphData, GetNodeInfo
from scripts.schemas.responses import DefaultFailureResponse, DefaultResponse
router = APIRouter(prefix=APIEndPoints.graph_base, tags=["Graph Traversal"])
genealogy_tracking_handler = GenealogyTrackingHandler()
@router.post(
......@@ -26,3 +25,18 @@ def ingest_data_service(request_data: GraphData, db=Depends(get_db)):
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args).dict()
@router.post(
APIEndPoints.api_graph_link
)
def ingest_data_service(request_data: GetNodeInfo, db=Depends(get_db)):
try:
gth_obj = GraphTrackingHandler(db=db)
data = gth_obj.graph_traverse_handler(request_data)
return DefaultResponse(status="success", message="success", data=data)
except Exception as e:
tb = traceback.format_exc()
logger.exception(e)
logger.exception(tb)
return DefaultFailureResponse(error=e.args).dict()
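A request sketch for the new endpoint; the host, port, and path "/graph/link" are placeholders for the service address and APIEndPoints.graph_base plus APIEndPoints.api_graph_link (actual values live in scripts.constants), and the payload mirrors the reworked GetNodeInfo:

```python
import requests

# Placeholder URL; the payload fields match GetNodeInfo (project_id + id).
payload = {"project_id": "project_099", "id": "event_001"}
resp = requests.post("http://localhost:8000/graph/link", json=payload)

# On success the body is a DefaultResponse wrapping ResponseModelSchema.series_data,
# i.e. "nodes" and "links" lists for the UI graph.
print(resp.json())
```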
import json
from typing import Iterator, Dict, Any
class CommonUtils:
def __init__(self):
...
@staticmethod
def convert_dict_str_format(input_dict):
return_str = json.dumps(input_dict).replace('"', "'")
for each_key in input_dict.keys():
return_str = return_str.replace(f"'{each_key}'", each_key)
return return_str.replace('{', '').replace('}', '')
@staticmethod
def process_generator_result(input_data: Iterator[Dict[str, Any]]):
return_dict = {}
for _item in input_data:
if isinstance(_item, dict):
for k, v in _item.items():
if k not in return_dict:
return_dict[k] = []
return_dict[k].append(v)
return return_dict
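Usage sketch for the two new helpers: convert_dict_str_format renders a filter dict as the inner part of a Cypher property map, and process_generator_result groups the rows yielded by execute_and_fetch into lists keyed by their return alias:

```python
from scripts.utils.common_utils import CommonUtils

common_util = CommonUtils()

print(common_util.convert_dict_str_format({"project_id": "project_099", "id": "event_001"}))
# project_id: 'project_099', id: 'event_001'

rows = iter([{"a": "node-a", "b": "node-b", "r": "rel-1"},
             {"a": "node-a", "b": "node-c", "r": "rel-2"}])
print(common_util.process_generator_result(rows))
# {'a': ['node-a', 'node-a'], 'b': ['node-b', 'node-c'], 'r': ['rel-1', 'rel-2']}
```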
from typing import List
from gqlalchemy import GQLAlchemyError
from gqlalchemy.query_builders import neo4j_query_builder
from scripts.db import get_db
from scripts.db.graphdb.graph_query import QueryFormation
from scripts.db.graphdb.neo4j import Neo4jHandler
from scripts.db.models import RelationShipMapper, NodePropertiesSchema
from scripts.logging import logger
from scripts.utils.common_utils import CommonUtils
class GraphUtility:
def __init__(self, db):
self.db = db
self.common_util = CommonUtils()
self.query_util = QueryFormation()
def save_single_node(self, node_data: NodePropertiesSchema):
try:
if node_info := self.load_node_data_from_graph(node_data):
if node_info := self.load_node_data_from_graph_by_id(node_data):
node_data._id = node_info._id
return self.db.save_node_with_id(node_data)
else:
......@@ -50,14 +56,16 @@ class GraphUtility:
logger.exception(f'Exception occurred while creating node for the graph {e.args}')
raise
def load_node_data_from_graph(self, node_data: NodePropertiesSchema):
def load_node_data_from_graph_by_id(self, node_data: NodePropertiesSchema):
try:
return self.db.load_node(node_data)
query = self.query_util.get_data_by_project_id_and_node_id(node_data)
results = self.db.execute_and_fetch(query)
return self.db.get_variable_assume_one(results, "node")
except GQLAlchemyError as e:
logger.debug(f'{e.args}')
return None
except Exception as e:
logger.exception(f'Exception Occurred while fetching the node details by {node_data.id} --> {e.args}')
logger.exception(f'Exception Occurred while fetching the node details by {node_data.node_id} --> {e.args}')
raise
def load_relationship_data_from_graph(self, rel_data: RelationShipMapper):
......@@ -70,11 +78,10 @@ class GraphUtility:
logger.exception(f'Exception Occurred while fetching the relation details -> {e.args}')
raise
def get_connecting_nodes_info(self, input_data):
def get_connecting_nodes_info(self, input_data, label='NodeCreationSchema'):
try:
query = f"MATCH (a: {input_data.get('label')}'{input_data}')-[r]-(b) RETURN r,a,b;"
relation_info, input_nodes, output_nodes = self.db.execute_and_fetch(query=query)
return relation_info, input_nodes, output_nodes
query = f"MATCH (a: {label}{{{self.common_util.convert_dict_str_format(input_dict=input_data)}}})-[r]-(b) RETURN r,a,b;"
return self.common_util.process_generator_result(self.db.execute_and_fetch(query=query))
except GQLAlchemyError as e:
logger.debug(f'{e.args}')
return None
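Putting the pieces together, a sketch of the MATCH statement the reworked get_connecting_nodes_info issues for the default label and an assumed GetNodeInfo filter:

```python
# Assumed filter dict, as produced by GetNodeInfo.dict() in fetch_node_data.
input_data = {"project_id": "project_099", "id": "event_001"}
label = "NodeCreationSchema"

# Equivalent to CommonUtils.convert_dict_str_format(input_data).
properties = "project_id: 'project_099', id: 'event_001'"
query = f"MATCH (a: {label}{{{properties}}})-[r]-(b) RETURN r,a,b;"
print(query)
# MATCH (a: NodeCreationSchema{project_id: 'project_099', id: 'event_001'})-[r]-(b) RETURN r,a,b;
```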
......