main.py 5.6 KB


  1. import re
  2. import requests
  3. from fastapi import (
  4. FastAPI,
  5. Request,
  6. Depends,
  7. HTTPException,
  8. status,
  9. UploadFile,
  10. File,
  11. Form,
  12. )
  13. from fastapi.middleware.cors import CORSMiddleware
  14. from faster_whisper import WhisperModel
  15. from constants import ERROR_MESSAGES
  16. from utils.utils import (
  17. get_current_user,
  18. get_admin_user,
  19. )
  20. from utils.misc import calculate_sha256
  21. from typing import Optional
  22. from pydantic import BaseModel
  23. from config import AUTOMATIC1111_BASE_URL
  24. app = FastAPI()
  25. app.add_middleware(
  26. CORSMiddleware,
  27. allow_origins=["*"],
  28. allow_credentials=True,
  29. allow_methods=["*"],
  30. allow_headers=["*"],
  31. )
  32. app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
  33. app.state.ENABLED = app.state.AUTOMATIC1111_BASE_URL != ""
  34. app.state.IMAGE_SIZE = "512x512"
  35. @app.get("/enabled", response_model=bool)
  36. async def get_enable_status(request: Request, user=Depends(get_admin_user)):
  37. return app.state.ENABLED
  38. @app.get("/enabled/toggle", response_model=bool)
  39. async def toggle_enabled(request: Request, user=Depends(get_admin_user)):
  40. try:
  41. r = requests.head(app.state.AUTOMATIC1111_BASE_URL)
  42. app.state.ENABLED = not app.state.ENABLED
  43. return app.state.ENABLED
  44. except Exception as e:
  45. raise HTTPException(status_code=r.status_code, detail=ERROR_MESSAGES.DEFAULT(e))
  46. class UrlUpdateForm(BaseModel):
  47. url: str
  48. @app.get("/url")
  49. async def get_openai_url(user=Depends(get_admin_user)):
  50. return {"AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL}
  51. @app.post("/url/update")
  52. async def update_openai_url(form_data: UrlUpdateForm, user=Depends(get_admin_user)):
  53. if form_data.url == "":
  54. app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
  55. else:
  56. app.state.AUTOMATIC1111_BASE_URL = form_data.url.strip("/")
  57. return {
  58. "AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL,
  59. "status": True,
  60. }
  61. class ImageSizeUpdateForm(BaseModel):
  62. size: str
  63. @app.get("/size")
  64. async def get_image_size(user=Depends(get_admin_user)):
  65. return {"IMAGE_SIZE": app.state.IMAGE_SIZE}
  66. @app.post("/size/update")
  67. async def update_image_size(
  68. form_data: ImageSizeUpdateForm, user=Depends(get_admin_user)
  69. ):
  70. pattern = r"^\d+x\d+$" # Regular expression pattern
  71. if re.match(pattern, form_data.size):
  72. app.state.IMAGE_SIZE = form_data.size
  73. return {
  74. "IMAGE_SIZE": app.state.IMAGE_SIZE,
  75. "status": True,
  76. }
  77. else:
  78. raise HTTPException(
  79. status_code=400,
  80. detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 512x512)."),
  81. )
  82. class ImageStepsUpdateForm(BaseModel):
  83. steps: int
  84. @app.get("/steps")
  85. async def get_image_size(user=Depends(get_admin_user)):
  86. return {"IMAGE_STEPS": app.state.IMAGE_STEPS}
  87. @app.post("/steps/update")
  88. async def update_image_size(
  89. form_data: ImageStepsUpdateForm, user=Depends(get_admin_user)
  90. ):
  91. if form_data.steps >= 0:
  92. app.state.IMAGE_STEPS = form_data.steps
  93. return {
  94. "IMAGE_STEPS": app.state.IMAGE_STEPS,
  95. "status": True,
  96. }
  97. else:
  98. raise HTTPException(
  99. status_code=400,
  100. detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 50)."),
  101. )
  102. @app.get("/models")
  103. def get_models(user=Depends(get_current_user)):
  104. try:
  105. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/sd-models")
  106. models = r.json()
  107. return models
  108. except Exception as e:
  109. raise HTTPException(status_code=r.status_code, detail=ERROR_MESSAGES.DEFAULT(e))
  110. @app.get("/models/default")
  111. async def get_default_model(user=Depends(get_admin_user)):
  112. try:
  113. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
  114. options = r.json()
  115. return {"model": options["sd_model_checkpoint"]}
  116. except Exception as e:
  117. raise HTTPException(status_code=r.status_code, detail=ERROR_MESSAGES.DEFAULT(e))
  118. class UpdateModelForm(BaseModel):
  119. model: str
  120. def set_model_handler(model: str):
  121. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
  122. options = r.json()
  123. if model != options["sd_model_checkpoint"]:
  124. options["sd_model_checkpoint"] = model
  125. r = requests.post(
  126. url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options", json=options
  127. )
  128. return options
  129. @app.post("/models/default/update")
  130. def update_default_model(
  131. form_data: UpdateModelForm,
  132. user=Depends(get_current_user),
  133. ):
  134. return set_model_handler(form_data.model)
  135. class GenerateImageForm(BaseModel):
  136. model: Optional[str] = None
  137. prompt: str
  138. n: int = 1
  139. size: str = "512x512"
  140. negative_prompt: Optional[str] = None
  141. @app.post("/generations")
  142. def generate_image(
  143. form_data: GenerateImageForm,
  144. user=Depends(get_current_user),
  145. ):
  146. print(form_data)
  147. try:
  148. if form_data.model:
  149. set_model_handler(form_data.model)
  150. width, height = tuple(map(int, app.state.IMAGE_SIZE.split("x")))
  151. data = {
  152. "prompt": form_data.prompt,
  153. "batch_size": form_data.n,
  154. "width": width,
  155. "height": height,
  156. }
  157. if app.state.IMAGE_STEPS != None:
  158. data["steps"] = app.state.IMAGE_STEPS
  159. if form_data.negative_prompt != None:
  160. data["negative_prompt"] = form_data.negative_prompt
  161. print(data)
  162. r = requests.post(
  163. url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/txt2img",
  164. json=data,
  165. )
  166. return r.json()
  167. except Exception as e:
  168. print(e)
  169. raise HTTPException(status_code=r.status_code, detail=ERROR_MESSAGES.DEFAULT(e))