にほんごのれんしゅう

日本語として伝えるための訓練を兼ねたテクログ

SQL, Pandas, 関数型言語におけるそれぞれの集計・分析方法の比較と類似

SQL, Pandas, 関数型言語におけるそれぞれの集計・分析方法の比較と類似

乱立するデータ集計技術

ビッグデータだの、人工知能だのバズワードが様々に叫ばれていますが、今でも主流はエクセルで分析しているということを聞いたりします。
エクセルを超えた量のデータを扱う時には綺麗に成型されている時に限り、SQLなどが使えますが、データが汚かったり、膨大な量になってくると関数型言語からヒントを得たMapReduce系の集計フレームワークが使われます。

また、さすがにVBScriptでデータ集計したくないという人もそれなりにいて、Excelのデータをpandasに投入して必要なデータを集計している人もいます。

昔からデータを貯める、分析するということを生業にしていた人は、ビッグデータの考えをそのままSQLを適応して、SQLで集計したりしています。  

色々な集計方法が出てきた今、改めて、主に私の周りで使われている集計技術(アドテクの一部の領域)について、それぞれが確かに互換性能があることを示したいと思います。

今回評価する方法

以下の三つを評価します

SQL

最も昔からあるデータ構造の管理の形だと思います。
私も学生時代はPHP + MySQL + Perl(or Ruby)でのWebシステムを作ったことがあります。
メリットとしてはSQLは使える人が多く、システム屋さんも知識的に保有していることが多いので、人の流動性が確保しやすいという印象があります。
デメリットとしては、データが正規化されている必要があるので、実験的なデータを取り込むのが弱く、アドホックでスピード感がある実験を繰り返せる環境にはあまり向いていないです(実体験ベース)

Pandas

直近でも経験したのですが、Excelの膨大なデータを渡されて分析してと投げられることがあります。
私は、ExcelCSVにダンプしてPythonRubyに投入して分析することしていたのですが、当時同じメンバーがExcel VB Macroで分析するということをやっており、[Pythonでの機械学習のアウトプット]-> [Excel]の繋ぎに不便を感じていました
今年ぐらいから、周りのデータサイエンス業界と、Kaggleの強いツール(Kaggleで多く使われているツールはよく流行るので参考になります)という角度で、Excelでデータをもらっても、基本的に、PythonのPandasで処理することになりました。
メリットとしては、Pythonはそれなりに使える人が多いし、Excelをこねくり回すよりだいぶマシで、SQLとやりたいことは同じことができます
デメリットとしては、Pandasの作り自体が独自のエコシステムを形成しており、SQLや関数型の考えとも微妙に異なっており、独自のポジジョンにいることで学習コストがかさみがちです

関数型言語MapReduceなどを操作を想定)

MapReduceアーキテクチャが提案されてから、どんなに巨大なデータでも、この仕組みの上に載せることで、集計が可能になりました。   関数型言語のmap関数とreduce関数にヒントを得て作られており、畳み込み(fold)に該当する操作を行うReduceのリソースをうまく分散処理させられれば、どんなデータでも処理可能です。  

メリットとしては、プログラミングをして集計をするので、かなり柔軟であり、複雑怪奇な非構造化データでも処理可能です。
デメリットとしては、プログラムをかけないレベルの人は集計するのも困難になることが挙げられます。

互換性を示す

圏論などを用いて、数式上で、データの操作が等価であることを示してもいいのですが、今回は、dataquest.ioさんが一般的にSQLを用いて基礎のデータサイエンティストが集計できるべき角度のデータとそのクエリをその教育プランで行なっており、それを他のデータ集計方法でも行えることを示すことで、基礎的な側面においては、等価であることを示したいと考えています
(よろしければ、dataquest.ioさんのコンテンツも見てみてください。基礎的な側面においては、充実している印象があります。)

使用するデータ

jobs.db

sqlite形式で保存したデータベースです。そんなに大きいデータでないので、これを今回、集計してきます
大学の専攻で、就職率、就職した先、人数、女性の比率などが記されています。

sqlite> PRAGMA table_info(recent_grads);
0|index|INTEGER|0||0
1|Rank|INTEGER|0||0
2|Major_code|INTEGER|0||0
3|Major|TEXT|0||0
4|Major_category|TEXT|0||0
5|Total|INTEGER|0||0
6|Sample_size|INTEGER|0||0
7|Men|INTEGER|0||0
8|Women|INTEGER|0||0
9|ShareWomen|REAL|0||0
10|Employed|INTEGER|0||0
11|Full_time|INTEGER|0||0
12|Part_time|INTEGER|0||0
13|Full_time_year_round|INTEGER|0||0
14|Unemployed|INTEGER|0||0
15|Unemployment_rate|REAL|0||0
16|Median|INTEGER|0||0
17|P25th|INTEGER|0||0
18|P75th|INTEGER|0||0
19|College_jobs|INTEGER|0||0
20|Non_college_jobs|INTEGER|0||0
21|Low_wage_jobs|INTEGER|0||0

factbook.db

sqlite形式で保存したデータベースです
主に、各国の人口や出生率など、国力を表すKPIが多く記されています
テーブルの構造はこのようになっています

sqlite> PRAGMA table_info(facts);
0|id|INTEGER|1||1
1|code|varchar(255)|1||0
2|name|varchar(255)|1||0
3|area|integer|0||0
4|area_land|integer|0||0
5|area_water|integer|0||0
6|population|integer|0||0
7|population_growth|float|0||0
8|birth_rate|float|0||0
9|death_rate|float|0||0
10|migration_rate|float|0||0
11|created_at|datetime|0||0
12|updated_at|datetime|0||0

データが少ないので、githubで管理しています。

jupyter notebook上での実行です

一つのSQLのクエリとして知っておくべき最小での粒度での、例をSQL, Pandas, 関数型の順で示します
本当は関数型はKotlinで書きたかったのですが、Jupyter上でPythonとKotlinを両方一つのノートブックで使う方法がわからなかったので、Rubyで書いています(Rubyは別に関数型言語ではないですが、SyntaxはGoogle Cloud DataFlowやSparkなどに似せられるので、そのように書きました。)

# SQLITEのデータをメモリ上にロードします
%load_ext sql
%sql sqlite:////var/jobs.db

# jobs.dbのデータ構造はこのようになっています
%sql SELECT * FROM recent_grads LIMIT 3;
index Rank Major_code Major Major_category Total Sample_size Men Women ShareWomen Employed Full_time Part_time Full_time_year_round Unemployed Unemployment_rate Median P25th P75th College_jobs Non_college_jobs Low_wage_jobs
0 1 2419 PETROLEUM ENGINEERING Engineering 2339 36 2057 282 0.120564344 1976 1849 270 1207 37 0.018380527 110000 95000 125000 1534 364 193
1 2 2416 MINING AND MINERAL ENGINEERING Engineering 756 7 679 77 0.10185185199999999 640 556 170 388 85 0.117241379 75000 55000 90000 350 257 50
2 3 2415 METALLURGICAL ENGINEERING Engineering 856 3 725 131 0.153037383 648 558 133 340 16 0.024096386 73000 50000 105000 456 176 0

専攻(Major)列に限定して、10行取り出す

# sql
%sql SELECT Rank, Major FROM recent_grads LIMIT 10;
Rank Major
1 PETROLEUM ENGINEERING
2 MINING AND MINERAL ENGINEERING
3 METALLURGICAL ENGINEERING
4 NAVAL ARCHITECTURE AND MARINE ENGINEERING
5 CHEMICAL ENGINEERING
6 NUCLEAR ENGINEERING
7 ACTUARIAL SCIENCE
8 ASTRONOMY AND ASTROPHYSICS
9 MECHANICAL ENGINEERING
10 ELECTRICAL ENGINEERING
# pandasで処理するために、まずdataframe(df)に読み込みます
import sqlite3
import pandas as pd
conn = sqlite3.connect("/var/jobs.db")
df = pd.read_sql_query("SELECT * FROM recent_grads ;", conn)
rank_major = df[["Rank", "Major"]]
rank_major.head(10)
Rank Major
0 1 PETROLEUM ENGINEERING
1 2 MINING AND MINERAL ENGINEERING
2 3 METALLURGICAL ENGINEERING
3 4 NAVAL ARCHITECTURE AND MARINE ENGINEERING
4 5 CHEMICAL ENGINEERING
5 6 NUCLEAR ENGINEERING
6 7 ACTUARIAL SCIENCE
7 8 ASTRONOMY AND ASTROPHYSICS
8 9 MECHANICAL ENGINEERING
9 10 ELECTRICAL ENGINEERING
%%ruby
## Jupyter notebookでrubyで記述するとこのように表現できる(毎回読み込む必要がある)
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}&.map { |xs| 
    [xs["Rank"], xs["Major"]]
}&.slice(0..9).map { |xs| 
    p xs
}
[1, "PETROLEUM ENGINEERING"]
[2, "MINING AND MINERAL ENGINEERING"]
[3, "METALLURGICAL ENGINEERING"]
[4, "NAVAL ARCHITECTURE AND MARINE ENGINEERING"]
[5, "CHEMICAL ENGINEERING"]
[6, "NUCLEAR ENGINEERING"]
[7, "ACTUARIAL SCIENCE"]
[8, "ASTRONOMY AND ASTROPHYSICS"]
[9, "MECHANICAL ENGINEERING"]
[10, "ELECTRICAL ENGINEERING"]

Rank,Major_code,Major,Major_category,Total列に限定して、20件の行を取り出す

%sql SELECT Rank,Major_code,Major,Major_category,Total FROM recent_grads limit 20;
Rank Major_code Major Major_category Total
1 2419 PETROLEUM ENGINEERING Engineering 2339
2 2416 MINING AND MINERAL ENGINEERING Engineering 756
3 2415 METALLURGICAL ENGINEERING Engineering 856
4 2417 NAVAL ARCHITECTURE AND MARINE ENGINEERING Engineering 1258
5 2405 CHEMICAL ENGINEERING Engineering 32260
6 2418 NUCLEAR ENGINEERING Engineering 2573
7 6202 ACTUARIAL SCIENCE Business 3777
8 5001 ASTRONOMY AND ASTROPHYSICS Physical Sciences 1792
9 2414 MECHANICAL ENGINEERING Engineering 91227
10 2408 ELECTRICAL ENGINEERING Engineering 81527
11 2407 COMPUTER ENGINEERING Engineering 41542
12 2401 AEROSPACE ENGINEERING Engineering 15058
13 2404 BIOMEDICAL ENGINEERING Engineering 14955
14 5008 MATERIALS SCIENCE Engineering 4279
15 2409 ENGINEERING MECHANICS PHYSICS AND SCIENCE Engineering 4321
16 2402 BIOLOGICAL ENGINEERING Engineering 8925
17 2412 INDUSTRIAL AND MANUFACTURING ENGINEERING Engineering 18968
18 2400 GENERAL ENGINEERING Engineering 61152
19 2403 ARCHITECTURAL ENGINEERING Engineering 2825
20 3201 COURT REPORTING Law & Public Policy 1148
step2 = df[["Rank","Major_code","Major","Major_category","Total"]]
step2.head(20) 
Rank Major_code Major Major_category Total
0 1 2419 PETROLEUM ENGINEERING Engineering 2339
1 2 2416 MINING AND MINERAL ENGINEERING Engineering 756
2 3 2415 METALLURGICAL ENGINEERING Engineering 856
3 4 2417 NAVAL ARCHITECTURE AND MARINE ENGINEERING Engineering 1258
4 5 2405 CHEMICAL ENGINEERING Engineering 32260
5 6 2418 NUCLEAR ENGINEERING Engineering 2573
6 7 6202 ACTUARIAL SCIENCE Business 3777
7 8 5001 ASTRONOMY AND ASTROPHYSICS Physical Sciences 1792
8 9 2414 MECHANICAL ENGINEERING Engineering 91227
9 10 2408 ELECTRICAL ENGINEERING Engineering 81527
10 11 2407 COMPUTER ENGINEERING Engineering 41542
11 12 2401 AEROSPACE ENGINEERING Engineering 15058
12 13 2404 BIOMEDICAL ENGINEERING Engineering 14955
13 14 5008 MATERIALS SCIENCE Engineering 4279
14 15 2409 ENGINEERING MECHANICS PHYSICS AND SCIENCE Engineering 4321
15 16 2402 BIOLOGICAL ENGINEERING Engineering 8925
16 17 2412 INDUSTRIAL AND MANUFACTURING ENGINEERING Engineering 18968
17 18 2400 GENERAL ENGINEERING Engineering 61152
18 19 2403 ARCHITECTURAL ENGINEERING Engineering 2825
19 20 3201 COURT REPORTING Law & Public Policy 1148

女性率が0.5(50%)を超える専攻と女性率の20件の行を表示する

#sql
%sql SELECT Major,ShareWomen FROM recent_grads WHERE ShareWomen>0.5 limit 20;
Major ShareWomen
ACTUARIAL SCIENCE 0.535714286
COMPUTER SCIENCE 0.578766338
ENVIRONMENTAL ENGINEERING 0.558548009
NURSING 0.896018988
INDUSTRIAL PRODUCTION TECHNOLOGIES 0.75047259
COMPUTER AND INFORMATION SYSTEMS 0.7077185020000001
INFORMATION SCIENCES 0.526475764
APPLIED MATHEMATICS 0.75392736
PHARMACOLOGY 0.524152583
OCEANOGRAPHY 0.688999173
MATHEMATICS AND COMPUTER SCIENCE 0.927807246
COGNITIVE SCIENCE AND BIOPSYCHOLOGY 0.854523227
SCHOOL STUDENT COUNSELING 0.56486557
INTERNATIONAL RELATIONS 0.632986838
AGRICULTURE PRODUCTION AND MANAGEMENT 0.59420765
GENERAL AGRICULTURE 0.515543329
GENETICS 0.643331121
MISCELLANEOUS SOCIAL SCIENCES 0.5434054220000001
UNITED STATES HISTORY 0.6307163179999999
AGRICULTURAL ECONOMICS 0.589711902
# pandas
major_sharewomen = df[lambda df:df["ShareWomen"]>0.5][["Major","ShareWomen"]]
major_sharewomen.head(20)
Major ShareWomen
6 ACTUARIAL SCIENCE 0.535714
20 COMPUTER SCIENCE 0.578766
30 ENVIRONMENTAL ENGINEERING 0.558548
34 NURSING 0.896019
38 INDUSTRIAL PRODUCTION TECHNOLOGIES 0.750473
42 COMPUTER AND INFORMATION SYSTEMS 0.707719
45 INFORMATION SCIENCES 0.526476
47 APPLIED MATHEMATICS 0.753927
48 PHARMACOLOGY 0.524153
49 OCEANOGRAPHY 0.688999
52 MATHEMATICS AND COMPUTER SCIENCE 0.927807
54 COGNITIVE SCIENCE AND BIOPSYCHOLOGY 0.854523
55 SCHOOL STUDENT COUNSELING 0.564866
56 INTERNATIONAL RELATIONS 0.632987
63 AGRICULTURE PRODUCTION AND MANAGEMENT 0.594208
64 GENERAL AGRICULTURE 0.515543
67 GENETICS 0.643331
68 MISCELLANEOUS SOCIAL SCIENCES 0.543405
69 UNITED STATES HISTORY 0.630716
71 AGRICULTURAL ECONOMICS 0.589712
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.select { |xs| 
    xs["ShareWomen"] > 0.5
}.map { |xs| 
    [xs["Rank"], xs["Major"]]
}.slice(0..19).map { |xs| 
    p xs
}
[7, "ACTUARIAL SCIENCE"]
[21, "COMPUTER SCIENCE"]
[31, "ENVIRONMENTAL ENGINEERING"]
[35, "NURSING"]
[39, "INDUSTRIAL PRODUCTION TECHNOLOGIES"]
[43, "COMPUTER AND INFORMATION SYSTEMS"]
[46, "INFORMATION SCIENCES"]
[48, "APPLIED MATHEMATICS"]
[49, "PHARMACOLOGY"]
[50, "OCEANOGRAPHY"]
[53, "MATHEMATICS AND COMPUTER SCIENCE"]
[55, "COGNITIVE SCIENCE AND BIOPSYCHOLOGY"]
[56, "SCHOOL STUDENT COUNSELING"]
[57, "INTERNATIONAL RELATIONS"]
[64, "AGRICULTURE PRODUCTION AND MANAGEMENT"]
[65, "GENERAL AGRICULTURE"]
[68, "GENETICS"]
[69, "MISCELLANEOUS SOCIAL SCIENCES"]
[70, "UNITED STATES HISTORY"]
[72, "AGRICULTURAL ECONOMICS"]

就職者数が10000人を超える専攻と就職者数を10件の行を表示する

#sql
%sql SELECT Major,Employed FROM recent_grads WHERE Employed > 10000 limit 10;
Major Employed
CHEMICAL ENGINEERING 25694
MECHANICAL ENGINEERING 76442
ELECTRICAL ENGINEERING 61928
COMPUTER ENGINEERING 32506
AEROSPACE ENGINEERING 11391
BIOMEDICAL ENGINEERING 10047
INDUSTRIAL AND MANUFACTURING ENGINEERING 15604
GENERAL ENGINEERING 44931
COMPUTER SCIENCE 102087
MANAGEMENT INFORMATION SYSTEMS AND STATISTICS 16413
#pandas
major_employed = df[lambda df:df["Employed"]>10000][["Major","Employed"]]
major_employed.head(10)
Major Employed
4 CHEMICAL ENGINEERING 25694
8 MECHANICAL ENGINEERING 76442
9 ELECTRICAL ENGINEERING 61928
10 COMPUTER ENGINEERING 32506
11 AEROSPACE ENGINEERING 11391
12 BIOMEDICAL ENGINEERING 10047
16 INDUSTRIAL AND MANUFACTURING ENGINEERING 15604
17 GENERAL ENGINEERING 44931
20 COMPUTER SCIENCE 102087
24 MANAGEMENT INFORMATION SYSTEMS AND STATISTICS 16413

女性率が50%を超えて、かつ従業員が10000人を超える専攻を10行取り出す

#sql
%sql SELECT Major,ShareWomen,Employed FROM recent_grads WHERE ShareWomen>0.5 AND Employed>10000 LIMIT 10;
Major ShareWomen Employed
COMPUTER SCIENCE 0.578766338 102087
NURSING 0.896018988 180903
COMPUTER AND INFORMATION SYSTEMS 0.7077185020000001 28459
INTERNATIONAL RELATIONS 0.632986838 21190
AGRICULTURE PRODUCTION AND MANAGEMENT 0.59420765 12323
CHEMISTRY 0.5051405379999999 48535
BUSINESS MANAGEMENT AND ADMINISTRATION 0.580948004 276234
BIOCHEMICAL SCIENCES 0.515406449 25678
HUMAN RESOURCES AND PERSONNEL MANAGEMENT 0.672161443 20760
MISCELLANEOUS HEALTH MEDICAL PROFESSIONS 0.702020202 10076
# pandas
triple = df[lambda df: (df["Employed"]>10000) & (df["ShareWomen"] > 0.5) ][["Major","ShareWomen","Employed"]]
triple.head(10)
Major ShareWomen Employed
20 COMPUTER SCIENCE 0.578766 102087
34 NURSING 0.896019 180903
42 COMPUTER AND INFORMATION SYSTEMS 0.707719 28459
56 INTERNATIONAL RELATIONS 0.632987 21190
63 AGRICULTURE PRODUCTION AND MANAGEMENT 0.594208 12323
74 CHEMISTRY 0.505141 48535
76 BUSINESS MANAGEMENT AND ADMINISTRATION 0.580948 276234
82 BIOCHEMICAL SCIENCES 0.515406 25678
86 HUMAN RESOURCES AND PERSONNEL MANAGEMENT 0.672161 20760
88 MISCELLANEOUS HEALTH MEDICAL PROFESSIONS 0.702020 10076
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.select { |xs| 
    xs["ShareWomen"] > 0.5 && xs["Employed"] > 10000
}.map { |xs| 
    ["Major","ShareWomen","Employed"].map { |x| xs[x]}
}&.slice(0..9).map { |xs| 
    p xs
}
["COMPUTER SCIENCE", 0.578766338, 102087]
["NURSING", 0.896018988, 180903]
["COMPUTER AND INFORMATION SYSTEMS", 0.7077185020000001, 28459]
["INTERNATIONAL RELATIONS", 0.632986838, 21190]
["AGRICULTURE PRODUCTION AND MANAGEMENT", 0.59420765, 12323]
["CHEMISTRY", 0.5051405379999999, 48535]
["BUSINESS MANAGEMENT AND ADMINISTRATION", 0.580948004, 276234]
["BIOCHEMICAL SCIENCES", 0.515406449, 25678]
["HUMAN RESOURCES AND PERSONNEL MANAGEMENT", 0.672161443, 20760]
["MISCELLANEOUS HEALTH MEDICAL PROFESSIONS", 0.702020202, 10076]

女性率が50%を超え、非雇用率が5.1%未満の"専攻"と"専攻のカテゴリ"について、10行取り出す

#sql
%sql SELECT Major, Major_category, ShareWomen, Unemployment_rate FROM recent_grads where (Major_category = 'Engineering') AND (ShareWomen > 0.5 or Unemployment_rate < 0.051) LIMIT 10;
Major Major_category ShareWomen Unemployment_rate
PETROLEUM ENGINEERING Engineering 0.120564344 0.018380527
METALLURGICAL ENGINEERING Engineering 0.153037383 0.024096386
NAVAL ARCHITECTURE AND MARINE ENGINEERING Engineering 0.107313196 0.050125313
MATERIALS SCIENCE Engineering 0.310820285 0.023042836
ENGINEERING MECHANICS PHYSICS AND SCIENCE Engineering 0.183985189 0.006334343
INDUSTRIAL AND MANUFACTURING ENGINEERING Engineering 0.34347321799999997 0.042875544
MATERIALS ENGINEERING AND MATERIALS SCIENCE Engineering 0.292607004 0.027788805
ENVIRONMENTAL ENGINEERING Engineering 0.558548009 0.093588575
INDUSTRIAL PRODUCTION TECHNOLOGIES Engineering 0.75047259 0.028308097
ENGINEERING AND INDUSTRIAL MANAGEMENT Engineering 0.174122505 0.03365166
triple = df[lambda df:(df["Major_category"] == 'Engineering') & ((df["ShareWomen"] > 0.5) | (df["Unemployment_rate"] < 0.051)) ][["Major", "Major_category", "ShareWomen", "Unemployment_rate"]]
triple.head(10)
Major Major_category ShareWomen Unemployment_rate
0 PETROLEUM ENGINEERING Engineering 0.120564 0.018381
2 METALLURGICAL ENGINEERING Engineering 0.153037 0.024096
3 NAVAL ARCHITECTURE AND MARINE ENGINEERING Engineering 0.107313 0.050125
13 MATERIALS SCIENCE Engineering 0.310820 0.023043
14 ENGINEERING MECHANICS PHYSICS AND SCIENCE Engineering 0.183985 0.006334
16 INDUSTRIAL AND MANUFACTURING ENGINEERING Engineering 0.343473 0.042876
23 MATERIALS ENGINEERING AND MATERIALS SCIENCE Engineering 0.292607 0.027789
30 ENVIRONMENTAL ENGINEERING Engineering 0.558548 0.093589
38 INDUSTRIAL PRODUCTION TECHNOLOGIES Engineering 0.750473 0.028308
50 ENGINEERING AND INDUSTRIAL MANAGEMENT Engineering 0.174123 0.033652
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.select { |xs| 
    xs["Major_category"] == 'Engineering' && (xs["ShareWomen"] > 0.5 || xs["Unemployment_rate"] < 0.051)
}.map { |xs| 
    ["Major", "Major_category", "ShareWomen", "Unemployment_rate"].map { |x| xs[x]}
}&.slice(0..9).map { |xs| 
    p xs
}
["PETROLEUM ENGINEERING", "Engineering", 0.120564344, 0.018380527]
["METALLURGICAL ENGINEERING", "Engineering", 0.153037383, 0.024096386]
["NAVAL ARCHITECTURE AND MARINE ENGINEERING", "Engineering", 0.107313196, 0.050125313]
["MATERIALS SCIENCE", "Engineering", 0.310820285, 0.023042836]
["ENGINEERING MECHANICS PHYSICS AND SCIENCE", "Engineering", 0.183985189, 0.006334343]
["INDUSTRIAL AND MANUFACTURING ENGINEERING", "Engineering", 0.34347321799999997, 0.042875544]
["MATERIALS ENGINEERING AND MATERIALS SCIENCE", "Engineering", 0.292607004, 0.027788805]
["ENVIRONMENTAL ENGINEERING", "Engineering", 0.558548009, 0.093588575]
["INDUSTRIAL PRODUCTION TECHNOLOGIES", "Engineering", 0.75047259, 0.028308097]
["ENGINEERING AND INDUSTRIAL MANAGEMENT", "Engineering", 0.174122505, 0.03365166]

専攻のカテゴリが”ビジネス”か”芸術”か”ヘルス”で、就職者が20000人を超えているか非雇用率が5.1%以下で、専攻、専攻カテゴリ、就職者数、非就職者を10行知りたい

%sql SELECT Major, Major_category, Employed, Unemployment_rate \
FROM recent_grads \
WHERE (Major_category = 'Business' OR Major_category = 'Arts' OR Major_category = 'Health') \
AND (Employed > 20000 OR Unemployment_rate < 0.051) \
LIMIT 10;
Major Major_category Employed Unemployment_rate
OPERATIONS LOGISTICS AND E-COMMERCE Business 10027 0.047858702999999995
NURSING Health 180903 0.04486272400000001
FINANCE Business 145696 0.060686356
ACCOUNTING Business 165527 0.069749014
MEDICAL TECHNOLOGIES TECHNICIANS Health 13150 0.03698279
MEDICAL ASSISTING SERVICES Health 9168 0.042506527
GENERAL BUSINESS Business 190183 0.072861468
BUSINESS MANAGEMENT AND ADMINISTRATION Business 276234 0.07221834099999999
MARKETING AND MARKETING RESEARCH Business 178862 0.061215064000000007
HUMAN RESOURCES AND PERSONNEL MANAGEMENT Business 20760 0.059569649
def filt(df):
    res = ((df['Major_category']  == 'Business' ) | (df['Major_category']  == 'Arts') | (df['Major_category']  == 'Health')) & \
                ( (df["Employed"] > 20000 )  |  (df["Unemployment_rate"] < 0.051) )
    return res

quad =  df[ filt(df) ][["Major", "Major_category", "Employed", "Unemployment_rate"]]
quad.head(10)
Major Major_category Employed Unemployment_rate
27 OPERATIONS LOGISTICS AND E-COMMERCE Business 10027 0.047859
34 NURSING Health 180903 0.044863
35 FINANCE Business 145696 0.060686
40 ACCOUNTING Business 165527 0.069749
44 MEDICAL TECHNOLOGIES TECHNICIANS Health 13150 0.036983
51 MEDICAL ASSISTING SERVICES Health 9168 0.042507
57 GENERAL BUSINESS Business 190183 0.072861
76 BUSINESS MANAGEMENT AND ADMINISTRATION Business 276234 0.072218
77 MARKETING AND MARKETING RESEARCH Business 178862 0.061215
86 HUMAN RESOURCES AND PERSONNEL MANAGEMENT Business 20760 0.059570
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.select { |xs| 
    (xs["Major_category"] == 'Engineering' ||  xs["Major_category"] == 'Arts' || xs["Major_category"] == 'Health' ) && (xs["Employed"] > 20000 || xs["Unemployment_rate"] < 0.051)
}.map { |xs| 
    ["Major", "Major_category", "Employed", "Unemployment_rate"].map { |x| xs[x]}
}.slice(0..9).map { |xs| 
    p xs
}
["PETROLEUM ENGINEERING", "Engineering", 1976, 0.018380527]
["METALLURGICAL ENGINEERING", "Engineering", 648, 0.024096386]
["NAVAL ARCHITECTURE AND MARINE ENGINEERING", "Engineering", 758, 0.050125313]
["CHEMICAL ENGINEERING", "Engineering", 25694, 0.061097712]
["MECHANICAL ENGINEERING", "Engineering", 76442, 0.057342277999999997]
["ELECTRICAL ENGINEERING", "Engineering", 61928, 0.059173845]
["COMPUTER ENGINEERING", "Engineering", 32506, 0.065409275]
["MATERIALS SCIENCE", "Engineering", 3307, 0.023042836]
["ENGINEERING MECHANICS PHYSICS AND SCIENCE", "Engineering", 3608, 0.006334343]
["INDUSTRIAL AND MANUFACTURING ENGINEERING", "Engineering", 15604, 0.042875544]

専攻をアルファベットを降順にソートして10行取り出す

#sql
%sql select Major \
from recent_grads \
order by Major desc \
limit 10;
Major
ZOOLOGY
VISUAL AND PERFORMING ARTS
UNITED STATES HISTORY
TREATMENT THERAPY PROFESSIONS
TRANSPORTATION SCIENCES AND TECHNOLOGIES
THEOLOGY AND RELIGIOUS VOCATIONS
TEACHER EDUCATION: MULTIPLE LEVELS
STUDIO ARTS
STATISTICS AND DECISION SCIENCE
SPECIAL NEEDS EDUCATION
asd = df[["Major"]].sort_values(by=["Major"], ascending=False)
asd.head(10)
Major
168 ZOOLOGY
153 VISUAL AND PERFORMING ARTS
69 UNITED STATES HISTORY
126 TREATMENT THERAPY PROFESSIONS
106 TRANSPORTATION SCIENCES AND TECHNOLOGIES
158 THEOLOGY AND RELIGIOUS VOCATIONS
154 TEACHER EDUCATION: MULTIPLE LEVELS
159 STUDIO ARTS
46 STATISTICS AND DECISION SCIENCE
100 SPECIAL NEEDS EDUCATION
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.sort_by { |xs|
    xs["Major"] 
}.reverse
.slice(0..9).map { |xs| p xs["Major"]}
"ZOOLOGY"
"VISUAL AND PERFORMING ARTS"
"UNITED STATES HISTORY"
"TREATMENT THERAPY PROFESSIONS"
"TRANSPORTATION SCIENCES AND TECHNOLOGIES"
"THEOLOGY AND RELIGIOUS VOCATIONS"
"TEACHER EDUCATION: MULTIPLE LEVELS"
"STUDIO ARTS"
"STATISTICS AND DECISION SCIENCE"
"SPECIAL NEEDS EDUCATION"

