にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

ポアンカレエンベッディング

ポアンカレエンベッディング

Euclid空間にエンベッディングするようなword2vecは意味の上下関係が明示的に記されません。(情報としたあったとしても僅かでしょう)

ポアンカレボールという双曲幾何学空間に埋め込むことで、効率的に意味(や木構造)の上位関係をとらえることができます[1]

理解

 ポアンカレボールはこのような、外周部に行くほど密になる球みたいなものなのです。

図1. ハニカム構造のPoincare Ball(Wikipediaより)

ポアンカレボールでは外に行くほど情報が密になり、空間が広がっているともとらえます。

数式で表現するとこのようになって、

gEというユークリッド距離がxが1に近づけば無限に大きくなることがわかります。

このポアンカレボール上にある二点間の距離はこのように表現され、単純なユークリッド距離ではないことが見て取れます。

この距離関数に基づいて損失関数L(Θ)を定義します。

これをSGDやFBリサーチの論文ではRiemannianSGDというオプティマイザを利用して最適化しています

双極幾何学空間に埋め込むと、情報が何らかの上下関係を持っており、木構造で表現できるとき、ルートノード(つまり抽象度が高い)方が真ん中にきて、枝葉に近いほど、外周部に行く傾向があるとのことで、これはポアンカレ空間に木の幹と枝を配置しようと試みるとき、幹が真ん中にきて枝が外周に来るとおさまりがいいのは、直感的に理解できると思います。

なんか以前やったDeep Graph Convolutionに似ているなと思っていたら、Abejaさんのブログでも同様の記述を発見しました[2]  

何でもいいので、優秀なエンベッティングは距離空間を適宜定義することでその空間の形に応じた特性を獲得できそうということでもあり、ポアンカレボールは意味的、木構造的な関係を獲得できるし、別の距離空間ではそのようなものが定義できるのだと思います。

以下の実験のコード

実験

pythonの言語処理ライブラリであるgensimですでにpoincare embeddingが実装されています。

可視化の方法もjupyterを用いて簡単にできるので、なんかの情報のペアで木構造を持つと仮定できるデータセットがあれば、学習することができます。

poincare embeddingの論文では、動物名と動物の所属する種類などのペアのデータセットを利用して、poincareに埋め込んで評価しています。

(ライオン, ネコ科)
(ネコ科, 哺乳類)
...

こんなどちらが上位の概念化は順序がむちゃくちゃでいいので、ペアのデータセットがたくさん作成します

データセット

旧日本海軍の艦種のいくつかのデータセットを用いてやってみましょう。

軍艦を発信点にして、大分する艦種と、艦種と艦名のペアです。

軍艦    駆逐艦
軍艦    戦艦
軍艦    巡洋艦
軍艦    空母
駆逐艦  陽炎型
駆逐艦  朝潮型
駆逐艦  吹雪型
駆逐艦  白露型
朝潮型  朝潮
朝潮型  大潮
朝潮型  満潮
...

可視化

gensimでの実装は、jupyterとplotlyを用いてこのようにすることで、簡単に可視化できます
(おそらくFacebook ResearchのPyTorchでの実装は同じ方法は通用しませんが、こっちの方が本来性能はよさそうです)

import plotly
import gensim.viz.poincare

import pickle
model = pickle.loads(open('model.pkl','rb').read())
relations = pickle.loads(open('relations.pkl', 'rb').read() )

plotly.offline.init_notebook_mode(connected=False)
prefecutre_map = gensim.viz.poincare.poincare_2d_visualization(model=model,
                                                               tree=relations,
                                                               figure_title="艦種",
                                                               show_node_labels=model.kv.vocab.keys())
plotly.offline.iplot(prefecutre_map)

評価

ポアンカレボール上に配置するときに、乱択アルゴリズムでサンプリングするらしいので、小さいデータセットではきれいな円状に配置されませんが、おおむねそれらしい結果が得られています。(割り切ってランダムネスの影響を許容するといいでしょう)

朝潮型が変なところに来ています。初期値のラムダムネスにより変なところにきているようです

