# utils.py — web loader utilities (SSL verification, rate limiting, safe URL loading)
  1. import asyncio
  2. import logging
  3. import socket
  4. import ssl
  5. import urllib.parse
  6. import urllib.request
  7. from datetime import datetime, time, timedelta
  8. from typing import (
  9. Any,
  10. AsyncIterator,
  11. Dict,
  12. Iterator,
  13. List,
  14. Optional,
  15. Sequence,
  16. Union,
  17. Literal,
  18. )
  19. import aiohttp
  20. import certifi
  21. import validators
  22. from langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoader
  23. from langchain_community.document_loaders.base import BaseLoader
  24. from langchain_core.documents import Document
  25. from open_webui.retrieval.loaders.tavily import TavilyLoader
  26. from open_webui.retrieval.loaders.external_web import ExternalWebLoader
  27. from open_webui.constants import ERROR_MESSAGES
  28. from open_webui.config import (
  29. ENABLE_RAG_LOCAL_WEB_FETCH,
  30. PLAYWRIGHT_WS_URL,
  31. PLAYWRIGHT_TIMEOUT,
  32. WEB_LOADER_ENGINE,
  33. FIRECRAWL_API_BASE_URL,
  34. FIRECRAWL_API_KEY,
  35. TAVILY_API_KEY,
  36. TAVILY_EXTRACT_DEPTH,
  37. EXTERNAL_WEB_LOADER_URL,
  38. EXTERNAL_WEB_LOADER_API_KEY,
  39. )
  40. from open_webui.env import SRC_LOG_LEVELS
  41. from firecrawl import Firecrawl
# Module-level logger; verbosity is driven by the "RAG" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
  44. def validate_url(url: Union[str, Sequence[str]]):
  45. if isinstance(url, str):
  46. if isinstance(validators.url(url), validators.ValidationError):
  47. raise ValueError(ERROR_MESSAGES.INVALID_URL)
  48. if not ENABLE_RAG_LOCAL_WEB_FETCH:
  49. # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
  50. parsed_url = urllib.parse.urlparse(url)
  51. # Get IPv4 and IPv6 addresses
  52. ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
  53. # Check if any of the resolved addresses are private
  54. # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
  55. for ip in ipv4_addresses:
  56. if validators.ipv4(ip, private=True):
  57. raise ValueError(ERROR_MESSAGES.INVALID_URL)
  58. for ip in ipv6_addresses:
  59. if validators.ipv6(ip, private=True):
  60. raise ValueError(ERROR_MESSAGES.INVALID_URL)
  61. return True
  62. elif isinstance(url, Sequence):
  63. return all(validate_url(u) for u in url)
  64. else:
  65. return False
  66. def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
  67. valid_urls = []
  68. for u in url:
  69. try:
  70. if validate_url(u):
  71. valid_urls.append(u)
  72. except Exception as e:
  73. log.debug(f"Invalid URL {u}: {str(e)}")
  74. continue
  75. return valid_urls
  76. def resolve_hostname(hostname):
  77. # Get address information
  78. addr_info = socket.getaddrinfo(hostname, None)
  79. # Extract IP addresses from address information
  80. ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
  81. ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]
  82. return ipv4_addresses, ipv6_addresses
  83. def extract_metadata(soup, url):
  84. metadata = {"source": url}
  85. if title := soup.find("title"):
  86. metadata["title"] = title.get_text()
  87. if description := soup.find("meta", attrs={"name": "description"}):
  88. metadata["description"] = description.get("content", "No description found.")
  89. if html := soup.find("html"):
  90. metadata["language"] = html.get("lang", "No language found.")
  91. return metadata
  92. def verify_ssl_cert(url: str) -> bool:
  93. """Verify SSL certificate for the given URL."""
  94. if not url.startswith("https://"):
  95. return True
  96. try:
  97. hostname = url.split("://")[-1].split("/")[0]
  98. context = ssl.create_default_context(cafile=certifi.where())
  99. with context.wrap_socket(ssl.socket(), server_hostname=hostname) as s:
  100. s.connect((hostname, 443))
  101. return True
  102. except ssl.SSLError:
  103. return False
  104. except Exception as e:
  105. log.warning(f"SSL verification failed for {url}: {str(e)}")
  106. return False
  107. class RateLimitMixin:
  108. async def _wait_for_rate_limit(self):
  109. """Wait to respect the rate limit if specified."""
  110. if self.requests_per_second and self.last_request_time:
  111. min_interval = timedelta(seconds=1.0 / self.requests_per_second)
  112. time_since_last = datetime.now() - self.last_request_time
  113. if time_since_last < min_interval:
  114. await asyncio.sleep((min_interval - time_since_last).total_seconds())
  115. self.last_request_time = datetime.now()
  116. def _sync_wait_for_rate_limit(self):
  117. """Synchronous version of rate limit wait."""
  118. if self.requests_per_second and self.last_request_time:
  119. min_interval = timedelta(seconds=1.0 / self.requests_per_second)
  120. time_since_last = datetime.now() - self.last_request_time
  121. if time_since_last < min_interval:
  122. time.sleep((min_interval - time_since_last).total_seconds())
  123. self.last_request_time = datetime.now()
  124. class URLProcessingMixin:
  125. def _verify_ssl_cert(self, url: str) -> bool:
  126. """Verify SSL certificate for a URL."""
  127. return verify_ssl_cert(url)
  128. async def _safe_process_url(self, url: str) -> bool:
  129. """Perform safety checks before processing a URL."""
  130. if self.verify_ssl and not self._verify_ssl_cert(url):
  131. raise ValueError(f"SSL certificate verification failed for {url}")
  132. await self._wait_for_rate_limit()
  133. return True
  134. def _safe_process_url_sync(self, url: str) -> bool:
  135. """Synchronous version of safety checks."""
  136. if self.verify_ssl and not self._verify_ssl_cert(url):
  137. raise ValueError(f"SSL certificate verification failed for {url}")
  138. self._sync_wait_for_rate_limit()
  139. return True
  140. class SafeFireCrawlLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):
  141. def __init__(
  142. self,
  143. web_paths,
  144. verify_ssl: bool = True,
  145. trust_env: bool = False,
  146. requests_per_second: Optional[float] = None,
  147. continue_on_failure: bool = True,
  148. api_key: Optional[str] = None,
  149. api_url: Optional[str] = None,
  150. mode: Literal["crawl", "scrape", "map"] = "scrape",
  151. proxy: Optional[Dict[str, str]] = None,
  152. params: Optional[Dict] = None,
  153. ):
  154. """Concurrent document loader for FireCrawl operations.
  155. Executes multiple FireCrawlLoader instances concurrently using thread pooling
  156. to improve bulk processing efficiency.
  157. Args:
  158. web_paths: List of URLs/paths to process.
  159. verify_ssl: If True, verify SSL certificates.
  160. trust_env: If True, use proxy settings from environment variables.
  161. requests_per_second: Number of requests per second to limit to.
  162. continue_on_failure (bool): If True, continue loading other URLs on failure.
  163. api_key: API key for FireCrawl service. Defaults to None
  164. (uses FIRE_CRAWL_API_KEY environment variable if not provided).
  165. api_url: Base URL for FireCrawl API. Defaults to official API endpoint.
  166. mode: Operation mode selection:
  167. - 'crawl': Website crawling mode
  168. - 'scrape': Direct page scraping (default)
  169. - 'map': Site map generation
  170. proxy: Proxy override settings for the FireCrawl API.
  171. params: The parameters to pass to the Firecrawl API.
  172. For more details, visit: https://docs.firecrawl.dev/sdks/python#batch-scrape
  173. """
  174. proxy_server = proxy.get("server") if proxy else None
  175. if trust_env and not proxy_server:
  176. env_proxies = urllib.request.getproxies()
  177. env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
  178. if env_proxy_server:
  179. if proxy:
  180. proxy["server"] = env_proxy_server
  181. else:
  182. proxy = {"server": env_proxy_server}
  183. self.web_paths = web_paths
  184. self.verify_ssl = verify_ssl
  185. self.requests_per_second = requests_per_second
  186. self.last_request_time = None
  187. self.trust_env = trust_env
  188. self.continue_on_failure = continue_on_failure
  189. self.api_key = api_key
  190. self.api_url = api_url
  191. self.mode = mode
  192. self.params = params or {}
  193. def lazy_load(self) -> Iterator[Document]:
  194. """Load documents using FireCrawl batch_scrape."""
  195. log.debug(
  196. "Starting FireCrawl batch scrape for %d URLs, mode: %s, params: %s",
  197. len(self.web_paths),
  198. self.mode,
  199. self.params,
  200. )
  201. try:
  202. firecrawl = Firecrawl(api_key=self.api_key, api_url=self.api_url)
  203. result = firecrawl.batch_scrape(
  204. self.web_paths,
  205. formats=["markdown"],
  206. skip_tls_verification=not self.verify_ssl,
  207. ignore_invalid_urls=True,
  208. remove_base64_images=True,
  209. max_age=300000, # 5 minutes https://docs.firecrawl.dev/features/fast-scraping#common-maxage-values
  210. wait_timeout=len(self.web_paths) * 3,
  211. **self.params,
  212. )
  213. if result.status != "completed":
  214. raise RuntimeError(
  215. f"FireCrawl batch scrape did not complete successfully. result: {result}"
  216. )
  217. for data in result.data:
  218. metadata = data.metadata or {}
  219. yield Document(
  220. page_content=data.markdown or "",
  221. metadata={"source": metadata.url or metadata.source_url or ""},
  222. )
  223. except Exception as e:
  224. if self.continue_on_failure:
  225. log.exception(f"Error extracting content from URLs: {e}")
  226. else:
  227. raise e
  228. async def alazy_load(self):
  229. """Async version of lazy_load."""
  230. log.debug(
  231. "Starting FireCrawl batch scrape for %d URLs, mode: %s, params: %s",
  232. len(self.web_paths),
  233. self.mode,
  234. self.params,
  235. )
  236. try:
  237. firecrawl = Firecrawl(api_key=self.api_key, api_url=self.api_url)
  238. result = firecrawl.batch_scrape(
  239. self.web_paths,
  240. formats=["markdown"],
  241. skip_tls_verification=not self.verify_ssl,
  242. ignore_invalid_urls=True,
  243. remove_base64_images=True,
  244. max_age=300000, # 5 minutes https://docs.firecrawl.dev/features/fast-scraping#common-maxage-values
  245. wait_timeout=len(self.web_paths) * 3,
  246. **self.params,
  247. )
  248. if result.status != "completed":
  249. raise RuntimeError(
  250. f"FireCrawl batch scrape did not complete successfully. result: {result}"
  251. )
  252. for data in result.data:
  253. metadata = data.metadata or {}
  254. yield Document(
  255. page_content=data.markdown or "",
  256. metadata={"source": metadata.url or metadata.source_url or ""},
  257. )
  258. except Exception as e:
  259. if self.continue_on_failure:
  260. log.exception(f"Error extracting content from URLs: {e}")
  261. else:
  262. raise e
  263. class SafeTavilyLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):
  264. def __init__(
  265. self,
  266. web_paths: Union[str, List[str]],
  267. api_key: str,
  268. extract_depth: Literal["basic", "advanced"] = "basic",
  269. continue_on_failure: bool = True,
  270. requests_per_second: Optional[float] = None,
  271. verify_ssl: bool = True,
  272. trust_env: bool = False,
  273. proxy: Optional[Dict[str, str]] = None,
  274. ):
  275. """Initialize SafeTavilyLoader with rate limiting and SSL verification support.
  276. Args:
  277. web_paths: List of URLs/paths to process.
  278. api_key: The Tavily API key.
  279. extract_depth: Depth of extraction ("basic" or "advanced").
  280. continue_on_failure: Whether to continue if extraction of a URL fails.
  281. requests_per_second: Number of requests per second to limit to.
  282. verify_ssl: If True, verify SSL certificates.
  283. trust_env: If True, use proxy settings from environment variables.
  284. proxy: Optional proxy configuration.
  285. """
  286. # Initialize proxy configuration if using environment variables
  287. proxy_server = proxy.get("server") if proxy else None
  288. if trust_env and not proxy_server:
  289. env_proxies = urllib.request.getproxies()
  290. env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
  291. if env_proxy_server:
  292. if proxy:
  293. proxy["server"] = env_proxy_server
  294. else:
  295. proxy = {"server": env_proxy_server}
  296. # Store parameters for creating TavilyLoader instances
  297. self.web_paths = web_paths if isinstance(web_paths, list) else [web_paths]
  298. self.api_key = api_key
  299. self.extract_depth = extract_depth
  300. self.continue_on_failure = continue_on_failure
  301. self.verify_ssl = verify_ssl
  302. self.trust_env = trust_env
  303. self.proxy = proxy
  304. # Add rate limiting
  305. self.requests_per_second = requests_per_second
  306. self.last_request_time = None
  307. def lazy_load(self) -> Iterator[Document]:
  308. """Load documents with rate limiting support, delegating to TavilyLoader."""
  309. valid_urls = []
  310. for url in self.web_paths:
  311. try:
  312. self._safe_process_url_sync(url)
  313. valid_urls.append(url)
  314. except Exception as e:
  315. log.warning(f"SSL verification failed for {url}: {str(e)}")
  316. if not self.continue_on_failure:
  317. raise e
  318. if not valid_urls:
  319. if self.continue_on_failure:
  320. log.warning("No valid URLs to process after SSL verification")
  321. return
  322. raise ValueError("No valid URLs to process after SSL verification")
  323. try:
  324. loader = TavilyLoader(
  325. urls=valid_urls,
  326. api_key=self.api_key,
  327. extract_depth=self.extract_depth,
  328. continue_on_failure=self.continue_on_failure,
  329. )
  330. yield from loader.lazy_load()
  331. except Exception as e:
  332. if self.continue_on_failure:
  333. log.exception(f"Error extracting content from URLs: {e}")
  334. else:
  335. raise e
  336. async def alazy_load(self) -> AsyncIterator[Document]:
  337. """Async version with rate limiting and SSL verification."""
  338. valid_urls = []
  339. for url in self.web_paths:
  340. try:
  341. await self._safe_process_url(url)
  342. valid_urls.append(url)
  343. except Exception as e:
  344. log.warning(f"SSL verification failed for {url}: {str(e)}")
  345. if not self.continue_on_failure:
  346. raise e
  347. if not valid_urls:
  348. if self.continue_on_failure:
  349. log.warning("No valid URLs to process after SSL verification")
  350. return
  351. raise ValueError("No valid URLs to process after SSL verification")
  352. try:
  353. loader = TavilyLoader(
  354. urls=valid_urls,
  355. api_key=self.api_key,
  356. extract_depth=self.extract_depth,
  357. continue_on_failure=self.continue_on_failure,
  358. )
  359. async for document in loader.alazy_load():
  360. yield document
  361. except Exception as e:
  362. if self.continue_on_failure:
  363. log.exception(f"Error loading URLs: {e}")
  364. else:
  365. raise e
  366. class SafePlaywrightURLLoader(PlaywrightURLLoader, RateLimitMixin, URLProcessingMixin):
  367. """Load HTML pages safely with Playwright, supporting SSL verification, rate limiting, and remote browser connection.
  368. Attributes:
  369. web_paths (List[str]): List of URLs to load.
  370. verify_ssl (bool): If True, verify SSL certificates.
  371. trust_env (bool): If True, use proxy settings from environment variables.
  372. requests_per_second (Optional[float]): Number of requests per second to limit to.
  373. continue_on_failure (bool): If True, continue loading other URLs on failure.
  374. headless (bool): If True, the browser will run in headless mode.
  375. proxy (dict): Proxy override settings for the Playwright session.
  376. playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.
  377. playwright_timeout (Optional[int]): Maximum operation time in milliseconds.
  378. """
  379. def __init__(
  380. self,
  381. web_paths: List[str],
  382. verify_ssl: bool = True,
  383. trust_env: bool = False,
  384. requests_per_second: Optional[float] = None,
  385. continue_on_failure: bool = True,
  386. headless: bool = True,
  387. remove_selectors: Optional[List[str]] = None,
  388. proxy: Optional[Dict[str, str]] = None,
  389. playwright_ws_url: Optional[str] = None,
  390. playwright_timeout: Optional[int] = 10000,
  391. ):
  392. """Initialize with additional safety parameters and remote browser support."""
  393. proxy_server = proxy.get("server") if proxy else None
  394. if trust_env and not proxy_server:
  395. env_proxies = urllib.request.getproxies()
  396. env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
  397. if env_proxy_server:
  398. if proxy:
  399. proxy["server"] = env_proxy_server
  400. else:
  401. proxy = {"server": env_proxy_server}
  402. # We'll set headless to False if using playwright_ws_url since it's handled by the remote browser
  403. super().__init__(
  404. urls=web_paths,
  405. continue_on_failure=continue_on_failure,
  406. headless=headless if playwright_ws_url is None else False,
  407. remove_selectors=remove_selectors,
  408. proxy=proxy,
  409. )
  410. self.verify_ssl = verify_ssl
  411. self.requests_per_second = requests_per_second
  412. self.last_request_time = None
  413. self.playwright_ws_url = playwright_ws_url
  414. self.trust_env = trust_env
  415. self.playwright_timeout = playwright_timeout
  416. def lazy_load(self) -> Iterator[Document]:
  417. """Safely load URLs synchronously with support for remote browser."""
  418. from playwright.sync_api import sync_playwright
  419. with sync_playwright() as p:
  420. # Use remote browser if ws_endpoint is provided, otherwise use local browser
  421. if self.playwright_ws_url:
  422. browser = p.chromium.connect(self.playwright_ws_url)
  423. else:
  424. browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)
  425. for url in self.urls:
  426. try:
  427. self._safe_process_url_sync(url)
  428. page = browser.new_page()
  429. response = page.goto(url, timeout=self.playwright_timeout)
  430. if response is None:
  431. raise ValueError(f"page.goto() returned None for url {url}")
  432. text = self.evaluator.evaluate(page, browser, response)
  433. metadata = {"source": url}
  434. yield Document(page_content=text, metadata=metadata)
  435. except Exception as e:
  436. if self.continue_on_failure:
  437. log.exception(f"Error loading {url}: {e}")
  438. continue
  439. raise e
  440. browser.close()
  441. async def alazy_load(self) -> AsyncIterator[Document]:
  442. """Safely load URLs asynchronously with support for remote browser."""
  443. from playwright.async_api import async_playwright
  444. async with async_playwright() as p:
  445. # Use remote browser if ws_endpoint is provided, otherwise use local browser
  446. if self.playwright_ws_url:
  447. browser = await p.chromium.connect(self.playwright_ws_url)
  448. else:
  449. browser = await p.chromium.launch(
  450. headless=self.headless, proxy=self.proxy
  451. )
  452. for url in self.urls:
  453. try:
  454. await self._safe_process_url(url)
  455. page = await browser.new_page()
  456. response = await page.goto(url, timeout=self.playwright_timeout)
  457. if response is None:
  458. raise ValueError(f"page.goto() returned None for url {url}")
  459. text = await self.evaluator.evaluate_async(page, browser, response)
  460. metadata = {"source": url}
  461. yield Document(page_content=text, metadata=metadata)
  462. except Exception as e:
  463. if self.continue_on_failure:
  464. log.exception(f"Error loading {url}: {e}")
  465. continue
  466. raise e
  467. await browser.close()
  468. class SafeWebBaseLoader(WebBaseLoader):
  469. """WebBaseLoader with enhanced error handling for URLs."""
  470. def __init__(self, trust_env: bool = False, *args, **kwargs):
  471. """Initialize SafeWebBaseLoader
  472. Args:
  473. trust_env (bool, optional): set to True if using proxy to make web requests, for example
  474. using http(s)_proxy environment variables. Defaults to False.
  475. """
  476. super().__init__(*args, **kwargs)
  477. self.trust_env = trust_env
  478. async def _fetch(
  479. self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
  480. ) -> str:
  481. async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
  482. for i in range(retries):
  483. try:
  484. kwargs: Dict = dict(
  485. headers=self.session.headers,
  486. cookies=self.session.cookies.get_dict(),
  487. )
  488. if not self.session.verify:
  489. kwargs["ssl"] = False
  490. async with session.get(
  491. url,
  492. **(self.requests_kwargs | kwargs),
  493. allow_redirects=False,
  494. ) as response:
  495. if self.raise_for_status:
  496. response.raise_for_status()
  497. return await response.text()
  498. except aiohttp.ClientConnectionError as e:
  499. if i == retries - 1:
  500. raise
  501. else:
  502. log.warning(
  503. f"Error fetching {url} with attempt "
  504. f"{i + 1}/{retries}: {e}. Retrying..."
  505. )
  506. await asyncio.sleep(cooldown * backoff**i)
  507. raise ValueError("retry count exceeded")
  508. def _unpack_fetch_results(
  509. self, results: Any, urls: List[str], parser: Union[str, None] = None
  510. ) -> List[Any]:
  511. """Unpack fetch results into BeautifulSoup objects."""
  512. from bs4 import BeautifulSoup
  513. final_results = []
  514. for i, result in enumerate(results):
  515. url = urls[i]
  516. if parser is None:
  517. if url.endswith(".xml"):
  518. parser = "xml"
  519. else:
  520. parser = self.default_parser
  521. self._check_parser(parser)
  522. final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))
  523. return final_results
  524. async def ascrape_all(
  525. self, urls: List[str], parser: Union[str, None] = None
  526. ) -> List[Any]:
  527. """Async fetch all urls, then return soups for all results."""
  528. results = await self.fetch_all(urls)
  529. return self._unpack_fetch_results(results, urls, parser=parser)
  530. def lazy_load(self) -> Iterator[Document]:
  531. """Lazy load text from the url(s) in web_path with error handling."""
  532. for path in self.web_paths:
  533. try:
  534. soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
  535. text = soup.get_text(**self.bs_get_text_kwargs)
  536. # Build metadata
  537. metadata = extract_metadata(soup, path)
  538. yield Document(page_content=text, metadata=metadata)
  539. except Exception as e:
  540. # Log the error and continue with the next URL
  541. log.exception(f"Error loading {path}: {e}")
  542. async def alazy_load(self) -> AsyncIterator[Document]:
  543. """Async lazy load text from the url(s) in web_path."""
  544. results = await self.ascrape_all(self.web_paths)
  545. for path, soup in zip(self.web_paths, results):
  546. text = soup.get_text(**self.bs_get_text_kwargs)
  547. metadata = {"source": path}
  548. if title := soup.find("title"):
  549. metadata["title"] = title.get_text()
  550. if description := soup.find("meta", attrs={"name": "description"}):
  551. metadata["description"] = description.get(
  552. "content", "No description found."
  553. )
  554. if html := soup.find("html"):
  555. metadata["language"] = html.get("lang", "No language found.")
  556. yield Document(page_content=text, metadata=metadata)
  557. async def aload(self) -> list[Document]:
  558. """Load data into Document objects."""
  559. return [document async for document in self.alazy_load()]
  560. def get_web_loader(
  561. urls: Union[str, Sequence[str]],
  562. verify_ssl: bool = True,
  563. requests_per_second: int = 2,
  564. trust_env: bool = False,
  565. ):
  566. # Check if the URLs are valid
  567. safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)
  568. web_loader_args = {
  569. "web_paths": safe_urls,
  570. "verify_ssl": verify_ssl,
  571. "requests_per_second": requests_per_second,
  572. "continue_on_failure": True,
  573. "trust_env": trust_env,
  574. }
  575. if WEB_LOADER_ENGINE.value == "" or WEB_LOADER_ENGINE.value == "safe_web":
  576. WebLoaderClass = SafeWebBaseLoader
  577. if WEB_LOADER_ENGINE.value == "playwright":
  578. WebLoaderClass = SafePlaywrightURLLoader
  579. web_loader_args["playwright_timeout"] = PLAYWRIGHT_TIMEOUT.value
  580. if PLAYWRIGHT_WS_URL.value:
  581. web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URL.value
  582. if WEB_LOADER_ENGINE.value == "firecrawl":
  583. WebLoaderClass = SafeFireCrawlLoader
  584. web_loader_args["api_key"] = FIRECRAWL_API_KEY.value
  585. web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value
  586. if WEB_LOADER_ENGINE.value == "tavily":
  587. WebLoaderClass = SafeTavilyLoader
  588. web_loader_args["api_key"] = TAVILY_API_KEY.value
  589. web_loader_args["extract_depth"] = TAVILY_EXTRACT_DEPTH.value
  590. if WEB_LOADER_ENGINE.value == "external":
  591. WebLoaderClass = ExternalWebLoader
  592. web_loader_args["external_url"] = EXTERNAL_WEB_LOADER_URL.value
  593. web_loader_args["external_api_key"] = EXTERNAL_WEB_LOADER_API_KEY.value
  594. if WebLoaderClass:
  595. web_loader = WebLoaderClass(**web_loader_args)
  596. log.debug(
  597. "Using WEB_LOADER_ENGINE %s for %s URLs",
  598. web_loader.__class__.__name__,
  599. len(safe_urls),
  600. )
  601. return web_loader
  602. else:
  603. raise ValueError(
  604. f"Invalid WEB_LOADER_ENGINE: {WEB_LOADER_ENGINE.value}. "
  605. "Please set it to 'safe_web', 'playwright', 'firecrawl', or 'tavily'."
  606. )