tasks.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # tasks.py
  2. import asyncio
  3. from typing import Dict
  4. from uuid import uuid4
  5. # A dictionary to keep track of active tasks
  6. tasks: Dict[str, asyncio.Task] = {}
  7. chat_tasks = {}
  8. def cleanup_task(task_id: str, id=None):
  9. """
  10. Remove a completed or canceled task from the global `tasks` dictionary.
  11. """
  12. tasks.pop(task_id, None) # Remove the task if it exists
  13. # If an ID is provided, remove the task from the chat_tasks dictionary
  14. if id and task_id in chat_tasks.get(id, []):
  15. chat_tasks[id].remove(task_id)
  16. if not chat_tasks[id]: # If no tasks left for this ID, remove the entry
  17. chat_tasks.pop(id, None)
  18. def create_task(coroutine, id=None):
  19. """
  20. Create a new asyncio task and add it to the global task dictionary.
  21. """
  22. task_id = str(uuid4()) # Generate a unique ID for the task
  23. task = asyncio.create_task(coroutine) # Create the task
  24. # Add a done callback for cleanup
  25. task.add_done_callback(lambda t: cleanup_task(task_id, id))
  26. tasks[task_id] = task
  27. # If an ID is provided, associate the task with that ID
  28. if chat_tasks.get(id):
  29. chat_tasks[id].append(task_id)
  30. else:
  31. chat_tasks[id] = [task_id]
  32. return task_id, task
  33. def get_task(task_id: str):
  34. """
  35. Retrieve a task by its task ID.
  36. """
  37. return tasks.get(task_id)
  38. def list_tasks():
  39. """
  40. List all currently active task IDs.
  41. """
  42. return list(tasks.keys())
  43. def list_task_ids_by_chat_id(id):
  44. """
  45. List all tasks associated with a specific ID.
  46. """
  47. return chat_tasks.get(id, [])
  48. async def stop_task(task_id: str):
  49. """
  50. Cancel a running task and remove it from the global task list.
  51. """
  52. task = tasks.get(task_id)
  53. if not task:
  54. raise ValueError(f"Task with ID {task_id} not found.")
  55. task.cancel() # Request task cancellation
  56. try:
  57. await task # Wait for the task to handle the cancellation
  58. except asyncio.CancelledError:
  59. # Task successfully canceled
  60. tasks.pop(task_id, None) # Remove it from the dictionary
  61. return {"status": True, "message": f"Task {task_id} successfully stopped."}
  62. return {"status": False, "message": f"Failed to stop task {task_id}."}