# utils.py

import asyncio
import logging
import socket
import ssl
import time  # stdlib time module, needed for time.sleep in the sync rate limiter
import urllib.parse
import urllib.request
from datetime import datetime, timedelta
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Union,
    Literal,
)

from fastapi.concurrency import run_in_threadpool

import aiohttp
import certifi
import validators
from langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoader
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document

from open_webui.retrieval.loaders.tavily import TavilyLoader
from open_webui.retrieval.loaders.external_web import ExternalWebLoader
from open_webui.constants import ERROR_MESSAGES
from open_webui.config import (
    ENABLE_RAG_LOCAL_WEB_FETCH,
    PLAYWRIGHT_WS_URL,
    PLAYWRIGHT_TIMEOUT,
    WEB_LOADER_ENGINE,
    FIRECRAWL_API_BASE_URL,
    FIRECRAWL_API_KEY,
    TAVILY_API_KEY,
    TAVILY_EXTRACT_DEPTH,
    EXTERNAL_WEB_LOADER_URL,
    EXTERNAL_WEB_LOADER_API_KEY,
)
from open_webui.env import SRC_LOG_LEVELS

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])


def validate_url(url: Union[str, Sequence[str]]):
    if isinstance(url, str):
        if isinstance(validators.url(url), validators.ValidationError):
            raise ValueError(ERROR_MESSAGES.INVALID_URL)
        if not ENABLE_RAG_LOCAL_WEB_FETCH:
            # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
            parsed_url = urllib.parse.urlparse(url)
            # Get IPv4 and IPv6 addresses
            ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
            # Check if any of the resolved addresses are private
            # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
            for ip in ipv4_addresses:
                if validators.ipv4(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
            for ip in ipv6_addresses:
                if validators.ipv6(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
        return True
    elif isinstance(url, Sequence):
        return all(validate_url(u) for u in url)
    else:
        return False


def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
    valid_urls = []
    for u in url:
        try:
            if validate_url(u):
                valid_urls.append(u)
        except Exception as e:
            log.debug(f"Invalid URL {u}: {str(e)}")
            continue
    return valid_urls


def resolve_hostname(hostname):
    # Get address information
    addr_info = socket.getaddrinfo(hostname, None)

    # Extract IP addresses from address information
    ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
    ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]

    return ipv4_addresses, ipv6_addresses
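

# Illustrative sketch of the validation helpers above (hypothetical URLs, not part
# of the module's runtime behavior):
#
#   safe_validate_urls(["https://example.com/page", "http://192.168.1.10/admin"])
#   # -> ["https://example.com/page"] when ENABLE_RAG_LOCAL_WEB_FETCH is False,
#   #    because 192.168.1.10 resolves to a private IPv4 address and is filtered out.
#   # -> both URLs when local web fetch is enabled (the private-IP check is skipped).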


def extract_metadata(soup, url):
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata


def verify_ssl_cert(url: str) -> bool:
    """Verify the SSL certificate for the given URL."""
    if not url.startswith("https://"):
        return True

    try:
        hostname = url.split("://")[-1].split("/")[0]
        context = ssl.create_default_context(cafile=certifi.where())
        with context.wrap_socket(ssl.socket(), server_hostname=hostname) as s:
            s.connect((hostname, 443))
        return True
    except ssl.SSLError:
        return False
    except Exception as e:
        log.warning(f"SSL verification failed for {url}: {str(e)}")
        return False


class RateLimitMixin:
    async def _wait_for_rate_limit(self):
        """Wait to respect the rate limit if specified."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                await asyncio.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    def _sync_wait_for_rate_limit(self):
        """Synchronous version of the rate limit wait."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                time.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()
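

# Worked example of the interval math above (illustrative numbers only): with
# requests_per_second=2, min_interval is 0.5 s, so a call arriving 0.2 s after the
# previous one sleeps for the remaining 0.3 s before proceeding.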


class URLProcessingMixin:
    async def _verify_ssl_cert(self, url: str) -> bool:
        """Verify SSL certificate for a URL."""
        return await run_in_threadpool(verify_ssl_cert, url)

    async def _safe_process_url(self, url: str) -> bool:
        """Perform safety checks before processing a URL."""
        if self.verify_ssl and not await self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        await self._wait_for_rate_limit()
        return True

    def _safe_process_url_sync(self, url: str) -> bool:
        """Synchronous version of safety checks."""
        # Call the module-level verify_ssl_cert directly; the async _verify_ssl_cert
        # coroutine cannot be awaited from synchronous code.
        if self.verify_ssl and not verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        self._sync_wait_for_rate_limit()
        return True


class SafeFireCrawlLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):
    def __init__(
        self,
        web_paths,
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        mode: Literal["crawl", "scrape", "map"] = "scrape",
        proxy: Optional[Dict[str, str]] = None,
        params: Optional[Dict] = None,
    ):
        """Document loader for FireCrawl operations.

        Scrapes all configured URLs in a single FireCrawl batch_scrape call.

        Args:
            web_paths: List of URLs/paths to process.
            verify_ssl: If True, verify SSL certificates.
            trust_env: If True, use proxy settings from environment variables.
            requests_per_second: Number of requests per second to limit to.
            continue_on_failure (bool): If True, continue loading other URLs on failure.
            api_key: API key for FireCrawl service. Defaults to None
                (uses FIRE_CRAWL_API_KEY environment variable if not provided).
            api_url: Base URL for FireCrawl API. Defaults to official API endpoint.
            mode: Operation mode selection:
                - 'crawl': Website crawling mode
                - 'scrape': Direct page scraping (default)
                - 'map': Site map generation
            proxy: Proxy override settings for the FireCrawl API.
            params: The parameters to pass to the Firecrawl API.
                For more details, visit: https://docs.firecrawl.dev/sdks/python#batch-scrape
        """
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                if proxy:
                    proxy["server"] = env_proxy_server
                else:
                    proxy = {"server": env_proxy_server}
        self.web_paths = web_paths
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.trust_env = trust_env
        self.continue_on_failure = continue_on_failure
        self.api_key = api_key
        self.api_url = api_url
        self.mode = mode
        self.params = params or {}

    def lazy_load(self) -> Iterator[Document]:
        """Load documents using FireCrawl batch_scrape."""
        log.debug(
            "Starting FireCrawl batch scrape for %d URLs, mode: %s, params: %s",
            len(self.web_paths),
            self.mode,
            self.params,
        )
        try:
            from firecrawl import FirecrawlApp

            firecrawl = FirecrawlApp(api_key=self.api_key, api_url=self.api_url)
            result = firecrawl.batch_scrape(
                self.web_paths,
                formats=["markdown"],
                skip_tls_verification=not self.verify_ssl,
                ignore_invalid_urls=True,
                remove_base64_images=True,
                max_age=300000,  # 5 minutes https://docs.firecrawl.dev/features/fast-scraping#common-maxage-values
                wait_timeout=len(self.web_paths) * 3,
                **self.params,
            )
            if result.status != "completed":
                raise RuntimeError(
                    f"FireCrawl batch scrape did not complete successfully. result: {result}"
                )
            for data in result.data:
                metadata = data.metadata or {}
                yield Document(
                    page_content=data.markdown or "",
                    metadata={"source": metadata.url or metadata.source_url or ""},
                )
        except Exception as e:
            if self.continue_on_failure:
                log.exception(f"Error extracting content from URLs: {e}")
            else:
                raise e

    async def alazy_load(self):
        """Async version of lazy_load."""
        log.debug(
            "Starting FireCrawl batch scrape for %d URLs, mode: %s, params: %s",
            len(self.web_paths),
            self.mode,
            self.params,
        )
        try:
            from firecrawl import FirecrawlApp

            firecrawl = FirecrawlApp(api_key=self.api_key, api_url=self.api_url)
            result = firecrawl.batch_scrape(
                self.web_paths,
                formats=["markdown"],
                skip_tls_verification=not self.verify_ssl,
                ignore_invalid_urls=True,
                remove_base64_images=True,
                max_age=300000,  # 5 minutes https://docs.firecrawl.dev/features/fast-scraping#common-maxage-values
                wait_timeout=len(self.web_paths) * 3,
                **self.params,
            )
            if result.status != "completed":
                raise RuntimeError(
                    f"FireCrawl batch scrape did not complete successfully. result: {result}"
                )
            for data in result.data:
                metadata = data.metadata or {}
                yield Document(
                    page_content=data.markdown or "",
                    metadata={"source": metadata.url or metadata.source_url or ""},
                )
        except Exception as e:
            if self.continue_on_failure:
                log.exception(f"Error extracting content from URLs: {e}")
            else:
                raise e
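

# Illustrative usage sketch for SafeFireCrawlLoader (hypothetical values, not
# executed at import time):
#
#   loader = SafeFireCrawlLoader(
#       web_paths=["https://example.com/docs"],
#       api_key="fc-...",                     # your FireCrawl API key
#       api_url="https://api.firecrawl.dev",  # or a self-hosted FireCrawl endpoint
#       requests_per_second=2,
#   )
#   docs = list(loader.lazy_load())  # one Document per successfully scraped URL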


class SafeTavilyLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):
    def __init__(
        self,
        web_paths: Union[str, List[str]],
        api_key: str,
        extract_depth: Literal["basic", "advanced"] = "basic",
        continue_on_failure: bool = True,
        requests_per_second: Optional[float] = None,
        verify_ssl: bool = True,
        trust_env: bool = False,
        proxy: Optional[Dict[str, str]] = None,
    ):
        """Initialize SafeTavilyLoader with rate limiting and SSL verification support.

        Args:
            web_paths: List of URLs/paths to process.
            api_key: The Tavily API key.
            extract_depth: Depth of extraction ("basic" or "advanced").
            continue_on_failure: Whether to continue if extraction of a URL fails.
            requests_per_second: Number of requests per second to limit to.
            verify_ssl: If True, verify SSL certificates.
            trust_env: If True, use proxy settings from environment variables.
            proxy: Optional proxy configuration.
        """
        # Initialize proxy configuration if using environment variables
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                if proxy:
                    proxy["server"] = env_proxy_server
                else:
                    proxy = {"server": env_proxy_server}

        # Store parameters for creating TavilyLoader instances
        self.web_paths = web_paths if isinstance(web_paths, list) else [web_paths]
        self.api_key = api_key
        self.extract_depth = extract_depth
        self.continue_on_failure = continue_on_failure
        self.verify_ssl = verify_ssl
        self.trust_env = trust_env
        self.proxy = proxy

        # Add rate limiting
        self.requests_per_second = requests_per_second
        self.last_request_time = None

    def lazy_load(self) -> Iterator[Document]:
        """Load documents with rate limiting support, delegating to TavilyLoader."""
        valid_urls = []
        for url in self.web_paths:
            try:
                self._safe_process_url_sync(url)
                valid_urls.append(url)
            except Exception as e:
                log.warning(f"SSL verification failed for {url}: {str(e)}")
                if not self.continue_on_failure:
                    raise e
        if not valid_urls:
            if self.continue_on_failure:
                log.warning("No valid URLs to process after SSL verification")
                return
            raise ValueError("No valid URLs to process after SSL verification")
        try:
            loader = TavilyLoader(
                urls=valid_urls,
                api_key=self.api_key,
                extract_depth=self.extract_depth,
                continue_on_failure=self.continue_on_failure,
            )
            yield from loader.lazy_load()
        except Exception as e:
            if self.continue_on_failure:
                log.exception(f"Error extracting content from URLs: {e}")
            else:
                raise e

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async version with rate limiting and SSL verification."""
        valid_urls = []
        for url in self.web_paths:
            try:
                await self._safe_process_url(url)
                valid_urls.append(url)
            except Exception as e:
                log.warning(f"SSL verification failed for {url}: {str(e)}")
                if not self.continue_on_failure:
                    raise e
        if not valid_urls:
            if self.continue_on_failure:
                log.warning("No valid URLs to process after SSL verification")
                return
            raise ValueError("No valid URLs to process after SSL verification")
        try:
            loader = TavilyLoader(
                urls=valid_urls,
                api_key=self.api_key,
                extract_depth=self.extract_depth,
                continue_on_failure=self.continue_on_failure,
            )
            async for document in loader.alazy_load():
                yield document
        except Exception as e:
            if self.continue_on_failure:
                log.exception(f"Error loading URLs: {e}")
            else:
                raise e


class SafePlaywrightURLLoader(PlaywrightURLLoader, RateLimitMixin, URLProcessingMixin):
    """Load HTML pages safely with Playwright, supporting SSL verification, rate limiting, and remote browser connection.

    Attributes:
        web_paths (List[str]): List of URLs to load.
        verify_ssl (bool): If True, verify SSL certificates.
        trust_env (bool): If True, use proxy settings from environment variables.
        requests_per_second (Optional[float]): Number of requests per second to limit to.
        continue_on_failure (bool): If True, continue loading other URLs on failure.
        headless (bool): If True, the browser will run in headless mode.
        proxy (dict): Proxy override settings for the Playwright session.
        playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.
        playwright_timeout (Optional[int]): Maximum operation time in milliseconds.
    """

    def __init__(
        self,
        web_paths: List[str],
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        headless: bool = True,
        remove_selectors: Optional[List[str]] = None,
        proxy: Optional[Dict[str, str]] = None,
        playwright_ws_url: Optional[str] = None,
        playwright_timeout: Optional[int] = 10000,
    ):
        """Initialize with additional safety parameters and remote browser support."""
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                if proxy:
                    proxy["server"] = env_proxy_server
                else:
                    proxy = {"server": env_proxy_server}

        # We'll set headless to False if using playwright_ws_url since it's handled by the remote browser
        super().__init__(
            urls=web_paths,
            continue_on_failure=continue_on_failure,
            headless=headless if playwright_ws_url is None else False,
            remove_selectors=remove_selectors,
            proxy=proxy,
        )
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.playwright_ws_url = playwright_ws_url
        self.trust_env = trust_env
        self.playwright_timeout = playwright_timeout

    def lazy_load(self) -> Iterator[Document]:
        """Safely load URLs synchronously with support for remote browser."""
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = p.chromium.connect(self.playwright_ws_url)
            else:
                browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)

            for url in self.urls:
                try:
                    self._safe_process_url_sync(url)
                    page = browser.new_page()
                    response = page.goto(url, timeout=self.playwright_timeout)
                    if response is None:
                        raise ValueError(f"page.goto() returned None for url {url}")

                    text = self.evaluator.evaluate(page, browser, response)
                    metadata = {"source": url}
                    yield Document(page_content=text, metadata=metadata)
                except Exception as e:
                    if self.continue_on_failure:
                        log.exception(f"Error loading {url}: {e}")
                        continue
                    raise e
            browser.close()

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Safely load URLs asynchronously with support for remote browser."""
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = await p.chromium.connect(self.playwright_ws_url)
            else:
                browser = await p.chromium.launch(
                    headless=self.headless, proxy=self.proxy
                )

            for url in self.urls:
                try:
                    await self._safe_process_url(url)
                    page = await browser.new_page()
                    response = await page.goto(url, timeout=self.playwright_timeout)
                    if response is None:
                        raise ValueError(f"page.goto() returned None for url {url}")

                    text = await self.evaluator.evaluate_async(page, browser, response)
                    metadata = {"source": url}
                    yield Document(page_content=text, metadata=metadata)
                except Exception as e:
                    if self.continue_on_failure:
                        log.exception(f"Error loading {url}: {e}")
                        continue
                    raise e
            await browser.close()
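

# Illustrative usage sketch for SafePlaywrightURLLoader (hypothetical endpoint, not
# executed at import time):
#
#   loader = SafePlaywrightURLLoader(
#       web_paths=["https://example.com"],
#       playwright_ws_url="ws://playwright:3000",  # remote browser; omit to launch locally
#       playwright_timeout=10000,                  # milliseconds
#   )
#   docs = list(loader.lazy_load())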


class SafeWebBaseLoader(WebBaseLoader):
    """WebBaseLoader with enhanced error handling for URLs."""

    def __init__(self, trust_env: bool = False, *args, **kwargs):
        """Initialize SafeWebBaseLoader

        Args:
            trust_env (bool, optional): set to True if using proxy to make web requests, for example
                using http(s)_proxy environment variables. Defaults to False.
        """
        super().__init__(*args, **kwargs)
        self.trust_env = trust_env

    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
            for i in range(retries):
                try:
                    kwargs: Dict = dict(
                        headers=self.session.headers,
                        cookies=self.session.cookies.get_dict(),
                    )
                    if not self.session.verify:
                        kwargs["ssl"] = False

                    async with session.get(
                        url,
                        **(self.requests_kwargs | kwargs),
                        allow_redirects=False,
                    ) as response:
                        if self.raise_for_status:
                            response.raise_for_status()
                        return await response.text()
                except aiohttp.ClientConnectionError as e:
                    if i == retries - 1:
                        raise
                    else:
                        log.warning(
                            f"Error fetching {url} with attempt "
                            f"{i + 1}/{retries}: {e}. Retrying..."
                        )
                        await asyncio.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")

    def _unpack_fetch_results(
        self, results: Any, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Unpack fetch results into BeautifulSoup objects."""
        from bs4 import BeautifulSoup

        final_results = []
        for i, result in enumerate(results):
            url = urls[i]
            if parser is None:
                if url.endswith(".xml"):
                    parser = "xml"
                else:
                    parser = self.default_parser
                self._check_parser(parser)
            final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))
        return final_results

    async def ascrape_all(
        self, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Async fetch all urls, then return soups for all results."""
        results = await self.fetch_all(urls)
        return self._unpack_fetch_results(results, urls, parser=parser)

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path with error handling."""
        for path in self.web_paths:
            try:
                soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
                text = soup.get_text(**self.bs_get_text_kwargs)

                # Build metadata
                metadata = extract_metadata(soup, path)

                yield Document(page_content=text, metadata=metadata)
            except Exception as e:
                # Log the error and continue with the next URL
                log.exception(f"Error loading {path}: {e}")

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path."""
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            text = soup.get_text(**self.bs_get_text_kwargs)
            metadata = {"source": path}
            if title := soup.find("title"):
                metadata["title"] = title.get_text()
            if description := soup.find("meta", attrs={"name": "description"}):
                metadata["description"] = description.get(
                    "content", "No description found."
                )
            if html := soup.find("html"):
                metadata["language"] = html.get("lang", "No language found.")
            yield Document(page_content=text, metadata=metadata)

    async def aload(self) -> list[Document]:
        """Load data into Document objects."""
        return [document async for document in self.alazy_load()]


def get_web_loader(
    urls: Union[str, Sequence[str]],
    verify_ssl: bool = True,
    requests_per_second: int = 2,
    trust_env: bool = False,
):
    # Check if the URLs are valid
    safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)

    web_loader_args = {
        "web_paths": safe_urls,
        "verify_ssl": verify_ssl,
        "requests_per_second": requests_per_second,
        "continue_on_failure": True,
        "trust_env": trust_env,
    }

    # Default to None so an unrecognized engine falls through to the error below
    WebLoaderClass = None
    if WEB_LOADER_ENGINE.value == "" or WEB_LOADER_ENGINE.value == "safe_web":
        WebLoaderClass = SafeWebBaseLoader
    if WEB_LOADER_ENGINE.value == "playwright":
        WebLoaderClass = SafePlaywrightURLLoader
        web_loader_args["playwright_timeout"] = PLAYWRIGHT_TIMEOUT.value
        if PLAYWRIGHT_WS_URL.value:
            web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URL.value
    if WEB_LOADER_ENGINE.value == "firecrawl":
        WebLoaderClass = SafeFireCrawlLoader
        web_loader_args["api_key"] = FIRECRAWL_API_KEY.value
        web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value
    if WEB_LOADER_ENGINE.value == "tavily":
        WebLoaderClass = SafeTavilyLoader
        web_loader_args["api_key"] = TAVILY_API_KEY.value
        web_loader_args["extract_depth"] = TAVILY_EXTRACT_DEPTH.value
    if WEB_LOADER_ENGINE.value == "external":
        WebLoaderClass = ExternalWebLoader
        web_loader_args["external_url"] = EXTERNAL_WEB_LOADER_URL.value
        web_loader_args["external_api_key"] = EXTERNAL_WEB_LOADER_API_KEY.value

    if WebLoaderClass:
        web_loader = WebLoaderClass(**web_loader_args)
        log.debug(
            "Using WEB_LOADER_ENGINE %s for %s URLs",
            web_loader.__class__.__name__,
            len(safe_urls),
        )
        return web_loader
    else:
        raise ValueError(
            f"Invalid WEB_LOADER_ENGINE: {WEB_LOADER_ENGINE.value}. "
            "Please set it to 'safe_web', 'playwright', 'firecrawl', 'tavily', or 'external'."
        )
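

# Minimal usage sketch for get_web_loader (assumes a configured Open WebUI
# environment; the URL and rate are illustrative, and the selected loader class
# depends on the WEB_LOADER_ENGINE config value).
if __name__ == "__main__":

    async def _demo():
        # Build a loader for one hypothetical URL and stream the resulting Documents.
        loader = get_web_loader(["https://example.com"], requests_per_second=2)
        async for doc in loader.alazy_load():
            print(doc.metadata.get("source"), len(doc.page_content))

    asyncio.run(_demo())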