mineru.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. import os
  2. import time
  3. import requests
  4. import logging
  5. import tempfile
  6. import zipfile
  7. from typing import List, Optional
  8. from langchain_core.documents import Document
  9. from fastapi import HTTPException, status
# Module-level logger, named after this module, used by MinerULoader.
log = logging.getLogger(__name__)
  11. class MinerULoader:
  12. """
  13. MinerU document parser loader supporting both Cloud API and Local API modes.
  14. Cloud API: Uses MinerU managed service with async task-based processing
  15. Local API: Uses self-hosted MinerU API with synchronous processing
  16. """
  17. def __init__(
  18. self,
  19. file_path: str,
  20. api_mode: str = "local",
  21. api_url: str = "http://localhost:8000",
  22. api_key: str = "",
  23. params: dict = None,
  24. ):
  25. self.file_path = file_path
  26. self.api_mode = api_mode.lower()
  27. self.api_url = api_url.rstrip("/")
  28. self.api_key = api_key
  29. # Parse params dict with defaults
  30. self.params = params or {}
  31. self.enable_ocr = params.get("enable_ocr", False)
  32. self.enable_formula = params.get("enable_formula", True)
  33. self.enable_table = params.get("enable_table", True)
  34. self.language = params.get("language", "en")
  35. self.model_version = params.get("model_version", "pipeline")
  36. self.page_ranges = self.params.pop("page_ranges", "")
  37. # Validate API mode
  38. if self.api_mode not in ["local", "cloud"]:
  39. raise ValueError(
  40. f"Invalid API mode: {self.api_mode}. Must be 'local' or 'cloud'"
  41. )
  42. # Validate Cloud API requirements
  43. if self.api_mode == "cloud" and not self.api_key:
  44. raise ValueError("API key is required for Cloud API mode")
  45. def load(self) -> List[Document]:
  46. """
  47. Main entry point for loading and parsing the document.
  48. Routes to Cloud or Local API based on api_mode.
  49. """
  50. try:
  51. if self.api_mode == "cloud":
  52. return self._load_cloud_api()
  53. else:
  54. return self._load_local_api()
  55. except Exception as e:
  56. log.error(f"Error loading document with MinerU: {e}")
  57. raise
  58. def _load_local_api(self) -> List[Document]:
  59. """
  60. Load document using Local API (synchronous).
  61. Posts file to /file_parse endpoint and gets immediate response.
  62. """
  63. log.info(f"Using MinerU Local API at {self.api_url}")
  64. filename = os.path.basename(self.file_path)
  65. # Build form data for Local API
  66. form_data = {
  67. **self.params,
  68. "return_md": "true",
  69. }
  70. # Page ranges (Local API uses start_page_id and end_page_id)
  71. if self.page_ranges:
  72. # For simplicity, if page_ranges is specified, log a warning
  73. # Full page range parsing would require parsing the string
  74. log.warning(
  75. f"Page ranges '{self.page_ranges}' specified but Local API uses different format. "
  76. "Consider using start_page_id/end_page_id parameters if needed."
  77. )
  78. try:
  79. with open(self.file_path, "rb") as f:
  80. files = {"files": (filename, f, "application/octet-stream")}
  81. log.info(f"Sending file to MinerU Local API: {filename}")
  82. log.debug(f"Local API parameters: {form_data}")
  83. response = requests.post(
  84. f"{self.api_url}/file_parse",
  85. data=form_data,
  86. files=files,
  87. timeout=300, # 5 minute timeout for large documents
  88. )
  89. response.raise_for_status()
  90. except FileNotFoundError:
  91. raise HTTPException(
  92. status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
  93. )
  94. except requests.Timeout:
  95. raise HTTPException(
  96. status.HTTP_504_GATEWAY_TIMEOUT,
  97. detail="MinerU Local API request timed out",
  98. )
  99. except requests.HTTPError as e:
  100. error_detail = f"MinerU Local API request failed: {e}"
  101. if e.response is not None:
  102. try:
  103. error_data = e.response.json()
  104. error_detail += f" - {error_data}"
  105. except:
  106. error_detail += f" - {e.response.text}"
  107. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  108. except Exception as e:
  109. raise HTTPException(
  110. status.HTTP_500_INTERNAL_SERVER_ERROR,
  111. detail=f"Error calling MinerU Local API: {str(e)}",
  112. )
  113. # Parse response
  114. try:
  115. result = response.json()
  116. except ValueError as e:
  117. raise HTTPException(
  118. status.HTTP_502_BAD_GATEWAY,
  119. detail=f"Invalid JSON response from MinerU Local API: {e}",
  120. )
  121. # Extract markdown content from response
  122. if "results" not in result:
  123. raise HTTPException(
  124. status.HTTP_502_BAD_GATEWAY,
  125. detail="MinerU Local API response missing 'results' field",
  126. )
  127. results = result["results"]
  128. if not results:
  129. raise HTTPException(
  130. status.HTTP_400_BAD_REQUEST,
  131. detail="MinerU returned empty results",
  132. )
  133. # Get the first (and typically only) result
  134. file_result = list(results.values())[0]
  135. markdown_content = file_result.get("md_content", "")
  136. if not markdown_content:
  137. raise HTTPException(
  138. status.HTTP_400_BAD_REQUEST,
  139. detail="MinerU returned empty markdown content",
  140. )
  141. log.info(f"Successfully parsed document with MinerU Local API: {filename}")
  142. # Create metadata
  143. metadata = {
  144. "source": filename,
  145. "api_mode": "local",
  146. "backend": result.get("backend", "unknown"),
  147. "version": result.get("version", "unknown"),
  148. }
  149. return [Document(page_content=markdown_content, metadata=metadata)]
  150. def _load_cloud_api(self) -> List[Document]:
  151. """
  152. Load document using Cloud API (asynchronous).
  153. Uses batch upload endpoint to avoid need for public file URLs.
  154. """
  155. log.info(f"Using MinerU Cloud API at {self.api_url}")
  156. filename = os.path.basename(self.file_path)
  157. # Step 1: Request presigned upload URL
  158. batch_id, upload_url = self._request_upload_url(filename)
  159. # Step 2: Upload file to presigned URL
  160. self._upload_to_presigned_url(upload_url)
  161. # Step 3: Poll for results
  162. result = self._poll_batch_status(batch_id, filename)
  163. # Step 4: Download and extract markdown from ZIP
  164. markdown_content = self._download_and_extract_zip(
  165. result["full_zip_url"], filename
  166. )
  167. log.info(f"Successfully parsed document with MinerU Cloud API: {filename}")
  168. # Create metadata
  169. metadata = {
  170. "source": filename,
  171. "api_mode": "cloud",
  172. "batch_id": batch_id,
  173. }
  174. return [Document(page_content=markdown_content, metadata=metadata)]
  175. def _request_upload_url(self, filename: str) -> tuple:
  176. """
  177. Request presigned upload URL from Cloud API.
  178. Returns (batch_id, upload_url).
  179. """
  180. headers = {
  181. "Authorization": f"Bearer {self.api_key}",
  182. "Content-Type": "application/json",
  183. }
  184. # Build request body
  185. request_body = {
  186. **self.params,
  187. "files": [
  188. {
  189. "name": filename,
  190. "is_ocr": self.enable_ocr,
  191. }
  192. ],
  193. }
  194. # Add page ranges if specified
  195. if self.page_ranges:
  196. request_body["files"][0]["page_ranges"] = self.page_ranges
  197. log.info(f"Requesting upload URL for: {filename}")
  198. log.debug(f"Cloud API request body: {request_body}")
  199. try:
  200. response = requests.post(
  201. f"{self.api_url}/file-urls/batch",
  202. headers=headers,
  203. json=request_body,
  204. timeout=30,
  205. )
  206. response.raise_for_status()
  207. except requests.HTTPError as e:
  208. error_detail = f"Failed to request upload URL: {e}"
  209. if e.response is not None:
  210. try:
  211. error_data = e.response.json()
  212. error_detail += f" - {error_data.get('msg', error_data)}"
  213. except:
  214. error_detail += f" - {e.response.text}"
  215. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  216. except Exception as e:
  217. raise HTTPException(
  218. status.HTTP_500_INTERNAL_SERVER_ERROR,
  219. detail=f"Error requesting upload URL: {str(e)}",
  220. )
  221. try:
  222. result = response.json()
  223. except ValueError as e:
  224. raise HTTPException(
  225. status.HTTP_502_BAD_GATEWAY,
  226. detail=f"Invalid JSON response: {e}",
  227. )
  228. # Check for API error response
  229. if result.get("code") != 0:
  230. raise HTTPException(
  231. status.HTTP_400_BAD_REQUEST,
  232. detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
  233. )
  234. data = result.get("data", {})
  235. batch_id = data.get("batch_id")
  236. file_urls = data.get("file_urls", [])
  237. if not batch_id or not file_urls:
  238. raise HTTPException(
  239. status.HTTP_502_BAD_GATEWAY,
  240. detail="MinerU Cloud API response missing batch_id or file_urls",
  241. )
  242. upload_url = file_urls[0]
  243. log.info(f"Received upload URL for batch: {batch_id}")
  244. return batch_id, upload_url
  245. def _upload_to_presigned_url(self, upload_url: str) -> None:
  246. """
  247. Upload file to presigned URL (no authentication needed).
  248. """
  249. log.info(f"Uploading file to presigned URL")
  250. try:
  251. with open(self.file_path, "rb") as f:
  252. response = requests.put(
  253. upload_url,
  254. data=f,
  255. timeout=300, # 5 minute timeout for large files
  256. )
  257. response.raise_for_status()
  258. except FileNotFoundError:
  259. raise HTTPException(
  260. status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
  261. )
  262. except requests.Timeout:
  263. raise HTTPException(
  264. status.HTTP_504_GATEWAY_TIMEOUT,
  265. detail="File upload to presigned URL timed out",
  266. )
  267. except requests.HTTPError as e:
  268. raise HTTPException(
  269. status.HTTP_400_BAD_REQUEST,
  270. detail=f"Failed to upload file to presigned URL: {e}",
  271. )
  272. except Exception as e:
  273. raise HTTPException(
  274. status.HTTP_500_INTERNAL_SERVER_ERROR,
  275. detail=f"Error uploading file: {str(e)}",
  276. )
  277. log.info("File uploaded successfully")
  278. def _poll_batch_status(self, batch_id: str, filename: str) -> dict:
  279. """
  280. Poll batch status until completion.
  281. Returns the result dict for the file.
  282. """
  283. headers = {
  284. "Authorization": f"Bearer {self.api_key}",
  285. }
  286. max_iterations = 300 # 10 minutes max (2 seconds per iteration)
  287. poll_interval = 2 # seconds
  288. log.info(f"Polling batch status: {batch_id}")
  289. for iteration in range(max_iterations):
  290. try:
  291. response = requests.get(
  292. f"{self.api_url}/extract-results/batch/{batch_id}",
  293. headers=headers,
  294. timeout=30,
  295. )
  296. response.raise_for_status()
  297. except requests.HTTPError as e:
  298. error_detail = f"Failed to poll batch status: {e}"
  299. if e.response is not None:
  300. try:
  301. error_data = e.response.json()
  302. error_detail += f" - {error_data.get('msg', error_data)}"
  303. except:
  304. error_detail += f" - {e.response.text}"
  305. raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
  306. except Exception as e:
  307. raise HTTPException(
  308. status.HTTP_500_INTERNAL_SERVER_ERROR,
  309. detail=f"Error polling batch status: {str(e)}",
  310. )
  311. try:
  312. result = response.json()
  313. except ValueError as e:
  314. raise HTTPException(
  315. status.HTTP_502_BAD_GATEWAY,
  316. detail=f"Invalid JSON response while polling: {e}",
  317. )
  318. # Check for API error response
  319. if result.get("code") != 0:
  320. raise HTTPException(
  321. status.HTTP_400_BAD_REQUEST,
  322. detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
  323. )
  324. data = result.get("data", {})
  325. extract_result = data.get("extract_result", [])
  326. # Find our file in the batch results
  327. file_result = None
  328. for item in extract_result:
  329. if item.get("file_name") == filename:
  330. file_result = item
  331. break
  332. if not file_result:
  333. raise HTTPException(
  334. status.HTTP_502_BAD_GATEWAY,
  335. detail=f"File {filename} not found in batch results",
  336. )
  337. state = file_result.get("state")
  338. if state == "done":
  339. log.info(f"Processing complete for {filename}")
  340. return file_result
  341. elif state == "failed":
  342. error_msg = file_result.get("err_msg", "Unknown error")
  343. raise HTTPException(
  344. status.HTTP_400_BAD_REQUEST,
  345. detail=f"MinerU processing failed: {error_msg}",
  346. )
  347. elif state in ["waiting-file", "pending", "running", "converting"]:
  348. # Still processing
  349. if iteration % 10 == 0: # Log every 20 seconds
  350. log.info(
  351. f"Processing status: {state} (iteration {iteration + 1}/{max_iterations})"
  352. )
  353. time.sleep(poll_interval)
  354. else:
  355. log.warning(f"Unknown state: {state}")
  356. time.sleep(poll_interval)
  357. # Timeout
  358. raise HTTPException(
  359. status.HTTP_504_GATEWAY_TIMEOUT,
  360. detail="MinerU processing timed out after 10 minutes",
  361. )
  362. def _download_and_extract_zip(self, zip_url: str, filename: str) -> str:
  363. """
  364. Download ZIP file from CDN and extract markdown content.
  365. Returns the markdown content as a string.
  366. """
  367. log.info(f"Downloading results from: {zip_url}")
  368. try:
  369. response = requests.get(zip_url, timeout=60)
  370. response.raise_for_status()
  371. except requests.HTTPError as e:
  372. raise HTTPException(
  373. status.HTTP_400_BAD_REQUEST,
  374. detail=f"Failed to download results ZIP: {e}",
  375. )
  376. except Exception as e:
  377. raise HTTPException(
  378. status.HTTP_500_INTERNAL_SERVER_ERROR,
  379. detail=f"Error downloading results: {str(e)}",
  380. )
  381. # Save ZIP to temporary file and extract
  382. try:
  383. with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip:
  384. tmp_zip.write(response.content)
  385. tmp_zip_path = tmp_zip.name
  386. with tempfile.TemporaryDirectory() as tmp_dir:
  387. # Extract ZIP
  388. with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
  389. zip_ref.extractall(tmp_dir)
  390. # Find markdown file - search recursively for any .md file
  391. markdown_content = None
  392. found_md_path = None
  393. # First, list all files in the ZIP for debugging
  394. all_files = []
  395. for root, dirs, files in os.walk(tmp_dir):
  396. for file in files:
  397. full_path = os.path.join(root, file)
  398. all_files.append(full_path)
  399. # Look for any .md file
  400. if file.endswith(".md"):
  401. found_md_path = full_path
  402. log.info(f"Found markdown file at: {full_path}")
  403. try:
  404. with open(full_path, "r", encoding="utf-8") as f:
  405. markdown_content = f.read()
  406. if (
  407. markdown_content
  408. ): # Use the first non-empty markdown file
  409. break
  410. except Exception as e:
  411. log.warning(f"Failed to read {full_path}: {e}")
  412. if markdown_content:
  413. break
  414. if markdown_content is None:
  415. log.error(f"Available files in ZIP: {all_files}")
  416. # Try to provide more helpful error message
  417. md_files = [f for f in all_files if f.endswith(".md")]
  418. if md_files:
  419. error_msg = (
  420. f"Found .md files but couldn't read them: {md_files}"
  421. )
  422. else:
  423. error_msg = (
  424. f"No .md files found in ZIP. Available files: {all_files}"
  425. )
  426. raise HTTPException(
  427. status.HTTP_502_BAD_GATEWAY,
  428. detail=error_msg,
  429. )
  430. # Clean up temporary ZIP file
  431. os.unlink(tmp_zip_path)
  432. except zipfile.BadZipFile as e:
  433. raise HTTPException(
  434. status.HTTP_502_BAD_GATEWAY,
  435. detail=f"Invalid ZIP file received: {e}",
  436. )
  437. except Exception as e:
  438. raise HTTPException(
  439. status.HTTP_500_INTERNAL_SERVER_ERROR,
  440. detail=f"Error extracting ZIP: {str(e)}",
  441. )
  442. if not markdown_content:
  443. raise HTTPException(
  444. status.HTTP_400_BAD_REQUEST,
  445. detail="Extracted markdown content is empty",
  446. )
  447. log.info(
  448. f"Successfully extracted markdown content ({len(markdown_content)} characters)"
  449. )
  450. return markdown_content