# oracle23ai.py
import array
import os
import re
from decimal import Decimal
from typing import Optional, List, Dict, Any

import oracledb

from open_webui.retrieval.vector.main import (
    VectorDBBase,
    VectorItem,
    SearchResult,
    GetResult,
)
from open_webui.config import (
    ORACLE_DB_USER,
    ORACLE_DB_PASSWORD,
    ORACLE_DB_DSN,
    ORACLE_WALLET_DIR,
    ORACLE_WALLET_PASSWORD,
    ORACLE_VECTOR_LENGTH,
)


  19. class Oracle23aiClient(VectorDBBase):
  20. """
  21. Oracle Vector Database Client for vector similarity search using Oracle Database 23ai.
  22. This client provides an interface to store, retrieve, and search vector embeddings
  23. in an Oracle database. It uses connection pooling for efficient database access
  24. and supports vector similarity search operations.
  25. Attributes:
  26. pool: Connection pool for Oracle database connections
  27. """
  28. def __init__(self) -> None:
  29. """
  30. Initialize the Oracle23aiClient with a connection pool.
  31. Creates a connection pool with min=2 and max=10 connections, initializes
  32. the database schema if needed, and sets up necessary tables and indexes.
  33. Raises:
  34. ValueError: If required configuration parameters are missing
  35. Exception: If database initialization fails
  36. """
  37. try:
  38. if not ORACLE_DB_DSN:
  39. raise ValueError("ORACLE_DB_DSN is required for Oracle Vector Search")
  40. self.pool = oracledb.create_pool(
  41. user=ORACLE_DB_USER,
  42. password=ORACLE_DB_PASSWORD,
  43. dsn=ORACLE_DB_DSN,
  44. min=2,
  45. max=10,
  46. increment=1,
  47. config_dir=ORACLE_WALLET_DIR,
  48. wallet_location=ORACLE_WALLET_DIR,
  49. wallet_password=ORACLE_WALLET_PASSWORD
  50. )
  51. print(f" >>> Creating Connection Pool [{ORACLE_DB_USER}:**@{ORACLE_DB_DSN}]")
  52. with self.get_connection() as connection:
  53. print("Connection version:", connection.version)
  54. self._initialize_database(connection)
  55. print("Oracle Vector Search initialization complete.")
  56. except Exception as e:
  57. print(f"Error during Oracle Vector Search initialization: {e}")
  58. raise
  59. def get_connection(self):
  60. """
  61. Acquire a connection from the connection pool.
  62. Returns:
  63. connection: A database connection with output type handler configured
  64. """
  65. connection = self.pool.acquire()
  66. connection.outputtypehandler = self._output_type_handler
  67. return connection
  68. def _output_type_handler(self, cursor, metadata):
  69. """
  70. Handle Oracle vector type conversion.
  71. Args:
  72. cursor: Oracle database cursor
  73. metadata: Metadata for the column
  74. Returns:
  75. A variable with appropriate conversion for vector types
  76. """
  77. if metadata.type_code is oracledb.DB_TYPE_VECTOR:
  78. return cursor.var(metadata.type_code, arraysize=cursor.arraysize,
  79. outconverter=list)
  80. def _initialize_database(self, connection) -> None:
  81. """
  82. Initialize database schema, tables and indexes.
  83. Creates the document_chunk table and necessary indexes if they don't exist.
  84. Args:
  85. connection: Oracle database connection
  86. Raises:
  87. Exception: If schema initialization fails
  88. """
  89. with connection.cursor() as cursor:
  90. print(f" >>> Creating Table document_chunk")
  91. cursor.execute(f"""
  92. BEGIN
  93. EXECUTE IMMEDIATE '
  94. CREATE TABLE IF NOT EXISTS document_chunk (
  95. id VARCHAR2(255) PRIMARY KEY,
  96. collection_name VARCHAR2(255) NOT NULL,
  97. text CLOB,
  98. vmetadata JSON,
  99. vector vector(*, float32)
  100. )
  101. ';
  102. EXCEPTION
  103. WHEN OTHERS THEN
  104. IF SQLCODE != -955 THEN
  105. RAISE;
  106. END IF;
  107. END;
  108. """)
  109. print(f" >>> Creating Table document_chunk_collection_name_idx")
  110. cursor.execute("""
  111. BEGIN
  112. EXECUTE IMMEDIATE '
  113. CREATE INDEX IF NOT exists document_chunk_collection_name_idx
  114. ON document_chunk (collection_name)
  115. ';
  116. EXCEPTION
  117. WHEN OTHERS THEN
  118. IF SQLCODE != -955 THEN
  119. RAISE;
  120. END IF;
  121. END;
  122. """)
  123. print(f" >>> Creating VECTOR INDEX document_chunk_vector_ivf_idx")
  124. cursor.execute("""
  125. BEGIN
  126. EXECUTE IMMEDIATE '
  127. create vector index IF NOT EXISTS document_chunk_vector_ivf_idx on document_chunk(vector)
  128. organization neighbor partitions
  129. distance cosine
  130. with target accuracy 95
  131. PARAMETERS (type IVF, NEIGHBOR PARTITIONS 100)
  132. ';
  133. EXCEPTION
  134. WHEN OTHERS THEN
  135. IF SQLCODE != -955 THEN
  136. RAISE;
  137. END IF;
  138. END;
  139. """)
  140. connection.commit()
  141. def check_vector_length(self) -> None:
  142. """
  143. Check vector length compatibility (placeholder).
  144. This method would check if the configured vector length matches the database schema.
  145. Currently implemented as a placeholder.
  146. """
  147. pass
  148. def _vector_to_blob(self, vector: List[float]) -> bytes:
  149. """
  150. Convert a vector to Oracle BLOB format.
  151. Args:
  152. vector (List[float]): The vector to convert
  153. Returns:
  154. bytes: The vector in Oracle BLOB format
  155. """
  156. import array
  157. return array.array("f", vector)
  158. def adjust_vector_length(self, vector: List[float]) -> List[float]:
  159. """
  160. Adjust vector to the expected length if needed.
  161. Args:
  162. vector (List[float]): The vector to adjust
  163. Returns:
  164. List[float]: The adjusted vector
  165. """
  166. return vector
  167. def _decimal_handler(self, obj):
  168. """
  169. Handle Decimal objects for JSON serialization.
  170. Args:
  171. obj: Object to serialize
  172. Returns:
  173. float: Converted decimal value
  174. Raises:
  175. TypeError: If object is not JSON serializable
  176. """
  177. if isinstance(obj, Decimal):
  178. return float(obj)
  179. raise TypeError(f"{obj} is not JSON serializable")
  180. def _metadata_to_json(self, metadata: Dict) -> str:
  181. """
  182. Convert metadata dictionary to JSON string.
  183. Args:
  184. metadata (Dict): Metadata dictionary
  185. Returns:
  186. str: JSON representation of metadata
  187. """
  188. import json
  189. return json.dumps(metadata, default=self._decimal_handler) if metadata else "{}"
  190. def _json_to_metadata(self, json_str: str) -> Dict:
  191. """
  192. Convert JSON string to metadata dictionary.
  193. Args:
  194. json_str (str): JSON string
  195. Returns:
  196. Dict: Metadata dictionary
  197. """
  198. import json
  199. return json.loads(json_str) if json_str else {}
  200. def insert(self, collection_name: str, items: List[VectorItem]) -> None:
  201. """
  202. Insert vector items into the database.
  203. Args:
  204. collection_name (str): Name of the collection
  205. items (List[VectorItem]): List of vector items to insert
  206. Raises:
  207. Exception: If insertion fails
  208. Example:
  209. >>> client = Oracle23aiClient()
  210. >>> items = [
  211. ... {"id": "1", "text": "Sample text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
  212. ... {"id": "2", "text": "Another text", "vector": [0.3, 0.4, ...], "metadata": {"source": "doc2"}}
  213. ... ]
  214. >>> client.insert("my_collection", items)
  215. """
  216. print(f"Oracle23aiClient:Inserting {len(items)} items into collection '{collection_name}'.")
  217. with self.get_connection() as connection:
  218. try:
  219. with connection.cursor() as cursor:
  220. for item in items:
  221. vector_blob = self._vector_to_blob(item["vector"])
  222. metadata_json = self._metadata_to_json(item["metadata"])
  223. cursor.execute("""
  224. INSERT INTO document_chunk
  225. (id, collection_name, text, vmetadata, vector)
  226. VALUES (:id, :collection_name, :text, :metadata, :vector)
  227. """, {
  228. 'id': item["id"],
  229. 'collection_name': collection_name,
  230. 'text': item["text"],
  231. 'metadata': metadata_json,
  232. 'vector': vector_blob
  233. })
  234. connection.commit()
  235. print(f"Oracle23aiClient:Inserted {len(items)} items into collection '{collection_name}'.")
  236. except Exception as e:
  237. connection.rollback()
  238. print(f"Error during insert: {e}")
  239. raise
  240. def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
  241. """
  242. Update or insert vector items into the database.
  243. If an item with the same ID exists, it will be updated;
  244. otherwise, it will be inserted.
  245. Args:
  246. collection_name (str): Name of the collection
  247. items (List[VectorItem]): List of vector items to upsert
  248. Raises:
  249. Exception: If upsert operation fails
  250. Example:
  251. >>> client = Oracle23aiClient()
  252. >>> items = [
  253. ... {"id": "1", "text": "Updated text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
  254. ... {"id": "3", "text": "New item", "vector": [0.5, 0.6, ...], "metadata": {"source": "doc3"}}
  255. ... ]
  256. >>> client.upsert("my_collection", items)
  257. """
  258. with self.get_connection() as connection:
  259. try:
  260. with connection.cursor() as cursor:
  261. for item in items:
  262. vector_blob = self._vector_to_blob(item["vector"])
  263. metadata_json = self._metadata_to_json(item["metadata"])
  264. cursor.execute("""
  265. MERGE INTO document_chunk d
  266. USING (SELECT :id as id FROM dual) s
  267. ON (d.id = s.id)
  268. WHEN MATCHED THEN
  269. UPDATE SET
  270. collection_name = :collection_name,
  271. text = :text,
  272. vmetadata = :metadata,
  273. vector = :vector
  274. WHEN NOT MATCHED THEN
  275. INSERT (id, collection_name, text, vmetadata, vector)
  276. VALUES (:id, :collection_name, :text, :metadata, :vector)
  277. """, {
  278. 'id': item["id"],
  279. 'collection_name': collection_name,
  280. 'text': item["text"],
  281. 'metadata': metadata_json,
  282. 'vector': vector_blob,
  283. 'id': item["id"],
  284. 'collection_name': collection_name,
  285. 'text': item["text"],
  286. 'metadata': metadata_json,
  287. 'vector': vector_blob
  288. })
  289. connection.commit()
  290. print(f"Upserted {len(items)} items into collection '{collection_name}'.")
  291. except Exception as e:
  292. connection.rollback()
  293. print(f"Error during upsert: {e}")
  294. raise
  295. def search(
  296. self,
  297. collection_name: str,
  298. vectors: List[List[float]],
  299. limit: Optional[int] = None
  300. ) -> Optional[SearchResult]:
  301. """
  302. Search for similar vectors in the database.
  303. Performs vector similarity search using cosine distance.
  304. Args:
  305. collection_name (str): Name of the collection to search
  306. vectors (List[List[float]]): Query vectors to find similar items for
  307. limit (Optional[int]): Maximum number of results to return per query
  308. Returns:
  309. Optional[SearchResult]: Search results containing ids, distances, documents, and metadata
  310. Example:
  311. >>> client = Oracle23aiClient()
  312. >>> query_vector = [0.1, 0.2, 0.3, ...] # Must match VECTOR_LENGTH
  313. >>> results = client.search("my_collection", [query_vector], limit=5)
  314. >>> if results:
  315. ... print(f"Found {len(results.ids[0])} matches")
  316. ... for i, (id, dist) in enumerate(zip(results.ids[0], results.distances[0])):
  317. ... print(f"Match {i+1}: id={id}, distance={dist}")
  318. """
  319. print(f"Oracle23aiClient:Searching items from collection '{collection_name}'.")
  320. try:
  321. if not vectors:
  322. return None
  323. limit = limit or 10
  324. num_queries = len(vectors)
  325. ids = [[] for _ in range(num_queries)]
  326. distances = [[] for _ in range(num_queries)]
  327. documents = [[] for _ in range(num_queries)]
  328. metadatas = [[] for _ in range(num_queries)]
  329. with self.get_connection() as connection:
  330. with connection.cursor() as cursor:
  331. for qid, vector in enumerate(vectors):
  332. vector_blob = self._vector_to_blob(vector)
  333. cursor.execute("""
  334. SELECT dc.id, dc.text,
  335. JSON_SERIALIZE(dc.vmetadata) as vmetadata,
  336. VECTOR_DISTANCE(dc.vector, :query_vector, COSINE) as distance
  337. FROM document_chunk dc
  338. WHERE dc.collection_name = :collection_name
  339. ORDER BY VECTOR_DISTANCE(dc.vector, :query_vector, COSINE)
  340. FETCH APPROX FIRST :limit ROWS ONLY
  341. """, {
  342. 'query_vector': vector_blob,
  343. 'collection_name': collection_name,
  344. 'limit': limit
  345. })
  346. results = cursor.fetchall()
  347. for row in results:
  348. ids[qid].append(row[0])
  349. documents[qid].append(row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]))
  350. metadatas[qid].append(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2])
  351. distances[qid].append(float(row[3]))
  352. return SearchResult(
  353. ids=ids,
  354. distances=distances,
  355. documents=documents,
  356. metadatas=metadatas
  357. )
  358. except Exception as e:
  359. print(f"Error during search: {e}")
  360. import traceback
  361. print(traceback.format_exc())
  362. return None
  363. def query(
  364. self,
  365. collection_name: str,
  366. filter: Dict[str, Any],
  367. limit: Optional[int] = None
  368. ) -> Optional[GetResult]:
  369. """
  370. Query items based on metadata filters.
  371. Retrieves items that match specified metadata criteria.
  372. Args:
  373. collection_name (str): Name of the collection to query
  374. filter (Dict[str, Any]): Metadata filters to apply
  375. limit (Optional[int]): Maximum number of results to return
  376. Returns:
  377. Optional[GetResult]: Query results containing ids, documents, and metadata
  378. Example:
  379. >>> client = Oracle23aiClient()
  380. >>> filter = {"source": "doc1", "category": "finance"}
  381. >>> results = client.query("my_collection", filter, limit=20)
  382. >>> if results:
  383. ... print(f"Found {len(results.ids[0])} matching documents")
  384. """
  385. print(f"Oracle23aiClient:Querying items from collection '{collection_name}'.")
  386. try:
  387. limit = limit or 100
  388. query = """
  389. SELECT id, text, vmetadata
  390. FROM document_chunk
  391. WHERE collection_name = :collection_name
  392. """
  393. params = {'collection_name': collection_name}
  394. for i, (key, value) in enumerate(filter.items()):
  395. param_name = f"value_{i}"
  396. query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
  397. params[param_name] = str(value)
  398. query += " FETCH FIRST :limit ROWS ONLY"
  399. params['limit'] = limit
  400. with self.get_connection() as connection:
  401. with connection.cursor() as cursor:
  402. cursor.execute(query, params)
  403. results = cursor.fetchall()
  404. if not results:
  405. return None
  406. ids = [[row[0] for row in results]]
  407. documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
  408. metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]]
  409. return GetResult(
  410. ids=ids,
  411. documents=documents,
  412. metadatas=metadatas
  413. )
  414. except Exception as e:
  415. print(f"Error during query: {e}")
  416. import traceback
  417. print(traceback.format_exc())
  418. return None
  419. def get(
  420. self,
  421. collection_name: str,
  422. limit: Optional[int] = None
  423. ) -> Optional[GetResult]:
  424. """
  425. Get all items in a collection.
  426. Retrieves items from a specified collection up to the limit.
  427. Args:
  428. collection_name (str): Name of the collection to retrieve
  429. limit (Optional[int]): Maximum number of items to retrieve
  430. Returns:
  431. Optional[GetResult]: Result containing ids, documents, and metadata
  432. Example:
  433. >>> client = Oracle23aiClient()
  434. >>> results = client.get("my_collection", limit=50)
  435. >>> if results:
  436. ... print(f"Retrieved {len(results.ids[0])} documents from collection")
  437. """
  438. try:
  439. limit = limit or 100
  440. with self.get_connection() as connection:
  441. with connection.cursor() as cursor:
  442. cursor.execute("""
  443. SELECT /*+ MONITOR */ id, text, vmetadata
  444. FROM document_chunk
  445. WHERE collection_name = :collection_name
  446. FETCH FIRST :limit ROWS ONLY
  447. """, {
  448. 'collection_name': collection_name,
  449. 'limit': limit
  450. })
  451. results = cursor.fetchall()
  452. if not results:
  453. return None
  454. ids = [[row[0] for row in results]]
  455. documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
  456. metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]]
  457. return GetResult(
  458. ids=ids,
  459. documents=documents,
  460. metadatas=metadatas
  461. )
  462. except Exception as e:
  463. print(f"Error during get: {e}")
  464. import traceback
  465. print(traceback.format_exc())
  466. return None
  467. def delete(
  468. self,
  469. collection_name: str,
  470. ids: Optional[List[str]] = None,
  471. filter: Optional[Dict[str, Any]] = None,
  472. ) -> None:
  473. """
  474. Delete items from the database.
  475. Deletes items from a collection based on IDs or metadata filters.
  476. Args:
  477. collection_name (str): Name of the collection to delete from
  478. ids (Optional[List[str]]): Specific item IDs to delete
  479. filter (Optional[Dict[str, Any]]): Metadata filters for deletion
  480. Raises:
  481. Exception: If deletion fails
  482. Example:
  483. >>> client = Oracle23aiClient()
  484. >>> # Delete specific items by ID
  485. >>> client.delete("my_collection", ids=["1", "3", "5"])
  486. >>> # Or delete by metadata filter
  487. >>> client.delete("my_collection", filter={"source": "deprecated_source"})
  488. """
  489. try:
  490. query = "DELETE FROM document_chunk WHERE collection_name = :collection_name"
  491. params = {'collection_name': collection_name}
  492. if ids:
  493. id_list = ",".join([f"'{id}'" for id in ids])
  494. query += f" AND id IN ({id_list})"
  495. if filter:
  496. for i, (key, value) in enumerate(filter.items()):
  497. param_name = f"value_{i}"
  498. query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
  499. params[param_name] = str(value)
  500. with self.get_connection() as connection:
  501. with connection.cursor() as cursor:
  502. cursor.execute(query, params)
  503. deleted = cursor.rowcount
  504. connection.commit()
  505. print(f"Deleted {deleted} items from collection '{collection_name}'.")
  506. except Exception as e:
  507. print(f"Error during delete: {e}")
  508. raise
  509. def reset(self) -> None:
  510. """
  511. Reset the database by deleting all items.
  512. Deletes all items from the document_chunk table.
  513. Raises:
  514. Exception: If reset fails
  515. Example:
  516. >>> client = Oracle23aiClient()
  517. >>> client.reset() # Warning: Removes all data!
  518. """
  519. try:
  520. with self.get_connection() as connection:
  521. with connection.cursor() as cursor:
  522. cursor.execute("DELETE FROM document_chunk")
  523. deleted = cursor.rowcount
  524. connection.commit()
  525. print(f"Reset complete. Deleted {deleted} items from 'document_chunk' table.")
  526. except Exception as e:
  527. print(f"Error during reset: {e}")
  528. raise
  529. def close(self) -> None:
  530. """
  531. Close the database connection pool.
  532. Properly closes the connection pool and releases all resources.
  533. Example:
  534. >>> client = Oracle23aiClient()
  535. >>> # After finishing all operations
  536. >>> client.close()
  537. """
  538. try:
  539. if hasattr(self, 'pool') and self.pool:
  540. self.pool.close()
  541. print("Oracle Vector Search connection pool closed.")
  542. except Exception as e:
  543. print(f"Error closing connection pool: {e}")
  544. def has_collection(self, collection_name: str) -> bool:
  545. """
  546. Check if a collection exists.
  547. Args:
  548. collection_name (str): Name of the collection to check
  549. Returns:
  550. bool: True if the collection exists, False otherwise
  551. Example:
  552. >>> client = Oracle23aiClient()
  553. >>> if client.has_collection("my_collection"):
  554. ... print("Collection exists!")
  555. ... else:
  556. ... print("Collection does not exist.")
  557. """
  558. try:
  559. with self.get_connection() as connection:
  560. with connection.cursor() as cursor:
  561. cursor.execute("""
  562. SELECT COUNT(*)
  563. FROM document_chunk
  564. WHERE collection_name = :collection_name
  565. FETCH FIRST 1 ROWS ONLY
  566. """, {'collection_name': collection_name})
  567. count = cursor.fetchone()[0]
  568. return count > 0
  569. except Exception as e:
  570. print(f"Error checking collection existence: {e}")
  571. return False
  572. def delete_collection(self, collection_name: str) -> None:
  573. """
  574. Delete an entire collection.
  575. Removes all items belonging to the specified collection.
  576. Args:
  577. collection_name (str): Name of the collection to delete
  578. Example:
  579. >>> client = Oracle23aiClient()
  580. >>> client.delete_collection("obsolete_collection")
  581. """
  582. self.delete(collection_name)
  583. print(f"Collection '{collection_name}' deleted.")