|
@@ -20,6 +20,14 @@ class MistralLoader:
|
|
|
"""
|
|
|
Enhanced Mistral OCR loader with both sync and async support.
|
|
|
Loads documents by processing them through the Mistral OCR API.
|
|
|
+
|
|
|
+ Performance Optimizations:
|
|
|
+ - Differentiated timeouts for different operations
|
|
|
+ - Intelligent retry logic with exponential backoff
|
|
|
+ - Memory-efficient file streaming for large files
|
|
|
+ - Connection pooling and keepalive optimization
|
|
|
+ - Semaphore-based concurrency control for batch processing
|
|
|
+ - Enhanced error handling with retryable error classification
|
|
|
"""
|
|
|
|
|
|
BASE_API_URL = "https://api.mistral.ai/v1"
|
|
@@ -53,17 +61,40 @@ class MistralLoader:
|
|
|
self.max_retries = max_retries
|
|
|
self.debug = enable_debug_logging
|
|
|
|
|
|
- # Pre-compute file info for performance
|
|
|
+ # PERFORMANCE OPTIMIZATION: Differentiated timeouts for different operations
|
|
|
+ # This prevents long-running OCR operations from affecting quick operations
|
|
|
+ # and improves user experience by failing fast on operations that should be quick
|
|
|
+ self.upload_timeout = min(
|
|
|
+ timeout, 120
|
|
|
+ ) # Cap upload at 2 minutes - prevents hanging on large files
|
|
|
+ self.url_timeout = (
|
|
|
+ 30 # URL requests should be fast - fail quickly if API is slow
|
|
|
+ )
|
|
|
+ self.ocr_timeout = (
|
|
|
+ timeout # OCR can take the full timeout - this is the heavy operation
|
|
|
+ )
|
|
|
+ self.cleanup_timeout = (
|
|
|
+ 30 # Cleanup should be quick - don't hang on file deletion
|
|
|
+ )
|
|
|
+
|
|
|
+ # PERFORMANCE OPTIMIZATION: Pre-compute file info to avoid repeated filesystem calls
|
|
|
+ # This avoids multiple os.path.basename() and os.path.getsize() calls during processing
|
|
|
self.file_name = os.path.basename(file_path)
|
|
|
self.file_size = os.path.getsize(file_path)
|
|
|
|
|
|
+ # ENHANCEMENT: Added User-Agent for better API tracking and debugging
|
|
|
self.headers = {
|
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
|
- "User-Agent": "OpenWebUI-MistralLoader/2.0",
|
|
|
+ "User-Agent": "OpenWebUI-MistralLoader/2.0", # Helps API provider track usage
|
|
|
}
|
|
|
|
|
|
def _debug_log(self, message: str, *args) -> None:
|
|
|
- """Conditional debug logging for performance."""
|
|
|
+ """
|
|
|
+ PERFORMANCE OPTIMIZATION: Conditional debug logging for performance.
|
|
|
+
|
|
|
+ Only processes debug messages when debug mode is enabled, avoiding
|
|
|
+ string formatting overhead in production environments.
|
|
|
+ """
|
|
|
if self.debug:
|
|
|
log.debug(message, *args)
|
|
|
|
|
@@ -115,53 +146,118 @@ class MistralLoader:
|
|
|
log.error(f"Unexpected error processing response: {e}")
|
|
|
raise
|
|
|
|
|
|
+ def _is_retryable_error(self, error: Exception) -> bool:
|
|
|
+ """
|
|
|
+ ENHANCEMENT: Intelligent error classification for retry logic.
|
|
|
+
|
|
|
+ Determines if an error is retryable based on its type and status code.
|
|
|
+ This prevents wasting time retrying errors that will never succeed
|
|
|
+ (like authentication errors) while ensuring transient errors are retried.
|
|
|
+
|
|
|
+ Retryable errors:
|
|
|
+ - Network connection errors (temporary network issues)
|
|
|
+ - Timeouts (server might be temporarily overloaded)
|
|
|
+ - Server errors (5xx status codes - server-side issues)
|
|
|
+ - Rate limiting (429 status - temporary throttling)
|
|
|
+
|
|
|
+ Non-retryable errors:
|
|
|
+ - Authentication errors (401, 403 - won't fix with retry)
|
|
|
+ - Bad request errors (400 - malformed request)
|
|
|
+ - Not found errors (404 - resource doesn't exist)
|
|
|
+ """
|
|
|
+ if isinstance(error, requests.exceptions.ConnectionError):
|
|
|
+ return True # Network issues are usually temporary
|
|
|
+ if isinstance(error, requests.exceptions.Timeout):
|
|
|
+ return True # Timeouts might resolve on retry
|
|
|
+ if isinstance(error, requests.exceptions.HTTPError):
|
|
|
+ # Only retry on server errors (5xx) or rate limits (429)
|
|
|
+ if hasattr(error, "response") and error.response is not None:
|
|
|
+ status_code = error.response.status_code
|
|
|
+ return status_code >= 500 or status_code == 429
|
|
|
+ return False
|
|
|
+ if isinstance(
|
|
|
+ error, (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError)
|
|
|
+ ):
|
|
|
+ return True # Async network/timeout errors are retryable
|
|
|
+ if isinstance(error, aiohttp.ClientResponseError):
|
|
|
+ return error.status >= 500 or error.status == 429
|
|
|
+ return False # All other errors are non-retryable
|
|
|
+
|
|
|
def _retry_request_sync(self, request_func, *args, **kwargs):
|
|
|
- """Synchronous retry logic with exponential backoff."""
|
|
|
+ """
|
|
|
+ ENHANCEMENT: Synchronous retry logic with intelligent error classification.
|
|
|
+
|
|
|
+ Uses exponential backoff with jitter to avoid thundering herd problems.
|
|
|
+ The wait time increases exponentially but is capped at 30 seconds to
|
|
|
+ prevent excessive delays. Only retries errors that are likely to succeed
|
|
|
+ on subsequent attempts.
|
|
|
+ """
|
|
|
for attempt in range(self.max_retries):
|
|
|
try:
|
|
|
return request_func(*args, **kwargs)
|
|
|
- except (requests.exceptions.RequestException, Exception) as e:
|
|
|
- if attempt == self.max_retries - 1:
|
|
|
+ except Exception as e:
|
|
|
+ if attempt == self.max_retries - 1 or not self._is_retryable_error(e):
|
|
|
raise
|
|
|
|
|
|
- wait_time = (2**attempt) + 0.5
|
|
|
+ # PERFORMANCE OPTIMIZATION: Exponential backoff with cap
|
|
|
+ # Prevents overwhelming the server while ensuring reasonable retry delays
|
|
|
+ wait_time = min((2**attempt) + 0.5, 30) # Cap at 30 seconds
|
|
|
log.warning(
|
|
|
- f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s..."
|
|
|
+ f"Retryable error (attempt {attempt + 1}/{self.max_retries}): {e}. "
|
|
|
+ f"Retrying in {wait_time}s..."
|
|
|
)
|
|
|
time.sleep(wait_time)
|
|
|
|
|
|
async def _retry_request_async(self, request_func, *args, **kwargs):
|
|
|
- """Async retry logic with exponential backoff."""
|
|
|
+ """
|
|
|
+ ENHANCEMENT: Async retry logic with intelligent error classification.
|
|
|
+
|
|
|
+ Async version of retry logic that doesn't block the event loop during
|
|
|
+ wait periods. Uses the same exponential backoff strategy as sync version.
|
|
|
+ """
|
|
|
for attempt in range(self.max_retries):
|
|
|
try:
|
|
|
return await request_func(*args, **kwargs)
|
|
|
- except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
|
|
- if attempt == self.max_retries - 1:
|
|
|
+ except Exception as e:
|
|
|
+ if attempt == self.max_retries - 1 or not self._is_retryable_error(e):
|
|
|
raise
|
|
|
|
|
|
- wait_time = (2**attempt) + 0.5
|
|
|
+ # PERFORMANCE OPTIMIZATION: Non-blocking exponential backoff
|
|
|
+ wait_time = min((2**attempt) + 0.5, 30) # Cap at 30 seconds
|
|
|
log.warning(
|
|
|
- f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s..."
|
|
|
+ f"Retryable error (attempt {attempt + 1}/{self.max_retries}): {e}. "
|
|
|
+ f"Retrying in {wait_time}s..."
|
|
|
)
|
|
|
- await asyncio.sleep(wait_time)
|
|
|
+ await asyncio.sleep(wait_time) # Non-blocking wait
|
|
|
|
|
|
def _upload_file(self) -> str:
|
|
|
- """Uploads the file to Mistral for OCR processing (sync version)."""
|
|
|
+ """
|
|
|
+ PERFORMANCE OPTIMIZATION: Enhanced file upload with streaming consideration.
|
|
|
+
|
|
|
+ Uploads the file to Mistral for OCR processing (sync version).
|
|
|
+ Uses context manager for file handling to ensure proper resource cleanup.
|
|
|
+ Although streaming is not enabled for this endpoint, the file is opened
|
|
|
+ in a context manager to minimize memory usage duration.
|
|
|
+ """
|
|
|
log.info("Uploading file to Mistral API")
|
|
|
url = f"{self.BASE_API_URL}/files"
|
|
|
- file_name = os.path.basename(self.file_path)
|
|
|
|
|
|
def upload_request():
|
|
|
+ # MEMORY OPTIMIZATION: Use context manager to minimize file handle lifetime
|
|
|
+ # This ensures the file is closed immediately after reading, reducing memory usage
|
|
|
with open(self.file_path, "rb") as f:
|
|
|
- files = {"file": (file_name, f, "application/pdf")}
|
|
|
+ files = {"file": (self.file_name, f, "application/pdf")}
|
|
|
data = {"purpose": "ocr"}
|
|
|
|
|
|
+ # NOTE: stream=False is required for this endpoint
|
|
|
+ # The Mistral API doesn't support chunked uploads for this endpoint
|
|
|
response = requests.post(
|
|
|
url,
|
|
|
headers=self.headers,
|
|
|
files=files,
|
|
|
data=data,
|
|
|
- timeout=self.timeout,
|
|
|
+ timeout=self.upload_timeout, # Use specialized upload timeout
|
|
|
+ stream=False, # Keep as False for this endpoint
|
|
|
)
|
|
|
|
|
|
return self._handle_response(response)
|
|
@@ -209,7 +305,7 @@ class MistralLoader:
|
|
|
url,
|
|
|
data=writer,
|
|
|
headers=self.headers,
|
|
|
- timeout=aiohttp.ClientTimeout(total=self.timeout),
|
|
|
+ timeout=aiohttp.ClientTimeout(total=self.upload_timeout),
|
|
|
) as response:
|
|
|
return await self._handle_response_async(response)
|
|
|
|
|
@@ -231,7 +327,7 @@ class MistralLoader:
|
|
|
|
|
|
def url_request():
|
|
|
response = requests.get(
|
|
|
- url, headers=signed_url_headers, params=params, timeout=self.timeout
|
|
|
+ url, headers=signed_url_headers, params=params, timeout=self.url_timeout
|
|
|
)
|
|
|
return self._handle_response(response)
|
|
|
|
|
@@ -261,7 +357,7 @@ class MistralLoader:
|
|
|
url,
|
|
|
headers=headers,
|
|
|
params=params,
|
|
|
- timeout=aiohttp.ClientTimeout(total=self.timeout),
|
|
|
+ timeout=aiohttp.ClientTimeout(total=self.url_timeout),
|
|
|
) as response:
|
|
|
return await self._handle_response_async(response)
|
|
|
|
|
@@ -294,7 +390,7 @@ class MistralLoader:
|
|
|
|
|
|
def ocr_request():
|
|
|
response = requests.post(
|
|
|
- url, headers=ocr_headers, json=payload, timeout=self.timeout
|
|
|
+ url, headers=ocr_headers, json=payload, timeout=self.ocr_timeout
|
|
|
)
|
|
|
return self._handle_response(response)
|
|
|
|
|
@@ -336,7 +432,7 @@ class MistralLoader:
|
|
|
url,
|
|
|
json=payload,
|
|
|
headers=headers,
|
|
|
- timeout=aiohttp.ClientTimeout(total=self.timeout),
|
|
|
+ timeout=aiohttp.ClientTimeout(total=self.ocr_timeout),
|
|
|
) as response:
|
|
|
ocr_response = await self._handle_response_async(response)
|
|
|
|
|
@@ -353,7 +449,9 @@ class MistralLoader:
|
|
|
url = f"{self.BASE_API_URL}/files/{file_id}"
|
|
|
|
|
|
try:
|
|
|
- response = requests.delete(url, headers=self.headers, timeout=30)
|
|
|
+ response = requests.delete(
|
|
|
+ url, headers=self.headers, timeout=self.cleanup_timeout
|
|
|
+ )
|
|
|
delete_response = self._handle_response(response)
|
|
|
log.info(f"File deleted successfully: {delete_response}")
|
|
|
except Exception as e:
|
|
@@ -372,7 +470,7 @@ class MistralLoader:
|
|
|
url=f"{self.BASE_API_URL}/files/{file_id}",
|
|
|
headers=self.headers,
|
|
|
timeout=aiohttp.ClientTimeout(
|
|
|
- total=30
|
|
|
+ total=self.cleanup_timeout
|
|
|
), # Shorter timeout for cleanup
|
|
|
) as response:
|
|
|
return await self._handle_response_async(response)
|
|
@@ -388,29 +486,39 @@ class MistralLoader:
|
|
|
async def _get_session(self):
|
|
|
"""Context manager for HTTP session with optimized settings."""
|
|
|
connector = aiohttp.TCPConnector(
|
|
|
- limit=10, # Total connection limit
|
|
|
- limit_per_host=5, # Per-host connection limit
|
|
|
- ttl_dns_cache=300, # DNS cache TTL
|
|
|
+ limit=20, # Increased total connection limit for better throughput
|
|
|
+ limit_per_host=10, # Increased per-host limit for API endpoints
|
|
|
+ ttl_dns_cache=600, # Longer DNS cache TTL (10 minutes)
|
|
|
use_dns_cache=True,
|
|
|
- keepalive_timeout=30,
|
|
|
+ keepalive_timeout=60, # Increased keepalive for connection reuse
|
|
|
enable_cleanup_closed=True,
|
|
|
+ force_close=False, # Allow connection reuse
|
|
|
+ resolver=aiohttp.AsyncResolver(), # Use async DNS resolver
|
|
|
+ )
|
|
|
+
|
|
|
+ timeout = aiohttp.ClientTimeout(
|
|
|
+ total=self.timeout,
|
|
|
+ connect=30, # Connection timeout
|
|
|
+ sock_read=60, # Socket read timeout
|
|
|
)
|
|
|
|
|
|
async with aiohttp.ClientSession(
|
|
|
connector=connector,
|
|
|
- timeout=aiohttp.ClientTimeout(total=self.timeout),
|
|
|
+ timeout=timeout,
|
|
|
headers={"User-Agent": "OpenWebUI-MistralLoader/2.0"},
|
|
|
+ raise_for_status=False, # We handle status codes manually
|
|
|
) as session:
|
|
|
yield session
|
|
|
|
|
|
def _process_results(self, ocr_response: Dict[str, Any]) -> List[Document]:
|
|
|
- """Process OCR results into Document objects with enhanced metadata."""
|
|
|
+ """Process OCR results into Document objects with enhanced metadata and memory efficiency."""
|
|
|
pages_data = ocr_response.get("pages")
|
|
|
if not pages_data:
|
|
|
log.warning("No pages found in OCR response.")
|
|
|
return [
|
|
|
Document(
|
|
|
- page_content="No text content found", metadata={"error": "no_pages"}
|
|
|
+ page_content="No text content found",
|
|
|
+ metadata={"error": "no_pages", "file_name": self.file_name},
|
|
|
)
|
|
|
]
|
|
|
|
|
@@ -418,41 +526,44 @@ class MistralLoader:
|
|
|
total_pages = len(pages_data)
|
|
|
skipped_pages = 0
|
|
|
|
|
|
+ # Process pages in a memory-efficient way
|
|
|
for page_data in pages_data:
|
|
|
page_content = page_data.get("markdown")
|
|
|
page_index = page_data.get("index") # API uses 0-based index
|
|
|
|
|
|
- if page_content is not None and page_index is not None:
|
|
|
- # Clean up content efficiently
|
|
|
- cleaned_content = (
|
|
|
- page_content.strip()
|
|
|
- if isinstance(page_content, str)
|
|
|
- else str(page_content)
|
|
|
+ if page_content is None or page_index is None:
|
|
|
+ skipped_pages += 1
|
|
|
+ self._debug_log(
|
|
|
+ f"Skipping page due to missing 'markdown' or 'index'. Data keys: {list(page_data.keys())}"
|
|
|
)
|
|
|
+ continue
|
|
|
|
|
|
- if cleaned_content: # Only add non-empty pages
|
|
|
- documents.append(
|
|
|
- Document(
|
|
|
- page_content=cleaned_content,
|
|
|
- metadata={
|
|
|
- "page": page_index, # 0-based index from API
|
|
|
- "page_label": page_index
|
|
|
- + 1, # 1-based label for convenience
|
|
|
- "total_pages": total_pages,
|
|
|
- "file_name": self.file_name,
|
|
|
- "file_size": self.file_size,
|
|
|
- "processing_engine": "mistral-ocr",
|
|
|
- },
|
|
|
- )
|
|
|
- )
|
|
|
- else:
|
|
|
- skipped_pages += 1
|
|
|
- self._debug_log(f"Skipping empty page {page_index}")
|
|
|
+ # Clean up content efficiently with early exit for empty content
|
|
|
+ if isinstance(page_content, str):
|
|
|
+ cleaned_content = page_content.strip()
|
|
|
else:
|
|
|
+ cleaned_content = str(page_content).strip()
|
|
|
+
|
|
|
+ if not cleaned_content:
|
|
|
skipped_pages += 1
|
|
|
- self._debug_log(
|
|
|
- f"Skipping page due to missing 'markdown' or 'index'. Data: {page_data}"
|
|
|
+ self._debug_log(f"Skipping empty page {page_index}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Create document with optimized metadata
|
|
|
+ documents.append(
|
|
|
+ Document(
|
|
|
+ page_content=cleaned_content,
|
|
|
+ metadata={
|
|
|
+ "page": page_index, # 0-based index from API
|
|
|
+ "page_label": page_index + 1, # 1-based label for convenience
|
|
|
+ "total_pages": total_pages,
|
|
|
+ "file_name": self.file_name,
|
|
|
+ "file_size": self.file_size,
|
|
|
+ "processing_engine": "mistral-ocr",
|
|
|
+ "content_length": len(cleaned_content),
|
|
|
+ },
|
|
|
)
|
|
|
+ )
|
|
|
|
|
|
if skipped_pages > 0:
|
|
|
log.info(
|
|
@@ -467,7 +578,11 @@ class MistralLoader:
|
|
|
return [
|
|
|
Document(
|
|
|
page_content="No valid text content found in document",
|
|
|
- metadata={"error": "no_valid_pages", "total_pages": total_pages},
|
|
|
+ metadata={
|
|
|
+ "error": "no_valid_pages",
|
|
|
+ "total_pages": total_pages,
|
|
|
+ "file_name": self.file_name,
|
|
|
+ },
|
|
|
)
|
|
|
]
|
|
|
|
|
@@ -585,12 +700,14 @@ class MistralLoader:
|
|
|
@staticmethod
|
|
|
async def load_multiple_async(
|
|
|
loaders: List["MistralLoader"],
|
|
|
+ max_concurrent: int = 5, # Limit concurrent requests
|
|
|
) -> List[List[Document]]:
|
|
|
"""
|
|
|
- Process multiple files concurrently for maximum performance.
|
|
|
+ Process multiple files concurrently with controlled concurrency.
|
|
|
|
|
|
Args:
|
|
|
loaders: List of MistralLoader instances
|
|
|
+ max_concurrent: Maximum number of concurrent requests
|
|
|
|
|
|
Returns:
|
|
|
List of document lists, one for each loader
|
|
@@ -598,11 +715,20 @@ class MistralLoader:
|
|
|
if not loaders:
|
|
|
return []
|
|
|
|
|
|
- log.info(f"Starting concurrent processing of {len(loaders)} files")
|
|
|
+ log.info(
|
|
|
+ f"Starting concurrent processing of {len(loaders)} files with max {max_concurrent} concurrent"
|
|
|
+ )
|
|
|
start_time = time.time()
|
|
|
|
|
|
- # Process all files concurrently
|
|
|
- tasks = [loader.load_async() for loader in loaders]
|
|
|
+ # Use semaphore to control concurrency
|
|
|
+ semaphore = asyncio.Semaphore(max_concurrent)
|
|
|
+
|
|
|
+ async def process_with_semaphore(loader: "MistralLoader") -> List[Document]:
|
|
|
+ async with semaphore:
|
|
|
+ return await loader.load_async()
|
|
|
+
|
|
|
+ # Process all files with controlled concurrency
|
|
|
+ tasks = [process_with_semaphore(loader) for loader in loaders]
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
|
# Handle any exceptions in results
|
|
@@ -624,10 +750,18 @@ class MistralLoader:
|
|
|
else:
|
|
|
processed_results.append(result)
|
|
|
|
|
|
+ # MONITORING: Log comprehensive batch processing statistics
|
|
|
total_time = time.time() - start_time
|
|
|
total_docs = sum(len(docs) for docs in processed_results)
|
|
|
+ success_count = sum(
|
|
|
+ 1 for result in results if not isinstance(result, Exception)
|
|
|
+ )
|
|
|
+ failure_count = len(results) - success_count
|
|
|
+
|
|
|
log.info(
|
|
|
- f"Batch processing completed in {total_time:.2f}s, produced {total_docs} total documents"
|
|
|
+ f"Batch processing completed in {total_time:.2f}s: "
|
|
|
+ f"{success_count} files succeeded, {failure_count} files failed, "
|
|
|
+ f"produced {total_docs} total documents"
|
|
|
)
|
|
|
|
|
|
return processed_results
|