専攻をアルファベットで昇順、給与で降順で、20行を表示する

#sql
%sql SELECT Major_category, Median, Major \
FROM recent_grads \
ORDER BY Major ASC, Median DESC \
LIMIT 20;
Major_category Median Major
Business 45000 ACCOUNTING
Business 62000 ACTUARIAL SCIENCE
Communications & Journalism 35000 ADVERTISING AND PUBLIC RELATIONS
Engineering 60000 AEROSPACE ENGINEERING
Agriculture & Natural Resources 40000 AGRICULTURAL ECONOMICS
Agriculture & Natural Resources 40000 AGRICULTURE PRODUCTION AND MANAGEMENT
Agriculture & Natural Resources 30000 ANIMAL SCIENCES
Humanities & Liberal Arts 28000 ANTHROPOLOGY AND ARCHEOLOGY
Computers & Mathematics 45000 APPLIED MATHEMATICS
Engineering 54000 ARCHITECTURAL ENGINEERING
Engineering 40000 ARCHITECTURE
Humanities & Liberal Arts 35000 AREA ETHNIC AND CIVILIZATION STUDIES
Education 32100 ART AND MUSIC EDUCATION
Humanities & Liberal Arts 31000 ART HISTORY AND CRITICISM
Physical Sciences 62000 ASTRONOMY AND ASTROPHYSICS
Physical Sciences 35000 ATMOSPHERIC SCIENCES AND METEOROLOGY
Biology & Life Science 37400 BIOCHEMICAL SCIENCES
Engineering 57100 BIOLOGICAL ENGINEERING
Biology & Life Science 33400 BIOLOGY
Engineering 60000 BIOMEDICAL ENGINEERING
tri = df[["Major_category", "Median", "Major"]].sort_values(by=["Major", "Median"], ascending=[True, False])
tri.head(20)
Major_category Median Major
40 Business 45000 ACCOUNTING
6 Business 62000 ACTUARIAL SCIENCE
98 Communications & Journalism 35000 ADVERTISING AND PUBLIC RELATIONS
11 Engineering 60000 AEROSPACE ENGINEERING
71 Agriculture & Natural Resources 40000 AGRICULTURAL ECONOMICS
63 Agriculture & Natural Resources 40000 AGRICULTURE PRODUCTION AND MANAGEMENT
152 Agriculture & Natural Resources 30000 ANIMAL SCIENCES
162 Humanities & Liberal Arts 28000 ANTHROPOLOGY AND ARCHEOLOGY
47 Computers & Mathematics 45000 APPLIED MATHEMATICS
18 Engineering 54000 ARCHITECTURAL ENGINEERING
58 Engineering 40000 ARCHITECTURE
99 Humanities & Liberal Arts 35000 AREA ETHNIC AND CIVILIZATION STUDIES
136 Education 32100 ART AND MUSIC EDUCATION
148 Humanities & Liberal Arts 31000 ART HISTORY AND CRITICISM
7 Physical Sciences 62000 ASTRONOMY AND ASTROPHYSICS
110 Physical Sciences 35000 ATMOSPHERIC SCIENCES AND METEOROLOGY
82 Biology & Life Science 37400 BIOCHEMICAL SCIENCES
15 Engineering 57100 BIOLOGICAL ENGINEERING
123 Biology & Life Science 33400 BIOLOGY
12 Engineering 60000 BIOMEDICAL ENGINEERING
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs|
   cols.zip(xs).to_h
}.sort_by { |as, bs|
    [as["Major"],  !as["Median"] ]
}.slice(0..19).map { |xs| p [xs["Major"], xs["Median"]]}
["ACCOUNTING", 45000]
["ACTUARIAL SCIENCE", 62000]
["ADVERTISING AND PUBLIC RELATIONS", 35000]
["AEROSPACE ENGINEERING", 60000]
["AGRICULTURAL ECONOMICS", 40000]
["AGRICULTURE PRODUCTION AND MANAGEMENT", 40000]
["ANIMAL SCIENCES", 30000]
["ANTHROPOLOGY AND ARCHEOLOGY", 28000]
["APPLIED MATHEMATICS", 45000]
["ARCHITECTURAL ENGINEERING", 54000]
["ARCHITECTURE", 40000]
["AREA ETHNIC AND CIVILIZATION STUDIES", 35000]
["ART AND MUSIC EDUCATION", 32100]
["ART HISTORY AND CRITICISM", 31000]
["ASTRONOMY AND ASTROPHYSICS", 62000]
["ATMOSPHERIC SCIENCES AND METEOROLOGY", 35000]
["BIOCHEMICAL SCIENCES", 37400]
["BIOLOGICAL ENGINEERING", 57100]
["BIOLOGY", 33400]
["BIOMEDICAL ENGINEERING", 60000]

FactBook Dataset

factbookと呼ばれるデータセットで、SQLとpandas, Rubyでの記述を示します

# まずSQLITEをメモリにロード
%sql sqlite:////var/factbook.db
# pythonのdataframeにロード
conn = sqlite3.connect("/var/factbook.db")
df = pd.read_sql_query("select * from facts;", conn)

データに入っているbirth_rateの件数をカウント

# sql
%sql SELECT COUNT(birth_rate) FROM facts;
COUNT(birth_rate)
228
# pandas
df["birth_rate"].count()
228
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/factbook.db"
cols = db.execute("PRAGMA table_info(facts)").map { |xs| xs[1]}
r = db.execute( "SELECT * FROM facts ;" ).map {  |xs|
   cols.zip(xs).to_h
}.select { |xs| 
    xs["birth_rate"] != nil
}.size
p r
228

birth_rateの合計値を計算

#sql
%sql SELECT SUM(birth_rate) \
FROM facts;
SUM(birth_rate)
4406.909999999998
# pandas
df["birth_rate"].sum()
4406.9099999999999
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/factbook.db"
cols = db.execute("PRAGMA table_info(facts)").map { |xs| xs[1]}
r = db.execute( "SELECT * FROM facts ;" ).map {  |xs| cols.zip(xs).to_h
}.select { |xs| xs["birth_rate"] != nil
}.map { |x| x["birth_rate"]
}.reduce { |y,x| y+x }
p r
4406.909999999998

birth_rateの平均値の計算

%sql SELECT AVG(birth_rate) \
FROM facts;
AVG(birth_rate)
19.32855263157894
df["birth_rate"].mean()
19.328552631578948
%%ruby
require "sqlite3"
db = SQLite3::Database.new "/var/factbook.db"
cols = db.execute("PRAGMA table_info(facts)").map { |xs| xs[1]}
all = db.execute( "SELECT * FROM facts ;" ).map {  |xs| cols.zip(xs).to_h
}.select { |xs| xs["birth_rate"] != nil }.map { |x| x["birth_rate"] }
p all.reduce{|y,x| y+x }/all.size
19.32855263157894

出生率をUniq(Distinct)して、人口が2000万を超えるデータの平均値を計算する

# sql
%sql SELECT AVG(DISTINCT birth_rate) \
FROM facts \
WHERE population > 20000000;
AVG(DISTINCT birth_rate)
20.43473684210527
# pandas
df[ df.population > 20000000 ][ ["birth_rate"] ].drop_duplicates().mean()
birth_rate    20.434737
dtype: float64
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/factbook.db"
cols = db.execute("PRAGMA table_info(facts)").map { |xs| xs[1]}
all = db.execute( "SELECT * FROM facts ;" ).map {  |xs| cols.zip(xs).to_h
}.select { |xs|  xs["population"] != nil and xs["population"] > 20000000 
}.map { |xs| xs["birth_rate"] }.to_set.to_a
p all.reduce { |y,x| y+x}/all.size
20.43473684210527

より細かい条件指定

ここから、データセットをjobs.dbに戻します
groupbyなど一歩踏み込んだSQLのオペレーションであっても、Pandasや関数型言語でも同様に扱えることを示します

# sqliteをメモリにロード
%sql sqlite:////var/jobs.db
# pythonのdataframeにロード
conn = sqlite3.connect("/var/jobs.db")
df = pd.read_sql_query("select * from recent_grads;", conn)

専攻カテゴリごとにおける、女性率の平均値の計算

#sql
%sql SELECT Major_category, AVG(ShareWomen) \
FROM recent_grads \
GROUP BY Major_category;
Major_category AVG(ShareWomen)
Agriculture & Natural Resources 0.6179384232
Arts 0.56185119575
Biology & Life Science 0.584518475857143
Business 0.4050631853076923
Communications & Journalism 0.64383484025
Computers & Mathematics 0.5127519954545455
Education 0.6749855163125
Engineering 0.2571578951034483
Health 0.6168565694166667
Humanities & Liberal Arts 0.6761934042
Industrial Arts & Consumer Services 0.4493512688571429
Interdisciplinary 0.495397153
Law & Public Policy 0.3359896912
Physical Sciences 0.5087494197
Psychology & Social Work 0.7777631628888888
Social Science 0.5390672957777778
#  pandas
df[ ["Major_category", "ShareWomen"] ].groupby( ["Major_category"]).mean()
ShareWomen
Major_category
Agriculture & Natural Resources 0.617938
Arts 0.561851
Biology & Life Science 0.584518
Business 0.405063
Communications & Journalism 0.643835
Computers & Mathematics 0.512752
Education 0.674986
Engineering 0.257158
Health 0.616857
Humanities & Liberal Arts 0.676193
Industrial Arts & Consumer Services 0.449351
Interdisciplinary 0.495397
Law & Public Policy 0.335990
Physical Sciences 0.508749
Psychology & Social Work 0.777763
Social Science 0.539067
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs| cols.zip(xs).to_h
}.map { |xs| ["Major_category", "ShareWomen"].map {|x| xs[x]} 
}.group_by { |xs|  xs[0]}.to_a
.map { |xs| 
    key,vals = xs
    vals = vals.map { |x| x[1]}
    [key, vals.reduce{ |y,x| y+x}/vals.size]
}.map { |xs| 
    p xs
}
["Engineering", 0.2571578951034483]
["Business", 0.4050631853076923]
["Physical Sciences", 0.5087494197]
["Law & Public Policy", 0.3359896912]
["Computers & Mathematics", 0.5127519954545455]
["Agriculture & Natural Resources", 0.6179384232]
["Industrial Arts & Consumer Services", 0.4493512688571429]
["Arts", 0.56185119575]
["Health", 0.6168565694166667]
["Social Science", 0.5390672957777778]
["Biology & Life Science", 0.584518475857143]
["Education", 0.6749855163125]
["Humanities & Liberal Arts", 0.6761934042]
["Psychology & Social Work", 0.7777631628888888]
["Communications & Journalism", 0.64383484025]
["Interdisciplinary", 0.495397153]

専攻カテゴリごとの平均就職者数、全人数の平均値を計算して、"就職者数/平均人数"を算出する

#SQL
%sql SELECT Major_category, AVG(Employed) / AVG(Total) AS share_employed \
FROM recent_grads \
GROUP BY Major_category;
Major_category share_employed
Agriculture & Natural Resources 0.8369862842425075
Arts 0.8067482429367457
Biology & Life Science 0.6671565365683841
Business 0.8359659576036412
Communications & Journalism 0.8422291333949735
Computers & Mathematics 0.7956108197773972
Education 0.858190149321534
Engineering 0.7819666916550562
Health 0.8033741337996244
Humanities & Liberal Arts 0.7626382682895378
Industrial Arts & Consumer Services 0.8226700668430581
Interdisciplinary 0.7987150292778139
Law & Public Policy 0.8083994483744353
Physical Sciences 0.7506564085422069
Psychology & Social Work 0.790724459311403
Social Science 0.7575825619001975
# pandas
df_mean = df[ ["Major_category", "Employed", "Total"] ].groupby( ["Major_category"]).mean()
df_mean[ "Employed/Total" ] = df_mean.apply(lambda x:x[0]/x[1],axis=1)
df_mean
Employed Total Employed/Total
Major_category
Agriculture & Natural Resources 6694.300000 7998.100000 0.836986
Arts 36014.250000 44641.250000 0.806748
Biology & Life Science 21628.357143 32418.714286 0.667157
Business 83749.384615 100182.769231 0.835966
Communications & Journalism 82665.000000 98150.250000 0.842229
Computers & Mathematics 21626.727273 27182.545455 0.795611
Education 29989.937500 34945.562500 0.858190
Engineering 14495.586207 18537.344828 0.781967
Health 31012.250000 38602.500000 0.803374
Humanities & Liberal Arts 36274.533333 47564.533333 0.762638
Industrial Arts & Consumer Services 27006.142857 32827.428571 0.822670
Interdisciplinary 9821.000000 12296.000000 0.798715
Law & Public Policy 28958.000000 35821.400000 0.808399
Physical Sciences 13923.100000 18547.900000 0.750656
Psychology & Social Work 42260.444444 53445.222222 0.790724
Social Science 44610.333333 58885.111111 0.757583
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs| cols.zip(xs).to_h
}.map { |xs| ["Major_category", "Employed", "Total"].map {|x| xs[x]} 
}.group_by { |xs|  xs[0]}.to_a
.map { |xs| 
    key,vals = xs
    emps = vals.map { |x| x[1]}
    emps_mean = emps.reduce{ |y,x| y+x}/emps.size 
    totals = vals.map { |x| x[2]}
    totals_mean = totals.reduce{ |y,x| y+x}/totals.size
    [key, emps_mean.to_f/totals_mean]
}.map { |xs| 
    p xs
}
["Engineering", 0.7819496142849436]
["Business", 0.8359685372621828]
["Physical Sciences", 0.7506874427131073]
["Law & Public Policy", 0.8084084754752798]
["Computers & Mathematics", 0.7956000294312413]
["Agriculture & Natural Resources", 0.8369592398099525]
["Industrial Arts & Consumer Services", 0.8226764553568708]
["Arts", 0.8067471606818843]
["Health", 0.8033780633127817]
["Social Science", 0.7575783306444765]
["Biology & Life Science", 0.6671602196310692]
["Education", 0.8581771354986407]
["Humanities & Liberal Arts", 0.7626356067614162]
["Psychology & Social Work", 0.790719431190944]
["Communications & Journalism", 0.8422312786551197]
["Interdisciplinary", 0.7987150292778139]

先ほど計算した、"就職者数/平均人数"が0.8を超えるデータを表示する

#SQL
%sql SELECT Major_category, AVG(Employed) / AVG(Total) AS share_employed \
FROM recent_grads \
GROUP BY Major_category \
    HAVING share_employed > .8;
Major_category share_employed
Agriculture & Natural Resources 0.8369862842425075
Arts 0.8067482429367457
Business 0.8359659576036412
Communications & Journalism 0.8422291333949735
Education 0.858190149321534
Health 0.8033741337996244
Industrial Arts & Consumer Services 0.8226700668430581
Law & Public Policy 0.8083994483744353
# pandas
df_having = df[ ["Major_category", "Employed", "Total"] ].groupby( ["Major_category"]).mean()
df_having[ "Employed/Total" ] = df_mean.apply(lambda x:x[0]/x[1],axis=1)
df_having[ df_having["Employed/Total"] > 0.8 ]
Employed Total Employed/Total
Major_category
Agriculture & Natural Resources 6694.300000 7998.100000 0.836986
Arts 36014.250000 44641.250000 0.806748
Business 83749.384615 100182.769231 0.835966
Communications & Journalism 82665.000000 98150.250000 0.842229
Education 29989.937500 34945.562500 0.858190
Health 31012.250000 38602.500000 0.803374
Industrial Arts & Consumer Services 27006.142857 32827.428571 0.822670
Law & Public Policy 28958.000000 35821.400000 0.808399
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs| cols.zip(xs).to_h
}.map { |xs| ["Major_category", "Employed", "Total"].map {|x| xs[x]} 
}.group_by { |xs|  xs[0]}.to_a
.map { |xs| 
    key,vals = xs
    emps = vals.map { |x| x[1]}
    emps_mean = emps.reduce{ |y,x| y+x}/emps.size 
    totals = vals.map { |x| x[2]}
    totals_mean = totals.reduce{ |y,x| y+x}/totals.size
    [key, emps_mean.to_f/totals_mean]
}.select{ |xs| xs[1] > 0.8
}.map { |xs| 
    p xs
}
["Business", 0.8359685372621828]
["Law & Public Policy", 0.8084084754752798]
["Agriculture & Natural Resources", 0.8369592398099525]
["Industrial Arts & Consumer Services", 0.8226764553568708]
["Arts", 0.8067471606818843]
["Health", 0.8033780633127817]
["Education", 0.8581771354986407]
["Communications & Journalism", 0.8422312786551197]

女性率を小数点以下、第二桁まで計算して10行を表示

