oracle23ai.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
import logging
import os
from decimal import Decimal
from typing import Optional, List, Dict, Any

import oracledb

from open_webui.retrieval.vector.main import (
    VectorDBBase,
    VectorItem,
    SearchResult,
    GetResult,
)
from open_webui.config import (
    ORACLE_DB_USE_WALLET,
    ORACLE_DB_USER,
    ORACLE_DB_PASSWORD,
    ORACLE_DB_DSN,
    ORACLE_WALLET_DIR,
    ORACLE_WALLET_PASSWORD,
    ORACLE_VECTOR_LENGTH,
)
class Oracle23aiClient(VectorDBBase):
    """
    Oracle Vector Database Client for vector similarity search using Oracle Database 23ai.

    This client provides an interface to store, retrieve, and search vector embeddings
    in an Oracle database. It uses connection pooling for efficient database access
    and supports vector similarity search operations.

    All rows live in a single ``document_chunk`` table; a "collection" is the
    set of rows sharing one ``collection_name`` value.

    Attributes:
        pool: Connection pool for Oracle database connections
    """
  29. def __init__(self) -> None:
  30. """
  31. Initialize the Oracle23aiClient with a connection pool.
  32. Creates a connection pool with min=2 and max=10 connections, initializes
  33. the database schema if needed, and sets up necessary tables and indexes.
  34. Raises:
  35. ValueError: If required configuration parameters are missing
  36. Exception: If database initialization fails
  37. """
  38. try:
  39. if not ORACLE_DB_DSN:
  40. raise ValueError("ORACLE_DB_DSN is required for Oracle Vector Search")
  41. self.pool = oracledb.create_pool(
  42. user=ORACLE_DB_USER,
  43. password=ORACLE_DB_PASSWORD,
  44. dsn=ORACLE_DB_DSN,
  45. min=2,
  46. max=10,
  47. increment=1,
  48. config_dir=ORACLE_WALLET_DIR,
  49. wallet_location=ORACLE_WALLET_DIR,
  50. wallet_password=ORACLE_WALLET_PASSWORD
  51. )
  52. log.info(f" >>> Creating Connection Pool [{ORACLE_DB_USER}:**@{ORACLE_DB_DSN}]")
  53. with self.get_connection() as connection:
  54. log.info("Connection version:", connection.version)
  55. self._initialize_database(connection)
  56. print("Oracle Vector Search initialization complete.")
  57. except Exception as e:
  58. print(f"Error during Oracle Vector Search initialization: {e}")
  59. raise
  60. def get_connection(self):
  61. """
  62. Acquire a connection from the connection pool.
  63. Returns:
  64. connection: A database connection with output type handler configured
  65. """
  66. connection = self.pool.acquire()
  67. connection.outputtypehandler = self._output_type_handler
  68. return connection
  69. def _output_type_handler(self, cursor, metadata):
  70. """
  71. Handle Oracle vector type conversion.
  72. Args:
  73. cursor: Oracle database cursor
  74. metadata: Metadata for the column
  75. Returns:
  76. A variable with appropriate conversion for vector types
  77. """
  78. if metadata.type_code is oracledb.DB_TYPE_VECTOR:
  79. return cursor.var(metadata.type_code, arraysize=cursor.arraysize,
  80. outconverter=list)
  81. def _initialize_database(self, connection) -> None:
  82. """
  83. Initialize database schema, tables and indexes.
  84. Creates the document_chunk table and necessary indexes if they don't exist.
  85. Args:
  86. connection: Oracle database connection
  87. Raises:
  88. Exception: If schema initialization fails
  89. """
  90. with connection.cursor() as cursor:
  91. print(f" >>> Creating Table document_chunk")
  92. cursor.execute(f"""
  93. BEGIN
  94. EXECUTE IMMEDIATE '
  95. CREATE TABLE IF NOT EXISTS document_chunk (
  96. id VARCHAR2(255) PRIMARY KEY,
  97. collection_name VARCHAR2(255) NOT NULL,
  98. text CLOB,
  99. vmetadata JSON,
  100. vector vector(*, float32)
  101. )
  102. ';
  103. EXCEPTION
  104. WHEN OTHERS THEN
  105. IF SQLCODE != -955 THEN
  106. RAISE;
  107. END IF;
  108. END;
  109. """)
  110. print(f" >>> Creating Table document_chunk_collection_name_idx")
  111. cursor.execute("""
  112. BEGIN
  113. EXECUTE IMMEDIATE '
  114. CREATE INDEX IF NOT exists document_chunk_collection_name_idx
  115. ON document_chunk (collection_name)
  116. ';
  117. EXCEPTION
  118. WHEN OTHERS THEN
  119. IF SQLCODE != -955 THEN
  120. RAISE;
  121. END IF;
  122. END;
  123. """)
  124. print(f" >>> Creating VECTOR INDEX document_chunk_vector_ivf_idx")
  125. cursor.execute("""
  126. BEGIN
  127. EXECUTE IMMEDIATE '
  128. create vector index IF NOT EXISTS document_chunk_vector_ivf_idx on document_chunk(vector)
  129. organization neighbor partitions
  130. distance cosine
  131. with target accuracy 95
  132. PARAMETERS (type IVF, NEIGHBOR PARTITIONS 100)
  133. ';
  134. EXCEPTION
  135. WHEN OTHERS THEN
  136. IF SQLCODE != -955 THEN
  137. RAISE;
  138. END IF;
  139. END;
  140. """)
  141. connection.commit()
  142. def check_vector_length(self) -> None:
  143. """
  144. Check vector length compatibility (placeholder).
  145. This method would check if the configured vector length matches the database schema.
  146. Currently implemented as a placeholder.
  147. """
  148. pass
  149. def _vector_to_blob(self, vector: List[float]) -> bytes:
  150. """
  151. Convert a vector to Oracle BLOB format.
  152. Args:
  153. vector (List[float]): The vector to convert
  154. Returns:
  155. bytes: The vector in Oracle BLOB format
  156. """
  157. import array
  158. return array.array("f", vector)
  159. def adjust_vector_length(self, vector: List[float]) -> List[float]:
  160. """
  161. Adjust vector to the expected length if needed.
  162. Args:
  163. vector (List[float]): The vector to adjust
  164. Returns:
  165. List[float]: The adjusted vector
  166. """
  167. return vector
  168. def _decimal_handler(self, obj):
  169. """
  170. Handle Decimal objects for JSON serialization.
  171. Args:
  172. obj: Object to serialize
  173. Returns:
  174. float: Converted decimal value
  175. Raises:
  176. TypeError: If object is not JSON serializable
  177. """
  178. if isinstance(obj, Decimal):
  179. return float(obj)
  180. raise TypeError(f"{obj} is not JSON serializable")
  181. def _metadata_to_json(self, metadata: Dict) -> str:
  182. """
  183. Convert metadata dictionary to JSON string.
  184. Args:
  185. metadata (Dict): Metadata dictionary
  186. Returns:
  187. str: JSON representation of metadata
  188. """
  189. import json
  190. return json.dumps(metadata, default=self._decimal_handler) if metadata else "{}"
  191. def _json_to_metadata(self, json_str: str) -> Dict:
  192. """
  193. Convert JSON string to metadata dictionary.
  194. Args:
  195. json_str (str): JSON string
  196. Returns:
  197. Dict: Metadata dictionary
  198. """
  199. import json
  200. return json.loads(json_str) if json_str else {}
  201. def insert(self, collection_name: str, items: List[VectorItem]) -> None:
  202. """
  203. Insert vector items into the database.
  204. Args:
  205. collection_name (str): Name of the collection
  206. items (List[VectorItem]): List of vector items to insert
  207. Raises:
  208. Exception: If insertion fails
  209. Example:
  210. >>> client = Oracle23aiClient()
  211. >>> items = [
  212. ... {"id": "1", "text": "Sample text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
  213. ... {"id": "2", "text": "Another text", "vector": [0.3, 0.4, ...], "metadata": {"source": "doc2"}}
  214. ... ]
  215. >>> client.insert("my_collection", items)
  216. """
  217. print(f"Oracle23aiClient:Inserting {len(items)} items into collection '{collection_name}'.")
  218. with self.get_connection() as connection:
  219. try:
  220. with connection.cursor() as cursor:
  221. for item in items:
  222. vector_blob = self._vector_to_blob(item["vector"])
  223. metadata_json = self._metadata_to_json(item["metadata"])
  224. cursor.execute("""
  225. INSERT INTO document_chunk
  226. (id, collection_name, text, vmetadata, vector)
  227. VALUES (:id, :collection_name, :text, :metadata, :vector)
  228. """, {
  229. 'id': item["id"],
  230. 'collection_name': collection_name,
  231. 'text': item["text"],
  232. 'metadata': metadata_json,
  233. 'vector': vector_blob
  234. })
  235. connection.commit()
  236. print(f"Oracle23aiClient:Inserted {len(items)} items into collection '{collection_name}'.")
  237. except Exception as e:
  238. connection.rollback()
  239. print(f"Error during insert: {e}")
  240. raise
  241. def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
  242. """
  243. Update or insert vector items into the database.
  244. If an item with the same ID exists, it will be updated;
  245. otherwise, it will be inserted.
  246. Args:
  247. collection_name (str): Name of the collection
  248. items (List[VectorItem]): List of vector items to upsert
  249. Raises:
  250. Exception: If upsert operation fails
  251. Example:
  252. >>> client = Oracle23aiClient()
  253. >>> items = [
  254. ... {"id": "1", "text": "Updated text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
  255. ... {"id": "3", "text": "New item", "vector": [0.5, 0.6, ...], "metadata": {"source": "doc3"}}
  256. ... ]
  257. >>> client.upsert("my_collection", items)
  258. """
  259. with self.get_connection() as connection:
  260. try:
  261. with connection.cursor() as cursor:
  262. for item in items:
  263. vector_blob = self._vector_to_blob(item["vector"])
  264. metadata_json = self._metadata_to_json(item["metadata"])
  265. cursor.execute("""
  266. MERGE INTO document_chunk d
  267. USING (SELECT :id as id FROM dual) s
  268. ON (d.id = s.id)
  269. WHEN MATCHED THEN
  270. UPDATE SET
  271. collection_name = :collection_name,
  272. text = :text,
  273. vmetadata = :metadata,
  274. vector = :vector
  275. WHEN NOT MATCHED THEN
  276. INSERT (id, collection_name, text, vmetadata, vector)
  277. VALUES (:id, :collection_name, :text, :metadata, :vector)
  278. """, {
  279. 'id': item["id"],
  280. 'collection_name': collection_name,
  281. 'text': item["text"],
  282. 'metadata': metadata_json,
  283. 'vector': vector_blob,
  284. 'id': item["id"],
  285. 'collection_name': collection_name,
  286. 'text': item["text"],
  287. 'metadata': metadata_json,
  288. 'vector': vector_blob
  289. })
  290. connection.commit()
  291. print(f"Upserted {len(items)} items into collection '{collection_name}'.")
  292. except Exception as e:
  293. connection.rollback()
  294. print(f"Error during upsert: {e}")
  295. raise
  296. def search(
  297. self,
  298. collection_name: str,
  299. vectors: List[List[float]],
  300. limit: Optional[int] = None
  301. ) -> Optional[SearchResult]:
  302. """
  303. Search for similar vectors in the database.
  304. Performs vector similarity search using cosine distance.
  305. Args:
  306. collection_name (str): Name of the collection to search
  307. vectors (List[List[float]]): Query vectors to find similar items for
  308. limit (Optional[int]): Maximum number of results to return per query
  309. Returns:
  310. Optional[SearchResult]: Search results containing ids, distances, documents, and metadata
  311. Example:
  312. >>> client = Oracle23aiClient()
  313. >>> query_vector = [0.1, 0.2, 0.3, ...] # Must match VECTOR_LENGTH
  314. >>> results = client.search("my_collection", [query_vector], limit=5)
  315. >>> if results:
  316. ... print(f"Found {len(results.ids[0])} matches")
  317. ... for i, (id, dist) in enumerate(zip(results.ids[0], results.distances[0])):
  318. ... print(f"Match {i+1}: id={id}, distance={dist}")
  319. """
  320. print(f"Oracle23aiClient:Searching items from collection '{collection_name}'.")
  321. try:
  322. if not vectors:
  323. return None
  324. limit = limit or 10
  325. num_queries = len(vectors)
  326. ids = [[] for _ in range(num_queries)]
  327. distances = [[] for _ in range(num_queries)]
  328. documents = [[] for _ in range(num_queries)]
  329. metadatas = [[] for _ in range(num_queries)]
  330. with self.get_connection() as connection:
  331. with connection.cursor() as cursor:
  332. for qid, vector in enumerate(vectors):
  333. vector_blob = self._vector_to_blob(vector)
  334. cursor.execute("""
  335. SELECT dc.id, dc.text,
  336. JSON_SERIALIZE(dc.vmetadata) as vmetadata,
  337. VECTOR_DISTANCE(dc.vector, :query_vector, COSINE) as distance
  338. FROM document_chunk dc
  339. WHERE dc.collection_name = :collection_name
  340. ORDER BY VECTOR_DISTANCE(dc.vector, :query_vector, COSINE)
  341. FETCH APPROX FIRST :limit ROWS ONLY
  342. """, {
  343. 'query_vector': vector_blob,
  344. 'collection_name': collection_name,
  345. 'limit': limit
  346. })
  347. results = cursor.fetchall()
  348. for row in results:
  349. ids[qid].append(row[0])
  350. documents[qid].append(row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]))
  351. metadatas[qid].append(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2])
  352. distances[qid].append(float(row[3]))
  353. return SearchResult(
  354. ids=ids,
  355. distances=distances,
  356. documents=documents,
  357. metadatas=metadatas
  358. )
  359. except Exception as e:
  360. print(f"Error during search: {e}")
  361. import traceback
  362. print(traceback.format_exc())
  363. return None
  364. def query(
  365. self,
  366. collection_name: str,
  367. filter: Dict[str, Any],
  368. limit: Optional[int] = None
  369. ) -> Optional[GetResult]:
  370. """
  371. Query items based on metadata filters.
  372. Retrieves items that match specified metadata criteria.
  373. Args:
  374. collection_name (str): Name of the collection to query
  375. filter (Dict[str, Any]): Metadata filters to apply
  376. limit (Optional[int]): Maximum number of results to return
  377. Returns:
  378. Optional[GetResult]: Query results containing ids, documents, and metadata
  379. Example:
  380. >>> client = Oracle23aiClient()
  381. >>> filter = {"source": "doc1", "category": "finance"}
  382. >>> results = client.query("my_collection", filter, limit=20)
  383. >>> if results:
  384. ... print(f"Found {len(results.ids[0])} matching documents")
  385. """
  386. print(f"Oracle23aiClient:Querying items from collection '{collection_name}'.")
  387. try:
  388. limit = limit or 100
  389. query = """
  390. SELECT id, text, vmetadata
  391. FROM document_chunk
  392. WHERE collection_name = :collection_name
  393. """
  394. params = {'collection_name': collection_name}
  395. for i, (key, value) in enumerate(filter.items()):
  396. param_name = f"value_{i}"
  397. query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
  398. params[param_name] = str(value)
  399. query += " FETCH FIRST :limit ROWS ONLY"
  400. params['limit'] = limit
  401. with self.get_connection() as connection:
  402. with connection.cursor() as cursor:
  403. cursor.execute(query, params)
  404. results = cursor.fetchall()
  405. if not results:
  406. return None
  407. ids = [[row[0] for row in results]]
  408. documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
  409. metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]]
  410. return GetResult(
  411. ids=ids,
  412. documents=documents,
  413. metadatas=metadatas
  414. )
  415. except Exception as e:
  416. print(f"Error during query: {e}")
  417. import traceback
  418. print(traceback.format_exc())
  419. return None
  420. def get(
  421. self,
  422. collection_name: str,
  423. limit: Optional[int] = None
  424. ) -> Optional[GetResult]:
  425. """
  426. Get all items in a collection.
  427. Retrieves items from a specified collection up to the limit.
  428. Args:
  429. collection_name (str): Name of the collection to retrieve
  430. limit (Optional[int]): Maximum number of items to retrieve
  431. Returns:
  432. Optional[GetResult]: Result containing ids, documents, and metadata
  433. Example:
  434. >>> client = Oracle23aiClient()
  435. >>> results = client.get("my_collection", limit=50)
  436. >>> if results:
  437. ... print(f"Retrieved {len(results.ids[0])} documents from collection")
  438. """
  439. try:
  440. limit = limit or 100
  441. with self.get_connection() as connection:
  442. with connection.cursor() as cursor:
  443. cursor.execute("""
  444. SELECT /*+ MONITOR */ id, text, vmetadata
  445. FROM document_chunk
  446. WHERE collection_name = :collection_name
  447. FETCH FIRST :limit ROWS ONLY
  448. """, {
  449. 'collection_name': collection_name,
  450. 'limit': limit
  451. })
  452. results = cursor.fetchall()
  453. if not results:
  454. return None
  455. ids = [[row[0] for row in results]]
  456. documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
  457. metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]]
  458. return GetResult(
  459. ids=ids,
  460. documents=documents,
  461. metadatas=metadatas
  462. )
  463. except Exception as e:
  464. print(f"Error during get: {e}")
  465. import traceback
  466. print(traceback.format_exc())
  467. return None
  468. def delete(
  469. self,
  470. collection_name: str,
  471. ids: Optional[List[str]] = None,
  472. filter: Optional[Dict[str, Any]] = None,
  473. ) -> None:
  474. """
  475. Delete items from the database.
  476. Deletes items from a collection based on IDs or metadata filters.
  477. Args:
  478. collection_name (str): Name of the collection to delete from
  479. ids (Optional[List[str]]): Specific item IDs to delete
  480. filter (Optional[Dict[str, Any]]): Metadata filters for deletion
  481. Raises:
  482. Exception: If deletion fails
  483. Example:
  484. >>> client = Oracle23aiClient()
  485. >>> # Delete specific items by ID
  486. >>> client.delete("my_collection", ids=["1", "3", "5"])
  487. >>> # Or delete by metadata filter
  488. >>> client.delete("my_collection", filter={"source": "deprecated_source"})
  489. """
  490. try:
  491. query = "DELETE FROM document_chunk WHERE collection_name = :collection_name"
  492. params = {'collection_name': collection_name}
  493. if ids:
  494. id_list = ",".join([f"'{id}'" for id in ids])
  495. query += f" AND id IN ({id_list})"
  496. if filter:
  497. for i, (key, value) in enumerate(filter.items()):
  498. param_name = f"value_{i}"
  499. query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
  500. params[param_name] = str(value)
  501. with self.get_connection() as connection:
  502. with connection.cursor() as cursor:
  503. cursor.execute(query, params)
  504. deleted = cursor.rowcount
  505. connection.commit()
  506. print(f"Deleted {deleted} items from collection '{collection_name}'.")
  507. except Exception as e:
  508. print(f"Error during delete: {e}")
  509. raise
  510. def reset(self) -> None:
  511. """
  512. Reset the database by deleting all items.
  513. Deletes all items from the document_chunk table.
  514. Raises:
  515. Exception: If reset fails
  516. Example:
  517. >>> client = Oracle23aiClient()
  518. >>> client.reset() # Warning: Removes all data!
  519. """
  520. try:
  521. with self.get_connection() as connection:
  522. with connection.cursor() as cursor:
  523. cursor.execute("DELETE FROM document_chunk")
  524. deleted = cursor.rowcount
  525. connection.commit()
  526. print(f"Reset complete. Deleted {deleted} items from 'document_chunk' table.")
  527. except Exception as e:
  528. print(f"Error during reset: {e}")
  529. raise
  530. def close(self) -> None:
  531. """
  532. Close the database connection pool.
  533. Properly closes the connection pool and releases all resources.
  534. Example:
  535. >>> client = Oracle23aiClient()
  536. >>> # After finishing all operations
  537. >>> client.close()
  538. """
  539. try:
  540. if hasattr(self, 'pool') and self.pool:
  541. self.pool.close()
  542. print("Oracle Vector Search connection pool closed.")
  543. except Exception as e:
  544. print(f"Error closing connection pool: {e}")
  545. def has_collection(self, collection_name: str) -> bool:
  546. """
  547. Check if a collection exists.
  548. Args:
  549. collection_name (str): Name of the collection to check
  550. Returns:
  551. bool: True if the collection exists, False otherwise
  552. Example:
  553. >>> client = Oracle23aiClient()
  554. >>> if client.has_collection("my_collection"):
  555. ... print("Collection exists!")
  556. ... else:
  557. ... print("Collection does not exist.")
  558. """
  559. try:
  560. with self.get_connection() as connection:
  561. with connection.cursor() as cursor:
  562. cursor.execute("""
  563. SELECT COUNT(*)
  564. FROM document_chunk
  565. WHERE collection_name = :collection_name
  566. FETCH FIRST 1 ROWS ONLY
  567. """, {'collection_name': collection_name})
  568. count = cursor.fetchone()[0]
  569. return count > 0
  570. except Exception as e:
  571. print(f"Error checking collection existence: {e}")
  572. return False
  573. def delete_collection(self, collection_name: str) -> None:
  574. """
  575. Delete an entire collection.
  576. Removes all items belonging to the specified collection.
  577. Args:
  578. collection_name (str): Name of the collection to delete
  579. Example:
  580. >>> client = Oracle23aiClient()
  581. >>> client.delete_collection("obsolete_collection")
  582. """
  583. self.delete(collection_name)
  584. print(f"Collection '{collection_name}' deleted.")