| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 | import asyncioimport loggingimport socketimport sslimport urllib.parseimport urllib.requestfrom collections import defaultdictfrom datetime import datetime, time, timedeltafrom typing import (    Any,    AsyncIterator,    Dict,    Iterator,    List,    Optional,    Sequence,    Union,    Literal,)import aiohttpimport certifiimport validatorsfrom langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoaderfrom langchain_community.document_loaders.firecrawl import FireCrawlLoaderfrom langchain_community.document_loaders.base import BaseLoaderfrom langchain_core.documents import Documentfrom open_webui.retrieval.loaders.tavily import TavilyLoaderfrom open_webui.retrieval.loaders.external_web import ExternalWebLoaderfrom open_webui.constants import ERROR_MESSAGESfrom open_webui.config import (    ENABLE_RAG_LOCAL_WEB_FETCH,    PLAYWRIGHT_WS_URL,    PLAYWRIGHT_TIMEOUT,    WEB_LOADER_ENGINE,    FIRECRAWL_API_BASE_URL,    FIRECRAWL_API_KEY,    TAVILY_API_KEY,    TAVILY_EXTRACT_DEPTH,    EXTERNAL_WEB_LOADER_URL,    EXTERNAL_WEB_LOADER_API_KEY,)from open_webui.env import SRC_LOG_LEVELS, AIOHTTP_CLIENT_SESSION_SSLlog = logging.getLogger(__name__)log.setLevel(SRC_LOG_LEVELS["RAG"])def validate_url(url: Union[str, Sequence[str]]):    if isinstance(url, str):        if isinstance(validators.url(url), validators.ValidationError):            raise ValueError(ERROR_MESSAGES.INVALID_URL)        if not ENABLE_RAG_LOCAL_WEB_FETCH:            # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses            parsed_url = urllib.parse.urlparse(url)            # Get IPv4 and IPv6 addresses            ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)            # Check if any of the resolved addresses are private            # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader            for ip in ipv4_addresses:                if validators.ipv4(ip, private=True):                    raise ValueError(ERROR_MESSAGES.INVALID_URL)            for ip in ipv6_addresses:                if validators.ipv6(ip, private=True):                    raise ValueError(ERROR_MESSAGES.INVALID_URL)        return True    elif isinstance(url, Sequence):        return all(validate_url(u) for u in url)    else:        return Falsedef safe_validate_urls(url: Sequence[str]) -> Sequence[str]:    valid_urls = []    for u in url:        try:            if validate_url(u):                valid_urls.append(u)        except ValueError:            continue    return valid_urlsdef resolve_hostname(hostname):    # Get address information    addr_info = socket.getaddrinfo(hostname, None)    # Extract IP addresses from address information    ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]    ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]    return ipv4_addresses, ipv6_addressesdef extract_metadata(soup, url):    metadata = {"source": url}    if title := soup.find("title"):        metadata["title"] = title.get_text()    if description := soup.find("meta", attrs={"name": "description"}):        metadata["description"] = description.get("content", "No description found.")    if html := soup.find("html"):        metadata["language"] = html.get("lang", "No language found.")    return metadatadef verify_ssl_cert(url: str) -> bool:    """Verify SSL certificate for the given URL."""    if not url.startswith("https://"):        return True    try:        hostname = url.split("://")[-1].split("/")[0]        context = ssl.create_default_context(cafile=certifi.where())        with context.wrap_socket(ssl.socket(), server_hostname=hostname) as s:            s.connect((hostname, 443))        return True    except ssl.SSLError:        return False    except Exception as e:        log.warning(f"SSL verification failed for {url}: {str(e)}")        return Falseclass RateLimitMixin:    async def _wait_for_rate_limit(self):        """Wait to respect the rate limit if specified."""        if self.requests_per_second and self.last_request_time:            min_interval = timedelta(seconds=1.0 / self.requests_per_second)            time_since_last = datetime.now() - self.last_request_time            if time_since_last < min_interval:                await asyncio.sleep((min_interval - time_since_last).total_seconds())        self.last_request_time = datetime.now()    def _sync_wait_for_rate_limit(self):        """Synchronous version of rate limit wait."""        if self.requests_per_second and self.last_request_time:            min_interval = timedelta(seconds=1.0 / self.requests_per_second)            time_since_last = datetime.now() - self.last_request_time            if time_since_last < min_interval:                time.sleep((min_interval - time_since_last).total_seconds())        self.last_request_time = datetime.now()class URLProcessingMixin:    def _verify_ssl_cert(self, url: str) -> bool:        """Verify SSL certificate for a URL."""        return verify_ssl_cert(url)    async def _safe_process_url(self, url: str) -> bool:        """Perform safety checks before processing a URL."""        if self.verify_ssl and not self._verify_ssl_cert(url):            raise ValueError(f"SSL certificate verification failed for {url}")        await self._wait_for_rate_limit()        return True    def _safe_process_url_sync(self, url: str) -> bool:        """Synchronous version of safety checks."""        if self.verify_ssl and not self._verify_ssl_cert(url):            raise ValueError(f"SSL certificate verification failed for {url}")        self._sync_wait_for_rate_limit()        return Trueclass SafeFireCrawlLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):    def __init__(        self,        web_paths,        verify_ssl: bool = True,        trust_env: bool = False,        requests_per_second: Optional[float] = None,        continue_on_failure: bool = True,        api_key: Optional[str] = None,        api_url: Optional[str] = None,        mode: Literal["crawl", "scrape", "map"] = "scrape",        proxy: Optional[Dict[str, str]] = None,        params: Optional[Dict] = None,    ):        """Concurrent document loader for FireCrawl operations.        Executes multiple FireCrawlLoader instances concurrently using thread pooling        to improve bulk processing efficiency.        Args:            web_paths: List of URLs/paths to process.            verify_ssl: If True, verify SSL certificates.            trust_env: If True, use proxy settings from environment variables.            requests_per_second: Number of requests per second to limit to.            continue_on_failure (bool): If True, continue loading other URLs on failure.            api_key: API key for FireCrawl service. Defaults to None                (uses FIRE_CRAWL_API_KEY environment variable if not provided).            api_url: Base URL for FireCrawl API. Defaults to official API endpoint.            mode: Operation mode selection:                - 'crawl': Website crawling mode (default)                - 'scrape': Direct page scraping                - 'map': Site map generation            proxy: Proxy override settings for the FireCrawl API.            params: The parameters to pass to the Firecrawl API.                Examples include crawlerOptions.                For more details, visit: https://github.com/mendableai/firecrawl-py        """        proxy_server = proxy.get("server") if proxy else None        if trust_env and not proxy_server:            env_proxies = urllib.request.getproxies()            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")            if env_proxy_server:                if proxy:                    proxy["server"] = env_proxy_server                else:                    proxy = {"server": env_proxy_server}        self.web_paths = web_paths        self.verify_ssl = verify_ssl        self.requests_per_second = requests_per_second        self.last_request_time = None        self.trust_env = trust_env        self.continue_on_failure = continue_on_failure        self.api_key = api_key        self.api_url = api_url        self.mode = mode        self.params = params    def lazy_load(self) -> Iterator[Document]:        """Load documents concurrently using FireCrawl."""        for url in self.web_paths:            try:                self._safe_process_url_sync(url)                loader = FireCrawlLoader(                    url=url,                    api_key=self.api_key,                    api_url=self.api_url,                    mode=self.mode,                    params=self.params,                )                for document in loader.lazy_load():                    if not document.metadata.get("source"):                        document.metadata["source"] = document.metadata.get("sourceURL")                    yield document            except Exception as e:                if self.continue_on_failure:                    log.exception(f"Error loading {url}: {e}")                    continue                raise e    async def alazy_load(self):        """Async version of lazy_load."""        for url in self.web_paths:            try:                await self._safe_process_url(url)                loader = FireCrawlLoader(                    url=url,                    api_key=self.api_key,                    api_url=self.api_url,                    mode=self.mode,                    params=self.params,                )                async for document in loader.alazy_load():                    if not document.metadata.get("source"):                        document.metadata["source"] = document.metadata.get("sourceURL")                    yield document            except Exception as e:                if self.continue_on_failure:                    log.exception(f"Error loading {url}: {e}")                    continue                raise eclass SafeTavilyLoader(BaseLoader, RateLimitMixin, URLProcessingMixin):    def __init__(        self,        web_paths: Union[str, List[str]],        api_key: str,        extract_depth: Literal["basic", "advanced"] = "basic",        continue_on_failure: bool = True,        requests_per_second: Optional[float] = None,        verify_ssl: bool = True,        trust_env: bool = False,        proxy: Optional[Dict[str, str]] = None,    ):        """Initialize SafeTavilyLoader with rate limiting and SSL verification support.        Args:            web_paths: List of URLs/paths to process.            api_key: The Tavily API key.            extract_depth: Depth of extraction ("basic" or "advanced").            continue_on_failure: Whether to continue if extraction of a URL fails.            requests_per_second: Number of requests per second to limit to.            verify_ssl: If True, verify SSL certificates.            trust_env: If True, use proxy settings from environment variables.            proxy: Optional proxy configuration.        """        # Initialize proxy configuration if using environment variables        proxy_server = proxy.get("server") if proxy else None        if trust_env and not proxy_server:            env_proxies = urllib.request.getproxies()            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")            if env_proxy_server:                if proxy:                    proxy["server"] = env_proxy_server                else:                    proxy = {"server": env_proxy_server}        # Store parameters for creating TavilyLoader instances        self.web_paths = web_paths if isinstance(web_paths, list) else [web_paths]        self.api_key = api_key        self.extract_depth = extract_depth        self.continue_on_failure = continue_on_failure        self.verify_ssl = verify_ssl        self.trust_env = trust_env        self.proxy = proxy        # Add rate limiting        self.requests_per_second = requests_per_second        self.last_request_time = None    def lazy_load(self) -> Iterator[Document]:        """Load documents with rate limiting support, delegating to TavilyLoader."""        valid_urls = []        for url in self.web_paths:            try:                self._safe_process_url_sync(url)                valid_urls.append(url)            except Exception as e:                log.warning(f"SSL verification failed for {url}: {str(e)}")                if not self.continue_on_failure:                    raise e        if not valid_urls:            if self.continue_on_failure:                log.warning("No valid URLs to process after SSL verification")                return            raise ValueError("No valid URLs to process after SSL verification")        try:            loader = TavilyLoader(                urls=valid_urls,                api_key=self.api_key,                extract_depth=self.extract_depth,                continue_on_failure=self.continue_on_failure,            )            yield from loader.lazy_load()        except Exception as e:            if self.continue_on_failure:                log.exception(f"Error extracting content from URLs: {e}")            else:                raise e    async def alazy_load(self) -> AsyncIterator[Document]:        """Async version with rate limiting and SSL verification."""        valid_urls = []        for url in self.web_paths:            try:                await self._safe_process_url(url)                valid_urls.append(url)            except Exception as e:                log.warning(f"SSL verification failed for {url}: {str(e)}")                if not self.continue_on_failure:                    raise e        if not valid_urls:            if self.continue_on_failure:                log.warning("No valid URLs to process after SSL verification")                return            raise ValueError("No valid URLs to process after SSL verification")        try:            loader = TavilyLoader(                urls=valid_urls,                api_key=self.api_key,                extract_depth=self.extract_depth,                continue_on_failure=self.continue_on_failure,            )            async for document in loader.alazy_load():                yield document        except Exception as e:            if self.continue_on_failure:                log.exception(f"Error loading URLs: {e}")            else:                raise eclass SafePlaywrightURLLoader(PlaywrightURLLoader, RateLimitMixin, URLProcessingMixin):    """Load HTML pages safely with Playwright, supporting SSL verification, rate limiting, and remote browser connection.    Attributes:        web_paths (List[str]): List of URLs to load.        verify_ssl (bool): If True, verify SSL certificates.        trust_env (bool): If True, use proxy settings from environment variables.        requests_per_second (Optional[float]): Number of requests per second to limit to.        continue_on_failure (bool): If True, continue loading other URLs on failure.        headless (bool): If True, the browser will run in headless mode.        proxy (dict): Proxy override settings for the Playwright session.        playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.        playwright_timeout (Optional[int]): Maximum operation time in milliseconds.    """    def __init__(        self,        web_paths: List[str],        verify_ssl: bool = True,        trust_env: bool = False,        requests_per_second: Optional[float] = None,        continue_on_failure: bool = True,        headless: bool = True,        remove_selectors: Optional[List[str]] = None,        proxy: Optional[Dict[str, str]] = None,        playwright_ws_url: Optional[str] = None,        playwright_timeout: Optional[int] = 10000,    ):        """Initialize with additional safety parameters and remote browser support."""        proxy_server = proxy.get("server") if proxy else None        if trust_env and not proxy_server:            env_proxies = urllib.request.getproxies()            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")            if env_proxy_server:                if proxy:                    proxy["server"] = env_proxy_server                else:                    proxy = {"server": env_proxy_server}        # We'll set headless to False if using playwright_ws_url since it's handled by the remote browser        super().__init__(            urls=web_paths,            continue_on_failure=continue_on_failure,            headless=headless if playwright_ws_url is None else False,            remove_selectors=remove_selectors,            proxy=proxy,        )        self.verify_ssl = verify_ssl        self.requests_per_second = requests_per_second        self.last_request_time = None        self.playwright_ws_url = playwright_ws_url        self.trust_env = trust_env        self.playwright_timeout = playwright_timeout    def lazy_load(self) -> Iterator[Document]:        """Safely load URLs synchronously with support for remote browser."""        from playwright.sync_api import sync_playwright        with sync_playwright() as p:            # Use remote browser if ws_endpoint is provided, otherwise use local browser            if self.playwright_ws_url:                browser = p.chromium.connect(self.playwright_ws_url)            else:                browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)            for url in self.urls:                try:                    self._safe_process_url_sync(url)                    page = browser.new_page()                    response = page.goto(url, timeout=self.playwright_timeout)                    if response is None:                        raise ValueError(f"page.goto() returned None for url {url}")                    text = self.evaluator.evaluate(page, browser, response)                    metadata = {"source": url}                    yield Document(page_content=text, metadata=metadata)                except Exception as e:                    if self.continue_on_failure:                        log.exception(f"Error loading {url}: {e}")                        continue                    raise e            browser.close()    async def alazy_load(self) -> AsyncIterator[Document]:        """Safely load URLs asynchronously with support for remote browser."""        from playwright.async_api import async_playwright        async with async_playwright() as p:            # Use remote browser if ws_endpoint is provided, otherwise use local browser            if self.playwright_ws_url:                browser = await p.chromium.connect(self.playwright_ws_url)            else:                browser = await p.chromium.launch(                    headless=self.headless, proxy=self.proxy                )            for url in self.urls:                try:                    await self._safe_process_url(url)                    page = await browser.new_page()                    response = await page.goto(url, timeout=self.playwright_timeout)                    if response is None:                        raise ValueError(f"page.goto() returned None for url {url}")                    text = await self.evaluator.evaluate_async(page, browser, response)                    metadata = {"source": url}                    yield Document(page_content=text, metadata=metadata)                except Exception as e:                    if self.continue_on_failure:                        log.exception(f"Error loading {url}: {e}")                        continue                    raise e            await browser.close()class SafeWebBaseLoader(WebBaseLoader):    """WebBaseLoader with enhanced error handling for URLs."""    def __init__(self, trust_env: bool = False, *args, **kwargs):        """Initialize SafeWebBaseLoader        Args:            trust_env (bool, optional): set to True if using proxy to make web requests, for example                using http(s)_proxy environment variables. Defaults to False.        """        super().__init__(*args, **kwargs)        self.trust_env = trust_env    async def _fetch(        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5    ) -> str:        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:            for i in range(retries):                try:                    kwargs: Dict = dict(                        headers=self.session.headers,                        cookies=self.session.cookies.get_dict(),                    )                    if not self.session.verify:                        kwargs["ssl"] = False                    async with session.get(                        url,                        **(self.requests_kwargs | kwargs),                        allow_redirects=False,                    ) as response:                        if self.raise_for_status:                            response.raise_for_status()                        return await response.text()                except aiohttp.ClientConnectionError as e:                    if i == retries - 1:                        raise                    else:                        log.warning(                            f"Error fetching {url} with attempt "                            f"{i + 1}/{retries}: {e}. Retrying..."                        )                        await asyncio.sleep(cooldown * backoff**i)        raise ValueError("retry count exceeded")    def _unpack_fetch_results(        self, results: Any, urls: List[str], parser: Union[str, None] = None    ) -> List[Any]:        """Unpack fetch results into BeautifulSoup objects."""        from bs4 import BeautifulSoup        final_results = []        for i, result in enumerate(results):            url = urls[i]            if parser is None:                if url.endswith(".xml"):                    parser = "xml"                else:                    parser = self.default_parser                self._check_parser(parser)            final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))        return final_results    async def ascrape_all(        self, urls: List[str], parser: Union[str, None] = None    ) -> List[Any]:        """Async fetch all urls, then return soups for all results."""        results = await self.fetch_all(urls)        return self._unpack_fetch_results(results, urls, parser=parser)    def lazy_load(self) -> Iterator[Document]:        """Lazy load text from the url(s) in web_path with error handling."""        for path in self.web_paths:            try:                soup = self._scrape(path, bs_kwargs=self.bs_kwargs)                text = soup.get_text(**self.bs_get_text_kwargs)                # Build metadata                metadata = extract_metadata(soup, path)                yield Document(page_content=text, metadata=metadata)            except Exception as e:                # Log the error and continue with the next URL                log.exception(f"Error loading {path}: {e}")    async def alazy_load(self) -> AsyncIterator[Document]:        """Async lazy load text from the url(s) in web_path."""        results = await self.ascrape_all(self.web_paths)        for path, soup in zip(self.web_paths, results):            text = soup.get_text(**self.bs_get_text_kwargs)            metadata = {"source": path}            if title := soup.find("title"):                metadata["title"] = title.get_text()            if description := soup.find("meta", attrs={"name": "description"}):                metadata["description"] = description.get(                    "content", "No description found."                )            if html := soup.find("html"):                metadata["language"] = html.get("lang", "No language found.")            yield Document(page_content=text, metadata=metadata)    async def aload(self) -> list[Document]:        """Load data into Document objects."""        return [document async for document in self.alazy_load()]def get_web_loader(    urls: Union[str, Sequence[str]],    verify_ssl: bool = True,    requests_per_second: int = 2,    trust_env: bool = False,):    # Check if the URLs are valid    safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)    web_loader_args = {        "web_paths": safe_urls,        "verify_ssl": verify_ssl,        "requests_per_second": requests_per_second,        "continue_on_failure": True,        "trust_env": trust_env,    }    if WEB_LOADER_ENGINE.value == "" or WEB_LOADER_ENGINE.value == "safe_web":        WebLoaderClass = SafeWebBaseLoader    if WEB_LOADER_ENGINE.value == "playwright":        WebLoaderClass = SafePlaywrightURLLoader        web_loader_args["playwright_timeout"] = PLAYWRIGHT_TIMEOUT.value        if PLAYWRIGHT_WS_URL.value:            web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URL.value    if WEB_LOADER_ENGINE.value == "firecrawl":        WebLoaderClass = SafeFireCrawlLoader        web_loader_args["api_key"] = FIRECRAWL_API_KEY.value        web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value    if WEB_LOADER_ENGINE.value == "tavily":        WebLoaderClass = SafeTavilyLoader        web_loader_args["api_key"] = TAVILY_API_KEY.value        web_loader_args["extract_depth"] = TAVILY_EXTRACT_DEPTH.value    if WEB_LOADER_ENGINE.value == "external":        WebLoaderClass = ExternalWebLoader        web_loader_args["external_url"] = EXTERNAL_WEB_LOADER_URL.value        web_loader_args["external_api_key"] = EXTERNAL_WEB_LOADER_API_KEY.value    if WebLoaderClass:        web_loader = WebLoaderClass(**web_loader_args)        log.debug(            "Using WEB_LOADER_ENGINE %s for %s URLs",            web_loader.__class__.__name__,            len(safe_urls),        )        return web_loader    else:        raise ValueError(            f"Invalid WEB_LOADER_ENGINE: {WEB_LOADER_ENGINE.value}. "            "Please set it to 'safe_web', 'playwright', 'firecrawl', or 'tavily'."        )
 |