にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

AWS EMR Hadoop Streaming Examples

AWS EMR Hadoop Streaming Examples

GCPのDataFlowの方が、AWS EMRより個人的にはモダンな印象があるのですが、業務でAWSで非構造化データの大規模な分析が必要になる可能性があり、Hadoop Streamingの仕組みを軽くおさらいして、いくつかの言語で動かしました  

Map Reduceの仕組み自体は古いのですが、論文にもなっており、この構成でほとんどの集計が可能みたいなことを言っており、パワーを感じます[1]  

 
図1. イメージしているところのMapReduceの構成

AWSのElastic Map Reduceを利用してHadoop Streamingで任意の言語で、ビッグデータを処理する方法を説明します  

任意の言語で処理をつなげることができるため、AWS EMR(Hadoop Streaming)の仕組みさえ理解していれば、好きな言語で処理が可能です

ここでは、以下の言語におけるもっとも簡単な集計である、全てのドキュメントになんの語が何回出現するか、カウントするプログラムを例示します

  1. Python2
  2. Python3
  3. Ruby ( 2.4 )
  4. Go ( 1.8 )

S3にデータをおく

S3に分析する対象の非構造化データをフォルダを作っていれておきます   生データか、GZで圧縮されている必要があります
例えば、一つのバケットで処理する場合には、このようなフォルダ構成をとります

 

フォルダに必要なデータを適切に配置して、awscliのコンフィグレーションを通した状態で、次のコマンドでHadoop Streamingを実行します

$ aws emr add-steps --cluster-id j-{$YOUR_EMR_ID} --steps file://./WordCount_step.json --region ap-northeast-1

ここで、引数に指定されている JSONファイルはこのような記述になっています  

[
  {
     "Name": "WordCount",
     "Type": "STREAMING",
     "ActionOnFailure": "CONTINUE",
     "Args": [
         "-files",
         "s3://{$YOUR_S3}/wordcount-code/mapper,s3://{$YOUR_S3}/wordcount-code/reducer",
         "-mapper",
         "mapper",
         "-reducer",
         "reducer",
         "-input",
         "s3://{$YOUR_S3}/wordcount-dataset/",
         "-output",
         "s3://{$YOUR_S3}/wordcount-result"]
  }
]

(注:これはGoでバイナリを指定しているので、pythonrubyの場合、適切にfilesの引数を変えてください)

Python2でのワードカウント

各種インターネット上の文献では、Python2でワードカウントしていることが多いです  

AWSの解説サイトで紹介されていた方法で、集計してみます  

Reducerを特別な予約関数を割り当てることで省略できますが、ボトムアップ的に学ぶには、実装してしまった方が良いだろうという判断をしました  

mapper

#!/usr/bin/python
import sys
import re
 
def main(argv):
  pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
  for line in sys.stdin:
    line = line.strip()
    for word in pattern.findall(line):
      print( word.lower() + "\t" + "1" )
 
if __name__ == "__main__":
    main(sys.argv)

reducer

#!/usr/bin/python
import sys
import re

def main(argv):
  term_freq = {}
  for line in sys.stdin:
    line = line.strip()
    ents = line.split('\t')
    term, freq = ents
    freq = int(freq)
    try:
      if term_freq.get(term) == None:
        term_freq[term] = 0
      term_freq[term] += 1
    except Exception as e:
      print(e)
  for term, freq in term_freq.items():
    print( term, freq )

if __name__ == "__main__":
    main(sys.argv)

実行命令

$ aws emr add-steps --cluster-id j-{$YOUR_CLUSTER} --steps file://./WordCount_step.json --region ap-northeast-1

Python3でのワードカウント

Python3をインストールするため、Hadoopクラスタにログインする必要があります

AWSのデフォルトのセキュリティグループについては、sshでログインできないので、セキュリティグループを解放します

$ ssh -i {$KEY} hadoop@{$IPADDR}

Python35のインストール(必要に応じでバージョンを切り替えてください)

$ sudo yum install python35
$ sudo yum install python35-devel
$ sudo yum install python35-pip

