youtube.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import logging
  2. from xml.etree.ElementTree import ParseError
  3. from typing import Any, Dict, Generator, List, Optional, Sequence, Union
  4. from urllib.parse import parse_qs, urlparse
  5. from langchain_core.documents import Document
  6. from open_webui.env import SRC_LOG_LEVELS
  # Module-level logger; its threshold is taken from the "RAG" entry of
  # SRC_LOG_LEVELS.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["RAG"])
  # URL schemes accepted by `_parse_video_id`.
  ALLOWED_SCHEMES = {"https", "http"}

  # Network locations accepted by `_parse_video_id` as YouTube video hosts.
  ALLOWED_NETLOCS = {
      "www.youtube.com",
      "youtube.com",
      "m.youtube.com",
      "youtu.be",
      "www.youtube-nocookie.com",
      "vid.plus",
  }
  def _parse_video_id(url: str) -> Optional[str]:
      """Parse a YouTube URL and return the video ID if valid, otherwise None.

      The URL must use a scheme from ``ALLOWED_SCHEMES`` and a network location
      from ``ALLOWED_NETLOCS`` (compared case-insensitively). For paths ending
      in ``/watch`` the ID is the first ``v`` query parameter; for any other
      path it is the last path segment. Trailing slashes are ignored.

      Args:
          url: Candidate YouTube URL.

      Returns:
          The 11-character video ID, or None when ``url`` is not recognised as
          a YouTube video URL.
      """
      try:
          parsed_url = urlparse(url)
      except ValueError:
          # urlparse rejects some malformed inputs (for example an unbalanced
          # "[" in the host part); such input is simply not a valid URL here.
          return None

      if parsed_url.scheme not in ALLOWED_SCHEMES:
          return None

      # Host names are case-insensitive, so normalise before the lookup.
      if parsed_url.netloc.lower() not in ALLOWED_NETLOCS:
          return None

      # Drop trailing slashes so ".../watch/" and "youtu.be/<id>/" still match.
      path = parsed_url.path.rstrip("/")

      if path.endswith("/watch"):
          # parse_qs maps every key to a list of values; use the first "v".
          values = parse_qs(parsed_url.query).get("v")
          if not values:
              return None
          video_id = values[0]
      else:
          video_id = path.split("/")[-1]

      if len(video_id) != 11:  # Video IDs are 11 characters long
          return None

      return video_id
  class YoutubeLoader:
      """Load `YouTube` video transcripts.

      Transcripts are fetched through the optional ``youtube_transcript_api``
      package. Each requested language is tried in order, and English is
      always appended as a final fallback.
      """

      def __init__(
          self,
          video_id: str,
          language: Union[str, Sequence[str]] = "en",
          proxy_url: Optional[str] = None,
      ):
          """Initialize with YouTube video ID.

          Args:
              video_id: A YouTube URL or a bare video ID. When the value parses
                  as a supported URL, the extracted ID is used; otherwise the
                  value is kept unchanged.
              language: A single language code, or several codes in priority
                  order.
              proxy_url: Optional proxy URL used for both HTTP and HTTPS
                  requests.
          """
          _video_id = _parse_video_id(video_id)
          self.video_id = _video_id if _video_id is not None else video_id
          # The caller's original input (URL or ID) is kept as the source.
          self._metadata = {"source": video_id}
          self.proxy_url = proxy_url

          # Ensure language is a list
          if isinstance(language, str):
              self.language = [language]
          else:
              self.language = list(language)

          # Add English as fallback if not already in the list
          if "en" not in self.language:
              self.language.append("en")

      def load(self) -> List[Document]:
          """Load YouTube transcripts into `Document` objects.

          Returns:
              A single-element list containing the transcript text of the first
              language that yields a non-empty transcript, or an empty list
              when the transcript listing itself cannot be retrieved.

          Raises:
              ImportError: If ``youtube_transcript_api`` is not installed.
              NoTranscriptFound: If none of the requested languages yields a
                  usable transcript.
          """
          try:
              from youtube_transcript_api import (
                  NoTranscriptFound,
                  YouTubeTranscriptApi,
              )
          except ImportError:
              raise ImportError(
                  'Could not import "youtube_transcript_api" Python package. '
                  "Please install it with `pip install youtube-transcript-api`."
              )

          proxy_config = None
          if self.proxy_url:
              # The instance-based API takes a ProxyConfig object, not a
              # requests-style {"http": ..., "https": ...} dict.
              from youtube_transcript_api.proxies import GenericProxyConfig

              proxy_config = GenericProxyConfig(
                  http_url=self.proxy_url,
                  https_url=self.proxy_url,
              )
              # Only a short prefix is logged to avoid leaking credentials.
              log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")

          transcript_api = YouTubeTranscriptApi(proxy_config=proxy_config)

          try:
              transcript_list = transcript_api.list(self.video_id)
          except Exception:
              # Best effort: a failed listing yields no documents, not an error.
              log.exception("Loading YouTube transcript failed")
              return []

          # Try each language in order of priority
          for lang in self.language:
              try:
                  transcript = transcript_list.find_transcript([lang])
                  if transcript.is_generated:
                      log.debug(f"Found generated transcript for language '{lang}'")
                      # Prefer a manually created transcript when one exists.
                      try:
                          transcript = transcript_list.find_manually_created_transcript(
                              [lang]
                          )
                          log.debug(f"Found manual transcript for language '{lang}'")
                      except NoTranscriptFound:
                          log.debug(
                              f"No manual transcript found for language '{lang}', using generated"
                          )

                  log.debug(f"Found transcript for language '{lang}'")
                  try:
                      transcript_pieces = transcript.fetch()
                  except ParseError:
                      log.debug(f"Empty or invalid transcript for language '{lang}'")
                      continue

                  if not transcript_pieces:
                      log.debug(f"Empty transcript for language '{lang}'")
                      continue

                  # Pieces without a `text` attribute contribute an empty string.
                  transcript_text = " ".join(
                      piece.text.strip(" ") if hasattr(piece, "text") else ""
                      for piece in transcript_pieces
                  )
                  return [Document(page_content=transcript_text, metadata=self._metadata)]
              except NoTranscriptFound:
                  log.debug(f"No transcript found for language '{lang}'")
                  continue
              except Exception:
                  log.info(f"Error finding transcript for language '{lang}'")
                  raise

          # If we get here, all languages failed
          languages_tried = ", ".join(self.language)
          log.warning(
              f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed."
          )
          raise NoTranscriptFound(self.video_id, self.language, list(transcript_list))

      async def aload(self) -> List[Document]:
          """Asynchronously load YouTube transcripts into `Document` objects.

          The blocking `load` call is run in the running event loop's default
          executor.

          Returns:
              The same list of documents that `load` returns.
          """
          import asyncio

          loop = asyncio.get_running_loop()
          return await loop.run_in_executor(None, self.load)