Răsfoiți Sursa

Merge pull request #278 from exo-explore/peer_prio

add a priority to broadcast messages
Alex Cheema 7 luni în urmă
părinte
comite
5e0db20426
1 fișier modificat, cu 21 de adăugiri și 14 ștergeri
  1. 21 14
      exo/networking/udp/udp_discovery.py

+ 21 - 14
exo/networking/udp/udp_discovery.py

@@ -53,7 +53,7 @@ class UDPDiscovery(Discovery):
     self.broadcast_interval = broadcast_interval
     self.discovery_timeout = discovery_timeout
     self.device_capabilities = device_capabilities
-    self.known_peers: Dict[str, Tuple[PeerHandle, float, float]] = {}
+    self.known_peers: Dict[str, Tuple[PeerHandle, float, float, int]] = {}
     self.broadcast_task = None
     self.listen_task = None
     self.cleanup_task = None
@@ -76,24 +76,25 @@ class UDPDiscovery(Discovery):
       while len(self.known_peers) < wait_for_peers:
         if DEBUG_DISCOVERY >= 2: print(f"Current peers: {len(self.known_peers)}/{wait_for_peers}. Waiting for more peers...")
         await asyncio.sleep(0.1)
-    return [peer_handle for peer_handle, _, _ in self.known_peers.values()]
+    return [peer_handle for peer_handle, _, _, _ in self.known_peers.values()]
 
   async def task_broadcast_presence(self):
-    message = json.dumps({
-      "type": "discovery",
-      "node_id": self.node_id,
-      "grpc_port": self.node_port,
-      "device_capabilities": self.device_capabilities.to_dict(),
-    })
-
     if DEBUG_DISCOVERY >= 2:
       print("Starting task_broadcast_presence...")
-      print(f"\nBroadcast message: {message}")
 
     while True:
       # Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
       # the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
       for addr in get_all_ip_addresses():
+        message = json.dumps({
+          "type": "discovery",
+          "node_id": self.node_id,
+          "grpc_port": self.node_port,
+          "device_capabilities": self.device_capabilities.to_dict(),
+          "priority": 1, # For now, every interface has the same priority. We can make this better by prioritising interfaces based on bandwidth, latency, and jitter e.g. prioritise Thunderbolt over WiFi.
+        })
+        if DEBUG_DISCOVERY >= 3: print(f"Broadcasting presence at ({addr}): {message}")
+
         transport = None
         try:
           transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
@@ -138,21 +139,27 @@ class UDPDiscovery(Discovery):
       peer_id = message["node_id"]
       peer_host = addr[0]
       peer_port = message["grpc_port"]
+      peer_prio = message["priority"]
       device_capabilities = DeviceCapabilities(**message["device_capabilities"])
 
       if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
+        if peer_id in self.known_peers:
+          existing_peer_prio = self.known_peers[peer_id][3]
+          if existing_peer_prio >= peer_prio:
+            if DEBUG >= 1: print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
+            return
         new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities)
         if not await new_peer_handle.health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
           return
         if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
-        self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time())
+        self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), peer_prio)
       else:
         if not await self.known_peers[peer_id][0].health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Removing.")
           if peer_id in self.known_peers: del self.known_peers[peer_id]
           return
-        self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time())
+        self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), peer_prio)
 
   async def task_listen_for_peers(self):
     await asyncio.get_event_loop().create_datagram_endpoint(lambda: ListenProtocol(self.on_listen_message),
@@ -164,13 +171,13 @@ class UDPDiscovery(Discovery):
       try:
         current_time = time.time()
         peers_to_remove = []
-        for peer_id, (peer_handle, connected_at, last_seen) in self.known_peers.items():
+        for peer_id, (peer_handle, connected_at, last_seen, prio) in self.known_peers.items():
           if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or \
              (current_time - last_seen > self.discovery_timeout) or \
              (not await peer_handle.health_check()):
             peers_to_remove.append(peer_id)
 
-        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen in self.known_peers.values()})
+        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}, {prio=}" for peer_handle, connected_at, last_seen, prio in self.known_peers.values()})
 
         for peer_id in peers_to_remove:
           if peer_id in self.known_peers: del self.known_peers[peer_id]