にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

CNNによる文字コード不明なドキュメントの推定

CNNによる文字コード不明なドキュメントの推定

Advent Calender遅刻いい訳

  1. 年末忙しすぎた
  2. ネタと期待していたいくつかがまともに結果が出ずに苦しい思いをしていた
  3. 元URLの喪失

バイト列から文字コーディングを推定する

Twitterで時々バズるネタとして、機械学習がこれほどもてはやされるのに、今だにBrowserは時々文字化けし、ExcelはUTF8を突っ込むと文字化けし、到底、文化的で最低限の人権が保護された状態ではありません。

実際、ルールベースで推定しようとすると、この様にshift jisとeucでは完全に背反な情報を使っているわけでないので、なんらかのヒューリスティックなルールを人間が作成して対応していたのだと思いますが、この様なユースケースの場合、機械学習が強い力を発揮します。

図1. sjiseuc文字コードのバイト列のマップ(参考:smdn)

その度、「それ、できると思うよ」って言い返していたのですが、実証実験を行いたいと思います。

なんの機械学習アルゴリズムがいいか

ニュースサイトをスクレイピングすると、大量のUTF8のテキスト情報が取得できます

このテキスト情報をもとに、nkfというコマンドで、euc, sjis文字コードに変換して、様々な文字コードのバージョンを作ります

Pythonやいくつかの言語では、UTF8以外を扱うとバグるのですが、バイト列としてみなすと読み込みが可能になり、バイト列にはなんらかの特徴が見て取れそうです(仮説)

バイト列をベクトル化して、CNNのテキスト分類の機械学習で分類することが良さそうです

ネットワーク

VGGのネットワークを参考に編集しました。

図2. 作成したネットワーク

目的関数

微妙な判断結果になった場合、確率を正しく出力したいので、sotfmaxではなく、3つのsigmoidを出力して、それぞれのbinary cross entropyを損失としています

出力の解釈性が良いので個人的によく使うテクニックです

コード

全体のコードはgithubにあります

CBRDという関数はosciiartさんの作り方を参考にさせていただきました

def CBRD(inputs, filters=64, kernel_size=3, droprate=0.5):
  x = Conv1D(filters, kernel_size, padding='same',
            kernel_initializer='random_normal')(inputs)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)
  return x

input_tensor = Input( shape=(300, 95) )

x = input_tensor
x = CBRD(x, 2)
x = CBRD(x, 2)
x = MaxPool1D()(x)

x = CBRD(x, 4)
x = CBRD(x, 4)
x = MaxPool1D()(x)

x = CBRD(x, 8)
x = CBRD(x, 8)
x = MaxPool1D()(x)

x = CBRD(x, 16)
x = CBRD(x, 16)
x = CBRD(x, 16)
x = MaxPool1D()(x)

x = CBRD(x, 32)
x = CBRD(x, 32)
x = CBRD(x, 32)
x = MaxPool1D()(x)

x = Flatten()(x)
x = Dense(3, name='dense_last', activation='sigmoid')(x)
model = Model(inputs=input_tensor, outputs=x)
model.compile(loss='binary_crossentropy', optimizer='adam')

データセット

nifty newsさんniconico newsさんのニュースコーパスを利用しました。

zipファイルを分割して圧縮しています

もし、お手元で試していただいて性能が出ないと感じる場合は、おそらく、コーパスの属性があっていないものですので、再学習してもいいと思います

https://github.com/GINK03/keras-cnn-character-code-detection/tree/master/dataset

前処理

dbmに入ったデータセットから内容をテキストファイルで取り出します

$ python3 14-make_files.py

nkfを使ってeucのデータセットを作成します(Python2で実行)

$ python2 15-make_euc.py

nkfを使ってsjisのデータセットを作成します(Python2で実行)

$ python2 16-make_shiftjis.py

byte表現に対してindexをつけます(Python3で実行)

$ python3 17-unicode_vector.py 

最終的に用いるデータセットを作成してKVSに格納します(LevelDBが必要)

$ python3 18-make_pair.py

学習

$ python3 19-train.py --train

テストデータにおける精度

hash値でデータを管理していて、7から始まるデータをテストデータしています

Train on 464 samples, validate on 36 samples
Epoch 1/1
464/464 [==============================] - 1s 1ms/step - loss: 2.1088e-05 - val_loss: 2.8882e-06

val_lossが極めて小さい値になっており、十分小さい値を出しています

精度

7から始まるhash値のデータセットで1000件検証したところ、99.9%でした(すごい)

$ python3 19-train.py --precision 
actual precision 99.9

予想

$ python3 19-train.py --predict --file=${FILE_PATH}

$ python3 19-train.py --predict --fild=
$ python3 19-train.py --predict --file=../keras-mojibake-grabled/eucs/000000123.txt 
Using TensorFlow backend.
this document is EUC. # <- EUCとして判別された

終わりに

モデルのサイズ自体は、151kbyteとかなりコンパクトに収まっていて、精度自体も実践的です。

Microsoft Excelなどで文字コードが判定されなく化けていていて、毎回、数分損失するので、ネットワーク自体は深いですが、軽量なので組み込んで利用することも可能かと思います。

このように、実際に機会学習を適応して、生活が豊かになると良いですね。

Cloud DataFlowをKotlinで書く

Cloud DataFlowをKotlinで書く

以前投稿した基本時な項目に加えて、特にバッチ処理における

  1. SQLでは難しいデータの集計の角度
  2. 入出力にJSONを使うことでデータのユーザの独自のデータ型の定義
  3. 複数のGCSのバケットを入力にする
  4. DataFlowのリソース管理

