現在位置: ホーム / ゲストブログ / [第2回] 旬のオープンソースソフトウェア~超高速インメモリ処理のSparkって何?どうやって使うの?後編~

[第2回] 旬のオープンソースソフトウェア~超高速インメモリ処理のSparkって何?どうやって使うの?後編~

今回のゲストブログは、日本ヒューレット・パッカードが公式に認定するオープンソース・Linuxテクノロジーエバンジェリストで、Hadoopの技術者認定資格を保有する古賀政純さんです。旬のオープンソースソフトウェアについて連載記事でお届けしています。 連載第2回は、前回に引き続きビッグデータ向けソフトウェアのSpark(スパーク)の後編です。Sparkのインストール手順、基本的な使用法、Scalaシェルの使い方、そして、複数ノードから構成されるSparkクラスターの構築、シミュレーションの実行手順も具体的に紹介します。(2015年10月1日)


 

■Sparkの構築、利用

Sparkを1台のサーバーで利用する

前回、Sparkの概要をご紹介しましたので、今回からは、Sparkをインストールして利用する具体的な手順をご紹介します。まずは、話を単純にするため、サーバー1台だけにSparkをインストールし、実行してみます。本来Sparkは、複数ノードを使ったクラスターを構成しますが、サーバー1台だけでも稼働させることが可能です。また、Sparkは、インメモリ処理を得意とするソフトウェアですので、あまりメモリ容量が多くないサーバーを利用するのは避けましょう。Sparkを少し触ってみたいという場合は、数GB程度あれば動作しますが、実際に巨大なデータをインメモリ処理させたい場合は、メモリ容量の不足に注意してください。今回、筆者のワーカーノードのサーバー環境のメモリ容量は、決して大きいとは言えませんが、32ギガバイトを搭載しています。筆者の環境では、OSとしてCentOS 6.xとCentOS 7.1上でSpark 1.4.1が稼働できることを確認しています。以下では、CentOSにSparkをインストールする手順を掲載します。まず、Sparkを動かすには、Javaが必要ですので、事前にインストールしておきます。

 

# yum install -y java-1.7.0-openjdk

 

Sparkは、root権限がない一般ユーザーで使用することができます。Sparkを利用できる一般ユーザーを作っておき、Sparkのインストールも利用も、この一般ユーザー行うことができます。ここでは、一般ユーザーsparkを作成しておきます。一般ユーザーsparkの作成は、root権限で行います。

 

# useradd -m spark && echo password | passwd spark --stdin

 

一般ユーザーspark(パスワードはpassword)が作成できました。次に、Sparkの最新版のtarballを以下のURLから入手します。

 

Sparkの入手先:http://spark.apache.org/downloads.html

 

2015年8月中旬時点で、Sparkは、1.4.1が最新版ですので、「spark-1.4.1-bin-hadoop2.6.tgz」を入手します。入手した「spark-1.4.1-bin-hadoop2.6.tgz」をユーザーsparkのホームディレクトリにコピーし、tarコマンドで展開します。root権限のコマンドプロンプトは、「#」、一般ユーザーのプロンプトは、「$」で表すことにします。

 

# cp /root/spark-1.4.1-bin-hadoop2.6.tgz /home/spark/
# chown spark:spark /home/spark/spark-1.4.1-bin-hadoop2.6.tgz
# su - spark
$ whoami
spark
$ pwd
/home/spark
$ tar xzvf spark-1.4.1-bin-hadoop2.6.tgz
$ ls -F
spark-1.4.1-bin-hadoop2.6/  spark-1.4.1-bin-hadoop2.6.tgz

 

Sparkのtarballを展開すると、spark-1.4.1-bin-hadoop2.6ディレクトリが生成され、そのディレクトリ配下に起動スクリプトなどが含まれていますので、sparkユーザーの.bash_profileにPATHを設定します。

 

$ vi $HOME/.bash_profile
...
export SPARK_HOME=$HOME/spark-1.4.1-bin-hadoop2.6
PATH=$PATH:$HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PATH
...

$ source $HOME/.bash_profile

 

Sparkで、テキストファイルの行数をカウントしてみる

SparkのScala言語を使って、ローカルの内蔵ディスクに保管されているテキストファイルの行数を数えてみます。事前に対象となるテキストファイルを/home/sparkディレクトリに用意しておきます。Linuxをインストールすると/usr/share/docディレクトリ以下に、アプリケーション毎にディレクトリが分かれており、様々なドキュメント類が保存されていますので、README等のテキストファイルをサンプルとして、一般ユーザーsparkのホームディレクトリにコピーしておきます。

$ cp /usr/share/doc/glibc-*/README $HOME/


まずは、Sparkを使わずに、READMEファイルの行数を数えてみます。

$ pwd
/home/spark
$ wc -l README
80 README


このREADMEファイルは、80行であることがわかります。次に、Scalaを使って、ファイルの行数を数えてみます。Sparkでは、Scalaシェルを持っており、対話的な操作が可能です。Scalaシェルは、spark-shellコマンドを実行します。

$ which spark-shell
~/spark-1.4.1-bin-hadoop2.6/bin/spark-shell

$ spark-shell --master local[8]
...
scala>


ここで、オプションとして「--master local[N]」を指定すると、スレッド数Nで実行されます。上記では、8スレッドで実行するという意味になります。今回は、1ノードでSparkを実行していますので、--masterの後にはlocal[N]を指定していますが、クラスター構成では、指定方法が異なります。Scalaシェルが起動すると、プロンプトが「scala>」になります。これで、対話型の操作が可能になります。Scalaシェルで、先程のREADMEファイルの行数をカウントするには、以下のようにsc.textFile("ファイルのパス")を指定し、変数に格納します。

scala> val line0001 = sc.textFile("/home/spark/README")
...


変数line0001を使って、行数をカウントします。

scala> line0001.count()
...
res0: Long = 80
...
scala>

 
以上で、sc.textFile()で読み込んだ/home/spark/READMEファイルは、80行であることがわかります。Scalaシェルを終了するには、exitを入力します。

scala> exit
...
$


上記のScalaシェルで入力した内容をファイルに保存しておき、バッチ処理することもできます。Scala言語で作成したファイルをロードするには、spark-shellコマンドに「-i」オプションを指定します。以下は、/home/spark/READMEファイルの行数をカウントするScalaプログラム「linecount.scala」を作成し、spark-shellで実行する例です。

$ vi linecount.scala
val line0001 = sc.textFile("/home/spark/README")
line0001.count()
exit(0)

$ spark-shell --master local[8] -i ./linecount.scala 2>/dev/null
...
res0: Long = 80
...
$

 

Sparkクラスターを構築する

Sparkは、スケールアウト型システムにおいて威力を発揮します。ここでは、複数のサーバーから構成されるSparkクラスターを構築・利用する基本的な方法を紹介します。実際には、必要に応じて、様々なパラメータを調整する必要がありますが、今回は、知っておくべき必要最低限のパラメータで、Sparkクラスターを構築します。まず、マスターノード1台と、アプリケーションを分散処理する複数台のワーカーノードを用意します。マスターノードの/etc/hostsファイルと、全ワーカーノードの/etc/hostsファイルに、Sparkクラスターを構成するノードのホスト名をすべて記述します。今回、マスターノードのホスト名をh01、ワーカーノードのホスト名を、h02~h08で構成します。

# vi /etc/hosts
172.16.1.1              h01.jpn.linux.hp.com h01
172.16.1.2              h02.jpn.linux.hp.com h02
...
...
172.16.1.8              h08.jpn.linux.hp.com h08


マスターノード、ワーカーノードともに、1台のサーバーにSparkをインストールしたときと全く同じ方法で、Sparkをインストールします。マスターノードと全ワーカーノードでユーザーsparkを作成する必要があります。全ノードでユーザーを作成したら、.bash_profileファイルも全ワーカーノードのsparkユーザーのホームディレクトリに必要です。次に、マスターノードにおいて、spark-defaults.confファイルのパラメータを設定します。

$ pwd
/home/spark
$ cd ./spark-1.4.1-bin-hadoop2.6/conf/
$ cp spark-defaults.conf.template spark-defaults.conf
$ vi spark-defaults.conf
...
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              4g
spark.executor.memory            8g
...


上記は、Sparkのdriverに割り当てるメモリ容量を4ギガバイト、executorに割り当てるメモリを8ギガバイトに設定している例です。標準では、512メガバイトと非常に小さい値が設定されていますので、大容量メモリを搭載している場合は、このようにdriverとexecutorに割り当てるメモリ容量を明示的に指定するなどの調整が必要になります。ただし、実際には、driverやexecutorに割り当てるメモリ容量だけでなく、executorの数や、executorが利用するCPUコア数などの設定も合わせて検討する必要があります。その他のパラメータについては、Apache Sparkの以下のURLに掲載されています。

 

Apache SparkのWebサイトで公開されている「Spark Configuration」:https://spark.apache.org/docs/1.4.1/configuration.html

 

編集した設定ファイルは、全てのノードで同じものにしておきます。いよいよ、マスターノードのサービスを起動します。Sparkでは、マスターノードの起動スクリプトstart-master.shが用意されていますので、マスターノードで以下のように実行します。

$ which start-master.sh
~/spark-1.4.1-bin-hadoop2.6/sbin/start-master.sh

$ start-master.sh -h h01 -p 7077


start-master.shスクリプトに、-hオプションを指定し、マスターノードのホスト名を付与します。また、-pオプションで、ポート番号を付与します。マスターノードで、Javaのプロセス「Master」が起動できているかを確認します。

$ jps |grep Master
16833 Master


次に、ワーカーノードのサービスを起動します。マスターノード同様に、ワーカーノードのサービス起動するstart-slave.shスクリプトが用意されていますので、全ワーカーノードでstart-slave.shスクリプトを実行します。以下のように、マスターノードからssh接続を行い、start-slave.shスクリプトを起動しても構いません。以下は、ワーカーノードh02からh08までの合計7台に対して、マスターノードからssh接続で、start-slave.shを実行している様子です。

$ ssh h02 "source $HOME/.bash_profile; start-slave.sh spark://h01:7077"
$ ssh h03 "source $HOME/.bash_profile; start-slave.sh spark://h01:7077"
...
...
$ ssh h08 "source $HOME/.bash_profile; start-slave.sh spark://h01:7077"


全てのワーカーノードにおいてJavaのプロセス「Worker」が起動できているかを確認します。

$ ssh h02 "jps |grep Worker"
23830 Worker

...

$ ssh h08 "jps |grep Worker"
30719 Worker


以上で、マスターノード1台とワーカーノード7台によるSparkクラスターが構築できました。Sparkクラスターが構築できたら、Webブラウザーから状態を確認してみます。Sparkクラスターの状態を確認するには、URLとして、「http://マスターノードのホスト名:8080」でアクセスします。

Sparkクラスターの状態をWebブラウザーで確認する

図9. Sparkクラスターの状態をWebブラウザーで確認する
 

Sparkでモンテカルロ法を使ったシミュレーションを行う

