Răsfoiți Sursa

show interface type and name, fix propagation of topology

Alex Cheema 8 luni în urmă
părinte
comite
d5b4039f3d

+ 4 - 2
exo/networking/udp/udp_discovery.py

@@ -88,7 +88,7 @@ class UDPDiscovery(Discovery):
       # Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
       # the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
       for addr, interface_name in get_all_ip_addresses_and_interfaces():
-        interface_priority, _ = get_interface_priority_and_type(interface_name)
+        interface_priority, interface_type = get_interface_priority_and_type(interface_name)
         message = json.dumps({
           "type": "discovery",
           "node_id": self.node_id,
@@ -96,6 +96,7 @@ class UDPDiscovery(Discovery):
           "device_capabilities": self.device_capabilities.to_dict(),
           "priority": interface_priority, # TODO: Prioritise interfaces based on bandwidth, latency, and jitter e.g. prioritise Thunderbolt over WiFi.
           "interface_name": interface_name,
+          "interface_type": interface_type,
         })
         if DEBUG_DISCOVERY >= 3: print(f"Broadcasting presence at ({addr} - {interface_name} - {interface_priority}): {message}")
 
@@ -145,6 +146,7 @@ class UDPDiscovery(Discovery):
       peer_port = message["grpc_port"]
       peer_prio = message["priority"]
       peer_interface_name = message["interface_name"]
+      peer_interface_type = message["interface_type"]
       device_capabilities = DeviceCapabilities(**message["device_capabilities"])
 
       if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
@@ -154,7 +156,7 @@ class UDPDiscovery(Discovery):
             if DEBUG >= 1:
               print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
             return
-        new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", peer_interface_name, device_capabilities)
+        new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", f"{peer_interface_type} ({peer_interface_name})", device_capabilities)
         if not await new_peer_handle.health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
           return

+ 4 - 4
exo/orchestration/standard_node.py

@@ -410,16 +410,16 @@ class StandardNode(Node):
       try:
         other_topology = await asyncio.wait_for(peer.collect_topology(visited, max_depth=max_depth - 1), timeout=5.0)
         if DEBUG >= 2: print(f"Collected topology from: {peer.id()}: {other_topology}")
-        self.topology.merge(other_topology)
+        next_topology.merge(other_topology)
       except Exception as e:
         print(f"Error collecting topology from {peer.id()}: {e}")
         traceback.print_exc()
 
-    next_topology.active_node_id = self.topology.active_node_id  # this is not so clean.
+    next_topology.active_node_id = self.topology.active_node_id
     self.topology = next_topology
     if self.topology_viz:
-      self.topology_viz.update_visualization(self.current_topology, self.partitioning_strategy.partition(self.current_topology), self.id)
-    return next_topology
+      self.topology_viz.update_visualization(self.topology, self.partitioning_strategy.partition(self.topology), self.id)
+    return self.topology
 
   @property
   def on_token(self) -> AsyncCallbackSystem[str, Tuple[str, List[int], bool]]: