|
@@ -76,7 +76,7 @@ class UDPDiscovery(Discovery):
|
|
|
while len(self.known_peers) < wait_for_peers:
|
|
|
if DEBUG_DISCOVERY >= 2: print(f"Current peers: {len(self.known_peers)}/{wait_for_peers}. Waiting for more peers...")
|
|
|
await asyncio.sleep(0.1)
|
|
|
- return [peer_handle for peer_handle, _, _, last_is_healthy in self.known_peers.values() if last_is_healthy]
|
|
|
+ return [peer_handle for peer_handle, _, _ in self.known_peers.values()]
|
|
|
|
|
|
async def task_broadcast_presence(self):
|
|
|
message = json.dumps({
|
|
@@ -140,17 +140,11 @@ class UDPDiscovery(Discovery):
|
|
|
peer_port = message["grpc_port"]
|
|
|
device_capabilities = DeviceCapabilities(**message["device_capabilities"])
|
|
|
|
|
|
- # Create a new peer handle
|
|
|
- new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities)
|
|
|
-
|
|
|
- # Check if the new peer is healthy before adding
|
|
|
if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
|
|
|
- if DEBUG >= 1: print(
|
|
|
- f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
|
|
|
- self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), await new_peer_handle.health_check())
|
|
|
+ if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
|
|
|
+ self.known_peers[peer_id] = (self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities), time.time(), time.time())
|
|
|
else:
|
|
|
- # Update last seen time for existing peer
|
|
|
- self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), await new_peer_handle.health_check())
|
|
|
+ self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time())
|
|
|
|
|
|
async def task_listen_for_peers(self):
|
|
|
await asyncio.get_event_loop().create_datagram_endpoint(lambda: ListenProtocol(self.on_listen_message),
|
|
@@ -162,12 +156,13 @@ class UDPDiscovery(Discovery):
|
|
|
try:
|
|
|
current_time = time.time()
|
|
|
peers_to_remove = []
|
|
|
- for peer_id, (peer_handle, connected_at, last_seen, last_is_healthy) in self.known_peers.items():
|
|
|
- if ((not await peer_handle.is_connected() or not last_is_healthy or not await peer_handle.health_check()) and current_time - connected_at > self.discovery_timeout) or \
|
|
|
- current_time - last_seen > self.discovery_timeout:
|
|
|
+ for peer_id, (peer_handle, connected_at, last_seen) in self.known_peers.items():
|
|
|
+ if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or \
|
|
|
+ (current_time - last_seen > self.discovery_timeout) or \
|
|
|
+ (not await peer_handle.health_check()):
|
|
|
peers_to_remove.append(peer_id)
|
|
|
|
|
|
- if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen, last_is_healthy in self.known_peers.values()})
|
|
|
+ if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen in self.known_peers.values()})
|
|
|
|
|
|
for peer_id in peers_to_remove:
|
|
|
if peer_id in self.known_peers: del self.known_peers[peer_id]
|