Google Cloud DataFlowをKotlinで書く
Google Cloud DataFlowをKotlinで書く
Kotlinで書くモチベーション
以前も書きましたが、Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、GCP Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinで書いた集計コードの発想を大きく書き換えることなく、GCP DataFlowで利用できるからです。
また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良いと感じるからでもあります。
Google Cloud DataFlowとは
GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。
Map Reduceの発展系のような印象を受ける作りになっており、集計するためのコードの書きやすさや、特定の集計部分だけ、集中して記述してかけるなどのメリットがあり、大規模なビッグデータ処理に向いていると感じます。
- Googleが考える、データの取り扱い
- この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている
- AmazonのElastic Map Reduceと競合する製品だと思われますが、より、柔軟で、汎用性が高いように見えます
AWS Elastic Map Reduceとの違い
Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)
Google Cloud DataFlow特徴
- Javaなどのプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい
- GCPのデータストレージ(AWSのS3のようなもの)に保存するのでコストが安い
- 関数型言語における、ラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる
- SQLでないので、プログラムをしらないと集計できない(デメリット)
Requirements
Google Cloud SDKのインストールとセットアップ
ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります
この例ではLinuxを対象としています
$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz $ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz $ ./google-cloud-sdk/install.sh Welcome to the Google Cloud SDK! ... Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力 Do you want to continue (Y/n)? <<- Yと入力
bashrcのリロード
$ source ~/.bashrc
gcloud initして、gcloudの認証を通します
$ gcloud init # gcloundのセットアップ [1] alicedatafoundation [2] machine-learning-173502 [3] Create a new project Please enter numeric choice or text value (must exactly match list item): 2 # 使っているプロジェクトを選択
asia-northeast1-aのリージョンを設定
If you do not specify a zone via a command line flag while working with Compute Engine resources, the default is assumed. [1] asia-east1-c [2] asia-east1-b [3] asia-east1-a [4] asia-northeast1-c ...
クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定 Google Apiで発行してもらう
クレデンシャルを環境変数に通します
必要ならば、bashrcに追加してください。
$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json
GCP側のセットアップ操作
1. Projectの作成
2. CloudStrageの作成
3. Keyの作成
(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)
4. Codeを書く
任意の集計の角度を記述します
Kotlinで記述されたサンプルのコンパイル&実行
私が作成したKotlinのサンプルでの実行例です。
シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)
$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix
$ mvn package
クリーン(明示的にtargetなどのバイナリを消したい場合)
$ mvn clean
GCPに接続して実行
$ mvn exec:java
これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います
今回のKotlinのDataFlowのプログラムの説明
多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います
Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします
このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります
クラスの定義はこのように行いました
KotlinProc1クラス
public class KotlinProc1 : DoFn<String, String>() { override fun processElement(c : DoFn<String,String>.ProcessContext) { val elem = c.element() elem.toList().map { val char = it.toString() c.output(char) } } }
KotlinProc2クラス
public class KotlinProc2 : DoFn<String, KV<String, String>>() { override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) { val char = c.element() c.output(KV.of(char, "1")) } }
GroupByKey
GroupByKey.create<String,String>()
KotlinProc3クラス
public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() { override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) { val key = c.element().getKey() val iter = c.element().getValue() val list = mutableListOf<String>() iter.forEach { list.add(it.toString()) } c.output(key.toString() + ": " + list.size.toString()); } }
GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く
類似していることが確認できます
import java.io.* import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths import java.io.BufferedReader import java.io.InputStream import java.io.File import java.util.stream.Collectors import java.util.stream.* fun main( args : Array<String> ) { val dataframe = mutableListOf<String>() Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード val inputStream = File(name.toString()).inputStream() inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} } } dataframe.map { it.toList().map { // DataFlowのKotlinProc1に該当 it.toString() } }.flatten().map { // DataFlowのKotlinProc2に該当 Pair(it, "1") }.groupBy { // DataFlowのGroupByKeyに該当 it.first }.toList().map { // DataFlowのKotlinProc3に該当 val (key, arr) = it println("$key : ${arr.size}") } }
まとめ
ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなGCP Cloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、経験則的に生産性を高めることができます。