…忘れかけているから抜けがありそうだけど、次回の為に残しておきます。
用意したもの
- jdk-6u38-linux-x64-rpm.bin
- rpmforge-release-0.5.2-2.el5.rf.x86_64.rpm (必要なかったかも。。。)
- hadoop-1.1.1-1.x86_64.rpm (本体)
- hadoop-1.1.1.tar.gz (サンプルプログラムが欲しかった)
インストール
- rpm はroot ユーザで rpm -ivh でインストールする。
- hadoop-1.1.1.tar.gz は一般ユーザで解凍する。 ※誰でもいいのか不安だったので、上記で作成された mapred ユーザで解凍した。
月並みだけど、java のインストール後に JAVA_HOME の環境変数が必要です。私は面倒だったので、/etc/profile.d/java.sh を新規作成して、「export JAVA_HOME=/usr/java/latest」の1行のみ書いています。
hadoop-1.1.1-1.x86_64.rpm で作成されるもの
コマンド
hadoop hadoop-setup-conf.sh hadoop- validate-setup.sh
hadoop-create-user.sh hadoop-daemons.sh hadoop-setup-hdfs.sh
hadoop-daemon.sh hadoop-setup-applications.sh hadoop-setup-single-node.sh
hadoop コマンドや設定ファイルを作成してくれるもの、デーモン起動に使用するものがある。/usr/sbin や /usr/local/bin に作成される。
環境ファイル
/etc/hadoop が作成されます。よく解説にでてくる core-site.xml、hdfs-site.xml、mapred-site.xml がここに作成されます。
起動スクリプト
/etc/init.d/に各起動スクリプトが作成される。
hadoop-datanode hadoop-historyserver hadoop-jobtracker hadoop-namenode hadoop-secondarynamenode hadoop-tasktracker
この辺を起動するのだけど、hadoop の通信は結構あるので、iptables は停止した方が良い。
やったこと
- hadoop-create-user.sh
- hadoop-setup-single-node.sh
- hadoop-setup-hdfs.sh
上記を何も考えずに実行した。
- core-site.xml
- hdfs-site.xml
- mapred-site.xml
上記のlocalhostを全て 192.168.0.xxx のIPアドレスに変更した。あとで思えば、localhost のままでもいいものも、あったように思う。
- hadoop サービスの再起動。
順番は、/etc/rc3.d の記載順
/etc/rc3.d/S90hadoop-datanode start
/etc/rc3.d/S90hadoop-jobtracker start
/etc/rc3.d/S90hadoop-namenode start
/etc/rc3.d/S90hadoop-tasktracker start
- サイトアクセス
http://192.168.0.xxx:50070 はHDS のWEBサービス
http://192.168.0.xxx:50030 は job tracker のWEBサービス
- テスト
hadoop-1.1.1.tar.gz を展開すると、下記のようなサンプル(?)が、展開される。
hadoop-ant-1.1.1.jar
hadoop-client-1.1.1.jar
hadoop-core-1.1.1.jar
hadoop-examples-1.1.1.jar
hadoop-minicluster-1.1.1.jar
hadoop-test-1.1.1.jar
hadoop-tools-1.1.1.jar
とりあえず、hadoop-examples-1.1.1.jar を maperd ユーザで実行してみる。
$ hadoop jar hadoop-examples-1.1.1.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
dbcount: An example job that count the pageview counts from a database.
grep: A map/reduce program that counts the matches of a regex in the input.
join: A job that effects a join over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
pi: A map/reduce program that estimates Pi using monte-carlo method.
randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
randomwriter: A map/reduce program that writes 10GB of random data per node.
secondarysort: An example defining a secondary sort to the reduce.
sleep: A job that sleeps at each map and reduce task.
sort: A map/reduce program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data for the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map/reduce program that counts the words in the input files.
親切ですね。コマンドに何があるか教えてくれているみたい。。。本にも紹介されていた wordcount が簡単そう。。
$ hadoop jar hadoop-examples-1.1.1.jar wordcount
Usage: wordcount <in> <out>
さらなる引数を教えてくれた。INファイルをOUTディレクトリが必要らしい。INファイルはUNIXファイルシステムではない事に注意。
INファイルをアップします。
$ hadoop dfs -ls /
Found 2 items
drwxrwxrwx – hdfs supergroup 0 2012-12-24 22:25 /tmp
drwxr-xr-x – hdfs supergroup 0 2012-12-24 20:31 /user
二つのディレクトリがHDS上に存在するらしい、dfs は -ls 以外に -mkdirなどいろいろあります。
UNIXファイルをアップします。
$ hadoop dfs -put /etc/hosts /tmp/
-bash-3.2$ hadoop dfs -ls /tmp/
Found 1 items
-rw——- 3 mapred supergroup 238 2012-12-24 23:29 /tmp/hosts
ファイルがアップできたので、再度コマンドを実行します。
$ hadoop jar hadoop-examples-1.1.1.jar wordcount /tmp/hosts /tmp/output
12/12/24 23:32:13 INFO input.FileInputFormat: Total input paths to process : 1
12/12/24 23:32:13 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/12/24 23:32:13 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/24 23:32:13 INFO mapred.JobClient: Running job: job_201212242215_0005
12/12/24 23:32:14 INFO mapred.JobClient: map 0% reduce 0%
12/12/24 23:32:29 INFO mapred.JobClient: map 100% reduce 0%
12/12/24 23:32:36 INFO mapred.JobClient: map 100% reduce 33%
12/12/24 23:32:37 INFO mapred.JobClient: map 100% reduce 100%
12/12/24 23:32:38 INFO mapred.JobClient: Job complete: job_201212242215_0005
12/12/24 23:32:39 INFO mapred.JobClient: Counters: 29
12/12/24 23:32:39 INFO mapred.JobClient: Job Counters
12/12/24 23:32:39 INFO mapred.JobClient: Launched reduce tasks=1
12/12/24 23:32:39 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=13193
12/12/24 23:32:39 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/24 23:32:39 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
12/12/24 23:32:39 INFO mapred.JobClient: Launched map tasks=1
12/12/24 23:32:39 INFO mapred.JobClient: Data-local map tasks=1
12/12/24 23:32:39 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=8831
12/12/24 23:32:39 INFO mapred.JobClient: File Output Format Counters
12/12/24 23:32:39 INFO mapred.JobClient: Bytes Written=285
12/12/24 23:32:39 INFO mapred.JobClient: FileSystemCounters
12/12/24 23:32:39 INFO mapred.JobClient: FILE_BYTES_READ=395
12/12/24 23:32:39 INFO mapred.JobClient: HDFS_BYTES_READ=338
12/12/24 23:32:39 INFO mapred.JobClient: FILE_BYTES_WRITTEN=58367
12/12/24 23:32:39 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=285
12/12/24 23:32:39 INFO mapred.JobClient: File Input Format Counters
12/12/24 23:32:39 INFO mapred.JobClient: Bytes Read=238
12/12/24 23:32:39 INFO mapred.JobClient: Map-Reduce Framework
12/12/24 23:32:39 INFO mapred.JobClient: Map output materialized bytes=395
12/12/24 23:32:39 INFO mapred.JobClient: Map input records=7
12/12/24 23:32:39 INFO mapred.JobClient: Reduce shuffle bytes=395
12/12/24 23:32:39 INFO mapred.JobClient: Spilled Records=52
12/12/24 23:32:39 INFO mapred.JobClient: Map output bytes=343
12/12/24 23:32:39 INFO mapred.JobClient: CPU time spent (ms)=1900
12/12/24 23:32:39 INFO mapred.JobClient: Total committed heap usage (bytes)=265216000
12/12/24 23:32:39 INFO mapred.JobClient: Combine input records=27
12/12/24 23:32:39 INFO mapred.JobClient: SPLIT_RAW_BYTES=100
12/12/24 23:32:39 INFO mapred.JobClient: Reduce input records=26
12/12/24 23:32:39 INFO mapred.JobClient: Reduce input groups=26
12/12/24 23:32:39 INFO mapred.JobClient: Combine output records=26
12/12/24 23:32:39 INFO mapred.JobClient: Physical memory (bytes) snapshot=231952384
12/12/24 23:32:39 INFO mapred.JobClient: Reduce output records=26
12/12/24 23:32:39 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2097623040
12/12/24 23:32:39 INFO mapred.JobClient: Map output records=27
期待どおりの動作?。。。動いてよかったよかった。
ついでにソースが見たくなります。
$ jar tvf hadoop-examples-1.1.1.jar
0 Mon Nov 19 10:42:54 JST 2012 META-INF/
156 Mon Nov 19 10:42:52 JST 2012 META-INF/MANIFEST.MF
0 Mon Nov 19 10:42:54 JST 2012 org/
0 Mon Nov 19 10:42:54 JST 2012 org/apache/
0 Mon Nov 19 10:42:54 JST 2012 org/apache/hadoop/
0 Mon Nov 19 10:42:54 JST 2012 org/apache/hadoop/examples/
0 Mon Nov 19 10:42:54 JST 2012 org/apache/hadoop/examples/dancing/
0 Mon Nov 19 10:42:54 JST 2012 org/apache/hadoop/examples/terasort/
1995 Mon Nov 19 10:42:54 JST 2012 org/apache/hadoop/examples/AggregateWordCount$WordCountPlugInClass.class
(中略)
コンパイルされた class ファイルですかが、パスを確認できます。ソースコードは下記にあります。
個々の解説は。。。私が理解できたらですね。大きくいうと、標準クラスを利用している、オリジナルクラスを定義している、main で、引数の確認から Jobtracker への定義、処理の設定、ファイル入出力定義と実行の終了待ち「job.waitForCompletion()」といった感じでしょうか。この処理は、単純集計ですが、複雑な解析が必要になると、IntSumReducer()といった関数の中身が膨大になるでしょうね。また、集計方法に合わせて、Mapを変える必要があるでしょうし、Map、Reduce、In、Out、Map、Reduce、In、Out。。。とJobの設定が長くなっていく予感がします。
$ cat src/examples/org/apache/hadoop/examples/WordCount.java
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: wordcount <in> <out>”);
System.exit(2);
}
Job job = new Job(conf, “word count”);
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}