Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
PSSF - Zigbee
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
michael.divia
PSSF - Zigbee
Commits
51aa41b0
Commit
51aa41b0
authored
6 months ago
by
michael.divia
Browse files
Options
Downloads
Patches
Plain Diff
Rendu Labo
parents
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
Coordinateur/main.py
+140
-0
140 additions, 0 deletions
Coordinateur/main.py
Router with ESP32/boot.py
+226
-0
226 additions, 0 deletions
Router with ESP32/boot.py
with
366 additions
and
0 deletions
Coordinateur/main.py
0 → 100644
+
140
−
0
View file @
51aa41b0
import
tkinter
as
tk
import
serial
import
threading
import
time
import
winsound
# For beep on Windows
# --- For Windows ---
PORT
=
"
COM6
"
BAUDRATE
=
115200
DOT_DURATION_MS
=
200
# Dot: 200 ms
DASH_DURATION_MS
=
600
# Dash: 600 ms (3x dot)
##############################################################################
# 1. Open the Serial Port and Send "B\r\n" to Pass Bootloader
##############################################################################
try
:
ser
=
serial
.
Serial
(
PORT
,
BAUDRATE
,
timeout
=
1
)
# Give the serial port some time before sending
time
.
sleep
(
1
)
# Send "B\r\n" to pass the bootloader, as required by your XBee setup
ser
.
write
(
b
"
B
\r\n
"
)
print
(
"
Sent
'
B
\\
r
\\
n
'
to XBee bootloader.
"
)
except
Exception
as
e
:
print
(
f
"
Error opening serial port
{
PORT
}
:
{
e
}
"
)
ser
=
None
##############################################################################
# 2. Background Thread to Continuously Read Incoming Data
##############################################################################
def
read_serial_forever
(
app
):
"""
Continuously reads from the XBee and forwards
'
.
'
or
'
-
'
to the GUI.
"""
if
not
ser
:
return
while
True
:
try
:
raw_line
=
ser
.
readline
()
line
=
raw_line
.
decode
(
"
utf-8
"
,
errors
=
"
ignore
"
).
strip
()
if
line
:
if
line
in
[
"
.
"
,
"
-
"
]:
app
.
handle_signal
(
line
)
else
:
print
(
"
Unknown/extra data:
"
,
line
)
except
Exception
as
e
:
print
(
"
Error reading serial:
"
,
e
)
break
##############################################################################
# 3. Tkinter GUI with a Circular 'LED'
##############################################################################
class
App
:
def
__init__
(
self
,
root
):
self
.
root
=
root
self
.
root
.
title
(
"
XBee Morse LED
"
)
# Create a canvas to draw a circle as the 'LED'
self
.
canvas
=
tk
.
Canvas
(
root
,
width
=
100
,
height
=
100
,
bg
=
"
white
"
,
highlightthickness
=
0
)
self
.
canvas
.
pack
(
padx
=
20
,
pady
=
20
)
# Draw a gray circle to represent the LED off
r
=
40
# radius of the circle
x0
=
50
-
r
y0
=
50
-
r
x1
=
50
+
r
y1
=
50
+
r
# Create an oval with outline + fill
self
.
led_circle
=
self
.
canvas
.
create_oval
(
x0
,
y0
,
x1
,
y1
,
fill
=
"
gray
"
,
outline
=
"
black
"
,
width
=
2
)
def
handle_signal
(
self
,
signal
):
"""
Called from the reader thread when a
'
.
'
or
'
-
'
arrives.
Schedules the LED flash + beep on the main thread.
"""
self
.
root
.
after
(
0
,
self
.
_flash_led
,
signal
)
def
_flash_led
(
self
,
signal
):
"""
Flash the LED circle with a color and play a beep.
'
.
'
= green (dot), 200 ms
'
-
'
= red (dash), 600 ms (3x dot)
After that time, revert to gray.
"""
if
signal
==
"
.
"
:
color
=
"
green
"
duration_ms
=
DOT_DURATION_MS
else
:
# '-'
color
=
"
red
"
duration_ms
=
DASH_DURATION_MS
# Update LED color
self
.
canvas
.
itemconfig
(
self
.
led_circle
,
fill
=
color
)
# Play a beep in a background thread so GUI won't freeze
beep_thread
=
threading
.
Thread
(
target
=
self
.
_play_beep
,
args
=
(
1000
,
duration_ms
),
# beep at 1000 Hz for the duration
)
beep_thread
.
start
()
# Schedule turning off the LED after duration_ms
self
.
root
.
after
(
duration_ms
,
self
.
_turn_off_led
)
def
_play_beep
(
self
,
freq
,
dur_ms
):
"""
Plays a beep using winsound.Beep(freq, duration).
This is a blocking call, so we run it in a thread.
"""
winsound
.
Beep
(
freq
,
dur_ms
)
def
_turn_off_led
(
self
):
"""
Turn LED back to gray/off.
"""
self
.
canvas
.
itemconfig
(
self
.
led_circle
,
fill
=
"
gray
"
)
##############################################################################
# 4. Main Function
##############################################################################
def
main
():
root
=
tk
.
Tk
()
app
=
App
(
root
)
if
ser
:
# Start a background thread that reads from the serial port
t
=
threading
.
Thread
(
target
=
read_serial_forever
,
args
=
(
app
,),
daemon
=
True
)
t
.
start
()
# Start the Tk event loop
root
.
mainloop
()
if
__name__
==
"
__main__
"
:
main
()
This diff is collapsed.
Click to expand it.
Router with ESP32/boot.py
0 → 100644
+
226
−
0
View file @
51aa41b0
import
network
import
socket
import
utime
import
time
from
machine
import
UART
##############################################################################
# I AM ROUTER 4
##############################################################################
print
(
"
I AM ROUTER 4
"
)
##############################################################################
# 1. Wi-Fi Configuration
##############################################################################
WIFI_SSID
=
"
Yes
"
WIFI_PASS
=
"
No
"
def
connect_wifi
():
sta_if
=
network
.
WLAN
(
network
.
STA_IF
)
sta_if
.
active
(
True
)
if
not
sta_if
.
isconnected
():
print
(
"
Connecting to Wi-Fi...
"
)
sta_if
.
connect
(
WIFI_SSID
,
WIFI_PASS
)
while
not
sta_if
.
isconnected
():
time
.
sleep
(
1
)
print
(
"
Connected to Wi-Fi:
"
,
sta_if
.
ifconfig
())
##############################################################################
# 2. UART Configuration (to XBee in Transparent Mode)
##############################################################################
# TX=17, RX=18, 115200 baud
xbee_uart
=
UART
(
1
,
baudrate
=
115200
,
tx
=
17
,
rx
=
18
)
xbee_uart
.
write
(
"
B
\r\n
"
)
def
xbee_send_at
(
cmd
):
"""
Sends an AT command in Transparent (AT) Mode:
1) Enter command mode with +++
2) Send the command + CR
3) (Optionally) wait/parse response
4) Exit command mode
This is a minimal approach for example purposes.
"""
# Enter command mode
xbee_uart
.
write
(
b
"
+++
"
)
time
.
sleep
(
1
)
# Wait for guard time
# Send the command
xbee_uart
.
write
(
cmd
+
b
"
\r
"
)
time
.
sleep
(
0.2
)
def
xbee_set_destination
(
addr64
):
"""
addr64 is a 16-hex-character string, e.g.
'
0013A20041EC4B57
'
.
We
'
ll split into DH (high 8 hex) and DL (low 8 hex).
"""
if
len
(
addr64
)
!=
16
:
print
(
"
Invalid address length:
"
,
addr64
)
return
dh
=
addr64
[
0
:
8
]
dl
=
addr64
[
8
:
16
]
print
(
"
Setting XBee destination => DH:
"
,
dh
,
"
, DL:
"
,
dl
)
xbee_send_at
(
b
"
ATDH
"
+
dh
.
encode
(
"
ascii
"
))
xbee_send_at
(
b
"
ATDL
"
+
dl
.
encode
(
"
ascii
"
))
xbee_send_at
(
b
"
ATWR
"
)
# Write settings
xbee_send_at
(
b
"
ATCN
"
)
# Exit command mode
##############################################################################
# 3. Data / State
##############################################################################
# Dictionary mapping device NAME to 64-bit ADDRESS
TARGETS
=
{
#"Router_4": "0013A2004229C660",
"
Router_13
"
:
"
0013A20041EC4B57
"
,
"
Coordinator
"
:
"
0013A200422BC20D
"
}
# We store the selected device NAME here (not the address).
selected_target_name
=
None
# For measuring press duration
morse_press_start
=
0
DOT_DASH_THRESHOLD
=
0.1
##############################################################################
# 4. Web Server
##############################################################################
def
build_html
():
# Build the <option> tags (unchanged)
options_html
=
""
for
name
,
addr
in
TARGETS
.
items
():
selected
=
"
selected
"
if
(
name
==
selected_target_name
)
else
""
options_html
+=
f
'
<option value=
"
{
name
}
"
{
selected
}
>
{
name
}
</option>
\n
'
# Show the NAME in “Destination actuelle:”
current_dest
=
selected_target_name
if
selected_target_name
else
"
(Aucune)
"
# Add JavaScript for Space bar
# - keydown: fetch('/bip?event=down')
# - keyup: fetch('/bip?event=up')
#
# We track 'bipPressed' to avoid multiple "down" events if user holds space.
script_js
=
"""
\
<script>
let bipPressed = false;
document.addEventListener(
'
keydown
'
, function(e) {
if (e.code ===
'
Space
'
) {
if (!bipPressed) {
bipPressed = true;
fetch(
'
/bip?event=down
'
);
}
e.preventDefault(); // Prevent scrolling
}
});
document.addEventListener(
'
keyup
'
, function(e) {
if (e.code ===
'
Space
'
) {
bipPressed = false;
fetch(
'
/bip?event=up
'
);
e.preventDefault(); // Prevent scrolling
}
});
</script>
"""
html
=
f
"""
\
HTTP/1.1 200 OK
\r
Content-Type: text/html
\r
\r
<!DOCTYPE html>
<html>
<head>
<meta charset=
"
utf-8
"
/>
<title>ESP32 XBee Control</title>
</head>
<body>
<h1>ESP32 XBee Control</h1>
<form action=
"
/
"
method=
"
GET
"
id=
"
targetForm
"
>
<label for=
"
targetSelect
"
>Sélectionnez la destination:</label>
<select name=
"
target
"
id=
"
targetSelect
"
onchange=
"
this.form.submit()
"
>
{
options_html
}
</select>
<p>Destination actuelle:
{
current_dest
}
</p>
</form>
<hr/>
<!-- “BIP” button with onmousedown/onmouseup logic -->
<button
style=
"
font-size: 1.5em; padding: 1em;
"
onmousedown=
"
fetch(
'
/bip?event=down
'
)
"
onmouseup=
"
fetch(
'
/bip?event=up
'
)
"
>
BIP
</button>
{
script_js
}
</body>
</html>
"""
return
html
def
start_server
():
global
selected_target_name
global
morse_press_start
s
=
socket
.
socket
()
s
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
s
.
bind
((
'
0.0.0.0
'
,
8080
))
s
.
listen
(
5
)
while
True
:
conn
,
cli
=
s
.
accept
()
request
=
conn
.
recv
(
1024
).
decode
()
# 1) If user changed the dropdown:
# GET /?target=Router_13 ...
if
"
GET /?target=
"
in
request
:
part
=
request
.
split
(
"
GET /?target=
"
)[
1
]
chosen_name
=
part
.
split
(
"
"
)[
0
]
# up to space or &
if
"
&
"
in
chosen_name
:
chosen_name
=
chosen_name
.
split
(
"
&
"
)[
0
]
# Validate
if
chosen_name
in
TARGETS
:
selected_target_name
=
chosen_name
addr64
=
TARGETS
[
chosen_name
]
print
(
"
User picked:
"
,
chosen_name
,
"
=> address:
"
,
addr64
)
xbee_set_destination
(
addr64
)
else
:
print
(
"
Unknown target name:
"
,
chosen_name
)
# 2) If the user pressed/released the “BIP” button
if
"
GET /bip?event=down
"
in
request
:
morse_press_start
=
utime
.
ticks_ms
()
print
(
"
BIP DOWN
"
)
if
"
GET /bip?event=up
"
in
request
:
press_duration
=
utime
.
ticks_diff
(
utime
.
ticks_ms
(),
morse_press_start
)
press_duration
=
press_duration
/
1000.0
morse_press_start
=
0
symbol
=
"
.
"
if
press_duration
<
DOT_DASH_THRESHOLD
else
"
-
"
print
(
f
"
BIP UP =>
{
symbol
}
, duration
{
press_duration
:
.
2
f
}
s
"
)
# Send that symbol via UART
xbee_uart
.
write
(
symbol
.
encode
(
"
ascii
"
)
+
b
"
\r\n
"
)
time
.
sleep
(
0.5
)
# Always serve the same page
response
=
build_html
()
conn
.
sendall
(
response
.
encode
())
conn
.
close
()
##############################################################################
# 5. Main Boot
##############################################################################
def
main
():
connect_wifi
()
start_server
()
main
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment