Explorar o código

Merge pull request #528 from exo-explore/interfaceprio

show interface type and name, fix propagation of topology
Alex Cheema hai 8 meses
pai
achega
17411dff0a

+ 1 - 1
exo/networking/grpc/grpc_server.py

@@ -85,7 +85,7 @@ class GRPCServer(node_service_pb2_grpc.NodeServiceServicer):
   async def CollectTopology(self, request, context):
     max_depth = request.max_depth
     visited = set(request.visited)
-    topology = await self.node.collect_topology(visited, max_depth)
+    topology = self.node.current_topology
     nodes = {
       node_id:
         node_service_pb2.DeviceCapabilities(

+ 3 - 3
exo/networking/grpc/node_service.proto

@@ -66,9 +66,9 @@ message PeerConnections {
 }
 
 message DeviceFlops {
-  float fp32 = 1;
-  float fp16 = 2;
-  float int8 = 3;
+  double fp32 = 1;
+  double fp16 = 2;
+  double int8 = 3;
 }
 
 message DeviceCapabilities {

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 0 - 0
exo/networking/grpc/node_service_pb2.py


+ 4 - 2
exo/networking/udp/udp_discovery.py

@@ -88,7 +88,7 @@ class UDPDiscovery(Discovery):
       # Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
       # the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
       for addr, interface_name in get_all_ip_addresses_and_interfaces():
-        interface_priority, _ = get_interface_priority_and_type(interface_name)
+        interface_priority, interface_type = get_interface_priority_and_type(interface_name)
         message = json.dumps({
           "type": "discovery",
           "node_id": self.node_id,
@@ -96,6 +96,7 @@ class UDPDiscovery(Discovery):
           "device_capabilities": self.device_capabilities.to_dict(),
           "priority": interface_priority, # TODO: Prioritise interfaces based on bandwidth, latency, and jitter e.g. prioritise Thunderbolt over WiFi.
           "interface_name": interface_name,
+          "interface_type": interface_type,
         })
         if DEBUG_DISCOVERY >= 3: print(f"Broadcasting presence at ({addr} - {interface_name} - {interface_priority}): {message}")
 
@@ -145,6 +146,7 @@ class UDPDiscovery(Discovery):
       peer_port = message["grpc_port"]
       peer_prio = message["priority"]
       peer_interface_name = message["interface_name"]
+      peer_interface_type = message["interface_type"]
       device_capabilities = DeviceCapabilities(**message["device_capabilities"])
 
       if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
@@ -154,7 +156,7 @@ class UDPDiscovery(Discovery):
             if DEBUG >= 1:
               print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
             return
-        new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", peer_interface_name, device_capabilities)
+        new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", f"{peer_interface_type} ({peer_interface_name})", device_capabilities)
         if not await new_peer_handle.health_check():
           if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
           return

+ 8 - 8
exo/orchestration/standard_node.py

@@ -56,7 +56,7 @@ class StandardNode(Node):
     await self.server.start()
     await self.discovery.start()
     await self.update_peers(wait_for_peers)
-    await self.collect_topology()
+    await self.collect_topology(set())
     if DEBUG >= 2: print(f"Collected topology: {self.topology}")
     asyncio.create_task(self.periodic_topology_collection(1.0))
 
@@ -83,7 +83,7 @@ class StandardNode(Node):
         download_progress = RepoProgressEvent.from_dict(status_data.get('progress'))
         self.node_download_progress[status_data.get('node_id')] = download_progress
       if self.topology_viz:
-        self.topology_viz.update_visualization(self.current_topology, self.partitioning_strategy.partition(self.current_topology), self.id, self.node_download_progress)
+        self.topology_viz.update_visualization(self.topology, self.partitioning_strategy.partition(self.topology), self.id, self.node_download_progress)
     except Exception as e:
       if DEBUG >= 1: print(f"Error updating visualization: {e}")
       if DEBUG >= 1: traceback.print_exc()
@@ -374,8 +374,8 @@ class StandardNode(Node):
       try:
         did_peers_change = await self.update_peers()
         if DEBUG >= 2: print(f"{did_peers_change=}")
+        await self.collect_topology(set())
         if did_peers_change:
-          await self.collect_topology()
           await self.select_best_inference_engine()
       except Exception as e:
         print(f"Error collecting topology: {e}")
@@ -386,7 +386,7 @@ class StandardNode(Node):
       return None, False
     return np.array(self.buffered_token_output[request_id][0]), self.buffered_token_output[request_id][1]
 
-  async def collect_topology(self, visited: set[str] = set(), max_depth: int = 4) -> Topology:
+  async def collect_topology(self, visited: set[str], max_depth: int = 4) -> Topology:
     next_topology = Topology()
     next_topology.update_node(self.id, self.device_capabilities)
 
@@ -410,16 +410,16 @@ class StandardNode(Node):
       try:
         other_topology = await asyncio.wait_for(peer.collect_topology(visited, max_depth=max_depth - 1), timeout=5.0)
         if DEBUG >= 2: print(f"Collected topology from: {peer.id()}: {other_topology}")
-        self.topology.merge(other_topology)
+        next_topology.merge(other_topology)
       except Exception as e:
         print(f"Error collecting topology from {peer.id()}: {e}")
         traceback.print_exc()
 
-    next_topology.active_node_id = self.topology.active_node_id  # this is not so clean.
+    next_topology.active_node_id = self.topology.active_node_id
     self.topology = next_topology
     if self.topology_viz:
-      self.topology_viz.update_visualization(self.current_topology, self.partitioning_strategy.partition(self.current_topology), self.id)
-    return next_topology
+      self.topology_viz.update_visualization(self.topology, self.partitioning_strategy.partition(self.topology), self.id)
+    return self.topology
 
   @property
   def on_token(self) -> AsyncCallbackSystem[str, Tuple[str, List[int], bool]]:

+ 6 - 27
exo/topology/topology.py

@@ -1,5 +1,5 @@
 from .device_capabilities import DeviceCapabilities
-from typing import Dict, Set, Optional, NamedTuple
+from typing import Dict, Set, Optional
 from dataclasses import dataclass
 
 @dataclass
@@ -21,7 +21,6 @@ class PeerConnection:
 class Topology:
   def __init__(self):
     self.nodes: Dict[str, DeviceCapabilities] = {}
-    # Store PeerConnection objects in the adjacency lists
     self.peer_graph: Dict[str, Set[PeerConnection]] = {}
     self.active_node_id: Optional[str] = None
 
@@ -34,31 +33,11 @@ class Topology:
   def all_nodes(self):
     return self.nodes.items()
 
-  def add_edge(self, node1_id: str, node2_id: str, description: Optional[str] = None):
-    if node1_id not in self.peer_graph:
-      self.peer_graph[node1_id] = set()
-    if node2_id not in self.peer_graph:
-      self.peer_graph[node2_id] = set()
-
-    # Create bidirectional connections with the same description
-    conn1 = PeerConnection(node1_id, node2_id, description)
-    conn2 = PeerConnection(node2_id, node1_id, description)
-
-    self.peer_graph[node1_id].add(conn1)
-    self.peer_graph[node2_id].add(conn2)
-
-  def get_neighbors(self, node_id: str) -> Set[str]:
-    # Convert PeerConnection objects back to just destination IDs
-    return {conn.to_id for conn in self.peer_graph.get(node_id, set())}
-
-  def all_edges(self):
-    edges = []
-    for node_id, connections in self.peer_graph.items():
-      for conn in connections:
-        # Only include each edge once by checking if reverse already exists
-        if not any(e[0] == conn.to_id and e[1] == conn.from_id for e in edges):
-          edges.append((conn.from_id, conn.to_id, conn.description))
-    return edges
+  def add_edge(self, from_id: str, to_id: str, description: Optional[str] = None):
+    if from_id not in self.peer_graph:
+      self.peer_graph[from_id] = set()
+    conn = PeerConnection(from_id, to_id, description)
+    self.peer_graph[from_id].add(conn)
 
   def merge(self, other: "Topology"):
     for node_id, capabilities in other.nodes.items():

+ 1 - 1
exo/viz/topology_viz.py

@@ -253,7 +253,7 @@ class TopologyViz:
       conn2 = self.topology.peer_graph.get(self.partitions[next_i].node_id, set())
       description1 = next((c.description for c in conn1 if c.to_id == self.partitions[next_i].node_id), "")
       description2 = next((c.description for c in conn2 if c.to_id == partition.node_id), "")
-      connection_description = f"{description1}/{description2}" if description1 != description2 else description1
+      connection_description = f"{description1}/{description2}"
 
       # Simple line drawing
       steps = max(abs(next_x - x), abs(next_y - y))

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio