diff --git a/youtube_search/__init__.py b/youtube_search/__init__.py index 1572d6b..e5f9673 100644 --- a/youtube_search/__init__.py +++ b/youtube_search/__init__.py @@ -2,8 +2,8 @@ import asyncio import sys -from .search import AsyncYoutubeSearch, YoutubeSearch -from .video import AsyncYoutubeVideo, YoutubeVideo +from .search import AsyncYoutubeSearch, BaseYoutubeSearch, YoutubeSearch +from .video import AsyncYoutubeVideo, BaseYoutubeVideo, YoutubeVideo from .options import Options if sys.platform == "win32": diff --git a/youtube_search/search.py b/youtube_search/search.py index 85876d2..425b681 100644 --- a/youtube_search/search.py +++ b/youtube_search/search.py @@ -9,8 +9,8 @@ import re from typing import Iterator, List, Optional, Union from unicodedata import normalize as unicode_normalize +import aiohttp import requests -from aiohttp import ClientSession from .options import Options BASE_URL = "https://www.youtube.com" @@ -32,16 +32,15 @@ def encode_url(url: str) -> str: return requests.utils.quote(url).replace("%20", "+") -class YoutubeSearch: +class BaseYoutubeSearch: """ - Entry point class for youtube searching + Base class for YoutubeSearch """ def __init__( self, max_results: Optional[int] = None, options: Options = Options(), - session: Optional[requests.Session] = None, ): """ Parameters @@ -50,43 +49,22 @@ def __init__( The maximum result that will be returned. Set to None to remove the limit options : Options youtube_search options - session : Optional[requests.Session], default None - Requests session """ if max_results is not None and max_results < 0: raise ValueError( "Max result must be a whole number or set to None to remove the limit" ) self.json = options.json_parser - requests.models.complexjson = self.json self.max_results = max_results - self.__api_key = None - self.__cookies = { + self._api_key = None + self._cookies = { "PREF": f"hl={options.language}&gl={options.region}", "domain": ".youtube.com", } - self.__data = {} - self.__requests_kwargs = {"timeout": options.timeout, "proxies": options.proxy} - self.__session = requests.Session() if session is None else session - self.__videos = [] + self._data = {} + self._videos = [] - def __enter__(self) -> "YoutubeSearch": - return self - - def __exit__(self, *args) -> None: - self.close() - - @property - def count(self) -> int: - """ - Returns - ------- - int - How many video are in the list - """ - return len(self.__videos) - - def __get_video(self, response: Union[str, dict]) -> None: + def _get_video(self, response: Union[str, dict]) -> None: """ Get video from parsed html @@ -95,7 +73,7 @@ def __get_video(self, response: Union[str, dict]) -> None: response: Union[str, dict] Passed to self.__parse_html function """ - for contents in self.__parse_html(response): + for contents in self._parse_html(response): if "itemSectionRenderer" not in contents: continue for video in contents.get("itemSectionRenderer", {}).get("contents", {}): @@ -153,9 +131,9 @@ def __get_video(self, response: Union[str, dict]) -> None: res["owner_name"] = ( video_data.get("ownerText", {}).get("runs", [{}])[0].get("text") ) - self.__videos.append(res) + self._videos.append(res) - def __parse_html(self, response: Union[str, dict]) -> Iterator[list]: + def _parse_html(self, response: Union[str, dict]) -> Iterator[list]: """ Parse the html response to get the videos @@ -169,27 +147,29 @@ def __parse_html(self, response: Union[str, dict]) -> Iterator[list]: Iterator[list] Contains list of video data """ - if self.__api_key: - return response.get("onResponseReceivedCommands", [{}])[0].get( - "appendContinuationItemsAction" - ).get("continuationItems") + if self._api_key: + return ( + response.get("onResponseReceivedCommands", [{}])[0] + .get("appendContinuationItemsAction", {}) + .get("continuationItems", []) + ) start = response.index("ytInitialData") + len("ytInitialData") + 3 end = response.index("};", start) + 1 json_str = response[start:end] data = self.json.loads(json_str) - self.__api_key = re.search( + self._api_key = re.search( r"(?:\"INNERTUBE_API_KEY\":\")(?P[A-Za-z0-9_-]+)(?:\",)", response, )["api_key"] - self.__data["context"] = self.json.loads( + self._data["context"] = self.json.loads( re.search( r"(?:\"INNERTUBE_CONTEXT\"\:)(?P\{(.*)\})(?:,\"INNERTUBE_CONTEXT_CLIENT_NAME\")", response, re.DOTALL, )["context"] ) - self.__data["continuation"] = re.search( + self._data["continuation"] = re.search( r"(?:\"continuationCommand\":{\"token\":\")(?P.+)(?:\",\"request\")", response, )["token"] @@ -197,6 +177,68 @@ def __parse_html(self, response: Union[str, dict]) -> Iterator[list]: "sectionListRenderer" ]["contents"] + @property + def count(self) -> int: + """ + Returns + ------- + int + How many video are in the list + """ + return len(self._videos) + + def list(self, clear_cache: bool = True) -> List[dict]: + """ + Return the list of videos + + Parameters + ---------- + clear_cache: bool, default True + Clear the result cache + + Return + ------ + List[dict]: + The list of videos + """ + result = self._videos.copy() + if clear_cache: + self._videos.clear() + return result + + +class YoutubeSearch(BaseYoutubeSearch): + """ + Entry point class for youtube searching + """ + + def __init__( + self, + max_results: Optional[int] = None, + options: Options = Options(), + session: Optional[requests.Session] = None, + ): + """ + Parameters + ---------- + max_results : Optional[int], default 20 + The maximum result that will be returned. Set to None to remove the limit + options : Options + youtube_search options + session : Optional[requests.Session], default None + Requests session + """ + super().__init__(max_results, options) + requests.models.complexjson = self.json + self._requests_kwargs = {"timeout": options.timeout, "proxies": options.proxy} + self.__session = requests.Session() if session is None else session + + def __enter__(self) -> "YoutubeSearch": + return self + + def __exit__(self, *args) -> None: + self.close() + def __search(self, query: str, first: bool = False): """ Search wrapper @@ -211,51 +253,32 @@ def __search(self, query: str, first: bool = False): if first: url = f"{BASE_URL}/results?search_query={encode_url(query)}" resp = self.__session.get( - url, cookies=self.__cookies, **self.__requests_kwargs + url, cookies=self._cookies, **self._requests_kwargs ) resp.raise_for_status() body = resp.text - self.__get_video(body) + self._get_video(body) return - url = f"{BASE_URL}/youtubei/v1/search?{self.__api_key}&prettyPrint=false" + url = f"{BASE_URL}/youtubei/v1/search?{self._api_key}&prettyPrint=false" resp = self.__session.post( url, - cookies=self.__cookies, - data=self.json.dumps(self.__data), - **self.__requests_kwargs, + cookies=self._cookies, + data=self.json.dumps(self._data), + **self._requests_kwargs, ) resp.raise_for_status() body = resp.json() - self.__get_video(body) + self._get_video(body) def close(self) -> None: """ Close the context manager """ - self.__api_key = None - self.__data.clear() - self.__videos.clear() + self._api_key = None + self._data.clear() + self._videos.clear() self.__session.close() - def list(self, clear_cache: bool = True) -> List[dict]: - """ - Return the list of videos - - Parameters - ---------- - clear_cache: bool, default True - Clear the result cache - - Return - ------ - List[dict]: - The list of videos - """ - result = self.__videos.copy() - if clear_cache: - self.__videos.clear() - return result - def search(self, query: str = None, pages: int = 1) -> "YoutubeSearch": """ Parameters @@ -270,11 +293,11 @@ def search(self, query: str = None, pages: int = 1) -> "YoutubeSearch": self YoutubeSearch object """ - self.__videos.clear() + self._videos.clear() if query: - self.__api_key = None - self.__data.clear() - if self.__api_key is None and not query: + self._api_key = None + self._data.clear() + if self._api_key is None and not query: raise ValueError("Last search query not found!") for i in range(pages): if i == 0 and query: @@ -284,7 +307,7 @@ def search(self, query: str = None, pages: int = 1) -> "YoutubeSearch": return self -class AsyncYoutubeSearch: +class AsyncYoutubeSearch(BaseYoutubeSearch): """ Entry point class for youtube searching """ @@ -293,7 +316,7 @@ def __init__( self, max_results: Optional[int] = None, options: Options = Options(), - session: Optional[ClientSession] = None, + session: Optional[aiohttp.ClientSession] = None, ): """ Parameters @@ -302,25 +325,16 @@ def __init__( The maximum result that will be returned. Set to None to remove the limit options : Options youtube_search options - session : Optional[ClientSession], default None + session : Optional[aiohttp.ClientSession], default None aiohttp client session """ - if max_results is not None and max_results < 0: - raise ValueError( - "Max result must be a whole number or set to None to remove the limit" - ) - self.json = options.json_parser - self.max_results = max_results - self.__api_key = None - self.__cookies = { - "PREF": f"hl={options.language}&gl={options.region}", - } - self.__data = {} - self.__requests_kwargs = {"timeout": options.timeout} + super().__init__(max_results, options) + if "domain" in self._cookies: + self._cookies.pop("domain") + self._requests_kwargs = {"timeout": options.timeout} if isinstance(options.proxy, dict): - self.__requests_kwargs["proxy"] = options.proxy.get("https", "") - self.__session = ClientSession() if session is None else session - self.__videos = [] + self._requests_kwargs["proxy"] = options.proxy.get("https", "") + self.__session = aiohttp.ClientSession() if session is None else session async def __aenter__(self) -> "AsyncYoutubeSearch": return self @@ -328,127 +342,6 @@ async def __aenter__(self) -> "AsyncYoutubeSearch": async def __aexit__(self, *args) -> None: await self.close() - @property - def count(self) -> int: - """ - Returns - ------- - int - How many video are in the list - """ - return len(self.__videos) - - async def __get_video(self, response: Union[str, dict]) -> None: - """ - Get video from parsed html - - Parameters - ---------- - response: Union[str, dict] - Passed to self.__parse_html function - """ - for contents in self.__parse_html(response): - if "itemSectionRenderer" not in contents: - continue - for video in contents.get("itemSectionRenderer", {}).get("contents", {}): - if self.max_results is not None and self.count >= self.max_results: - return - res = {} - if "videoRenderer" not in video: - continue - video_data = video.get("videoRenderer", {}) - owner_url_suffix = ( - video_data.get("ownerText", {}) - .get("runs", [{}])[0] - .get("navigationEndpoint", {}) - .get("browseEndpoint", {}) - .get("canonicalBaseUrl") - ) - res["id"] = video_data.get("videoId", None) - res["thumbnails"] = [ - thumb.get("url", None) - for thumb in video_data.get("thumbnail", {}).get("thumbnails", [{}]) - ] - res["title"] = ( - video_data.get("title", {}).get("runs", [[{}]])[0].get("text", None) - ) - res["desc_snippet"] = unicode_normalize( - "NFKD", - "".join( - [ - item.get("text", "") - for item in video_data.get( - "detailedMetadataSnippets", [{}] - )[0] - .get("snippetText", {}) - .get("runs", [{}]) - ] - ), - ) - res["channel"] = ( - video_data.get("longBylineText", {}) - .get("runs", [[{}]])[0] - .get("text", None) - ) - res["duration"] = video_data.get("lengthText", {}).get("simpleText", 0) - res["views"] = video_data.get("viewCountText", {}).get("simpleText", 0) - res["publish_time"] = video_data.get("publishedTimeText", {}).get( - "simpleText", 0 - ) - res["url_suffix"] = ( - video_data.get("navigationEndpoint", {}) - .get("commandMetadata", {}) - .get("webCommandMetadata", {}) - .get("url", None) - ) - res["owner_url"] = f"{BASE_URL}{owner_url_suffix}" - res["owner_name"] = ( - video_data.get("ownerText", {}).get("runs", [{}])[0].get("text") - ) - self.__videos.append(res) - - def __parse_html(self, response: Union[str, dict]) -> Iterator[list]: - """ - Parse the html response to get the videos - - Parameters - ---------- - response: Union[str, dict] - The response body - - Returns - ------- - Iterator[list] - Contains list of video data - """ - if self.__api_key: - return response.get("onResponseReceivedCommands", [{}])[0].get( - "appendContinuationItemsAction" - ).get("continuationItems") - - start = response.index("ytInitialData") + len("ytInitialData") + 3 - end = response.index("};", start) + 1 - json_str = response[start:end] - data = self.json.loads(json_str) - self.__api_key = re.search( - r"(?:\"INNERTUBE_API_KEY\":\")(?P[A-Za-z0-9_-]+)(?:\",)", - response, - )["api_key"] - self.__data["context"] = self.json.loads( - re.search( - r"(?:\"INNERTUBE_CONTEXT\"\:)(?P\{(.*)\})(?:,\"INNERTUBE_CONTEXT_CLIENT_NAME\")", - response, - re.DOTALL, - )["context"] - ) - self.__data["continuation"] = re.search( - r"(?:\"continuationCommand\":{\"token\":\")(?P.+)(?:\",\"request\")", - response, - )["token"] - return data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][ - "sectionListRenderer" - ]["contents"] - async def __search(self, query: str, first: bool = False): """ Search wrapper @@ -463,55 +356,36 @@ async def __search(self, query: str, first: bool = False): if first: url = f"{BASE_URL}/results?search_query={encode_url(query)}" async with self.__session.get( - url, cookies=self.__cookies, **self.__requests_kwargs + url, cookies=self._cookies, **self._requests_kwargs ) as resp: resp.raise_for_status() body = await resp.text() - await self.__get_video(body) + self._get_video(body) return - url = f"{BASE_URL}/youtubei/v1/search?{self.__api_key}&prettyPrint=false" + url = f"{BASE_URL}/youtubei/v1/search?{self._api_key}&prettyPrint=false" async with self.__session.post( url, - cookies=self.__cookies, - data=self.json.dumps(self.__data), + cookies=self._cookies, + data=self.json.dumps(self._data), headers={"content-type": "application/json"}, - **self.__requests_kwargs, + **self._requests_kwargs, ) as resp: resp.raise_for_status() body = await resp.json(loads=self.json.loads) - await self.__get_video(body) + self._get_video(body) async def close(self) -> None: """ Close the context manager """ - self.__api_key = None - self.__data.clear() - self.__videos.clear() + self._api_key = None + self._data.clear() + self._videos.clear() await self.__session.close() await asyncio.sleep( 0.250 ) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown - def list(self, clear_cache: bool = True) -> List[dict]: - """ - Return the list of videos - - Parameters - ---------- - clear_cache: bool, default True - Clear the result cache - - Return - ------ - List[dict]: - The list of videos - """ - result = self.__videos.copy() - if clear_cache: - self.__videos.clear() - return result - async def search(self, query: str = None, pages: int = 1) -> "AsyncYoutubeSearch": """ Parameters @@ -526,11 +400,11 @@ async def search(self, query: str = None, pages: int = 1) -> "AsyncYoutubeSearch self AsyncYoutubeSearch object """ - self.__videos.clear() + self._videos.clear() if query: - self.__api_key = None - self.__data.clear() - if self.__api_key is None and not query: + self._api_key = None + self._data.clear() + if self._api_key is None and not query: raise ValueError("Last search query not found!") tasks = [] for i in range(pages): diff --git a/youtube_search/video.py b/youtube_search/video.py index afd7825..3b35da0 100644 --- a/youtube_search/video.py +++ b/youtube_search/video.py @@ -238,6 +238,7 @@ class BaseYoutubeVideo(ABC): """ Base class for youtube video """ + def __init__(self, data: dict): self._data = data self._options: Options = None @@ -494,7 +495,7 @@ def __init__( url, ) and not re.match( r"^(?:https?://)(?:youtu\.be/|(?:www\.|m\.)?youtube\.com/)(?:shorts/)(?P[a-zA-Z0-9\_-]{7,15})(?:[\?&][a-zA-Z0-9\_-]+=[a-zA-Z0-9\_\.-]+)*$", - url + url, ): raise InvalidURLError(f"{url} isn't valid url") self._data = {} @@ -540,7 +541,7 @@ def __init__( url, ) and not re.match( r"^(?:https?://)(?:youtu\.be/|(?:www\.|m\.)?youtube\.com/)(?:shorts/)(?P[a-zA-Z0-9\_-]{7,15})(?:[\?&][a-zA-Z0-9\_-]+=[a-zA-Z0-9\_\.-]+)*$", - url + url, ): raise InvalidURLError(f"{url} isn't valid url") self._data = {}