# Use this if you get the X_LINK_DEVICE_NOT_FOUND error.
# Environment setup (Windows):
#   python -m venv oakenv
#   .\oakenv\Scripts\activate
#   pip install --upgrade pip
#   pip install PySimpleGUI
#   pip install depthai
#   python -m pip install --upgrade depthai
#   pip install pillow
#!/usr/bin/env python3
USE_OPENCV = False
from datetime import timedelta
import depthai as dai
import tempfile
import PySimpleGUI as sg
import sys
from typing import Dict
import platform
import os
import numpy
# Image backend selection: with USE_OPENCV disabled the Pillow/io pair is
# imported; the OpenCV import itself is currently commented out.
if USE_OPENCV:
    # import cv2
    pass
else:
    import io
    from PIL import Image
# Directory containing this script; the icon assets are looked up relative to it.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Windows gets the .ico asset, every other platform gets the .png one.
PLATFORM_ICON_PATH = (
    f'{SCRIPT_DIR}/assets/icon.ico'
    if platform.system() == 'Windows'
    else f'{SCRIPT_DIR}/assets/icon.png'
)

# Apply icon globally
sg.set_global_icon(PLATFORM_ICON_PATH)

# Element keys of the configuration tab, grouped by connection type.
# *_TEXT_* are the label elements, *_INPUT_* the editable elements.
CONF_TEXT_POE = [
    'ipTypeText', 'ipText', 'maskText', 'gatewayText',
    'dnsText', 'dnsAltText', 'networkTimeoutText', 'macText',
]
CONF_INPUT_POE = [
    'staticBut', 'dynamicBut', 'ip', 'mask', 'gateway',
    'dns', 'dnsAlt', 'networkTimeout', 'mac',
]
CONF_TEXT_USB = ['usbTimeoutText', 'usbSpeedText']
CONF_INPUT_USB = ['usbTimeout', 'usbSpeed']

# Choices offered in the "USB max speed" combo box.
USB_SPEEDS = ["UNKNOWN", "LOW", "FULL", "HIGH", "SUPER", "SUPER_PLUS"]
def PrintException():
    """Print file, line and message of the exception currently being handled.

    Meant to be called from inside an ``except`` block. The reported location
    comes from the first traceback entry (``tb.tb_frame`` / ``tb.tb_lineno``).
    When no exception is being handled, nothing is printed (previously this
    failed with ``AttributeError`` because the traceback is ``None``).
    """
    _, exc_obj, tb = sys.exc_info()
    if tb is None:
        # Nothing is being handled right now; there is no location to report.
        return
    filename = tb.tb_frame.f_code.co_filename
    print('Exception in {}, line {}; {}'.format(filename, tb.tb_lineno, exc_obj))
def Popup(msg, window):
    """Show ``msg`` in an ``sg.Popup`` placed near the middle of ``window``.

    The popup location is the window's current location offset by half of
    (window size - 50) on each axis. Returns the value of ``sg.Popup``.
    """
    anchor = window.CurrentLocation()
    extent = window.size
    # Calculate the centered location for the new window
    x = anchor[0] + (extent[0] - 50) // 2
    y = anchor[1] + (extent[1] - 50) // 2
    return sg.Popup(msg, location=(x, y))
def check_ip(window, s: str, req = True):
    """Validate a dotted-quad IPv4 string, showing a popup on bad input.

    Args:
        window: Window used to position the error popup.
        s: Text to validate.
        req: Whether a value is required. An empty string is valid only
            when ``req`` is false; no popup is shown for an empty string.

    Returns:
        True when ``s`` is acceptable, False otherwise.
    """
    if s == "":
        return not req
    octets = s.split(".")
    # Each octet must be a non-empty run of ASCII digits. This rejects text
    # such as "a.b.c.d", "1..2.3" or "-1.0.0.0" up front instead of letting
    # int() raise ValueError or letting a negative number slip through.
    well_formed = len(octets) == 4 and all(
        octet != "" and all(ch in "0123456789" for ch in octet)
        for octet in octets
    )
    if not well_formed:
        Popup("Wrong IP format.\nValue should be similar to 255.255.255.255", window=window)
        return False
    if any(int(octet) > 255 for octet in octets):
        Popup("Wrong IP format.\nValues can not be above 255!", window=window)
        return False
    return True
def check_mac(window, s):
    """Validate a MAC address of the form ``FF:FF:FF:FF:FF:FF``.

    Exactly six colon-separated groups are required, each made of two
    characters from ``0-9`` / ``A-F`` (upper case only, as before). Empty
    groups such as in ``"AA:BB:CC:DD:EE:"`` are now rejected; previously they
    passed because the per-character loop never ran for them.

    Args:
        window: Window used to position the error popup.
        s: Text to validate.

    Returns:
        True when ``s`` is well formed, False (after a popup) otherwise.
    """
    groups = s.split(":")
    valid = len(groups) == 6 and all(
        len(group) == 2 and all(ch in "0123456789ABCDEF" for ch in group)
        for group in groups
    )
    if not valid:
        Popup("Wrong MAC format.\nValue should be similar to FF:FF:FF:FF:FF:FF", window=window)
        return False
    return True
class Progress:
    """Modal window made of a status text and a horizontal progress bar."""

    def __init__(self, txt = 'Flashing progress: 0.0%'):
        """Create and finalize the window; ``txt`` is the initial status text."""
        status_row = [sg.Text(txt, key='txt')]
        bar_row = [sg.ProgressBar(1.0, orientation='h', size=(20,20), key='progress')]
        self.window = sg.Window("Progress", [status_row, bar_row], modal=True, finalize=True)

    def update(self, val):
        """Set the bar to ``val`` and show it as a percentage in the status text."""
        win = self.window
        win['progress'].update(val)
        win['txt'].update(f'Flashing progress: {val*100:.1f}%')

    def finish(self, msg):
        """Close the progress window, then show ``msg`` in a popup."""
        self.window.close()
        sg.Popup(msg)
class SelectBootloader:
    """Modal dialog asking which bootloader type should be used."""

    def __init__(self, options, text):
        """Build the dialog.

        ``options`` are the combo entries (the first one is preselected) and
        ``text`` is the explanatory line shown at the top.
        """
        self.ok = False
        type_row = [
            sg.Text("Bootloader type:", font=('Arial', 10, 'bold'), text_color="black"),
            sg.Combo(options, options[0], size=(30, 3), key="bootType"),
        ]
        warning_row = [sg.Text("Warning! This may soft-brick your device,\nproceed if you are sure what you are doing!")]
        layout = [
            [sg.Text(text)],
            type_row,
            warning_row,
            [sg.Submit(), sg.Cancel()],
        ]
        self.window = sg.Window("Select bootloader", layout, size=(300,150), modal=True, finalize=True)

    def wait(self):
        """Block until the dialog is answered.

        Returns a ``(submitted, bootloader_type)`` tuple; ``(False, None)``
        when the dialog returned no values.
        """
        event, values = self.window.Read()
        self.window.close()
        if values is None:
            return (False, None)
        # Translate the chosen combo text into the matching enum member.
        boot_type = getattr(dai.DeviceBootloader.Type, values['bootType'])
        return (str(event) == "Submit", boot_type)
class AreYouSure:
    """Modal Submit/Cancel confirmation dialog."""

    def __init__(self, text):
        """Build the dialog showing ``text`` above the two buttons."""
        self.ok = False
        rows = [
            [sg.Text(text)],
            [sg.Submit(), sg.Cancel()],
        ]
        self.window = sg.Window("Are You Sure?", rows, size=(450,150), modal=True, finalize=True)

    def wait(self):
        """Block until answered; return True only when "Submit" was pressed."""
        clicked, _ = self.window.Read()
        self.window.close()
        confirmed = str(clicked) == "Submit"
        return confirmed
