Commit d5eda3b4 authored by yogesh.m's avatar yogesh.m

update

parent 3b920c2c
import asyncio
import socket
import threading
import hash_sender
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
import yaml
yamlfile=open("opc_ua_transmitter_config.yaml")
data = yaml.load(yamlfile, Loader=yaml.FullLoader)
server_hash_udp_ip = data["configuration"]["server_hash_udp_ip"]
server_hash_udp_port = data["configuration"]["server_hash_udp_port"]
server_udp_ip = data["configuration"]["server_udp_ip"]
server_udp_port = int(data["configuration"]["server_udp_port"])
serverAddressPort = (server_udp_ip,server_udp_port)
ENDPOINT = data["configuration"]["endpoint"]
NAMESPACE = data["configuration"]["namespace"]
bufferSize = 1024
UDPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
nm_no = ''
class MyHandler(SubHandler):
def __init__(self):
self._queue = asyncio.Queue()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
self._queue.put_nowait([node, value, data])
async def process(self) -> None:
try:
while True:
[node, value, data] = self._queue.get_nowait()
datatype = type(value)
print(value)
packet = str(datatype) + "&" + str(value) + "&" + str(node.nodeid.NamespaceIndex) + "&" + str(node.nodeid.Identifier)
UDPClientSocket.sendto(str.encode(packet), serverAddressPort)
except asyncio.QueueEmpty:
pass
variables=[]
async def get_nodes(client,node_objects):
for sub_obj in node_objects:
if ("ns="+nm_no+";" in str(sub_obj)):
node = client.get_node(sub_obj)
children_nodes = await node.get_children()
identifier = str(children_nodes[0].nodeid.Identifier) if children_nodes else ""
node_class = await sub_obj.read_node_class()
if (identifier not in str(sub_obj)):
await get_nodes(client, children_nodes)
if (node_class._name_ == "Variable"):
variables.append(sub_obj)
else:
if (node_class._name_ == "Variable"):
variables.append(sub_obj)
return variables
async def main() -> None:
global nm_no
try:
async with Client(url=ENDPOINT) as client:
nm_no = str(await client.get_namespace_index(NAMESPACE))
object_root_node = client.get_objects_node()
objects = await object_root_node.get_children()
node = await get_nodes(client,objects)
t1=threading.Thread(target=hash_sender.send_hash,args=(ENDPOINT,NAMESPACE,server_hash_udp_ip,server_hash_udp_port,))
t1.start()
handler = MyHandler()
subscription = await client.create_subscription(period=0, handler=handler)
await subscription.subscribe_data_change(node)
while True:
await handler.process()
await asyncio.sleep(0.01)
except:
print("No connection found for opcua server, check if the server is active")
def transmitter_main(_server_hash_udp_ip,_server_hash_udp_port,_server_udp_ip,_server_udp_port,_ENDPOINT,_NAMESPACE):
global server_hash_udp_ip
global server_hash_udp_port
global server_udp_ip
global server_hash_udp_port
global server_udp_ip
global server_udp_port
global serverAddressPort
global ENDPOINT
global NAMESPACE
server_hash_udp_ip = _server_hash_udp_ip
server_hash_udp_port = _server_hash_udp_port
server_udp_ip = _server_udp_ip
server_udp_port = _server_udp_port
serverAddressPort = (_server_udp_ip, _server_udp_port)
ENDPOINT = _ENDPOINT
NAMESPACE = _NAMESPACE
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
......@@ -426,7 +426,7 @@ class opcua_pack():
def connect(self,address):
host,port=address.split("/")[2].split(":")
server_address = (host, int(port))
server_address = (socket.gethostbyname(host), int(port))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
message = self.helf(address)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment