Google Cloud DataFlowをKotlinで書く
Google Cloud DataFlowをKotlinで書く
Kotlinで書くモチベーション
以前も書きましたが、Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、GCP Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinで書いた集計コードの発想を大きく書き換えることなく、GCP DataFlowで利用できるからです。
また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良いと感じるからでもあります。
Google Cloud DataFlowとは
GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。
Map Reduceの発展系のような印象を受ける作りになっており、集計するためのコードの書きやすさや、特定の集計部分だけ、集中して記述してかけるなどのメリットがあり、大規模なビッグデータ処理に向いていると感じます。
- Googleが考える、データの取り扱い
- この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている
- AmazonのElastic Map Reduceと競合する製品だと思われますが、より、柔軟で、汎用性が高いように見えます
AWS Elastic Map Reduceとの違い
Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)
Google Cloud DataFlow特徴
- Javaなどのプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい
- GCPのデータストレージ(AWSのS3のようなもの)に保存するのでコストが安い
- 関数型言語における、ラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる
- SQLでないので、プログラムをしらないと集計できない(デメリット)
Requirements
Google Cloud SDKのインストールとセットアップ
ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります
この例ではLinuxを対象としています
$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz $ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz $ ./google-cloud-sdk/install.sh Welcome to the Google Cloud SDK! ... Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力 Do you want to continue (Y/n)? <<- Yと入力
bashrcのリロード
$ source ~/.bashrc
gcloud initして、gcloudの認証を通します
$ gcloud init # gcloundのセットアップ [1] alicedatafoundation [2] machine-learning-173502 [3] Create a new project Please enter numeric choice or text value (must exactly match list item): 2 # 使っているプロジェクトを選択
asia-northeast1-aのリージョンを設定
If you do not specify a zone via a command line flag while working with Compute Engine resources, the default is assumed. [1] asia-east1-c [2] asia-east1-b [3] asia-east1-a [4] asia-northeast1-c ...
クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定 Google Apiで発行してもらう
クレデンシャルを環境変数に通します
必要ならば、bashrcに追加してください。
$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json
GCP側のセットアップ操作
1. Projectの作成
2. CloudStrageの作成
3. Keyの作成
(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)
4. Codeを書く
任意の集計の角度を記述します
Kotlinで記述されたサンプルのコンパイル&実行
私が作成したKotlinのサンプルでの実行例です。
シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)
$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix
$ mvn package
クリーン(明示的にtargetなどのバイナリを消したい場合)
$ mvn clean
GCPに接続して実行
$ mvn exec:java
これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います
今回のKotlinのDataFlowのプログラムの説明
多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います
Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします
このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります
クラスの定義はこのように行いました
KotlinProc1クラス
public class KotlinProc1 : DoFn<String, String>() { override fun processElement(c : DoFn<String,String>.ProcessContext) { val elem = c.element() elem.toList().map { val char = it.toString() c.output(char) } } }
KotlinProc2クラス
public class KotlinProc2 : DoFn<String, KV<String, String>>() { override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) { val char = c.element() c.output(KV.of(char, "1")) } }
GroupByKey
GroupByKey.create<String,String>()
KotlinProc3クラス
public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() { override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) { val key = c.element().getKey() val iter = c.element().getValue() val list = mutableListOf<String>() iter.forEach { list.add(it.toString()) } c.output(key.toString() + ": " + list.size.toString()); } }
GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く
類似していることが確認できます
import java.io.* import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths import java.io.BufferedReader import java.io.InputStream import java.io.File import java.util.stream.Collectors import java.util.stream.* fun main( args : Array<String> ) { val dataframe = mutableListOf<String>() Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード val inputStream = File(name.toString()).inputStream() inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} } } dataframe.map { it.toList().map { // DataFlowのKotlinProc1に該当 it.toString() } }.flatten().map { // DataFlowのKotlinProc2に該当 Pair(it, "1") }.groupBy { // DataFlowのGroupByKeyに該当 it.first }.toList().map { // DataFlowのKotlinProc3に該当 val (key, arr) = it println("$key : ${arr.size}") } }
まとめ
ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなGCP Cloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、経験則的に生産性を高めることができます。
AWS EMR Hadoop Streaming Examples
AWS EMR Hadoop Streaming Examples
GCPのDataFlowの方が、AWS EMRより個人的にはモダンな印象があるのですが、業務でAWSで非構造化データの大規模な分析が必要になる可能性があり、Hadoop Streamingの仕組みを軽くおさらいして、いくつかの言語で動かしました
Map Reduceの仕組み自体は古いのですが、論文にもなっており、この構成でほとんどの集計が可能みたいなことを言っており、パワーを感じます[1]
AWSのElastic Map Reduceを利用してHadoop Streamingで任意の言語で、ビッグデータを処理する方法を説明します
任意の言語で処理をつなげることができるため、AWS EMR(Hadoop Streaming)の仕組みさえ理解していれば、好きな言語で処理が可能です
ここでは、以下の言語におけるもっとも簡単な集計である、全てのドキュメントになんの語が何回出現するか、カウントするプログラムを例示します
- Python2
- Python3
- Ruby ( 2.4 )
- Go ( 1.8 )
S3にデータをおく
S3に分析する対象の非構造化データをフォルダを作っていれておきます
生データか、GZで圧縮されている必要があります
例えば、一つのバケットで処理する場合には、このようなフォルダ構成をとります
フォルダに必要なデータを適切に配置して、awscliのコンフィグレーションを通した状態で、次のコマンドでHadoop Streamingを実行します
$ aws emr add-steps --cluster-id j-{$YOUR_EMR_ID} --steps file://./WordCount_step.json --region ap-northeast-1
ここで、引数に指定されている JSONファイルはこのような記述になっています
[ { "Name": "WordCount", "Type": "STREAMING", "ActionOnFailure": "CONTINUE", "Args": [ "-files", "s3://{$YOUR_S3}/wordcount-code/mapper,s3://{$YOUR_S3}/wordcount-code/reducer", "-mapper", "mapper", "-reducer", "reducer", "-input", "s3://{$YOUR_S3}/wordcount-dataset/", "-output", "s3://{$YOUR_S3}/wordcount-result"] } ]
(注:これはGoでバイナリを指定しているので、pythonやrubyの場合、適切にfilesの引数を変えてください)
Python2でのワードカウント
各種インターネット上の文献では、Python2でワードカウントしていることが多いです
AWSの解説サイトで紹介されていた方法で、集計してみます
Reducerを特別な予約関数を割り当てることで省略できますが、ボトムアップ的に学ぶには、実装してしまった方が良いだろうという判断をしました
mapper
#!/usr/bin/python import sys import re def main(argv): pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*") for line in sys.stdin: line = line.strip() for word in pattern.findall(line): print( word.lower() + "\t" + "1" ) if __name__ == "__main__": main(sys.argv)
reducer
#!/usr/bin/python import sys import re def main(argv): term_freq = {} for line in sys.stdin: line = line.strip() ents = line.split('\t') term, freq = ents freq = int(freq) try: if term_freq.get(term) == None: term_freq[term] = 0 term_freq[term] += 1 except Exception as e: print(e) for term, freq in term_freq.items(): print( term, freq ) if __name__ == "__main__": main(sys.argv)
実行命令
$ aws emr add-steps --cluster-id j-{$YOUR_CLUSTER} --steps file://./WordCount_step.json --region ap-northeast-1
Python3でのワードカウント
Python3をインストールするため、Hadoopのクラスタにログインする必要があります
AWSのデフォルトのセキュリティグループについては、sshでログインできないので、セキュリティグループを解放します
$ ssh -i {$KEY} hadoop@{$IPADDR}
Python35のインストール(必要に応じでバージョンを切り替えてください)
$ sudo yum install python35 $ sudo yum install python35-devel $ sudo yum install python35-pip
mapper
#!/usr/bin/python3 import sys import re def main(argv): for line in sys.stdin: line = line.strip() for term in line.split(): print('{}\t1'.format(term) ) if __name__ == "__main__": main(sys.argv)
reducer
#!/usr/bin/python3 import sys import re def main(argv): term_freq = {} for line in sys.stdin: line = line.strip() ents = line.split('\t') term, freq = ents freq = int(freq) try: if term_freq.get(term) == None: term_freq[term] = 0 term_freq[term] += 1 except Exception as e: print(e) for term, freq in term_freq.items(): print( term, freq ) if __name__ == "__main__": main(sys.argv)
Rubyでのワードカウント
世の中には、Rubyestが多く、PythonでなくてRubyでやりたいという人も多いです
Rubyは悪くない選択肢でもあるので、使い方を説明します
AWS EMRのノードにインストールされているバージョンは古く、アップデートします
$ sudo yum remove ruby $ sudo yum install ./ruby-2.4.1-1.el6.x86_64.rpm
mapper
#!/usr/bin/ruby STDIN.each_line { |x| x.split(" ").map { |x| puts sprintf("%s\t1", x.downcase) } }
reducer
#!/usr/bin/ruby term_freq = {} STDIN.each_line { |x| term, freq = x.split("\t") if term_freq[term] == nil then term_freq[term] = 0 end term_freq[term] += 1 } term_freq.each { |term, freq| puts sprintf("%s %d", term, freq) }
全体的にみて、PythonよりRubyの方がスッキリかけますね
好みの問題である気がしてて、好きな言語で良いと思います
Goでのワードカウント
私がstreamingにおいて、スクリプト言語より、速度やメモリリソースや、シングルバイナリになるという点で、良いと感じているのが、Go言語です
Go言語はそのシンプルなシンタックスでありながら、ランタイムに依存することなく、実行可能なバイナリを出力が可能です
つまり、AWS EMRのノードに対して、なんらかログインしてソフトウェアをインストールやセットアップをする必要がありません
mapper
package main import ( "bufio" "fmt" "os" "strings" ) func main() { scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { line := scanner.Text() terms := strings.Split(line, " ") for _, term := range terms { out := fmt.Sprintf("%s\t1", term) fmt.Println(out) } } }
reducer
package main import ( "bufio" "fmt" "os" "strings" ) func main() { dic := map[string]int{} scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { line := scanner.Text() ents := strings.Split(line, "\t") term := ents[0] _ = ents[1] _, ok := dic[term] if ok == false { dic[term] = 0 } dic[term] += 1 } for term, freq := range dic { out := fmt.Sprintf("%s %d", term, freq) fmt.Println(out) } }
コード
まとめ
いくつかの言語において、実際に、AWS EMRを動かして集計しました
PythonやRubyおける使い方は、数年前からあまり進化を感じられませんが、Go言語という選択が可能になったことで、goroutineによる効率的な並列処理や、実行可能なバイナリであるというメモリが少なく済んだりするメリットがあるので、今までの集計方では集められなかった角度のデータが取得できそうで、未来が見えました
参考文献
XGBoost fizzbuzz
XGBoost fizzbuzz
XGBoostのFizzBuzzです
勾配ブースティングでもFizzBuzzできるという例を示します
やろうと思った動機
DeepLearningならばFizzBuzzの3の倍数と5の倍数と15の倍数の時に、特定の動作をするというルールを獲得することは容易なのですが、他の機械学習アルゴリズムはどうでしょうか
XGBoostはその決定木の性質と、勾配ブースティングの学習アルゴリズムを解析的に説明した論文の内容を見ると、特定のルールを獲得することは難しくないんじゃないかと思いました[1]
ただ、FizzBuzzを数値として扱ってしまうと、かなり厄介で、連続する値が大きい小さいなどで判別するのは容易ではありません
DeepLearning時と同じように、Character Levelで入力を扱います
具体的な数値データの取り扱い
数字を文字表現として皆して、各桁の数字を一つの特徴量として扱います
クラスの設定、目的関数の設定
クラスは"3の倍数の時のFizz",“5の倍数の時のBuzz”,“15の倍数の時のFizzBuzz”,“その他"の時の4つのクラスの分類問題にしました
softmaxではなくて、softprobを用いました
ドキュメントを読むと、各クラスの所属する確率として表現されるようです(クラスの数ぶん、sigmoidが配置されていると、同じ?)
学習データ
0〜99999までの数字の各FizzBuzzを利用します
この時、2割をランダムでテストデータに、8割を学習データに分割します
各種パラメータ
このようにしました、もっと最適な設定があるかもしれないので、教えていただけると幸いです
etaが大きいのは、極めてroundが多いので、これ以上小さくするとまともな時間に学習が完了しません
booster = gbtree objective = multi:softprob num_class = 4 eta = 1.0 gamma = 1.0 min_child_weight = 1 max_depth = 100 subsample = 0.8 num_round = 100000 save_period = 1000 colsample_bytree = 0.9 data = "svm.fmt.train" eval[test] = "svm.fmt.test" #eval_train = 1 test:data = "svm.fmt.test"
プログラムの解説
githubにコードが置いてあります
データセットの準備
$ python3 createDataset.py --step1 # データセットの作成 $ python3 createDataset.py --step2 # 前処理 $ python3 createDataset.py --step3 # libsvmフォーマットを作成
学習
(xgboost.binはubuntu linux 16.04でコンパイルしたバイナリです。環境に合わせて適宜バイナリを用意してください)
(学習には、16コアのRyzen 1700Xで2時間程度かかります)
$ ./xgboost.bin fizzbuzz.train.conf
予想
(必要に応じて、使用するモデルを書き換えてください)
$ ./xgboost.bin fizzbuzz.predict.conf
精度の確認
$ python3 predCheck.py
精度
50000roundでテストデータで以下の精度が出ます
acc 0.9492
出力はこのようになります
12518 predict class = 3 real class = 3 42645 predict class = 2 real class = 0 15296 predict class = 3 real class = 3 47712 predict class = 3 real class = 1 1073 predict class = 3 real class = 3 66924 predict class = 1 real class = 1 82852 predict class = 3 real class = 3 26043 predict class = 1 real class = 1 96556 predict class = 3 real class = 3 81672 predict class = 1 real class = 1 44018 predict class = 3 real class = 3 16622 predict class = 3 real class = 3 79924 predict class = 3 real class = 3 15290 predict class = 2 real class = 2 25276 predict class = 3 real class = 3
class 2は15の倍数なのですが、これの獲得が難しいようです
liblinear(support vector classification)との比較
一応、違う機械学習との比較ともやるべきでしょう
L2-regularized L2-loss support vector classificationで動作する、liblinearで比較しました
$ ./train -s 1 svm.fmt.train ....*.* optimization finished, #iter = 52 Objective value = -56679.234880 nSV = 64068 .....* optimization finished, #iter = 51 Objective value = -56678.857821 nSV = 65083 ....*. optimization finished, #iter = 50 Objective value = -14306.576984 nSV = 17032 .....* optimization finished, #iter = 51 Objective value = -14305.608585 nSV = 16957
$ ./predict svm.fmt.test svm.fmt.train.model output Accuracy = 66.2298% (13240/19991)
精度が66%しか出ていません
やはり、XGBoostの精度で判別をすることはできないようです
まとめ
DeepLearningでは精度100%を達成できましたが、XGBoostでは95%程度の精度です
完全なルールの獲得は怪しいですが、それでもかなりいいところまで行っているようです
また、目的関数を複数ラベルを取れるようにするなど、うまく設計すれば、もっといけるでしょう(勾配ブースティングのマルチラベル分類、どうやるんだろう)
参考文献
Multi Agent Deep Q Network for Keras
Multi Agent Deep Q Network for Keras
Kerasでマルチエージェント DQN
マルチエージェントラーニングは、相互に影響を与え合うモデルが強調ないし、敵対して、目的となる報酬を最大化するシチュエーションのディープラーニングです[1][2]
強化学習の特殊系と捉えることができそです
Deep Mind社が提案したモデルの一部では非常に面白く、報酬の設定しだいでは各エージェントが協力したり敵対したりします。
Kerasで敵対的な簡単なマルチエージェントラーニングを21言っちゃダメゲームでスクラッチで、構築しました
(これはQiitaのはむこさんの記事を参考にさせていただきました、ありがとうございます[3])
(調べながらやったこともあり、理論的な間違いを見つけたら、ツイッターで指摘していただけると助かります)
強化学習の理論
強化学習は、人間が特に正しい悪いなどを指定せずとも、なんらかの報酬系から値を得ることで報酬を最大化するよう学習します
このとき、ある系列の状態をSとし、その時の行動をaとし、この組み合わせで得られる報酬関数をQとすします
この関数で、最適な行動を得られた時に*をつけて表し、この時の行動sについて最大となるsをとると、以下のように最適な行動πを得ることができます
また報酬の割引率というものがあるのですが、今回は具体的に割引率を与えたり、求めるということをしていません
今回の例では、Q関数を具体的にDeepLearningによる関数としています
ϵ-greedy
全く意識していなかったのですが、どうやら行動の選択は初期値依存性がある程度あり、運が悪いと局所解に嵌ったなままなかなか更新してくれなくなります
適度にランダムに行動を選択することを入れないとダメっぽいです
ルール(問題設定)
先手、後手に別れて0から最小1、最大の3つ増やした数字を言い合います
数字を累積していって、21以上のになるように言った時点で負けです。相手に21以上を踏ませれば勝ちです
今回設計した、Q関数
Q関数は状態と行動を入力することで、報酬の値を得ます
報酬がもっとも多いと期待できる選択を選ぶことで、(この問題の場合)最短の手数で、勝ちに行くことができます
今回は、それぞれの行動と状態から、唯一に報酬が決まるとして、その和でQ関数が表現されるとしました(この問題設定がいつも適応できるわけではなさそうです)
このスモールq関数をディープラーニングで表現すると、このような任意のモデルで書くことができます
報酬の設定
一個一個の行動に報酬を設定するのは、困難なので、一連の行動の系の結果として勝ったか、負けたかを見ていきます
スモールq関数を求める問題に変更できたので、これを具体的に以下の値を最小化していきます(Nはゲーム終了までにかかった手数)
勝った時の報酬を+1, 負けた時の報酬を-1のReward関数とすると、以上の式を最小化すれば、勝った時はその選択をより強化して学習し、負けた時は選択を誤ったとして、別の可能性を探索する可能性が強くなります
マルチエージェント
同じような報酬系をもつモデルを2つ以上用意して、対決させました
先に21以上を言った方が負けというルールで二つのモデルに対決させました
コード&実行
githubにて管理しています
マルチエージェント学習
$ python3 21-icchadame-pure.py --reinforce
実際に対戦してみる
$ python3 21-icchadame-pure.py --play
強さについて
この21言っちゃダメゲームは4の倍数を取りに行けば勝てることがわかっている問題なのですが、途中から4の倍数にはめ込もうとしようとしていることがわかります
例えば、次のような結果になります
なお、最適解は、初手で1を打って次に4を取らせることですが、初期値依存性があり、この状態に素早く収束させるのは結構難しいです
下記の例は、20000回のゲームをさせた例
例1.
コンピュータは3を選択しました now position 3 数字(1−3)を入力してください 2 コンピュータは3を選択しました now position 8 数字(1−3)を入力してください 1 コンピュータは3を選択しました now position 12 数字(1−3)を入力してください 2 コンピュータは2を選択しました now position 16 数字(1−3)を入力してください 1 コンピュータは3を選択しました now position 20 数字(1−3)を入力してください 1 結果 あなたの負け
例2.
コンピュータは3を選択しました now position 3 数字(1−3)を入力してください 3 コンピュータは2を選択しました now position 8 数字(1−3)を入力してください 3 コンピュータは1を選択しました now position 12 数字(1−3)を入力してください 3 コンピュータは1を選択しました now position 16 数字(1−3)を入力してください 3 コンピュータは1を選択しました now position 20 数字(1−3)を入力してください 3 結果 あなたの負け
実際やってみて
Reinforce Learning関しては教師あり学習、教師なし学習についで、最後にやろうと思っていたこともあり、あまり深く手をつけていませんでした
最初、理論をあまり勉強せず、とりあえず適当にコードを書いてみたのですが、収束はめっちゃ早いけど、すぐオーバーフィットしてしまうモデルになってしまいました。基礎理論を再度確認して、コードに落として行くという気持ちでやると、サクサクできます(反省)
マルチエージェントにすることで、コードがやばいことになるのかなと思ったのですが、意外とシンプルに構築することができました
今回は敵対的なゲームでしたが、時には協力し、時には裏切るモデルなど面白そうであります。学習させることも容易なので、様々な応用が利きそうで面白そうでした
参考文献
[1] Understanding Agent Cooperation
[2] Multi-agent Reinforcement Learning in Sequential Social Dilemmas
[3] 深層強化学習:「20言っちゃダメゲーム」の最適解を30分程度で自動的に編み出す(chainerRL)