mapper

#!/usr/bin/python3
import sys
import re
 
def main(argv):
  for line in sys.stdin:
    line = line.strip()
    for term in line.split():
      print('{}\t1'.format(term) )
if __name__ == "__main__":
    main(sys.argv)

reducer

#!/usr/bin/python3
import sys
import re

def main(argv):
  term_freq = {}
  for line in sys.stdin:
    line = line.strip()
    ents = line.split('\t')
    term, freq = ents
    freq = int(freq)
    try:
      if term_freq.get(term) == None:
        term_freq[term] = 0
      term_freq[term] += 1
    except Exception as e:
      print(e)
  for term, freq in term_freq.items():
    print( term, freq )

if __name__ == "__main__":
    main(sys.argv)

Rubyでのワードカウント

世の中には、Rubyestが多く、PythonでなくてRubyでやりたいという人も多いです  

Rubyは悪くない選択肢でもあるので、使い方を説明します

AWS EMRのノードにインストールされているバージョンは古く、アップデートします

rpmここのサイトからダウンロードしました

$ sudo yum remove ruby
$ sudo yum install ./ruby-2.4.1-1.el6.x86_64.rpm

mapper

#!/usr/bin/ruby
STDIN.each_line { |x|
  x.split(" ").map { |x| 
    puts sprintf("%s\t1", x.downcase)
  }
}

reducer

#!/usr/bin/ruby
term_freq = {}
STDIN.each_line { |x| 
  term, freq = x.split("\t")
  if term_freq[term] == nil  then 
    term_freq[term] = 0
  end
  term_freq[term] += 1
}

term_freq.each { |term, freq| 
  puts sprintf("%s %d", term, freq)
}

全体的にみて、PythonよりRubyの方がスッキリかけますね  

好みの問題である気がしてて、好きな言語で良いと思います  

Goでのワードカウント

私がstreamingにおいて、スクリプト言語より、速度やメモリリソースや、シングルバイナリになるという点で、良いと感じているのが、Go言語です  

Go言語はそのシンプルなシンタックスでありながら、ランタイムに依存することなく、実行可能なバイナリを出力が可能です

つまり、AWS EMRのノードに対して、なんらかログインしてソフトウェアをインストールやセットアップをする必要がありません

mapper

package main

import (
        "bufio"
        "fmt"
        "os"
        "strings"
)

func main() {
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
                line := scanner.Text()
                terms := strings.Split(line, " ")
                for _, term := range terms {
                        out := fmt.Sprintf("%s\t1", term)
                        fmt.Println(out)
                }
        }
}

reducer

package main

import (
        "bufio"
        "fmt"
        "os"
        "strings"
)

func main() {
        dic := map[string]int{}
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
                line := scanner.Text()
                ents := strings.Split(line, "\t")
                term := ents[0]
                _ = ents[1]

                _, ok := dic[term]
                if ok == false {
                        dic[term] = 0
                }
                dic[term] += 1
        }

        for term, freq := range dic {
                out := fmt.Sprintf("%s %d", term, freq)
                fmt.Println(out)
        }
}

コード

github.com

まとめ

いくつかの言語において、実際に、AWS EMRを動かして集計しました

PythonRubyおける使い方は、数年前からあまり進化を感じられませんが、Go言語という選択が可能になったことで、goroutineによる効率的な並列処理や、実行可能なバイナリであるというメモリが少なく済んだりするメリットがあるので、今までの集計方では集められなかった角度のデータが取得できそうで、未来が見えました

参考文献

[1] MapReduce: Simplified Data Processing on Large Clusters

XGBoost fizzbuzz

XGBoost fizzbuzz

XGBoostのFizzBuzzです  

勾配ブースティングでもFizzBuzzできるという例を示します

やろうと思った動機

DeepLearningならばFizzBuzzの3の倍数と5の倍数と15の倍数の時に、特定の動作をするというルールを獲得することは容易なのですが、他の機械学習アルゴリズムはどうでしょうか

XGBoostはその決定木の性質と、勾配ブースティングの学習アルゴリズムを解析的に説明した論文の内容を見ると、特定のルールを獲得することは難しくないんじゃないかと思いました[1]  

ただ、FizzBuzzを数値として扱ってしまうと、かなり厄介で、連続する値が大きい小さいなどで判別するのは容易ではありません

DeepLearning時と同じように、Character Levelで入力を扱います

具体的な数値データの取り扱い

数字を文字表現として皆して、各桁の数字を一つの特徴量として扱います  

図1. データの取り扱い

クラスの設定、目的関数の設定

クラスは"3の倍数の時のFizz",“5の倍数の時のBuzz”,“15の倍数の時のFizzBuzz”,“その他"の時の4つのクラスの分類問題にしました  

softmaxではなくて、softprobを用いました  

ドキュメントを読むと、各クラスの所属する確率として表現されるようです(クラスの数ぶん、sigmoidが配置されていると、同じ?)  

学習データ

0〜99999までの数字の各FizzBuzzを利用します  

この時、2割をランダムでテストデータに、8割を学習データに分割します  

各種パラメータ

このようにしました、もっと最適な設定があるかもしれないので、教えていただけると幸いです

etaが大きいのは、極めてroundが多いので、これ以上小さくするとまともな時間に学習が完了しません  

booster      = gbtree
objective    = multi:softprob
num_class    = 4
eta          = 1.0
gamma        = 1.0
min_child_weight = 1
max_depth   = 100
subsample   = 0.8
num_round   = 100000
save_period = 1000
colsample_bytree = 0.9
data        = "svm.fmt.train"
eval[test]  = "svm.fmt.test"
#eval_train = 1
test:data   = "svm.fmt.test"

プログラムの解説

githubにコードが置いてあります

データセットの準備

$ python3 createDataset.py --step1 # データセットの作成
$ python3 createDataset.py --step2 # 前処理
$ python3 createDataset.py --step3 # libsvmフォーマットを作成

学習

(xgboost.binはubuntu linux 16.04でコンパイルしたバイナリです。環境に合わせて適宜バイナリを用意してください)
(学習には、16コアのRyzen 1700Xで2時間程度かかります)

$ ./xgboost.bin fizzbuzz.train.conf

予想

(必要に応じて、使用するモデルを書き換えてください)

$ ./xgboost.bin fizzbuzz.predict.conf 

精度の確認

$ python3 predCheck.py

精度

50000roundでテストデータで以下の精度が出ます  

acc 0.9492

出力はこのようになります

    12518 predict class = 3 real class = 3
    42645 predict class = 2 real class = 0
    15296 predict class = 3 real class = 3
    47712 predict class = 3 real class = 1
     1073 predict class = 3 real class = 3
    66924 predict class = 1 real class = 1
    82852 predict class = 3 real class = 3
    26043 predict class = 1 real class = 1
    96556 predict class = 3 real class = 3
    81672 predict class = 1 real class = 1
    44018 predict class = 3 real class = 3
    16622 predict class = 3 real class = 3
    79924 predict class = 3 real class = 3
    15290 predict class = 2 real class = 2
    25276 predict class = 3 real class = 3

class 2は15の倍数なのですが、これの獲得が難しいようです

liblinear(support vector classification)との比較

一応、違う機械学習との比較ともやるべきでしょう  

L2-regularized L2-loss support vector classificationで動作する、liblinearで比較しました  

$ ./train -s 1 svm.fmt.train 
....*.*
optimization finished, #iter = 52
Objective value = -56679.234880
nSV = 64068
.....*
optimization finished, #iter = 51
Objective value = -56678.857821
nSV = 65083
....*.
optimization finished, #iter = 50
Objective value = -14306.576984
nSV = 17032
.....*
optimization finished, #iter = 51
Objective value = -14305.608585
nSV = 16957
$ ./predict svm.fmt.test svm.fmt.train.model output
Accuracy = 66.2298% (13240/19991)

精度が66%しか出ていません

やはり、XGBoostの精度で判別をすることはできないようです  

まとめ

DeepLearningでは精度100%を達成できましたが、XGBoostでは95%程度の精度です  

完全なルールの獲得は怪しいですが、それでもかなりいいところまで行っているようです  

また、目的関数を複数ラベルを取れるようにするなど、うまく設計すれば、もっといけるでしょう(勾配ブースティングのマルチラベル分類、どうやるんだろう)  

参考文献

[1] XGBoost: A Scalable Tree Boosting System

Multi Agent Deep Q Network for Keras

Multi Agent Deep Q Network for Keras

Kerasでマルチエージェント DQN

マルチエージェントラーニングは、相互に影響を与え合うモデルが強調ないし、敵対して、目的となる報酬を最大化するシチュエーションのディープラーニングです[1][2]

強化学習の特殊系と捉えることができそです  

Deep Mind社が提案したモデルの一部では非常に面白く、報酬の設定しだいでは各エージェントが協力したり敵対したりします。  

Kerasで敵対的な簡単なマルチエージェントラーニングを21言っちゃダメゲームでスクラッチで、構築しました

(これはQiitaのはむこさんの記事を参考にさせていただきました、ありがとうございます[3])

(調べながらやったこともあり、理論的な間違いを見つけたら、ツイッターで指摘していただけると助かります)

強化学習の理論

強化学習は、人間が特に正しい悪いなどを指定せずとも、なんらかの報酬系から値を得ることで報酬を最大化するよう学習します  

このとき、ある系列の状態をSとし、その時の行動をaとし、この組み合わせで得られる報酬関数をQとすします  

 この関数で、最適な行動を得られた時に*をつけて表し、この時の行動sについて最大となるsをとると、以下のように最適な行動πを得ることができます  

また報酬の割引率というものがあるのですが、今回は具体的に割引率を与えたり、求めるということをしていません

今回の例では、Q関数を具体的にDeepLearningによる関数としています

ϵ-greedy

全く意識していなかったのですが、どうやら行動の選択は初期値依存性がある程度あり、運が悪いと局所解に嵌ったなままなかなか更新してくれなくなります  

適度にランダムに行動を選択することを入れないとダメっぽいです  

ルール(問題設定)

先手、後手に別れて0から最小1、最大の3つ増やした数字を言い合います  

数字を累積していって、21以上のになるように言った時点で負けです。相手に21以上を踏ませれば勝ちです

今回設計した、Q関数

Q関数は状態と行動を入力することで、報酬の値を得ます  

報酬がもっとも多いと期待できる選択を選ぶことで、(この問題の場合)最短の手数で、勝ちに行くことができます

今回は、それぞれの行動と状態から、唯一に報酬が決まるとして、その和でQ関数が表現されるとしました(この問題設定がいつも適応できるわけではなさそうです)  

このスモールq関数をディープラーニングで表現すると、このような任意のモデルで書くことができます

報酬の設定

一個一個の行動に報酬を設定するのは、困難なので、一連の行動の系の結果として勝ったか、負けたかを見ていきます  

スモールq関数を求める問題に変更できたので、これを具体的に以下の値を最小化していきます(Nはゲーム終了までにかかった手数)  

 

勝った時の報酬を+1, 負けた時の報酬を-1のReward関数とすると、以上の式を最小化すれば、勝った時はその選択をより強化して学習し、負けた時は選択を誤ったとして、別の可能性を探索する可能性が強くなります 

マルチエージェント

同じような報酬系をもつモデルを2つ以上用意して、対決させました

先に21以上を言った方が負けというルールで二つのモデルに対決させました  

コード&実行

githubにて管理しています  

マルチエージェント学習

$ python3 21-icchadame-pure.py --reinforce

実際に対戦してみる

$ python3 21-icchadame-pure.py --play

強さについて

この21言っちゃダメゲームは4の倍数を取りに行けば勝てることがわかっている問題なのですが、途中から4の倍数にはめ込もうとしようとしていることがわかります  