という項目を追加して、より実践的な側面と使うにあたって気をつけるべきポイントを示しています

Javaではなく、Kotlinで書くモチベーション

Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinやScalaで書いた集計コードの発想を大きく書き換えることなく、Cloud DataFlowで利用できます。   

また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良く、最近はAndroidの開発や、サーバサイドの開発だけでなく、データサイエンスにも転用が可能となって来ております

Cloud DataFlowとは

GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。

Map Reduceの発展系のような印象を受ける作りになっており、何段階にもパイプラインを結合して様々な処理ができるようになっています

ストリーミング処理も得意なことがメリットとしてあげられていますが、バッチ処理も強力です

また、専用にあらかじめインスタンスを確保しておく必要がないため、サーバレスのビッグデータ環境のようにも扱えます(CPUやDISKの制約はGCPのComputing Engineと共用なようです)

  • Googleが考える、データの取り扱い

  • この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている

  • AmazonのElastic Map Reduceと競合する製品だと思われますが、サーバの台数に制限がないことと、自動リソース管理、Map Reduceの操作が二段階ではなく、任意の回数行うことができます

AWS Elastic Map Reduceとの違い

elastic map reduceイメージ

Google Cloud DataFlow

Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)

Google Cloud DataFlow特徴

  • JVMを基準としたプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい  
  • GCPのDataStorage(AWSのS3のようなもの)に保存するのでコストが安い  
  • Apache Sparkなどのラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる  
  • SQLでないので、プログラムをしらないと集計できない(デメリット)

Requirements

Google Cloud SDKのインストールとセットアップ

ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります  

この例ではLinuxを対象としています

$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ ./google-cloud-sdk/install.sh 
Welcome to the Google Cloud SDK!
...
Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力
Do you want to continue (Y/n)?  <<- Yと入力

bashrcのリロード

$ source ~/.bashrc

gcloud initして、gcloudの認証を通します

$ gcloud init # gcloundのセットアップ
 [1] alicedatafoundation
 [2] machine-learning-173502
 [3] Create a new project
 Please enter numeric choice or text value (must exactly match list 
item): 2 # 使っているプロジェクトを選択

asia-northeast1-[a|b|c]のリージョンを設定

If you do not specify a zone via a command line flag while working 
with Compute Engine resources, the default is assumed.
 [1] asia-east1-c
 [2] asia-east1-b
 [3] asia-east1-a
 ...

クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定   Google Apiで発行してもらう

クレデンシャルを環境変数に通します

必要ならば、bashrcに追加しておくと、ログイン時に自動でロードされます
(ターミナルでの利用を前提としています)

$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json

GCP側のセットアップ操作

1. Projectの作成

1. Projectの作成

2. CloudStrageの作成

2. CloudStrageの作成

3. Keyの作成

3. Keyの作成

(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)

4. Codeを書く

4. Codeを書く

任意の集計処理を記述します

Kotlinで記述されたサンプルのコンパイル&実行

私が作成したKotlinのサンプルでの実行例です。   シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)

$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix

コンパイル 

$ mvn package

クリーン(明示的にtargetなどのバイナリを消したい場合)

$ mvn clean

GCPに接続して実行

$ mvn exec:java

これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います

KotlinのDataFlowのプログラムの説明

多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います  

Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします  

このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります

各クラスの定義はこのように行いました  

KotlinProc1クラス

public class KotlinProc1 : DoFn<String, String>() {
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    val elem = c.element()
    elem.toList().map { 
      val char = it.toString()
      c.output(char)
    }
  }
}

KotlinProc2クラス

public class KotlinProc2 : DoFn<String, KV<String, String>>() {
  override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) {
    val char = c.element()
    c.output(KV.of(char, "1"))
  }
}

GroupByKey

 GroupByKey.create<String,String>() 

KotlinProc3クラス

public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() {
  override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) {
    val key = c.element().getKey()
    val iter = c.element().getValue()
    val list = mutableListOf<String>()
    iter.forEach {  list.add(it.toString()) }
    c.output(key.toString() + ": " + list.size.toString()); 
  }
}

GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く

類似していることが確認できます

import java.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.io.BufferedReader
import java.io.InputStream
import java.io.File
import java.util.stream.Collectors
import java.util.stream.*
fun main( args : Array<String> ) {
  val dataframe = mutableListOf<String>() 
  Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード
    val inputStream = File(name.toString()).inputStream()
    inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} }
  }

  dataframe.map {
    it.toList().map { // DataFlowのKotlinProc1に該当
      it.toString()
    }
  }.flatten().map { // DataFlowのKotlinProc2に該当
    Pair(it, "1")
  }.groupBy { // DataFlowのGroupByKeyに該当
    it.first
  }.toList().map { // DataFlowのKotlinProc3に該当
    val (key, arr) = it
    println("$key : ${arr.size}")
  }
}

WordCountレベルの、ここまでのまとめ

ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなCloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、生産性を高めることができます。

ビッグデータSQLの代わりにGoogle DataFlowを使うプラクティス

通常使うぶんにはSQLのインターフェースが用意されていることが多いですが、SQL以外で分析したい、SQLでは分析しにくい、などのモチベーションがる場合、Google DataStrageにコピーすることでも分析します。

minioのmcというコマンドでs3 -> gcsの同期を簡単に実行できます

$ mc mirror s3/any-s3-bucket/ gcs/any-gcs-bucket/

特定のキーでデータを直列化する

