結果だけでなく過程も見てください

日々の奮闘を綴る日記です。

Youtube APIで生放送のアーカイブの全コメントを拾ったり、スパチャ額の合計を取得する方法

今回もネットで集めた情報+詰まった箇所等を修正して紹介いたします。言語はPython 3.6.4です。
今回は放送中の生放送ではなく、アーカイブに対して実施することを目的にしています。

また今後のYoutubeおよび開発者用のページはバージョンアップによりレイアウトや仕様が変わる可能性があります。
その場合はこの記事は形骸化すると思いますので、良い感じに自分で対応してくださいませ。

Googleアカウントを作成する

これはみんな持っていると思うので省略しますよ。

Google API(YouTube Data API v3)の準備をする

まずは開発用プロジェクトの作成を実施しましょう。以下のサイトにアクセスしてGoogleアカウントでログインします。
Google Developers

次に以下にアクセスしてプロジェクトを作成します。APIを使いたい場合、まずはプロジェクトを作成して、そのプロジェクト内で使うAPIを登録する必要があるのです。
Google Cloud Platform

プロジェクトを作成したら画面左上の「Google API」の画像リンクからトップに一度戻ります。
その後「+ APIとサービスの有効化」ボタンを押します。APIを検索するページに遷移するため、APIの検索では「YouTube Data API v3」で検索しましょう。出てきたリンクをクリックしてAPIを追加します。

具体的にこのあたりどの値を入力するか忘れたので、以下サイトの「How do you get a YouTube API Key?」の1.~7.を参考にしてみてください。
How To Get a YouTube API Key (in 7 Simple Steps) [API Tutorial] | RapidAPI

APIキーの取得

続いてまた画面左上の「Google API」の画像リンクからトップに一度戻ります。
先ほど作成したプロジェクトが選ばれていることを確認したあと、左側のリストから、「認証情報」をクリックします。そこでたったいま作成したAPIAPIキー(ランダムな文字列)があります。このキーを使ってプログラムからAPIを実行するのでこのキーは大切に保管してください(他人に教えずに自分だけ使ってください)。

Youtubeにアクセスするためのオブジェクトの作成

今回は一ファイルでやります。必要なパッケージとかもまとめて記載しちゃいます。
入っていないものはpipでインストールしましょう!自分は以下だけでしたが、もしかしたらrequestsとrequests_htmlはインストールの必要があるかもしれません。

pip install --upgrade google-api-python-client
pip install bs4
pip install retry
from apiclient.discovery import build
import json
import os
from bs4 import BeautifulSoup
import ast
import requests
import requests_html
import re
import sys
import glob
from retry import retry

youtube = build('youtube', 'v3', developerKey='<ここに↑で取得したAPIキーを入れます')

APIを使って動画の一覧(投稿日時、タイトル、動画のID)を取得する

このyoutubeオブジェクトを使ってAPIを使っていきます。
ここではチャンネルIDを指定して、動画の一覧(投稿日時、タイトル、動画のID)を取得します。
結果は変数videosに格納されますが、同時にC:\temp\ytdump.txtに情報を保存しています。

なお、自分はテストでここを何度も実行していたら一日に実行できる上限に到達してしまい、APIが実行できなくなってしまいました。
ここで欲しいのは動画のIDだけなので、動画のIDを取得したらこの部分はできるだけ実行しないようにしましょう!
channel_id = '<目的のユーザーのチャンネル(ページ)へ行き、URLの後ろに記載されている文字列をここに入力します>'
nextPagetoken = None
curToken = None

# 最終的な結果がここに格納されます
videos = {}

while True:
    if nextPagetoken != None:
        curToken = nextPagetoken

    search_response = youtube.search().list(
        part = "id,snippet",
        channelId = channel_id,
        maxResults = 50,    # 最大50
        order = "date",     # 日付順にソート
        pageToken = curToken
    ).execute()

    with open( os.path.join("C:","temp","ytdump.txt"), "a+") as f:
        for search_result in search_response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":
                values = []
                values.append(search_result["snippet"]["publishedAt"])
                values.append(search_result["snippet"]["title"])
                videos[search_result["id"]["videoId"]] = values
                f.write(search_result["snippet"]["publishedAt"])
                f.write(search_result["snippet"]["title"])
                f.write(search_result["id"]["videoId"])
                f.write("\n")
                print(search_result["snippet"]["publishedAt"])
                print(search_result["snippet"]["title"])
                print(search_result["id"]["videoId"])

    try:
        nextPagetoken = search_response["nextPageToken"]
    except:
        break

コメントをクロールして取得していく

ブラウザで取得するHTMLとスクリプトで取得するHTMLを同じにするため、ここではrequestsではなく、requests_htmlを使用しています。結果はデフォルトで「スクリプトを実行した位置+livechatlog+動画ID.csv」となります。

@retry(EOFError, tries=5, delay=10)
def YoutubeChatReplayCrawler(video_id, output_dir="./"):
    # URLの頭部分
    youtube_url = "https://www.youtube.com/watch?v="
    # json形式のチャットログのファイル名
    filename = output_dir + "livechatlog" + video_id + ".json"
    # json形式からcsvにしたログのファイル名
    converted_file = output_dir + "livechatlog" + video_id + ".csv"
    # 目的の動画のURL
    target_url = youtube_url + video_id
    dict_str = ''
    next_url = ''
    # コメントデータ
    comment_data = []
    # HTTPリクエストのセッション
    session = requests_html.HTMLSession()
    headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'}

    if glob.glob(filename):
        print("Already Exists json file")
        return
    elif glob.glob(converted_file):
        print("Already Exists json file")
        return

    # 動画ページの取得
    resp = session.get(target_url)
    resp.html.render(sleep=20)  # レンダリング完了のため20秒まつ

    RE_EMOJI = re.compile('[\U00010000-\U0010ffff]', flags=re.UNICODE)

    for iframe in resp.html.find("iframe"):
        if("live_chat_replay" in iframe.attrs["src"]):
            next_url = "".join(["https://www.youtube.com", iframe.attrs["src"]])

    if not next_url:
        print("Couldn't find live_chat_replay iframe. Maybe try running again?")
        raise EOFError("Couldn't find live_chat_replay iframe. Maybe try running again?")
        return (None)

    while(1):
        try:
            html = session.get(next_url, headers=headers)
            soup = BeautifulSoup(html.text, 'lxml')

            for script in soup.find_all('script'):
                script_text = str(script)
                if 'ytInitialData' in script_text:
                    dict_str = ''.join(script_text.split(" = ")[1:])

            # boolの表記をPython dictで扱えるように変換する
            dict_str = dict_str.replace("false", "False")
            dict_str = dict_str.replace("true", "True")

            # jsonから余分なHTMLを分割して捨てる
            dict_str = re.sub(r'};.*\n.+<\/script>', '}', dict_str)

            # 末尾の一部文字列を削除
            dict_str = dict_str.rstrip("  \n;")

            dics = ast.literal_eval(dict_str)

            # これ以上continuationsもactionsもない場合、最後のループでKeyErrorが発生する。その場合はおとなしくループを抜けます。
            continue_url = dics["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"]
            next_url = "https://www.youtube.com/live_chat_replay?continuation=" + continue_url

            for samp in dics["continuationContents"]["liveChatContinuation"]["actions"]:
                comment_data.append(str(samp) + "\n")

        # next_urlが入手できなくなったら終わり
        except requests.ConnectionError:
            print("Connection Error")
            continue
        except requests.HTTPError:
            print("HTTPError")
            continue
        except requests.Timeout:
            print("Timeout")
            continue
        except requests.exceptions.RequestException as e:
            print(e)
            break
        except KeyError as e:
            error = str(e)
            if 'liveChatReplayContinuationData' in error:
                print('Hit last live chat segment, finishing job.')
            else:
                print("KeyError")
                print(e)
            break
        except SyntaxError as e:
            print("SyntaxError")
            print(e)
            break
        except KeyboardInterrupt:
            break
        except Exception:
            print("Unexpected error:" + str(sys.exc_info()[0]))

    # 結果出力
    with open(filename, mode='w', encoding="utf-8") as f:
        f.writelines(comment_data)

    print('Comment data saved to ' + filename)
    return (filename)

YoutubeChatReplayCrawler関数で取得したjsonデータから必要なデータを抜き出してtsv形式で出力します。tsvはタブ区切りのcsvみたいなものでExcelに直接貼り付けることができます。

