Browse Source

broadcast results concurrently fixes #31

Alex Cheema 1 year ago
parent
commit
47163d22db
1 changed files with 7 additions and 3 deletions
  1. 7 3
      exo/orchestration/standard_node.py

+ 7 - 3
exo/orchestration/standard_node.py

@@ -240,10 +240,14 @@ class StandardNode(Node):
         self.on_token.trigger_all(request_id, tokens, is_finished)
 
     async def broadcast_result(self, request_id: str, result: List[int], is_finished: bool) -> None:
-        for peer in self.peers:
+        async def send_result_to_peer(peer):
             try:
-                await peer.send_result(request_id, result, is_finished)
+                await asyncio.wait_for(peer.send_result(request_id, result, is_finished), timeout=15.0)
+            except asyncio.TimeoutError:
+                print(f"Timeout broadcasting result to {peer.id()}")
             except Exception as e:
+                print(f"Error broadcasting result to {peer.id()}: {e}")
                 import traceback
                 traceback.print_exc()
-                print(f"Error broadcasting result to {peer.id()}: {e}")
+
+        await asyncio.gather(*[send_result_to_peer(peer) for peer in self.peers], return_exceptions=True)