youtube.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import logging
  2. from typing import Any, Dict, Generator, List, Optional, Sequence, Union
  3. from urllib.parse import parse_qs, urlparse
  4. from langchain_core.documents import Document
  5. from open_webui.env import SRC_LOG_LEVELS
# Module-level logger named after this module; its level is taken from the
# "RAG" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
  8. ALLOWED_SCHEMES = {"http", "https"}
  9. ALLOWED_NETLOCS = {
  10. "youtu.be",
  11. "m.youtube.com",
  12. "youtube.com",
  13. "www.youtube.com",
  14. "www.youtube-nocookie.com",
  15. "vid.plus",
  16. }
  17. def _parse_video_id(url: str) -> Optional[str]:
  18. """Parse a YouTube URL and return the video ID if valid, otherwise None."""
  19. parsed_url = urlparse(url)
  20. if parsed_url.scheme not in ALLOWED_SCHEMES:
  21. return None
  22. if parsed_url.netloc not in ALLOWED_NETLOCS:
  23. return None
  24. path = parsed_url.path
  25. if path.endswith("/watch"):
  26. query = parsed_url.query
  27. parsed_query = parse_qs(query)
  28. if "v" in parsed_query:
  29. ids = parsed_query["v"]
  30. video_id = ids if isinstance(ids, str) else ids[0]
  31. else:
  32. return None
  33. else:
  34. path = parsed_url.path.lstrip("/")
  35. video_id = path.split("/")[-1]
  36. if len(video_id) != 11: # Video IDs are 11 characters long
  37. return None
  38. return video_id
  39. class YoutubeLoader:
  40. """Load `YouTube` video transcripts."""
  41. def __init__(
  42. self,
  43. video_id: str,
  44. language: Union[str, Sequence[str]] = "en",
  45. proxy_url: Optional[str] = None,
  46. ):
  47. """Initialize with YouTube video ID."""
  48. _video_id = _parse_video_id(video_id)
  49. self.video_id = _video_id if _video_id is not None else video_id
  50. self._metadata = {"source": video_id}
  51. self.proxy_url = proxy_url
  52. # Ensure language is a list
  53. if isinstance(language, str):
  54. self.language = [language]
  55. else:
  56. self.language = list(language)
  57. # Add English as fallback if not already in the list
  58. if "en" not in self.language:
  59. self.language.append("en")
  60. def load(self) -> List[Document]:
  61. """Load YouTube transcripts into `Document` objects."""
  62. try:
  63. from youtube_transcript_api import (
  64. NoTranscriptFound,
  65. TranscriptsDisabled,
  66. YouTubeTranscriptApi,
  67. )
  68. except ImportError:
  69. raise ImportError(
  70. 'Could not import "youtube_transcript_api" Python package. '
  71. "Please install it with `pip install youtube-transcript-api`."
  72. )
  73. if self.proxy_url:
  74. youtube_proxies = {
  75. "http": self.proxy_url,
  76. "https": self.proxy_url,
  77. }
  78. # Don't log complete URL because it might contain secrets
  79. log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")
  80. else:
  81. youtube_proxies = None
  82. try:
  83. transcript_list = YouTubeTranscriptApi.list_transcripts(
  84. self.video_id, proxies=youtube_proxies
  85. )
  86. except Exception as e:
  87. log.exception("Loading YouTube transcript failed")
  88. return []
  89. # Try each language in order of priority
  90. for lang in self.language:
  91. try:
  92. transcript = transcript_list.find_transcript([lang])
  93. log.debug(f"Found transcript for language '{lang}'")
  94. transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
  95. transcript_text = " ".join(
  96. map(
  97. lambda transcript_piece: transcript_piece.text.strip(" "),
  98. transcript_pieces,
  99. )
  100. )
  101. return [Document(page_content=transcript_text, metadata=self._metadata)]
  102. except NoTranscriptFound:
  103. log.debug(f"No transcript found for language '{lang}'")
  104. continue
  105. except Exception as e:
  106. log.info(f"Error finding transcript for language '{lang}'")
  107. raise e
  108. # If we get here, all languages failed
  109. languages_tried = ", ".join(self.language)
  110. log.warning(f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed.")
  111. raise NoTranscriptFound(f"No transcript found for any supported language. Verify if the video has transcripts, add more languages if needed.")