|
@@ -22,7 +22,6 @@ class ManualDiscovery(Discovery):
|
|
|
self.create_peer_handle = create_peer_handle
|
|
|
|
|
|
self.listen_task = None
|
|
|
- self.cleanup_task = None
|
|
|
self.known_peers: Dict[str, PeerHandle] = {}
|
|
|
|
|
|
self._cached_peers: Dict[str, PeerConfig] = {}
|
|
@@ -32,11 +31,9 @@ class ManualDiscovery(Discovery):
|
|
|
|
|
|
async def start(self) -> None:
|
|
|
self.listen_task = asyncio.create_task(self.task_find_peers_from_config())
|
|
|
- self.cleanup_task = asyncio.create_task(self.task_clean_up_peers_from_config())
|
|
|
|
|
|
async def stop(self) -> None:
|
|
|
if self.listen_task: self.listen_task.cancel()
|
|
|
- if self.cleanup_task: self.cleanup_task.cancel()
|
|
|
self._file_executor.shutdown(wait=True)
|
|
|
|
|
|
async def discover_peers(self, wait_for_peers: int = 0) -> List[PeerHandle]:
|
|
@@ -51,6 +48,7 @@ class ManualDiscovery(Discovery):
|
|
|
if DEBUG_DISCOVERY >= 2: print("Starting task to find peers from config...")
|
|
|
while True:
|
|
|
peers_from_config = await self._get_peers()
|
|
|
+ new_known_peers = {}
|
|
|
for peer_id, peer_config in peers_from_config.items():
|
|
|
try:
|
|
|
if DEBUG_DISCOVERY >= 2: print(f"Checking peer {peer_id=} at {peer_config.address}:{peer_config.port}")
|
|
@@ -61,29 +59,15 @@ class ManualDiscovery(Discovery):
|
|
|
is_healthy = await peer.health_check()
|
|
|
if is_healthy:
|
|
|
if DEBUG_DISCOVERY >= 2: print(f"{peer_id=} at {peer_config.address}:{peer_config.port} is healthy.")
|
|
|
- self.known_peers[peer_id] = peer
|
|
|
- else:
|
|
|
- if DEBUG_DISCOVERY >= 2: print(f"{peer_id=} at {peer_config.address}:{peer_config.port} is not healthy.")
|
|
|
- self.known_peers.pop(peer_id, None)
|
|
|
+ new_known_peers[peer_id] = peer
|
|
|
+ elif DEBUG_DISCOVERY >= 2: print(f"{peer_id=} at {peer_config.address}:{peer_config.port} is not healthy. Removing.")
|
|
|
except Exception as e:
|
|
|
if DEBUG_DISCOVERY >= 2: print(f"Exception occured when attempting to add {peer_id=}: {e}")
|
|
|
+ self.known_peers = new_known_peers
|
|
|
await asyncio.sleep(1.0)
|
|
|
|
|
|
if DEBUG_DISCOVERY >= 2: print(f"Current known peers: {[peer.id() for peer in self.known_peers.values()]}")
|
|
|
|
|
|
- async def task_clean_up_peers_from_config(self):
|
|
|
- if DEBUG_DISCOVERY >= 2: print("Starting task to clean up peers from config...")
|
|
|
- while True:
|
|
|
- peers_from_config = await self._get_peers()
|
|
|
- if peers_from_config:
|
|
|
- peers_to_remove = [peer for peer in self.known_peers.keys() if peer not in peers_from_config]
|
|
|
-
|
|
|
- for peer in peers_to_remove:
|
|
|
- if DEBUG_DISCOVERY >= 2: print(f"{peer} is no longer found in the config but is currently a known peer. Removing from known peers...")
|
|
|
- self.known_peers.pop(peer, None)
|
|
|
-
|
|
|
- await asyncio.sleep(1.0)
|
|
|
-
|
|
|
async def _get_peers(self):
|
|
|
try:
|
|
|
async with self._lock:
|