SQLでも無理くり頑張ればできないこともないのですが、かなりアドホックなのと、ビッグデータに全適応しようとする場合、かなり困難が伴います。

SQLがユーザ行動のような人によって可変長なテーブルの上に関連データベースとして表現しにくいからなのですが、無理にそのようにせず、KVSやDocument志向の発想を持ち込んで、特定のキーで転地インデックスを作成することが可能になります。

参考:SQLで転置インデックス

JSONを用いたデータ構造と変形

例えば、TreasureDataのダンプ情報は行志向でjsonで一行が表現されています。また、gzで圧縮されており、chunkingと呼ばれる適度なファイルサイズに断片化されています

そのため、Google DataFlowで処理するときは、jsonパーサが必要です

jsonのパースにはGsonが便利であり、型が不明なときはKotlinはAny型で受け取れるので、適切にリフレクションを用いれてば、複雑なデータ構造であっても、DataFlowの各ステップにデータを受け渡すことができます

こんな感じで処理すると便利

JSONシリアライズしたデータ構造などで統一することで、ユーザ定義型から解放されて、一応の汎用性を持つことが可能になります

また、特定のサイズまでシュリンクしたのち、ローカルマシンで、Pythonなどでjsonを読み取ることにより、最終的なデータの加工と機械学習が容易になります

具体例

public class KotlinProc1 : DoFn<String, String>() {
  // DoFnの定義がinput: String -> output: Stringとすることができる
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    // ここだけ、Kotlinだと切り出して、手元でコマンドラインでパイプ操作で再現することが楽なので、テストしながら開発できる
    val gson = Gson()                              
    val type = object : TypeToken<Map<String, Any>>() {}.type 

    val elem = c.element()
    val recover:Map<String,Any> = gson.fromJson<Map<String, Any>>(elem, type)                      
    if( recover.get("gender_age") == null )      
      return
    if( recover.get("os") == null)               
      return
    if( recover.get("uuid") == null || recover.get("uuid")!! == "null")
      return
    val gender_age = (recover["gender_age"]!! as Double).toInt() // <- データ中でデータの型がが判明してるならば、as Typeで変換できる                          
    val os = recover["os"]!! as String           
    val uuid = recover["uuid"] as String  
    val urlreq = try {                           
      "keyword=(.*?)&".toRegex().find( URLDec.decode( URLDec.decode( recover["request_uri"]!! as String, "UTF-8" ), "UTF-8" ) )?.groups?.get(1)?.value ?: null  // <- このように複雑なテーブルの中のデータを受け取ることができる                                    
                  
    } catch( ex : Exception ) { null }           
    if( urlreq == null || urlreq == "" )         
      return  
    // 出力の段階でここをjsonで出すようにすると、outputがList<Any>をシリアライズした、Stringに限定できるので、IFの定義が楽
    val output = gson.toJson( listOf(gender_age, os, uuid, urlreq) )
    c.output(output)
  }
}

複数のデータソースの利用

GCPの複数のDataStorageのファイルを入力し、特定のキーで結合したいなどの場合があるかと思います。

複数のインプットを同時に入力する方法が見つからず、公式ドキュメントをかなり漁りましたが、見つからず難儀していました。

DataFlowのSDK1.X系では、パイプラインを結合して、任意の処理にするという発想なので、inputのパイプラインを二種類以上用意して、Flattenして結合するという発想になるようです。

fun runner(options: DataflowPipelineOptions, input:String, output: String) {
  val p:Pipeline = Pipeline.create(options)
  val events1:PCollection<String> = p.apply(TextIO.Read.from("gs://input1/*"));
  val events2:PCollection<String> = p.apply(TextIO.Read.from("gs://input2/*"));
  val eventsList = PCollectionList.of(events1).and(events2)
  val events = eventsList.apply(Flatten.pCollections())
  
  events
    .apply( ParDo.named("ExtractMap1").of( KotlinProc1() ) )
    .apply( ParDo.named("MakeTransit").of( KotlinProc2() ) )
    .apply( GroupByKey.create<String,String>() )
    .apply( ParDo.named("FormatResults").of( KotlinProc3() ) )
    .apply( TextIO.Write.to(output) )
  p.run()
}

DataFlowの管理画面ではこのように見ることができます

コンピュータリソースが必要な箇所

HadoopにおけるMapの処理の際は弱いCPUをいくつも並列化することで、データの変換を行うことができますが、Reduceの処理につなぐ時に、特定のキーでshuffle & sortが必要になります。

この操作がメモリとディスクを大量に消費して、場合によってはコンピュータのディスクやメモリを増やす必要が出てきます。

この制約は、GCP Cloud DataFlowにもあって、謎のUnknownエラーで落ちらた、リソース不足を疑うと良いかもしれません(Unknownのせいで48時間程度溶かしました...)

DataFlowでは、GroupByKeyでコンピュータリソースを大量に消費するので、この前後のパイプラインで落ちていたら、ヒントになりえます。

リソース不足の例、GroupByKeyのステップがエラーになります...

このようなエラーが出た際には、以下の対応が有効でした

  1. マシンのメモリを増やす
  2. 動作させるワーカーの数を増加させる
  3. 一台当たりのディスクサイズを増やす

これは、pipelineを構築する際のconfigで設定できます

