Commit 17dc5ad8 authored by yogesh.m's avatar yogesh.m

update

parent 469aedf0
......@@ -6,16 +6,12 @@ import threading
from time import sleep
import json
localIP = "2.2.2.5"
localPort = 20002
bufferSize = 1024
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
UDPServerSocket.bind((localIP, localPort))
class asyncua_server():
def __init__(self):
self.server=None
self.idx=None
self.remote_server_idx=None
self.variables={}
async def delete_nodes(self,nodes):
for node in nodes:
......@@ -37,12 +33,40 @@ class asyncua_server():
try:
# keep the server running until interrupted
while True:
#var.write_value(random.randint(1, 100))
await asyncio.sleep(0.1)
finally:
# stop the server
await self.server.stop()
async def update_variable(self,node,datatype):
if (node.isdigit()):
node_index = "ns=3;i=" + node
else:
node_index = "ns=3;s=" + node
parent_node= self.server.get_node(self.variables[node]["parent"])
datatype = int(datatype)
if (datatype == 11):
await self.variables[node]["node"].delete()
self.variables[node]["node"] = await parent_node.add_variable(node_index, self.variables[node]["name"], float(0))
await self.variables[node]["node"].set_writable()
elif (datatype == 6):
await self.variables[node]["node"].delete()
self.variables[node]["node"] = await parent_node.add_variable(node_index, self.variables[node]["name"], 0, ua.VariantType.Int32)
await self.variables[node]["node"].set_writable()
elif (datatype == 24):
print(node_index, parent_node, self.variables[node]["name"],datatype)
await self.variables[node]["node"].delete()
self.variables[node]["node"] = await parent_node.add_variable(node_index, self.variables[node]["name"], 0,ua.VariantType.Null)
await self.variables[node]["node"].set_writable()
else:
await self.variables[node]["node"].delete()
self.variables[node]["node"] = await parent_node.add_variable(node_index, self.variables[node]["name"], None)
async def add_object(self,node, parent_node, hierarchy):
if (parent_node == None):
if (node.isdigit()):
......@@ -57,34 +81,44 @@ class asyncua_server():
node_index = "ns=3;s=" + parent_node
node_space = self.server.get_node(node_index)
node_id="ns=3;i="+node if node.isdigit() else "ns=3;s="+node
self.variables[node]={}
self.variables[node]["parent"] = node_index
self.variables[node]["name"] = hierarchy["name"]
if(hierarchy["type"]==1):
await node_space.add_object(node_id,hierarchy["name"])
elif(hierarchy["type"]==2):
print(hierarchy["datatype"])
if(hierarchy["datatype"]==11):
var = await node_space.add_variable(node_id,hierarchy["name"],float(0.0))
await var.set_writable()
self.variables[node]["node"]= await node_space.add_variable(node_id,hierarchy["name"],float(0.0))
await self.variables[node]["node"].set_writable()
elif(hierarchy["datatype"]==6):
var = await node_space.add_variable(node_id, hierarchy["name"], 0,ua.VariantType.Int32)
await var.set_writable()
self.variables[node]["node"] = await node_space.add_variable(node_id, hierarchy["name"], 0,ua.VariantType.Int32)
await self.variables[node]["node"].set_writable()
elif (hierarchy["datatype"] == 24):
var = await node_space.add_variable(node_id, hierarchy["name"], 0, ua.VariantType.Null)
await var.set_writable()
self.variables[node]["node"] = await node_space.add_variable(node_id, hierarchy["name"], 0, ua.VariantType.Null)
await self.variables[node]["node"].set_writable()
else:
var = await node_space.add_variable(node_id, hierarchy["name"], None)
await var.set_writable()
self.variables[node]["node"] = await node_space.add_variable(node_id, hierarchy["name"], None)
await self.variables[node]["node"].set_writable()
async def update_datatypes(self,datatypes):
for data in datatypes:
node,datatype=data.split("-")
await self.update_variable(node,datatype)
asr = asyncua_server()
identifier_array_received=[]
identifier_array_existing=[]
received_datatypes=[]
existing_datatypes=[]
def get_received_hierarchy_array(hierarchy):
for key in hierarchy:
if(key!="hash"):
if(isinstance(hierarchy[key],dict)):
if('datatype' in hierarchy[key]):
received_datatypes.append(key+"-"+str(hierarchy[key]["datatype"]))
identifier_array_received.append(key)
get_received_hierarchy_array(hierarchy[key])
return identifier_array_received
return identifier_array_received,received_datatypes
async def get_existing_hierarchy_array(client,node_objects):
node_hierarchy = {}
......@@ -93,14 +127,29 @@ async def get_existing_hierarchy_array(client,node_objects):
node = client.get_node(sub_obj)
children_nodes = await node.get_children()
identifier = str(children_nodes[0].nodeid.Identifier) if children_nodes else ""
node_class = await sub_obj.read_node_class()
if (identifier not in str(sub_obj)):
identifier_name = sub_obj.nodeid.Identifier
identifier_array_existing.append(str(identifier_name))
node_hierarchy[identifier_name] = await get_existing_hierarchy_array(client, children_nodes)
if(node_class.name == "Variable"):
datatype = await sub_obj.read_data_type_as_variant_type()
if (datatype._value_ == 0):
existing_datatypes.append(str(identifier_name) + "-" + str(24))
else:
existing_datatypes.append(str(identifier_name) + "-" + str(datatype._value_))
else:
identifier_name = sub_obj.nodeid.Identifier
identifier_array_existing.append(str(identifier_name))
return identifier_array_existing
if (node_class.name == "Variable"):
datatype = await sub_obj.read_data_type_as_variant_type()
if(datatype._value_==0):
existing_datatypes.append(str(identifier_name) + "-" + str(24))
else:
existing_datatypes.append(str(identifier_name) + "-" + str(datatype._value_))
return identifier_array_existing,existing_datatypes
......@@ -113,21 +162,30 @@ def add_nodes(nodes_to_add,hierarchy,parent_key=None):
add_nodes(nodes_to_add,hierarchy[key],key)
def analyse_hierarchy(hierarchy):
array1=get_received_hierarchy_array(hierarchy)
array1,datatypes1=get_received_hierarchy_array(hierarchy)
root_node=asr.server.get_root_node()
object_root_node=asyncio.run(root_node.get_children())
node_objects= asyncio.run(object_root_node[0].get_children())
array2=asyncio.run(get_existing_hierarchy_array(asr.server,node_objects))
array2,datatypes2=asyncio.run(get_existing_hierarchy_array(asr.server,node_objects))
nodes_to_add=set(array1)-set(array2)
nodes_to_delete=set(array2)-set(array1)
datatypes_to_update=set(datatypes1)-set(datatypes2)
add_nodes(nodes_to_add,hierarchy)
asyncio.run(asr.delete_nodes(nodes_to_delete))
if(datatypes_to_update and not(nodes_to_add) and not(nodes_to_delete)):
asyncio.run(asr.update_datatypes(datatypes_to_update))
identifier_array_received.clear()
received_datatypes.clear()
identifier_array_existing.clear()
existing_datatypes.clear()
def hash_receive():
def hash_receive(udp_receiver_ip,udp_receiver_port):
prev_hash = 0
sleep(1)
localIP = udp_receiver_ip
localPort = udp_receiver_port
bufferSize = 1024
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
UDPServerSocket.bind((localIP, localPort))
while (True):
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize)
message = bytesAddressPair[0]
......@@ -137,7 +195,7 @@ def hash_receive():
analyse_hierarchy(hierarchy)
prev_hash=hierarchy["hash"]
def st():
t1=threading.Thread(target=hash_receive)
def st(endpoint_url,namespace_server,udp_hash_receiver_ip,udp_hash_receiver_port):
t1=threading.Thread(target=hash_receive,args=(udp_hash_receiver_ip,udp_hash_receiver_port))
t1.start()
asyncio.run(asr.start_server("opc.tcp://2.2.2.5:53531/myopc/free","http://klopc.com"))
\ No newline at end of file
asyncio.run(asr.start_server(endpoint_url,namespace_server))
\ No newline at end of file
import socket
from opcua import Client,ua
from opcua_subscriber.opcua_subscribe import *
from time import sleep
from opcua_subscriber.opcua_subscribe import opcua_pack
from asyncua_server import st
import asyncio
import threading
Endpoint_Url = "opc.tcp://2.2.2.5:53531/myopc/free"
Namespace_Server ="http://klopc.com"
localIP = "2.2.2.5"
udp_hash_receiver_ip="2.2.2.5"
udp_hash_receiver_port=20002
localPort = 20001
localIP = "2.2.2.5"
localPort = 20001
bufferSize = 1024
msgFromServer = "Hello UDP Client"
bytesToSend = str.encode(msgFromServer)
# Create a datagram socket
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Bind to address and ip
UDPServerSocket.bind((localIP, localPort))
print("UDP server up and listening")
t1=threading.Thread(target=st,args=())
t1=threading.Thread(target=st,args=(Endpoint_Url,Namespace_Server,udp_hash_receiver_ip,udp_hash_receiver_port,))
t1.start()
sleep(25)
sleep(5)
opua=opcua_pack()
sock = opua.connect("opc.tcp://2.2.2.5:53531/myopc/free")
sock = opua.connect(Endpoint_Url)
while (True):
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment