youtube.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import logging
  2. from typing import Any, Dict, Generator, List, Optional, Sequence, Union
  3. from urllib.parse import parse_qs, urlparse
  4. from langchain_core.documents import Document
  5. from open_webui.env import SRC_LOG_LEVELS
  6. log = logging.getLogger(__name__)
  7. log.setLevel(SRC_LOG_LEVELS["RAG"])
  8. ALLOWED_SCHEMES = {"http", "https"}
  9. ALLOWED_NETLOCS = {
  10. "youtu.be",
  11. "m.youtube.com",
  12. "youtube.com",
  13. "www.youtube.com",
  14. "www.youtube-nocookie.com",
  15. "vid.plus",
  16. }
  17. def _parse_video_id(url: str) -> Optional[str]:
  18. """Parse a YouTube URL and return the video ID if valid, otherwise None."""
  19. parsed_url = urlparse(url)
  20. if parsed_url.scheme not in ALLOWED_SCHEMES:
  21. return None
  22. if parsed_url.netloc not in ALLOWED_NETLOCS:
  23. return None
  24. path = parsed_url.path
  25. if path.endswith("/watch"):
  26. query = parsed_url.query
  27. parsed_query = parse_qs(query)
  28. if "v" in parsed_query:
  29. ids = parsed_query["v"]
  30. video_id = ids if isinstance(ids, str) else ids[0]
  31. else:
  32. return None
  33. else:
  34. path = parsed_url.path.lstrip("/")
  35. video_id = path.split("/")[-1]
  36. if len(video_id) != 11: # Video IDs are 11 characters long
  37. return None
  38. return video_id
  39. class YoutubeLoader:
  40. """Load `YouTube` video transcripts."""
  41. def __init__(
  42. self,
  43. video_id: str,
  44. language: Union[str, Sequence[str]] = "en",
  45. proxy_url: Optional[str] = None,
  46. ):
  47. """Initialize with YouTube video ID."""
  48. _video_id = _parse_video_id(video_id)
  49. self.video_id = _video_id if _video_id is not None else video_id
  50. self._metadata = {"source": video_id}
  51. self.proxy_url = proxy_url
  52. # Ensure language is a list
  53. if isinstance(language, str):
  54. self.language = [language]
  55. else:
  56. self.language = list(language)
  57. # Add English as fallback if not already in the list
  58. if "en" not in self.language:
  59. self.language.append("en")
  60. def load(self) -> List[Document]:
  61. """Load YouTube transcripts into `Document` objects."""
  62. try:
  63. from youtube_transcript_api import (
  64. NoTranscriptFound,
  65. TranscriptsDisabled,
  66. YouTubeTranscriptApi,
  67. )
  68. except ImportError:
  69. raise ImportError(
  70. 'Could not import "youtube_transcript_api" Python package. '
  71. "Please install it with `pip install youtube-transcript-api`."
  72. )
  73. if self.proxy_url:
  74. youtube_proxies = {
  75. "http": self.proxy_url,
  76. "https": self.proxy_url,
  77. }
  78. # Don't log complete URL because it might contain secrets
  79. log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")
  80. else:
  81. youtube_proxies = None
  82. try:
  83. transcript_list = YouTubeTranscriptApi.list_transcripts(
  84. self.video_id, proxies=youtube_proxies
  85. )
  86. except Exception as e:
  87. log.exception("Loading YouTube transcript failed")
  88. return []
  89. # Try each language in order of priority
  90. for lang in self.language:
  91. try:
  92. transcript = transcript_list.find_transcript([lang])
  93. log.debug(f"Found transcript for language '{lang}'")
  94. transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
  95. transcript_text = " ".join(
  96. map(
  97. lambda transcript_piece: transcript_piece.text.strip(" "),
  98. transcript_pieces,
  99. )
  100. )
  101. return [Document(page_content=transcript_text, metadata=self._metadata)]
  102. except NoTranscriptFound:
  103. log.debug(f"No transcript found for language '{lang}'")
  104. continue
  105. except Exception as e:
  106. log.info(f"Error finding transcript for language '{lang}'")
  107. raise e
  108. # If we get here, all languages failed
  109. languages_tried = ", ".join(self.language)
  110. log.warning(
  111. f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed."
  112. )
  113. raise NoTranscriptFound(
  114. f"No transcript found for any supported language. Verify if the video has transcripts, add more languages if needed."
  115. )