1
0

tasks.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # tasks.py
  2. import asyncio
  3. from typing import Dict
  4. from uuid import uuid4
  5. import json
  6. import logging
  7. from redis.asyncio import Redis
  8. from fastapi import Request
  9. from typing import Dict, List, Optional
  10. from open_webui.env import SRC_LOG_LEVELS
  11. log = logging.getLogger(__name__)
  12. log.setLevel(SRC_LOG_LEVELS["MAIN"])
  13. # A dictionary to keep track of active tasks
  14. tasks: Dict[str, asyncio.Task] = {}
  15. chat_tasks = {}
  16. REDIS_TASKS_KEY = "open-webui:tasks"
  17. REDIS_CHAT_TASKS_KEY = "open-webui:tasks:chat"
  18. REDIS_PUBSUB_CHANNEL = "open-webui:tasks:commands"
  19. def is_redis(request: Request) -> bool:
  20. # Called everywhere a request is available to check Redis
  21. return hasattr(request.app.state, "redis") and (request.app.state.redis is not None)
  22. async def redis_task_command_listener(app):
  23. redis: Redis = app.state.redis
  24. pubsub = redis.pubsub()
  25. await pubsub.subscribe(REDIS_PUBSUB_CHANNEL)
  26. async for message in pubsub.listen():
  27. if message["type"] != "message":
  28. continue
  29. try:
  30. command = json.loads(message["data"])
  31. if command.get("action") == "stop":
  32. task_id = command.get("task_id")
  33. local_task = tasks.get(task_id)
  34. if local_task:
  35. local_task.cancel()
  36. except Exception as e:
  37. log.exception(f"Error handling distributed task command: {e}")
  38. ### ------------------------------
  39. ### REDIS-ENABLED HANDLERS
  40. ### ------------------------------
  41. async def redis_save_task(redis: Redis, task_id: str, chat_id: Optional[str]):
  42. pipe = redis.pipeline()
  43. pipe.hset(REDIS_TASKS_KEY, task_id, chat_id or "")
  44. if chat_id:
  45. pipe.sadd(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}", task_id)
  46. await pipe.execute()
  47. async def redis_cleanup_task(redis: Redis, task_id: str, chat_id: Optional[str]):
  48. pipe = redis.pipeline()
  49. pipe.hdel(REDIS_TASKS_KEY, task_id)
  50. if chat_id:
  51. pipe.srem(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}", task_id)
  52. if (await pipe.scard(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}").execute())[-1] == 0:
  53. pipe.delete(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}") # Remove if empty set
  54. await pipe.execute()
  55. async def redis_list_tasks(redis: Redis) -> List[str]:
  56. return list(await redis.hkeys(REDIS_TASKS_KEY))
  57. async def redis_list_chat_tasks(redis: Redis, chat_id: str) -> List[str]:
  58. return list(await redis.smembers(f"{REDIS_CHAT_TASKS_KEY}:{chat_id}"))
  59. async def redis_send_command(redis: Redis, command: dict):
  60. await redis.publish(REDIS_PUBSUB_CHANNEL, json.dumps(command))
  61. async def cleanup_task(request, task_id: str, id=None):
  62. """
  63. Remove a completed or canceled task from the global `tasks` dictionary.
  64. """
  65. if is_redis(request):
  66. await redis_cleanup_task(request.app.state.redis, task_id, id)
  67. tasks.pop(task_id, None) # Remove the task if it exists
  68. # If an ID is provided, remove the task from the chat_tasks dictionary
  69. if id and task_id in chat_tasks.get(id, []):
  70. chat_tasks[id].remove(task_id)
  71. if not chat_tasks[id]: # If no tasks left for this ID, remove the entry
  72. chat_tasks.pop(id, None)
  73. async def create_task(request, coroutine, id=None):
  74. """
  75. Create a new asyncio task and add it to the global task dictionary.
  76. """
  77. task_id = str(uuid4()) # Generate a unique ID for the task
  78. task = asyncio.create_task(coroutine) # Create the task
  79. # Add a done callback for cleanup
  80. task.add_done_callback(
  81. lambda t: asyncio.create_task(cleanup_task(request, task_id, id))
  82. )
  83. tasks[task_id] = task
  84. # If an ID is provided, associate the task with that ID
  85. if chat_tasks.get(id):
  86. chat_tasks[id].append(task_id)
  87. else:
  88. chat_tasks[id] = [task_id]
  89. if is_redis(request):
  90. await redis_save_task(request.app.state.redis, task_id, id)
  91. return task_id, task
  92. async def list_tasks(request):
  93. """
  94. List all currently active task IDs.
  95. """
  96. if is_redis(request):
  97. return await redis_list_tasks(request.app.state.redis)
  98. return list(tasks.keys())
  99. async def list_task_ids_by_chat_id(request, id):
  100. """
  101. List all tasks associated with a specific ID.
  102. """
  103. if is_redis(request):
  104. return await redis_list_chat_tasks(request.app.state.redis, id)
  105. return chat_tasks.get(id, [])
  106. async def stop_task(request, task_id: str):
  107. """
  108. Cancel a running task and remove it from the global task list.
  109. """
  110. if is_redis(request):
  111. # PUBSUB: All instances check if they have this task, and stop if so.
  112. await redis_send_command(
  113. request.app.state.redis,
  114. {
  115. "action": "stop",
  116. "task_id": task_id,
  117. },
  118. )
  119. # Optionally check if task_id still in Redis a few moments later for feedback?
  120. return {"status": True, "message": f"Stop signal sent for {task_id}"}
  121. task = tasks.get(task_id)
  122. if not task:
  123. raise ValueError(f"Task with ID {task_id} not found.")
  124. task.cancel() # Request task cancellation
  125. try:
  126. await task # Wait for the task to handle the cancellation
  127. except asyncio.CancelledError:
  128. # Task successfully canceled
  129. tasks.pop(task_id, None) # Remove it from the dictionary
  130. return {"status": True, "message": f"Task {task_id} successfully stopped."}
  131. return {"status": False, "message": f"Failed to stop task {task_id}."}