今回もネットで集めた情報+詰まった箇所等を修正して紹介いたします。言語はPython 3.6.4です。
今回は放送中の生放送ではなく、アーカイブに対して実施することを目的にしています。
また今後のYoutubeおよび開発者用のページはバージョンアップによりレイアウトや仕様が変わる可能性があります。 その場合はこの記事は形骸化すると思いますので、良い感じに自分で対応してくださいませ。
Googleアカウントを作成する
これはみんな持っていると思うので省略しますよ。
Google API(YouTube Data API v3)の準備をする
まずは開発用プロジェクトの作成を実施しましょう。以下のサイトにアクセスしてGoogleアカウントでログインします。
Google Developers
次に以下にアクセスしてプロジェクトを作成します。APIを使いたい場合、まずはプロジェクトを作成して、そのプロジェクト内で使うAPIを登録する必要があるのです。
Google Cloud Platform
プロジェクトを作成したら画面左上の「Google API」の画像リンクからトップに一度戻ります。
その後「+ APIとサービスの有効化」ボタンを押します。APIを検索するページに遷移するため、APIの検索では「YouTube Data API v3」で検索しましょう。出てきたリンクをクリックしてAPIを追加します。
具体的にこのあたりどの値を入力するか忘れたので、以下サイトの「How do you get a YouTube API Key?」の1.~7.を参考にしてみてください。
How To Get a YouTube API Key (in 7 Simple Steps) [API Tutorial] | RapidAPI
APIキーの取得
続いてまた画面左上の「Google API」の画像リンクからトップに一度戻ります。
先ほど作成したプロジェクトが選ばれていることを確認したあと、左側のリストから、「認証情報」をクリックします。そこでたったいま作成したAPIのAPIキー(ランダムな文字列)があります。このキーを使ってプログラムからAPIを実行するのでこのキーは大切に保管してください(他人に教えずに自分だけ使ってください)。
Youtubeにアクセスするためのオブジェクトの作成
今回は一ファイルでやります。必要なパッケージとかもまとめて記載しちゃいます。
入っていないものはpipでインストールしましょう!自分は以下だけでしたが、もしかしたらrequestsとrequests_htmlはインストールの必要があるかもしれません。
pip install --upgrade google-api-python-client pip install bs4 pip install retry
from apiclient.discovery import build import json import os from bs4 import BeautifulSoup import ast import requests import requests_html import re import sys import glob from retry import retry youtube = build('youtube', 'v3', developerKey='<ここに↑で取得したAPIキーを入れます')
APIを使って動画の一覧(投稿日時、タイトル、動画のID)を取得する
このyoutubeオブジェクトを使ってAPIを使っていきます。
ここではチャンネルIDを指定して、動画の一覧(投稿日時、タイトル、動画のID)を取得します。
結果は変数videosに格納されますが、同時にC:\temp\ytdump.txtに情報を保存しています。
なお、自分はテストでここを何度も実行していたら一日に実行できる上限に到達してしまい、APIが実行できなくなってしまいました。 ここで欲しいのは動画のIDだけなので、動画のIDを取得したらこの部分はできるだけ実行しないようにしましょう!
channel_id = '<目的のユーザーのチャンネル(ページ)へ行き、URLの後ろに記載されている文字列をここに入力します>' nextPagetoken = None curToken = None # 最終的な結果がここに格納されます videos = {} while True: if nextPagetoken != None: curToken = nextPagetoken search_response = youtube.search().list( part = "id,snippet", channelId = channel_id, maxResults = 50, # 最大50 order = "date", # 日付順にソート pageToken = curToken ).execute() with open( os.path.join("C:","temp","ytdump.txt"), "a+") as f: for search_result in search_response.get("items", []): if search_result["id"]["kind"] == "youtube#video": values = [] values.append(search_result["snippet"]["publishedAt"]) values.append(search_result["snippet"]["title"]) videos[search_result["id"]["videoId"]] = values f.write(search_result["snippet"]["publishedAt"]) f.write(search_result["snippet"]["title"]) f.write(search_result["id"]["videoId"]) f.write("\n") print(search_result["snippet"]["publishedAt"]) print(search_result["snippet"]["title"]) print(search_result["id"]["videoId"]) try: nextPagetoken = search_response["nextPageToken"] except: break
コメントをクロールして取得していく
ブラウザで取得するHTMLとスクリプトで取得するHTMLを同じにするため、ここではrequestsではなく、requests_htmlを使用しています。結果はデフォルトで「スクリプトを実行した位置+livechatlog+動画ID.csv」となります。
@retry(EOFError, tries=5, delay=10) def YoutubeChatReplayCrawler(video_id, output_dir="./"): # URLの頭部分 youtube_url = "https://www.youtube.com/watch?v=" # json形式のチャットログのファイル名 filename = output_dir + "livechatlog" + video_id + ".json" # json形式からcsvにしたログのファイル名 converted_file = output_dir + "livechatlog" + video_id + ".csv" # 目的の動画のURL target_url = youtube_url + video_id dict_str = '' next_url = '' # コメントデータ comment_data = [] # HTTPリクエストのセッション session = requests_html.HTMLSession() headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'} if glob.glob(filename): print("Already Exists json file") return elif glob.glob(converted_file): print("Already Exists json file") return # 動画ページの取得 resp = session.get(target_url) resp.html.render(sleep=20) # レンダリング完了のため20秒まつ RE_EMOJI = re.compile('[\U00010000-\U0010ffff]', flags=re.UNICODE) for iframe in resp.html.find("iframe"): if("live_chat_replay" in iframe.attrs["src"]): next_url = "".join(["https://www.youtube.com", iframe.attrs["src"]]) if not next_url: print("Couldn't find live_chat_replay iframe. Maybe try running again?") raise EOFError("Couldn't find live_chat_replay iframe. Maybe try running again?") return (None) while(1): try: html = session.get(next_url, headers=headers) soup = BeautifulSoup(html.text, 'lxml') for script in soup.find_all('script'): script_text = str(script) if 'ytInitialData' in script_text: dict_str = ''.join(script_text.split(" = ")[1:]) # boolの表記をPython dictで扱えるように変換する dict_str = dict_str.replace("false", "False") dict_str = dict_str.replace("true", "True") # jsonから余分なHTMLを分割して捨てる dict_str = re.sub(r'};.*\n.+<\/script>', '}', dict_str) # 末尾の一部文字列を削除 dict_str = dict_str.rstrip(" \n;") dics = ast.literal_eval(dict_str) # これ以上continuationsもactionsもない場合、最後のループでKeyErrorが発生する。その場合はおとなしくループを抜けます。 continue_url = dics["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"] next_url = "https://www.youtube.com/live_chat_replay?continuation=" + continue_url for samp in dics["continuationContents"]["liveChatContinuation"]["actions"]: comment_data.append(str(samp) + "\n") # next_urlが入手できなくなったら終わり except requests.ConnectionError: print("Connection Error") continue except requests.HTTPError: print("HTTPError") continue except requests.Timeout: print("Timeout") continue except requests.exceptions.RequestException as e: print(e) break except KeyError as e: error = str(e) if 'liveChatReplayContinuationData' in error: print('Hit last live chat segment, finishing job.') else: print("KeyError") print(e) break except SyntaxError as e: print("SyntaxError") print(e) break except KeyboardInterrupt: break except Exception: print("Unexpected error:" + str(sys.exc_info()[0])) # 結果出力 with open(filename, mode='w', encoding="utf-8") as f: f.writelines(comment_data) print('Comment data saved to ' + filename) return (filename)
YoutubeChatReplayCrawler関数で取得したjsonデータから必要なデータを抜き出してtsv形式で出力します。tsvはタブ区切りのcsvみたいなものでExcelに直接貼り付けることができます。
def chatReplayConverter(filepath): filename = os.path.basename(filepath) dirname = os.path.dirname(filepath) target_id = filename.split('.')[0] if dirname != "": output_path = dirname + "/" + target_id + ".tsv" else: output_path = target_id + ".tsv" count = 1 result = "LiveID\tSerialNumber\tType\tAuthor\tTime\tAmount\tComment\n" # ヘッダで初期化しておく with open(filepath, 'r', encoding='utf-8') as f: lines = f.readlines() for line in lines: sys.stdout.write('\rProcessing line %d' % (count)) # 関係ない行のスキップ # liveChatViewerEngagementMessageRendererは最初に表示されるYoutubeからのメッセージ # liveChatTickerPaidMessageItemRendererはスパチャの行の次の行に表示されているもので、数えると二重カウントされそうなのでスキップする # addLiveChatTickerItemActionはメンバーシップ関連。おそらくスキップしても大丈夫だと思う。 # liveChatTickerSponsorItemRendererはメンバーシップ関連で、システムからのメッセージ? # liveChatPlaceholderItemRendererはコメントの撤回。 # liveChatTickerPaidStickerItemRendererは、addLiveChatTickerItemActionと同時に出るもの。絵文字のみのスパチャのときに出る。スパチャの額とは関係ないのでここではスキップする。 if ('liveChatViewerEngagementMessageRenderer' in line) or \ ('liveChatTickerPaidMessageItemRenderer' in line) or \ ('addLiveChatTickerItemAction' in line) or \ ('liveChatTickerPaidStickerItemRenderer' in line) or \ ('liveChatTickerSponsorItemRenderer' in line) or \ ('liveChatPlaceholderItemRenderer' in line): continue ql = line frac = target_id frac += ("\t#Chat No.%05d" % count) info = ast.literal_eval(ql) # convert str to dict by python grammer. # デフォルト値をここで更新する purchaseAmout = "0" # 通常チャットの投稿 # 'item'のペアの要素が'liveChatTextMessageRenderer'である。 if 'liveChatTextMessageRenderer' in line: # 一行ごとにこのあたりのjsonはずっと入れ子になっているだけ。これの直後は'message' 'runs' 'text' で実際のコメントが続いている。 info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatTextMessageRenderer'] content = "" if 'simpleText' in info['message']: content = info['message']['simpleText'] # 投稿者の名前 elif 'runs' in info['message']: for fragment in info['message']['runs']: if 'text' in fragment: content += fragment['text'] else: print("no text") continue authorName = info['authorName']['simpleText'] time = info['timestampText']['simpleText'] #frac += ", type: NORMALCHAT, user: \"" + authorName + "\", time: " + time + ", amount: 0, \"" + content + "\"\n" frac += "\t" + "NORMALCHAT" + "\t" + authorName + "\t" + time + "\t0\t" + content + "\n" # スーパーチャットの投稿 # liveChatPaidMessageRendererは通常のスパチャ # liveChatPaidStickerRendererはスパチャで絵文字のみ elif ('liveChatPaidMessageRenderer' in line) or ('liveChatPaidStickerRenderer' in line): # 種別によってjsonの属性名が異なるためここで変数化しておく sctype = "" if 'liveChatPaidMessageRenderer' in line: sctype = 'liveChatPaidMessageRenderer' else: sctype = 'liveChatPaidStickerRenderer' info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item'][sctype] content = "" if 'message' in info: if 'simpleText' in info['message']: content = info['message']['simpleText'] elif 'runs' in info['message']: for fragment in info['message']['runs']: if 'text' in fragment: content += fragment['text'] else: print("no text") # continue if 'authorName' in info: authorName = info['authorName']['simpleText'] else: authorName = "%anonymous%" time = info['timestampText']['simpleText'] purchaseAmout = info['purchaseAmountText']['simpleText'] #frac += ", type: SUPERCHAT, user: \"" + authorName + "\", time: " + time + ", amount: " + purchaseAmout + ", \"" + content + "\"\n" frac += "\t" + "SUPERCHAT" + "\t" + authorName + "\t" + time + "\t" + purchaseAmout + "\t" + content + "\n" # メンバーシップに加入したときの投稿 elif 'liveChatMembershipItemRenderer' in line: info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatMembershipItemRenderer'] content = "" if 'authorName' in info: authorName = info['authorName']['simpleText'] else: authorName = "%anonymous%" time = info['timestampText']['simpleText'] frac += "\t" + "MEMBERSHIP" + "\t" + authorName + "\t" + time + "\t" + purchaseAmout + "\t" + content + "\n" # どちらでもない場合(想定外や仕様変更が生じた場合ここに来る) else: frac += "\t" + "OTHER" + "\t" + "-" + "\t" + "-" + "\t" + "0" + "\t" + "\n" result += frac count += 1 sys.stdout.write('\nDone!\n') try: with open(output_path, 'w', encoding='utf-8') as f: f.write(result) except: print("Cannot convert json file")
結果サンプル
tsvなのでExcelに直接貼り付けられます。以下の貼り付けたサンプル(一部消し)をご紹介します。