7台のサーバーで構成されたSparkクラスターができましたので、早速、クラスター上でインメモリ処理を行ってみましょう。インメモリ処理の例として、繰り返し処理を行うモンテカルロ法によるシミュレーションをとりあげます。モンテカルロ法は、乱数を使って近似解を求めるシミュレーション技法であり、機械学習、統計学でも利用されています。このモンテカルロ法を用いて、円周率πの近似値を求めることができます。モンテカルロ法では、正方形の中に一様に砂粒をまいて、円の内側に落ちた砂粒の数と、円の外側に落ちた砂粒の数をそれぞれ数え、正方形の面積に対して、円の中に入った砂粒の数の割合を算出することで円周率の近似値を求めることができます。下図のように、一辺が2rの正方形に内接する半径rの円を用意すると、正方形の面積は2r×2r、円の面積はπ×r×rです。円の面積π×r×rと正方形の面積2r×2rの比をとると、π/4になります。両辺を4倍するとπ=4×面積比となります。面積の比は落下させる砂粒の数を大きくとれば、より精度の高い近似値を得ることができます。モンテカルロ法で円周率を求めるpythonプログラムは、Sparkのtarballの中に収録されています。また、以下のURLから入手することもできます。

モンテカルロ法で円周率を求めるpythonプログラムpi.py:https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py

上記のpi.pyでは、乱数の発生、円の面積、砂粒が円内に落下したかどうかの判定が関数f()で定義されており、各ワーカーノードで並列処理されています。pi.pyの中では、並列処理を行うparallelizeというメソッドが使われています。このメソッドは、並列処理を行う場合に有用です。parallelizeメソッドについては、以下のSparkのプログラミングガイドをご覧ください。

「Spark Programming Guide」のURL:http://spark.apache.org/docs/latest/programming-guide.html

Pythonに不慣れと言う方のために、モンテカルロ法で円周率を求めるプログラムのC言語版とFORTRAN版もあわせて下図に掲載しておきます。ただし、図に掲載したC言語版とFORTRAN版は、Sparkに対応していませんので注意してください。一方、Python版は、Sparkのtarballに収録されているものと同様のものであり、Sparkクラスター上で稼働させることができます。

モンテカルロ法で円周率の近似値を得る

図10. モンテカルロ法で円周率の近似値を得る

それでは、実際にSparkクラスターにpi.pyを投入し、インメモリ処理により円周率の近似値を計算してみます。PythonプログラムをSparkに投入するには、spark-submitコマンドを使用します。

$ pwd
/home/spark

$ spark-submit --master spark://h01:7077 --num-executors 7 --executor-cores 16 \
$SPARK_HOME/examples/src/main/python/pi.py 100 2>/dev/null  ←実際には1行で入力
Pi is roughly 3.141061


「--master」オプションの後に「spark://マスターノードのホスト名:ポート番号」を付与します。「--num-executors」で、executorの数を指定できます。また、「--executor-cores」でexecutorに割り当てるCPUコア数を指定します。これもアプリケーションや環境によって調整が必要なパラメータです。
 

Sparkクラスターで、HadoopのHDFSを利用する

Sparkクラスターは、Hadoopの分散ファイルシステム「HDFS」を利用することができます。Sparkのインメモリ処理を行うための分析対象データをHDFSに保管しておき、アプリケーションから呼び出します。以下は、HDFSに保管されたテキストファイル「README」をロードし、そのREADMEに含まれる単語ごとの出現回数をカウントする例です。アプリケーションworkdcount.pyは、単語の出現回数をカウントするpythonプログラムです。以下では、wordcount.pyの入力ファイルとして、HDFSに格納されているREADMEを指定しています。

 

$ spark-submit --master spark://h01:7077 --num-executors 7 --executor-cores 16 \
 ./wordcount.py hdfs://h01:8020/user/hadoop/REAMDE > /tmp/result.txt ←実際には1行で入力

 

単語数をカウントするサンプルプログラムは、Sparkのtarballの中のexamples/src/main/pythonディレクトリにwordcount.pyとして収録されています。

 

[注意]

Apacheコミュニティが提供するHadoop 2.7.1のNameNodeのポートは、標準で9000番を使用しますので、以下のようにポート番号の指定に注意してください。

$ spark-submit --master spark://h01:7077 --num-executors 7 --executor-cores 16 \
./wordcount.py hdfs://h01:9000/user/hadoop/REAMDE > /tmp/result.txt ←実際には1行で入力 


上記のように、分析対象のファイルサイズが小さいと、あまりSparkの効果がわかりませんが、ファイルサイズが巨大な場合や、膨大な数のファイルをロードすると、HadoopのHDFSとMapReduceでの処理時間と、HDFSとSparkを組み合わせた処理時間に顕著な差がみられますので、是非試してみて下さい。ちなみに、筆者の環境では、総容量が約300メガバイトのテキストファイル群をHDFSに保存し、単語の出現回数を求めると、HDFSとMapReduceを使って30分以上かかっていた処理が、HDFSとSparkの組み合わせでは、約3分半に短縮されました。

 


 

連載2回にわたり、Sparkの概要と一部の機能を簡単にご紹介しました。Sparkは、Hadoopと補完関係にあり、組み合わせて使うことで威力を発揮します。是非、Sparkを使ったインメモリによる驚異の超高速分析を体感してみて下さい。次回は、IT基盤における計算資源の割り当てを行うオープンソースソフトウェアApache Mesosに焦点を当てます。お楽しみに。

各種お問い合わせ

サイオスOSSよろず相談室(1)

問い合わせボタン

最新の情報
[第14回] Linux/OSS エバンジェリスト古賀政純の 『オープンソース・Linux超入門』 システム要件において検討すべき点 その4 2017年02月08日
[第13回] Linux/OSS エバンジェリスト古賀政純の 『オープンソース・Linux超入門』 システム要件において検討すべき点 その3 2017年02月01日
[第12回] Linux/OSS エバンジェリスト古賀政純の 『オープンソース・Linux超入門』 システム要件において検討すべき点 その2 2017年01月25日
[第11回] Linux/OSS エバンジェリスト古賀政純の『オープンソース・Linux超入門』 システム要件において検討すべき点 その1 2017年01月18日
Python人材育成の支援を目的としたPythonエンジニア育成推進協会の活動とは? 2016年12月21日
わかっておきたいセキュリティ: 第2回 マルウェア解析サンドボックス「Cuckoo」との連携 2016年12月14日
可知豊の『 わかっておきたい、オープンソースライセンス』: 第3回 オープンソースライセンスの使い方をわかっておきたい 2016年12月08日
可知豊の『 わかっておきたい、オープンソースライセンス』: 第2回 色々なオープンソースライセンスをわかっておきたい 2016年11月30日
可知豊の『 わかっておきたい、オープンソースライセンス』: 第1回 著作権とライセンスをわかっておきたい 2016年11月17日
わかっておきたいセキュリティ: 第1回 マルウェア解析サンドボックス「Cuckoo」 2016年11月02日
[第10回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』 Linuxサーバーシステム導入前の検討~ RHELを知る ~ 2016年10月26日
[第9回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』 Linuxサーバーシステム導入前の検討~ Ubuntu Serverを知る ~ 2016年10月19日
[第8回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』  Linuxサーバーシステム導入前の検討~ SUSEを知る ~ 2016年10月12日
[第7回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「初心者でもわかる、Linuxサーバーシステム活用者が知っておくべきポイント」(後編) 2016年08月09日
[第6回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「初心者でもわかる、Linuxサーバーシステム活用者が知っておくべきポイント」(前編) 2016年08月02日
[第5回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「ミッションクリティカルシステムとオープンソース・Linux」(後編) 2016年06月22日
[第4回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「ミッションクリティカルシステムとオープンソース・Linux」(前編) 2016年06月16日
[第3回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「CEOが知っておくべきオープンソース革新」(後編)~ 2016年05月25日
[第2回] Linux/OSS エヴァンジェリスト古賀政純の 『オープンソース・Linux超入門』~「CEOが知っておくべきオープンソース革新」(中編)~ 2016年05月18日
[新連載] Linux/OSS エヴァンジェリスト古賀政純の『オープンソース・Linux超入門』~「CEOが知っておくべきオープンソース革新」(前編)~ 2016年05月11日
最新の情報 - もっと...