#SQL
%sql SELECT Major_category, ROUND(ShareWomen, 2) AS rounded_share_women \
FROM recent_grads \
LIMIT 10;
Major_category rounded_share_women
Engineering 0.12
Engineering 0.1
Engineering 0.15
Engineering 0.11
Engineering 0.34
Engineering 0.14
Business 0.54
Physical Sciences 0.44
Engineering 0.14
Engineering 0.44
# pandas
df_round = df[ ["Major_category", "ShareWomen"] ]
df_round["round"]=  df_round.apply(lambda x:"%0.2f"%x[1], axis=1)
df_round.head(10)
Major_category ShareWomen round
0 Engineering 0.120564 0.12
1 Engineering 0.101852 0.10
2 Engineering 0.153037 0.15
3 Engineering 0.107313 0.11
4 Engineering 0.341631 0.34
5 Engineering 0.144967 0.14
6 Business 0.535714 0.54
7 Physical Sciences 0.441356 0.44
8 Engineering 0.139793 0.14
9 Engineering 0.437847 0.44
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs| cols.zip(xs).to_h
}.map { |xs| ["Major_category", "ShareWomen"].map {|x| xs[x]} 
}.map { |xs| 
    mc = xs[0]
    sw = sprintf("%0.2f", xs[1])
    [mc, sw]
}.map{ |x| p x}
["Engineering", "0.12"]
["Engineering", "0.10"]
["Engineering", "0.15"]
["Engineering", "0.11"]
["Engineering", "0.34"]
["Engineering", "0.14"]
["Business", "0.54"]
["Physical Sciences", "0.44"]
["Engineering", "0.14"]
["Engineering", "0.44"]
["Engineering", "0.20"]
["Engineering", "0.20"]
["Engineering", "0.12"]
["Engineering", "0.31"]
["Engineering", "0.18"]
["Engineering", "0.32"]
["Engineering", "0.34"]
["Engineering", "0.25"]
["Engineering", "0.35"]
["Law & Public Policy", "0.24"]
["Computers & Mathematics", "0.58"]
["Agriculture & Natural Resources", "0.22"]
["Engineering", "0.33"]
["Engineering", "0.29"]
["Business", "0.28"]
["Engineering", "0.23"]
["Industrial Arts & Consumer Services", "0.34"]
["Business", "0.32"]
["Engineering", "0.19"]
["Law & Public Policy", "0.25"]
["Engineering", "0.56"]
["Engineering", "0.09"]
["Arts", "0.41"]
["Engineering", "0.32"]
["Health", "0.90"]
["Business", "0.36"]
["Social Science", "0.34"]
["Business", "0.25"]
["Engineering", "0.75"]
["Physical Sciences", "0.43"]
["Business", "0.25"]
["Computers & Mathematics", "0.24"]
["Computers & Mathematics", "0.71"]
["Physical Sciences", "0.45"]
["Health", "0.43"]
["Computers & Mathematics", "0.53"]
["Computers & Mathematics", "0.28"]
["Computers & Mathematics", "0.75"]
["Biology & Life Science", "0.52"]
["Physical Sciences", "0.69"]
["Engineering", "0.17"]
["Health", "0.18"]
["Computers & Mathematics", "0.93"]
["Computers & Mathematics", "0.27"]
["Biology & Life Science", "0.85"]
["Education", "0.56"]
["Social Science", "0.63"]
["Business", "0.42"]
["Engineering", "0.32"]
["Business", "0.28"]
["Health", "0.45"]
["Biology & Life Science", "0.08"]
["Business", "0.20"]
["Agriculture & Natural Resources", "0.59"]
["Agriculture & Natural Resources", "0.52"]
["Engineering", "0.00"]
["Engineering", "0.38"]
["Biology & Life Science", "0.64"]
["Social Science", "0.54"]
["Humanities & Liberal Arts", "0.63"]
["Psychology & Social Work", "0.44"]
["Agriculture & Natural Resources", "0.59"]
["Physical Sciences", "0.43"]
["Industrial Arts & Consumer Services", "0.43"]
["Physical Sciences", "0.51"]
["Industrial Arts & Consumer Services", "0.23"]
["Business", "0.58"]
["Business", "0.38"]
["Social Science", "0.49"]
["Social Science", "0.47"]
["Biology & Life Science", "0.62"]
["Computers & Mathematics", "0.18"]
["Biology & Life Science", "0.52"]
["Biology & Life Science", "0.53"]
["Computers & Mathematics", "0.31"]
["Physical Sciences", "0.47"]
["Business", "0.67"]
["Law & Public Policy", "0.59"]
["Health", "0.70"]
["Law & Public Policy", "0.48"]
["Physical Sciences", "0.88"]
["Psychology & Social Work", "0.75"]
["Biology & Life Science", "0.58"]
["Communications & Journalism", "0.31"]
["Law & Public Policy", "0.13"]
["Arts", "0.37"]
["Communications & Journalism", "0.72"]
["Physical Sciences", "0.67"]
["Communications & Journalism", "0.67"]
["Humanities & Liberal Arts", "0.76"]
["Education", "0.37"]
["Biology & Life Science", "0.91"]
["Social Science", "0.62"]
["Health", "0.64"]
["Health", "0.77"]
["Computers & Mathematics", "0.86"]
["Industrial Arts & Consumer Services", "0.32"]
["Agriculture & Natural Resources", "0.56"]
["Biology & Life Science", "0.48"]
["Interdisciplinary", "0.50"]
["Physical Sciences", "0.12"]
["Agriculture & Natural Resources", "0.69"]
["Agriculture & Natural Resources", "0.76"]
["Education", "0.81"]
["Humanities & Liberal Arts", "0.65"]
["Humanities & Liberal Arts", "0.73"]
["Humanities & Liberal Arts", "0.51"]
["Education", "0.73"]
["Health", "0.65"]
["Education", "0.79"]
["Education", "0.45"]
["Health", "0.56"]
["Biology & Life Science", "0.57"]
["Biology & Life Science", "0.60"]
["Social Science", "0.53"]
["Communications & Journalism", "0.88"]
["Health", "0.64"]
["Business", "0.73"]
["Education", "0.58"]
["Humanities & Liberal Arts", "0.76"]
["Education", "0.72"]
["Social Science", "0.72"]
["Biology & Life Science", "0.65"]
["Education", "0.60"]
["Health", "0.77"]
["Humanities & Liberal Arts", "0.42"]
["Education", "0.69"]
["Humanities & Liberal Arts", "0.34"]
["Education", "0.92"]
["Industrial Arts & Consumer Services", "0.68"]
["Humanities & Liberal Arts", "0.70"]
["Arts", "0.69"]
["Social Science", "0.50"]
["Agriculture & Natural Resources", "0.61"]
["Education", "0.42"]
["Psychology & Social Work", "0.78"]
["Arts", "0.44"]
["Education", "0.51"]
["Humanities & Liberal Arts", "0.85"]
["Arts", "0.67"]
["Industrial Arts & Consumer Services", "0.75"]
["Psychology & Social Work", "0.81"]
["Agriculture & Natural Resources", "0.91"]
["Arts", "0.70"]
["Education", "0.80"]
["Psychology & Social Work", "0.91"]
["Psychology & Social Work", "0.90"]
["Humanities & Liberal Arts", "0.75"]
["Humanities & Liberal Arts", "0.73"]
["Arts", "0.58"]
["Industrial Arts & Consumer Services", "0.38"]
["Agriculture & Natural Resources", "0.72"]
["Humanities & Liberal Arts", "0.97"]
["Health", "0.71"]
["Education", "0.97"]
["Humanities & Liberal Arts", "0.69"]
["Arts", "0.63"]
["Humanities & Liberal Arts", "0.67"]
["Biology & Life Science", "0.64"]
["Psychology & Social Work", "0.82"]
["Psychology & Social Work", "0.80"]
["Psychology & Social Work", "0.80"]
["Education", "0.88"]

大学で仕事を得た人の専攻カテゴリごとの平均と、専攻カテゴリごとの平均人数を割ることで、”大学での仕事の人/全人数の平均”を計算し、30%以下のデータを表示する

#SQL
%sql SELECT Major_category, ROUND(AVG(College_jobs) / AVG(Total), 3) AS share_degree_jobs \
FROM recent_grads \
GROUP BY Major_category HAVING share_degree_jobs < .3;
Major_category share_degree_jobs
Agriculture & Natural Resources 0.248
Arts 0.265
Business 0.114
Communications & Journalism 0.22
Humanities & Liberal Arts 0.27
Industrial Arts & Consumer Services 0.249
Law & Public Policy 0.163
Social Science 0.215
# pandas
df_having = df[ ["Major_category", "College_jobs", "Total"] ].groupby( ["Major_category"]).mean()
df_having[ "College_jobs/Total" ] = df_having.apply(lambda x: float("%.3f"%(x[0]/x[1])),axis=1)
df_having = df_having[ df_having["College_jobs/Total"] < 0.3 ]
df_having
College_jobs Total College_jobs/Total
Major_category
Agriculture & Natural Resources 1986.000000 7998.100000 0.248
Arts 11848.125000 44641.250000 0.265
Business 11426.000000 100182.769231 0.114
Communications & Journalism 21639.000000 98150.250000 0.220
Humanities & Liberal Arts 12843.333333 47564.533333 0.270
Industrial Arts & Consumer Services 8171.428571 32827.428571 0.249
Law & Public Policy 5844.200000 35821.400000 0.163
Social Science 12662.222222 58885.111111 0.215
%%ruby
require 'set'
require "sqlite3"
db = SQLite3::Database.new "/var/jobs.db"
cols = db.execute("PRAGMA table_info(recent_grads)").map { |xs| xs[1]}
db.execute( "SELECT * FROM recent_grads ;" ).map {  |xs| cols.zip(xs).to_h
}.map { |xs| ["Major_category", "College_jobs", "Total"].map {|x| xs[x]} 
}.group_by { |xs|  xs[0]}.to_a
.map { |xs| 
    key,vals = xs
    cljs = vals.map { |x| x[1]}
    cljs_mean = cljs.reduce{ |y,x| y+x}/cljs.size 
    totals = vals.map { |x| x[2]}
    totals_mean = totals.reduce{ |y,x| y+x}/totals.size
    [key, cljs_mean.to_f/totals_mean]
}.select{ |xs| xs[1] < 0.3
}.map { |xs|
    key = xs[0]
    val = sprintf("%0.3f", xs[1])
    p [key, val]
}
["Business", "0.114"]
["Law & Public Policy", "0.163"]
["Agriculture & Natural Resources", "0.248"]
["Industrial Arts & Consumer Services", "0.249"]
["Arts", "0.265"]
["Social Science", "0.215"]
["Humanities & Liberal Arts", "0.270"]
["Communications & Journalism", "0.220"]

まとめ

このように、基本的なSQLでできることは、Pandasと関数型で全てできるということができそうだとわかりました   機械学習の文脈は、この中では含んでいませんが、必要なデータを様々な方法で集めて、機械学習のかけるなどはよくするので、前処理の一環でもあります  

いろんな案件に応じて、適切な集計方法を選択するのですが、SQLに入っていっていて、SQLクエリだけで済むのであれば、そのようにすればいいですし、一台のローカルマシンで済むぐらいのExcelファイルならば、Pandasなどが良いでしょう。複数台数に跨って収められているビッグデータに関しては、MapReduceなどを選択すれば良いでしょう。

アドテクはこの程度で済むという経験則があるのですが、これ以上何か集計ツールが増えるには、できるだけ機能やシンタックスを対応させて覚えさせて、情報量があまり増えすぎないようにコントロールしたいです、

ビッグデータを扱った時のでファイルシステムでのパフォーマンスの差

多コアCPU, GPUなどの発展により、一台のマシンで効率的なmap reduceができるようになりました

Map Reduceはそのアルゴルズムから分散性能が十分な際、複数のマシンで分割してタスクを実行できるので、ビッグデータを処理する際には非常に便利なのですが、これはnVidia社のCUDAやAMD Ryzenなどの極めてコア数が多いCPUなどと、高速なIOをもつDiskであるNVMeなども用いることで、同様のシステムを一台のコンピュータで完結させることができるようになりました。

会社ではAWSのElastic Map Reduceを使うほどの分析じゃないけど、コストを安く、定常的にMap Reduceで処理したいというモチベーションの時、自作したMap Reduceフレームワークを使っています  

と言っても、優秀なファイルシステム、高速なディスクIO、マルチコアの強力な並列性に依存しており、まさに巨人の肩の上に立つと言った感じです。

自作したMapReduceシステム概要

Mapperの出力を、オンメモリで保持するのではなく、NVMeに直接かきこむことで、Reducerが特定のキーの粒度で集められたデータを処理することができます。オンメモリでデータを保持しないことで、ディスクの容量とディスクのファイルシステムが許す限り、書き込むことができるので、この限界値を考えなければ、どんなに大きいデータであっても、それなりの時間で処理することができます。(データの特徴や粒度によっては大規模クラスタリングしたHadoopより早いです)  

図1. 作成したコンピュータ一台で動作するMap Reduce(Mapperの出力先はbtrfsでフォーマットしたNVMe)

課題:btrfsの開発がRHELで断念される

マイナビニュースにこのような文章が発表されました  

Btrfsは長年にわたって作業をしているにもかかわらず、依然として技術的な問題が存在しており今後も改善が期待しにくいこと、ZFSはライセンスの関係で取り込むことができないこと、既存の技術と継続しながら新技術を実現するという観点から、当面はXFSをベースとしつつ機能拡張を進めていく「Stratis」プロジェクトを進めることが妥当ではないかと提案している。

しかし、ArchWikには、このようにもあり、btrfsの今後がどうなるかよくわからないです  

Btrfs またの名を “Better FS” — Btrfs は新しいファイルシステムで Sun/OracleZFS に似たパワフルな機能を備えています。機能としては、スナップショット、マルチディスク・ストライピング、ミラーリング (mdadm を使わないソフトウェア RAID)、チェックサム、増分バックアップ、容量の節約だけでなく優れたパフォーマンスも得られる透過圧縮などがあります。現在 Btrfs はメインラインカーネルにマージされており安定していると考えられています [1]。将来的に Btrfs は全てのメジャーなディストリビューションインストーラで標準になる GNU/Linux ファイルシステム として期待されています。

まじか。引っ越しの検討だけはつけておかなきゃ!

Linuxで使えるファイルシステムは多岐に渡りますが、今回の用途では、何が最も良いのでしょうか。

ビッグデータの用途で、ファイルシステムをKVSのように用いたり、LevelDBなどのKVSを用いることがよくあり、この用途では細かい数キロから数メガのファイルを大量に作ります  

そのため、数多くのファイルを持つことができ、パフォーマンスが高いbtrfsを今まで用いてきたのですが、他のファイルシステムも検討することにします  

検討対象のファイルシステムとその実験環境

まず、条件を整えるため、ハードウェアは固定します

  • CPU: RYZEN7 1700X
  • Memory: 36GByte DDR4
  • HDD1: SanDisk SSD UltraII 480GB (OS起動用)
  • HDD2: インテル SSD 600pシリーズ 512GB M.2 PCIEx4(検証用)

OSとそのバージョン

  • ArchLinux 4.12.4-1-ARCH (x86_64)

検証対象ファイルシステム

zfsに関しては、この時、ソフトウェアがこのバージョンのカーネル用にコンパイル & インストールできなかったので、諦めました

実験で読み書きするデータ

  • map reduceでよくあるパターンのファイルを作り出すスクリプトで実験を行います(1〜16プロセスで可変させて、大量のファイルの読み書きを行います)

各種ファイルシステム最大容量、最大ファイル数

ext4はそのファイルシステムの制約で、最初にmkfs.ext4した時にinodeの最大値を決めるのですが、ちょくちょくデフォルトから超えてしまい、分析が途中で破綻してしまい、苦しい目をみることになることが多いです   

他のファイルシステムは、サイズ、ファイル数はどうなっているのでしょうか

format parameters

フォーマットには可能なかぎり、オプションは指定しません   つまりデフォルトで用いたらどういう場合にパフォーマンスが高いかという視点です

mkfs.f2fs: mkfs.f2fs Ver: 1.8.0 (2017-02-03)
mkfs.ext4: mke2fs 1.43.5 (04-Aug-2017)
mkfs.ntfs: mkntfs v2017.3.23 (libntfs-3g)
mkfs.btrfs: mkfs.btrfs, part of btrfs-progs v4.12
mkfs.jfs: mkfs.jfs version 1.1.15, 04-Mar-2011
mkfs.reiserfs: (バージョンコマンドでなぜか何も表示されない。。。)
mkfs.xfs: mkfs.xfs version 4.12.0

パフォーマンステスト

シーケンシャルで大きなファイルの読み書きは今回は考慮しません  

ビッグデータ分析などに使う用途を考えており、数キロから数メガのファイルをとにかく大量に作り、読み書くことを目的とします  

今回、よく使う方法で並列アクセスをするベンチマーク用のスクリプトを作成したので、それでみていくことにします  

$ python3 benchmark.py | tee log.txt

あるあるなパターンを作り出して、ファイルを読み書きをして、どの程度で終わるかを検証します

1K, 10K, 100Kバイトのファイルをそれぞれ、10000, 20000, 40000個作成するのに、どの程度の時間が必要かを測定します  

また、作成したファイルを読み取るのにどの程度必要なのかを測定します

結果

 図1. ext4 format

 図2. btrfs format

 図3. f2fs format

 図4. reiserfs format

 図5. ntfs format(オレンジが場外に飛んだ)

 図6. xfs format

 図7. jfs format

 図8. ext4 format @ ラズベリーパイ class 10 U3(オマケ)

ext4を基準とした時のパフォーマンス

ext4のパフォーマンスを1とした時に、相対的にどの程度の速さ(小さい方がいい)なのかを表示します

 

 図9. ext4からの相対的な速度

f2fs max file number challenge

F2FSがこのわたしが分析でよく用いるデザインパターンにおいて最もパフォーマンスが良いことがわかりました。  

ドキュメントやWikiをさらっても、F2FSの最大ファイルサイズがよくわからないのですが、もともとこのファイルシステムが作られた背景である、フラッシュメモリを効率的に長寿命にもちいたいというモチベーションから考えると、armhf(32bit Armアーキテクチャ)をサポートしたいはずなので、232個までいけるんじゃないでしょうか  

ext4では500万個を超える程度のファイルを作ると、もう、デフォルトではinodeが埋まってしまい、書き込めないです

直近では4000万個ほどの、ファイルを用いる必要があり、とりあえずこの個数までファイルを作れれば良さそうです

# ファイルを作りまくって問題がない限界を確認する
count = 0
for i in range(40000000):
  count += 1
  if count % 10000 == 0:
    print('now iter', count)
  try: 
    open('targetssd/{}'.format(count), 'w' )
  except Exception as e:
    print( e )
    print( 'max file number is', count )
    break

以下のコマンドを実行したところ、問題なく完了することができました

$ time python3 make_disaster.py

参考:オフィシャルサイトでのベンチマーク

公式サイトで様々な角度からのベンチマークが行われました nvme, randiskやssdなどで様々な角度からベンチマークが行われています  

やはりというかなんというか、新しいフォーマットで新しい規格のほど、スコアが良いように見えます

まとめ

全く注目していなかったF2FSが思いの外良いパフォーマンスを出したので、アドホックな集計や分析の選択肢に加えるのはありだと思います。  

今後もbtrfsの開発は続いて行くと考えられますが、RHELの影響でどう転ぶかわからないので、長期的に使うシステムには安全策としてXFSなど古くからあって数多くのファイル数を扱え、パフォーマンスが良い、フォーマットも良いと思います  

使用したコード

Google Cloud DataFlowをKotlinで書く

Google Cloud DataFlowをKotlinで書く

Kotlinで書くモチベーション

以前も書きましたが、Kotlinが標準で採用しているラムダ式を用いたメソッドチェーンと、GCP Cloud DataFlow(OSSの名前はApache Beam)の作りが類似しており、ローカルでのKotlinで書いた集計コードの発想を大きく書き換えることなく、GCP DataFlowで利用できるからです。   

また、シンタックスもKotlinはJavaに比べると整理されており、データ分析という視点において、見通しが良いと感じるからでもあります。  

Google Cloud DataFlowとは

GCPで提供されているクラウド集計プラットフォームで、Apache Beamという名前でOSSで公開されています。

Map Reduceの発展系のような印象を受ける作りになっており、集計するためのコードの書きやすさや、特定の集計部分だけ、集中して記述してかけるなどのメリットがあり、大規模なビッグデータ処理に向いていると感じます。  

  • Googleが考える、データの取り扱い

  • この処理に則って、様々な分析プラットフォームのクラウドサービスが展開されている

  • AmazonのElastic Map Reduceと競合する製品だと思われますが、より、柔軟で、汎用性が高いように見えます

AWS Elastic Map Reduceとの違い

elastic map reduceイメージ

Google Cloud DataFlow

Amazon Elastic Map Reduceに比べて、多段にした処理を行うことができることが、最大のメリットだと感じます(複雑な集計が一気通貫でできる)

Google Cloud DataFlow特徴

  • Javaなどのプログラミング言語で分析・集計処理を書けるので、非構造化データに対応しやすい
  • GCPのデータストレージ(AWSのS3のようなもの)に保存するのでコストが安い
  • 関数型言語における、ラムダ式を用いたデータ構造の変換と操作が類似しており、データ集計に関する知識が利用できる  
  • SQLでないので、プログラムをしらないと集計できない(デメリット)

Requirements

Google Cloud SDKのインストールとセットアップ

ローカルのLinuxマシンからGCPに命令を送るのに、Google Cloud SDKのgcloudというツールをインストールしておく必要があります  

この例ではLinuxを対象としています

$ wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ tar zxvf google-cloud-sdk-158.0.0-linux-x86_64.tar.gz
$ ./google-cloud-sdk/install.sh 
Welcome to the Google Cloud SDK!
...
Do you want to help improve the Google Cloud SDK (Y/n)? <<- Yと入力
Do you want to continue (Y/n)?  <<- Yと入力

bashrcのリロード

$ source ~/.bashrc

gcloud initして、gcloudの認証を通します

$ gcloud init # gcloundのセットアップ
 [1] alicedatafoundation
 [2] machine-learning-173502
 [3] Create a new project
 Please enter numeric choice or text value (must exactly match list 
item): 2 # 使っているプロジェクトを選択

asia-northeast1-aのリージョンを設定

If you do not specify a zone via a command line flag while working 
with Compute Engine resources, the default is assumed.
 [1] asia-east1-c
 [2] asia-east1-b
 [3] asia-east1-a
 [4] asia-northeast1-c
...

クレデンシャル(秘密鍵などが記述されたjsonファイル)の環境設定の発行と設定   Google Apiで発行してもらう

クレデンシャルを環境変数に通します

必要ならば、bashrcに追加してください。

$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp.json

GCP側のセットアップ操作

1. Projectの作成

1. Projectの作成

2. CloudStrageの作成

2. CloudStrageの作成

3. Keyの作成

3. Keyの作成

(ここで作成したjsonファイルはダウンロードして環境変数にセットしておく必要があります)

4. Codeを書く

4. Codeを書く

任意の集計の角度を記述します

Kotlinで記述されたサンプルのコンパイル&実行

私が作成したKotlinのサンプルでの実行例です。   シェイクスピアの小説などの文章から、何の文字が何回出現したかをカウントするプログラムです
git clone(ダウンロード)

$ git clone https://github.com/GINK03/gcp-dataflow-kotlin-java-mix

コンパイル 

$ mvn package

クリーン(明示的にtargetなどのバイナリを消したい場合)

$ mvn clean

GCPに接続して実行

$ mvn exec:java

これを実行すると、ご自身のGCPのDataStrageに結果が出力されているのを確認できるかと思います

今回のKotlinのDataFlowのプログラムの説明

多くの例では、Word(単語)カウントを行なっていますが、今回の例では、Char(文字)のカウントを行います  

Googleが無償で公開しているシェイクスピアのテキストを全て文字に分解して、group byを行いどの文字が何回出現しているのか、カウントします  

このプログラムを処理ブロック(一つのブロックはクラスの粒度で定義されている)で図示すると、このようになります

クラスの定義はこのように行いました  

KotlinProc1クラス

public class KotlinProc1 : DoFn<String, String>() {
  override fun processElement(c : DoFn<String,String>.ProcessContext) {
    val elem = c.element()
    elem.toList().map { 
      val char = it.toString()
      c.output(char)
    }
  }
}

KotlinProc2クラス

public class KotlinProc2 : DoFn<String, KV<String, String>>() {
  override fun processElement(c : DoFn<String, KV<String,String>>.ProcessContext) {
    val char = c.element()
    c.output(KV.of(char, "1"))
  }
}

GroupByKey

 GroupByKey.create<String,String>() 

KotlinProc3クラス

public class KotlinProc3 : DoFn<KV<String, Iterable<String>>, String>() {
  override fun processElement(c : DoFn<KV<String, Iterable<String>>, String>.ProcessContext) {
    val key = c.element().getKey()
    val iter = c.element().getValue()
    val list = mutableListOf<String>()
    iter.forEach {  list.add(it.toString()) }
    c.output(key.toString() + ": " + list.size.toString()); 
  }
}

GCP DataFlowを用いず生のKotlinを用いて同等の処理を書く

類似していることが確認できます

import java.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.io.BufferedReader
import java.io.InputStream
import java.io.File
import java.util.stream.Collectors
import java.util.stream.*
fun main( args : Array<String> ) {
  val dataframe = mutableListOf<String>() 
  Files.newDirectoryStream(Paths.get("contents/"), "*").map { name -> //データをオンメモリにロード
    val inputStream = File(name.toString()).inputStream()
    inputStream.bufferedReader().useLines { xs -> xs.forEach { dataframe.add(it)} }
  }

  dataframe.map {
    it.toList().map { // DataFlowのKotlinProc1に該当
      it.toString()
    }
  }.flatten().map { // DataFlowのKotlinProc2に該当
    Pair(it, "1")
  }.groupBy { // DataFlowのGroupByKeyに該当
    it.first
  }.toList().map { // DataFlowのKotlinProc3に該当
    val (key, arr) = it
    println("$key : ${arr.size}")
  }
}

まとめ

ローカルで分析すると一台のマシンで収まるメモリの量しか取り扱うことができないので、ビッグデータになると、必然的にスケーラブルなGCP Cloud DataFlowのようなサービスを利用する必要があります。
このように、ローカルでの分析の方法とビッグデータの分析の方法が似ていると、発想を切り替える必要がなかったり、一人でスモールなデータからビッグデータまで低いコストで分析できるので、経験則的に生産性を高めることができます。

AWS EMR Hadoop Streaming Examples

AWS EMR Hadoop Streaming Examples

GCPのDataFlowの方が、AWS EMRより個人的にはモダンな印象があるのですが、業務でAWSで非構造化データの大規模な分析が必要になる可能性があり、Hadoop Streamingの仕組みを軽くおさらいして、いくつかの言語で動かしました  

Map Reduceの仕組み自体は古いのですが、論文にもなっており、この構成でほとんどの集計が可能みたいなことを言っており、パワーを感じます[1]  

 
図1. イメージしているところのMapReduceの構成

AWSのElastic Map Reduceを利用してHadoop Streamingで任意の言語で、ビッグデータを処理する方法を説明します  

任意の言語で処理をつなげることができるため、AWS EMR(Hadoop Streaming)の仕組みさえ理解していれば、好きな言語で処理が可能です

ここでは、以下の言語におけるもっとも簡単な集計である、全てのドキュメントになんの語が何回出現するか、カウントするプログラムを例示します

  1. Python2
  2. Python3
  3. Ruby ( 2.4 )
  4. Go ( 1.8 )

S3にデータをおく

S3に分析する対象の非構造化データをフォルダを作っていれておきます   生データか、GZで圧縮されている必要があります
例えば、一つのバケットで処理する場合には、このようなフォルダ構成をとります

 

フォルダに必要なデータを適切に配置して、awscliのコンフィグレーションを通した状態で、次のコマンドでHadoop Streamingを実行します

$ aws emr add-steps --cluster-id j-{$YOUR_EMR_ID} --steps file://./WordCount_step.json --region ap-northeast-1

ここで、引数に指定されている JSONファイルはこのような記述になっています  

[
  {
     "Name": "WordCount",
     "Type": "STREAMING",
     "ActionOnFailure": "CONTINUE",
     "Args": [
         "-files",
         "s3://{$YOUR_S3}/wordcount-code/mapper,s3://{$YOUR_S3}/wordcount-code/reducer",
         "-mapper",
         "mapper",
         "-reducer",
         "reducer",
         "-input",
         "s3://{$YOUR_S3}/wordcount-dataset/",
         "-output",
         "s3://{$YOUR_S3}/wordcount-result"]
  }
]

(注:これはGoでバイナリを指定しているので、pythonrubyの場合、適切にfilesの引数を変えてください)

Python2でのワードカウント

各種インターネット上の文献では、Python2でワードカウントしていることが多いです  

AWSの解説サイトで紹介されていた方法で、集計してみます  

Reducerを特別な予約関数を割り当てることで省略できますが、ボトムアップ的に学ぶには、実装してしまった方が良いだろうという判断をしました  

mapper

#!/usr/bin/python
import sys
import re
 
def main(argv):
  pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
  for line in sys.stdin:
    line = line.strip()
    for word in pattern.findall(line):
      print( word.lower() + "\t" + "1" )
 
if __name__ == "__main__":
    main(sys.argv)

reducer

#!/usr/bin/python
import sys
import re

def main(argv):
  term_freq = {}
  for line in sys.stdin:
    line = line.strip()
    ents = line.split('\t')
    term, freq = ents
    freq = int(freq)
    try:
      if term_freq.get(term) == None:
        term_freq[term] = 0
      term_freq[term] += 1
    except Exception as e:
      print(e)
  for term, freq in term_freq.items():
    print( term, freq )

if __name__ == "__main__":
    main(sys.argv)

実行命令

$ aws emr add-steps --cluster-id j-{$YOUR_CLUSTER} --steps file://./WordCount_step.json --region ap-northeast-1

Python3でのワードカウント

Python3をインストールするため、Hadoopクラスタにログインする必要があります

AWSのデフォルトのセキュリティグループについては、sshでログインできないので、セキュリティグループを解放します

$ ssh -i {$KEY} hadoop@{$IPADDR}

Python35のインストール(必要に応じでバージョンを切り替えてください)

$ sudo yum install python35
$ sudo yum install python35-devel
$ sudo yum install python35-pip

mapper

#!/usr/bin/python3
import sys
import re
 
def main(argv):
  for line in sys.stdin:
    line = line.strip()
    for term in line.split():
      print('{}\t1'.format(term) )
if __name__ == "__main__":
    main(sys.argv)

reducer

#!/usr/bin/python3
import sys
import re

def main(argv):
  term_freq = {}
  for line in sys.stdin:
    line = line.strip()
    ents = line.split('\t')
    term, freq = ents
    freq = int(freq)
    try:
      if term_freq.get(term) == None:
        term_freq[term] = 0
      term_freq[term] += 1
    except Exception as e:
      print(e)
  for term, freq in term_freq.items():
    print( term, freq )

if __name__ == "__main__":
    main(sys.argv)

Rubyでのワードカウント

世の中には、Rubyestが多く、PythonでなくてRubyでやりたいという人も多いです  

Rubyは悪くない選択肢でもあるので、使い方を説明します

AWS EMRのノードにインストールされているバージョンは古く、アップデートします

rpmここのサイトからダウンロードしました

$ sudo yum remove ruby
$ sudo yum install ./ruby-2.4.1-1.el6.x86_64.rpm

mapper

#!/usr/bin/ruby
STDIN.each_line { |x|
  x.split(" ").map { |x| 
    puts sprintf("%s\t1", x.downcase)
  }
}

reducer

#!/usr/bin/ruby
term_freq = {}
STDIN.each_line { |x| 
  term, freq = x.split("\t")
  if term_freq[term] == nil  then 
    term_freq[term] = 0
  end
  term_freq[term] += 1
}

term_freq.each { |term, freq| 
  puts sprintf("%s %d", term, freq)
}

全体的にみて、PythonよりRubyの方がスッキリかけますね  

好みの問題である気がしてて、好きな言語で良いと思います  

Goでのワードカウント

私がstreamingにおいて、スクリプト言語より、速度やメモリリソースや、シングルバイナリになるという点で、良いと感じているのが、Go言語です  

Go言語はそのシンプルなシンタックスでありながら、ランタイムに依存することなく、実行可能なバイナリを出力が可能です

つまり、AWS EMRのノードに対して、なんらかログインしてソフトウェアをインストールやセットアップをする必要がありません

mapper

package main

import (
        "bufio"
        "fmt"
        "os"
        "strings"
)

func main() {
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
                line := scanner.Text()
                terms := strings.Split(line, " ")
                for _, term := range terms {
                        out := fmt.Sprintf("%s\t1", term)
                        fmt.Println(out)
                }
        }
}

reducer

package main

import (
        "bufio"
        "fmt"
        "os"
        "strings"
)

func main() {
        dic := map[string]int{}
        scanner := bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
                line := scanner.Text()
                ents := strings.Split(line, "\t")
                term := ents[0]
                _ = ents[1]

                _, ok := dic[term]
                if ok == false {
                        dic[term] = 0
                }
                dic[term] += 1
        }

        for term, freq := range dic {
                out := fmt.Sprintf("%s %d", term, freq)
                fmt.Println(out)
        }
}

コード

github.com

まとめ

いくつかの言語において、実際に、AWS EMRを動かして集計しました

PythonRubyおける使い方は、数年前からあまり進化を感じられませんが、Go言語という選択が可能になったことで、goroutineによる効率的な並列処理や、実行可能なバイナリであるというメモリが少なく済んだりするメリットがあるので、今までの集計方では集められなかった角度のデータが取得できそうで、未来が見えました

参考文献

[1] MapReduce: Simplified Data Processing on Large Clusters