1
0
Эх сурвалжийг харах

comparing timestamps across distributed systems of consumer devices is probably a bad idea.

Alex Cheema 8 сар өмнө
parent
commit
6e3239494d

+ 3 - 9
exo/networking/tailscale_discovery.py

@@ -1,7 +1,6 @@
 import asyncio
 import asyncio
 import time
 import time
 import traceback
 import traceback
-from datetime import datetime, timezone
 from typing import List, Dict, Callable, Tuple
 from typing import List, Dict, Callable, Tuple
 from tailscale import Tailscale, Device
 from tailscale import Tailscale, Device
 from .discovery import Discovery
 from .discovery import Discovery
@@ -68,7 +67,7 @@ class TailscaleDiscovery(Discovery):
     while True:
     while True:
       try:
       try:
         devices: dict[str, Device] = await self.tailscale.devices()
         devices: dict[str, Device] = await self.tailscale.devices()
-        current_time = datetime.now(timezone.utc).timestamp()
+        current_time = time.time()
 
 
         active_devices = {
         active_devices = {
           name: device for name, device in devices.items()
           name: device for name, device in devices.items()
@@ -80,17 +79,12 @@ class TailscaleDiscovery(Discovery):
         if DEBUG_DISCOVERY >= 2: print("Time since last seen tailscale devices", [(current_time  - device.last_seen.timestamp()) for device in devices.values()])
         if DEBUG_DISCOVERY >= 2: print("Time since last seen tailscale devices", [(current_time  - device.last_seen.timestamp()) for device in devices.values()])
 
 
         for device in active_devices.values():
         for device in active_devices.values():
-          if device.name == self.node_id:
-            continue
+          if device.name == self.node_id: continue
           peer_host = device.addresses[0]
           peer_host = device.addresses[0]
-          peer_id, peer_port, device_capabilities, updated_at = await get_device_attributes(device.device_id, self.tailscale.api_key)
+          peer_id, peer_port, device_capabilities = await get_device_attributes(device.device_id, self.tailscale.api_key)
           if not peer_id:
           if not peer_id:
             if DEBUG_DISCOVERY >= 4: print(f"{device.device_id} does not have exo node attributes. skipping.")
             if DEBUG_DISCOVERY >= 4: print(f"{device.device_id} does not have exo node attributes. skipping.")
             continue
             continue
-          if current_time - updated_at > self.discovery_timeout:
-            if DEBUG_DISCOVERY >= 3: print(f"{device.device_id} has outdated exo node attributes. skipping.")
-            if peer_id in self.known_peers: del self.known_peers[peer_id]
-            continue
 
 
           if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
           if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
             if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
             if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")

+ 5 - 9
exo/networking/tailscale_helpers.py

@@ -2,10 +2,9 @@ import json
 import asyncio
 import asyncio
 import aiohttp
 import aiohttp
 import re
 import re
-from typing import Dict, Any, Tuple, Optional
+from typing import Dict, Any, Tuple
 from exo.helpers import DEBUG_DISCOVERY
 from exo.helpers import DEBUG_DISCOVERY
 from exo.topology.device_capabilities import DeviceCapabilities, DeviceFlops
 from exo.topology.device_capabilities import DeviceCapabilities, DeviceFlops
-from datetime import datetime, timezone
 
 
 async def get_device_id() -> str:
 async def get_device_id() -> str:
   try:
   try:
@@ -39,8 +38,7 @@ async def update_device_attributes(device_id: str, api_key: str, node_id: str, n
       "custom:exo_device_capability_memory": str(device_capabilities.memory),
       "custom:exo_device_capability_memory": str(device_capabilities.memory),
       "custom:exo_device_capability_flops_fp16": str(device_capabilities.flops.fp16),
       "custom:exo_device_capability_flops_fp16": str(device_capabilities.flops.fp16),
       "custom:exo_device_capability_flops_fp32": str(device_capabilities.flops.fp32),
       "custom:exo_device_capability_flops_fp32": str(device_capabilities.flops.fp32),
-      "custom:exo_device_capability_flops_int8": str(device_capabilities.flops.int8),
-      "custom:exo_updated_at": str(int(datetime.now(timezone.utc).timestamp()))
+      "custom:exo_device_capability_flops_int8": str(device_capabilities.flops.int8)
     }
     }
 
 
     for attr_name, attr_value in attributes.items():
     for attr_name, attr_value in attributes.items():
@@ -52,7 +50,7 @@ async def update_device_attributes(device_id: str, api_key: str, node_id: str, n
         else:
         else:
           print(f"Failed to update device posture attribute {attr_name}: {response.status} {await response.text()}")
           print(f"Failed to update device posture attribute {attr_name}: {response.status} {await response.text()}")
 
 
-async def get_device_attributes(device_id: str, api_key: str) -> Tuple[str, int, DeviceCapabilities, int]:
+async def get_device_attributes(device_id: str, api_key: str) -> Tuple[str, int, DeviceCapabilities]:
   async with aiohttp.ClientSession() as session:
   async with aiohttp.ClientSession() as session:
     url = f"https://api.tailscale.com/api/v2/device/{device_id}/attributes"
     url = f"https://api.tailscale.com/api/v2/device/{device_id}/attributes"
     headers = {
     headers = {
@@ -74,12 +72,10 @@ async def get_device_attributes(device_id: str, api_key: str) -> Tuple[str, int,
             int8=float(attributes.get("custom:exo_device_capability_flops_int8", 0))
             int8=float(attributes.get("custom:exo_device_capability_flops_int8", 0))
           )
           )
         )
         )
-        updated_at_str = attributes.get("custom:exo_updated_at")
-        updated_at = int(updated_at_str) if updated_at_str else 0
-        return node_id, node_port, device_capabilities, updated_at
+        return node_id, node_port, device_capabilities
       else:
       else:
         print(f"Failed to fetch posture attributes for {device_id}: {response.status}")
         print(f"Failed to fetch posture attributes for {device_id}: {response.status}")
-        return "", 0, DeviceCapabilities(model="", chip="", memory=0, flops=DeviceFlops(fp16=0, fp32=0, int8=0)), 0
+        return "", 0, DeviceCapabilities(model="", chip="", memory=0, flops=DeviceFlops(fp16=0, fp32=0, int8=0))
 
 
 def parse_device_attributes(data: Dict[str, str]) -> Dict[str, Any]:
 def parse_device_attributes(data: Dict[str, str]) -> Dict[str, Any]:
   result = {}
   result = {}

+ 1 - 3
exo/networking/udp_discovery.py

@@ -162,9 +162,7 @@ class UDPDiscovery(Discovery):
           peer_handle.id() for peer_handle, connected_at, last_seen in self.known_peers.values()
           peer_handle.id() for peer_handle, connected_at, last_seen in self.known_peers.values()
           if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or current_time - last_seen > self.discovery_timeout
           if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or current_time - last_seen > self.discovery_timeout
         ]
         ]
-        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {
-          peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, {connected_at=}, {last_seen=}" for
-          peer_handle, connected_at, last_seen in self.known_peers.values()})
+        if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen in self.known_peers.values()})
         for peer_id in peers_to_remove:
         for peer_id in peers_to_remove:
           if peer_id in self.known_peers: del self.known_peers[peer_id]
           if peer_id in self.known_peers: del self.known_peers[peer_id]
           if DEBUG_DISCOVERY >= 2: print(f"Removed peer {peer_id} due to inactivity.")
           if DEBUG_DISCOVERY >= 2: print(f"Removed peer {peer_id} due to inactivity.")