# mineru.py


import io
import logging
import os
import tempfile
import time
import zipfile
from typing import List, Optional

import requests
from fastapi import HTTPException, status
from langchain_core.documents import Document
  10. log = logging.getLogger(__name__)
  11. class MinerULoader:
  12. """
  13. MinerU document parser loader supporting both Cloud API and Local API modes.
  14. Cloud API: Uses MinerU managed service with async task-based processing
  15. Local API: Uses self-hosted MinerU API with synchronous processing
  16. """
  17. def __init__(
  18. self,
  19. file_path: str,
  20. api_mode: str = "local",
  21. api_url: str = "http://localhost:8000",
  22. api_key: str = "",
  23. params: dict = None,
  24. ):
  25. self.file_path = file_path
  26. self.api_mode = api_mode.lower()
  27. self.api_url = api_url.rstrip("/")
  28. self.api_key = api_key
  29. # Parse params dict with defaults
  30. params = params or {}
  31. self.enable_ocr = params.get("enable_ocr", False)
  32. self.enable_formula = params.get("enable_formula", True)
  33. self.enable_table = params.get("enable_table", True)
  34. self.language = params.get("language", "en")
  35. self.model_version = params.get("model_version", "pipeline")
  36. self.page_ranges = params.get("page_ranges", "")
  37. # Validate API mode
  38. if self.api_mode not in ["local", "cloud"]:
  39. raise ValueError(
  40. f"Invalid API mode: {self.api_mode}. Must be 'local' or 'cloud'"
  41. )
  42. # Validate Cloud API requirements
  43. if self.api_mode == "cloud" and not self.api_key:
  44. raise ValueError("API key is required for Cloud API mode")
  45. def load(self) -> List[Document]:
  46. """
  47. Main entry point for loading and parsing the document.
  48. Routes to Cloud or Local API based on api_mode.
  49. """
  50. try:
  51. if self.api_mode == "cloud":
  52. return self._load_cloud_api()
  53. else:
  54. return self._load_local_api()
  55. except Exception as e:
  56. log.error(f"Error loading document with MinerU: {e}")
  57. raise
  58. def _load_local_api(self) -> List[Document]:
  59. """
  60. Load document using Local API (synchronous).
  61. Posts file to /file_parse endpoint and gets immediate response.
  62. """
  63. log.info(f"Using MinerU Local API at {self.api_url}")
  64. filename = os.path.basename(self.file_path)
  65. # Build form data for Local API
  66. form_data = {
  67. "return_md": "true",
  68. "formula_enable": str(self.enable_formula).lower(),
  69. "table_enable": str(self.enable_table).lower(),
  70. }
  71. # Parse method based on OCR setting
  72. if self.enable_ocr:
  73. form_data["parse_method"] = "ocr"
  74. else:
  75. form_data["parse_method"] = "auto"
  76. # Language configuration (Local API uses lang_list array)
  77. if self.language:
  78. form_data["lang_list"] = self.language
  79. # Backend/model version (Local API uses "backend" parameter)
  80. if self.model_version == "vlm":
  81. form_data["backend"] = "vlm-vllm-engine"
  82. else:
  83. form_data["backend"] = "pipeline"
  84. # Page ranges (Local API uses start_page_id and end_page_id)
  85. if self.page_ranges:
  86. # For simplicity, if page_ranges is specified, log a warning
  87. # Full page range parsing would require parsing the string
  88. log.warning(
  89. f"Page ranges '{self.page_ranges}' specified but Local API uses different format. "
  90. "Consider using start_page_id/end_page_id parameters if needed."
  91. )
  92. try:
  93. with open(self.file_path, "rb") as f:
  94. files = {"files": (filename, f, "application/octet-stream")}
  95. log.info(f"Sending file to MinerU Local API: {filename}")
  96. log.debug(f"Local API parameters: {form_data}")
  97. response = requests.post(
  98. f"{self.api_url}/file_parse",
  99. data=form_data,
  100. files=files,
  101. timeout=300, # 5 minute timeout for large documents
  102. )
  103. response.raise_for_status()
  104. except FileNotFoundError:
  105. raise HTTPException(
  106. status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
  107. )
  108. except requests.Timeout:
  109. raise HTTPException(
  110. status.HTTP_504_GATEWAY_TIMEOUT,
  111. detail="MinerU Local API request timed out",
  112. )
  113. except requests.HTTPError as e:
  114. error_detail = f"MinerU Local API request failed: {e}"
  115. if e.response is not None:
  116. try:
  117. error_data = e.response.json()
  118. error_detail += f" - {error_data}"
  119. except:
  120. error_detail += f" - {e.response.text}"
  121. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  122. except Exception as e:
  123. raise HTTPException(
  124. status.HTTP_500_INTERNAL_SERVER_ERROR,
  125. detail=f"Error calling MinerU Local API: {str(e)}",
  126. )
  127. # Parse response
  128. try:
  129. result = response.json()
  130. except ValueError as e:
  131. raise HTTPException(
  132. status.HTTP_502_BAD_GATEWAY,
  133. detail=f"Invalid JSON response from MinerU Local API: {e}",
  134. )
  135. # Extract markdown content from response
  136. if "results" not in result:
  137. raise HTTPException(
  138. status.HTTP_502_BAD_GATEWAY,
  139. detail="MinerU Local API response missing 'results' field",
  140. )
  141. results = result["results"]
  142. if not results:
  143. raise HTTPException(
  144. status.HTTP_400_BAD_REQUEST,
  145. detail="MinerU returned empty results",
  146. )
  147. # Get the first (and typically only) result
  148. file_result = list(results.values())[0]
  149. markdown_content = file_result.get("md_content", "")
  150. if not markdown_content:
  151. raise HTTPException(
  152. status.HTTP_400_BAD_REQUEST,
  153. detail="MinerU returned empty markdown content",
  154. )
  155. log.info(f"Successfully parsed document with MinerU Local API: {filename}")
  156. # Create metadata
  157. metadata = {
  158. "source": filename,
  159. "api_mode": "local",
  160. "backend": result.get("backend", "unknown"),
  161. "version": result.get("version", "unknown"),
  162. }
  163. return [Document(page_content=markdown_content, metadata=metadata)]
  164. def _load_cloud_api(self) -> List[Document]:
  165. """
  166. Load document using Cloud API (asynchronous).
  167. Uses batch upload endpoint to avoid need for public file URLs.
  168. """
  169. log.info(f"Using MinerU Cloud API at {self.api_url}")
  170. filename = os.path.basename(self.file_path)
  171. # Step 1: Request presigned upload URL
  172. batch_id, upload_url = self._request_upload_url(filename)
  173. # Step 2: Upload file to presigned URL
  174. self._upload_to_presigned_url(upload_url)
  175. # Step 3: Poll for results
  176. result = self._poll_batch_status(batch_id, filename)
  177. # Step 4: Download and extract markdown from ZIP
  178. markdown_content = self._download_and_extract_zip(
  179. result["full_zip_url"], filename
  180. )
  181. log.info(f"Successfully parsed document with MinerU Cloud API: {filename}")
  182. # Create metadata
  183. metadata = {
  184. "source": filename,
  185. "api_mode": "cloud",
  186. "batch_id": batch_id,
  187. }
  188. return [Document(page_content=markdown_content, metadata=metadata)]
  189. def _request_upload_url(self, filename: str) -> tuple:
  190. """
  191. Request presigned upload URL from Cloud API.
  192. Returns (batch_id, upload_url).
  193. """
  194. headers = {
  195. "Authorization": f"Bearer {self.api_key}",
  196. "Content-Type": "application/json",
  197. }
  198. # Build request body
  199. request_body = {
  200. "enable_formula": self.enable_formula,
  201. "enable_table": self.enable_table,
  202. "language": self.language,
  203. "model_version": self.model_version,
  204. "files": [
  205. {
  206. "name": filename,
  207. "is_ocr": self.enable_ocr,
  208. }
  209. ],
  210. }
  211. # Add page ranges if specified
  212. if self.page_ranges:
  213. request_body["files"][0]["page_ranges"] = self.page_ranges
  214. log.info(f"Requesting upload URL for: {filename}")
  215. log.debug(f"Cloud API request body: {request_body}")
  216. try:
  217. response = requests.post(
  218. f"{self.api_url}/file-urls/batch",
  219. headers=headers,
  220. json=request_body,
  221. timeout=30,
  222. )
  223. response.raise_for_status()
  224. except requests.HTTPError as e:
  225. error_detail = f"Failed to request upload URL: {e}"
  226. if e.response is not None:
  227. try:
  228. error_data = e.response.json()
  229. error_detail += f" - {error_data.get('msg', error_data)}"
  230. except:
  231. error_detail += f" - {e.response.text}"
  232. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  233. except Exception as e:
  234. raise HTTPException(
  235. status.HTTP_500_INTERNAL_SERVER_ERROR,
  236. detail=f"Error requesting upload URL: {str(e)}",
  237. )
  238. try:
  239. result = response.json()
  240. except ValueError as e:
  241. raise HTTPException(
  242. status.HTTP_502_BAD_GATEWAY,
  243. detail=f"Invalid JSON response: {e}",
  244. )
  245. # Check for API error response
  246. if result.get("code") != 0:
  247. raise HTTPException(
  248. status.HTTP_400_BAD_REQUEST,
  249. detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
  250. )
  251. data = result.get("data", {})
  252. batch_id = data.get("batch_id")
  253. file_urls = data.get("file_urls", [])
  254. if not batch_id or not file_urls:
  255. raise HTTPException(
  256. status.HTTP_502_BAD_GATEWAY,
  257. detail="MinerU Cloud API response missing batch_id or file_urls",
  258. )
  259. upload_url = file_urls[0]
  260. log.info(f"Received upload URL for batch: {batch_id}")
  261. return batch_id, upload_url
  262. def _upload_to_presigned_url(self, upload_url: str) -> None:
  263. """
  264. Upload file to presigned URL (no authentication needed).
  265. """
  266. log.info(f"Uploading file to presigned URL")
  267. try:
  268. with open(self.file_path, "rb") as f:
  269. response = requests.put(
  270. upload_url,
  271. data=f,
  272. timeout=300, # 5 minute timeout for large files
  273. )
  274. response.raise_for_status()
  275. except FileNotFoundError:
  276. raise HTTPException(
  277. status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
  278. )
  279. except requests.Timeout:
  280. raise HTTPException(
  281. status.HTTP_504_GATEWAY_TIMEOUT,
  282. detail="File upload to presigned URL timed out",
  283. )
  284. except requests.HTTPError as e:
  285. raise HTTPException(
  286. status.HTTP_400_BAD_REQUEST,
  287. detail=f"Failed to upload file to presigned URL: {e}",
  288. )
  289. except Exception as e:
  290. raise HTTPException(
  291. status.HTTP_500_INTERNAL_SERVER_ERROR,
  292. detail=f"Error uploading file: {str(e)}",
  293. )
  294. log.info("File uploaded successfully")
  295. def _poll_batch_status(self, batch_id: str, filename: str) -> dict:
  296. """
  297. Poll batch status until completion.
  298. Returns the result dict for the file.
  299. """
  300. headers = {
  301. "Authorization": f"Bearer {self.api_key}",
  302. }
  303. max_iterations = 300 # 10 minutes max (2 seconds per iteration)
  304. poll_interval = 2 # seconds
  305. log.info(f"Polling batch status: {batch_id}")
  306. for iteration in range(max_iterations):
  307. try:
  308. response = requests.get(
  309. f"{self.api_url}/extract-results/batch/{batch_id}",
  310. headers=headers,
  311. timeout=30,
  312. )
  313. response.raise_for_status()
  314. except requests.HTTPError as e:
  315. error_detail = f"Failed to poll batch status: {e}"
  316. if e.response is not None:
  317. try:
  318. error_data = e.response.json()
  319. error_detail += f" - {error_data.get('msg', error_data)}"
  320. except:
  321. error_detail += f" - {e.response.text}"
  322. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  323. except Exception as e:
  324. raise HTTPException(
  325. status.HTTP_500_INTERNAL_SERVER_ERROR,
  326. detail=f"Error polling batch status: {str(e)}",
  327. )
  328. try:
  329. result = response.json()
  330. except ValueError as e:
  331. raise HTTPException(
  332. status.HTTP_502_BAD_GATEWAY,
  333. detail=f"Invalid JSON response while polling: {e}",
  334. )
  335. # Check for API error response
  336. if result.get("code") != 0:
  337. raise HTTPException(
  338. status.HTTP_400_BAD_REQUEST,
  339. detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
  340. )
  341. data = result.get("data", {})
  342. extract_result = data.get("extract_result", [])
  343. # Find our file in the batch results
  344. file_result = None
  345. for item in extract_result:
  346. if item.get("file_name") == filename:
  347. file_result = item
  348. break
  349. if not file_result:
  350. raise HTTPException(
  351. status.HTTP_502_BAD_GATEWAY,
  352. detail=f"File {filename} not found in batch results",
  353. )
  354. state = file_result.get("state")
  355. if state == "done":
  356. log.info(f"Processing complete for {filename}")
  357. return file_result
  358. elif state == "failed":
  359. error_msg = file_result.get("err_msg", "Unknown error")
  360. raise HTTPException(
  361. status.HTTP_400_BAD_REQUEST,
  362. detail=f"MinerU processing failed: {error_msg}",
  363. )
  364. elif state in ["waiting-file", "pending", "running", "converting"]:
  365. # Still processing
  366. if iteration % 10 == 0: # Log every 20 seconds
  367. log.info(
  368. f"Processing status: {state} (iteration {iteration + 1}/{max_iterations})"
  369. )
  370. time.sleep(poll_interval)
  371. else:
  372. log.warning(f"Unknown state: {state}")
  373. time.sleep(poll_interval)
  374. # Timeout
  375. raise HTTPException(
  376. status.HTTP_504_GATEWAY_TIMEOUT,
  377. detail="MinerU processing timed out after 10 minutes",
  378. )
  379. def _download_and_extract_zip(self, zip_url: str, filename: str) -> str:
  380. """
  381. Download ZIP file from CDN and extract markdown content.
  382. Returns the markdown content as a string.
  383. """
  384. log.info(f"Downloading results from: {zip_url}")
  385. try:
  386. response = requests.get(zip_url, timeout=60)
  387. response.raise_for_status()
  388. except requests.HTTPError as e:
  389. raise HTTPException(
  390. status.HTTP_400_BAD_REQUEST,
  391. detail=f"Failed to download results ZIP: {e}",
  392. )
  393. except Exception as e:
  394. raise HTTPException(
  395. status.HTTP_500_INTERNAL_SERVER_ERROR,
  396. detail=f"Error downloading results: {str(e)}",
  397. )
  398. # Save ZIP to temporary file and extract
  399. try:
  400. with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip:
  401. tmp_zip.write(response.content)
  402. tmp_zip_path = tmp_zip.name
  403. with tempfile.TemporaryDirectory() as tmp_dir:
  404. # Extract ZIP
  405. with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
  406. zip_ref.extractall(tmp_dir)
  407. # Find markdown file - search recursively for any .md file
  408. markdown_content = None
  409. found_md_path = None
  410. # First, list all files in the ZIP for debugging
  411. all_files = []
  412. for root, dirs, files in os.walk(tmp_dir):
  413. for file in files:
  414. full_path = os.path.join(root, file)
  415. all_files.append(full_path)
  416. # Look for any .md file
  417. if file.endswith(".md"):
  418. found_md_path = full_path
  419. log.info(f"Found markdown file at: {full_path}")
  420. try:
  421. with open(full_path, "r", encoding="utf-8") as f:
  422. markdown_content = f.read()
  423. if (
  424. markdown_content
  425. ): # Use the first non-empty markdown file
  426. break
  427. except Exception as e:
  428. log.warning(f"Failed to read {full_path}: {e}")
  429. if markdown_content:
  430. break
  431. if markdown_content is None:
  432. log.error(f"Available files in ZIP: {all_files}")
  433. # Try to provide more helpful error message
  434. md_files = [f for f in all_files if f.endswith(".md")]
  435. if md_files:
  436. error_msg = (
  437. f"Found .md files but couldn't read them: {md_files}"
  438. )
  439. else:
  440. error_msg = (
  441. f"No .md files found in ZIP. Available files: {all_files}"
  442. )
  443. raise HTTPException(
  444. status.HTTP_502_BAD_GATEWAY,
  445. detail=error_msg,
  446. )
  447. # Clean up temporary ZIP file
  448. os.unlink(tmp_zip_path)
  449. except zipfile.BadZipFile as e:
  450. raise HTTPException(
  451. status.HTTP_502_BAD_GATEWAY,
  452. detail=f"Invalid ZIP file received: {e}",
  453. )
  454. except Exception as e:
  455. raise HTTPException(
  456. status.HTTP_500_INTERNAL_SERVER_ERROR,
  457. detail=f"Error extracting ZIP: {str(e)}",
  458. )
  459. if not markdown_content:
  460. raise HTTPException(
  461. status.HTTP_400_BAD_REQUEST,
  462. detail="Extracted markdown content is empty",
  463. )
  464. log.info(
  465. f"Successfully extracted markdown content ({len(markdown_content)} characters)"
  466. )
  467. return markdown_content