にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

Google Cloud DataFlowをKotlinで書く

Google Cloud DataFlowをKotlinで書く

Kotlinで書くモチベーション

以前も書きましたが、Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、GCP Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinで書いた集計コードの発想を大きく書き換えることなく、GCP DataFlowで利用できるからです。   

また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良いと感じるからでもあります。  

Google Cloud DataFlowとは

GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。

Map Reduceの発展系のような印象を受ける作りになっており、集計するためのコードの書きやすさや、特定の集計部分だけ、集中して記述してかけるなどのメリットがあり、大規模なビッグデータ処理に向いていると感じます。  

  • Googleが考える、データの取り扱い

  • この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている

  • AmazonのElastic Map Reduceと競合する製品だと思われますが、より、柔軟で、汎用性が高いように見えます

AWS Elastic Map Reduceとの違い

elastic map reduceイメージ

Google Cloud DataFlow

Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)

Google Cloud DataFlow特徴

  • Javaなどのプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい
  • GCPのデータストレージ(AWSのS3のようなもの)に保存するのでコストが安い
  • 関数型言語における、ラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる  
  • SQLでないので、プログラムをしらないと集計できない(デメリット)

Requirements

Google Cloud SDKのインストールとセットアップ

ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります  

この例ではLinuxを対象としています

$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ ./google-cloud-sdk/install.sh 
Welcome to the Google Cloud SDK!
...
Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力
Do you want to continue (Y/n)?  <<- Yと入力

bashrcのリロード

$ source ~/.bashrc

gcloud initして、gcloudの認証を通します

$ gcloud init # gcloundのセットアップ
 [1] alicedatafoundation
 [2] machine-learning-173502
 [3] Create a new project
 Please enter numeric choice or text value (must exactly match list 
item): 2 # 使っているプロジェクトを選択

asia-northeast1-aのリージョンを設定

If you do not specify a zone via a command line flag while working 
with Compute Engine resources, the default is assumed.
 [1] asia-east1-c
 [2] asia-east1-b
 [3] asia-east1-a
 [4] asia-northeast1-c
...

クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定   Google Apiで発行してもらう

クレデンシャルを環境変数に通します

必要ならば、bashrcに追加してください。

$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json

GCP側のセットアップ操作

1. Projectの作成

1. Projectの作成

2. CloudStrageの作成

2. CloudStrageの作成

3. Keyの作成

3. Keyの作成

(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)

4. Codeを書く

4. Codeを書く

任意の集計の角度を記述します

Kotlinで記述されたサンプルのコンパイル&実行

私が作成したKotlinのサンプルでの実行例です。   シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)

$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix

コンパイル 

$ mvn package

クリーン(明示的にtargetなどのバイナリを消したい場合)

$ mvn clean

GCPに接続して実行

$ mvn exec:java

これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います

今回のKotlinのDataFlowのプログラムの説明

多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います  

Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします  

このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります

クラスの定義はこのように行いました  

KotlinProc1クラス

public class KotlinProc1 : DoFn<String, String>() {
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    val elem = c.element()
    elem.toList().map { 
      val char = it.toString()
      c.output(char)
    }
  }
}

KotlinProc2クラス

public class KotlinProc2 : DoFn<String, KV<String, String>>() {
  override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) {
    val char = c.element()
    c.output(KV.of(char, "1"))
  }
}

GroupByKey

 GroupByKey.create<String,String>() 

KotlinProc3クラス

public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() {
  override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) {
    val key = c.element().getKey()
    val iter = c.element().getValue()
    val list = mutableListOf<String>()
    iter.forEach {  list.add(it.toString()) }
    c.output(key.toString() + ": " + list.size.toString()); 
  }
}

GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く

類似していることが確認できます

import java.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.io.BufferedReader
import java.io.InputStream
import java.io.File
import java.util.stream.Collectors
import java.util.stream.*
fun main( args : Array<String> ) {
  val dataframe = mutableListOf<String>() 
  Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード
    val inputStream = File(name.toString()).inputStream()
    inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} }
  }

  dataframe.map {
    it.toList().map { // DataFlowのKotlinProc1に該当
      it.toString()
    }
  }.flatten().map { // DataFlowのKotlinProc2に該当
    Pair(it, "1")
  }.groupBy { // DataFlowのGroupByKeyに該当
    it.first
  }.toList().map { // DataFlowのKotlinProc3に該当
    val (key, arr) = it
    println("$key : ${arr.size}")
  }
}

まとめ

ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなGCP Cloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、経験則的に生産性を高めることができます。