dask.distributedで分散処理
dask.distributedで分散処理
dask.distributedの使い方と、具体例集です
dask.distributedの簡単な理解
一種の分散処理フレームワークになっており、便利です。
ドキュメントやgithubはdaskからdask.distributedは分割されており、DataFrameの取扱以外のより汎用的な分散処理を含んでいるようです。
Celeryとかでもやったことをがあるのですが、dask.distributedはRemote Procedureのそれよりまともでより整理された方法で、concurrent.futureのリモート版とも考えられます。
dask.distributedのインストール
pip経由でのdaskのインストール
$ sudo pip3 install dask $ sudo pip3 install distributed
ubuntuでのインストール
注:パッケージが微妙に古くてpip経由の方がいいです
$ sudo apt install python3-distributed
Dask SchedulerとWorkerのセットアップ
Schedulerは分析を実行するマシン、クライアントマシンなど任意のマシンでいいはずです
(今回はschedulerを起動するマシンは192.168.14.13のIPアドレスを持つとしています)
$ dask-scheduler Start scheduler at 192.168.14.13:8786
portのチェック
$ nc -v -w 1 192.168.14.13 -z 8786
Workerは命令を受け実際に計算するマシンなので、別のマシンになります
${SCHEDULER}にはschedulerを起動したマシンのIPが入ります
nprocsオプションで最大のworkerでの並列数(同時に関数が走る数)を指定できます
$ dask-worker ${SCHEDULER}:8786 --nprocs 12
Dask.Distributedの並列マシン数と、速度の関係
下記にある簡単な足し算を並列マシン(worker数の増加)でやろうとすると、処理時間がほぼ反比例の関係で下がるので、効率的に分散処理できていることが確認できます。
分散した命令の結果を受け取る2つの方法
clientにmapされた命令はschedulerが管理して分散処理されますが、内部で、マシン台数かに分割されており、速度的には、一気に結果を見る(gather)のも、個別にみる(result)のもあまり変わりがないようです。
gatherという関数で一気に集められますが戻り値が多いときには、オンメモリにするのが難しいので、挙動が不安です。
L = client.map(inc, range(1000)) ga = client.gather(L)
resultで一個一個取っていく方法は、一件の処理結果のみ返すので、メモリ節約にはなりそうです
L = client.map(inc, args) for l in L: print(l.result())
例に用いたコード
簡単な例:数字を増やすだけ
簡単な命令で、数字を足し合わせて行くだけのプログラムですが、100回引数を変えて行おうとすると、計算時間がかかります。
今回、5台のマシンで一台あたり12並列数で行うので、合計60並列で処理します。
ClientでschedulerのIPアドレスとportを指定して、以下のようなコードを実行するだけで、複数のマシンで並列計算をすることができます。
from distributed import Client client = Client('192.168.14.13:8786') def inc(x): for i in range(10000000): x += i return x L = client.map(inc, range(1000)) ga = client.gather(L) print(ga)
複雑な例:機械学習のモデルのグリッドサーチでも便利
機械学習のパラメータを少しずつ変えながらもっとも、パフォーマンスが良いパラメータを総当りで探すグリッドサーチというものあって、マシンパワーでゴリ押ししてしまうのが都合がいいのです。
複数台のworkerでグリッドサーチさせると、そのマシンの台数分だけ減らせます
dask.mllibを使うとscikit-learnをラップして使うことができますが、任意のライブラリでも動かしたいので、特にdask.mllibは使いません
代表的なUCIのadult incomeデータセットでランダムフォレストでパラメータを2x10x10x15=3000通りという膨大なパラメータサーチであっても、割と早く終わらせることができます。
以下のコードの例では、パラメータの組み合わせを作って、分散処理で評価させています。
関数doはworkerで実行されて、ホームディレクトリ以下のデータセットを読み込んで、引数に与えられたRandomForestのパラメータを適応して学習し、テストデータでの精度を見ています。
def do(param): dataset = pickle.load(open(f'{os.environ["HOME"]}/dataset.pkl', 'rb')) Xs, ys, Xst, yst = dataset criterion, n_estimators, max_features, max_depth = param model = RandomForestClassifier(n_estimators=n_estimators, criterion=criterion, max_features=max_features, max_depth=max_depth) model.fit(Xs, ys) ysp = model.predict(Xst) acc = accuracy_score(yst, ysp) print(acc) return [acc, list(param)] params = [] for cri in ['gini', 'entropy']: for n_esti in range(5,15): for max_features in range(10,20): for max_depth in range(4, 20): params.append( (cri, n_esti, max_features, max_depth) ) L = client.map(do, params) ga = client.gather(L)
adult incomeのデータを学習可能かデータに変換
$ python3 parse-adult.py
出力されたdataset.pklを各workerのホームディレクトに配置
関数の引数に学習用のデータを含めると、遅くなったり、警告がでたりするので、大規模なデータの際はscpで転送する、S3, GCSを利用するなどが良いようです。
workerの台数で分散処理してグリッドサーチ
$ time python3 adult.py ... real 2m24.923s user 0m5.950s sys 0m0.951s
10分以上かかる処理が2分半程度に圧縮できました!
progress barの表示
client.map, client.submitをprogress関数でラップすることで、進捗を確認することができます
L = client.map(do, params) progress(L) # このようにする ga = client.gather(L)
dask.distributedで注意すべき点
dask.distributedで関数の引数に大きすぎるデータ(100MByte)を超えるものを投入すると警告が出るし、転送も早くありません。
そのため、なんらかローカルで対象となるデータを共有している必要があり、scpでファイル転送、HDFSやS3, cloudstrageなどとは相性がよい設計です。
S3とかに格納されchunkされたデータを一気に処理するときなど、便利そうです
金銭的な比較
今現在、私の個人サーバは6core 12threadのRyzen 1600Xが5台あって、30core 60threadで動作します。
2018/03現在、これはおおよそ2万円なので、10万円程度でこの仕組が購入できたことになります。
同等のスペックであるEpyc 7551pは$2100なので、おおよそ20万円で、半額程度で手に入れることができました。
もちろん、分散させないで一大のマシンで処理するメリットとかあると思うのですが、こんな大規模計算は稀だし、必要なときにdask-workerを立ち上げてクラスタに組み込めるのはメリットです。
Dask(+Dask.Distributed)の使い所
Apche Sparkと競合するような位置づけですが、Daks.Distributedのその簡単にデプロイできることと、Apache Sparkを用意するまででもないときとかよいんではないか、みたいに言われているようです。
私は、HadoopやDataFlow(Apache Beam)が結構好きで得意なので、Sparkをあんまり使わないですが、ここまで大げさに分散処理する必要が無いときにDask(+Dask.Distributed)は良さそうですね。
DeepLearningでアップサンプリングする
DeepLearningでアップサンプリングする
オーディオ界隈はオカルトっぽく見えていたので、今までどうしようと思っていたのですが、簡単な感じで結果がでました
世の中、音のアップサンプリングや音質がよくなるような細工に本当に余念がないのですが、ディープラーニングでも簡単に対応することは可能です。
世の常としてA/D変換されたデータは元のデータが欠落するから、音の復元は無理だと言われ[3]てきましたが、機械学習を使えばその制限は突破できます。
High Resolution
ハイレゾは96kHz/24bitという高いサンプリング数と、高い解像度を誇ります。
通常、YouTubeでは44kHz/16bitで音楽が再生されるので、及ばないのですが、22kHz/16bitの音源を44kHz/16bitに引き上げてみます。
この中間を補填するロジックに深層学習を組み込みます。
今回使用したネットワーク
幾つかやり方にはコツがあって、実は音の16bit値をfloatに変換したのでは、うまく行きません。これは、DNNがあまりにも小さい値には反応しないし、この小さなさが、音の善し悪しを分けたりするからです。
そのため、16bitをバイナリ(2進数)表記に変換して、音のある波形の前後25サンプルを切り出します。
25サンプルから、44kHzで本来存在していただろう、音の波形を補完します。
人の声や楽器など、ある程度のレンジを取って音波をみることで、楽器や声などの音の並ごとに特性が獲得できるとの仮説があるからです。
ネットワークでは、双方向のLSTMのネットワークを用いました。
コードで書くと、こんな感じです。(Bi=Bidirection, TD=TimeDistribute)
input_tensor1 = Input(shape=(50, 16)) x = Bi(CuDNNLSTM(300, return_sequences=True))(input_tensor1) x = TD(Dense(500, activation='relu'))(x) x = Bi(CuDNNLSTM(300, return_sequences=True))(x) x = TD(Dense(500, activation='relu'))(x) x = TD(Dense(20, activation='relu'))(x) decoded = Dense(1, activation='linear')(x) print(decoded.shape) model = Model(input_tensor1, decoded) model.compile(RMSprop(lr=0.0001, decay=0.03), loss='mae')
実験
ボーカロイドの曲である「wave」をreworuさんが歌ったものを利用しました
YouTubeからwaveファイルを取り出すことはできるので、ダウンロードしたら、入力用に22kHzにダウンサンプルします。(著作権の関係で音源は添付しませんのでご自身でご用意してください)
$ python3 10-scan.py
オリジナルの曲と、ダウンサンプルした曲のペアのデータ・セットを作成してnumpyに変換します
$ python3 20-make-dataset.py
学習
GTX1080Tiで二時間程度です
(音楽の前8割を学習に使い、残り2割を評価に回します)
$ python3 rnn-super-resolution.py --train
アップサンプリング
$ python3 rnn-super-resolution.py --predict
結果(誤差評価)
ぶっちゃけ、聞いても定性的な評価になってしまうというのが本音なので、テストデータにおけるMean Absolute Error(平均絶対誤差)をみて良くなっていることを確認します。
$ python3 eval.py オリジナルデータ 0.0 22kHzデータ 1260.0746398721526 ディープラーニングでアップサンプリングしたデータ 610.2184827578526
値が少ないほうがいいのですが、たしかに、22kHzのデータそのものより音質は改善していることがわかりました。
結果(聞いてみる)
オリジナル44khz .
https://soundcloud.com/sgemuj01eczp/origin
ダウンサンプル22kHz
https://soundcloud.com/sgemuj01eczp/degradation-1
https://soundcloud.com/sgemuj01eczp/yp-orig-5
よく聞き分けると、ノイズのような音源が、ところどころ機械学習では混じっていることがわかるかと思います(課題)
まとめ
ネットワークの大きさや、出力を工夫することで、44khz -> 88khzも可能だと思うし、単純なフィルタを超えたアップサンプリングが可能だと思います。
オーディオ沼は怖いのでほどほどにしておきたいですが、簡単にできるので、やってみる価値はあるかもしれません。
最後に実は全然別で、audio super resolutionというものをやってらっしゃる方を発見して、これは、音を画像のように捉えてアップサンプリングするようです[2]。
参考文献&オカルト
[1] mp3音源を“アップサンプリング”で高音質化できるか試してみた (データフォーマットを変形しただけでアップサンプリングできていないように見える)
文章をBlockChainで管理する
文章をBlockChainで管理する
今更感がありますが、BlockChainについて、技術的な点について結構曖昧であったので、調べなおしたりしました。
P2Pの多数決の理屈ばかり強調されますが、実際のところどうなっているのか、自分で実際に実装を行いながら、ブロックチェーンを組んでみて確認してみました。
BlockChainとは
- BitCoinに代表される仮想通貨の分散データベースシステム
- ハッシュアルゴリズムにより、事実上の改竄が不可能
- ドキュメントによっては、合意形成や、ブロックチェーンが意図しない方向にチェーンが伸びてしまった場合の取扱まで含んだりする
(今回のスコープはp2pは含まず、オフラインで検証しました)
BlockChainを理解するには何を知っていると便利か
- hash関数
- hash関数を使った応用例各種
- 簡単なP2P
コンピュータにおけるhashの使い方
hashはいろいろな値(文字列やなんでも)を、特定の数値の範囲にうまくエンベッティングする技術で、この仕組を使うと、キーからバリューの引き出しがO(1)の計算量で行えます。
hashによる分散化はあらゆるところで行われており、エンジニアにとって馴染みのあるものでは、hashmap, KVS, Hadoopなどがキーをハッシュ化することでうまくやっています。
BlockChainまで発展する
BlockChainはこのhashによる鎖を連ねることで、鎖が維持できるているかどうかで、データが正しいかどうかの検証が行えます。
単純なこの仕組に加えて、様々にP2Pで動作を確認と保証する仕組みをいれてBlockChainというらしいです(広義すぎるので分散台帳技術はちょっと和訳として不適切だと思う)
BitCoinにある採掘という作業
BitCoinには採掘という作業があって、GPUをゴリゴリ回してお金を得るみたいなことをやっている人たちがいます。これは、ハッシュ値の先頭が0000000〜とかになるように、データの中にnonceというフィールドを追加して調整しています。 最初に特定条件に一致するハッシュ値を計算できた人がインセンティブをもらい、次に繋がる鎖を生成します。これはマイニングしている人が、マイニングするモチベーションであり、ビットコインという通貨的特性と相性がいい理由でもあります。
P2Pでは同時に生成してしまう可能性がありますが、同時に生成してしまったら、後続に続く鎖の数が大きい方が優先されます。
国会で今話題になっている公文書偽造などの問題には転用可能でしょうか。
公文書管理におけるブロックチェーンの利用
この資料によると、費用対効果の視点でコストが嵩みすぎるという課題が挙げられていますが、それは、nonceによる採掘難易度に依存するし、衝突困難性は採掘と関係がないので、私はできると考えています。
また、直接、データをブロックチェーンに入れるのではなく、公文書のハッシュ値やフィンガープリントに該当するものをいれればいいはずだと思うのですが。
夏目漱石の小説をブロックチェーンに組み込む
このようなコードを作成しました。 前のhashの情報を記憶しつつ、データを持ち、nonceの条件を満たすものを探し、一致したら、ブロックをjsonで吐き出すものです。
def _gen_block(arg): source_host, data, prev_hash = arg now = time.time() loc = datetime.fromtimestamp(now) timestamp = loc.timestamp() while True: nonce = random.randint(0, 100000000000000) block = { \ 'timestamp':timestamp, \ 'source_host':source_host, \ 'data':data, \ 'prev_hash': prev_hash, \ 'nonce':nonce, } next_hash = hashlib.sha256(bytes(json.dumps(block),'utf8')).hexdigest() # 先頭のN字が"0"ならば、採用 size = 1 if next_hash[:size] == '0'*size: break open(f'cache/{next_hash}', 'w').write( json.dumps(block, indent=2, ensure_ascii=False) ) return block, next_hash start_block, next_hash = _gen_block(('http://localhost:1200', 'Ground Zero', hashlib.sha256(bytes('0', 'utf8')).hexdigest())) for line in open('stash/kokoro.txt'): line = line.strip() block, next_hash = _gen_block(('http://localhost:1200', line, next_hash) )
nonce値の難易度による夏目漱石の「坊っちゃん」を全部ブロックチェーン化するまでの計算時間
先頭が0一個、0二個、0三個、0四個と評価しました。
わかりきっていたことですが、だいぶ計算時間が違います。
CPU
CPU MHz: 1550.000 CPU max MHz: 3800.0000 CPU min MHz: 1550.0000AMD BogoMIPS: 7585.48en 5 1500X Quad-Core Processor
$ uname -a Linux PrintzEugen 4.13.0-32-generic #35-Ubuntu SMP Thu Jan 25 09:13:46 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
インセンティブが働かない公文書管理などの環境では、nonce値はないか、小さい値でいいのではないかと考えいています。
過去への遡及
BlockChainはその名の通り、鎖状になっているので、偽造されたものでない限り、最初の発行者のデータにたどり着けるような仕組みになっています。
坊っちゃんの最後のセリフから、最初データまで辿ってみましょう。
最後のセリフ
hash => 00ce7c3a81315250d7abc2d99f177b2ffb442334644044ea89f1d6e435845d86 っtimestamp': 1521708677.710969, 'source_host': 'http://localhost:1200', 'data': '私は私の過去を善悪ともに他ひとの参考に供するつもりです。しかし妻だけはた唯た一人の例外だと承知して下さい。私は妻には何にも知らせたくないのです。妻が己おのれの過去に対してもつ記憶を、なるべく純白に保存しておいてやりたいのが私の。一ゆいいつの希望なのですから、私が死んだ後あとでも、妻が生きている以上は、あなた限りに打ち明けられた私の秘密として、すべてを腹の中にしまっておいて下さい 」', 'prev_hash': '072c69c315bc6c2839122f1b5566df41dfb64f9c489b44441056bab8b0ed9104', 'nonce': 18180102876915}
最初まで遡ります
$ python3 phyllis.py 私は私の過去を善悪ともに他ひとの参考に供するつもりです。しかし妻だけはたった一人の例外だと承知して下さい。私は妻には何にも知らせたくないのです。妻が己おのれの過去に対してもつ記憶を、なるべく純白に保存しておいてやりたいのが私の唯一ゆいいつの希望なのですから、私が死んだ後あとでも、妻が生きている以上は、あなた限りに打ち明けられた私の秘密として、すべてを腹の中にしまっておいて下さい。」 しかし私は今その要求を果たしました。もう何にもする事はありません。この手紙があなたの手に落ちる頃ころには、私はもうこの世にはいないでしょう。とくに死んでいるでしょう。妻は十日ばかり前から市ヶ谷いちがやの叔母おばの所へ行きました。叔母が病気で手が足りないというから私が勧めてやったのです。私は妻の留守の間あいだに、この長いものの大部分を書きました。時々妻が帰って来ると、私はすぐそれを隠しました。 私が死のうと決心してから、もう十日以上になりますが、その大部分はあなたにこの長い自叙伝の一節を書き残すために使用されたものと思って下さい。 ... (中間の表現) ... 私わたくしはその人を常に先生と呼んでいた。だからここでもただ先生と書くだけで本名は打ち明けない。これは世間を憚はばかる遠慮というよりも、その方が私にとって自然だからである。私はその人の記憶を呼び起すごとに、すぐ「先生」といいたくなる。筆を執とっても心持は同じ事である。よそよそしい頭文字かしらもじなどはとても夏目漱石と私ない。 こころ Ground Zero
Ground Zeroとは、今回、作成したblockchainの最初の値になるシードのデータです。
様々なブロックチェーンの応用例
このように、情報が正しければ遡及を行え、すべてのレコードを引き出すことも可能になります。
改竄されていたりすると、不自然なところで遡ることができなくなったり、改竄されたデータを元にブロックチェーンをつなげてしまったと解釈できそうです。
この特徴を利用して、食品安全の仕組みに組み込むことで、肉などの偽装や、安全管理などが行えることが期待できます[3]。
またブロックチェーンの偽装が難しい問題で、戸籍などをブロックチェーン化してしまうのもあるようです[4]。
作成したコード
offline.py 単一マシンで、文章のブロックチェーンを構築します。 ハードコードされたsizeで計算の難易度を指定できます
$ python3 offline.py
phyllis.py ブロックチェーンのハッシュ値から過去のブロックを辿って、ルートまで逆引きするプログラムです
$ python3 phyllis.py
参考文献
RocksDBをさまざまな言語(C++, Rust, Kotlin, Python)で利用する
RocksDBをさまざまな言語(C++, Rust, Kotlin, Python)で利用する
InstagramのCassandraのバックエンドをJVMベースのものから、RocksDBに切り替えたというニュースが少し話題になりました。
CassandraのJVMは定期的にガーベジコレクタが走って、よろしくないようです。
P99というテストケースではデフォルトのJVMからRocksDBに張り替えたところ10倍近くのパフォーマンスが得られたとのことです。
データ分析でもメモリ収まりきらないけど、Sparkのような分散システムを本格に用意する必要がない場合、NVMe上にLevelDB, RocksDBなどのKVSを用意して加工することがあります。
ローカルで動作させるには最強の速度だし、文句のつけようもない感じです。
LSMというデータ構造で動いており、比較対象としてよく現れるb-treeより書き込み時のパフォーマンスは良いようです[1]
LSMのデータ構造では挿入にO(1)の計算量が必要で、検索と削除にO(N)の計算量が必要です。
前提とやりたいこと
- RocksDBはSSDやnvmeで爆速を引き出すパーマネントKVSです
- LevelDB, RocksDBはPythonで分析するときの必勝パターンに自分のスキルの中に入っているので、ぜひともRocksDBも開拓したい
- RocksDBはC++のインターフェースが美しい形で提供さており、他言語とのBindingが簡単そう
もくじ
- 1. RocksDBのインストール(Linux)
- 2. Pure C++でのRocksDBの利用
- 3. C++ Bindingの方針
- 4. Rustでの利用
- 5. Kotlinでの利用
- 6. Python(BoostPython)での利用
これらのコードはこちらにあります。
1. RocksDBのインストール
Ubuntuですと標準レポジトリにないので、ビルドしてインストールする必要があります
(GCC >= 7.2.0, cmakeなどの基本的なbuild-toolsが必要なので、ご自身のOSに合わせて用意してください)
$ git clone git@github.com:facebook/rocksdb.git $ cd rocksdb $ mkdir build $ cd build $ cmake .. $ make -j12 $ sudo make install
2. Pure C++
注意 最新のClangでは構文エラーでコンパイラが通らないので、gcc(g++ >= 7.2.0)を利用必要があります
C++でRocksDBは記述されているので、C++でのインターフェースが最も優れています。
DBのopen, get, putはこのようなIFで提供されています
DB* db; Options options; // Optimize RocksDB. This is the easiest way to get RocksDB to perform well options.IncreaseParallelism(); // create the DB if it's not already present options.create_if_missing = true; // open DB string kDBPath = "test.rdb"; Status s = DB::Open(options, kDBPath, &db); assert(s.ok()); // Put key-value s = db->Put(WriteOptions(), "key1", "value"); assert(s.ok()); // get value string value; s = db->Get(ReadOptions(), "key1", &value); assert(s.ok()); assert(value == "value");
Pinableという考え方があり、Pinableを用いると、データのコピーが発生しない(memcpyは動作しない)ので、高速性が要求されるときなど良さそうです
PinnableSlice pinnable; s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable); // メモリコピーコストが発生しない
3. C++bindings
C/C++でラッパーを書くことで任意のCのshared objectが利用できる言語とバインディングを行うことができます。
extern "C"で囲んだ範囲が外部のプログラムで見える関数になります。
extern "C" { void helloDB(const char* dbname); int putDB(const char* dbname, const char* key, const char* value); int getDB(const char* dbname, const char* key, char* value); int delDB(const char* dbname, const char* key); int keysDB(const char* dbname, char* keys); }
サンプルのshared objectを作成するコードを用意したので、参考にしていただけると幸いです。
$ cd cpp-shared $ make $ ls librocks.so $ ldd librocks.so linux-vdso.so.1 => (0x00007fff04ccd000) librocksdb.so.5 => /usr/lib/x86_64-linux-gnu/librocksdb.so.5 (0x00007fdaf33ab000) libstdc++.so.6 => /usr/lib/x86_64-linux-gnu/libstdc++.so.6 (0x00007fdaf3025000) libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007fdaf2e0e000) libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fdaf2a2e000) libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007fdaf280f000) libm.so.6 => /lib/x86_64-linux-gnu/libm.so.6 (0x00007fdaf24b9000) /lib64/ld-linux-x86-64.so.2 (0x00007fdaf3e77000)
4. Rust
RustではC++のバインディングを利用してRocksDBにデータを格納したり取り出したりする方法を示します。
サンプルコードを動作させるには、以下のようにterminalを操作します。
$ cd rust $ export LD_LIBRARY_PATH=../cpp-shared/:$LD_LIBRARY_PATH $ make $ ./sample
Rustではstructで定義したものをimplで拡張していくのですが、例えば、putに関してはこのように設計しました。
C/C++などで文字の終了が示される\0が入らないため、このようなformatで文字を加工してC++に渡しています
pub struct Rocks { pub dbName:String, pub cursol:i32, } impl Rocks { pub fn new(dbName:&str) -> Rocks { let outName = format!("{}\0", dbName); unsafe { helloDB( outName.as_ptr() as *const c_char) }; Rocks{ dbName:outName.to_string(), cursol:0 } } } impl Rocks { pub fn put(&self, key:&str, value:&str) -> i32 { let dbName = format!("{}\0", &*(self.dbName)); let key = format!("{}\0", key); let value = format!("{}\0", value); let sub = unsafe { putDB( (&*dbName).as_ptr() as *const c_char, key.as_ptr() as *const c_char, value.as_ptr() as *const c_char) }; sub } }
5. Kotlin
Kotlin, JavaではGradleに追加することで簡単に利用可能になります。
compile group: 'org.rocksdb', name: 'rocksdbjni', version: '5.10.3'
Interfaceも整理されており、以下のように簡単に、put, get, iterate, deleteが行えます
import org.rocksdb.RocksDB import org.rocksdb.Options fun main(args : Array<String>) { RocksDB.loadLibrary() // DBをなければ作成して開く val options = Options().setCreateIfMissing(true) val db = RocksDB.open(options, "/tmp/kotlin.rdb") // データのput val key1 = "key1".toByteArray() val value1 = "value1".toByteArray() db.put(key1, value1) val key2 = "key2".toByteArray() val value2 = "value2".toByteArray() db.put(keygetvalue2) val bvalue = db.get(key1) println(String(bvalue)) // seek to end val iter = db.newIterator() iter.seekToFirst() while( iter.isValid() ) { println("${String(iter.key())} ${String(iter.value())}") iter.next() } // データの削除 db.delete(key1) db.delete(key2) db.close() }
実行
$ cd kotlin $ ./gradlew run -Dexec.args="" Starting a Gradle Daemon, 1 busy Daemon could not be reused, use --status for details :compileKotlin UP-TO-DATE :compileJava UP-TO-DATE :copyMainKotlinClasses UP-TO-DATE :processResources NO-SOURCE :classes UP-TO-DATE :runApp value1 key1 value1 key2 value2 BUILD SUCCESSFUL
6. Python
PythonはBoostPythonを用いると簡単にRocksDB <-> Pythonをつなぐことができます。
Python3とも問題なくBindingすることができて便利です。
ネット上のBoostPythonのドキュメントにはDeprecatedになった大量のSyntaxが入り混じっており、大変混沌としていたので、一つ確実に動く基準を設けて書くのが良さそうでした
CPPファイルの定義
CPPでRocksDBを扱うクラスを定義し、諸々実装を行います
#include <boost/python.hpp> #include <string> #include <cstdio> #include <string> #include <iostream> #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/options.h" using namespace rocksdb; namespace py = boost::python; class RDB{ private: DB* db; public: std::string dbName; RDB(std::string dbName): dbName(dbName){ Options options; options.IncreaseParallelism(); options.create_if_missing = true; Status s = DB::Open(options, dbName, &(this->db)); }; RDB(py::list ch); void put(std::string key, std::string value); std::string get(std::string key); void dlt(std::string key); py::list keys(); }; void RDB::put(std::string key, std::string value) { this->db->Put(WriteOptions(), key, value); } ....
pythonの実装
Pythonで用いるのは簡単で、shared object名と同名のやつを読み出して、インスタンスを作成して、関数を叩くだけです(めっちゃ簡単)
from rdb import RDB # create drow instance db = RDB('/tmp/boost.rdb') # access the word and print it print( db.dbName ) db.put('key1', 'val1') val = db.get('key1') print(val) db.put('key2', 'val2') print(db.keys()) val = db.delete('key1')
NVMeとHDDとのパフォーマンスの違い
もっと決定的に処理速度の差が出ると思ったのですが、そんなに変わらないという感じでした。
まとめ
ユースケースとして、転置インデックスを巨大なデータ構造そのままで、力でゴリゴリ押ししようとしてもメモリ上に乗らなかったりするとき、KVSとしてデータをファイルに書き出すことで効率的に行えたりします。(例えばWikipediaの記事全量からtf-idfを計算するときなど)
Valueは任意のシリアライザーでシリアライズしておく必要があり、Pythonだとpickle, KotlinだとKotlinx.serialize, Rustだとserdeなどが便利です。
今まではLevelDB(RocksDBのFork元)を用いていたのですが、PythonとC++しか実用的な装系がなく、もっといろんな言語とBindingしようとすると、RocksDBのほうが便利だと思いました。