Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-cloning
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
yogesh.m
opcua-cloning
Commits
024a7d4f
Commit
024a7d4f
authored
Mar 15, 2023
by
yogesh.m
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
a0fa6edb
Changes
9
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
216 additions
and
3 deletions
+216
-3
__pycache__/asyncua_server.cpython-39.pyc
__pycache__/asyncua_server.cpython-39.pyc
+0
-0
__pycache__/hash_sender.cpython-39.pyc
__pycache__/hash_sender.cpython-39.pyc
+0
-0
__pycache__/opcua_receiver.cpython-39.pyc
__pycache__/opcua_receiver.cpython-39.pyc
+0
-0
__pycache__/opcua_transmitter.cpython-39.pyc
__pycache__/opcua_transmitter.cpython-39.pyc
+0
-0
main.py
main.py
+176
-0
opc_ua_transmitter_config.yaml
opc_ua_transmitter_config.yaml
+1
-1
opcua_receiver.py
opcua_receiver.py
+18
-1
opcua_subscriber/__pycache__/opcua_subscribe.cpython-39.pyc
opcua_subscriber/__pycache__/opcua_subscribe.cpython-39.pyc
+0
-0
opcua_transmitter.py
opcua_transmitter.py
+21
-1
No files found.
__pycache__/asyncua_server.cpython-39.pyc
View file @
024a7d4f
No preview for this file type
__pycache__/hash_sender.cpython-39.pyc
View file @
024a7d4f
No preview for this file type
__pycache__/opcua_receiver.cpython-39.pyc
0 → 100644
View file @
024a7d4f
File added
__pycache__/opcua_transmitter.cpython-39.pyc
0 → 100644
View file @
024a7d4f
File added
main.py
0 → 100644
View file @
024a7d4f
import
tkinter
as
tk
from
opcua_transmitter
import
transmitter_main
#from opcua_receiver import receive_main
class
App
(
tk
.
Frame
):
def
__init__
(
self
,
master
=
None
):
super
()
.
__init__
(
master
)
self
.
master
=
master
self
.
pack
()
self
.
create_widgets
()
def
create_widgets
(
self
):
# Label at the top
self
.
label
=
tk
.
Label
(
self
,
text
=
"Choose the right operation:"
)
self
.
label
.
pack
(
side
=
"top"
)
# Send button
self
.
send_button
=
tk
.
Button
(
self
,
text
=
"Send"
,
command
=
self
.
send_window
)
self
.
send_button
.
pack
(
side
=
"left"
)
# Receive button
self
.
receive_button
=
tk
.
Button
(
self
,
text
=
"Receive"
,
command
=
self
.
receive_window
)
self
.
receive_button
.
pack
(
side
=
"right"
)
def
send_window
(
self
):
# Remove existing widgets
self
.
label
.
pack_forget
()
self
.
send_button
.
pack_forget
()
self
.
receive_button
.
pack_forget
()
# URL label and entry widget
self
.
url_label
=
tk
.
Label
(
self
,
text
=
"Enter Endpoint URL:"
)
self
.
url_label
.
pack
(
side
=
"top"
)
self
.
url_entry
=
tk
.
Entry
(
self
)
self
.
url_entry
.
pack
(
side
=
"top"
)
self
.
namespace_label
=
tk
.
Label
(
self
,
text
=
"Enter Namespace URL:"
)
self
.
namespace_label
.
pack
(
side
=
"top"
)
self
.
namespace_entry
=
tk
.
Entry
(
self
)
self
.
namespace_entry
.
pack
(
side
=
"top"
)
self
.
server_hash_udp_ip_label
=
tk
.
Label
(
self
,
text
=
"Enter server_hash_udp_ip:"
)
self
.
server_hash_udp_ip_label
.
pack
(
side
=
"top"
)
self
.
server_hash_udp_ip_entry
=
tk
.
Entry
(
self
)
self
.
server_hash_udp_ip_entry
.
pack
(
side
=
"top"
)
self
.
server_hash_udp_port_label
=
tk
.
Label
(
self
,
text
=
"Enter server_hash_port_ip:"
)
self
.
server_hash_udp_port_label
.
pack
(
side
=
"top"
)
self
.
server_hash_udp_port_entry
=
tk
.
Entry
(
self
)
self
.
server_hash_udp_port_entry
.
pack
(
side
=
"top"
)
self
.
server_udp_ip_label
=
tk
.
Label
(
self
,
text
=
"Enter server_udp_ip:"
)
self
.
server_udp_ip_label
.
pack
(
side
=
"top"
)
self
.
server_udp_ip_entry
=
tk
.
Entry
(
self
)
self
.
server_udp_ip_entry
.
pack
(
side
=
"top"
)
self
.
server_udp_port_label
=
tk
.
Label
(
self
,
text
=
"Enter server_port_ip:"
)
self
.
server_udp_port_label
.
pack
(
side
=
"top"
)
self
.
server_udp_port_entry
=
tk
.
Entry
(
self
)
self
.
server_udp_port_entry
.
pack
(
side
=
"top"
)
# Submit button
self
.
submit_button
=
tk
.
Button
(
self
,
text
=
"Submit"
,
command
=
lambda
:
self
.
submit_url
(
"Send"
))
self
.
submit_button
.
pack
(
side
=
"top"
)
# Back button
self
.
back_button
=
tk
.
Button
(
self
,
text
=
"Back"
,
command
=
self
.
back_of_send
)
self
.
back_button
.
pack
(
side
=
"top"
)
def
receive_window
(
self
):
# Remove existing widgets
self
.
label
.
pack_forget
()
self
.
send_button
.
pack_forget
()
self
.
receive_button
.
pack_forget
()
# URL label and entry widget
self
.
url_label
=
tk
.
Label
(
self
,
text
=
"Enter URL:"
)
self
.
url_label
.
pack
(
side
=
"top"
)
self
.
url_entry
=
tk
.
Entry
(
self
)
self
.
url_entry
.
pack
(
side
=
"top"
)
self
.
namespace_label
=
tk
.
Label
(
self
,
text
=
"Enter Namespace URL:"
)
self
.
namespace_label
.
pack
(
side
=
"top"
)
self
.
namespace_entry
=
tk
.
Entry
(
self
)
self
.
namespace_entry
.
pack
(
side
=
"top"
)
self
.
udp_hash_receiver_ip_label
=
tk
.
Label
(
self
,
text
=
"Enter udp_hash_receiver_ip:"
)
self
.
udp_hash_receiver_ip_label
.
pack
(
side
=
"top"
)
self
.
udp_hash_receiver_ip_entry
=
tk
.
Entry
(
self
)
self
.
udp_hash_receiver_ip_entry
.
pack
(
side
=
"top"
)
self
.
udp_hash_receiver_port_label
=
tk
.
Label
(
self
,
text
=
"Enter udp_hash_receiver_port:"
)
self
.
udp_hash_receiver_port_label
.
pack
(
side
=
"top"
)
self
.
udp_hash_receiver_port_entry
=
tk
.
Entry
(
self
)
self
.
udp_hash_receiver_port_entry
.
pack
(
side
=
"top"
)
self
.
udp_ip_label
=
tk
.
Label
(
self
,
text
=
"Enter udp_ip:"
)
self
.
udp_ip_label
.
pack
(
side
=
"top"
)
self
.
udp_ip_entry
=
tk
.
Entry
(
self
)
self
.
udp_ip_entry
.
pack
(
side
=
"top"
)
self
.
udp_port_label
=
tk
.
Label
(
self
,
text
=
"Enter server_port:"
)
self
.
udp_port_label
.
pack
(
side
=
"top"
)
self
.
udp_port_entry
=
tk
.
Entry
(
self
)
self
.
udp_port_entry
.
pack
(
side
=
"top"
)
# Submit button
self
.
submit_button
=
tk
.
Button
(
self
,
text
=
"Submit"
,
command
=
lambda
:
self
.
submit_url
(
"Receive"
))
self
.
submit_button
.
pack
(
side
=
"top"
)
# Back button
self
.
back_button
=
tk
.
Button
(
self
,
text
=
"Back"
,
command
=
self
.
back_of_receive
)
self
.
back_button
.
pack
(
side
=
"top"
)
def
submit_url
(
self
,
button_label
):
# Get the URL from the entry widget
# Do something with the URL (e.g. send or receive data)
if
(
button_label
==
"Send"
):
url
=
self
.
url_entry
.
get
()
namespace
=
self
.
namespace_entry
.
get
()
hash_server_ip
=
self
.
server_hash_udp_ip_entry
.
get
()
hash_server_port
=
int
(
self
.
server_hash_udp_port_entry
.
get
())
send_to_ip
=
self
.
server_udp_ip_entry
.
get
()
send_to_port
=
int
(
self
.
server_udp_port_entry
.
get
())
status
=
transmitter_main
(
hash_server_ip
,
hash_server_port
,
send_to_ip
,
send_to_port
,
url
,
namespace
)
# else:
# status = receive_main(_Endpoint_Url,_Namespace_Server,_udp_hash_receiver_ip,_udp_hash_receiver_port,_localIP,_localPort)
# Clear the entry widget
self
.
url_entry
.
delete
(
0
,
tk
.
END
)
def
back_of_send
(
self
):
# Remove existing widgets
self
.
url_label
.
pack_forget
()
self
.
url_entry
.
pack_forget
()
self
.
submit_button
.
pack_forget
()
self
.
back_button
.
pack_forget
()
self
.
namespace_label
.
pack_forget
()
self
.
namespace_entry
.
pack_forget
()
self
.
server_hash_udp_ip_entry
.
pack_forget
()
self
.
server_hash_udp_ip_label
.
pack_forget
()
self
.
server_hash_udp_port_entry
.
pack_forget
()
self
.
server_hash_udp_port_label
.
pack_forget
()
self
.
server_udp_ip_entry
.
pack_forget
()
self
.
server_udp_ip_label
.
pack_forget
()
self
.
server_udp_port_entry
.
pack_forget
()
self
.
server_udp_port_label
.
pack_forget
()
def
back_of_receive
(
self
):
self
.
url_label
.
pack_forget
()
self
.
url_entry
.
pack_forget
()
self
.
submit_button
.
pack_forget
()
self
.
back_button
.
pack_forget
()
self
.
namespace_label
.
pack_forget
()
self
.
namespace_entry
.
pack_forget
()
self
.
udp_hash_receiver_ip_entry
.
pack_forget
()
self
.
udp_hash_receiver_ip_label
.
pack_forget
()
self
.
udp_hash_receiver_port_entry
.
pack_forget
()
self
.
udp_hash_receiver_port_label
.
pack_forget
()
self
.
udp_ip_label
.
pack_forget
()
self
.
udp_port_label
.
pack_forget
()
self
.
udp_ip_entry
.
pack_forget
()
self
.
udp_port_entry
.
pack_forget
()
self
.
create_widgets
()
# Create the main window
root
=
tk
.
Tk
()
# Create the app
app
=
App
(
master
=
root
)
# Run the app
app
.
mainloop
()
opc_ua_transmitter_config.yaml
View file @
024a7d4f
configuration
:
configuration
:
server_hash_udp_ip
:
"
2.2.2.5"
server_hash_udp_ip
:
2.2.2.5
server_hash_udp_port
:
20002
server_hash_udp_port
:
20002
server_udp_ip
:
2.2.2.5
server_udp_ip
:
2.2.2.5
server_udp_port
:
20001
server_udp_port
:
20001
...
...
opcua_receiver.py
View file @
024a7d4f
...
@@ -44,4 +44,21 @@ def receive_packets():
...
@@ -44,4 +44,21 @@ def receive_packets():
datatype
,
value
,
ns
,
nodeid
=
message
.
decode
()
.
split
(
"&"
)
datatype
,
value
,
ns
,
nodeid
=
message
.
decode
()
.
split
(
"&"
)
opua
.
write_opcua_value
(
sock
,
ns
,
nodeid
,
value
,
datatype
)
opua
.
write_opcua_value
(
sock
,
ns
,
nodeid
,
value
,
datatype
)
receive_packets
()
def
receive_main
(
_Endpoint_Url
,
_Namespace_Server
,
_udp_hash_receiver_ip
,
_udp_hash_receiver_port
,
_localIP
,
_localPort
):
\ No newline at end of file
global
Endpoint_Url
global
Namespace_Server
global
udp_hash_receiver_ip
global
udp_hash_receiver_port
global
localIP
global
localPort
Endpoint_Url
=
_Endpoint_Url
Namespace_Server
=
_Namespace_Server
udp_hash_receiver_ip
=
_udp_hash_receiver_ip
udp_hash_receiver_port
=
_udp_hash_receiver_port
localIP
=
_localIP
localPort
=
_localPort
receive_packets
()
if
__name__
==
"__main__"
:
receive_packets
()
\ No newline at end of file
opcua_subscriber/__pycache__/opcua_subscribe.cpython-39.pyc
View file @
024a7d4f
No preview for this file type
opcua
-
transmitter.py
→
opcua
_
transmitter.py
View file @
024a7d4f
...
@@ -77,6 +77,26 @@ async def main() -> None:
...
@@ -77,6 +77,26 @@ async def main() -> None:
except
:
except
:
print
(
"No connection found for opcua server, check if the server is active"
)
print
(
"No connection found for opcua server, check if the server is active"
)
if
__name__
==
'__main__'
:
def
transmitter_main
(
_server_hash_udp_ip
,
_server_hash_udp_port
,
_server_udp_ip
,
_server_udp_port
,
_ENDPOINT
,
_NAMESPACE
):
global
server_hash_udp_ip
global
server_hash_udp_port
global
server_udp_ip
global
server_hash_udp_port
global
server_udp_ip
global
server_udp_port
global
serverAddressPort
global
ENDPOINT
global
NAMESPACE
server_hash_udp_ip
=
_server_hash_udp_ip
server_hash_udp_port
=
_server_hash_udp_port
server_udp_ip
=
_server_udp_ip
server_udp_port
=
_server_udp_port
serverAddressPort
=
(
_server_udp_ip
,
_server_udp_port
)
ENDPOINT
=
_ENDPOINT
NAMESPACE
=
_NAMESPACE
print
(
server_hash_udp_ip
,
server_hash_udp_port
,
server_udp_ip
,
server_hash_udp_port
,
server_udp_ip
,
server_udp_port
,
serverAddressPort
,
ENDPOINT
,
NAMESPACE
)
asyncio
.
run
(
main
())
asyncio
.
run
(
main
())
if
__name__
==
"__main__"
:
print
(
server_hash_udp_ip
,
server_hash_udp_port
,
server_udp_ip
,
server_hash_udp_port
,
server_udp_ip
,
server_udp_port
,
serverAddressPort
,
ENDPOINT
,
NAMESPACE
)
asyncio
.
run
(
main
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment