Bladeren bron

Run hybrid_search in parallel

Phlogi 1 maand geleden
bovenliggende
commit
9c64310db5
1 gewijzigde bestanden met toevoegingen van 30 en 20 verwijderingen
  1. 30 20
      backend/open_webui/retrieval/utils.py

+ 30 - 20
backend/open_webui/retrieval/utils.py

@@ -4,6 +4,7 @@ from typing import Optional, Union
 
 import requests
 import hashlib
+from concurrent.futures import ThreadPoolExecutor
 
 from huggingface_hub import snapshot_download
 from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
@@ -298,30 +299,39 @@ def query_collection_with_hybrid_search(
             log.exception(f"Failed to fetch collection {collection_name}: {e}")
             collection_results[collection_name] = None
 
-    for collection_name in collection_names:
+    log.info(f"Starting hybrid search for {len(queries)} queries in {len(collection_names)} collections...")
+    def process_query(collection_name, query):
         try:
-            for query in queries:
-                result = query_doc_with_hybrid_search(
-                    collection_name=collection_name,
-                    collection_result=collection_results[collection_name],
-                    query=query,
-                    embedding_function=embedding_function,
-                    k=k,
-                    reranking_function=reranking_function,
-                    k_reranker=k_reranker,
-                    r=r,
-                )
-                results.append(result)
-        except Exception as e:
-            log.exception(
-                "Error when querying the collection with " f"hybrid_search: {e}"
+            result = query_doc_with_hybrid_search(
+                collection_name=collection_name,
+                collection_result=collection_results[collection_name],
+                query=query,
+                embedding_function=embedding_function,
+                k=k,
+                reranking_function=reranking_function,
+                k_reranker=k_reranker,
+                r=r,
             )
+            return result, None
+        except Exception as e:
+            log.exception(f"Error when querying the collection with hybrid_search: {e}")
+            return None, e
+
+    tasks = [(collection_name, query) for collection_name in collection_names for query in queries]
+
+    with ThreadPoolExecutor() as executor:
+        future_results = [executor.submit(process_query, cn, q) for cn, q in tasks]
+        task_results = [future.result() for future in future_results]
+
+    for result, err in task_results:
+        if err is not None:
             error = True
+        elif result is not None:
+            results.append(result)
 
-    if error:
-        raise Exception(
-            "Hybrid search failed for all collections. Using Non hybrid search as fallback."
-        )
+    if error and not results:
+        raise Exception("Hybrid search failed for all collections. Using Non-hybrid search as fallback.")
+        
     return merge_and_sort_query_results(results, k=k)