tasks.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # tasks.py
  2. import asyncio
  3. from typing import Dict
  4. from uuid import uuid4
  5. import json
  6. from redis.asyncio import Redis
  7. from fastapi import Request
  8. from typing import Dict, List, Optional
# In-process registry of running asyncio tasks, keyed by task ID
tasks: Dict[str, asyncio.Task] = {}
# Task IDs grouped by the `id` passed to create_task (each value is a list of task-ID strings)
chat_tasks: Dict[Optional[str], List[str]] = {}
# Redis hash mapping task_id -> chat_id ("" when the task has no chat)
REDIS_TASKS_KEY = "open-webui:tasks"
# Prefix of the per-chat Redis sets of task IDs; the full key is "<prefix>:<chat_id>"
REDIS_CHAT_TASKS_KEY = "open-webui:tasks:chat"
# Pub/sub channel on which JSON task commands (e.g. "stop") are published
REDIS_PUBSUB_CHANNEL = "open-webui:tasks:commands"
  15. def is_redis(request: Request) -> bool:
  16. # Called everywhere a request is available to check Redis
  17. return hasattr(request.app.state, "redis") and (request.app.state.redis is not None)
  18. async def redis_task_command_listener(app):
  19. redis: Redis = app.state.redis
  20. pubsub = redis.pubsub()
  21. await pubsub.subscribe(REDIS_PUBSUB_CHANNEL)
  22. async for message in pubsub.listen():
  23. if message["type"] != "message":
  24. continue
  25. try:
  26. command = json.loads(message["data"])
  27. if command.get("action") == "stop":
  28. task_id = command.get("task_id")
  29. local_task = tasks.get(task_id)
  30. if local_task:
  31. local_task.cancel()
  32. except Exception as e:
  33. print(f"Error handling distributed task command: {e}")
  34. ### ------------------------------
  35. ### REDIS-ENABLED HANDLERS
  36. ### ------------------------------
  37. async def redis_save_task(redis: Redis, task_id: str, chat_id: Optional[str]):
  38. pipe = redis.pipeline()
  39. pipe.hset(REDIS_TASKS_KEY, task_id, chat_id or "")
  40. if chat_id:
  41. pipe.sadd(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}", task_id)
  42. await pipe.execute()
  43. async def redis_cleanup_task(redis: Redis, task_id: str, chat_id: Optional[str]):
  44. pipe = redis.pipeline()
  45. pipe.hdel(REDIS_TASKS_KEY, task_id)
  46. if chat_id:
  47. pipe.srem(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}", task_id)
  48. if (await pipe.scard(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}").execute())[-1] == 0:
  49. pipe.delete(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}") # Remove if empty set
  50. await pipe.execute()
  51. async def redis_list_tasks(redis: Redis) -> List[str]:
  52. return list(await redis.hkeys(REDIS_TASKS_KEY))
  53. async def redis_list_chat_tasks(redis: Redis, chat_id: str) -> List[str]:
  54. return list(await redis.smembers(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}"))
  55. async def redis_send_command(redis: Redis, command: dict):
  56. await redis.publish(REDIS_PUBSUB_CHANNEL, json.dumps(command))
  57. async def cleanup_task(request, task_id: str, id=None):
  58. """
  59. Remove a completed or canceled task from the global `tasks` dictionary.
  60. """
  61. if is_redis(request):
  62. await redis_cleanup_task(request.app.state.redis, task_id, id)
  63. tasks.pop(task_id, None) # Remove the task if it exists
  64. # If an ID is provided, remove the task from the chat_tasks dictionary
  65. if id and task_id in chat_tasks.get(id, []):
  66. chat_tasks[id].remove(task_id)
  67. if not chat_tasks[id]: # If no tasks left for this ID, remove the entry
  68. chat_tasks.pop(id, None)
  69. async def create_task(request, coroutine, id=None):
  70. """
  71. Create a new asyncio task and add it to the global task dictionary.
  72. """
  73. task_id = str(uuid4()) # Generate a unique ID for the task
  74. task = asyncio.create_task(coroutine) # Create the task
  75. # Add a done callback for cleanup
  76. task.add_done_callback(
  77. lambda t: asyncio.create_task(cleanup_task(request, task_id, id))
  78. )
  79. tasks[task_id] = task
  80. # If an ID is provided, associate the task with that ID
  81. if chat_tasks.get(id):
  82. chat_tasks[id].append(task_id)
  83. else:
  84. chat_tasks[id] = [task_id]
  85. if is_redis(request):
  86. await redis_save_task(request.app.state.redis, task_id, id)
  87. return task_id, task
  88. async def list_tasks(request):
  89. """
  90. List all currently active task IDs.
  91. """
  92. if is_redis(request):
  93. return await redis_list_tasks(request.app.state.redis)
  94. return list(tasks.keys())
  95. async def list_task_ids_by_chat_id(request, id):
  96. """
  97. List all tasks associated with a specific ID.
  98. """
  99. if is_redis(request):
  100. return await redis_list_chat_tasks(request.app.state.redis, id)
  101. return chat_tasks.get(id, [])
  102. async def stop_task(request, task_id: str):
  103. """
  104. Cancel a running task and remove it from the global task list.
  105. """
  106. if is_redis(request):
  107. # PUBSUB: All instances check if they have this task, and stop if so.
  108. await redis_send_command(
  109. request.app.state.redis,
  110. {
  111. "action": "stop",
  112. "task_id": task_id,
  113. },
  114. )
  115. # Optionally check if task_id still in Redis a few moments later for feedback?
  116. return {"status": True, "message": f"Stop signal sent for {task_id}"}
  117. task = tasks.get(task_id)
  118. if not task:
  119. raise ValueError(f"Task with ID {task_id} not found.")
  120. task.cancel() # Request task cancellation
  121. try:
  122. await task # Wait for the task to handle the cancellation
  123. except asyncio.CancelledError:
  124. # Task successfully canceled
  125. tasks.pop(task_id, None) # Remove it from the dictionary
  126. return {"status": True, "message": f"Task {task_id} successfully stopped."}
  127. return {"status": False, "message": f"Failed to stop task {task_id}."}