mistral.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. import requests
  2. import logging
  3. import os
  4. import sys
  5. from typing import List, Dict, Any
  6. from langchain_core.documents import Document
  7. from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL
  8. logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
  9. log = logging.getLogger(__name__)
  10. log.setLevel(SRC_LOG_LEVELS["RAG"])
  11. class MistralLoader:
  12. """
  13. Loads documents by processing them through the Mistral OCR API.
  14. """
  15. BASE_API_URL = "https://api.mistral.ai/v1"
  16. def __init__(self, api_key: str, file_path: str):
  17. """
  18. Initializes the loader.
  19. Args:
  20. api_key: Your Mistral API key.
  21. file_path: The local path to the PDF file to process.
  22. """
  23. if not api_key:
  24. raise ValueError("API key cannot be empty.")
  25. if not os.path.exists(file_path):
  26. raise FileNotFoundError(f"File not found at {file_path}")
  27. self.api_key = api_key
  28. self.file_path = file_path
  29. self.headers = {"Authorization": f"Bearer {self.api_key}"}
  30. def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
  31. """Checks response status and returns JSON content."""
  32. try:
  33. response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
  34. # Handle potential empty responses for certain successful requests (e.g., DELETE)
  35. if response.status_code == 204 or not response.content:
  36. return {} # Return empty dict if no content
  37. return response.json()
  38. except requests.exceptions.HTTPError as http_err:
  39. log.error(f"HTTP error occurred: {http_err} - Response: {response.text}")
  40. raise
  41. except requests.exceptions.RequestException as req_err:
  42. log.error(f"Request exception occurred: {req_err}")
  43. raise
  44. except ValueError as json_err: # Includes JSONDecodeError
  45. log.error(f"JSON decode error: {json_err} - Response: {response.text}")
  46. raise # Re-raise after logging
  47. def _upload_file(self) -> str:
  48. """Uploads the file to Mistral for OCR processing."""
  49. log.info("Uploading file to Mistral API")
  50. url = f"{self.BASE_API_URL}/files"
  51. file_name = os.path.basename(self.file_path)
  52. try:
  53. with open(self.file_path, "rb") as f:
  54. files = {"file": (file_name, f, "application/pdf")}
  55. data = {"purpose": "ocr"}
  56. upload_headers = self.headers.copy() # Avoid modifying self.headers
  57. response = requests.post(
  58. url, headers=upload_headers, files=files, data=data
  59. )
  60. response_data = self._handle_response(response)
  61. file_id = response_data.get("id")
  62. if not file_id:
  63. raise ValueError("File ID not found in upload response.")
  64. log.info(f"File uploaded successfully. File ID: {file_id}")
  65. return file_id
  66. except Exception as e:
  67. log.error(f"Failed to upload file: {e}")
  68. raise
  69. def _get_signed_url(self, file_id: str) -> str:
  70. """Retrieves a temporary signed URL for the uploaded file."""
  71. log.info(f"Getting signed URL for file ID: {file_id}")
  72. url = f"{self.BASE_API_URL}/files/{file_id}/url"
  73. params = {"expiry": 1}
  74. signed_url_headers = {**self.headers, "Accept": "application/json"}
  75. try:
  76. response = requests.get(url, headers=signed_url_headers, params=params)
  77. response_data = self._handle_response(response)
  78. signed_url = response_data.get("url")
  79. if not signed_url:
  80. raise ValueError("Signed URL not found in response.")
  81. log.info("Signed URL received.")
  82. return signed_url
  83. except Exception as e:
  84. log.error(f"Failed to get signed URL: {e}")
  85. raise
  86. def _process_ocr(self, signed_url: str) -> Dict[str, Any]:
  87. """Sends the signed URL to the OCR endpoint for processing."""
  88. log.info("Processing OCR via Mistral API")
  89. url = f"{self.BASE_API_URL}/ocr"
  90. ocr_headers = {
  91. **self.headers,
  92. "Content-Type": "application/json",
  93. "Accept": "application/json",
  94. }
  95. payload = {
  96. "model": "mistral-ocr-latest",
  97. "document": {
  98. "type": "document_url",
  99. "document_url": signed_url,
  100. },
  101. "include_image_base64": False,
  102. }
  103. try:
  104. response = requests.post(url, headers=ocr_headers, json=payload)
  105. ocr_response = self._handle_response(response)
  106. log.info("OCR processing done.")
  107. log.debug("OCR response: %s", ocr_response)
  108. return ocr_response
  109. except Exception as e:
  110. log.error(f"Failed during OCR processing: {e}")
  111. raise
  112. def _delete_file(self, file_id: str) -> None:
  113. """Deletes the file from Mistral storage."""
  114. log.info(f"Deleting uploaded file ID: {file_id}")
  115. url = f"{self.BASE_API_URL}/files/{file_id}"
  116. # No specific Accept header needed, default or Authorization is usually sufficient
  117. try:
  118. response = requests.delete(url, headers=self.headers)
  119. delete_response = self._handle_response(
  120. response
  121. ) # Check status, ignore response body unless needed
  122. log.info(
  123. f"File deleted successfully: {delete_response}"
  124. ) # Log the response if available
  125. except Exception as e:
  126. # Log error but don't necessarily halt execution if deletion fails
  127. log.error(f"Failed to delete file ID {file_id}: {e}")
  128. # Depending on requirements, you might choose to raise the error here
  129. def load(self) -> List[Document]:
  130. """
  131. Executes the full OCR workflow: upload, get URL, process OCR, delete file.
  132. Returns:
  133. A list of Document objects, one for each page processed.
  134. """
  135. file_id = None
  136. try:
  137. # 1. Upload file
  138. file_id = self._upload_file()
  139. # 2. Get Signed URL
  140. signed_url = self._get_signed_url(file_id)
  141. # 3. Process OCR
  142. ocr_response = self._process_ocr(signed_url)
  143. # 4. Process results
  144. pages_data = ocr_response.get("pages")
  145. if not pages_data:
  146. log.warning("No pages found in OCR response.")
  147. return [Document(page_content="No text content found", metadata={})]
  148. documents = []
  149. total_pages = len(pages_data)
  150. for page_data in pages_data:
  151. page_content = page_data.get("markdown")
  152. page_index = page_data.get("index") # API uses 0-based index
  153. if page_content is not None and page_index is not None:
  154. documents.append(
  155. Document(
  156. page_content=page_content,
  157. metadata={
  158. "page": page_index, # 0-based index from API
  159. "page_label": page_index
  160. + 1, # 1-based label for convenience
  161. "total_pages": total_pages,
  162. # Add other relevant metadata from page_data if available/needed
  163. # e.g., page_data.get('width'), page_data.get('height')
  164. },
  165. )
  166. )
  167. else:
  168. log.warning(
  169. f"Skipping page due to missing 'markdown' or 'index'. Data: {page_data}"
  170. )
  171. if not documents:
  172. # Case where pages existed but none had valid markdown/index
  173. log.warning(
  174. "OCR response contained pages, but none had valid content/index."
  175. )
  176. return [
  177. Document(
  178. page_content="No text content found in valid pages", metadata={}
  179. )
  180. ]
  181. return documents
  182. except Exception as e:
  183. log.error(f"An error occurred during the loading process: {e}")
  184. # Return an empty list or a specific error document on failure
  185. return [Document(page_content=f"Error during processing: {e}", metadata={})]
  186. finally:
  187. # 5. Delete file (attempt even if prior steps failed after upload)
  188. if file_id:
  189. try:
  190. self._delete_file(file_id)
  191. except Exception as del_e:
  192. # Log deletion error, but don't overwrite original error if one occurred
  193. log.error(
  194. f"Cleanup error: Could not delete file ID {file_id}. Reason: {del_e}"
  195. )