なかなかgemsinで実行するとうまくいきませんね。(最適化アルゴリズムSGDなどの乱択がはいるので仕方がないですが。。。)

より、細やかに丁寧に距離函数を再定義できそうな実装系としてFacebook社のPytorchのSGDオプティマイザと距離函数を双曲線距離に定義した物を使うと、learning rateを最初に大きく取って少なくしていくなどの戦略が取れるので、より細やかに綺麗に学習が収束しそうです[3]  

Facebook ResearchによるPoincare Embedding

参考文献

RNNで暗号であるEnigmaを解く

RNNで暗号であるEnigmaを解く

Enigma暗号とは

 1918年に発明されたEnigma第二次世界大戦時に発明された暗号化機であり、電線の配線のパターンと、ロータといわれる入力するたびに回転する円盤のパターンで、様々な文字の置き換えを行います。

 ドイツ軍で用いたられたアルファベットの数だけ暗号化のもととなる配線が記された三つのロータを組み合わせて、膨大な動的に変換するパターンを構築して文字列を置換して、単純な交換則が成立しない複雑な暗号を構築して連合軍を苦しめました。

図1. JavaScriptによるEnigma Simulator

 連合国側のイギリスの特殊チームのULTRAによって解析されたようです。数学的な暗号の原理を追っているのですが、まだ完全にキャッチアップしきっておりませんが、群論とコンピュータのパワーとヒントとなるキーが人間の発想に依存するという特性を利用して解いたそうです。総当たりに近い方法を用いており、とにかくコンピュータリソースが必要でした。

 2006年、分散コンピューティングを利用して総当たりにて、最後の未解決であったEnigma暗号を解いたとのことです[1]。

どのようにEnigmaを連合国軍は解いたのか

二つの方法を組み合わせたような表現を確認しました。

  1. Bombeとよばれるコンピュータで総当たり攻撃した
  2. 人間が利用しがちなカギや文章(日付などが末尾に入るとか)から推定されるパターンを限定する
図2. 現代に再現さたBombe(Wikipediaより)

今風の機械学習で解くにはどうすればいいのか

2.のように、何らかの暗号化前の文章と暗号化後の暗号文が手に入ったとします。このとき、この対が十分な量があるとき、ディープラーニングアルゴリズムの一つであるRNNで解くことが可能です[2]

ロータが機械的に回転し、絡み合うような仕組みは、ディープラーニングのような機械学習は苦手だろう思い込みがあったので、この[2]の資料を見たときは驚きました。

ロータが二つのEnigmaを仮定

 複雑な三つのロータではなく、簡単にした二つのロータのみで構成されるEnigmaを仮定します

 Enigmaのロータは一文字進むごとに回転し、初期値が不明になっており、キーはランダムになっているとします.

 初期値が不明なため、26(+3)2パターンの成立しうるロータの状態をディープラーニングのネットワークを施行し、もっとも自然な文字列である初期状態とロータの配線を全探索しないと、原理として解くことはできません。

 暗号化として以下のような二つのロータを仮定したスクリプトを作成ました

 コーパスとしてBBCの公開ニュースコーパスを利用しています。

total = ''
for name in glob.glob('courpus/bbc/*/*'):
  try:
    text = open(name).read()
  except Exception as ex:
    continue
  buff = []
  for char in list(text.lower()):
    if char in char_index_0:
      buff.append(char)
  total += ''.join(buff)

# random slice
pairs = []
for index in random.sample( list(range(0, len(total) - 150)),100000):
  _char_index_0 = copy.copy(char_index_0)
  _char_index_1 = copy.copy(char_index_1)
  real = total[index:index+150]

  enigma = []
  for diff, char in enumerate(real):
    # roater No.1 update _char_index
    _char_index_0 = { char:(ind+1)%len(_char_index_0) for char, ind in _char_index_0.items() }
    # get index
    ind = _char_index_0[char]
    next_char = chars[ind]

    # roater No.2
    _char_index_1 = { char:(ind+1)%len(_char_index_1) for char, ind in _char_index_1.items() }
    # get index
    ind = _char_index_1[next_char]
    next_char = chars[ind]

    enigma.append(next_char)
  cript = ''.join(enigma)

  crop = random.choice(list(range(len(char_index_0))))
  real, cript = real[crop:crop+100], cript[crop:crop+100]
  pairs.append( (real, cript) )

open('pairs.json', 'w').write( json.dumps(pairs, indent=2) )

DeepLearningのネットワークを設計

Kerasで実装しました。
GRUを用いネットワークはこのようになっています。
初期状態が不明なEnigmaで暗号化された暗号文を最大100文字入力し、対応する100文字を入力します

timesteps   = 100
inputs      = Input(shape=(timesteps, 29))
x           = Bi(GRU(512, dropout=0.10, recurrent_dropout=0.25, return_sequences=True))(inputs)
x           = TD(Dense(3000, activation='relu'))(x)
x           = Dropout(0.2)(x)
x           = TD(Dense(3000, activation='relu'))(x)
x           = Dropout(0.2)(x)
x           = TD(Dense(29, activation='softmax'))(x)

decript     = Model(inputs, x)
decript.compile(optimizer=Adam(), loss='categorical_crossentropy')

前処理

Enigmaの配線をランダムで初期化

$ python3 14-make-enigma.py 

コーパスから暗号文と正解のペアを作成

$ python3 15-prepare.py 

RNNで入力する密ベクトルに変換

$ python3 16-make-vector.py 

学習

全体の99%を学習します

$ python3 17-train.py --train 

評価

学習で使わなかった1%のデータを評価します

$ python3 17-train.py --predict

出力

[オリジナルの文]    all targets, he said. the exceptional quality of the paintings in our permanent collection is also h
[入力された暗号文]   ccxej,,,xum yh.bfpp,ggm.ihsorjges.iy .cyvmyl wvogaxdqyzqmomtdkzbqqeobszs uyyaoagyy.nsuomynekaescf sc
[モデルで評価した文]  all targets, he said. the exceptional quality of the paintings in our permanent collection is also h

このように、Enigmaのネットワークが未知であっても、確定してわかるテキストが十分にあれば、RNNでエニグマ暗号は解けることがわかりました。
今回は仮想的なロータを二つソフトウェア的に再現しましたが、ロータが三つでも十分にRNNのネットワークが大きく、データが十分にあれば、この延長線上で解けると思います。

参考文献

CNNによる文字コード不明なドキュメントの推定

CNNによる文字コード不明なドキュメントの推定

Advent Calender遅刻いい訳

  1. 年末忙しすぎた
  2. ネタと期待していたいくつかがまともに結果が出ずに苦しい思いをしていた
  3. 元URLの喪失

バイト列から文字コーディングを推定する

Twitterで時々バズるネタとして、機械学習がこれほどもてはやされるのに、今だにBrowserは時々文字化けし、ExcelはUTF8を突っ込むと文字化けし、到底、文化的で最低限の人権が保護された状態ではありません。

実際、ルールベースで推定しようとすると、この様にshift jisとeucでは完全に背反な情報を使っているわけでないので、なんらかのヒューリスティックなルールを人間が作成して対応していたのだと思いますが、この様なユースケースの場合、機械学習が強い力を発揮します。

図1. sjiseuc文字コードのバイト列のマップ(参考:smdn)

その度、「それ、できると思うよ」って言い返していたのですが、実証実験を行いたいと思います。

なんの機械学習アルゴリズムがいいか

ニュースサイトをスクレイピングすると、大量のUTF8のテキスト情報が取得できます

このテキスト情報をもとに、nkfというコマンドで、euc, sjis文字コードに変換して、様々な文字コードのバージョンを作ります

Pythonやいくつかの言語では、UTF8以外を扱うとバグるのですが、バイト列としてみなすと読み込みが可能になり、バイト列にはなんらかの特徴が見て取れそうです(仮説)

バイト列をベクトル化して、CNNのテキスト分類の機械学習で分類することが良さそうです

ネットワーク

VGGのネットワークを参考に編集しました。

図2. 作成したネットワーク

目的関数

微妙な判断結果になった場合、確率を正しく出力したいので、sotfmaxではなく、3つのsigmoidを出力して、それぞれのbinary cross entropyを損失としています

出力の解釈性が良いので個人的によく使うテクニックです

コード

全体のコードはgithubにあります

CBRDという関数はosciiartさんの作り方を参考にさせていただきました

def CBRD(inputs, filters=64, kernel_size=3, droprate=0.5):
  x = Conv1D(filters, kernel_size, padding='same',
            kernel_initializer='random_normal')(inputs)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)
  return x

input_tensor = Input( shape=(300, 95) )

x = input_tensor
x = CBRD(x, 2)
x = CBRD(x, 2)
x = MaxPool1D()(x)

x = CBRD(x, 4)
x = CBRD(x, 4)
x = MaxPool1D()(x)

x = CBRD(x, 8)
x = CBRD(x, 8)
x = MaxPool1D()(x)

x = CBRD(x, 16)
x = CBRD(x, 16)
x = CBRD(x, 16)
x = MaxPool1D()(x)

x = CBRD(x, 32)
x = CBRD(x, 32)
x = CBRD(x, 32)
x = MaxPool1D()(x)

x = Flatten()(x)
x = Dense(3, name='dense_last', activation='sigmoid')(x)
model = Model(inputs=input_tensor, outputs=x)
model.compile(loss='binary_crossentropy', optimizer='adam')

データセット

nifty newsさんniconico newsさんのニュースコーパスを利用しました。

zipファイルを分割して圧縮しています

もし、お手元で試していただいて性能が出ないと感じる場合は、おそらく、コーパスの属性があっていないものですので、再学習してもいいと思います

https://github.com/GINK03/keras-cnn-character-code-detection/tree/master/dataset

前処理

dbmに入ったデータセットから内容をテキストファイルで取り出します

$ python3 14-make_files.py

nkfを使ってeucのデータセットを作成します(Python2で実行)

$ python2 15-make_euc.py

nkfを使ってsjisのデータセットを作成します(Python2で実行)

$ python2 16-make_shiftjis.py

byte表現に対してindexをつけます(Python3で実行)

$ python3 17-unicode_vector.py 

最終的に用いるデータセットを作成してKVSに格納します(LevelDBが必要)

$ python3 18-make_pair.py

学習

$ python3 19-train.py --train

テストデータにおける精度

hash値でデータを管理していて、7から始まるデータをテストデータしています

Train on 464 samples, validate on 36 samples
Epoch 1/1
464/464 [==============================] - 1s 1ms/step - loss: 2.1088e-05 - val_loss: 2.8882e-06

val_lossが極めて小さい値になっており、十分小さい値を出しています

精度

7から始まるhash値のデータセットで1000件検証したところ、99.9%でした(すごい)

$ python3 19-train.py --precision 
actual precision 99.9

予想

$ python3 19-train.py --predict --file=${FILE_PATH}

$ python3 19-train.py --predict --fild=
$ python3 19-train.py --predict --file=../keras-mojibake-grabled/eucs/000000123.txt 
Using TensorFlow backend.
this document is EUC. # <- EUCとして判別された

終わりに

モデルのサイズ自体は、151kbyteとかなりコンパクトに収まっていて、精度自体も実践的です。

Microsoft Excelなどで文字コードが判定されなく化けていていて、毎回、数分損失するので、ネットワーク自体は深いですが、軽量なので組み込んで利用することも可能かと思います。

このように、実際に機会学習を適応して、生活が豊かになると良いですね。

Cloud DataFlowをKotlinで書く

Cloud DataFlowをKotlinで書く

以前投稿した基本時な項目に加えて、特にバッチ処理における

  1. SQLでは難しいデータの集計の角度
  2. 入出力にJSONを使うことでデータのユーザの独自のデータ型の定義
  3. 複数のGCSのバケットを入力にする
  4. DataFlowのリソース管理

という項目を追加して、より実践的な側面と使うにあたって気をつけるべきポイントを示しています

Javaではなく、Kotlinで書くモチベーション

Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinやScalaで書いた集計コードの発想を大きく書き換えることなく、Cloud DataFlowで利用できます。   

また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良く、最近はAndroidの開発や、サーバサイドの開発だけでなく、データサイエンスにも転用が可能となって来ております

Cloud DataFlowとは

GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。

Map Reduceの発展系のような印象を受ける作りになっており、何段階にもパイプラインを結合して様々な処理ができるようになっています

ストリーミング処理も得意なことがメリットとしてあげられていますが、バッチ処理も強力です

また、専用にあらかじめインスタンスを確保しておく必要がないため、サーバレスのビッグデータ環境のようにも扱えます(CPUやDISKの制約はGCPのComputing Engineと共用なようです)

  • Googleが考える、データの取り扱い

  • この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている

  • AmazonのElastic Map Reduceと競合する製品だと思われますが、サーバの台数に制限がないことと、自動リソース管理、Map Reduceの操作が二段階ではなく、任意の回数行うことができます

AWS Elastic Map Reduceとの違い

elastic map reduceイメージ

Google Cloud DataFlow

Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)

Google Cloud DataFlow特徴

  • JVMを基準としたプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい  
  • GCPのDataStorage(AWSのS3のようなもの)に保存するのでコストが安い  
  • Apache Sparkなどのラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる  
  • SQLでないので、プログラムをしらないと集計できない(デメリット)

Requirements

Google Cloud SDKのインストールとセットアップ

ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります  

この例ではLinuxを対象としています

$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ ./google-cloud-sdk/install.sh 
Welcome to the Google Cloud SDK!
...
Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力
Do you want to continue (Y/n)?  <<- Yと入力

bashrcのリロード

$ source ~/.bashrc

gcloud initして、gcloudの認証を通します

$ gcloud init # gcloundのセットアップ
 [1] alicedatafoundation
 [2] machine-learning-173502
 [3] Create a new project
 Please enter numeric choice or text value (must exactly match list 
item): 2 # 使っているプロジェクトを選択

asia-northeast1-[a|b|c]のリージョンを設定

If you do not specify a zone via a command line flag while working 
with Compute Engine resources, the default is assumed.
 [1] asia-east1-c
 [2] asia-east1-b
 [3] asia-east1-a
 ...

クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定   Google Apiで発行してもらう

クレデンシャルを環境変数に通します

必要ならば、bashrcに追加しておくと、ログイン時に自動でロードされます
(ターミナルでの利用を前提としています)

$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json

GCP側のセットアップ操作

1. Projectの作成

1. Projectの作成

2. CloudStrageの作成

2. CloudStrageの作成

3. Keyの作成

3. Keyの作成

(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)

4. Codeを書く

4. Codeを書く

任意の集計処理を記述します

Kotlinで記述されたサンプルのコンパイル&実行

私が作成したKotlinのサンプルでの実行例です。   シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)

$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix

コンパイル 

$ mvn package

クリーン(明示的にtargetなどのバイナリを消したい場合)

$ mvn clean

GCPに接続して実行

$ mvn exec:java

これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います

KotlinのDataFlowのプログラムの説明

多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います  

Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします  

このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります

各クラスの定義はこのように行いました  

KotlinProc1クラス

public class KotlinProc1 : DoFn<String, String>() {
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    val elem = c.element()
    elem.toList().map { 
      val char = it.toString()
      c.output(char)
    }
  }
}

KotlinProc2クラス

public class KotlinProc2 : DoFn<String, KV<String, String>>() {
  override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) {
    val char = c.element()
    c.output(KV.of(char, "1"))
  }
}

GroupByKey

 GroupByKey.create<String,String>() 

KotlinProc3クラス

public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() {
  override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) {
    val key = c.element().getKey()
    val iter = c.element().getValue()
    val list = mutableListOf<String>()
    iter.forEach {  list.add(it.toString()) }
    c.output(key.toString() + ": " + list.size.toString()); 
  }
}

GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く

類似していることが確認できます

import java.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.io.BufferedReader
import java.io.InputStream
import java.io.File
import java.util.stream.Collectors
import java.util.stream.*
fun main( args : Array<String> ) {
  val dataframe = mutableListOf<String>() 
  Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード
    val inputStream = File(name.toString()).inputStream()
    inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} }
  }

  dataframe.map {
    it.toList().map { // DataFlowのKotlinProc1に該当
      it.toString()
    }
  }.flatten().map { // DataFlowのKotlinProc2に該当
    Pair(it, "1")
  }.groupBy { // DataFlowのGroupByKeyに該当
    it.first
  }.toList().map { // DataFlowのKotlinProc3に該当
    val (key, arr) = it
    println("$key : ${arr.size}")
  }
}

WordCountレベルの、ここまでのまとめ

ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなCloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、生産性を高めることができます。

ビッグデータSQLの代わりにGoogle DataFlowを使うプラクティス

通常使うぶんにはSQLのインターフェースが用意されていることが多いですが、SQL以外で分析したい、SQLでは分析しにくい、などのモチベーションがる場合、Google DataStrageにコピーすることでも分析します。

minioのmcというコマンドでs3 -> gcsの同期を簡単に実行できます

$ mc mirror s3/any-s3-bucket/ gcs/any-gcs-bucket/

特定のキーでデータを直列化する

SQLでも無理くり頑張ればできないこともないのですが、かなりアドホックなのと、ビッグデータに全適応しようとする場合、かなり困難が伴います。

SQLがユーザ行動のような人によって可変長なテーブルの上に関連データベースとして表現しにくいからなのですが、無理にそのようにせず、KVSやDocument志向の発想を持ち込んで、特定のキーで転地インデックスを作成することが可能になります。

参考:SQLで転置インデックス

JSONを用いたデータ構造と変形

例えば、TreasureDataのダンプ情報は行志向でjsonで一行が表現されています。また、gzで圧縮されており、chunkingと呼ばれる適度なファイルサイズに断片化されています

そのため、Google DataFlowで処理するときは、jsonパーサが必要です

jsonのパースにはGsonが便利であり、型が不明なときはKotlinはAny型で受け取れるので、適切にリフレクションを用いれてば、複雑なデータ構造であっても、DataFlowの各ステップにデータを受け渡すことができます

こんな感じで処理すると便利

JSONシリアライズしたデータ構造などで統一することで、ユーザ定義型から解放されて、一応の汎用性を持つことが可能になります

また、特定のサイズまでシュリンクしたのち、ローカルマシンで、Pythonなどでjsonを読み取ることにより、最終的なデータの加工と機械学習が容易になります

具体例

public class KotlinProc1 : DoFn<String, String>() {
  // DoFnの定義がinput: String -> output: Stringとすることができる
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    // ここだけ、Kotlinだと切り出して、手元でコマンドラインでパイプ操作で再現することが楽なので、テストしながら開発できる
    val gson = Gson()                              
    val type = object : TypeToken<Map<String, Any>>() {}.type 

    val elem = c.element()
    val recover:Map<String,Any> = gson.fromJson<Map<String, Any>>(elem, type)                      
    if( recover.get("gender_age") == null )      
      return
    if( recover.get("os") == null)               
      return
    if( recover.get("uuid") == null || recover.get("uuid")!! == "null")
      return
    val gender_age = (recover["gender_age"]!! as Double).toInt() // <- データ中でデータの型がが判明してるならば、as Typeで変換できる                          
    val os = recover["os"]!! as String           
    val uuid = recover["uuid"] as String  
    val urlreq = try {                           
      "keyword=(.*?)&".toRegex().find( URLDec.decode( URLDec.decode( recover["request_uri"]!! as String, "UTF-8" ), "UTF-8" ) )?.groups?.get(1)?.value ?: null  // <- このように複雑なテーブルの中のデータを受け取ることができる                                    
                  
    } catch( ex : Exception ) { null }           
    if( urlreq == null || urlreq == "" )         
      return  
    // 出力の段階でここをjsonで出すようにすると、outputがList<Any>をシリアライズした、Stringに限定できるので、IFの定義が楽
    val output = gson.toJson( listOf(gender_age, os, uuid, urlreq) )
    c.output(output)
  }
}

複数のデータソースの利用

GCPの複数のDataStorageのファイルを入力し、特定のキーで結合したいなどの場合があるかと思います。

複数のインプットを同時に入力する方法が見つからず、公式ドキュメントをかなり漁りましたが、見つからず難儀していました。

DataFlowのSDK1.X系では、パイプラインを結合して、任意の処理にするという発想なので、inputのパイプラインを二種類以上用意して、Flattenして結合するという発想になるようです。

fun runner(options: DataflowPipelineOptions, input:String, output: String) {
  val p:Pipeline = Pipeline.create(options)
  val events1:PCollection<String> = p.apply(TextIO.Read.from("gs://input1/*"));
  val events2:PCollection<String> = p.apply(TextIO.Read.from("gs://input2/*"));
  val eventsList = PCollectionList.of(events1).and(events2)
  val events = eventsList.apply(Flatten.pCollections())
  
  events
    .apply( ParDo.named("ExtractMap1").of( KotlinProc1() ) )
    .apply( ParDo.named("MakeTransit").of( KotlinProc2() ) )
    .apply( GroupByKey.create<String,String>() )
    .apply( ParDo.named("FormatResults").of( KotlinProc3() ) )
    .apply( TextIO.Write.to(output) )
  p.run()
}

DataFlowの管理画面ではこのように見ることができます

コンピュータリソースが必要な箇所

HadoopにおけるMapの処理の際は弱いCPUをいくつも並列化することで、データの変換を行うことができますが、Reduceの処理につなぐ時に、特定のキーでshuffle & sortが必要になります。

この操作がメモリとディスクを大量に消費して、場合によってはコンピュータのディスクやメモリを増やす必要が出てきます。

この制約は、GCP Cloud DataFlowにもあって、謎のUnknownエラーで落ちらた、リソース不足を疑うと良いかもしれません(Unknownのせいで48時間程度溶かしました...)

DataFlowでは、GroupByKeyでコンピュータリソースを大量に消費するので、この前後のパイプラインで落ちていたら、ヒントになりえます。

リソース不足の例、GroupByKeyのステップがエラーになります...

このようなエラーが出た際には、以下の対応が有効でした

  1. マシンのメモリを増やす
  2. 動作させるワーカーの数を増加させる
  3. 一台当たりのディスクサイズを増やす

これは、pipelineを構築する際のconfigで設定できます

fun main( args : Array<String> ) {
  val options = JavaDataFlowUtils.getOptionInstance()
  // define project name
  options.setProject("machine-learning-173502")
  // define max workers (max_workerを増加させます、並列で動作させるマシンの台数の最大値です)
  options.setMaxNumWorkers(128)
  // disk size(マシン一台当たりのディスクサイズ数です、GBが単位です)
  options.setDiskSizeGb(1024*2)
  // machine type(インスタンスのタイプです、メモリ優先タイプを選択しています)
  options.setWorkerMachineType("n1-highmem-4")
  // define staging directory
  options.setStagingLocation( "gs://dataflow-stagings-machine-learning/stating-36" )
  // args order, 1st -> options, 2nd -> input data bucket, 3rd -> output data bucket
  runner(options, "gs://treasuredata-dump/20171221-json/export.*",
                  "gs://dataflow-output-machine-learning/keyword_uuid_timeseries-categories-17/*" )
}

コード

まとめ

Cloud DataFlowはサーバを自社に持つことなく、ビッグデータの分析を行うことができる素晴らしい基盤です。

AWS EMRと比較しても、速度の面において2倍ぐらい早く感じるのと、インスタンスを事前に予約する必要がなく、立ち上がりも早いです

今回はDataStorageに溜まったデータを一気に分析する、バッチ処理を行いましたが、AWS EMR, AWS Athena, AWS RedShift, Apache Spark, Apache Hadoop, GCP BigQueryなども使いましたが、柔軟性と速度の両立という視点では一番優れているように思います。すごい