oracle23ai.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. """
  2. Oracle 23ai Vector Database Client - Fixed Version
  3. # .env
  4. VECTOR_DB = "oracle23ai"
  5. ## DBCS or oracle 23ai free
  6. ORACLE_DB_USE_WALLET = false
  7. ORACLE_DB_USER = "DEMOUSER"
  8. ORACLE_DB_PASSWORD = "Welcome123456"
  9. ORACLE_DB_DSN = "localhost:1521/FREEPDB1"
  10. ## ADW or ATP
  11. # ORACLE_DB_USE_WALLET = true
  12. # ORACLE_DB_USER = "DEMOUSER"
  13. # ORACLE_DB_PASSWORD = "Welcome123456"
  14. # ORACLE_DB_DSN = "medium"
  15. # ORACLE_DB_DSN = "(description= (retry_count=3)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=xx.oraclecloud.com))(connect_data=(service_name=yy.adb.oraclecloud.com))(security=(ssl_server_dn_match=no)))"
  16. # ORACLE_WALLET_DIR = "/home/opc/adb_wallet"
  17. # ORACLE_WALLET_PASSWORD = "Welcome1"
  18. ORACLE_VECTOR_LENGTH = 768
  19. ORACLE_DB_POOL_MIN = 2
  20. ORACLE_DB_POOL_MAX = 10
  21. ORACLE_DB_POOL_INCREMENT = 1
  22. """
  23. from typing import Optional, List, Dict, Any, Union
  24. from decimal import Decimal
  25. import logging
  26. import os
  27. import threading
  28. import time
  29. import json
  30. import array
  31. import oracledb
  32. from open_webui.retrieval.vector.main import (
  33. VectorDBBase,
  34. VectorItem,
  35. SearchResult,
  36. GetResult,
  37. )
  38. from open_webui.config import (
  39. ORACLE_DB_USE_WALLET,
  40. ORACLE_DB_USER,
  41. ORACLE_DB_PASSWORD,
  42. ORACLE_DB_DSN,
  43. ORACLE_WALLET_DIR,
  44. ORACLE_WALLET_PASSWORD,
  45. ORACLE_VECTOR_LENGTH,
  46. ORACLE_DB_POOL_MIN,
  47. ORACLE_DB_POOL_MAX,
  48. ORACLE_DB_POOL_INCREMENT,
  49. )
  50. from open_webui.env import SRC_LOG_LEVELS
  # Module-wide logger; its threshold is taken from the "RAG" entry of SRC_LOG_LEVELS.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["RAG"])
  class Oracle23aiClient(VectorDBBase):
      """
      Vector store client backed by Oracle Database 23ai.

      Every collection lives in the single ``document_chunk`` table and is
      identified by its ``collection_name`` column. Connections are taken from
      a python-oracledb connection pool that is created in ``__init__``.

      Attributes:
          pool: Connection pool for Oracle database connections (None until
              one of the ``_create_*_pool`` methods has run).
      """

      # Row limits applied when the caller does not pass one.
      _DEFAULT_QUERY_LIMIT = 100
      _DEFAULT_GET_LIMIT = 1000

      # PL/SQL wrapper that runs one DDL statement and ignores SQLCODE -955
      # (ORA-00955: name is already used by an existing object).
      _DDL_TEMPLATE = """
          BEGIN
              EXECUTE IMMEDIATE '
                  {ddl}
              ';
          EXCEPTION
              WHEN OTHERS THEN
                  IF SQLCODE != -955 THEN
                      RAISE;
                  END IF;
          END;
      """

      # (log message, DDL) pairs executed in this order by _initialize_database.
      _SCHEMA_DDL = (
          (
              "Creating Table document_chunk",
              """CREATE TABLE IF NOT EXISTS document_chunk (
                      id VARCHAR2(255) PRIMARY KEY,
                      collection_name VARCHAR2(255) NOT NULL,
                      text CLOB,
                      vmetadata JSON,
                      vector vector(*, float32)
                  )""",
          ),
          (
              "Creating Index document_chunk_collection_name_idx",
              """CREATE INDEX IF NOT EXISTS document_chunk_collection_name_idx
                  ON document_chunk (collection_name)""",
          ),
          (
              "Creating VECTOR INDEX document_chunk_vector_ivf_idx",
              """CREATE VECTOR INDEX IF NOT EXISTS document_chunk_vector_ivf_idx
                  ON document_chunk(vector)
                  ORGANIZATION NEIGHBOR PARTITIONS
                  DISTANCE COSINE
                  WITH TARGET ACCURACY 95
                  PARAMETERS (TYPE IVF, NEIGHBOR PARTITIONS 100)""",
          ),
      )

      def __init__(self) -> None:
          """
          Create the connection pool and make sure the schema exists.

          The pool flavour (wallet vs. basic authentication) is chosen by
          ``ORACLE_DB_USE_WALLET``. One connection is then acquired to log the
          server version and run ``_initialize_database``.

          Raises:
              Exception: Whatever pool creation or schema initialization raised.
                  The error is logged, the pool (if any) is closed, and the
                  original exception is re-raised.
          """
          self.pool = None
          # State of the optional background health monitor; close() sets the
          # event so the monitor thread stops instead of re-opening the pool.
          self._monitor_stop = threading.Event()
          self._monitor_thread = None
          try:
              log.info(f"Creating Connection Pool [{ORACLE_DB_USER}:**@{ORACLE_DB_DSN}]")
              self._create_pool()
              with self.get_connection() as connection:
                  log.info(f"Connection version: {connection.version}")
                  self._initialize_database(connection)
              log.info("Oracle Vector Search initialization complete.")
          except Exception as e:
              log.exception(f"Error during Oracle Vector Search initialization: {e}")
              # Do not leave a half-initialised pool open when start-up fails.
              self._close_pool_quietly()
              raise

      def _create_pool(self) -> None:
          """Create the pool variant selected by ``ORACLE_DB_USE_WALLET``."""
          if ORACLE_DB_USE_WALLET:
              self._create_adb_pool()
          else:  # DBCS
              self._create_dbcs_pool()

      def _close_pool_quietly(self) -> None:
          """Close the current pool if there is one; log (never raise) close errors."""
          if self.pool:
              try:
                  self.pool.close()
              except Exception as close_error:
                  log.warning(f"Error closing existing pool: {close_error}")

      def _create_adb_pool(self) -> None:
          """
          Create connection pool for Oracle Autonomous Database.

          Uses wallet-based authentication: ``ORACLE_WALLET_DIR`` is passed as
          both ``config_dir`` and ``wallet_location``, together with
          ``ORACLE_WALLET_PASSWORD``. The result is stored in ``self.pool``.
          """
          self.pool = oracledb.create_pool(
              user=ORACLE_DB_USER,
              password=ORACLE_DB_PASSWORD,
              dsn=ORACLE_DB_DSN,
              min=ORACLE_DB_POOL_MIN,
              max=ORACLE_DB_POOL_MAX,
              increment=ORACLE_DB_POOL_INCREMENT,
              config_dir=ORACLE_WALLET_DIR,
              wallet_location=ORACLE_WALLET_DIR,
              wallet_password=ORACLE_WALLET_PASSWORD,
          )
          log.info("Created ADB connection pool with wallet authentication.")

      def _create_dbcs_pool(self) -> None:
          """
          Create connection pool for Oracle Database Cloud Service.

          Uses basic (user/password/DSN) authentication without a wallet. The
          result is stored in ``self.pool``.
          """
          self.pool = oracledb.create_pool(
              user=ORACLE_DB_USER,
              password=ORACLE_DB_PASSWORD,
              dsn=ORACLE_DB_DSN,
              min=ORACLE_DB_POOL_MIN,
              max=ORACLE_DB_POOL_MAX,
              increment=ORACLE_DB_POOL_INCREMENT,
          )
          log.info("Created DB connection pool with basic authentication.")

      def get_connection(self):
          """
          Acquire a connection from the connection pool with retry logic.

          Up to three attempts are made; after a failed attempt the method
          sleeps ``2**attempt`` seconds (1 s, then 2 s) before trying again.

          Returns:
              connection: A database connection whose ``outputtypehandler`` is
              set to ``_output_type_handler``.

          Raises:
              oracledb.DatabaseError: If the last attempt fails as well.
          """
          max_retries = 3
          for attempt in range(max_retries):
              try:
                  connection = self.pool.acquire()
                  connection.outputtypehandler = self._output_type_handler
                  return connection
              except oracledb.DatabaseError as e:
                  # Tolerate exceptions whose args are empty or not an error
                  # object with a ``message`` attribute.
                  error_obj = e.args[0] if e.args else e
                  message = getattr(error_obj, "message", error_obj)
                  log.exception(f"Connection attempt {attempt + 1} failed: {message}")
                  if attempt == max_retries - 1:
                      raise
                  wait_time = 2**attempt
                  log.info(f"Retrying in {wait_time} seconds...")
                  time.sleep(wait_time)

      def start_health_monitor(self, interval_seconds: int = 60):
          """
          Start a background thread to periodically check the health of the connection pool.

          The daemon thread calls ``ensure_connection`` and then waits
          ``interval_seconds``. It exits once ``close`` has been called. Calling
          this method while a monitor is already running is a no-op.

          Args:
              interval_seconds (int): Number of seconds between health checks
          """
          current = getattr(self, "_monitor_thread", None)
          current_stop = getattr(self, "_monitor_stop", None)
          if (
              current is not None
              and current.is_alive()
              and current_stop is not None
              and not current_stop.is_set()
          ):
              log.info("DB health monitor is already running.")
              return

          # Each monitor thread owns its own stop event so a restart after
          # close() cannot revive a thread that is shutting down.
          stop_event = threading.Event()

          def _monitor():
              while not stop_event.is_set():
                  try:
                      log.info("[HealthCheck] Running periodic DB health check...")
                      self.ensure_connection()
                      log.info("[HealthCheck] Connection is healthy.")
                  except Exception as e:
                      log.exception(f"[HealthCheck] Connection health check failed: {e}")
                  # wait() sleeps like time.sleep() but returns early on stop.
                  stop_event.wait(interval_seconds)

          thread = threading.Thread(target=_monitor, daemon=True)
          self._monitor_stop = stop_event
          self._monitor_thread = thread
          thread.start()
          log.info(f"Started DB health monitor every {interval_seconds} seconds.")

      def _reconnect_pool(self):
          """
          Attempt to reinitialize the connection pool if it's been closed or broken.

          Any existing pool is closed first (close errors are only logged), then
          a new pool is created according to ``ORACLE_DB_USE_WALLET``.

          Raises:
              Exception: If creating the new pool fails (logged and re-raised).
          """
          try:
              log.info("Attempting to reinitialize the Oracle connection pool...")
              self._close_pool_quietly()
              self._create_pool()
              log.info("Connection pool reinitialized.")
          except Exception as e:
              log.exception(f"Failed to reinitialize the connection pool: {e}")
              raise

      def ensure_connection(self):
          """
          Ensure the database connection is alive, reconnecting pool if needed.

          Runs ``SELECT 1 FROM dual`` on a pooled connection; on any failure the
          error is logged and ``_reconnect_pool`` is called (which may raise).
          """
          try:
              with self.get_connection() as connection:
                  with connection.cursor() as cursor:
                      cursor.execute("SELECT 1 FROM dual")
          except Exception as e:
              log.exception(
                  f"Connection check failed: {e}, attempting to reconnect pool..."
              )
              self._reconnect_pool()

      def _output_type_handler(self, cursor, metadata):
          """
          Handle Oracle vector type conversion.

          Args:
              cursor: Oracle database cursor
              metadata: Metadata for the column

          Returns:
              For VECTOR columns, a cursor variable whose ``outconverter`` turns
              the fetched value into a ``list``; None for every other column
              type (no custom handling).
          """
          if metadata.type_code is oracledb.DB_TYPE_VECTOR:
              return cursor.var(
                  metadata.type_code, arraysize=cursor.arraysize, outconverter=list
              )
          return None

      def _initialize_database(self, connection) -> None:
          """
          Initialize database schema, tables and indexes.

          Creates the ``document_chunk`` table, the ``collection_name`` index
          and the IVF vector index. Each statement is wrapped in a PL/SQL block
          that ignores SQLCODE -955, so re-running is harmless.

          Args:
              connection: Oracle database connection

          Raises:
              Exception: If schema initialization fails (after rollback and logging)
          """
          with connection.cursor() as cursor:
              try:
                  for message, ddl in self._SCHEMA_DDL:
                      log.info(message)
                      cursor.execute(self._DDL_TEMPLATE.format(ddl=ddl))
                  connection.commit()
                  log.info("Database initialization completed successfully.")
              except Exception as e:
                  connection.rollback()
                  log.exception(f"Error during database initialization: {e}")
                  raise

      def check_vector_length(self) -> None:
          """
          Check vector length compatibility (placeholder).

          Currently does nothing; it is kept so callers have a stable hook.
          """
          pass

      def _vector_to_blob(self, vector: List[float]) -> array.array:
          """
          Convert a vector to the bind value used for the VECTOR column.

          Despite the historical name, no BLOB/bytes object is produced: the
          values are packed into an ``array.array`` of C floats (typecode "f").

          Args:
              vector (List[float]): The vector to convert

          Returns:
              array.array: The vector as a float32 array
          """
          return array.array("f", vector)

      def adjust_vector_length(self, vector: List[float]) -> List[float]:
          """
          Adjust vector to the expected length if needed.

          Currently a pass-through: the input is returned unchanged.

          Args:
              vector (List[float]): The vector to adjust

          Returns:
              List[float]: The same vector object
          """
          return vector

      def _decimal_handler(self, obj):
          """
          ``json.dumps`` fallback that serializes Decimal values as floats.

          Args:
              obj: Object the JSON encoder could not serialize

          Returns:
              float: Converted decimal value

          Raises:
              TypeError: If object is not a Decimal
          """
          if isinstance(obj, Decimal):
              return float(obj)
          raise TypeError(f"{obj} is not JSON serializable")

      def _metadata_to_json(self, metadata: Dict) -> str:
          """
          Convert metadata dictionary to JSON string.

          Args:
              metadata (Dict): Metadata dictionary (may be empty or None)

          Returns:
              str: JSON representation of metadata, ``"{}"`` for empty/None
          """
          if not metadata:
              return "{}"
          return json.dumps(metadata, default=self._decimal_handler)

      def _json_to_metadata(self, json_str: str) -> Dict:
          """
          Convert JSON string to metadata dictionary.

          Args:
              json_str (str): JSON string; None/empty yields ``{}``. A value that
                  is already a dict is returned as is.

          Returns:
              Dict: Metadata dictionary
          """
          if not json_str:
              return {}
          if isinstance(json_str, dict):
              return json_str
          return json.loads(json_str)

      @staticmethod
      def _read_lob(value):
          """Return ``value.read()`` for an ``oracledb.LOB``, else ``value`` unchanged."""
          return value.read() if isinstance(value, oracledb.LOB) else value

      @staticmethod
      def _metadata_filter_sql(
          filter: Optional[Dict[str, Any]], params: Dict[str, Any]
      ) -> str:
          """
          Build the ``AND JSON_VALUE(...) = :value_N`` clauses for a metadata filter.

          Filter values are bound (added to ``params`` as strings). Filter keys
          are interpolated into the JSON path text, so each key must be a dotted
          sequence of identifiers; anything else is rejected to keep arbitrary
          text out of the SQL statement.

          Args:
              filter: Mapping of metadata key to required value (None/empty = no clauses)
              params: Bind dictionary that receives the ``value_N`` entries

          Returns:
              str: SQL fragment to append to a WHERE clause (possibly empty)

          Raises:
              ValueError: If a key is not a dotted identifier path
          """
          clauses = []
          for i, (key, value) in enumerate((filter or {}).items()):
              key_is_safe = isinstance(key, str) and all(
                  part.isidentifier() for part in key.split(".")
              )
              if not key_is_safe:
                  raise ValueError(f"Unsupported metadata filter key: {key!r}")
              param_name = f"value_{i}"
              clauses.append(
                  f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
              )
              params[param_name] = str(value)
          return "".join(clauses)

      def _fetch_get_result(self, sql: str, params: Dict[str, Any]) -> Optional[GetResult]:
          """
          Run a SELECT returning (id, text, vmetadata) rows and package them.

          Args:
              sql: The SELECT statement
              params: Bind values for the statement

          Returns:
              Optional[GetResult]: One-group result, or None if no row matched
          """
          with self.get_connection() as connection:
              with connection.cursor() as cursor:
                  cursor.execute(sql, params)
                  rows = cursor.fetchall()
                  if not rows:
                      return None
                  # Read LOB columns before the connection is released;
                  # presumably a LOB cannot be read once its connection is gone.
                  ids = [row[0] for row in rows]
                  documents = [str(self._read_lob(row[1])) for row in rows]
                  metadatas = [
                      self._json_to_metadata(self._read_lob(row[2])) for row in rows
                  ]
          return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])

      def _execute_and_commit(
          self, sql: str, params: Optional[Dict[str, Any]] = None
      ) -> int:
          """
          Execute one DML statement, commit, and return ``cursor.rowcount``.

          Args:
              sql: The DML statement
              params: Optional bind values

          Returns:
              int: Number of affected rows as reported by the cursor
          """
          with self.get_connection() as connection:
              with connection.cursor() as cursor:
                  if params:
                      cursor.execute(sql, params)
                  else:
                      cursor.execute(sql)
                  affected = cursor.rowcount
              connection.commit()
          return affected

      def insert(self, collection_name: str, items: List[VectorItem]) -> None:
          """
          Insert vector items into the database.

          All items are inserted in one transaction: either every row is
          committed or, on error, the transaction is rolled back.

          Args:
              collection_name (str): Name of the collection
              items (List[VectorItem]): Items providing "id", "text", "vector"
                  and "metadata" via subscript access

          Raises:
              Exception: If insertion fails (logged and re-raised)

          Example:
              >>> client = Oracle23aiClient()
              >>> items = [
              ...     {"id": "1", "text": "Sample text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
              ... ]
              >>> client.insert("my_collection", items)
          """
          log.info(f"Inserting {len(items)} items into collection '{collection_name}'.")
          if not items:
              # Nothing to write; avoid acquiring a connection for a no-op.
              return
          with self.get_connection() as connection:
              try:
                  with connection.cursor() as cursor:
                      for item in items:
                          cursor.execute(
                              """
                              INSERT INTO document_chunk
                              (id, collection_name, text, vmetadata, vector)
                              VALUES (:id, :collection_name, :text, :metadata, :vector)
                              """,
                              {
                                  "id": item["id"],
                                  "collection_name": collection_name,
                                  "text": item["text"],
                                  "metadata": self._metadata_to_json(item["metadata"]),
                                  "vector": self._vector_to_blob(item["vector"]),
                              },
                          )
                  connection.commit()
                  log.info(
                      f"Successfully inserted {len(items)} items into collection '{collection_name}'."
                  )
              except Exception as e:
                  connection.rollback()
                  log.exception(f"Error during insert: {e}")
                  raise

      def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
          """
          Update or insert vector items into the database.

          Rows are matched on ``id`` only. A matching row has its collection
          name, text, metadata and vector overwritten (so an id that exists in
          another collection is moved into ``collection_name``); otherwise a new
          row is inserted. All items share one transaction.

          Args:
              collection_name (str): Name of the collection
              items (List[VectorItem]): List of vector items to upsert

          Raises:
              Exception: If upsert operation fails (logged and re-raised)

          Example:
              >>> client = Oracle23aiClient()
              >>> items = [
              ...     {"id": "1", "text": "Updated text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
              ... ]
              >>> client.upsert("my_collection", items)
          """
          log.info(f"Upserting {len(items)} items into collection '{collection_name}'.")
          if not items:
              # Nothing to write; avoid acquiring a connection for a no-op.
              return
          with self.get_connection() as connection:
              try:
                  with connection.cursor() as cursor:
                      for item in items:
                          vector_blob = self._vector_to_blob(item["vector"])
                          metadata_json = self._metadata_to_json(item["metadata"])
                          # The UPDATE and INSERT branches use separate bind
                          # names, so every value is supplied twice.
                          cursor.execute(
                              """
                              MERGE INTO document_chunk d
                              USING (SELECT :merge_id as id FROM dual) s
                              ON (d.id = s.id)
                              WHEN MATCHED THEN
                                  UPDATE SET
                                      collection_name = :upd_collection_name,
                                      text = :upd_text,
                                      vmetadata = :upd_metadata,
                                      vector = :upd_vector
                              WHEN NOT MATCHED THEN
                                  INSERT (id, collection_name, text, vmetadata, vector)
                                  VALUES (:ins_id, :ins_collection_name, :ins_text, :ins_metadata, :ins_vector)
                              """,
                              {
                                  "merge_id": item["id"],
                                  "upd_collection_name": collection_name,
                                  "upd_text": item["text"],
                                  "upd_metadata": metadata_json,
                                  "upd_vector": vector_blob,
                                  "ins_id": item["id"],
                                  "ins_collection_name": collection_name,
                                  "ins_text": item["text"],
                                  "ins_metadata": metadata_json,
                                  "ins_vector": vector_blob,
                              },
                          )
                  connection.commit()
                  log.info(
                      f"Successfully upserted {len(items)} items into collection '{collection_name}'."
                  )
              except Exception as e:
                  connection.rollback()
                  log.exception(f"Error during upsert: {e}")
                  raise

      def search(
          self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
      ) -> Optional[SearchResult]:
          """
          Search for similar vectors in the database.

          Runs one cosine-distance query per query vector, ordered by ascending
          distance and limited with ``FETCH APPROX FIRST :limit ROWS ONLY``.

          Args:
              collection_name (str): Name of the collection to search
              vectors (List[List[Union[float, int]]]): Query vectors to find similar items for
              limit (int): Maximum number of results to return per query

          Returns:
              Optional[SearchResult]: ids, distances, documents and metadatas
              with one inner list per query vector; None if ``vectors`` is
              empty or any error occurred (errors are logged, not raised).

          Example:
              >>> client = Oracle23aiClient()
              >>> results = client.search("my_collection", [query_vector], limit=5)
              >>> if results:
              ...     for id, dist in zip(results.ids[0], results.distances[0]):
              ...         log.info(f"id={id}, distance={dist}")
          """
          log.info(
              f"Searching items from collection '{collection_name}' with limit {limit}."
          )
          try:
              if not vectors:
                  log.warning("No vectors provided for search.")
                  return None

              ids, distances, documents, metadatas = [], [], [], []
              with self.get_connection() as connection:
                  with connection.cursor() as cursor:
                      for vector in vectors:
                          cursor.execute(
                              """
                              SELECT dc.id, dc.text,
                                  JSON_SERIALIZE(dc.vmetadata RETURNING VARCHAR2(4096)) as vmetadata,
                                  VECTOR_DISTANCE(dc.vector, :query_vector, COSINE) as distance
                              FROM document_chunk dc
                              WHERE dc.collection_name = :collection_name
                              ORDER BY VECTOR_DISTANCE(dc.vector, :query_vector, COSINE)
                              FETCH APPROX FIRST :limit ROWS ONLY
                              """,
                              {
                                  "query_vector": self._vector_to_blob(vector),
                                  "collection_name": collection_name,
                                  "limit": limit,
                              },
                          )
                          rows = cursor.fetchall()
                          # One inner list per query vector, in input order.
                          ids.append([row[0] for row in rows])
                          documents.append([str(self._read_lob(row[1])) for row in rows])
                          metadatas.append(
                              [
                                  self._json_to_metadata(self._read_lob(row[2]))
                                  for row in rows
                              ]
                          )
                          distances.append([float(row[3]) for row in rows])

              log.info(
                  f"Search completed. Found {sum(len(found) for found in ids)} total results."
              )
              return SearchResult(
                  ids=ids, distances=distances, documents=documents, metadatas=metadatas
              )
          except Exception as e:
              log.exception(f"Error during search: {e}")
              return None

      def query(
          self, collection_name: str, filter: Dict, limit: Optional[int] = None
      ) -> Optional[GetResult]:
          """
          Query items based on metadata filters.

          Every filter entry must match: the value is compared as a string with
          ``JSON_VALUE(vmetadata, '$.<key>')``.

          Args:
              collection_name (str): Name of the collection to query
              filter (Dict[str, Any]): Metadata filters to apply; keys must be
                  dotted identifier paths
              limit (Optional[int]): Maximum number of results to return
                  (100 when omitted or falsy)

          Returns:
              Optional[GetResult]: Query results containing ids, documents, and
              metadata; None if nothing matched or an error occurred (errors,
              including an invalid filter key, are logged, not raised).

          Example:
              >>> client = Oracle23aiClient()
              >>> results = client.query("my_collection", {"source": "doc1"}, limit=20)
              >>> if results:
              ...     print(f"Found {len(results.ids[0])} matching documents")
          """
          log.info(f"Querying items from collection '{collection_name}' with filters.")
          try:
              params = {"collection_name": collection_name}
              sql = (
                  """
                  SELECT id, text, JSON_SERIALIZE(vmetadata RETURNING VARCHAR2(4096)) as vmetadata
                  FROM document_chunk
                  WHERE collection_name = :collection_name
                  """
                  + self._metadata_filter_sql(filter, params)
                  + " FETCH FIRST :limit ROWS ONLY"
              )
              params["limit"] = limit or self._DEFAULT_QUERY_LIMIT

              result = self._fetch_get_result(sql, params)
              if result is None:
                  log.info("No results found for query.")
                  return None
              log.info(f"Query completed. Found {len(result.ids[0])} results.")
              return result
          except Exception as e:
              log.exception(f"Error during query: {e}")
              return None

      def get(
          self, collection_name: str, limit: Optional[int] = None
      ) -> Optional[GetResult]:
          """
          Get all items in a collection.

          Retrieves items from a specified collection up to the limit.

          Args:
              collection_name (str): Name of the collection to retrieve
              limit (Optional[int]): Maximum number of items to retrieve
                  (1000 when omitted or falsy)

          Returns:
              Optional[GetResult]: Result containing ids, documents, and
              metadata; None if the collection has no rows or an error occurred
              (errors are logged, not raised).

          Example:
              >>> client = Oracle23aiClient()
              >>> results = client.get("my_collection", limit=50)
              >>> if results:
              ...     print(f"Retrieved {len(results.ids[0])} documents from collection")
          """
          limit = limit or self._DEFAULT_GET_LIMIT
          log.info(
              f"Getting items from collection '{collection_name}' with limit {limit}."
          )
          try:
              result = self._fetch_get_result(
                  """
                  SELECT /*+ MONITOR */ id, text, JSON_SERIALIZE(vmetadata RETURNING VARCHAR2(4096)) as vmetadata
                  FROM document_chunk
                  WHERE collection_name = :collection_name
                  FETCH FIRST :limit ROWS ONLY
                  """,
                  {"collection_name": collection_name, "limit": limit},
              )
              if result is None:
                  log.info("No results found.")
              return result
          except Exception as e:
              log.exception(f"Error during get: {e}")
              return None

      def delete(
          self,
          collection_name: str,
          ids: Optional[List[str]] = None,
          filter: Optional[Dict[str, Any]] = None,
      ) -> None:
          """
          Delete items from the database.

          Deletes rows of ``collection_name`` that match the given ids AND the
          given metadata filter. When neither ``ids`` nor ``filter`` is given,
          every row of the collection is deleted.

          Args:
              collection_name (str): Name of the collection to delete from
              ids (Optional[List[str]]): Specific item IDs to delete
              filter (Optional[Dict[str, Any]]): Metadata filters for deletion;
                  keys must be dotted identifier paths

          Raises:
              Exception: If deletion fails or a filter key is invalid
                  (logged and re-raised)

          Example:
              >>> client = Oracle23aiClient()
              >>> client.delete("my_collection", ids=["1", "3", "5"])
              >>> client.delete("my_collection", filter={"source": "deprecated_source"})
          """
          log.info(f"Deleting items from collection '{collection_name}'.")
          try:
              params = {"collection_name": collection_name}
              sql = "DELETE FROM document_chunk WHERE collection_name = :collection_name"
              if ids:
                  # One bind variable per id keeps the values out of the SQL text.
                  id_binds = {f"id_{i}": id_val for i, id_val in enumerate(ids)}
                  placeholders = ",".join(f":{name}" for name in id_binds)
                  sql += f" AND id IN ({placeholders})"
                  params.update(id_binds)
              sql += self._metadata_filter_sql(filter, params)

              deleted = self._execute_and_commit(sql, params)
              log.info(f"Deleted {deleted} items from collection '{collection_name}'.")
          except Exception as e:
              log.exception(f"Error during delete: {e}")
              raise

      def reset(self) -> None:
          """
          Reset the database by deleting all items.

          Deletes every row of the ``document_chunk`` table, across all
          collections, and commits.

          Raises:
              Exception: If reset fails (logged and re-raised)

          Example:
              >>> client = Oracle23aiClient()
              >>> client.reset()  # Warning: Removes all data!
          """
          log.info("Resetting database - deleting all items.")
          try:
              deleted = self._execute_and_commit("DELETE FROM document_chunk")
              log.info(
                  f"Reset complete. Deleted {deleted} items from 'document_chunk' table."
              )
          except Exception as e:
              log.exception(f"Error during reset: {e}")
              raise

      def close(self) -> None:
          """
          Close the database connection pool.

          Also signals the health monitor (if one was started) to stop, so that
          it does not re-create the pool afterwards. Errors while closing are
          logged and swallowed.

          Example:
              >>> client = Oracle23aiClient()
              >>> # After finishing all operations
              >>> client.close()
          """
          stop_event = getattr(self, "_monitor_stop", None)
          if stop_event is not None:
              stop_event.set()
          try:
              if hasattr(self, "pool") and self.pool:
                  self.pool.close()
                  log.info("Oracle Vector Search connection pool closed.")
          except Exception as e:
              log.exception(f"Error closing connection pool: {e}")

      def has_collection(self, collection_name: str) -> bool:
          """
          Check if a collection exists.

          A collection exists when at least one ``document_chunk`` row carries
          its name. Only a single row is probed instead of counting them all.

          Args:
              collection_name (str): Name of the collection to check

          Returns:
              bool: True if the collection exists, False otherwise (also False
              when the check itself fails; the error is logged).

          Example:
              >>> client = Oracle23aiClient()
              >>> if client.has_collection("my_collection"):
              ...     print("Collection exists!")
          """
          try:
              with self.get_connection() as connection:
                  with connection.cursor() as cursor:
                      cursor.execute(
                          """
                          SELECT 1
                          FROM document_chunk
                          WHERE collection_name = :collection_name
                          FETCH FIRST 1 ROWS ONLY
                          """,
                          {"collection_name": collection_name},
                      )
                      return cursor.fetchone() is not None
          except Exception as e:
              log.exception(f"Error checking collection existence: {e}")
              return False

      def delete_collection(self, collection_name: str) -> None:
          """
          Delete an entire collection.

          Removes all items belonging to the specified collection and commits.

          Args:
              collection_name (str): Name of the collection to delete

          Raises:
              Exception: If the deletion fails (logged and re-raised)

          Example:
              >>> client = Oracle23aiClient()
              >>> client.delete_collection("obsolete_collection")
          """
          log.info(f"Deleting collection '{collection_name}'.")
          try:
              deleted = self._execute_and_commit(
                  """
                  DELETE FROM document_chunk
                  WHERE collection_name = :collection_name
                  """,
                  {"collection_name": collection_name},
              )
              log.info(
                  f"Collection '{collection_name}' deleted. Removed {deleted} items."
              )
          except Exception as e:
              log.exception(f"Error deleting collection '{collection_name}': {e}")
              raise