|
@@ -130,9 +130,9 @@ class Node:
|
|
|
self.buffered_token_output[request_id][0].append(token.item())
|
|
|
is_finished = token.item() == self.inference_engine.tokenizer.eos_token_id or is_finished or len(self.buffered_token_output[request_id][0]) >= self.max_generate_tokens
|
|
|
if DEBUG >= 2: print(f"[{request_id}] result size: {result.size}, is finished: {is_finished}, buffered tokens: {len(self.buffered_token_output[request_id][0])}")
|
|
|
- asyncio.create_task(self.broadcast_result(request_id, *self.buffered_token_output[request_id]))
|
|
|
+ asyncio.create_task(self.broadcast_result(request_id, [self.buffered_token_output[request_id][0][-1]], is_finished))
|
|
|
forward = token.reshape(1, -1)
|
|
|
- intermediate_result = self.buffered_token_output[request_id][0]
|
|
|
+ intermediate_result = self.buffered_token_output[request_id][0][-1]
|
|
|
else:
|
|
|
forward = result
|
|
|
else:
|
|
@@ -586,17 +586,17 @@ class Node:
|
|
|
if DEBUG >= 2: print(f"Triggering all on_token callbacks with {request_id=} {token=} {is_finished=}")
|
|
|
self.on_token.trigger_all(request_id, token, is_finished)
|
|
|
|
|
|
- async def broadcast_new_token(self, request_id: str, token: int, is_finished: bool) -> None:
|
|
|
- async def send_new_token_to_peer(peer):
|
|
|
+ async def broadcast_result(self, request_id: str, result: List[int], is_finished: bool) -> None:
|
|
|
+ async def send_result_to_peer(peer):
|
|
|
try:
|
|
|
- await asyncio.wait_for(peer.send_new_token(request_id, token, is_finished), timeout=15.0)
|
|
|
+ await asyncio.wait_for(peer.send_result(request_id, result, is_finished), timeout=15.0)
|
|
|
except asyncio.TimeoutError:
|
|
|
- print(f"Timeout broadcasting new token to {peer.id()}")
|
|
|
+ print(f"Timeout broadcasting result to {peer.id()}")
|
|
|
except Exception as e:
|
|
|
- print(f"Error broadcasting new token to {peer.id()}: {e}")
|
|
|
+ print(f"Error broadcasting result to {peer.id()}: {e}")
|
|
|
traceback.print_exc()
|
|
|
|
|
|
- await asyncio.gather(*[send_new_token_to_peer(peer) for peer in self.peers], return_exceptions=True)
|
|
|
+ await asyncio.gather(*[send_result_to_peer(peer) for peer in self.peers], return_exceptions=True)
|
|
|
|
|
|
async def broadcast_opaque_status(self, request_id: str, status: str) -> None:
|
|
|
if DEBUG >= 8: print(f"Broadcasting opaque status: {request_id=} {status=}")
|