datalab_marker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. import os
  2. import time
  3. import requests
  4. import logging
  5. import json
  6. from typing import List, Optional
  7. from langchain_core.documents import Document
  8. from fastapi import HTTPException, status
  9. log = logging.getLogger(__name__)
  class DatalabMarkerLoader:
      """Convert a local file into a LangChain ``Document`` via a Marker HTTP API.

      The file is uploaded with a multipart POST to ``api_base_url``.  Two reply
      shapes are handled:

      * a reply carrying ``request_check_url`` is polled until its ``status``
        becomes ``"complete"`` (DataLab polling pattern), and the content is read
        from the field named after the output format;
      * a reply carrying ``output`` is used directly (self-hosted direct
        response).

      All failures are surfaced as ``fastapi.HTTPException``.
      """

      # Output formats this loader knows how to turn into text.
      SUPPORTED_OUTPUT_FORMATS = ("markdown", "json", "html")
      # Used when ``output_format`` is None/empty.
      # NOTE(review): assumed to match the Marker API's own default - confirm.
      DEFAULT_OUTPUT_FORMAT = "markdown"
      # Directory where a copy of the converted text is written (best effort).
      OUTPUT_DIR = os.path.join("/app/backend/data/uploads", "marker_output")
      # Polling budget: MAX_POLL_ATTEMPTS * POLL_INTERVAL_SECONDS = 10 minutes.
      POLL_INTERVAL_SECONDS = 2
      MAX_POLL_ATTEMPTS = 300
      # (connect, read) timeout for the upload.  The read side is unbounded
      # because a self-hosted server may convert the document inside the POST.
      UPLOAD_TIMEOUT = (10, None)
      # Timeout in seconds for the small status / polling GET requests.
      STATUS_TIMEOUT = 30

      # File extension (lower-case, without the dot) -> MIME type for the upload.
      _MIME_TYPES = {
          "pdf": "application/pdf",
          "xls": "application/vnd.ms-excel",
          "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
          "ods": "application/vnd.oasis.opendocument.spreadsheet",
          "doc": "application/msword",
          "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
          "odt": "application/vnd.oasis.opendocument.text",
          "ppt": "application/vnd.ms-powerpoint",
          "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
          "odp": "application/vnd.oasis.opendocument.presentation",
          "html": "text/html",
          "htm": "text/html",
          "epub": "application/epub+zip",
          "png": "image/png",
          "jpeg": "image/jpeg",
          "jpg": "image/jpeg",
          "webp": "image/webp",
          "gif": "image/gif",
          "tiff": "image/tiff",
          "tif": "image/tiff",
      }

      # Output format -> extension of the copy saved under OUTPUT_DIR.
      _FILE_EXTENSIONS = {"markdown": "md", "json": "json", "html": "html"}

      def __init__(
          self,
          file_path: str,
          api_key: str,
          api_base_url: str,
          additional_config: Optional[str] = None,
          use_llm: bool = False,
          skip_cache: bool = False,
          force_ocr: bool = False,
          paginate: bool = False,
          strip_existing_ocr: bool = False,
          disable_image_extraction: bool = False,
          format_lines: bool = False,
          output_format: Optional[str] = None,
      ):
          """Store the conversion settings; no I/O happens here.

          Args:
              file_path: Path of the local file to upload.
              api_key: Value sent in the ``X-Api-Key`` request header.
              api_base_url: URL the file is POSTed to; ``/{request_id}`` is
                  appended to it by ``check_marker_request_status``.
              additional_config: Optional string forwarded verbatim as the
                  ``additional_config`` form field when non-blank.
              use_llm, skip_cache, force_ocr, paginate, strip_existing_ocr,
              disable_image_extraction, format_lines: Boolean switches
                  forwarded as ``"true"``/``"false"`` form fields.
              output_format: ``"markdown"``, ``"json"`` or ``"html"``
                  (case-insensitive).  ``None`` falls back to
                  ``DEFAULT_OUTPUT_FORMAT``.
          """
          self.file_path = file_path
          self.api_key = api_key
          self.api_base_url = api_base_url
          self.additional_config = additional_config
          self.use_llm = use_llm
          self.skip_cache = skip_cache
          self.force_ocr = force_ocr
          self.paginate = paginate
          self.strip_existing_ocr = strip_existing_ocr
          self.disable_image_extraction = disable_image_extraction
          self.format_lines = format_lines
          self.output_format = output_format

      def _get_mime_type(self, filename: str) -> str:
          """Return the MIME type for *filename* based on its extension.

          Names without an extension (or with an unknown one) map to
          ``application/octet-stream``.
          """
          # splitext() yields "" for extension-less names, so a file literally
          # called "pdf" is not mistaken for a PDF.
          ext = os.path.splitext(filename)[1].lstrip(".").lower()
          return self._MIME_TYPES.get(ext, "application/octet-stream")

      def check_marker_request_status(self, request_id: str) -> dict:
          """Fetch and return the JSON status of a previously submitted request.

          Raises:
              HTTPException: 502 when the request fails (HTTP error status,
                  connection problem, timeout) or the body is not valid JSON.
          """
          url = f"{self.api_base_url}/{request_id}"
          headers = {"X-Api-Key": self.api_key}

          try:
              response = requests.get(url, headers=headers, timeout=self.STATUS_TIMEOUT)
              response.raise_for_status()
          except requests.RequestException as e:
              log.error(f"Error checking Marker request status: {e}")
              raise HTTPException(
                  status.HTTP_502_BAD_GATEWAY,
                  detail=f"Failed to check Marker request: {e}",
              ) from e

          # Parsed separately so a malformed body is reported as a JSON problem
          # rather than as a transport failure.
          try:
              result = response.json()
          except ValueError as e:
              log.error(f"Invalid JSON checking Marker request: {e}")
              raise HTTPException(
                  status.HTTP_502_BAD_GATEWAY, detail=f"Invalid JSON: {e}"
              ) from e

          log.info(f"Marker API status check for request {request_id}: {result}")
          return result

      def _resolve_output_format(self) -> str:
          """Return the lower-cased output format, applying the default.

          Raises:
              HTTPException: 400 when the format is not supported.
          """
          output_format = (self.output_format or self.DEFAULT_OUTPUT_FORMAT).lower()
          if output_format not in self.SUPPORTED_OUTPUT_FORMATS:
              raise HTTPException(
                  status.HTTP_400_BAD_REQUEST,
                  detail=f"Unsupported output format: {self.output_format}",
              )
          return output_format

      def _build_form_data(self, output_format: str) -> dict:
          """Build the form fields sent alongside the uploaded file."""
          flags = (
              "use_llm",
              "skip_cache",
              "force_ocr",
              "paginate",
              "strip_existing_ocr",
              "disable_image_extraction",
              "format_lines",
          )
          # Booleans are sent as the strings "true"/"false".
          form_data = {name: str(getattr(self, name)).lower() for name in flags}
          # Forward the caller's spelling when one was given.
          form_data["output_format"] = self.output_format or output_format
          if self.additional_config and self.additional_config.strip():
              form_data["additional_config"] = self.additional_config
          return form_data

      def _submit(self, filename: str, mime_type: str, form_data: dict, headers: dict) -> dict:
          """Upload the file and return the decoded JSON reply.

          Raises:
              HTTPException: 404 if the file is missing, 400 on an HTTP error
                  status, 502 on an invalid or non-object JSON reply, 500 on
                  any other failure.
          """
          try:
              with open(self.file_path, "rb") as f:
                  response = requests.post(
                      f"{self.api_base_url}",
                      data=form_data,
                      files={"file": (filename, f, mime_type)},
                      headers=headers,
                      timeout=self.UPLOAD_TIMEOUT,
                  )
              response.raise_for_status()
              result = response.json()
          except FileNotFoundError as e:
              raise HTTPException(
                  status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
              ) from e
          except requests.HTTPError as e:
              raise HTTPException(
                  status.HTTP_400_BAD_REQUEST,
                  detail=f"Datalab Marker request failed: {e}",
              ) from e
          except ValueError as e:
              raise HTTPException(
                  status.HTTP_502_BAD_GATEWAY, detail=f"Invalid JSON response: {e}"
              ) from e
          except Exception as e:
              raise HTTPException(
                  status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
              ) from e

          if not isinstance(result, dict):
              raise HTTPException(
                  status.HTTP_502_BAD_GATEWAY,
                  detail="Invalid JSON response: expected a JSON object",
              )
          return result

      def _poll_until_complete(self, check_url: str, headers: dict) -> dict:
          """Poll *check_url* until the job completes and return the last reply.

          Raises:
              HTTPException: 502 when a poll request fails or returns invalid
                  JSON, 400 when the job reports failure, 504 when the polling
                  budget is exhausted.
          """
          for _ in range(self.MAX_POLL_ATTEMPTS):
              time.sleep(self.POLL_INTERVAL_SECONDS)

              poll_response = None
              try:
                  poll_response = requests.get(
                      check_url, headers=headers, timeout=self.STATUS_TIMEOUT
                  )
                  poll_response.raise_for_status()
                  poll_result = poll_response.json()
              except (requests.RequestException, ValueError) as e:
                  # No response object exists when the connection itself failed.
                  raw_body = poll_response.text if poll_response is not None else ""
                  log.error(f"Polling error: {e}, response body: {raw_body}")
                  raise HTTPException(
                      status.HTTP_502_BAD_GATEWAY, detail=f"Polling failed: {e}"
                  ) from e

              if not isinstance(poll_result, dict):
                  raise HTTPException(
                      status.HTTP_502_BAD_GATEWAY,
                      detail="Polling failed: expected a JSON object",
                  )

              status_val = poll_result.get("status")
              if status_val == "complete":
                  summary_keys = (
                      "status",
                      "output_format",
                      "success",
                      "error",
                      "page_count",
                      "total_cost",
                  )
                  summary = {k: poll_result.get(k) for k in summary_keys}
                  log.info(
                      f"Marker processing completed successfully: {json.dumps(summary, indent=2)}"
                  )
                  return poll_result

              if status_val == "failed" or poll_result.get("success") is False:
                  log.error(
                      f"Marker poll failed full response: {json.dumps(poll_result, indent=2)}"
                  )
                  error_msg = (
                      poll_result.get("error")
                      or "Marker returned failure without error message"
                  )
                  raise HTTPException(
                      status.HTTP_400_BAD_REQUEST,
                      detail=f"Marker processing failed: {error_msg}",
                  )

          raise HTTPException(
              status.HTTP_504_GATEWAY_TIMEOUT,
              detail="Marker processing timed out",
          )

      @staticmethod
      def _render_content(raw_content, output_format: str) -> str:
          """Turn the API's content value into text; missing content gives ``""``."""
          # Without this guard None would become the non-empty strings "None"
          # or "null" and slip past the caller's empty-content check.
          if raw_content is None:
              return ""
          if output_format == "json":
              return json.dumps(raw_content, indent=2)
          return str(raw_content).strip()

      def _save_output(self, filename: str, output_format: str, full_text: str) -> None:
          """Write a copy of the converted text under ``OUTPUT_DIR`` (best effort)."""
          file_ext = self._FILE_EXTENSIONS.get(output_format, "txt")
          output_filename = f"{os.path.splitext(filename)[0]}.{file_ext}"
          output_path = os.path.join(self.OUTPUT_DIR, output_filename)
          try:
              # Directory creation is part of the best-effort step: an
              # unwritable location must not fail the whole conversion.
              os.makedirs(self.OUTPUT_DIR, exist_ok=True)
              with open(output_path, "w", encoding="utf-8") as f:
                  f.write(full_text)
              log.info(f"Saved Marker output to: {output_path}")
          except Exception as e:
              log.warning(f"Failed to write marker output to disk: {e}")

      def _build_metadata(
          self,
          filename: str,
          final_result: dict,
          request_id: Optional[str],
          output_format: str,
      ) -> dict:
          """Assemble flat document metadata from the final API reply.

          Nested dict/list values are JSON-encoded and ``None`` becomes ``""``.
          """
          metadata = {
              "source": filename,
              "output_format": final_result.get(
                  "output_format", self.output_format or output_format
              ),
              "page_count": final_result.get("page_count", 0),
              "processed_with_llm": self.use_llm,
              "request_id": request_id or "",
          }

          images = final_result.get("images") or {}
          if images:
              metadata["image_count"] = len(images)
              # list() yields the keys of a mapping and the items of a sequence.
              metadata["images"] = json.dumps(list(images))

          flattened = {}
          for key, value in metadata.items():
              if isinstance(value, (dict, list)):
                  flattened[key] = json.dumps(value)
              elif value is None:
                  flattened[key] = ""
              else:
                  flattened[key] = value
          return flattened

      def load(self) -> "List[Document]":
          """Upload the file, wait for the conversion and wrap it in a Document.

          Returns:
              A single-element list holding the converted text and its metadata.

          Raises:
              HTTPException: 400 for an unsupported format, a rejected or failed
                  conversion, or empty content; 404 when the file is missing;
                  502 for malformed replies or polling failures; 504 when the
                  conversion does not finish within the polling budget; 500 for
                  unexpected upload errors.
          """
          # Validated first so an unusable format never costs an upload.
          output_format = self._resolve_output_format()

          filename = os.path.basename(self.file_path)
          mime_type = self._get_mime_type(filename)
          headers = {"X-Api-Key": self.api_key}
          form_data = self._build_form_data(output_format)

          log.info(
              f"Datalab Marker POST request parameters: {{'filename': '{filename}', 'mime_type': '{mime_type}', **{form_data}}}"
          )

          result = self._submit(filename, mime_type, form_data, headers)
          if not result.get("success"):
              raise HTTPException(
                  status.HTTP_400_BAD_REQUEST,
                  detail=f"Datalab Marker request failed: {result.get('error', 'Unknown error')}",
              )

          check_url = result.get("request_check_url")
          request_id = result.get("request_id")

          # Check if this is a direct response (self-hosted) or polling response (DataLab)
          if check_url:
              final_result = self._poll_until_complete(check_url, headers)
              if not final_result.get("success", False):
                  error_msg = final_result.get("error") or "Unknown processing error"
                  raise HTTPException(
                      status.HTTP_400_BAD_REQUEST,
                      detail=f"Final processing failed: {error_msg}",
                  )
              # DataLab format - content in format-specific fields
              raw_content = final_result.get(output_format)
          elif "output" in result:
              # Self-hosted direct response - content in "output" field
              log.info("Self-hosted Marker returned direct response without polling")
              raw_content = result.get("output")
              final_result = result
          else:
              available_fields = list(result.keys())
              raise HTTPException(
                  status.HTTP_502_BAD_GATEWAY,
                  detail=f"Custom Marker endpoint returned success but no 'output' field found. Available fields: {available_fields}. Expected either 'request_check_url' for polling or 'output' field for direct response.",
              )

          full_text = self._render_content(raw_content, output_format)
          if not full_text:
              raise HTTPException(
                  status.HTTP_400_BAD_REQUEST,
                  detail="Marker returned empty content",
              )

          self._save_output(filename, output_format, full_text)
          metadata = self._build_metadata(
              filename, final_result, request_id, output_format
          )
          return [Document(page_content=full_text, metadata=metadata)]