class SelectIP:
    """Modal dialog asking for the IPv4 address of a PoE camera."""

    def __init__(self, window):
        """Build the dialog and center it over ``window`` (the parent window)."""
        self.ok = False
        layout = [
            [sg.Text("Specify the custom IP of the OAK PoE\ncamera you want to connect to")],
            [
                sg.Text("IPv4:", font=('Arial', 10, 'bold'), text_color="black"),
                sg.InputText(key="ip", size=(16, 2)),
            ],
            [sg.Submit(), sg.Cancel()],
        ]
        self.window = sg.Window("Specify IP", layout, size=(300,110), modal=True, finalize=True)
        main_window_location = window.CurrentLocation()
        main_window_size = window.size
        new_window_size = self.window.size
        # Calculate the centered location for the new window
        centered_location = (main_window_location[0] + (main_window_size[0] - new_window_size[0]) // 2,
                             main_window_location[1] + (main_window_size[1] - new_window_size[1]) // 2)
        self.window.move(*centered_location)

    def wait(self):
        """Block until the dialog is answered.

        Returns:
            ``(True, ip)`` when a valid address was submitted, otherwise
            ``(False, "")``.
        """
        event, values = self.window.Read()
        cancelled = str(event) == "Cancel" or values is None
        # Validate while the dialog still exists: check_ip positions its error
        # popup from this window's location and size, which cannot be read
        # once the window has been closed.
        accepted = (not cancelled) and check_ip(self.window, values["ip"])
        self.window.close()
        if not accepted:
            return False, ""
        return True, values["ip"]
class SearchDevice:
    """Modal dialog listing the connected devices and letting the user pick one."""

    def __init__(self, window):
        """Build the dialog, center it over ``window`` and run a first search."""
        # Device infos in exactly the order of the table rows.
        self.infos = []
        layout = [
            [sg.Text("Select an OAK camera you would like to connect to.", font=('Arial', 10, 'bold'))],
            [sg.Table(values=[], headings=["DeviceId", "Name", "State"],
                # col_widths=[25, 17, 17],
                # def_col_width=25,
                # max_col_width=200,
                # background_color='light blue',
                display_row_numbers=True,
                justification='right',
                num_rows=8,
                alternating_row_color='lightyellow',
                key='table',
                row_height=35,
                # size=(500, 300)
                expand_x = True,
                enable_events = True,
                enable_click_events = True,
                )
            ],
            [sg.Button('Search', size=(15, 2), font=('Arial', 10, 'bold'))],
        ]
        self.window = sg.Window("Select Device", layout, size=(550,375), modal=True, finalize=True)
        main_window_location = window.CurrentLocation()
        main_window_size = window.size
        new_window_size = self.window.size
        # Calculate the centered location for the new window
        centered_location = (main_window_location[0] + (main_window_size[0] - new_window_size[0]) // 2,
                             main_window_location[1] + (main_window_size[1] - new_window_size[1]) // 2)
        self.window.move(*centered_location)
        self.search_devices()

    def search_devices(self):
        """Query the connected devices and refresh the table.

        Devices whose state name contains "GATE" (RVC4 devices) are skipped.
        ``self.infos`` holds only the listed devices so that a clicked row
        index maps to the right entry, and the table is refreshed even when
        nothing is found so that no stale rows stay clickable.
        """
        found = dai.XLinkConnection.getAllConnectedDevices()
        self.infos = [info for info in found if "GATE" not in info.state.name]
        rows = [
            [info.getDeviceId(), info.name, deviceStateTxt(info.state)]
            for info in self.infos
        ]
        self.window['table'].update(values=rows)

    def wait(self) -> dai.DeviceInfo:
        """Run the dialog's event loop.

        Returns:
            The selected device info, or None when the window was closed.
        """
        while True:
            event, values = self.window.Read()
            if event is None:
                self.window.close()
                return None
            if str(event) == 'Search':
                self.search_devices()
                continue
            is_table_click = (
                isinstance(event, tuple)
                and len(event) == 3
                and event[0] == "table"
                and event[1] == "+CLICKED+"
            )
            if not is_table_click:
                continue
            # User selected a device
            deviceIndex = event[2][0]
            # Ignore clicks that do not land on a data row (None, or an index
            # outside the list such as a negative header row); a negative
            # index would otherwise silently pick a device from the end.
            if not isinstance(deviceIndex, int) or not 0 <= deviceIndex < len(self.infos):
                continue
            deviceSelected = self.infos[deviceIndex]
            self.window.close()
            return deviceSelected
def flashBootloader(bl: dai.DeviceBootloader, device: dai.DeviceInfo, type: dai.DeviceBootloader.Type):
    """Update the bootloader after asking the user for confirmation.

    When ``bl`` reports user-bootloader support, the user bootloader is
    flashed through ``bl``. Otherwise ``bl`` is closed, a new bootloader
    connection with flashing allowed is opened to ``device`` and the factory
    bootloader is flashed (``type`` AUTO resolves to the connected type).

    Args:
        bl: Open bootloader connection.
        device: Device info used to reconnect for factory flashing.
        type: Bootloader type to flash in the factory path.

    Returns:
        False when the user declined the confirmation, True otherwise (also
        when flashing raised; the error is then shown in a popup).
    """
    userBlWarningMessage = """Updating bootloader can soft-brick a device.
Proceed with caution"""
    factoryBlWarningMessage = """Factory Bootloader type or version doesn't support User Bootloader flashing.
Factory bootloader will be updated instead.
Proceed with caution
"""
    pr = None  # progress window that is currently open, if any
    try:
        if bl.isUserBootloaderSupported():
            if not AreYouSure(text=userBlWarningMessage).wait():
                return False
            pr = Progress('Flashing...')
            bl.flashUserBootloader(pr.update)
            pr.finish("Flashed newest User Bootloader version.")
            pr = None
        elif AreYouSure(text=factoryBlWarningMessage).wait():
            bl.close()
            pr = Progress('Connecting...')
            bl = dai.DeviceBootloader(device, True)
            if type == dai.DeviceBootloader.Type.AUTO:
                type = bl.getType()
            bl.flashBootloader(memory=dai.DeviceBootloader.Memory.FLASH, type=type, progressCallback=pr.update)
            pr.finish("Flashed newest Factory Bootloader version.")
            pr = None
        else:
            return False
    except Exception as ex:
        PrintException()
        # Do not leave the modal progress window behind when flashing failed.
        if pr is not None:
            pr.window.close()
        sg.Popup(f'{ex}')
    return True
# Danger
def flashFactoryBootloader(device: dai.DeviceInfo, type: dai.DeviceBootloader.Type):
    """Flash the factory bootloader of ``device``.

    Opens a bootloader connection with flashing allowed, resolves ``type``
    AUTO to the connected bootloader's type and flashes to FLASH memory.
    Errors are printed and shown in a popup; nothing is returned.
    """
    pr = None  # progress window that is currently open, if any
    try:
        pr = Progress('Connecting...')
        bl = dai.DeviceBootloader(device, True)
        if type == dai.DeviceBootloader.Type.AUTO:
            type = bl.getType()
        bl.flashBootloader(memory=dai.DeviceBootloader.Memory.FLASH, type=type, progressCallback=pr.update)
        pr.finish("Flashed newest bootloader version.")
        pr = None
    except Exception as ex:
        PrintException()
        # Do not leave the modal progress window behind when flashing failed.
        if pr is not None:
            pr.window.close()
        sg.Popup(f'{ex}')
def factoryReset(device: dai.DeviceInfo, type: dai.DeviceBootloader.Type):
    """Factory-reset ``device`` by flashing a padded bootloader image.

    The embedded bootloader binary for ``type`` (AUTO resolves to the
    connected type) is padded with 0xFF up to 1 MiB + 512 bytes for USB or
    8 MiB + 512 bytes otherwise, written to a temporary file and flashed
    from there. If the binary cannot be obtained in this depthai build, the
    plain bootloader flash is used instead. The outcome is shown in a popup.
    """
    pr = None  # progress window that is currently open, if any
    try:
        pr = Progress('Preparing and connecting...')
        bl = dai.DeviceBootloader(device, True)
        progress = pr.update
        if type == dai.DeviceBootloader.Type.AUTO:
            type = bl.getType()
        tmpPath = None
        try:
            blBinary = dai.DeviceBootloader.getEmbeddedBootloaderBinary(type)
            # Clear 1 MiB for USB BL and 8 MiB for NETWORK BL
            mib = 1 if type == dai.DeviceBootloader.Type.USB else 8
            blBinary = blBinary + ([0xFF] * ((mib * 1024 * 1024 + 512) - len(blBinary)))
            with tempfile.NamedTemporaryFile(delete=False) as tmpBlFw:
                tmpPath = tmpBlFw.name
                tmpBlFw.write(bytes(blBinary))
            # The file is closed (and therefore flushed) before flashing so it
            # can be opened by name regardless of platform.
            success, msg = bl.flashBootloader(progress, tmpPath)
        except Exception as ex:
            # In some depthai Python builds vector<uint8_t> return conversion is unavailable.
            if "Unable to convert function return value to a Python type" in str(ex):
                success, msg = bl.flashBootloader(
                    memory=dai.DeviceBootloader.Memory.FLASH,
                    type=type,
                    progressCallback=progress
                )
            else:
                raise
        finally:
            # delete=False means nobody else removes the image; clean it up
            # here so repeated resets do not pile up multi-megabyte files.
            if tmpPath is not None:
                try:
                    os.unlink(tmpPath)
                except OSError:
                    # Best effort only: a leftover temp file must not turn a
                    # finished flash into a reported failure.
                    pass
        msg = "Factory reset was successful." if success else f"Factory reset failed. Error: {msg}"
        pr.finish(msg)
        pr = None
    except Exception as ex:
        PrintException()
        # Do not leave the modal progress window behind when the reset failed.
        if pr is not None:
            pr.window.close()
        sg.Popup(f'{ex}')
def flashFromFile(file, bl: dai.DeviceBootloader):
    """Flash a DepthAI application package (``.dap``) through ``bl``.

    Args:
        file: Path of the package; a falsy value (e.g. None from a cancelled
            file dialog) makes the function return without doing anything.
        bl: Open bootloader connection.

    The outcome is reported in a popup; nothing is returned.
    """
    if not file:
        # Nothing was selected: not an error, so no "not .dap" popup either.
        return
    # Require the real ".dap" extension (case-insensitive) rather than any
    # name that merely ends in the letters "dap".
    if not str(file).lower().endswith(".dap"):
        sg.Popup("Selected file is not .dap!")
        return
    pr = None  # progress window that is currently open, if any
    try:
        pr = Progress('Flashing application...')
        with open(file, mode = 'rb') as f:
            dap = list(f.read())
        success, msg = bl.flashDepthaiApplicationPackage(pr.update, dap)
        msg = "Flashing application was successful." if success else f"Flashing application failed. Error: {msg}"
        pr.finish(msg)
        pr = None
    except Exception as ex:
        PrintException()
        # Do not leave the modal progress window behind when flashing failed.
        if pr is not None:
            pr.window.close()
        sg.Popup(f'{ex}')
def recoveryMode(bl: dai.DeviceBootloader):
    """Ask ``bl`` to boot the device into the USB ROM bootloader.

    Returns True when the call succeeded. On an exception the error is
    printed and shown in a popup, and False is returned.
    """
    try:
        bl.bootUsbRomBootloader()
    except Exception as err:
        PrintException()
        sg.Popup(f'{err}')
        return False
    else:
        return True
def connectToDevice(device: dai.DeviceInfo) -> dai.DeviceBootloader:
    """Open a bootloader connection to ``device`` (second argument False).

    Returns the connection, or None when connecting raised; in that case the
    error is printed and shown in a popup.
    """
    connection = None
    try:
        connection = dai.DeviceBootloader(device, False)
    except Exception as err:
        PrintException()
        sg.Popup(f'{err}')
    return connection
def deviceStateTxt(state: dai.XLinkDeviceState) -> str:
    """Return ``str(state)`` with every "XLinkDeviceState.X_LINK_" removed."""
    full_name = str(state)
    # Splitting on the prefix and re-joining drops each occurrence of it.
    return "".join(full_name.split("XLinkDeviceState.X_LINK_"))
# Start defining layout
sg.theme('LightGrey2')
# Layout for the "About device" tab.
# The four navigation buttons use the same keys in every tab layout; the event
# loop matches them by key prefix. The button of the tab being shown is the
# disabled one.
aboutDeviceLayout = [
    [sg.Text("About device", size=(30, 1), font=('Arial', 30, 'bold'), text_color="black")],
    [sg.HSeparator()],
    # Navigation row
    [
        sg.Button("About device", size=(15, 1), font=('Arial', 10, 'bold'), disabled=True, key="_unique_aboutBtn"),
        sg.Button("Config", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_configBtn"),
        sg.Button("Application", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_appBtn"),
        sg.Button("Danger Zone", size=(15, 1), font=('Arial', 10, 'bold'), button_color='#FF0500', disabled=False, key="_unique_dangerBtn"),
    ],
    [sg.HSeparator()],
    # Device selection row
    [
        sg.Text("Select device: ", size=(15, 1), font=('Arial', 10, 'bold'), text_color="black"),
        sg.Combo([], "Select device", size=(30, 5), key="devices", enable_events=True),
        sg.Button("Search", font=('Arial', 10, 'bold')),
        sg.Button("Specify IP")
    ],
    # The remaining rows come in pairs: a row of bold captions followed by a
    # row of value texts (placeholders until a device is connected).
    [
        sg.Text("Name of connected device:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black"),
        sg.VSeparator(),
        sg.Text("Device state:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black")
    ],
    [sg.Text("-name-", key="devName", size=(30, 1)), sg.VSeparator(), sg.Text("-state-", key="devState", size=(30, 1))],
    [
        sg.Text("Bootloader type:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black"),
        sg.VSeparator(),
        sg.Text("IP Assignment:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black")
    ],
    [
        sg.Text("-bootloader type-", key="bootloaderType", size=(30, 1)),
        sg.VSeparator(),
        sg.Text("-assignment-", key="ipassignment", size=(30, 1))
    ],
    [
        sg.Text("Version of newest bootloader:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black"),
        sg.VSeparator(),
        sg.Text("Flashed bootloader version:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black")
    ],
    [
        sg.Text("-version-", key="newBoot", size=(30, 1)),
        sg.VSeparator(),
        sg.Text("-version-", key="currBoot", size=(30, 1))
    ],
    [
        sg.Text("Current Depthai version:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black"),
        sg.VSeparator(),
        sg.Text("Current commit:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="black"),
    ],
    [
        sg.Text("-version-", key="version", size=(30, 1)),
        sg.VSeparator(),
        sg.Text("-version-", key="commit", size=(31, 1))
    ]
]
# Layout for the "Config" tab.
# Every input starts disabled and every caption starts gray; the element keys
# used here are the ones listed in CONF_TEXT_* / CONF_INPUT_*.
deviceConfigLayout = [
    [sg.Text("Configuration settings", size=(20, 1), font=('Arial', 30, 'bold'), text_color="black")],
    [sg.HSeparator()],
    # Navigation row (the "Config" button is the disabled one on this tab)
    [
        sg.Button("About device", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_aboutBtn"),
        sg.Button("Config", size=(15, 1), font=('Arial', 10, 'bold'), disabled=True, key="_unique_configBtn"),
        sg.Button("Application", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_appBtn"),
        sg.Button("Danger Zone", size=(15, 1), font=('Arial', 10, 'bold'), button_color='#FF0500', disabled=False, key="_unique_dangerBtn"),
    ],
    [sg.HSeparator()],
    # Network (PoE) settings
    [
        sg.Text("IPv4 type:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="ipTypeText"),
        sg.Radio('Static', "ipType", default=True, font=('Arial', 10, 'bold'), text_color="black",
                 key="staticBut", enable_events=True, disabled=True),
        sg.Radio('Dynamic', "ipType", default=False, font=('Arial', 10, 'bold'), text_color="black",
                 key="dynamicBut", enable_events=True, disabled=True)
    ],
    [
        sg.Text("IPv4:", size=(12, 1), font=('Arial', 10, 'bold'), text_color="gray", key="ipText"),
        sg.InputText(key="ip", size=(16, 2), disabled=True),
        sg.Text("Mask:", size=(5, 1), font=('Arial', 10, 'bold'), text_color="gray", key="maskText"),
        sg.InputText(key="mask", size=(16, 2), disabled=True),
        sg.Text("Gateway:", size=(8, 1), font=('Arial', 10, 'bold'), text_color="gray", key="gatewayText"),
        sg.InputText(key="gateway", size=(16, 2), disabled=True)
    ],
    [
        sg.Text("DNS name:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="dnsText"),
        sg.InputText(key="dns", size=(30, 2), disabled=True)
    ],
    [
        sg.Text("Alt DNS name:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="dnsAltText"),
        sg.InputText(key="dnsAlt", size=(30, 2), disabled=True)
    ],
    [
        sg.Text("Network timeout [ms]:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="networkTimeoutText"),
        sg.InputText(key="networkTimeout", size=(30, 2), disabled=True)
    ],
    [
        sg.Text("MAC address:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="macText"),
        sg.InputText(key="mac", size=(30, 2), disabled=True)
    ],
    [sg.HSeparator()],
    # USB settings
    [
        sg.Text("USB timeout [ms]:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="usbTimeoutText"),
        sg.InputText(key="usbTimeout", size=(30, 2), disabled=True)
    ],
    [
        sg.Text("USB max speed:", size=(30, 1), font=('Arial', 10, 'bold'), text_color="gray", key="usbSpeedText"),
        sg.Combo(USB_SPEEDS, "Select speed", key="usbSpeed", size=(30, 6), disabled=True)
    ],
    [sg.HSeparator()],
    # Action buttons; they have no explicit key, so the button text is the event.
    [
        sg.Text("", size=(8, 2)),
        sg.Button("Flash configuration", size=(15, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
        sg.Button("Clear configuration", size=(15, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
        sg.Button("View configuration", size=(15, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
    ],
]
# Layout for the "Application" tab.
appLayout = [
    [sg.Text("Application settings", size=(20, 1), font=('Arial', 30, 'bold'), text_color="black")],
    [sg.HSeparator()],
    # Navigation row (the "Application" button is the disabled one on this tab)
    [
        sg.Button("About device", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_aboutBtn"),
        sg.Button("Config", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_configBtn"),
        sg.Button("Application", size=(15, 1), font=('Arial', 10, 'bold'), disabled=True, key="_unique_appBtn"),
        sg.Button("Danger Zone", size=(15, 1), font=('Arial', 10, 'bold'), button_color='#FF0500', disabled=False, key="_unique_dangerBtn"),
    ],
    [sg.HSeparator()],
    # Action buttons; both start disabled.
    [
        sg.Button("Flash application", size=(15, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
        sg.Button("Remove application", size=(15, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
    ],
]
# Layout for the "Danger Zone" tab.
dangerLayout = [
    [sg.Text("Danger Zone", size=(20, 1), font=('Arial', 30, 'bold'), text_color="black")],
    [sg.HSeparator()],
    # Navigation row (the "Danger Zone" button is the disabled one on this tab)
    [
        sg.Button("About device", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_aboutBtn"),
        sg.Button("Config", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_configBtn"),
        sg.Button("Application", size=(15, 1), font=('Arial', 10, 'bold'), disabled=False, key="_unique_appBtn"),
        sg.Button("Danger Zone", size=(15, 1), font=('Arial', 10, 'bold'), button_color='#FF0500', disabled=True, key="_unique_dangerBtn"),
    ],
    [sg.HSeparator()],
    # Bootloader actions; all buttons on this tab start disabled.
    [
        sg.Button("Update Bootloader", size=(20, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500'),
        sg.Button("Flash Factory Bootloader", size=(20, 2), font=('Arial', 10, 'bold'), disabled=True,
                  button_color='#FFA500', key='flashFactoryBootloader'),
    ],
    [sg.HSeparator()],
    # Reset / recovery actions
    [
        sg.Button("Factory reset", size=(17, 2), font=('Arial', 10, 'bold'), disabled=True, button_color='#FFA500'),
        sg.Button("Boot into USB\nRecovery mode", size=(20, 2), font=('Arial', 10, 'bold'), disabled=True,
                  key='recoveryMode', button_color='#FFA500')
    ]
]
# Layout of the whole GUI: the four tab layouts sit side by side as columns and
# only the "About device" column is visible initially. Tabs are switched by
# toggling the visibility of the '-COL1-' .. '-COL4-' columns.
layout = [
    [
        sg.Column(aboutDeviceLayout, key='-COL1-'),
        sg.Column(deviceConfigLayout, visible=False, key='-COL2-'),
        sg.Column(appLayout, visible=False, key='-COL3-'),
        sg.Column(dangerLayout, visible=False, key='-COL4-'),
    ]
]
class DeviceManager:
    """Main window of the device manager and its event loop.

    Keeps the known devices, the current bootloader connection (``self.bl``)
    and the most recent window values (``self.values``), and translates GUI
    events into bootloader operations.
    """

    # Maps the text shown in the 'devices' combo (device id or IP) to its info.
    devices: Dict[str, dai.DeviceInfo] = dict()
    # Values returned by the most recent window read.
    values: Dict = None

    # Keys of the tab columns, in layout order.
    _PAGES = ('-COL1-', '-COL2-', '-COL3-', '-COL4-')
    # Buttons that are only usable while a device is connected.
    _DEVICE_BUTTONS = (
        'Update Bootloader',
        'flashFactoryBootloader',
        'Flash configuration',
        'Clear configuration',
        'View configuration',
        'Factory reset',
        'Flash application',
        'Remove application',
        'recoveryMode',
    )

    def __init__(self) -> None:
        """Create the main window and run the first device search."""
        # Give each instance its own mapping instead of sharing the class-level one.
        self.devices = {}
        self.window = sg.Window(title="Device Manager",
            icon=PLATFORM_ICON_PATH,
            layout=layout,
            size=(645, 380),
            finalize=True # So we can do First search for devices
        )
        self.bl: dai.DeviceBootloader = None
        # First device search
        self.getDevices()

    def isPoE(self) -> bool:
        """Return True when the connected bootloader is of type NETWORK.

        If the type cannot be read, the error is shown and False is returned.
        """
        try:
            return self.bl.getType() == dai.DeviceBootloader.Type.NETWORK
        except Exception as ex:
            PrintException()
            Popup(f'{ex}', self.window)
            return False

    def isUsb(self) -> bool:
        """Return the negation of :meth:`isPoE`."""
        return not self.isPoE()

    def _showPage(self, active) -> None:
        """Make the tab column with key ``active`` the only visible one."""
        for page in self._PAGES:
            self.window[page].update(visible=(page == active))

    def _connect(self, deviceInfo) -> None:
        """Connect to ``deviceInfo`` and, on success, load and unlock the GUI."""
        # Drop any previous connection first so it is not left open.
        self.closeDevice()
        self.bl = connectToDevice(deviceInfo)
        if self.bl is None:
            return
        self.getConfigs()
        self.unlockConfig()

    def _afterDeviceReboot(self) -> None:
        """Device will reboot: close the connection, reset the GUI, search again."""
        self.closeDevice()
        self.resetGui()
        self.getDevices()

    def _afterConfigChange(self) -> None:
        """Refresh the GUI after the flashed configuration was changed."""
        # Reset first and read the configuration afterwards; the other way
        # round the reset wipes the values that were just read.
        self.resetGui()
        if self.isUsb():
            self.getConfigs()
            self.unlockConfig()
        else:
            self.devices.clear()
            self.window.Element('devices').update("Search for devices", values=[])

    def run(self) -> None:
        """Process window events until the main window is closed."""
        while True:
            event, self.values = self.window.read()
            if event == sg.WIN_CLOSED:
                break
            if event == "devices":
                # "allow flashing bootloader" boots latest bootloader first
                # which makes the information of current bootloader, etc.. not correct (can be checked by "isEmbeddedVersion")
                # So the connection is opened without it and the latest bootloader is only booted upon the request to flash a new bootloader
                device = self.devices.get(self.values['devices'])
                if device is None:
                    # Placeholder text or an unknown entry. (This used to
                    # update a 'progress' element that this window lacks.)
                    Popup("No device selected.", self.window)
                elif deviceStateTxt(device.state) == "BOOTED":
                    # device is already booted somewhere else
                    Popup("Device is already booted somewhere else!", self.window)
                else:
                    self.resetGui()
                    self._connect(device)
            elif event == "Search":
                self.getDevices() # Re-search devices for dropdown
                # getDevices() dropped the connection, so lock the GUI again;
                # otherwise cancelling the search dialog leaves device buttons
                # enabled without any connection behind them.
                self.resetGui()
                selDev = SearchDevice(window=self.window)
                di = selDev.wait()
                if di is not None:
                    self.resetGui()
                    self.addDeviceInfo(di) # Add device info to devices, if it's not there yet
                    self.window.Element('devices').update(di.getDeviceId())
                    # Re-read so self.values reflects the combo text just set
                    _, self.values = self.window.read(1)
                    self._connect(di)
                    print('after connecting to device,', self.bl)
            elif event == "Specify IP":
                select = SelectIP(window=self.window)
                ok, ip = select.wait()
                if ok:
                    self.resetGui()
                    di = dai.DeviceInfo(ip)
                    di.state = dai.XLinkDeviceState.X_LINK_BOOTLOADER
                    di.protocol = dai.XLinkProtocol.X_LINK_TCP_IP
                    self.devices[ip] = di # Add to devices dict
                    self.window.Element('devices').update(ip) # Show to user
                    # Re-read so self.values reflects the combo text just set
                    _, self.values = self.window.read(1)
                    self._connect(di)
            # Danger
            elif event == "Update Bootloader":
                # Use current type
                if flashBootloader(self.bl, self.device, self.bl.getType()):
                    self._afterDeviceReboot()
                else:
                    print("Flashing bootloader canceled.")
            elif event == "flashFactoryBootloader":
                sel = SelectBootloader(['AUTO', 'USB', 'NETWORK'], "Select bootloader type to flash.")
                ok, blType = sel.wait()
                if ok:
                    # We will reconnect, as we need to set allowFlashingBootloader to True
                    self.closeDevice()
                    flashFactoryBootloader(self.device, blType)
                    self._afterDeviceReboot()
                else:
                    print("Flashing Factory bootloader cancelled.")
            elif event == "Factory reset":
                sel = SelectBootloader(['NETWORK', 'USB'], "Select bootloader type used for factory reset.")
                ok, blType = sel.wait()
                if ok:
                    # We will reconnect, as we need to set allowFlashingBootloader to True
                    self.closeDevice()
                    factoryReset(self.device, blType)
                    self._afterDeviceReboot()
                else:
                    print("Factory reset cancelled.")
            elif event == "Flash configuration":
                # None means the attempt raised and was already reported.
                if self.flashConfig() is not None:
                    self._afterConfigChange()
            elif event == "Clear configuration":
                self.clearConfig()
                self._afterConfigChange()
            elif event == "View configuration":
                try:
                    confJson = self.bl.readConfigData()
                    sg.popup_scrolled(confJson, title='Configuration')
                except Exception as ex:
                    Popup(f'No existing config to view ({ex})', self.window)
            elif event == "Flash application":
                file = sg.popup_get_file("Select .dap file", file_types=(('DepthAI Application Package', '*.dap'), ('All Files', '*.* *')))
                # A cancelled dialog yields no path; there is nothing to flash then.
                if file:
                    flashFromFile(file, self.bl)
            elif event == "Remove application":
                try:
                    # Report the returned status instead of assuming success.
                    success, error = self.bl.flashClear()
                    if success:
                        Popup(f'Successfully removed application', self.window)
                    else:
                        Popup(f"Couldn't remove application ({error})", self.window)
                except Exception as ex:
                    # Use the positioned Popup helper; sg.popup would print
                    # the window object as a second message line.
                    Popup(f"Couldn't remove application ({ex})", self.window)
            elif event.startswith("_unique_configBtn"):
                self._showPage('-COL2-')
            elif event.startswith("_unique_aboutBtn"):
                self._showPage('-COL1-')
            elif event.startswith("_unique_appBtn"):
                self._showPage('-COL3-')
            elif event.startswith("_unique_dangerBtn"):
                self._showPage('-COL4-')
            elif event == "recoveryMode":
                if recoveryMode(self.bl):
                    Popup(f'Device successfully put into USB recovery mode.', self.window)
                # Device will reboot, close previous and reset GUI
                self._afterDeviceReboot()
            elif event == "dynamicBut":
                # Clear out IPv4 settings to default to
                self.window.Element('ip').update('')
                self.window.Element('mask').update('')
                self.window.Element('gateway').update('')
        # Release the device before tearing down the window.
        self.closeDevice()
        self.window.close()

    @property
    def device(self) -> dai.DeviceInfo:
        """
        Get selected device
        """
        return self.devices[self.values['devices']]

    def addDeviceInfo(self, deviceInfo: dai.DeviceInfo):
        """Register ``deviceInfo`` under its device id unless already known."""
        if deviceInfo.getDeviceId() not in self.devices:
            # Add the new deviceInfo
            self.devices[deviceInfo.getDeviceId()] = deviceInfo

    def getConfigs(self):
        """Read the device configuration and show it in the GUI.

        If the configuration cannot be read, a default-constructed one is
        shown. Errors while filling the GUI are printed and shown in a popup.
        """
        try:
            conf = self.bl.readConfig()
        except Exception:
            # No readable configuration: fall back to a default one.
            conf = dai.DeviceBootloader.Config()
        try:
            device = self.device
            poe = self.isPoE()
            if poe:
                if conf.isStaticIPV4():
                    self.window.Element('staticBut').update(True)
                else:
                    self.window.Element('dynamicBut').update(True)
                # An all-zero address is shown as an empty field.
                for key, value in (
                    ('ip', conf.getIPv4()),
                    ('mask', conf.getIPv4Mask()),
                    ('gateway', conf.getIPv4Gateway()),
                    ('dns', conf.getDnsIPv4()),
                    ('dnsAlt', conf.getDnsAltIPv4()),
                ):
                    self.window.Element(key).update('' if value == '0.0.0.0' else value)
                self.window.Element('networkTimeout').update(int(conf.getNetworkTimeout().total_seconds() * 1000))
                mac = conf.getMacAddress()
                self.window.Element('mac').update('' if mac == '00:00:00:00:00:00' else mac)
                for el in CONF_INPUT_USB:
                    self.window[el].update("")
            else:
                for el in CONF_INPUT_POE:
                    self.window[el].update("")
                self.window.Element('usbTimeout').update(int(conf.getUsbTimeout().total_seconds() * 1000))
                self.window.Element('usbSpeed').update(str(conf.getUsbMaxSpeed()).split('.')[1])
            self.window.Element('devName').update(device.name)
            self.window.Element('newBoot').update(dai.DeviceBootloader.getEmbeddedBootloaderVersion())
            # The "isEmbeddedVersion" tells you whether BL had to be booted,
            # or we connected to an already flashed Bootloader.
            if self.bl.isEmbeddedVersion():
                self.window.Element('currBoot').update('Not Flashed')
            else:
                self.window.Element('currBoot').update(self.bl.getVersion())
            if deviceStateTxt(device.state) == "UNBOOTED":
                self.window.Element('bootloaderType').update('N/A')
            else:
                self.window.Element('bootloaderType').update(str(self.bl.getType()).split('.')[1])
            if poe:
                self.window.Element('ipassignment').update("Static" if conf.isStaticIPV4() else "Dynamic")
            else:
                self.window.Element('ipassignment').update("N/A")
            self.window.Element('version').update(dai.__version__)
            self.window.Element('commit').update(dai.__commit__)
            self.window.Element('devState').update(deviceStateTxt(device.state))
        except Exception as ex:
            PrintException()
            sg.Popup(f'{ex}')

    def unlockConfig(self):
        """Enable the inputs for the connected device type and all device buttons."""
        if self.bl is None: return
        if self.isPoE():
            inputs, labels = CONF_INPUT_POE, CONF_TEXT_POE
        else:
            inputs, labels = CONF_INPUT_USB, CONF_TEXT_USB
        for el in inputs:
            self.window[el].update(disabled=False)
        for el in labels:
            self.window[el].update(text_color="black")
        for key in self._DEVICE_BUTTONS:
            self.window[key].update(disabled=False)

    def resetGui(self):
        """
        Reset GUI to its original state
        """
        for conf in [CONF_INPUT_POE, CONF_INPUT_USB]:
            for el in conf:
                self.window[el].update(disabled=True)
                self.window[el].update("")
        for conf in [CONF_TEXT_POE, CONF_TEXT_USB]:
            for el in conf:
                self.window[el].update(text_color="gray")
        for key in self._DEVICE_BUTTONS:
            self.window[key].update(disabled=True)
        # Restore the placeholder texts of the 'About' page, including the
        # bootloader type and IP assignment which were previously left stale.
        self.window.Element('devName').update("-name-")
        self.window.Element('bootloaderType').update("-bootloader type-")
        self.window.Element('ipassignment').update("-assignment-")
        self.window.Element('newBoot').update("-version-")
        self.window.Element('currBoot').update("-version-")
        self.window.Element('version').update("-version-")
        self.window.Element('commit').update("-version-")
        self.window.Element('devState').update("-state-")
        # Move back to 'About' page
        self._showPage('-COL1-')

    def closeDevice(self):
        """Close the current bootloader connection, if there is one."""
        if self.bl is not None:
            self.bl.close()
            self.bl = None

    def getDevices(self):
        """Disconnect, search for devices and refill the 'devices' combo."""
        self.closeDevice() # If we are searching for new devices, first disconnect from the current one
        try:
            listedDevices = []
            self.devices.clear()
            for deviceInfo in dai.XLinkConnection.getAllConnectedDevices():
                deviceTxt = deviceInfo.getDeviceId()
                print(deviceInfo.state.name)
                if "GATE" in deviceInfo.state.name: continue # Skip RVC4 devices
                listedDevices.append(deviceTxt)
                self.devices[deviceTxt] = deviceInfo
            # Update the list regardless; the text tells whether anything was
            # found (the "No devices" text used to be overwritten right away).
            placeholder = "Select device" if listedDevices else "No devices"
            self.window.Element('devices').update(placeholder, values=listedDevices)
        except Exception as ex:
            PrintException()
            Popup(f'{ex}', window=self.window)

    def flashConfig(self):
        """Write the values entered in the 'Config' tab to the device.

        Returns:
            True when flashing succeeded, False when the device reported a
            failure, None when an exception occurred (already reported).
        """
        values = self.values
        # Read modify write instead of flashing over
        # clearConfig can be used to start from scratch
        conf = dai.DeviceBootloader.Config()
        try:
            conf = self.bl.readConfig()
        except Exception:
            # No readable configuration: start from the default one.
            pass
        try:
            # isPoE must be *called*: the bare bound method is always truthy,
            # which sent USB devices down the network branch as well.
            if self.isPoE():
                if values['staticBut']:
                    if check_ip(self.window, values['ip']) and check_ip(self.window, values['mask']) and check_ip(self.window, values['gateway'], req=False):
                        conf.setStaticIPv4(values['ip'], values['mask'], values['gateway'])
                    else:
                        raise Exception('IP or Mask missing using static IP configuration')
                else:
                    if check_ip(self.window, values['ip'], req=False) and check_ip(self.window, values['mask'], req=False) and check_ip(self.window, values['gateway'], req=False):
                        conf.setDynamicIPv4(values['ip'], values['mask'], values['gateway'])
                conf.setDnsIPv4(values['dns'], values['dnsAlt'])
                if values['networkTimeout'] != "":
                    if int(values['networkTimeout']) >= 0:
                        conf.setNetworkTimeout(timedelta(seconds=int(values['networkTimeout']) / 1000))
                    else:
                        Popup("Values can not be negative!", window=self.window)
                if values['mac'] != "":
                    if check_mac(self.window, values['mac']):
                        conf.setMacAddress(values['mac'])
                else:
                    conf.setMacAddress('00:00:00:00:00:00')
            else:
                if values['usbTimeout'] != "":
                    if int(values['usbTimeout']) >= 0:
                        conf.setUsbTimeout(timedelta(seconds=int(values['usbTimeout']) / 1000))
                    else:
                        Popup("Values can not be negative!", window=self.window)
                if values['usbSpeed'] != "":
                    conf.setUsbMaxSpeed(getattr(dai.UsbSpeed, values['usbSpeed']))
            success, error = self.bl.flashConfig(conf)
            if not success:
                Popup(f"Flashing failed: {error}", window=self.window)
                return False
            Popup("Flashing successful.", window=self.window)
            return True
        except Exception as ex:
            PrintException()
            Popup(f'{ex}', window=self.window)
        return None

    def clearConfig(self):
        """Clear the flashed configuration and report the outcome in a popup."""
        try:
            success, error = self.bl.flashConfigClear()
            if not success:
                Popup(f"Clearing configuration failed: {error}", window=self.window)
            else:
                Popup("Successfully cleared configuration.", window=self.window)
        except Exception as ex:
            PrintException()
            Popup(f'{ex}', window=self.window)
# Script entry point: build the main window, then run its event loop until
# the window is closed.
app = DeviceManager()
app.run()