| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- import os
- import time
- import requests
- import logging
- import tempfile
- import zipfile
- from typing import List, Optional
- from langchain_core.documents import Document
- from fastapi import HTTPException, status
- log = logging.getLogger(__name__)
- class MinerULoader:
- """
- MinerU document parser loader supporting both Cloud API and Local API modes.
- Cloud API: Uses MinerU managed service with async task-based processing
- Local API: Uses self-hosted MinerU API with synchronous processing
- """
- def __init__(
- self,
- file_path: str,
- api_mode: str = "local",
- api_url: str = "http://localhost:8000",
- api_key: str = "",
- params: dict = None,
- ):
- self.file_path = file_path
- self.api_mode = api_mode.lower()
- self.api_url = api_url.rstrip("/")
- self.api_key = api_key
- # Parse params dict with defaults
- params = params or {}
- self.enable_ocr = params.get("enable_ocr", False)
- self.enable_formula = params.get("enable_formula", True)
- self.enable_table = params.get("enable_table", True)
- self.language = params.get("language", "en")
- self.model_version = params.get("model_version", "pipeline")
- self.page_ranges = params.get("page_ranges", "")
- # Validate API mode
- if self.api_mode not in ["local", "cloud"]:
- raise ValueError(
- f"Invalid API mode: {self.api_mode}. Must be 'local' or 'cloud'"
- )
- # Validate Cloud API requirements
- if self.api_mode == "cloud" and not self.api_key:
- raise ValueError("API key is required for Cloud API mode")
- def load(self) -> List[Document]:
- """
- Main entry point for loading and parsing the document.
- Routes to Cloud or Local API based on api_mode.
- """
- try:
- if self.api_mode == "cloud":
- return self._load_cloud_api()
- else:
- return self._load_local_api()
- except Exception as e:
- log.error(f"Error loading document with MinerU: {e}")
- raise
- def _load_local_api(self) -> List[Document]:
- """
- Load document using Local API (synchronous).
- Posts file to /file_parse endpoint and gets immediate response.
- """
- log.info(f"Using MinerU Local API at {self.api_url}")
- filename = os.path.basename(self.file_path)
- # Build form data for Local API
- form_data = {
- "return_md": "true",
- "formula_enable": str(self.enable_formula).lower(),
- "table_enable": str(self.enable_table).lower(),
- }
- # Parse method based on OCR setting
- if self.enable_ocr:
- form_data["parse_method"] = "ocr"
- else:
- form_data["parse_method"] = "auto"
- # Language configuration (Local API uses lang_list array)
- if self.language:
- form_data["lang_list"] = self.language
- # Backend/model version (Local API uses "backend" parameter)
- if self.model_version == "vlm":
- form_data["backend"] = "vlm-vllm-engine"
- else:
- form_data["backend"] = "pipeline"
- # Page ranges (Local API uses start_page_id and end_page_id)
- if self.page_ranges:
- # For simplicity, if page_ranges is specified, log a warning
- # Full page range parsing would require parsing the string
- log.warning(
- f"Page ranges '{self.page_ranges}' specified but Local API uses different format. "
- "Consider using start_page_id/end_page_id parameters if needed."
- )
- try:
- with open(self.file_path, "rb") as f:
- files = {"files": (filename, f, "application/octet-stream")}
- log.info(f"Sending file to MinerU Local API: {filename}")
- log.debug(f"Local API parameters: {form_data}")
- response = requests.post(
- f"{self.api_url}/file_parse",
- data=form_data,
- files=files,
- timeout=300, # 5 minute timeout for large documents
- )
- response.raise_for_status()
- except FileNotFoundError:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
- )
- except requests.Timeout:
- raise HTTPException(
- status.HTTP_504_GATEWAY_TIMEOUT,
- detail="MinerU Local API request timed out",
- )
- except requests.HTTPError as e:
- error_detail = f"MinerU Local API request failed: {e}"
- if e.response is not None:
- try:
- error_data = e.response.json()
- error_detail += f" - {error_data}"
- except:
- error_detail += f" - {e.response.text}"
- raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error calling MinerU Local API: {str(e)}",
- )
- # Parse response
- try:
- result = response.json()
- except ValueError as e:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=f"Invalid JSON response from MinerU Local API: {e}",
- )
- # Extract markdown content from response
- if "results" not in result:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail="MinerU Local API response missing 'results' field",
- )
- results = result["results"]
- if not results:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail="MinerU returned empty results",
- )
- # Get the first (and typically only) result
- file_result = list(results.values())[0]
- markdown_content = file_result.get("md_content", "")
- if not markdown_content:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail="MinerU returned empty markdown content",
- )
- log.info(f"Successfully parsed document with MinerU Local API: {filename}")
- # Create metadata
- metadata = {
- "source": filename,
- "api_mode": "local",
- "backend": result.get("backend", "unknown"),
- "version": result.get("version", "unknown"),
- }
- return [Document(page_content=markdown_content, metadata=metadata)]
- def _load_cloud_api(self) -> List[Document]:
- """
- Load document using Cloud API (asynchronous).
- Uses batch upload endpoint to avoid need for public file URLs.
- """
- log.info(f"Using MinerU Cloud API at {self.api_url}")
- filename = os.path.basename(self.file_path)
- # Step 1: Request presigned upload URL
- batch_id, upload_url = self._request_upload_url(filename)
- # Step 2: Upload file to presigned URL
- self._upload_to_presigned_url(upload_url)
- # Step 3: Poll for results
- result = self._poll_batch_status(batch_id, filename)
- # Step 4: Download and extract markdown from ZIP
- markdown_content = self._download_and_extract_zip(
- result["full_zip_url"], filename
- )
- log.info(f"Successfully parsed document with MinerU Cloud API: {filename}")
- # Create metadata
- metadata = {
- "source": filename,
- "api_mode": "cloud",
- "batch_id": batch_id,
- }
- return [Document(page_content=markdown_content, metadata=metadata)]
- def _request_upload_url(self, filename: str) -> tuple:
- """
- Request presigned upload URL from Cloud API.
- Returns (batch_id, upload_url).
- """
- headers = {
- "Authorization": f"Bearer {self.api_key}",
- "Content-Type": "application/json",
- }
- # Build request body
- request_body = {
- "enable_formula": self.enable_formula,
- "enable_table": self.enable_table,
- "language": self.language,
- "model_version": self.model_version,
- "files": [
- {
- "name": filename,
- "is_ocr": self.enable_ocr,
- }
- ],
- }
- # Add page ranges if specified
- if self.page_ranges:
- request_body["files"][0]["page_ranges"] = self.page_ranges
- log.info(f"Requesting upload URL for: {filename}")
- log.debug(f"Cloud API request body: {request_body}")
- try:
- response = requests.post(
- f"{self.api_url}/file-urls/batch",
- headers=headers,
- json=request_body,
- timeout=30,
- )
- response.raise_for_status()
- except requests.HTTPError as e:
- error_detail = f"Failed to request upload URL: {e}"
- if e.response is not None:
- try:
- error_data = e.response.json()
- error_detail += f" - {error_data.get('msg', error_data)}"
- except:
- error_detail += f" - {e.response.text}"
- raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error requesting upload URL: {str(e)}",
- )
- try:
- result = response.json()
- except ValueError as e:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=f"Invalid JSON response: {e}",
- )
- # Check for API error response
- if result.get("code") != 0:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
- )
- data = result.get("data", {})
- batch_id = data.get("batch_id")
- file_urls = data.get("file_urls", [])
- if not batch_id or not file_urls:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail="MinerU Cloud API response missing batch_id or file_urls",
- )
- upload_url = file_urls[0]
- log.info(f"Received upload URL for batch: {batch_id}")
- return batch_id, upload_url
- def _upload_to_presigned_url(self, upload_url: str) -> None:
- """
- Upload file to presigned URL (no authentication needed).
- """
- log.info(f"Uploading file to presigned URL")
- try:
- with open(self.file_path, "rb") as f:
- response = requests.put(
- upload_url,
- data=f,
- timeout=300, # 5 minute timeout for large files
- )
- response.raise_for_status()
- except FileNotFoundError:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, detail=f"File not found: {self.file_path}"
- )
- except requests.Timeout:
- raise HTTPException(
- status.HTTP_504_GATEWAY_TIMEOUT,
- detail="File upload to presigned URL timed out",
- )
- except requests.HTTPError as e:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail=f"Failed to upload file to presigned URL: {e}",
- )
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error uploading file: {str(e)}",
- )
- log.info("File uploaded successfully")
- def _poll_batch_status(self, batch_id: str, filename: str) -> dict:
- """
- Poll batch status until completion.
- Returns the result dict for the file.
- """
- headers = {
- "Authorization": f"Bearer {self.api_key}",
- }
- max_iterations = 300 # 10 minutes max (2 seconds per iteration)
- poll_interval = 2 # seconds
- log.info(f"Polling batch status: {batch_id}")
- for iteration in range(max_iterations):
- try:
- response = requests.get(
- f"{self.api_url}/extract-results/batch/{batch_id}",
- headers=headers,
- timeout=30,
- )
- response.raise_for_status()
- except requests.HTTPError as e:
- error_detail = f"Failed to poll batch status: {e}"
- if e.response is not None:
- try:
- error_data = e.response.json()
- error_detail += f" - {error_data.get('msg', error_data)}"
- except:
- error_detail += f" - {e.response.text}"
- raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=error_detail)
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error polling batch status: {str(e)}",
- )
- try:
- result = response.json()
- except ValueError as e:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=f"Invalid JSON response while polling: {e}",
- )
- # Check for API error response
- if result.get("code") != 0:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail=f"MinerU Cloud API error: {result.get('msg', 'Unknown error')}",
- )
- data = result.get("data", {})
- extract_result = data.get("extract_result", [])
- # Find our file in the batch results
- file_result = None
- for item in extract_result:
- if item.get("file_name") == filename:
- file_result = item
- break
- if not file_result:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=f"File {filename} not found in batch results",
- )
- state = file_result.get("state")
- if state == "done":
- log.info(f"Processing complete for {filename}")
- return file_result
- elif state == "failed":
- error_msg = file_result.get("err_msg", "Unknown error")
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail=f"MinerU processing failed: {error_msg}",
- )
- elif state in ["waiting-file", "pending", "running", "converting"]:
- # Still processing
- if iteration % 10 == 0: # Log every 20 seconds
- log.info(
- f"Processing status: {state} (iteration {iteration + 1}/{max_iterations})"
- )
- time.sleep(poll_interval)
- else:
- log.warning(f"Unknown state: {state}")
- time.sleep(poll_interval)
- # Timeout
- raise HTTPException(
- status.HTTP_504_GATEWAY_TIMEOUT,
- detail="MinerU processing timed out after 10 minutes",
- )
- def _download_and_extract_zip(self, zip_url: str, filename: str) -> str:
- """
- Download ZIP file from CDN and extract markdown content.
- Returns the markdown content as a string.
- """
- log.info(f"Downloading results from: {zip_url}")
- try:
- response = requests.get(zip_url, timeout=60)
- response.raise_for_status()
- except requests.HTTPError as e:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail=f"Failed to download results ZIP: {e}",
- )
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error downloading results: {str(e)}",
- )
- # Save ZIP to temporary file and extract
- try:
- with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip:
- tmp_zip.write(response.content)
- tmp_zip_path = tmp_zip.name
- with tempfile.TemporaryDirectory() as tmp_dir:
- # Extract ZIP
- with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref:
- zip_ref.extractall(tmp_dir)
- # Find markdown file - search recursively for any .md file
- markdown_content = None
- found_md_path = None
- # First, list all files in the ZIP for debugging
- all_files = []
- for root, dirs, files in os.walk(tmp_dir):
- for file in files:
- full_path = os.path.join(root, file)
- all_files.append(full_path)
- # Look for any .md file
- if file.endswith(".md"):
- found_md_path = full_path
- log.info(f"Found markdown file at: {full_path}")
- try:
- with open(full_path, "r", encoding="utf-8") as f:
- markdown_content = f.read()
- if (
- markdown_content
- ): # Use the first non-empty markdown file
- break
- except Exception as e:
- log.warning(f"Failed to read {full_path}: {e}")
- if markdown_content:
- break
- if markdown_content is None:
- log.error(f"Available files in ZIP: {all_files}")
- # Try to provide more helpful error message
- md_files = [f for f in all_files if f.endswith(".md")]
- if md_files:
- error_msg = (
- f"Found .md files but couldn't read them: {md_files}"
- )
- else:
- error_msg = (
- f"No .md files found in ZIP. Available files: {all_files}"
- )
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=error_msg,
- )
- # Clean up temporary ZIP file
- os.unlink(tmp_zip_path)
- except zipfile.BadZipFile as e:
- raise HTTPException(
- status.HTTP_502_BAD_GATEWAY,
- detail=f"Invalid ZIP file received: {e}",
- )
- except Exception as e:
- raise HTTPException(
- status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Error extracting ZIP: {str(e)}",
- )
- if not markdown_content:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- detail="Extracted markdown content is empty",
- )
- log.info(
- f"Successfully extracted markdown content ({len(markdown_content)} characters)"
- )
- return markdown_content
|