例えば、次のような結果になります

なお、最適解は、初手で1を打って次に4を取らせることですが、初期値依存性があり、この状態に素早く収束させるのは結構難しいです  

下記の例は、20000回のゲームをさせた例

例1.

コンピュータは3を選択しました
now position 3
数字(1−3)を入力してください
2
コンピュータは3を選択しました
now position 8
数字(1−3)を入力してください
1
コンピュータは3を選択しました
now position 12
数字(1−3)を入力してください
2
コンピュータは2を選択しました
now position 16
数字(1−3)を入力してください
1
コンピュータは3を選択しました
now position 20
数字(1−3)を入力してください
1
結果 あなたの負け

例2.

コンピュータは3を選択しました
now position 3
数字(1−3)を入力してください
3
コンピュータは2を選択しました
now position 8
数字(1−3)を入力してください
3
コンピュータは1を選択しました
now position 12
数字(1−3)を入力してください
3
コンピュータは1を選択しました
now position 16
数字(1−3)を入力してください
3
コンピュータは1を選択しました
now position 20
数字(1−3)を入力してください
3
結果 あなたの負け

実際やってみて

Reinforce Learning関しては教師あり学習、教師なし学習についで、最後にやろうと思っていたこともあり、あまり深く手をつけていませんでした  

最初、理論をあまり勉強せず、とりあえず適当にコードを書いてみたのですが、収束はめっちゃ早いけど、すぐオーバーフィットしてしまうモデルになってしまいました。基礎理論を再度確認して、コードに落として行くという気持ちでやると、サクサクできます(反省)  

マルチエージェントにすることで、コードがやばいことになるのかなと思ったのですが、意外とシンプルに構築することができました

今回は敵対的なゲームでしたが、時には協力し、時には裏切るモデルなど面白そうであります。学習させることも容易なので、様々な応用が利きそうで面白そうでした

参考文献

[1] Understanding Agent Cooperation
[2] Multi-agent Reinforcement Learning in Sequential Social Dilemmas
[3] 深層強化学習:「20言っちゃダメゲーム」の最適解を30分程度で自動的に編み出す(chainerRL)

KerasのRNNでFizzBuzzを行う(+ Epochスケジューラの提案)

KerasのRNNでFizzBuzzを行う(+ Epochスケジューラの提案)

ディープラーニングをやるようになって半年程度経ちました
ある程度ならば、文章や画像判別モデルならば、過去の自分の資産をうまく活用することと、外部からState of the Artな手法を導入することで、様々なネットワークを組むことが可能になってまいりました
しかし、基礎の基礎であるはずの、Fizz Buzzをやるのを忘れていたのです
やるしかありません

先行研究

全結合のモデルでの、Fizz Buzzの評価のようです

提案

RNNでも、FizzBuzzは可能なのではないでしょうか
全結合層のモデルのみで、1000 ~ 5000程度のデータで学習させることが多いですが、20万件のデータセットで学習させることで、より大きな数字にも対応させることを目標とします  

カリキュラム学習という学習法があり、簡単な問題設定から初めて、徐々に難しくしていくことで、早く安定的に学習できるそうです[1]  

この時、人間がカリキュラムを意図して簡単な問題を用意して学習させるのではなく、学習のデータを最初のうちは限定したデータセットにて学習させ、限定したデータを覚えてきたらデータを拡大し、様々なケースを学習させて、汎化性能を獲得していくという学習方法をとります  

具体的には、データセットとepochにスケジューラを組み込むことで実現します  

モデル

‘1:Fizz, 2:Buzz, 3:Fizz Buzz, 4:そのまま(Path)'と4値の判別問題を全結合層2層でといている問題設定が多いが、 '1:Fizz, 2:Buzz, 3:Path'の3値のそれぞれの状態を求める問題設定とする

図1. 使用したモデル

コードはKerasを利用した   モデルとスケジューラは、非常に小さく、わかりやすいです   モデル

