Commit 3b920c2c authored by yogesh.m's avatar yogesh.m

update

parent 8cd89de1
......@@ -189,9 +189,8 @@ def hash_receive(udp_receiver_ip,udp_receiver_port):
while (True):
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize)
message = bytesAddressPair[0]
string_length = struct.unpack("!I", message[:4])[0]
json_string = message[4:].decode('utf-8')
hierarchy=json.loads(json_string)
json_string = message.decode()
hierarchy = json.loads(json_string)
if(prev_hash!=hierarchy["hash"] and asr.idx):
analyse_hierarchy(hierarchy)
prev_hash=hierarchy["hash"]
......
......@@ -7,7 +7,11 @@ import json
nm_no = None
async def create_node_hierarchy(client, node_objects):
class Hash_Sender():
def __init__(self):
self.hash = ""
async def create_node_hierarchy(self,client, node_objects):
node_hierarchy={}
for sub_obj in node_objects:
if("ns="+str(nm_no)+";" in str(sub_obj)):
......@@ -18,7 +22,7 @@ async def create_node_hierarchy(client, node_objects):
if (identifier not in str(sub_obj)):
child_disp_name = await sub_obj.read_display_name()
identifier_name = sub_obj.nodeid.Identifier
node_hierarchy[identifier_name] = await create_node_hierarchy(client, children_nodes)
node_hierarchy[identifier_name] = await self.create_node_hierarchy(client, children_nodes)
node_hierarchy[identifier_name]["name"] = child_disp_name.Text
node_hierarchy[identifier_name]["type"] = node_class._value_
if (node_class._name_ == "Variable"):
......@@ -35,7 +39,7 @@ async def create_node_hierarchy(client, node_objects):
node_hierarchy[identifier_name]["datatype"] = datatype._value_
return node_hierarchy
async def get_send_hash(ENDPOINT,NAMESPACE,server_hash_udp_ip,server_hash_udp_port):
async def get_send_hash(self,ENDPOINT,NAMESPACE,server_hash_udp_ip,server_hash_udp_port):
global nm_no
serverAddressPort = (server_hash_udp_ip, server_hash_udp_port)
UDPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
......@@ -46,14 +50,13 @@ async def get_send_hash(ENDPOINT,NAMESPACE,server_hash_udp_ip,server_hash_udp_po
node_objects = root_objects[0]
while True:
node_objects_children = await node_objects.get_children() # replace with your own node id
node_hierarchy = await create_node_hierarchy(client, node_objects_children)
node_hierarchy["hash"] = hash(str(node_hierarchy))
node_hierarchy = await self.create_node_hierarchy(client, node_objects_children)
self.hash = node_hierarchy["hash"] = hash(str(node_hierarchy))
node_hierarchy["namespace_idx"]=nm_no
json_hierarchy = json.dumps(node_hierarchy)
hierarchy=json.dumps(json_hierarchy)
binary_data = hierarchy.encode()
print(binary_data)
#UDPClientSocket.sendto(binary_data, serverAddressPort)
binary_data = json_hierarchy.encode()
UDPClientSocket.sendto(binary_data, serverAddressPort)
def send_hash(Endpoint,Namespace,server_hash_udp_ip,server_hash_udp_port):
asyncio.run(get_send_hash(Endpoint,Namespace,server_hash_udp_ip,server_hash_udp_port))
\ No newline at end of file
def send_hash(self,Endpoint,Namespace,server_hash_udp_ip,server_hash_udp_port):
asyncio.run(self.get_send_hash(Endpoint,Namespace,server_hash_udp_ip,server_hash_udp_port))
return self.hash
\ No newline at end of file
......@@ -5,22 +5,16 @@ from asyncua_server import st
import threading
import yaml
yamlfile=open("opc_ua_receiver_config.yaml")
data = yaml.load(yamlfile, Loader=yaml.FullLoader)
Endpoint_Url = data["configuration"]["endpoint_url"]
Namespace_Server = data["configuration"]["namespace_server"]
udp_hash_receiver_ip=data["configuration"]["udp_hash_receiver_ip"]
udp_hash_receiver_port=data["configuration"]["udp_hash_receiver_port"]
localIP = data["configuration"]["udp_ip"]
localPort = data["configuration"]["udp_port"]
if(os.path.isfile("opc_ua_receiver_config.yaml")):
yamlfile=open("opc_ua_receiver_config.yaml")
data = yaml.load(yamlfile, Loader=yaml.FullLoader)
Endpoint_Url = data["configuration"]["endpoint_url"]
Namespace_Server = data["configuration"]["namespace_server"]
udp_hash_receiver_ip=data["configuration"]["udp_hash_receiver_ip"]
udp_hash_receiver_port=data["configuration"]["udp_hash_receiver_port"]
localIP = data["configuration"]["udp_ip"]
localPort = data["configuration"]["udp_port"]
bufferSize = 1024
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
UDPServerSocket.bind((localIP, localPort))
t1=threading.Thread(target=st,args=(Endpoint_Url,Namespace_Server,udp_hash_receiver_ip,udp_hash_receiver_port,))
t1.start()
opua=None
sock=None
......@@ -36,6 +30,12 @@ def start_server():
def receive_packets():
print("wait until server starts ...")
t1 = threading.Thread(target=st,
args=(Endpoint_Url, Namespace_Server, udp_hash_receiver_ip, udp_hash_receiver_port,))
t1.start()
bufferSize = 1024
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
UDPServerSocket.bind((localIP, localPort))
opua,sock=start_server()
print("receiving packets ...")
while (True):
......
import asyncio
import socket
import threading
from hash_sender import Hash_Sender
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
import yaml
import os
if(os.path.isfile("opc_ua_transmitter_config.yaml")):
yamlfile=open("opc_ua_transmitter_config.yaml")
data = yaml.load(yamlfile, Loader=yaml.FullLoader)
server_hash_udp_ip = data["configuration"]["server_hash_udp_ip"]
server_hash_udp_port = data["configuration"]["server_hash_udp_port"]
server_udp_ip = data["configuration"]["server_udp_ip"]
server_udp_port = int(data["configuration"]["server_udp_port"])
serverAddressPort = (server_udp_ip,server_udp_port)
ENDPOINT = data["configuration"]["endpoint"]
NAMESPACE = data["configuration"]["namespace"]
bufferSize = 1024
UDPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
nm_no = ''
class MyHandler(SubHandler):
def __init__(self):
self._queue = asyncio.Queue()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
self._queue.put_nowait([node, value, data])
async def process(self) -> None:
try:
while True:
[node, value, data] = self._queue.get_nowait()
datatype = type(value)
packet = str(datatype) + "&" + str(value) + "&" + str(node.nodeid.NamespaceIndex) + "&" + str(node.nodeid.Identifier)
UDPClientSocket.sendto(str.encode(packet), serverAddressPort)
except asyncio.QueueEmpty:
pass
variables=[]
async def get_nodes(client,node_objects):
for sub_obj in node_objects:
if ("ns="+nm_no+";" in str(sub_obj)):
node = client.get_node(sub_obj)
children_nodes = await node.get_children()
identifier = str(children_nodes[0].nodeid.Identifier) if children_nodes else ""
node_class = await sub_obj.read_node_class()
if (identifier not in str(sub_obj)):
await get_nodes(client, children_nodes)
if (node_class._name_ == "Variable"):
variables.append(sub_obj)
else:
if (node_class._name_ == "Variable"):
variables.append(sub_obj)
return variables
hash_changed=False
async def subscribe_node():
global nm_no
async with Client(url=ENDPOINT) as client:
nm_no = str(await client.get_namespace_index(NAMESPACE))
global hash_changed
object_root_node = client.get_objects_node()
objects = await object_root_node.get_children()
node = await get_nodes(client, objects)
handler = MyHandler()
subscription = await client.create_subscription(period=0, handler=handler)
await subscription.subscribe_data_change(node)
while True and not hash_changed:
await handler.process()
await asyncio.sleep(0.01)
hash_changed=False
def subscribe_to_node():
asyncio.run(subscribe_node())
async def main() -> None:
hs=Hash_Sender()
global hash_changed
prev_hash=""
try:
t1=threading.Thread(target=hs.send_hash,args=(ENDPOINT,NAMESPACE,server_hash_udp_ip,server_hash_udp_port,))
t1.start()
while True:
hash=hs.hash
if(hash!=prev_hash and not hash_changed):
prev_hash=hash
hash_changed = True
while(hash_changed):
hash_changed = False
t2=threading.Thread(target=subscribe_to_node,args=())
t2.start()
except Exception:
print("No connection found for opcua server, check if the server is active")
def transmitter_main(_server_hash_udp_ip,_server_hash_udp_port,_server_udp_ip,_server_udp_port,_ENDPOINT,_NAMESPACE):
global server_hash_udp_ip
global server_hash_udp_port
global server_udp_ip
global server_hash_udp_port
global server_udp_ip
global server_udp_port
global serverAddressPort
global ENDPOINT
global NAMESPACE
server_hash_udp_ip = _server_hash_udp_ip
server_hash_udp_port = _server_hash_udp_port
server_udp_ip = _server_udp_ip
server_udp_port = _server_udp_port
serverAddressPort = (_server_udp_ip, _server_udp_port)
ENDPOINT = _ENDPOINT
NAMESPACE = _NAMESPACE
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment