にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

BigQueryでUDFとwindow関数を使う 

bigqueryでUDFとwindow関数を使う 

転職してからMapReduceそのもののサービスや改良したサービスであるCloud DataFlowなどのサービスより、初手BigQueryが用いられることが増えてきました。分析環境でのプラットフォームを何にするかの文化の違いでしょう。 

BigQueryの優れた面がLegacy SQLを使っていたときは、なにもないのでは、と考えていたこともあったのですが、Standard SQLならばWindow関数を利用し、さらに非構造化データに対してもUser Define Functionをアドホックに用いることで、かなり良いところまで行けるということがわかりました。

window関数の例と、User Define Functionとの組み合わを記します。

bigqueryへのpandasからのアップロード

pandasでcsv等を読み取って、pandas-gbqを使うと、pandasの型情報のまま転送することができるので、この方法は体得しておくと便利です。

pandas-gbqのインストール

(AnacondaのPythonがインストールされているという前提で勧めます)

$ conda install pandas-gbq --channel conda-forge

サンプルデータセットとして、Kaggle Open Datasetのdata-science-for-goodという、ニューヨーク州の学校の情報のデータセットを利用します。

デーブルデータはこの様になっています。全部は写っていなく、一部になります。

import pandas as pd
pd.set_option("display.max_columns", 120)
df = pd.read_csv('./2016 School Explorer.csv')

# BigQueryはカラム名がアンダーバーと半角英数字以外認めないので、その他を消します
def replacer(c):
    for r in [' ', '?', '(', ')','/','%', '-']:
        c = c.replace(r, '')
    return c
df.columns = [replacer(c) for c in df.columns]

# BigQueryへアップロード
df.to_gbq('test.test2', 'gcp-project')
図1. GCPのBigQueryにテーブルが表示されることを確認

Window関数

SQLは2011年から2014年まで某データウェアハウスのレガシーSQLを使っていた関係で、マジ、MapReduceより何もできなくてダメみたいなことをしばらく思っていたのですが、Standart SQLを一通り触って強い(確信)といたりました。

具体的には、様々な操作を行うときに、ビューや一時テーブルを作りまくる必要があったのですが、window関数を用いると、そのようなものが必要なくなってきます。

Syntaxはこのようなになり、data-science-for-goodのデータセットを街粒度で分割し、白人率でソートして、ランキングするとこのようなクエリになります。

RANK() OVER(partition by city order by PercentWhite desc) 

より一般化すると、このようなもになります。

図2.

これは、pandasで書くとこのような意味です。

def ranker(df):
    df = df.sort_values('PercentWhite', ascending=False)
    df['rank'] = np.arange(len(df)) + 1
    return df
df.groupby(by=['City']).apply(ranker)[['City', 'PercentWhite','rank']].head(200)

BigQueryのwindow関数もpandasのgroupby.applyも似たようなフローになっています。

図3. 処理フロー

処理フローとしてはこの様になっています。BigQueryはPandasに比べて圧倒的に早いらしいので、ビッグデータになるにつれて、優位性が活かせそうです。

なお、window関数は他にもさまざまな機能があり、GCPの公式ドキュメントが最も整理されており、便利です。

toy problem: ニューヨーク州の街毎の白人率の大きさランキング

select
  SchoolName
  , RANK() over(partition by city order by PercentWhite desc)
  , city
  , PercentWhite
 from
  test.test
 ;

出力

図4. window+rank関数によるランキング

Standerd SQLでUDF(UserDefinedFunction)を定義する

前項ではBigQueryに組み込み関数のRANK関数を用いましたが、これを含め、自身で関数をJavaScriptで定義可能です。

JavaScriptで記述するという制約さえ除けば、かなり万能に近い書き方も可能になりますので、こんな不思議なことを計算することもできます。(おそらく、もっと効率の良い方法があると思いますが)

window関数で特定の値のノーマライズを行う

白人のパーセンテージをその街で最大にしめる大きさを1としてノーマライズします。

UDFはCREATE TEMPORARY FUNCTIONで入出力の値と型決めて、このように書きます

CREATE TEMPORARY FUNCTION norm(xs ARRAY<STRING>, rank INT64)
RETURNS FLOAT64
LANGUAGE js AS """
  const xs2 = xs.map( x => x.replace("%", "") ).map( x => parseFloat(x) )
  const max = Math.max.apply(null, xs2)
  const xs3 = xs2.map( x => x/max ).map( x => x.toString() )
  return xs3[rank-1];
  """;
select 
  SchoolName
  ,norm( 
    ARRAY_AGG(PercentWhite) over(partition by city order by PercentWhite desc) ,
    Rank() over(partition by city order by PercentWhite desc) 
  )
  ,city
  , PercentWhite
 from
  test.test
 ;

計算結果をみると、正しく、計算できていることがわかります。

図5. UDFによる任意の計算が可能

lag関数を使わずに前のrowの値との差を計算する

学校の街ごとの収入に、自分よりも前のrowとの収入の差を求める。

lag関数でも簡単に求めることができますが、JSの力とrow_number関数を使うことでこのようにして、rowベースの操作すらもできます。

#standardSQL
CREATE TEMPORARY FUNCTION prev(xs ARRAY<STRING>, index INT64)
RETURNS FLOAT64
LANGUAGE js AS """
  const xs1 = xs.map( function(x) {
    if( x == null ) 
      return "0"; 
    else 
      return x;
  });
  const xs2 = xs1.map( x => x.replace(",", "") ).map( x => x.replace("$", "") ).map( x => parseFloat(x) );
  const ret = xs2[index-1-1] - xs2[index-1];
  if( ret == null || isNaN(ret)) 
    return 0.0;
  else
    return ret
  """;
select 
  SchoolName
  ,prev( 
    ARRAY_AGG(SchoolIncomeEstimate) over(partition by city order by SchoolIncomeEstimate desc) ,
    row_number() over(partition by city order by SchoolIncomeEstimate desc) 
  )
  ,city
  ,SchoolIncomeEstimate
from
  test.test;
図6. 前のrowとの差を計算する

このように列だけでな行方向にも拡張された操作ができ、万能とはこういう事を言うんでしょうか

なかなかレガシーSQLでは難しかった操作ができる

window関数を用いることで、アグリゲートをする際、groupbyしてからビューを作りjoinをするというプロセスから解放されました。

MapReduceを扱う際のモチベーションが、膨大なデータをHash関数で写像空間にエンベッティングして、シャーディングするという基本的な仕組みを理解していたので、どのようなケースにも応用しやすく、使っていました。

図7. BigQuery(Dremel)とMapReduceの比較

MapReduceに比べて、BigQueryはcomplex data processing(プログラミング等でアドホックな処理など)を行うことができないとされていますが、User Deine Functionを用いればJavaScriptでの表現に限定されますが行うことができます。

outer source

codes

SQLの実行の仕方はコマンドでやるとき、こうするとめっちゃ便利です

$ bq query "$(cat bq-window-lag.sql)"

K8Sで機械学習の予想システムを作成 

K8Sで機械学習の予想システムを作成  

目次
- 機械学習の最小粒度としてのDocker, Dockerのデプロイ先としてのk8s - テキストを評価するAPIのDockerコンテナの作り方 - DockerコンテナのGoogle Cloud Container Registryへの登録 - K8Sへのデプロイ - 実際にアクセスする - まとめ

機械学習の最小粒度としてのDocker, Dockerのデプロイ先としてのk8s

コンテナのオーケストレーションツールがk8sが他のツールを押しのけて、優位にたった状況からしばらく経過し、ドキュメントやユースケースが揃ってきました。

GCPではコンテナを使ったデプロイメントサービスはKubernetes Engineがデフォルトであり、WebUIやCUIでの操作例を示したドキュメントも充実してきました。
k8sは、ローリングリリースが簡単にできたり、分析者からDocker Fileやコンテナが適切に受け渡しが開発者に行われれば、デプロイまでの時間的労力的消耗を最小化できたりします。

また、Micro Serviceのデザインパターンとして、Dockerが一つの管理粒度になり、そこだけで閉じてしまえば、自分の責任範囲を明確にし、役割が明確になり、「分析 -> モデルの評価&作成 -> IFの定義 -> コード作成 -> Dockerに固める」というプロセスに落とすことができ、進捗も良くなります。

今回はjson形式で日本語の自然言語を受け取り、映画のレビューの星がいくつなのがを予想するトイプロブレムをk8sに実際にデプロイして使ってみるまでを説明します。

今回のk8sのデザインはこのようなスタイルになります。

テキストを評価するAPIのDockerコンテナの作り方

トイプロブレムの予想モデルの要件

  • 任意のテキストをhttp経由でjsonを受け取る
  • テキストを分かち書きし、ベクトル化する
  • ベクトル化した情報に基づき、テキストが映画レビューならば、レビューの星何個に該当するか予想する
  • 予想した星の数をhttp経由でjsonで返却する
  • 以上の挙動をする仕組みをDockerコンテナとして提供する

HTTPサーバは私の以前のJSONでサーバクライアント間のやりとりのプロジェクトを参照しています。

予想システムは映画.comさまのコーパスを利用して、LightGBMでテキストコーパスから星の数の予想を行います。学習と評価に使ったスクリプトとコーパスはこちらになります。

Dockerコンテナに集約する

 以前作成した何でもごった煮Dockerがコンテナがあり、それを元に編集して作成しました。

 本来ならば、Docker Fileを厳密に定義して、Docker Fileからgithubからpullして、システムの/usr/binに任意のスクリプトを配置する記述をする必要があります。

 それとは別に、アドホックなオペレーションをある程度許容する方法も可能ではあり、Dockerの中に入ってしまって、様々な環境を構築して、commitしてしまうのもありかと思っています(というか楽ですので、それで対応しました)

 ベストプラクティスは様々な企業文化があるので、それに従うといいでしょうが、雑な方法についてはこちらで説明しているので、参考にしていただければ幸いです。

 作成したDockerコンテナはこちら

 動作はこのようにローカルでも行えます。

$ docker pull nardtree/lightgbm-clf
$ docker run -it nardtree/lightgbm-clf 40-predict.py

挙動のチェック

ポジティブな文を投入してみる

$ curl -v -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '{"texts":"すごい!最高に興奮した!楽しい"}'  http://localhost:4567/api/1/users
{"score": 4.77975661771051}

(星5が最高なので、ほぼ最高と正しく予想できている)

ネガティブな文を投入してみる

$ curl -v -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '{"texts":"この映画は全くだめ、楽しくない。駄作"}' http://localhost:4567/api/1/users
{"score": 1.2809874000768104}

(星1が最低と、正しく予想できている)

DockerコンテナのGoogle Cloud Container Registryへの登録

Cloud Container Registryへの登録は、タグが、asia.gcr.io/${YOUR_PROJECT_NAME}/${CONTAINER_NAME}となっている必要があるので、 このようにコミットして、実行中のコンテナに対して、別のタグを付けます。

$ docker commit 44f751eb4c19
sha256:5a60e4460a156f4ca2465f4eb71983fbd040a084116884bcb40e88e3537cdc38
$ docker images
REPOSITORY                                         TAG                 IMAGE ID            CREATED             SIZE
<none>                                             <none>              5a60e4460a15        2 minutes ago       8.39GB
...
$ docker tag 5a60e4460a15 asia.gcr.io/${YOUR_PROJECT_NAME}/${CONTAINER_NAME}

gcrへコンテナのアップロード

$ gcloud docker -- push asia.gcr.io/${YOUR_PROJECT_NAME}/${CONTAINER_NAME}:latest

今回は、CONTAINER_NAMEはlightgbm-clfとしました

docker hubに置いてあるので参考にしてください

K8Sへのデプロイ

K8Sへのデプロイは、コマンドだと、デプロイ時の進捗の情報が充分に見れないのでWebUIで行う例を示します。

GCPのKubernetes Engineにアクセスし、クラスタを作成します。

Hello World程度であれば少ないリソースでいいのですが、少し余裕を持って多めのリソースを投下します。

クラスタの作成にはしばらくかかるので、しばらく待ちます。

コンテナレジストリに登録したご自身のDockerコンテナを指定し、このコンテナのサービスの実行に必要な引数を入力します。

機械学習のモデルと各種依存ライブラリを含んだDockerコンテナはサイズが大きいので、ダウンロードが完了しデプロイが終わるまでしばらく待ちます(10分程度)

外部に公開するために、IPの割当とPortのマッピングを行います。

このとき、サービスタイプはロードバランサーを選択します。

外部IPが得られるので、次の項で、実際にアクセスしてみましょう。

実際にアクセスする

 今回はマイクロサービスのデザインパターンにのっとり、jsonでデータをやり取りし、任意のテキスト情報から、そのテキストの映画のレビューとしての星の数を予想します。
stress-testing.pyで1000件の自然言語コーパスに対して、負荷テストを行っています。
K8Sの特性としてか、SLAを大幅に超過したときに、httpサーバが応答しなくなってしまうので、これは実運用の際にはよく考えたほうが良さそうです。

GCP K8Sで予想する

$ DOCKER=35.189.146.153 python3 stress-testing.py 
...
{"score": 4.059278052177565} 特殊な映画 クリストファー・ノーランらしさ全開だと感じました。この緊迫感、絶望感、暗さ。ダークナイトを思い出します。昼のシーンが多く画面や映像が暗い訳ではないのですが、な
んとなく雰囲気が暗い。でもこの暗さがいい味を出してます。分かりやすい娯楽映画ばかり観ている人には理解しにくいかも。
elapsed time 18.113281965255737

ローカルのDOCKERで予想する

$ DOCKER=localhost python3 stress-testing.py 
...
{"score": 4.059278052177565} 特殊な映画 クリストファー・ノーランらしさ全開だと感じました。この緊迫感、絶望感、暗さ。ダークナイトを思い出します。昼のシーンが多く画面や映像が暗い訳ではないのですが、な
んとなく雰囲気が暗い。でもこの暗さがいい味を出してます。分かりやすい娯楽映画ばかり観ている人には理解しにくいかも。
elapsed time 5.5899786949157715

何もチューニングしない状態では、ローカルのほうが早いですね(それはそう)

まとめ

Dockerで簡潔にかつ素早くサービスを提供する仕組みを提供する仕組みとしてとてもよさそうです。

小さい案件を一瞬で終わらせるデザインパターンとして、有益なように思います。

kubeflowではなくてk8sをやった理由

フレームワークを利用しないことによる、圧倒的に高い自由度と、ベースとなるDockerコンテナをそれなりにちゃんと整えていたので、kubeflowのワークフローに乗せるメリットは今回の設定では少なかったです。そのため、生のk8sを利用しました。

参照したドキュメント

実際に使用したコードはこちら

実践的な分散処理を利用して処理を高速化

実践的な分散処理を利用して処理を高速化

GCPAWSで膨大な計算を行う際に、オーバーヘッドを見極めて、大量のインスタンスを利用し、半自動化して、より効率的に運用するテクニックです。

Kaggle Google Landmark Recognition + Retrievalで必要となったテク

Kaggleでチームを組んで皆さんのノウハウと勢いを学ぶべく、KaggleのGoogle Landmark RecognitionRetrievalコンペティションにそれぞれチームで、参加しました。

メンツは、キャッシュさん、yu4uさん、私という激強のお二人に私が計算リソースの最適化で参加しました。画像のことはディープ以降の知識レベルであったので、大変勉強になったコンペです。結果は銀メダル2個です。  

「ディープの特徴量」 + 「局所特徴量」の両方を取り出し、マッチングを計算するという問題で、これが大量の画像に対して適応しようとすると、とても重いものでした。

これを様々な計算リソースを投入し、並列で計算した方法がかなり極まっていたのと、これは知らないと難しいかも、、、と思い、よい機会なのでまとめました。

大量のクラウド一時的なインスタンスをかりて、SSHFSというファイルシステムで一つのマシンをマウントし、インスタンスに処理の命令を送りオーバーヘッドを見極めて、アルゴリズム的改善を行い、改善した処理プロセスを行う、というPDCAのような流れをおこうなのですが、各要素について説明したいと思います。

目次

  1. ファイルシステム
  2. GCP Preemtipbleインスタンスを用いた効率的なスケールアウト
  3. MacBookからGCPインスタンスに命令を送る
  4. Overheadを見極める
  5. Forkコスト最小化とメモ化
  6. まとめ

1. ファイルシステム:SSHFSがファイルの破損が少なくて便利

sshfsはssh経由で、ファイルシステムをマウントする仕組みですが、安定性が、他のリモート経由のファイルシステムに対して高く、一つのハードディスクに対して、sshfs経由で多くのマシンからマウントしても、問題が比較的軽微です。
また、ホストから簡単に進捗状況をチェックすこともできます。

この構造のメリットは、横展開するマシンの台数に応じて早くできることと、コードを追加で編集することなく、分散処理できます。

図1.

処理粒度を決定して、処理したデータはなにかキーとなる値か、なければ処理したデータのhash値でファイルが処理済みかどうかを判断することで、効率的に分散処理することtができます。   

sshfs上の処理粒度に対してすでに、処理済みであれば、処理をスキップします。

(sshfs上で行ったものは二度目はファイルが共有され、二度目は、処理されないので、効率的に処理できます)

from pathlib import Path
import random
from concurrent.futures  import ProcessPoolExecutor as PPE
def deal(path):
  target = Path( f'target_dir/' + str(path).split('/').pop() )
  if target.exists():
    # do nothing
    return  
  # do some heavy process
  target.open('w').write( 'some_heavy_output' )

paths = [path for path in Path('source_dir/').glob('*')]
random.shuffle(paths) # shuffle
with PPE(max_workers=64) as exe:
  exe.map(deal, paths)

2. GCP Preemptible Instance(AWSのSpot Instance)を用いた効率的なスケールアウト

計算ノードは、非同期で運用できるので、途中で唐突にシャットダウンされても問題がないです。そのため、安いけどクラウド運営側の都合でシャットダウンされてしまう可能性があるが、1/10~1/5の値段程度に収まるGCP PreemptibleインスタンスAWS Spotインスタンスを用いることができます。

Preemptibleインスタンスはgcloudコマンドで一括で作成できますが、このようにPythonなどのスクリプトでラップしておくとまとめて作成できて便利です。  

Preemptible インスタンスをまとめて作成

import os

type = 'n1-highcpu-64'
image = 'nardtree-jupyter-1'

for i in range(0, 3):
  name = f'adhoc-preemptible-{i:03d}'
  ctx = f'gcloud compute instances create {name} --machine-type {type} --image {image} --preemptible'
  os.system(ctx)

このスクリプトは、自分で作成した必要なライブラリがインストールされた状態のイメージ(nardtree-jupyter-1)からハイパフォーマンスのインスタンスを3台作成します。

3. MacBookからGCPインスタンスに命令を送る

google cloud toolをインストールし設定することで、GCPインスタンスに対して命令(コマンド)を送ることができます

Premptible インスタンスに必要なソフトをインストールして、sshfs経由でマウント
クライアントマシン(手元のMacBookなど)から、コマンドを実行させることができます。   

このオペレーションのなかに、手元のマシンから任意の前処理,学習のスクリプトを実行することもできます。

(GCP_NAME, GCP_KEY_NAMEは手元のパソコンの環境変数に設定しておくとよいです)

import os
GCP_NAME = os.environ['GCP_NAME']
GCP_KEY_NAME = os.environ['GCP_KEY_NAME']
names = [f'adhoc-preemptible-{i:03d}' for i in range(0, 3)]
# commandsのところに任意の処理を書くことができます 
commands = [  'sudo apt update', \
              'sudo apt-get install sshfs', \
              'mkdir machine', \
              f'sshfs 35.200.39.32:/home/{GCP_NAME} /home/{GCP_NAME}/machine -o IdentityFile=/home/{GCP_NAME}/.ssh/{GCP_KEY_NAME} -o StrictHostKeyChecking=no' ]

for name in names:
  for command in commands:
    base = f'gcloud compute ssh {GCP_NAME}@{name} --command "{command}"'
    os.system(base)

4. Overheadを見極める

高速化の余地があるプログラムの最適化はどうされていますでしょうか

私は、以下のプロセスで全体のプロセスを最適化しています。   

図2. 

図2の例では、CPUは空いており、コードとこの観測結果から、DISKのアクセスが間に合ってないと分かります。オンメモリで読み込むことや、よりアクセスの速いDISKを利用することが検討されます。

5. Forkコスト最小化とメモ化

マルチプロセッシングによるリソースの最大利用は便利な方法ですが、spawn, fork, forkserverの方法が提供されています。
使っていて最もコスト安なのはforkなのですが、これでうまく動作しないことが稀にあって、spawnやforkserverに切り替えて用いることがあります。spawnが一番重いです。   

 forkでは親プロセスのメモリ内容をコピーしてしまうので、大きなデータを並列処理しようとすると、丸ごとコピーコストがかかり、小さい処理を行うためだけにメモリがいくら高速と言えど、細かく行いすぎるのは、かなりのコストになるので、バッチ的に処理する内容をある程度固めて行うべきです。   cent

例えば、次のランダムな値を100万回、二乗するのをマルチプロセスで行うと、おおよそ、30秒かかります。

from concurrent.futures import ProcessPoolExecutor as PPE
import time
import random
args = [random.randint(0, 1_000) for i in range(100_000)]

def normal(i):
  ans = i*i
  return ans

start = time.time()
with PPE(max_workers=16) as exe:
  exe.map(normal, args)

print(f'elapsed time {time.time() - start}')
$ python3 batch.py 
elapsed time 32.37867593765259

では、単純にある程度、データをチャンクしてマルチプロセスにします。

すると、0.04秒程度になり、ほぼ一瞬で処理が完了します

これはおおよそ、もとの速度の800倍です

from concurrent.futures import ProcessPoolExecutor as PPE
import time
import random
data = [random.randint(0, 1_000) for i in range(100_000)]

tmp = {}
for index, rint in enumerate(data):
  key = index%16
  if tmp.get(key) is None:
    tmp[key] = []
  tmp[key].append( rint )

args = [ rints for key, rints in tmp.items() ] 
def batch(rints):
  return [rint*rint for rint in rints ]

start = time.time()
with PPE(max_workers=16) as exe:
  exe.map(batch, args)

print(f'elapsed time {time.time() - start}')
$ python3 batch2.py 
elapsed time 0.035398244857788086

メモ化、キャッシュを利用する
同じ内容が出現し、結果を保存できる場合、計算の多くを共通で占める箇所を、特定のキーで保存しておいて、再利用することで、高速に処理することができます。

特定の入力の値を10乗して、返すというあまりない問題ですが、わかりやすいので、これで示すと、チャンクして処理するものが、このコードになり、4秒程度かかります。

import time
import random
import functools
data = [random.randint(0, 1_000) for i in range(10_000_000)]
tmp = {}
for index, rint in enumerate(data):
  key = index%16
  if tmp.get(key) is None:
    tmp[key] = []
  tmp[key].append( rint )
args = [ rints for key, rints in tmp.items() ] 
def batch(rints):
  return [functools.reduce(lambda y,x:y*x, [rint for i in range(10)]) for rint in rints ]

start = time.time()
with PPE(max_workers=16) as exe:
  exe.map(batch, args)
print(f'elapsed time {time.time() - start}')
$ python3 batch2.py 
elapsed time 3.9257876873016357

これをメモ化して、ある程度の最適化を入れると、倍以上の速度になります

from concurrent.futures import ProcessPoolExecutor as PPE
import time
import random
import itertools
data = [random.randint(0, 1_000) for i in range(10_000_000)]
tmp = {}
for index, rint in enumerate(data):
  key = index%16
  if tmp.get(key) is None:
    tmp[key] = []
  tmp[key].append( rint )

args = [ rints for key, rints in tmp.items() ] 
def batch(rints):
  mem = {}
  result = []
  for rint in rints:
    if mem.get(rint) is None:
      mem[rint] = itertools.reduce(lambda y,x:y*x, [rint for i in range(10)] )
    result.append( mem[rint] ) 
  return result
start = time.time()
with PPE(max_workers=16) as exe:
  exe.map(batch, args)
print(f'elapsed time {time.time() - start}')
$ python3 batch3.py
elapsed time 1.4659481048583984

5.5 コード

6. まとめ

並列処理はMapReduceという分析スタイル以外のものもたくさん存在し、微妙な勘所の最適化がないとそもそも目的に対して間に合わないということも十分にありえます。

sshfsを共通のファイルシステムとして持ち、Preemptibleインスタンスをたくさん用意して、 gcloudで任意のコマンドを送信することで、一つの問題を膨大な計算リソースで処理することができるようになります。

計算量での押し切りは、最後に粘りがちになるえる技能でもあるので、そこそこ重要なのだと思います。

Kaggleを取り掛かるまでにやったこととと、モチベーションの維持のために必要だったこと

Kaggleを取り掛かるまでにやったこととと、モチベーションの維持のために必要だったこと

わたしの経験した、最初のKaggleの一歩と、実際にKaggleに対するモチベーションがそれなりに加熱するまでにやったことと、息切れしない心の持ち方です。

KaggleがDataScienceに携わるものの価値の可視化の基軸の一つになっていますが、まだ取り掛かれない or 心が折れそう人のために、私に必要だったきっかけと、私が行ったモチベーションコントロールを含めて記します。  

まだまだkaggleは弱いですが、継続的に、日々の生活の中に組み入れるまでが大変でした。

目次

  1. 既存の機械学習関連の技術者にとってのKaggleの認識のあり方

  2. すでに機械学習アルゴリズムを知っているがやるべきか

  3. 競技プログラミングは業務コーディングで役に立たないロジックが、Kaggleの業務のデータ分析との関係にも成り立つか

  4. 挑み方(ブートアップ)

  5. 挑み方(Kernelにキャッチアップし続ける)

  6. 挑み方(ツール類)

  7. 挑み方(データ構造)

  8. 挑み方(魂のあり様)

  9. 人間性を捧げよ

1. 既存の機械学習関連の技術者にとってのKaggleの認識のあり方

 私が面接する側に立ったこともそれなりにあるし、面接される立場になったこともそれなりにありますが、DSに関わる人のスキルや技能は面接で完全に納得行くまで把握し切ることが、かなり困難だと感じていて、一つの客観的な指標にKaggleが強いことなどのアウトプットが重要視されることがあります。  

今までそれなりにやってきたという技術者や企業の意思決定をデータ的な側面で支えてきたデータサイエンティストたちも、過去の実績の資産だけでなく、現実に何ができるかということを、積極的に可視化することが求められている流れをひしひしと感じています。

忙しいから & 自信がないからやらないという言い訳がもはや通用しない所まで来ています。

2. すでに機械学習アルゴリズム(本人の主観で十全に)を知っているがやるべきか

 やるべきです。理屈や概要を知っているだけでは、使いこなせないということを嫌というほど体感できるかと思います。

 また、競技に向いたデータ分析手段もあり、スピード的、精度的に、高効率なものが多く、短期集中でアウトプットを出す良い訓練にもなるでしょう。

 「自分は業績を示す論文があるから」や、「マイペースでやりたい」という人も多いのは承知なのですが、Kaggle的な方法もDSの一側面です。

言い訳している暇があればやったほうがいいように思われます。  

3. 競技プログラミングは業務コーディングで役に立たないロジックが、Kaggleの業務のデータ分析との関係にも成り立つか

 これも度々、ツイッターやいろんなメディアで話題になる類のお話ですが、競技プログラミングのデータ構造はデータ分析に使えますし、データ分析の構造を体系的に理解させてくれるコンテンツも見たことがないので、私自身は競技プログラミング、コードパズルは一定の意味があったと考えています。  

 しかし、SIでの開発ではこれらの技能よりわかりやすいオブジェクトを作るとか、ライブラリを綺麗に使うなどの側面で凝ったアルゴリズム的な側面でなく別の技能の側面が求めらます。チームでウォーターフォール開発の中で一人だけ生産性マックスであっても浮いてしまうでしょう。

 Kaggleの側面にもそれと似たような問題はあると思います。組織のスタイルによって受け入れられるとか、そうでもないとか。

 分析イディオムみたいなものはたくさん身につくので、イディオムで勝負できる案件に昇華できるかが、一つキーポイントな気がします。  

4. 挑み方(ブートアップ)

まず、最初に取り掛かるのに、一人でやろうとすると、荷が重いので、だれかを巻き込みましょう。

だれかがやってるとモチベーションがし易いと感じるし、細かいインターフェースの英語で詰まったりしても、聞けば解決するかも知れないというのは案外馬鹿にできない要素であったりします。

Kaggleの課題とは本来なんの関係もないのですが、余計なところにエネルギーをかけないで済むとなると、人間、かなり進捗します。  

ネットの世界ではSNSという便利ツールがあります。このツールを利用して、情報を広く集めるのも有力な手段です。

もし御社で、誰かがKaggleをやりたいと騒ぎ出したら、白い目で見るとか、馬鹿にするとかしないで、興味がなくても見守ってあげましょう。

物事を始めるのに必要なエネルギーがそれなりに必要です。彼ら彼女らなりに工夫して、初速を出そうとしているのだと思います。

5. 挑み方(Kernelにキャッチアップし続ける)

KernelというKaggle上で動作するJupyter Notebookがあるのですが、コンペティションが開始されると、いろんな人がKernelを投稿し、精度の良い例を示します。

競技としてのその性格により、コンペ終了間際で、投稿数が増えていき、かつ、精度自体も上がっていきます。

 通常のKernelの進歩を考えない、初期だけ参照する例だとこのようにすればある程度の進捗を得ることができます。  

図1. Kernelの進歩を考えない進捗モデル

最初の一月ぐらいは私は以下のような戦い方にしてしまって、過剰に消耗してしまいました。  

図2. 赤で囲まれたイタレーションを回しているうちに、コンペが終了してしまう

効率の悪い局所ループにハマった状態なので、なんとか、脱出しないといけません。  

強化学習の知恵ですが、たまに、乱択を入れることで、ダイバーシティを維持することがあります。そんなこんなで、意識的にこのような経路に変更しました。   (α, βはコンペによって設定される 0 ~ 1の値)

図3. 例えば、外部のリソース参照の経路を確率的につくる

  

適切に外部の知恵を取り入れることで、進捗が出しやすく、煮詰まるということがなくなるのですが、今までの自分のリズムで進捗していくというスタイルを変更する必要があるので、ここでコストがかかります。

6. 挑み方(ツール類)

PythonかRのどちらを使うかまずは決めましょう。  

Pythonでは、データ操作にPandasとNumpyが圧倒的に使われます。

以下のライブラリとソフトウェアをよく見ます
- Python3 - Pandas - Numpy - ScikitLearn - lightgbm - xgboost - keras

また、データ量の多いコンペティションもたくさん出題されていて、普通に全量を扱おうとすると、BigQueryなどのSQLが使えるビッグデータ処理基板が必要になります。

MapReduce系のアーキテクチャとは、後述するデータ構造の関係で、行志向と列志向だと、Kernelが列志向のイメージと処理フローなので、行志向のMapReduceとはあまり相性がよくありません(Kaggle TalkinData Detectionでこれで大いに爆死しました。。)  

Kernelに登場しない処理方法 - BigQuery

7. 挑み方(データ構造)

Pandasのデータ構造が列志向という方法を採用しており、この方法を理解していると、便利です。  

列志向は、分散処理に向かないという側面がありますが、なんだかんだで直感的にデータを操作できて便利です。  

図4. PandasのDataFrameはSeriesのオブジェクトを束ねた列志向になっています

列志向は、Pandas作者のWes McKinnyがよく使う方法で、Apache Arrowなどの処理基板方式を横断したDataFrameを推進していらっしゃいます。

なんらかのKeyを必要としない処理方式なので、Map ReduceなどKeyをハッシングして、大規模分散する発想とは違ったものです。

幾つかプリミティブな動作を示します。これらの組み合わせて殆どのデータマニピュレーションが可能になります。

KaggleのOpenDataのDonorsChoose.orgのドナーのデータセットの例で示します。

図5. KaggleのDonorDataset

pandasのDataFrameをcolumn名をListで投入して、スライシングすると、DataFrame Objectが帰ります

dfslice = df[ ['Donor ID', 'Donor City'] ]  # カラム名 Donor ID, Donor Cityでスライスする(スライスされたDataFrameが帰る)

isinstance(dfslice, pd.DataFrame)

>> True

今度はDataFrameをcolumnをstrで指定して、スライシングすると、Series Objectが帰ります

donor_series = df[ 'Donor ID' ] # カラム名 Donor IDのSeriesが帰る

isinstance(donor_series, pd.Series)
>> True

Seriesに対する操作は、変換関数を定義して、変換することができます。

図6.

ZIP Codeをintに変換でき、かつ、偶数なら2倍し、奇数ならばそのままで、文字列ならば、-1にする例です。

def f(x):
    try:
        x = int(x)
        if x%2 == 0:
            return x*2
        else:
            return x
    except Exception as ex:
        return -1
df['Donor Zip'] = df['Donor Zip'].fillna(0)
df['Donor Zip'] = df['Donor Zip'].apply(lambda x:f(x))
df.head()
図7. 変換後のデータフレーム

Pandasドキュメントが不足しているのが、groupbyです。groupbyは特定のキーで小さなデータフレームに分割して、共通のキーの中で様々なオペレーションができます。

gp = dfslice.groupby('Donor City')

isinstance(gp, pd.core.groupby.DataFrameGroupBy) # パッケージの名前空間からしてわかりにくい...
>> True
図8. 設定したキーで小さいDataFrameで分割されてGroupByで更にまとめている

Pandasのgroupbyを利用して、indexで分散して、小さいDataFrameを作成して、multiprocessingを行うこともできます。

import concurrent.futures

df['index'] = df.index
df['distribute'] = df['index'].apply(lambda x: x%16)
dfs = [ _df for key, _df in df.groupby('distribute') ]

def pmap(df):
    # do something.
    pass
with concurrent.futures.ProcessPoolExecutor(max_workers=16) as exe:
    exe.map(pmap, dfs)

8. 挑み方(魂のあり様)

多少なりとも機械学習に自己のアイデンティティを置いている人は、得意分野以外のフィールドで評価されるリスクを避けようとする心理が働くかと思います。

ここで、少し落ち着いて見ましょう。そのように評価されてあなた自身の価値は毀損しますか? 周りの人をイメージすると良いです。

少なくとも、新しいことにチャレンジしてうまくいかないことが原因で見下されるような場合、その組織は新しい必要な投機を認める文化が醸造されていません。そのような文化体系が支配的であれば、私はその組織に在籍する必要がないように思います。  

また、必ず成功しようという意気込みからか、勝利を確実にするためにリソースを投下し続けるの問題でして、人間、できることには限界があり、生活のQoLが下がらないように調整する必要があります。  

自己のプライドや主観的な価値の軸を、メタ的な認知である程度自由に動かせることが、必要になってきます。  

メタ認知の一種を行うと良いでしょう。

9. 人間性を捧げよ

 最近流行り(一部界隈だけ?)の「人間性を捧げよ」について、個人の意見の感想を述べさせてください。
 私は「ダークソウル」と「ダークソウル3」のゲームが好きで、人間性という曖昧なもの(実態は何かわからない)が、この物語でキーになっています。  

図9. ダークソウルでの人間性。謎の影で、アイテムであり、人間らしさ(正気度のようなパラメータでもある) 

人間性を捧げよ」は、ダークソウル1のキャッチフレーズで、ものすごい難しいゲームなのですが、ボス戦で、体力の削り合いみたいな事になってしまします。  

まともにプレイしていると死にまくって、攻略ができないのですが、「人間性」というアイテムを消耗すると、HPが全回復して、ボスと殴り合いを継続できて、人間性でゴリ押しすることも可能です。  

この人間性はゲーム中で限られた個数しか取得することができなくて、人間性を消耗すると、ストーリーをすすめることができなくなって、実質上、積んでしまいます。

C++を勉強してたときはものすごい勉強をして覚えたのですが、まさしく、人間性を捧げて、通常の安定した生活を遺棄して、効率のために、様々な最適化を行う努力をしました。

強い人が言う、「人間性を捧げてください」は、まさしくこのようなことだと思います。  

やりすぎると病気になったり、精神がおかしくなり、人生というゲームが積んでしまうので、人間性を消費するタイミングはよく考えてください。