inputs       = Input(shape=(10, 11))
encoded1     = Bi( GRU(256, activation='relu') )(inputs)
encoded1     = Dense(512, activation='relu')( encoded1 )
encoded1_1x  = Reshape((1,512,))(encoded1)
decoded      = Dense(3, activation='sigmoid')( Flatten()(encoded1_1x) )
fizzbuzz     = Model(inputs, decoded)
fizzbuzz.compile(optimizer=Adam(), loss='binary_crossentropy')

スケジューラ(初期のデータセットは、epochを多く学習し、後半になるにつれ一回のみにスケジューリングしている)

class CURRICULUM:
  EPOCH = [50, 30, 20, 10, 5, 1]
  @staticmethod
  def GET():
    if len(CURRICULUM.EPOCH) > 0:
      return CURRICULUM.EPOCH.pop(0)
    else:
      return 1
...
fizzbuzz.fit(Xs, Ys, epochs=CURRICULUM.GET(), callbacks=[batch_callback])
...

コードや日本語では伝えるのに私の貧困なコミュ力では難しかったので、画像を添付しますと、このような差があります

図 2. スケジューリングなし

図 3. スケジューリングあり

このように、学習初期に置いて、学習するデータを非対称にして、最初のデータは多めに繰り返し学習させます  

実験

200,000件のFizz Buzzのデータセットを、スクリプトで作成し、5000件ずつ、データセットを分割し40個のデータセットを学習させる

この時、スケジューリングモデルAは、任意のデータセットをランダムで選択し、以下のepoch回、学習する  

{ 1回目:50epoch, 2回目:30epoch, 3回目:20epoch, 4回目:10epoch, 5回目:5epoch }

このスケジューリングが完了した後は、残りのデータセットを1epochで学習する

スケジューリングモデルBは特にスケジューリングは行わず、全てのデータセットを平等に学習していく。なお、この方法は、全てのデータセットをメモリ上に乗せて順番に学習していく方法と変わらない  

評価

スケジューリングモデルA(青)とモデルB(赤)で大きな差がでた

図4. trainデータのepochごとのlossの変化

ニューラルネットワークの初期値の依存性を考慮しても、この差は大きく、スケジューリングを行うことが、まともに収束するしないなどの差を担っているように思われる

モデルAはテストデータにおける精度100%であった
モデルBは68%であった

なお、出力はこのようになっている 左から、入力値、人手による結果、予想値、正解だったかどうか、である(PATHとは、そのまま出力するという意味にしました) ほぼ100%あっていることが確認できた  

    64170 original result = Fizz Buzz , predict result = Fizz Buzz , result = True
      9791 original result = Path , predict result = Path , result = True
     54665 original result = Buzz , predict result = Buzz , result = True
    118722 original result = Fizz , predict result = Fizz , result = True
     97502 original result = Path , predict result = Path , result = True
    186766 original result = Path , predict result = Path , result = True
    153331 original result = Path , predict result = Path , result = True
      7401 original result = Fizz , predict result = Fizz , result = True
    117939 original result = Fizz , predict result = Fizz , result = True
     22732 original result = Path , predict result = Path , result = True
     73516 original result = Path , predict result = Path , result = True
    144774 original result = Fizz , predict result = Fizz , result = True
     32783 original result = Path , predict result = Path , result = True
     67097 original result = Path , predict result = Path , result = True
    116715 original result = Fizz Buzz , predict result = Fizz Buzz , result = True
     21195 original result = Fizz Buzz , predict result = Fizz Buzz , result = True

コード

https://github.com/GINK03/keras-rnn-fizzbuzz-on-dev

テストデータを作成する

$ python3 data_utils.py --step1

学習する(全体の8割を学習します)

$ python3 fizzbuzz.py --train

予想する(テストデータから予想します)

$ python3 fizzbuzz.py --predict

感想

データによってはまともに収束してくれないものあり、RNNではその傾向が特に顕著です   精確にロス率の違いなどを測ったことがなかったのですが、Epochをいじることによって、安定して学習させることができることがあるということでした  

参考文献

[1] Deep Learningの技術と未来