def chatReplayConverter(filepath):
    filename = os.path.basename(filepath)
    dirname  = os.path.dirname(filepath)
    target_id = filename.split('.')[0]
    if dirname != "":
       output_path = dirname + "/" + target_id + ".tsv"
    else:
       output_path = target_id + ".tsv"

    count = 1
    result = "LiveID\tSerialNumber\tType\tAuthor\tTime\tAmount\tComment\n"  # ヘッダで初期化しておく
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines:
            sys.stdout.write('\rProcessing line %d' % (count))

            # 関係ない行のスキップ
            # liveChatViewerEngagementMessageRendererは最初に表示されるYoutubeからのメッセージ
            # liveChatTickerPaidMessageItemRendererはスパチャの行の次の行に表示されているもので、数えると二重カウントされそうなのでスキップする
            # addLiveChatTickerItemActionはメンバーシップ関連。おそらくスキップしても大丈夫だと思う。
            # liveChatTickerSponsorItemRendererはメンバーシップ関連で、システムからのメッセージ?
            # liveChatPlaceholderItemRendererはコメントの撤回。
            # liveChatTickerPaidStickerItemRendererは、addLiveChatTickerItemActionと同時に出るもの。絵文字のみのスパチャのときに出る。スパチャの額とは関係ないのでここではスキップする。
            if ('liveChatViewerEngagementMessageRenderer' in line) or \
               ('liveChatTickerPaidMessageItemRenderer' in line) or \
               ('addLiveChatTickerItemAction' in line) or \
               ('liveChatTickerPaidStickerItemRenderer' in line) or \
               ('liveChatTickerSponsorItemRenderer' in line) or \
               ('liveChatPlaceholderItemRenderer' in line):
                continue

            ql = line
            frac = target_id
            frac += ("\t#Chat No.%05d" % count)
            info = ast.literal_eval(ql)  # convert str to dict by python grammer.

            # デフォルト値をここで更新する
            purchaseAmout = "0"

            # 通常チャットの投稿
            # 'item'のペアの要素が'liveChatTextMessageRenderer'である。
            if 'liveChatTextMessageRenderer' in line:
                # 一行ごとにこのあたりのjsonはずっと入れ子になっているだけ。これの直後は'message' 'runs' 'text' で実際のコメントが続いている。
                info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatTextMessageRenderer']
                content = ""
                if 'simpleText' in info['message']:
                    content = info['message']['simpleText']  # 投稿者の名前
                elif 'runs' in info['message']:
                    for fragment in info['message']['runs']:
                        if 'text' in fragment:
                            content += fragment['text']
                else:
                    print("no text")
                    continue

                authorName = info['authorName']['simpleText']
                time = info['timestampText']['simpleText']
                #frac += ", type: NORMALCHAT, user: \"" + authorName + "\", time: " + time + ", amount: 0, \"" + content + "\"\n"
                frac += "\t" + "NORMALCHAT" + "\t" + authorName + "\t" + time + "\t0\t" + content + "\n"

            # スーパーチャットの投稿
            # liveChatPaidMessageRendererは通常のスパチャ
            # liveChatPaidStickerRendererはスパチャで絵文字のみ
            elif ('liveChatPaidMessageRenderer' in line) or ('liveChatPaidStickerRenderer' in line):

                # 種別によってjsonの属性名が異なるためここで変数化しておく
                sctype = ""
                if 'liveChatPaidMessageRenderer' in line:
                    sctype = 'liveChatPaidMessageRenderer'
                else:
                    sctype = 'liveChatPaidStickerRenderer'

                info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item'][sctype]
                content = ""
                if 'message' in info:
                    if 'simpleText' in info['message']:
                       content = info['message']['simpleText']
                    elif 'runs' in info['message']:
                        for fragment in info['message']['runs']:
                            if 'text' in fragment:
                                content += fragment['text']
                    else:
                        print("no text")
                        # continue

                if 'authorName' in info:
                    authorName = info['authorName']['simpleText']
                else:
                    authorName = "%anonymous%"
                time = info['timestampText']['simpleText']
                purchaseAmout = info['purchaseAmountText']['simpleText']
                #frac += ", type: SUPERCHAT, user: \"" + authorName + "\", time: " + time + ", amount: " + purchaseAmout + ", \"" + content + "\"\n"
                frac += "\t" + "SUPERCHAT" + "\t" + authorName + "\t" + time + "\t" + purchaseAmout + "\t" + content + "\n"

            # メンバーシップに加入したときの投稿
            elif 'liveChatMembershipItemRenderer' in line:
                info = info['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatMembershipItemRenderer']
                content = ""
                if 'authorName' in info:
                    authorName = info['authorName']['simpleText']
                else:
                    authorName = "%anonymous%"
                time = info['timestampText']['simpleText']
                frac += "\t" + "MEMBERSHIP" + "\t" + authorName + "\t" + time + "\t" + purchaseAmout + "\t" + content + "\n"

            # どちらでもない場合(想定外や仕様変更が生じた場合ここに来る)
            else:
                frac += "\t" + "OTHER" + "\t" + "-" + "\t" + "-" + "\t" + "0" + "\t" + "\n"
            
            result += frac
            count += 1

        sys.stdout.write('\nDone!\n')
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(result)
        except:
            print("Cannot convert json file")

結果サンプル

tsvなのでExcelに直接貼り付けられます。以下の貼り付けたサンプル(一部消し)をご紹介します。
f:id:taiyakisun:20201013223428j:plain

プライバシーポリシー お問い合わせ