|
@@ -1,30 +1,35 @@
|
|
|
import logging
|
|
|
import os
|
|
|
-import uuid
|
|
|
from typing import Optional, Union
|
|
|
|
|
|
-import asyncio
|
|
|
import requests
|
|
|
import hashlib
|
|
|
+from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
|
from huggingface_hub import snapshot_download
|
|
|
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
|
|
|
from langchain_community.retrievers import BM25Retriever
|
|
|
from langchain_core.documents import Document
|
|
|
|
|
|
-
|
|
|
from open_webui.config import VECTOR_DB
|
|
|
from open_webui.retrieval.vector.connector import VECTOR_DB_CLIENT
|
|
|
-from open_webui.utils.misc import get_last_user_message, calculate_sha256_string
|
|
|
|
|
|
from open_webui.models.users import UserModel
|
|
|
from open_webui.models.files import Files
|
|
|
|
|
|
+from open_webui.retrieval.vector.main import GetResult
|
|
|
+
|
|
|
+
|
|
|
from open_webui.env import (
|
|
|
SRC_LOG_LEVELS,
|
|
|
OFFLINE_MODE,
|
|
|
ENABLE_FORWARD_USER_INFO_HEADERS,
|
|
|
)
|
|
|
+from open_webui.config import (
|
|
|
+ RAG_EMBEDDING_QUERY_PREFIX,
|
|
|
+ RAG_EMBEDDING_CONTENT_PREFIX,
|
|
|
+ RAG_EMBEDDING_PREFIX_FIELD_NAME,
|
|
|
+)
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
log.setLevel(SRC_LOG_LEVELS["RAG"])
|
|
@@ -49,7 +54,7 @@ class VectorSearchRetriever(BaseRetriever):
|
|
|
) -> list[Document]:
|
|
|
result = VECTOR_DB_CLIENT.search(
|
|
|
collection_name=self.collection_name,
|
|
|
- vectors=[self.embedding_function(query)],
|
|
|
+ vectors=[self.embedding_function(query, RAG_EMBEDDING_QUERY_PREFIX)],
|
|
|
limit=self.top_k,
|
|
|
)
|
|
|
|
|
@@ -102,18 +107,18 @@ def get_doc(collection_name: str, user: UserModel = None):
|
|
|
|
|
|
def query_doc_with_hybrid_search(
|
|
|
collection_name: str,
|
|
|
+ collection_result: GetResult,
|
|
|
query: str,
|
|
|
embedding_function,
|
|
|
k: int,
|
|
|
reranking_function,
|
|
|
+ k_reranker: int,
|
|
|
r: float,
|
|
|
) -> dict:
|
|
|
try:
|
|
|
- result = VECTOR_DB_CLIENT.get(collection_name=collection_name)
|
|
|
-
|
|
|
bm25_retriever = BM25Retriever.from_texts(
|
|
|
- texts=result.documents[0],
|
|
|
- metadatas=result.metadatas[0],
|
|
|
+ texts=collection_result.documents[0],
|
|
|
+ metadatas=collection_result.metadatas[0],
|
|
|
)
|
|
|
bm25_retriever.k = k
|
|
|
|
|
@@ -128,7 +133,7 @@ def query_doc_with_hybrid_search(
|
|
|
)
|
|
|
compressor = RerankCompressor(
|
|
|
embedding_function=embedding_function,
|
|
|
- top_n=k,
|
|
|
+ top_n=k_reranker,
|
|
|
reranking_function=reranking_function,
|
|
|
r_score=r,
|
|
|
)
|
|
@@ -138,10 +143,23 @@ def query_doc_with_hybrid_search(
|
|
|
)
|
|
|
|
|
|
result = compression_retriever.invoke(query)
|
|
|
+
|
|
|
+ distances = [d.metadata.get("score") for d in result]
|
|
|
+ documents = [d.page_content for d in result]
|
|
|
+ metadatas = [d.metadata for d in result]
|
|
|
+
|
|
|
+ # retrieve only min(k, k_reranker) items, sort and cut by distance if k < k_reranker
|
|
|
+ if k < k_reranker:
|
|
|
+ sorted_items = sorted(
|
|
|
+ zip(distances, metadatas, documents), key=lambda x: x[0], reverse=True
|
|
|
+ )
|
|
|
+ sorted_items = sorted_items[:k]
|
|
|
+ distances, documents, metadatas = map(list, zip(*sorted_items))
|
|
|
+
|
|
|
result = {
|
|
|
- "distances": [[d.metadata.get("score") for d in result]],
|
|
|
- "documents": [[d.page_content for d in result]],
|
|
|
- "metadatas": [[d.metadata for d in result]],
|
|
|
+ "distances": [distances],
|
|
|
+ "documents": [documents],
|
|
|
+ "metadatas": [metadatas],
|
|
|
}
|
|
|
|
|
|
log.info(
|
|
@@ -174,12 +192,9 @@ def merge_get_results(get_results: list[dict]) -> dict:
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def merge_and_sort_query_results(
|
|
|
- query_results: list[dict], k: int, reverse: bool = False
|
|
|
-) -> dict:
|
|
|
+def merge_and_sort_query_results(query_results: list[dict], k: int) -> dict:
|
|
|
# Initialize lists to store combined data
|
|
|
- combined = []
|
|
|
- seen_hashes = set() # To store unique document hashes
|
|
|
+ combined = dict() # To store documents with unique document hashes
|
|
|
|
|
|
for data in query_results:
|
|
|
distances = data["distances"][0]
|
|
@@ -192,12 +207,17 @@ def merge_and_sort_query_results(
|
|
|
document.encode()
|
|
|
).hexdigest() # Compute a hash for uniqueness
|
|
|
|
|
|
- if doc_hash not in seen_hashes:
|
|
|
- seen_hashes.add(doc_hash)
|
|
|
- combined.append((distance, document, metadata))
|
|
|
+ if doc_hash not in combined.keys():
|
|
|
+ combined[doc_hash] = (distance, document, metadata)
|
|
|
+ continue # if doc is new, no further comparison is needed
|
|
|
+
|
|
|
+ # if doc is alredy in, but new distance is better, update
|
|
|
+ if distance > combined[doc_hash][0]:
|
|
|
+ combined[doc_hash] = (distance, document, metadata)
|
|
|
|
|
|
+ combined = list(combined.values())
|
|
|
# Sort the list based on distances
|
|
|
- combined.sort(key=lambda x: x[0], reverse=reverse)
|
|
|
+ combined.sort(key=lambda x: x[0], reverse=True)
|
|
|
|
|
|
# Slice to keep only the top k elements
|
|
|
sorted_distances, sorted_documents, sorted_metadatas = (
|
|
@@ -237,7 +257,7 @@ def query_collection(
|
|
|
) -> dict:
|
|
|
results = []
|
|
|
for query in queries:
|
|
|
- query_embedding = embedding_function(query)
|
|
|
+ query_embedding = embedding_function(query, prefix=RAG_EMBEDDING_QUERY_PREFIX)
|
|
|
for collection_name in collection_names:
|
|
|
if collection_name:
|
|
|
try:
|
|
@@ -253,12 +273,7 @@ def query_collection(
|
|
|
else:
|
|
|
pass
|
|
|
|
|
|
- if VECTOR_DB == "chroma":
|
|
|
- # Chroma uses unconventional cosine similarity, so we don't need to reverse the results
|
|
|
- # https://docs.trychroma.com/docs/collections/configure#configuring-chroma-collections
|
|
|
- return merge_and_sort_query_results(results, k=k, reverse=False)
|
|
|
- else:
|
|
|
- return merge_and_sort_query_results(results, k=k, reverse=True)
|
|
|
+ return merge_and_sort_query_results(results, k=k)
|
|
|
|
|
|
|
|
|
def query_collection_with_hybrid_search(
|
|
@@ -267,39 +282,66 @@ def query_collection_with_hybrid_search(
|
|
|
embedding_function,
|
|
|
k: int,
|
|
|
reranking_function,
|
|
|
+ k_reranker: int,
|
|
|
r: float,
|
|
|
) -> dict:
|
|
|
results = []
|
|
|
error = False
|
|
|
+ # Fetch collection data once per collection sequentially
|
|
|
+ # Avoid fetching the same data multiple times later
|
|
|
+ collection_results = {}
|
|
|
for collection_name in collection_names:
|
|
|
try:
|
|
|
- for query in queries:
|
|
|
- result = query_doc_with_hybrid_search(
|
|
|
- collection_name=collection_name,
|
|
|
- query=query,
|
|
|
- embedding_function=embedding_function,
|
|
|
- k=k,
|
|
|
- reranking_function=reranking_function,
|
|
|
- r=r,
|
|
|
- )
|
|
|
- results.append(result)
|
|
|
+ collection_results[collection_name] = VECTOR_DB_CLIENT.get(
|
|
|
+ collection_name=collection_name
|
|
|
+ )
|
|
|
except Exception as e:
|
|
|
- log.exception(
|
|
|
- "Error when querying the collection with " f"hybrid_search: {e}"
|
|
|
+ log.exception(f"Failed to fetch collection {collection_name}: {e}")
|
|
|
+ collection_results[collection_name] = None
|
|
|
+
|
|
|
+ log.info(
|
|
|
+ f"Starting hybrid search for {len(queries)} queries in {len(collection_names)} collections..."
|
|
|
+ )
|
|
|
+
|
|
|
+ def process_query(collection_name, query):
|
|
|
+ try:
|
|
|
+ result = query_doc_with_hybrid_search(
|
|
|
+ collection_name=collection_name,
|
|
|
+ collection_result=collection_results[collection_name],
|
|
|
+ query=query,
|
|
|
+ embedding_function=embedding_function,
|
|
|
+ k=k,
|
|
|
+ reranking_function=reranking_function,
|
|
|
+ k_reranker=k_reranker,
|
|
|
+ r=r,
|
|
|
)
|
|
|
+ return result, None
|
|
|
+ except Exception as e:
|
|
|
+ log.exception(f"Error when querying the collection with hybrid_search: {e}")
|
|
|
+ return None, e
|
|
|
+
|
|
|
+ tasks = [
|
|
|
+ (collection_name, query)
|
|
|
+ for collection_name in collection_names
|
|
|
+ for query in queries
|
|
|
+ ]
|
|
|
+
|
|
|
+ with ThreadPoolExecutor() as executor:
|
|
|
+ future_results = [executor.submit(process_query, cn, q) for cn, q in tasks]
|
|
|
+ task_results = [future.result() for future in future_results]
|
|
|
+
|
|
|
+ for result, err in task_results:
|
|
|
+ if err is not None:
|
|
|
error = True
|
|
|
+ elif result is not None:
|
|
|
+ results.append(result)
|
|
|
|
|
|
- if error:
|
|
|
+ if error and not results:
|
|
|
raise Exception(
|
|
|
- "Hybrid search failed for all collections. Using Non hybrid search as fallback."
|
|
|
+ "Hybrid search failed for all collections. Using Non-hybrid search as fallback."
|
|
|
)
|
|
|
|
|
|
- if VECTOR_DB == "chroma":
|
|
|
- # Chroma uses unconventional cosine similarity, so we don't need to reverse the results
|
|
|
- # https://docs.trychroma.com/docs/collections/configure#configuring-chroma-collections
|
|
|
- return merge_and_sort_query_results(results, k=k, reverse=False)
|
|
|
- else:
|
|
|
- return merge_and_sort_query_results(results, k=k, reverse=True)
|
|
|
+ return merge_and_sort_query_results(results, k=k)
|
|
|
|
|
|
|
|
|
def get_embedding_function(
|
|
@@ -311,29 +353,38 @@ def get_embedding_function(
|
|
|
embedding_batch_size,
|
|
|
):
|
|
|
if embedding_engine == "":
|
|
|
- return lambda query, user=None: embedding_function.encode(query).tolist()
|
|
|
+ return lambda query, prefix=None, user=None: embedding_function.encode(
|
|
|
+ query, prompt=prefix if prefix else None
|
|
|
+ ).tolist()
|
|
|
elif embedding_engine in ["ollama", "openai"]:
|
|
|
- func = lambda query, user=None: generate_embeddings(
|
|
|
+ func = lambda query, prefix=None, user=None: generate_embeddings(
|
|
|
engine=embedding_engine,
|
|
|
model=embedding_model,
|
|
|
text=query,
|
|
|
+ prefix=prefix,
|
|
|
url=url,
|
|
|
key=key,
|
|
|
user=user,
|
|
|
)
|
|
|
|
|
|
- def generate_multiple(query, user, func):
|
|
|
+ def generate_multiple(query, prefix, user, func):
|
|
|
if isinstance(query, list):
|
|
|
embeddings = []
|
|
|
for i in range(0, len(query), embedding_batch_size):
|
|
|
embeddings.extend(
|
|
|
- func(query[i : i + embedding_batch_size], user=user)
|
|
|
+ func(
|
|
|
+ query[i : i + embedding_batch_size],
|
|
|
+ prefix=prefix,
|
|
|
+ user=user,
|
|
|
+ )
|
|
|
)
|
|
|
return embeddings
|
|
|
else:
|
|
|
- return func(query, user)
|
|
|
+ return func(query, prefix, user)
|
|
|
|
|
|
- return lambda query, user=None: generate_multiple(query, user, func)
|
|
|
+ return lambda query, prefix=None, user=None: generate_multiple(
|
|
|
+ query, prefix, user, func
|
|
|
+ )
|
|
|
else:
|
|
|
raise ValueError(f"Unknown embedding engine: {embedding_engine}")
|
|
|
|
|
@@ -345,6 +396,7 @@ def get_sources_from_files(
|
|
|
embedding_function,
|
|
|
k,
|
|
|
reranking_function,
|
|
|
+ k_reranker,
|
|
|
r,
|
|
|
hybrid_search,
|
|
|
full_context=False,
|
|
@@ -461,6 +513,7 @@ def get_sources_from_files(
|
|
|
embedding_function=embedding_function,
|
|
|
k=k,
|
|
|
reranking_function=reranking_function,
|
|
|
+ k_reranker=k_reranker,
|
|
|
r=r,
|
|
|
)
|
|
|
except Exception as e:
|
|
@@ -553,9 +606,14 @@ def generate_openai_batch_embeddings(
|
|
|
texts: list[str],
|
|
|
url: str = "https://api.openai.com/v1",
|
|
|
key: str = "",
|
|
|
+ prefix: str = None,
|
|
|
user: UserModel = None,
|
|
|
) -> Optional[list[list[float]]]:
|
|
|
try:
|
|
|
+ json_data = {"input": texts, "model": model}
|
|
|
+ if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str):
|
|
|
+ json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix
|
|
|
+
|
|
|
r = requests.post(
|
|
|
f"{url}/embeddings",
|
|
|
headers={
|
|
@@ -572,7 +630,7 @@ def generate_openai_batch_embeddings(
|
|
|
else {}
|
|
|
),
|
|
|
},
|
|
|
- json={"input": texts, "model": model},
|
|
|
+ json=json_data,
|
|
|
)
|
|
|
r.raise_for_status()
|
|
|
data = r.json()
|
|
@@ -586,9 +644,18 @@ def generate_openai_batch_embeddings(
|
|
|
|
|
|
|
|
|
def generate_ollama_batch_embeddings(
|
|
|
- model: str, texts: list[str], url: str, key: str = "", user: UserModel = None
|
|
|
+ model: str,
|
|
|
+ texts: list[str],
|
|
|
+ url: str,
|
|
|
+ key: str = "",
|
|
|
+ prefix: str = None,
|
|
|
+ user: UserModel = None,
|
|
|
) -> Optional[list[list[float]]]:
|
|
|
try:
|
|
|
+ json_data = {"input": texts, "model": model}
|
|
|
+ if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str):
|
|
|
+ json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix
|
|
|
+
|
|
|
r = requests.post(
|
|
|
f"{url}/api/embed",
|
|
|
headers={
|
|
@@ -605,7 +672,7 @@ def generate_ollama_batch_embeddings(
|
|
|
else {}
|
|
|
),
|
|
|
},
|
|
|
- json={"input": texts, "model": model},
|
|
|
+ json=json_data,
|
|
|
)
|
|
|
r.raise_for_status()
|
|
|
data = r.json()
|
|
@@ -619,15 +686,34 @@ def generate_ollama_batch_embeddings(
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def generate_embeddings(engine: str, model: str, text: Union[str, list[str]], **kwargs):
|
|
|
+def generate_embeddings(
|
|
|
+ engine: str,
|
|
|
+ model: str,
|
|
|
+ text: Union[str, list[str]],
|
|
|
+ prefix: Union[str, None] = None,
|
|
|
+ **kwargs,
|
|
|
+):
|
|
|
url = kwargs.get("url", "")
|
|
|
key = kwargs.get("key", "")
|
|
|
user = kwargs.get("user")
|
|
|
|
|
|
+ if prefix is not None and RAG_EMBEDDING_PREFIX_FIELD_NAME is None:
|
|
|
+ if isinstance(text, list):
|
|
|
+ text = [f"{prefix}{text_element}" for text_element in text]
|
|
|
+ else:
|
|
|
+ text = f"{prefix}{text}"
|
|
|
+
|
|
|
if engine == "ollama":
|
|
|
if isinstance(text, list):
|
|
|
embeddings = generate_ollama_batch_embeddings(
|
|
|
- **{"model": model, "texts": text, "url": url, "key": key, "user": user}
|
|
|
+ **{
|
|
|
+ "model": model,
|
|
|
+ "texts": text,
|
|
|
+ "url": url,
|
|
|
+ "key": key,
|
|
|
+ "prefix": prefix,
|
|
|
+ "user": user,
|
|
|
+ }
|
|
|
)
|
|
|
else:
|
|
|
embeddings = generate_ollama_batch_embeddings(
|
|
@@ -636,16 +722,20 @@ def generate_embeddings(engine: str, model: str, text: Union[str, list[str]], **
|
|
|
"texts": [text],
|
|
|
"url": url,
|
|
|
"key": key,
|
|
|
+ "prefix": prefix,
|
|
|
"user": user,
|
|
|
}
|
|
|
)
|
|
|
return embeddings[0] if isinstance(text, str) else embeddings
|
|
|
elif engine == "openai":
|
|
|
if isinstance(text, list):
|
|
|
- embeddings = generate_openai_batch_embeddings(model, text, url, key, user)
|
|
|
+ embeddings = generate_openai_batch_embeddings(
|
|
|
+ model, text, url, key, prefix, user
|
|
|
+ )
|
|
|
else:
|
|
|
- embeddings = generate_openai_batch_embeddings(model, [text], url, key, user)
|
|
|
-
|
|
|
+ embeddings = generate_openai_batch_embeddings(
|
|
|
+ model, [text], url, key, prefix, user
|
|
|
+ )
|
|
|
return embeddings[0] if isinstance(text, str) else embeddings
|
|
|
|
|
|
|
|
@@ -681,9 +771,9 @@ class RerankCompressor(BaseDocumentCompressor):
|
|
|
else:
|
|
|
from sentence_transformers import util
|
|
|
|
|
|
- query_embedding = self.embedding_function(query)
|
|
|
+ query_embedding = self.embedding_function(query, RAG_EMBEDDING_QUERY_PREFIX)
|
|
|
document_embedding = self.embedding_function(
|
|
|
- [doc.page_content for doc in documents]
|
|
|
+ [doc.page_content for doc in documents], RAG_EMBEDDING_CONTENT_PREFIX
|
|
|
)
|
|
|
scores = util.cos_sim(query_embedding, document_embedding)[0]
|
|
|
|