tasks.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. # tasks.py
  2. import asyncio
  3. from typing import Dict
  4. from uuid import uuid4
  5. import json
  6. from redis import Redis
  7. # A dictionary to keep track of active tasks
  8. tasks: Dict[str, asyncio.Task] = {}
  9. chat_tasks = {}
  10. def cleanup_task(request, task_id: str, id=None):
  11. """
  12. Remove a completed or canceled task from the global `tasks` dictionary.
  13. """
  14. tasks.pop(task_id, None) # Remove the task if it exists
  15. # If an ID is provided, remove the task from the chat_tasks dictionary
  16. if id and task_id in chat_tasks.get(id, []):
  17. chat_tasks[id].remove(task_id)
  18. if not chat_tasks[id]: # If no tasks left for this ID, remove the entry
  19. chat_tasks.pop(id, None)
  20. def create_task(request, coroutine, id=None):
  21. """
  22. Create a new asyncio task and add it to the global task dictionary.
  23. """
  24. task_id = str(uuid4()) # Generate a unique ID for the task
  25. task = asyncio.create_task(coroutine) # Create the task
  26. # Add a done callback for cleanup
  27. task.add_done_callback(lambda t: cleanup_task(request, task_id, id))
  28. tasks[task_id] = task
  29. # If an ID is provided, associate the task with that ID
  30. if chat_tasks.get(id):
  31. chat_tasks[id].append(task_id)
  32. else:
  33. chat_tasks[id] = [task_id]
  34. return task_id, task
  35. def list_tasks(request):
  36. """
  37. List all currently active task IDs.
  38. """
  39. return list(tasks.keys())
  40. def list_task_ids_by_chat_id(request, id):
  41. """
  42. List all tasks associated with a specific ID.
  43. """
  44. return chat_tasks.get(id, [])
  45. async def stop_task(request, task_id: str):
  46. """
  47. Cancel a running task and remove it from the global task list.
  48. """
  49. task = tasks.get(task_id)
  50. if not task:
  51. raise ValueError(f"Task with ID {task_id} not found.")
  52. task.cancel() # Request task cancellation
  53. try:
  54. await task # Wait for the task to handle the cancellation
  55. except asyncio.CancelledError:
  56. # Task successfully canceled
  57. tasks.pop(task_id, None) # Remove it from the dictionary
  58. return {"status": True, "message": f"Task {task_id} successfully stopped."}
  59. return {"status": False, "message": f"Failed to stop task {task_id}."}