from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import urllib.parse as p
import re
import os
import pickle
# OAuth 2.0 scopes passed to the authorization flow in youtube_authenticate().
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]
def youtube_authenticate():
    """Return a YouTube Data API v3 client built with the user's credentials.

    Credentials are cached in ``token.pickle`` (relative to the current
    working directory). When the cache is missing or holds credentials that
    are not valid, they are refreshed if they are expired and carry a refresh
    token; otherwise the local-server OAuth flow described by
    ``credentials.json`` is run. The resulting credentials are written back
    to ``token.pickle``.

    Side effect: sets ``OAUTHLIB_INSECURE_TRANSPORT=1`` in ``os.environ``.
    """
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    token_path = "token.pickle"
    credentials = None
    # token.pickle stores the user's access and refresh tokens; it is created
    # the first time the authorization flow completes.
    if os.path.exists(token_path):
        # NOTE(review): unpickling runs code embedded in the file, so
        # token.pickle must never come from an untrusted location.
        with open(token_path, "rb") as token_file:
            credentials = pickle.load(token_file)
    # If there are no (valid) credentials available, let the user log in.
    if not (credentials and credentials.valid):
        refreshable = (
            credentials and credentials.expired and credentials.refresh_token
        )
        if refreshable:
            credentials.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            credentials = flow.run_local_server(port=0)
        # Persist the credentials for the next run.
        with open(token_path, "wb") as token_file:
            pickle.dump(credentials, token_file)
    return build("youtube", "v3", credentials=credentials)
def get_channel_details(youtube, **kwargs):
    """Run ``youtube.channels().list(...)`` and return the executed response.

    Args:
        youtube: API client object exposing ``channels().list(...).execute()``.
        **kwargs: Forwarded unchanged to ``list``. ``part`` defaults to
            ``"statistics,snippet,contentDetails"`` and may be overridden by
            the caller (previously passing ``part`` raised ``TypeError``).

    Returns:
        Whatever ``execute()`` returns for the request.
    """
    # kwargs is a fresh dict for every call, so filling in the default here
    # cannot leak into the caller's data.
    kwargs.setdefault("part", "statistics,snippet,contentDetails")
    request = youtube.channels().list(**kwargs)
    return request.execute()
def search(youtube, **kwargs):
    """Run ``youtube.search().list(...)`` and return the executed response.

    Args:
        youtube: API client object exposing ``search().list(...).execute()``.
        **kwargs: Forwarded unchanged to ``list``. ``part`` defaults to
            ``"snippet"`` and may be overridden by the caller (previously
            passing ``part`` raised ``TypeError``).

    Returns:
        Whatever ``execute()`` returns for the request.
    """
    # kwargs is a fresh dict for every call, so filling in the default here
    # cannot leak into the caller's data.
    kwargs.setdefault("part", "snippet")
    request = youtube.search().list(**kwargs)
    return request.execute()
def get_video_details(youtube, **kwargs):
    """Run ``youtube.videos().list(...)`` and return the executed response.

    Args:
        youtube: API client object exposing ``videos().list(...).execute()``.
        **kwargs: Forwarded unchanged to ``list``. ``part`` defaults to
            ``"snippet,contentDetails,statistics"`` and may be overridden by
            the caller (previously passing ``part`` raised ``TypeError``).

    Returns:
        Whatever ``execute()`` returns for the request.
    """
    # kwargs is a fresh dict for every call, so filling in the default here
    # cannot leak into the caller's data.
    kwargs.setdefault("part", "snippet,contentDetails,statistics")
    request = youtube.videos().list(**kwargs)
    return request.execute()
def _format_duration(duration):
    """Turn an ISO 8601 duration such as 'PT5H50M15S' into '5:50:15'."""
    match = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", duration
    )
    if match is None:
        # Unknown format: show the raw value rather than failing.
        return duration
    days, hours, minutes, seconds = (
        int(group) if group else 0 for group in match.groups()
    )
    hours += days * 24
    # Zero-pad the lower fields so e.g. 'PT1H5S' reads 1:00:05, not '1:5'.
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"


def print_video_infos(video_response):
    """Print a summary of the first video in a ``videos().list`` response.

    Args:
        video_response: Mapping with an ``"items"`` list whose first element
            holds ``"snippet"``, ``"statistics"`` and ``"contentDetails"``.

    Raises:
        IndexError: If the response contains no items.
        KeyError: If the first item lacks one of the three sections or the
            snippet/duration fields read below.
    """
    items = video_response.get("items")
    if not items:
        raise IndexError("video response contains no items")
    video = items[0]
    # Get the snippet, statistics & content details from the video response.
    snippet = video["snippet"]
    statistics = video["statistics"]
    content_details = video["contentDetails"]
    # Individual counters may be missing from the statistics section
    # (presumably when comments are disabled or counts are hidden - verify
    # against the API reference), so fall back to a placeholder instead of
    # raising KeyError.
    comment_count = statistics.get("commentCount", "N/A")
    like_count = statistics.get("likeCount", "N/A")
    view_count = statistics.get("viewCount", "N/A")
    duration_str = _format_duration(content_details["duration"])
    lines = [
        "",
        f"Title: {snippet['title']}",
        f"Description: {snippet['description']}",
        f"Channel Title: {snippet['channelTitle']}",
        f"Publish time: {snippet['publishedAt']}",
        f"Duration: {duration_str}",
        f"Number of comments: {comment_count}",
        f"Number of likes: {like_count}",
        f"Number of views: {view_count}",
        "",
    ]
    print("\n".join(lines))
def parse_channel_url(url):
    """Classify a channel URL and extract the identifier it carries.

    The URL path is scanned for a ``c``, ``channel`` or ``user`` segment and
    the segment that follows it is taken as the identifier, so trailing
    slashes and sub-pages such as ``/videos`` do not affect the result.

    Args:
        url: Channel URL, e.g. ``https://www.youtube.com/channel/<id>``.

    Returns:
        A ``(method, identifier)`` tuple where ``method`` is ``"c"``,
        ``"channel"`` or ``"user"``, or ``None`` when the path contains none
        of those markers followed by an identifier.
    """
    segments = [part for part in p.urlparse(url).path.split("/") if part]
    # Stop one short of the end: a marker needs a segment after it.
    for index, segment in enumerate(segments[:-1]):
        if segment in ("c", "channel", "user"):
            return segment, segments[index + 1]
    return None
def get_channel_id_by_url(youtube, url):
    """Resolve a channel URL to a channel ID.

    The URL is classified with ``parse_channel_url``:

    - ``channel``: the URL already contains the channel ID; it is returned
      without any API call.
    - ``user``: the username is looked up via ``get_channel_details`` with
      ``forUsername``.
    - ``c``: the display name is searched via ``search`` and the first hit's
      channel ID is returned; this may be inaccurate.

    Args:
        youtube: API client passed through to the lookup helpers.
        url: Channel URL to resolve.

    Returns:
        The channel ID.

    Raises:
        ValueError: If the URL is not a recognised channel URL.
        Exception: If the lookup returns no items.
    """
    parsed = parse_channel_url(url)
    if parsed is None:
        # Unpacking None would raise an unhelpful TypeError; name the URL.
        raise ValueError(f"Cannot parse channel URL: {url}")
    method, identifier = parsed
    if method == "channel":
        return identifier
    if method == "user":
        response = get_channel_details(youtube, forUsername=identifier)
        items = response.get("items")
        if items:
            return items[0].get("id")
    elif method == "c":
        # Restrict the search to channel resources so the top hit is not a
        # video or playlist that merely mentions the name.
        response = search(youtube, q=identifier, type="channel", maxResults=1)
        items = response.get("items")
        if items:
            return items[0]["snippet"]["channelId"]
    raise Exception(f"Cannot find ID:{identifier} with {method} method")
def get_video_id_by_url(url):
    """Return the video ID contained in a video `url`.

    Supported shapes:

    - ``.../watch?v=<id>`` (any URL carrying a ``v`` query parameter)
    - ``https://youtu.be/<id>``
    - ``.../embed/<id>``, ``.../shorts/<id>``, ``.../live/<id>``

    Args:
        url: Video URL.

    Returns:
        The video ID string.

    Raises:
        ValueError: If no video ID can be extracted from `url`.
    """
    parsed_url = p.urlparse(url)
    # The query-string form takes priority, as before.
    query_ids = p.parse_qs(parsed_url.query).get("v")
    if query_ids:
        return query_ids[0]
    segments = [part for part in parsed_url.path.split("/") if part]
    host = (parsed_url.hostname or "").lower()
    if host in ("youtu.be", "www.youtu.be") and segments:
        return segments[0]
    if len(segments) >= 2 and segments[0] in ("embed", "shorts", "live"):
        return segments[1]
    raise ValueError(f"Wasn't able to parse video URL: {url}")