1
0

youtube.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import logging
  2. from xml.etree.ElementTree import ParseError
  3. from typing import Any, Dict, Generator, List, Optional, Sequence, Union
  4. from urllib.parse import parse_qs, urlparse
  5. from langchain_core.documents import Document
  6. from open_webui.env import SRC_LOG_LEVELS
# Module-level logger named after this module; its level is taken from the
# "RAG" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
  9. ALLOWED_SCHEMES = {"http", "https"}
  10. ALLOWED_NETLOCS = {
  11. "youtu.be",
  12. "m.youtube.com",
  13. "youtube.com",
  14. "www.youtube.com",
  15. "www.youtube-nocookie.com",
  16. "vid.plus",
  17. }
  18. def _parse_video_id(url: str) -> Optional[str]:
  19. """Parse a YouTube URL and return the video ID if valid, otherwise None."""
  20. parsed_url = urlparse(url)
  21. if parsed_url.scheme not in ALLOWED_SCHEMES:
  22. return None
  23. if parsed_url.netloc not in ALLOWED_NETLOCS:
  24. return None
  25. path = parsed_url.path
  26. if path.endswith("/watch"):
  27. query = parsed_url.query
  28. parsed_query = parse_qs(query)
  29. if "v" in parsed_query:
  30. ids = parsed_query["v"]
  31. video_id = ids if isinstance(ids, str) else ids[0]
  32. else:
  33. return None
  34. else:
  35. path = parsed_url.path.lstrip("/")
  36. video_id = path.split("/")[-1]
  37. if len(video_id) != 11: # Video IDs are 11 characters long
  38. return None
  39. return video_id
  40. class YoutubeLoader:
  41. """Load `YouTube` video transcripts."""
  42. def __init__(
  43. self,
  44. video_id: str,
  45. language: Union[str, Sequence[str]] = "en",
  46. proxy_url: Optional[str] = None,
  47. ):
  48. """Initialize with YouTube video ID."""
  49. _video_id = _parse_video_id(video_id)
  50. self.video_id = _video_id if _video_id is not None else video_id
  51. self._metadata = {"source": video_id}
  52. self.proxy_url = proxy_url
  53. # Ensure language is a list
  54. if isinstance(language, str):
  55. self.language = [language]
  56. else:
  57. self.language = list(language)
  58. # Add English as fallback if not already in the list
  59. if "en" not in self.language:
  60. self.language.append("en")
  61. def load(self) -> List[Document]:
  62. """Load YouTube transcripts into `Document` objects."""
  63. try:
  64. from youtube_transcript_api import (
  65. NoTranscriptFound,
  66. TranscriptsDisabled,
  67. YouTubeTranscriptApi,
  68. )
  69. except ImportError:
  70. raise ImportError(
  71. 'Could not import "youtube_transcript_api" Python package. '
  72. "Please install it with `pip install youtube-transcript-api`."
  73. )
  74. if self.proxy_url:
  75. youtube_proxies = {
  76. "http": self.proxy_url,
  77. "https": self.proxy_url,
  78. }
  79. log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")
  80. else:
  81. youtube_proxies = None
  82. try:
  83. transcript_list = YouTubeTranscriptApi.list_transcripts(
  84. self.video_id, proxies=youtube_proxies
  85. )
  86. except Exception as e:
  87. log.exception("Loading YouTube transcript failed")
  88. return []
  89. # Try each language in order of priority
  90. for lang in self.language:
  91. try:
  92. transcript = transcript_list.find_transcript([lang])
  93. if transcript.is_generated:
  94. log.debug(f"Found generated transcript for language '{lang}'")
  95. try:
  96. transcript = transcript_list.find_manually_created_transcript(
  97. [lang]
  98. )
  99. log.debug(f"Found manual transcript for language '{lang}'")
  100. except NoTranscriptFound:
  101. log.debug(
  102. f"No manual transcript found for language '{lang}', using generated"
  103. )
  104. pass
  105. log.debug(f"Found transcript for language '{lang}'")
  106. try:
  107. transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
  108. except ParseError:
  109. log.debug(f"Empty or invalid transcript for language '{lang}'")
  110. continue
  111. if not transcript_pieces:
  112. log.debug(f"Empty transcript for language '{lang}'")
  113. continue
  114. transcript_text = " ".join(
  115. map(
  116. lambda transcript_piece: (
  117. transcript_piece.text.strip(" ")
  118. if hasattr(transcript_piece, "text")
  119. else ""
  120. ),
  121. transcript_pieces,
  122. )
  123. )
  124. return [Document(page_content=transcript_text, metadata=self._metadata)]
  125. except NoTranscriptFound:
  126. log.debug(f"No transcript found for language '{lang}'")
  127. continue
  128. except Exception as e:
  129. log.info(f"Error finding transcript for language '{lang}'")
  130. raise e
  131. # If we get here, all languages failed
  132. languages_tried = ", ".join(self.language)
  133. log.warning(
  134. f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed."
  135. )
  136. raise NoTranscriptFound(self.video_id, self.language, list(transcript_list))