fun main( args : Array<String> ) {
  val options = JavaDataFlowUtils.getOptionInstance()
  // define project name
  options.setProject("machine-learning-173502")
  // define max workers (max_workerを増加させます、並列で動作させるマシンの台数の最大値です)
  options.setMaxNumWorkers(128)
  // disk size(マシン一台当たりのディスクサイズ数です、GBが単位です)
  options.setDiskSizeGb(1024*2)
  // machine type(インスタンスのタイプです、メモリ優先タイプを選択しています)
  options.setWorkerMachineType("n1-highmem-4")
  // define staging directory
  options.setStagingLocation( "gs://dataflow-stagings-machine-learning/stating-36" )
  // args order, 1st -> options, 2nd -> input data bucket, 3rd -> output data bucket
  runner(options, "gs://treasuredata-dump/20171221-json/export.*",
                  "gs://dataflow-output-machine-learning/keyword_uuid_timeseries-categories-17/*" )
}

コード

まとめ

Cloud DataFlowはサーバを自社に持つことなく、ビッグデータの分析を行うことができる素晴らしい基盤です。

AWS EMRと比較しても、速度の面において2倍ぐらい早く感じるのと、インスタンスを事前に予約する必要がなく、立ち上がりも早いです

今回はDataStorageに溜まったデータを一気に分析する、バッチ処理を行いましたが、AWS EMR, AWS Athena, AWS RedShift, Apache Spark, Apache Hadoop, GCP BigQueryなども使いましたが、柔軟性と速度の両立という視点では一番優れているように思います。すごい

Google Cloud FunctionをPythonで使う

世間ではAWS Lambdaばかり着目されますが、GoogleもCloud Functionと呼ばれるLambdaに相当する機能を提供しています

LambdaがPython,JS,Javaなどをサポートしているのに比べて、Cloud FunctionはJSのみのサポートとなっていています

Python3(PyPy3)をGoogle Clund Functionにデプロイして、実質的にPythonで使えるようにして、いくつかの応用例を示したいと思います

目次

  • A. nodejsでしか動かないはずのCloud FunctionでPythonを使う  
  • B. gcloud-toolのインストール
  • C. コード書いてデプロイする
  • D. リクエストを送ってみる
  • 調査: Cloud FunctionでScraperは使えるか
  • 例: リクエスト送った人のGlobal IPを返すだけの例
  • 例: (ユーザ行動などのIoT情報を取得する)ビーコンのデータを受け取りCloud Strageに格納する
  • 例: Amazon Dash Buttonより便利な、クラウド操作ボタンをスマホに作る
  • E: まとめ

A. nodejsでしか動かないはずのCloud FunctionでPythonを使う 

1. 環境依存がないPyPy3を利用する

色々試した結果、いろんなLinuxの環境で動くように調整されたコンパイル済みで環境依存の少ないpypy3を利用することで、Google Cloud FunctionでPython3を利用できることがわかりました(どうしてもPython3の文法を使いたい主義)

PyPy

$ bzip2 -d pypy3-v5.9.0-linux64.tar.bz2
$ tar xvf pypy3-v5.9.0-linux64.tar
$ mv pypy3-v5.9.0-linux64 {YOUR_GOOGLE_CLOUD_FUNCTION_DIR}

pipの機能を有効化します

$ ./pypy3-v5.9.0-linux64/bin/pypy3 -m ensurepip

2. 動作が期待できるライブラリ

OSがDebianでversionがよくわかっていません、そのため、手元のLinuxなどでコンパイルが必要なライブラリをコンパイルして送っても、動作しないことがあります。

どうしても動作させたいライブラリがある場合はCloud FunctionのLinuxのlibcやインストールされているshared objectを分析調査するスクリプトを別途記述して、確認する必要があります

    1. numpy
    1. requests
    1. BeautifulSoup4

など、PurePythonで記述されたものと、PyPyで正式にサポートされているnumpyなどは動作します

3. PyPy3にライブラリをインストール

pipはサポートされているので、このように任意の(限定はされていますが)インストールすることができます

$ ./pypy3-v5.9.0-linux64/bin/pypy3 -m pip install flask

B. gcloud-toolのインストール

任意のLinuxで動作する方法を示します

何度かこのツールを使っていますが、aptやyumレポジトリを利用するより、直接バイナリをダウンロードして来た方が安定性が良い気がします

Googleからダウンロードすることができます

$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-170.0.1-linux-x86_64.tar.gz
$ tar zxvf google-cloud-sdk-170.0.1-linux-x86_64.tar.gz?hl=ja
$ ./google-cloud-sdk/install.sh
$ ./google-cloud-sdk/bin/gcloud init
(各種、認証が求められるので、通しましょう)

.bashrcにこの記述を追加すると、相対パスを入力しなくても使えます

PATH=$HOME/google-cloud-sdk/bin:$PATH

Cloud Functionはオプション扱いらしく、こうすることで正しくインストールすることができます

$ gcloud components update beta && gcloud components install

Cloud Functionのコードやバイナリを置くbacketを作ります

$ gsutil mb gs://{YOUR_STAGING_BUCKET_NAME}

C. コード書いてデプロイする

ディレクトリの中で作業すると、そのディレクトリの中身全てがGoogle Cloud Functionのコンテナにデプロイされますので、あまり大きなファイルはおけないようです

エントリーポイント(Cloud Functionが呼びされた時に最初に実行される関数)はindex.jsという風になっています。

spawnというプロセス間通信を使うと、このJavaScriptのファイルと一緒にデプロイされPyPy3が実行されて、結果を得ることができます

const spawnSync = require('child_process').spawnSync;
exports.pycall = function pycall(req, res) {
  result = spawnSync('./pypy3-v5.9.0-linux64/bin/pypy3', ['./inspect.py'], {
    stdio: 'pipe',
  });

  if (result.stdout){
    res.status(200).send(result.stdout);
  }else if (result.stderr){
    res.status(200).send(result.stderr);
  }
};

デプロイはこのように行います

$ gcloud beta functions deploy ${YOUR_CLOUD_FUNCTION_NAME} --stage-bucket ${YOUR_STAGING_BUCKET} --trigger-http

D. リクエストを送ってみる

コードをデプロイしたタイミングでapiのURLが標準出力に表示されるので、そのURLを参照すると、Cloud Functionが実行されます

$ curl https://${YOUR_PROJECT}.cloudfunctions.net/pycall

curljsonをポストする例

$ curl -X POST -H "Content-Type:application/json"  -d '{"message":"hello world!"}' https://${YOUR_PROJECT}.cloudfunctions.net/pycall

調査: Cloud FunctionでScraperは使えるか

AWS LambdaではFunctionを実行するたびに、IPなどが変わることがあるので、スクレイパーとしても利用することが期待できるのですが、Google Cloud Functionではどうでしょうか

1000回、Cloud Functionを呼び出して、その時のGlobal IPを調べて、どのような分布になっているか調べました
(Global IPを調べるサイトのAPIの制限で、累積値が1000になっていませんが、IPのレンジはAWSより広くなく、固まっている印象があります。また、やはりコンテナはなんども再利用されているようです)

107.178.232.249 8
107.178.232.247 8
107.178.232.181 7
107.178.236.24 7
107.178.238.51 6
107.178.236.4 6 
107.178.236.8 6 
107.178.232.167 6
107.178.237.16 5
107.178.232.180 4

IPという視点で見ると、効率的に使うことは現時点ではあまり期待できなそうです

例: リクエスト送った人のGlobal IPを返すだけの例

やってて思ったのですが、自分のマシンにscpで外部からデータを持ってこようという時に、いちいちiPhoneに記されたIPアドレス帳を参照していたのですが、コマンドを叩いてverboseを利用するより個人的には、jqなどのコマンドで確認できる方が望ましいと考えています

そのため、リクエスト送信元のheaderをjsonに変換してそのままインデントをつけて返します

index.js

const spawnSync = require('child_process').spawnSync;
exports.reflection = function reflection(req, res) {
  result = spawnSync('./pypy3-v5.9.0-linux64/bin/pypy3', ['./reflection.py'], {
    stdio: 'pipe',
    input: JSON.stringify(req.headers)
  });
  if (result.stdout){
    res.status(200).send(result.stdout);
  }else if (result.stderr){
    res.status(200).send(result.stderr);
  }
};

reflection.py

import json
print(json.dumps(json.loads(input()), indent=2))

デプロイしてクエリを投げてみます

$ sh deploy.sh 
$ curl  https://us-central1-wild-yukikaze.cloudfunctions.net/reflection2

出力結果はjsonフォーマットで、最初から結構見やすい!

$ curl  https://us-central1-wild-yukikaze.cloudfunctions.net/reflection2
{
  "host": "us-central1-wild-yukikaze.cloudfunctions.net",
  "user-agent": "curl/7.55.1",
  "accept": "*/*",
  "function-execution-id": "03jbvskqvfyu",
  "x-appengine-api-ticket": "a140cc827b21f195",
  "x-appengine-city": "arakawa",
  "x-appengine-citylatlong": "35.736080,139.783369",
  "x-appengine-country": "JP",
  "x-appengine-https": "on",
  "x-appengine-region": "13",
  "x-appengine-user-ip": "118.241.189.54",
  "x-cloud-trace-context": "8ab2a49b8cd1c80b068daaafda2c85a1/10677056975691001014;o=1",
  "x-forwarded-for": "118.241.189.54",
  "accept-encoding": "gzip"
}

(ユーザ行動などのIoT情報を取得する)ビーコンのデータを受け取りCloud Strageに格納する

アドテクというか、ユーザのサイト内での回遊情報を調べるのに一般的に、ページのどこまでを視認したか、スクロールしたか、PCなのかスマホなのか、画面のサイズは、ブラウザは、オーガニック検索なのか、直帰率はどうなのか、マウスオーバー情報はどうなのか、といった視点がJavaScriptで取得可能であることは、広く知られたことだと思います

これらの複雑なJavaScriptを受け取り、Cloud Strage(AWS S3のようなもの)に書き込むことができれば、サーバレスで行動ログを測定 -> 保存までできます。  

さらに、DataFlowともプロセスをつなぐことができますので、実質的に、集計項目の設計、JSの実装(これは外部)、デプロイ、測定、分析、施策がEnd2Endでできやすくなって、素早いイテレーションを回せそうで、すごくいいです

こんな感じのEnd2Endで観測、集計、分析までできたらうれしい!

ブラウザ側のjavascriptは割愛します

index.js

header, post, getなどの全てのパラメータをpythonに渡します

const spawnSync = require('child_process').spawnSync;
exports.pycall_gcs = function pycall_gcs(req, res) {
  result = spawnSync('./pypy3-v5.9.0-linux64/bin/pypy3', ['./cloudstrage-push.py'], {
    stdio: 'pipe',
    input: JSON.stringify({'headers':req.headers, 'body':req.body, 'query':req.query})
  });
  if (result.stdout){
    res.status(200).send(result.stdout);
  }else if (result.stderr){
    res.status(200).send(result.stderr);
  }
};

cloudstrage-push.py
googleのCloud Strageに書き込む権限を与えたcredentialファイルと共にdeployして、ユーザやアクセス度(ここは適切だろう粒度で設計する必要があります)でuuidやhashで、blob(Cloud Strageにおけるファイル単位)を作り、書き込んで行くことができます

import os
import zipfile
import json
import uuid
import sys
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './credentials.json'
raw = input()
try:
  from google.cloud import storage
  from google.cloud.storage import Blob
  client = storage.Client()
  bucket = client.get_bucket('wired-ant')
  print(bucket, file=sys.stderr)
  uuid = '%s.shard'%uuid.uuid4()
  blob = bucket.get_blob(uuid)
  if blob is None:
    print(blob, file=sys.stderr)
    blob = Blob(uuid, bucket)
    source = ''
  else:
    source = blob.download_as_string().decode()
  blob.upload_from_string( source + raw + '\n', content_type='text/plain')
except Exception as ex:
  print(ex)

試しに自分の無料のGCP枠で自分のgithub.ioに入れてやって見ましたけど、期待した通り動作していることを確認しました。   ただ、上書きが一度ロードしてからでないとできないので、何かうまくchunkingする方法を考えている次第です(しないという手もあります)。  

自分の行動ログが正しく書き込まれていることが確認できました(ぼっちっぽい)

例: Amazon Dash Buttonより便利な、クラウド操作ボタンをスマホに作る

Cloud Functionでも、AWS Lambdaでもいいのですが、ハマりどころが微妙にあまりないという制約があります

AWS Lambdaなどを繋ぎに、各種サービスをつないでいっていって価値のあるサービスを創出することが、一つの課題なのですが、単独で使おうとすると、リソース的な制約が大きく、機械学習も難しい(少なくとも現時点では)ので、ツール系になりがちです。

例えば、私は毎月クラウドの料金に苦しめられるのですが、計算が重いと言われる機械学習とはいえ、深層学習をのぞいて、朝起きて出社して、GCPAWSにログインして、インスタンスを起動して、何かやって、帰り際にシャットダウンして〜とするので、120秒は溶けますし、というか、めんどくさいはプライスレスです

試しに、GCPで私が契約しているリージョンのインスタンスを出社して作業を開始する前に、一括起動し、IPアドレスを確認し、帰り際に一括シャットダウンします

例えば、iPhoneのホームボタンにはURLショートカットを載せることができて、urlショートカットにはurlパラメータを載せることができます。つまり、iPhoneの画面上で動作する、Amazon Dash Button的なものを作ることができるのです!のです!

実際作ったCloud Functionへのショートカット、タップするだけでGCPがコントロールできて便利

index.js

htmlのコンテンツを生成して返す感じです

const spawnSync = require('child_process').spawnSync;
exports.pycall_instance_controls = function pycall_instance_controls(req, res) {
  result = spawnSync('./pypy3-v5.9.0-linux64/bin/pypy3', ['./instance-control.py'], {
    stdio: 'pipe',
    input: JSON.stringify(req.query)
  });
  if (result.stdout){
    //res.status(200).send(result.stdout);
    res.status(200).send(`<!doctype html>` + result.stdout + `</html>`);
  }else if (result.stderr){
    res.status(200).send(result.stderr);
  }
};

instance-control.py

GCPの承認情報をlocalに通してしまって、pypyにgcloud関連をインストールすると、このような闇魔術が使えます

from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()

from googleapiclient import discovery
compute = discovery.build('compute', 'v1', credentials=credentials)

imgs = ['"https://ja.gravatar.com/userimage/9847738/e0cfe4c445d28598ffc3d0a4fd235fa5.jpg?size=200"', \
  '"https://ja.gravatar.com/userimage/9847738/647f7900b8912b0669a1da2edc352b5e.jpg?size=200"',
  '"https://ja.gravatar.com/userimage/9847738/44b7b8d6b72f2dce6609be9e059c3920.jpg?size=200"']
html = '''
<head>
<title>GCP Control</title>
<link rel="icon" href={img}/>
<link rel="apple-touch-icon" href={img}/>
</head>
<body>
<p>
{body}
</p>
</body>
'''

project = 'wild-yukikaze'
zone = 'asia-northeast1-c'
def stop_all():
  instances = compute.instances().list(project=project, zone=zone).execute()
  for instance in instances['items']:
    name = instance.get('name')
    compute.instances().stop( project=project, zone=zone, instance=name).execute()
  print(html.format(body='finished all tear down', img=imgs[0]))

def start_all():
  instances = compute.instances().list(project=project, zone=zone).execute()
  for instance in instances['items']:
    name = instance.get('name')
    compute.instances().start( project=project, zone=zone, instance=name).execute()
  print(html.format(body='finished finished all start up', img=imgs[1]))

アイコンは大事  

オタクの皆様には語るまでもないですが、印象とユーザビリティを大きく支配するものなので、かわいいが好きなのでかわいい高解像度のアイコンを設定できるかどうかは、わりと死活問題です

htmlのメタタグにこのようなデータを入れると、高解像度のICONが作れます

<link rel="apple-touch-icon" href={img}/>

また、ICON画像は外部のサイトを参照させることが可能で、gravatar.comさまの公開URLを利用すると便利です

コード

E. まとめ

Cloud Function面白いですね

ディスクに書き込む作業ができないので、temporaryな処理を書き出す際には、Google Cloud Strageなどの外部のサービスと連携する必要がありますが、pipで各種モジュールをインストールできますので、なんでもできます  

しかし、もっとも頭を悩ませたのがリソースの制約で、圧縮して特定の容量を超えると、zipファイルでもデプロイできません。この制約は機械学習を使いたい時、モデルのデータサイズが大きい勾配ブースティングと深層学習はきついので、SVMまであたりでの運用となりそうです  

将来、Google Cloud FunctionやAWS Lambdaのリソースの緩和で大きなデータが扱えるようになってくると、ほとんどのサービスにおいてサーバレスが実現するかもしれません  

機械学習ではじめるDocker

目次とお断り

この資料をまとめるに当たって、実際に開発したり運用したりという経験のスニペットから、できるだけ編集して、自分なりに体系化したものです  

様々な角度のデータが乱雑なっててわかりにくいかもしれませんが、ご了承いただけると幸いです  

  • "1. Dockerとは"
  • "2. Dockerを用いるメリット"
  • "3. docker.ioのインストール"
  • "4. dockerでコンテナの起動"
  • "5. 基本的な操作"
  • "6. Dockerコンテナにsshdなどの必須ソフトウェアをインストールする"
  • "7. dockerコンテナのexportとimport"
  • "8. 機械学習ように調整したコンテナの利用"
  • "9. 実際に使用している例"
  • "10. Docker Hub連係"
  • "11. Docker Compose"
  • "12. Dockerのコンテナとホストマシン間でファイルの共有をする"
  • "13. 便利なチートシート"
  • "14. まとめ"

1. Dockerとは

  • 仮想化コンテナで、VMWareVirtualBoxのようなsystemdやシステムのサービス管理サービスが稼働するような重いものではない
  • chrootみたいなものだが、immutable architectureを前提とするので、docker commitするまで状態は保存されない
  • vmware, virtualboxみたいな完全なOSを乗っけるイメージよりアプリレベルの簡単な仮想化を想定(指定したバイナリしか動かないのでsystemd通常動作させない)

2. Dockerを用いるメリット

開発と分析するいう側面  

  • 依存の多い分析ライブラリと開発環境を簡単にどこでも好きな場所で再現できる
  • XGBoost, LightGBM, LibSVM, RedisやAerospikeのような微妙な環境じゃないと動かなかったりするものを封じ込めて、安定運用版のスナップショットとしていつでも利用できる

運用という側面

  • Dockerで構築したアプリをそのままデプロイ用に加工して、DevOpsに渡すことで、運用に回してもらう
  • DevOpsは開発レイヤーが強いので、なんなら機械学習とエンジニアリング専門の私でも、運用ルールを策定できる

3. docker.ioのインストール

手法1. dockerのrepositoryからのインストール

$ sudo apt install docker.io
$ sudo usermod -aG docker $USER
LOGOUTしてLOGINか、再起動

手法2. docker公式が提供する公式のインストールスクリプトからインストール

$ curl -fsSL get.docker.com -o get-docker.sh
$ sh get-docker.sh
$ sudo usermod -aG docker $USER
LOGOUTして、LOGINか再起動

4. dockerでコンテナの起動

最小サイズのubuntuをインストール

$ docker pull ubuntu
$ docker run -it ubuntu bash

5. 基本的な操作

dockerにはなぜか名前がつかないことがあって、docker tagで名前をつける

$ docker tag ${ID} ${NAME}

run(内容が保存されない)

$ docker run -it ${NAME} /bin/bash

すでに起動済みのDockerにCONTAINER IDでログイン  

$ docker exec -it ${CONTAINER_ID} /bin/bash

恒久化(Commit)  

作業した内容は保存されないないので、commitすることで、状態が差分として保存される

$ docker commit ${CONTAINER_ID} ${NAME}/${TAG}

Dockerでportをホストのportとバインドして起動

docker内でportを起動して、ホストマシンにつなげてサービスを公開するときなど、利用できる

$ docker run -it -p ${PORT}:${PORT} bash

6. Dockerコンテナにsshdなどの必須ソフトウェアをインストールする

DockerコンテナにSSHでアクセスする

sshdなどはdockerの性質上、systemdなどで管理なので、dockerにbashなどでログインして、手動やdocker-composeなどで起動する必要があります

sshdのインストール(Ubuntu, Debian)

$ apt install openssh-server

sshdのインストール(ArchLinux)

$ sudo pacman -S openssh

sshdサーバの起動

$ /usr/bin/sshd

sshへのアクセス
IPアドレスなどをnet-toolsのifconfigなどで確認して、ホストマシンからclientのマシンに接続する

$ ssh ${USER}@172.17.0.2

.bashrcのロード
通常の環境変数のローディングと異なるので、.bashrcから読み取る  

$ source .bashrc

これをしないとlocaleがja_JP.UTF-8にならずにtmuxなどが使えないっぽい

7. dockerコンテナのexportとimport

諸々をインストールして、安定して運用できるようになったらば、tar形式にexportできます

export
tarファイルで出力することができます

$ docker export ${PID} > alice-bob.tar

import

$ docker import ./hogehoge.tar

インポートした状態では、最初は

REPOSITORY:<none> TAG:<none>

となるので、tagコマンドで適切に名前をつける

$ docker tag ${NAME}/${TAG} ${NEWTAGNAME}

8. 機械学習ように調整したコンテナの利用

ArchLinux(Official)

$ docker pull archlinux/base

Debian(Official)

$ docker pull debian

私のDocker Hubのレポジトリ

機械学習専用のコンテナを作りました!

sshd, python3, pip3, mecab, neologd, lightgbm, xgboost, neovim, tmux, git, JVM, Kotlin, leveldb, rocksdbなどをインストールしてすぐ使えるようにしたイメージです。docker hubで公開しているので、docker pullでダウンロードしてすぐ利用できます  

すぐに機械学習できますね!!

$ docker pull nardtree/oppai:latest
$ docker run -it nardtree/oppai bash
(sshd serverモードではこうして、表示されたipaddressにssh oppao@${IP_ADDR}とします)
$ docker run -p 1022:22 -it nardtree/oppai server.py

server.pyでは、ifconfigとsshdが非同期で実行されて、このDockerのsshdにアクセスできるようになります

**ここからコンテナ内部です**
# sudo su oppai #passwordはoppaiです
$ cd ~
$ source .bashrc

$ python3
Python 3.6.3 (default, Oct 24 2017, 14:48:20) 
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> 

$ sudo pip3 install mecab-python3
Requirement already satisfied: mecab-python3 in /usr/lib/python3.6/site-packages

$ echo "艦これ" | mecab
艦これ  名詞,固有名詞,一般,*,*,*,艦これ,カンコレ,カンコレ
EOS

$ which lightgbm
/usr/local/bin/lightgbm

$ which xgboost
/usr/bin/xgboost

$ which nvim
/usr/bin/nvim

$ which javac
/usr/bin/javac

$ which kotlin
/usr/bin/kotlin

9. 実際に使用している例

開発、分析環境として利用する例

本番に投入するサービスとして利用する例

分析機にはUbuntu, Debian or ArchLinuxを使うことが多く、本番環境ではRHELCentOSを使う運用が多い構成なのですが、バイナリや微妙なディストリビューション間の差を人間が吸収するのはやめて、Dockerの出力を利用してどこでも再現してもらえば、問題ないよね、好きな構成でアプリとして機能すればDevOpsでよろしく組み合わせるよ、というスパースな設計ができます

このように一つの機能を持ったアプリとして動作させるには、docker composeやkubernetesを用いてDockerの断片をつなぎ合わせて、巨大なシステムを編纂します。   

ローカルでserverの機能をもつアプリをDockerでラップした一つのアプリの粒度として動作させるには、このようにします。

$ docker run -p ${PORT1}:${PORT2} -it ${APPLICATION_NAME}

10. Docker Hub連係

DockerというgithubのようなコンテナのイメージのSNSみたいなのがあります
nardtree/archのようなtag名をつけるとスラッシュの先のユーザ名のレポジトリにpushできて再現性が確保で来ます

$ docker tag ${IMAGE_ID} ${USER_NAME}/${SOME_NAME}
$ docker push ${USER_NAME}/${SOME_NAME}

11. Docker Compose

Dockerの一連の複雑なオペレーションを自動化した状態で、取り扱うのが、Docker Composeです

$ git clone https://github.com/GINK03/docker-compose-templates
$ cd docker-compose-templates/minimal-machine-learning-oppai
$ docker-compose up

これをやると、一連のめんどくさいセットアップコマンド各種をすっ飛ばして、port 1022でlocalhostsshdできるArchLinuxが仕上がります

$ ssh -p 1022:22 oppai@localhost
# password:oppai

12. Dockerのコンテナとホストマシン間でファイルの共有をする

Dockerコンテナでは状態を持たないイミュータブルアーキテクチャになっており、あまり状態を持つことを期待されて居ません   しかし、RDBやKVSなどを利用する際は、当然ながら、ファイルに書き出したりして状態を保存しなければいけません

DockerfileではVOLUMEオプションをつけることで、マウントするディレクトリを指定し、作ることができます

FROM nardtree/oppai                              
EXPOSE 22                                        
EXPOSE 1022                                      
EXPOSE 4567                                      
VOLUME ["/data"]                                 
CMD ["server.py"]  

Docker-compose.ymlでホストの$HOME/dataとコンテナの/dataを結び、片方で書き込まれたらもう片方にも反映されるようになります

version: '3'                                     
services:                                        
  ml-app:                                        
    build: .                                     
    ports:                                       
     - "4567:4567"                               
     - "1022:22"                                 
    volumes:                                     
      - ~/data:/data  

13. 便利なチートシート

noneの一括削除

$ docker images | awk '/<none/{print $3}' | xargs docker rmi

肥大化するvarを別のポイントに移動
ホストマシンのデフォルトのdockerのコンテナの保存を別の場所にする
SSDでパフォーマンスが大きく変わるサービスである、KVSなどを利用する際は、こっちの方がいいよ
  方法1. /etc/default/dockerを編集します

DOCKER_OPTS="-g $HOME/sda/docker/data"

方法2. /var/lib/dockerをssdやnvmeからリンクする

$ sudo mv /var/lib/docker ${nvme_dir}
$ sudo ln -s ${nvme_dir} /var/lib/docker

docker containerのIPや詳細を調べる

$ docker inspect ${PID}

docker composeで一括アプリの実行

$ docker-compose run ml-app

14. まとめ

私の所属しているいくつかの組織では、インフラエンジニアが十分に確保できなく、慣れないインフラオペレーションにイライラしていたのですが、時代はDevOpsということでソフトウェアエンジニアや私のような機械学習エンジニアがインフラの管理をしなくてはいけないことが、まま発生するようになりました

最近はもう諦めて、Dockerや最近のDocker-Compose、より上位のKubernetesなどを勉強し始めて、依存や環境を丸めて機能のみを取り出すDockerの粒度が一つのアプリの粒度として優れていると、思うようになり、運用と開発の面で導入するようになりました。実際便利であり、オンプレでサーバを持たなくていいので楽です