فهرست منبع

Add a priority to broadcast messages so that the broadcaster can indicate how to prioritise that particular interface. For now, all priorities are set to 1, but in the future this will be based on network latency, bandwidth, jitter, etc. (e.g. Thunderbolt prioritised over WiFi).

Alex Cheema 10 ماه پیش
والد
کامیت
ac6f1bedac
1 فایل تغییر یافته به همراه 20 افزوده شده و 13 حذف شده
  1. 20 13
      exo/networking/udp/udp_discovery.py

+ 20 - 13
exo/networking/udp/udp_discovery.py

@@ -53,7 +53,7 @@ class UDPDiscovery(Discovery):
     self.broadcast_interval = broadcast_interval
     self.discovery_timeout = discovery_timeout
     self.device_capabilities = device_capabilities
-    self.known_peers: Dict[str, Tuple[PeerHandle, float, float]] = {}
+    self.known_peers: Dict[str, Tuple[PeerHandle, float, float, int]] = {}
     self.broadcast_task = None
     self.listen_task = None
     self.cleanup_task = None
@@ -76,16 +76,9 @@ class UDPDiscovery(Discovery):
       while len(self.known_peers) < wait_for_peers:
         if DEBUG_DISCOVERY >= 2: print(f"Current peers: {len(self.known_peers)}/{wait_for_peers}. Waiting for more peers...")
         await asyncio.sleep(0.1)
-    return [peer_handle for peer_handle, _, _ in self.known_peers.values()]
+    return [peer_handle for peer_handle, _, _, _ in self.known_peers.values()]
 
   async def task_broadcast_presence(self):
-    message = json.dumps({
-      "type": "discovery",
-      "node_id": self.node_id,
-      "grpc_port": self.node_port,
-      "device_capabilities": self.device_capabilities.to_dict(),
-    })
-
     if DEBUG_DISCOVERY >= 2:
       print("Starting task_broadcast_presence...")
       print(f"\nBroadcast message: {message}")
@@ -94,6 +87,14 @@ class UDPDiscovery(Discovery):
       # Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
       # the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
       for addr in get_all_ip_addresses():
+        message = json.dumps({
+          "type": "discovery",
+          "node_id": self.node_id,
+          "grpc_port": self.node_port,
+          "device_capabilities": self.device_capabilities.to_dict(),
+          "priority": 1, # For now, every interface has the same priority. We can make this better by prioriting interfaces based on bandwidth, latency, and jitter e.g. prioritise Thunderbolt over WiFi.
+        })
+
         transport = None
         try:
           transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
@@ -138,21 +139,27 @@ class UDPDiscovery(Discovery):
       peer_id = message["node_id"]
       peer_host = addr[0]
       peer_port = message["grpc_port"]
+      peer_prio = message["priority"]
       device_capabilities = DeviceCapabilities(**message["device_capabilities"])
 
       if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
+        if peer_id in self.known_peers:
+          existing_peer_prio = self.known_peers[peer_id][3]
+          if existing_peer_prio >= peer_prio:
+            if DEBUG >= 1: print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
+            return
         new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities)
         if not await new_peer_handle.health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
           return
         if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
-        self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time())
+        self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), peer_prio)
       else:
         if not await self.known_peers[peer_id][0].health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Removing.")
           if peer_id in self.known_peers: del self.known_peers[peer_id]
           return
-        self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time())
+        self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), peer_prio)
 
   async def task_listen_for_peers(self):
     await asyncio.get_event_loop().create_datagram_endpoint(lambda: ListenProtocol(self.on_listen_message),
@@ -164,13 +171,13 @@ class UDPDiscovery(Discovery):
       try:
         current_time = time.time()
         peers_to_remove = []
-        for peer_id, (peer_handle, connected_at, last_seen) in self.known_peers.items():
+        for peer_id, (peer_handle, connected_at, last_seen, prio) in self.known_peers.items():
           if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or \
              (current_time - last_seen > self.discovery_timeout) or \
              (not await peer_handle.health_check()):
             peers_to_remove.append(peer_id)
 
-        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen in self.known_peers.values()})
+        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}, {prio=}" for peer_handle, connected_at, last_seen, prio in self.known_peers.values()})
 
         for peer_id in peers_to_remove:
           if peer_id in self.known_peers: del self.known_peers[peer_id]