Hadoopのコードを雑に読む - 入力スプリット編

created: 2016/01/11 03:55

※古いAPIのコードを読んでしまっていたので新しいAPIの方を読んで書き直しました。

Hadoopこれちゃんと勉強した方がいいんじゃないかと思わされる出来事が最近あったので、適当な環境に放り込んで動かしている。
いわゆる象本を読みながらやってるんだけど、Mapper, Reducerのコードだけ見てもイマイチしっくりこないので、読めそうな部分のコードを読んで理解することにした。
深い部分のコードまではちょっと無理そうだけど、Web上の情報を参照しつつInputFormat周辺の実装あたりを読むくらいならできそうだったので、出来る範囲で読んでいこうと思う。

あまりにもおかしな事書いてたら教えて下さい。お願いします。

ソースコード

HadoopのソースコードはApache Licenseのもとに公開されていてApache Hadoop Releasesからダウンロードできる。
ここでは2016/1/10時点で最新のバージョン2.6.3のコードを読むことにする。
読むコードのファイル自体は、ダウンロードしたtarballを展開して出来たディレクトリの"hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce"あたりに入っている。

一連の処理でどういうステップを踏むかは以下のページが参考になる。

Hadoopを使いこなす(1) - Yahoo! JAPAN Tech Blog

コード上だと、上記の各ステップに相当するインターフェース, クラスがそのまんま存在している感じになってるので読みやすい。
とりあえず上から順に、まずは入力スプリットに関連するコードを読んでいく。

入力スプリット(InputSplitクラス)を読む

処理対象となるデータをTaskTrackerに分配するために、データは入力スプリットという単位に分割される。
コード上では入力スプリットはInputSplitクラスで表現される。

// 一部抜粋
public abstract class InputSplit {
  /**
   * Get the size of the split, so that the input splits can be sorted by size.
   * @return the number of bytes in the split
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract long getLength() throws IOException, InterruptedException;

  /**
   * Get the list of nodes by name where the data for the split would be local.
   * The locations do not need to be serialized.
   *
   * @return a new array of the node nodes.
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract String[] getLocations() throws IOException, InterruptedException;

  /**
   * Gets info about which nodes the input split is stored on and how it is
   * stored at each location.
   *
   * @return list of <code>SplitLocationInfo</code>s describing how the split
   *    data is stored at each location. A null value indicates that all the
   *    locations have the data stored on disk.
   * @throws IOException
   */
  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
  }
}

getLengthで入力スプリットのデータ長, getLocationsで入力スプリットを構成するデータ(ブロック)を保持しているデータノードの場所を返す。
あとgetLocationsに似たようなメソッドとしてgetLocationInfoがある。
ブロックを保持しているデータノード上で、そのブロックがメモリ上に存在するかどうかを取得するために存在しているっぽいけど、使用してる箇所がわからなかったので具体的な使途は不明。

ファイル用入力スプリット(FileSplitクラス)を読む。

InputSplitは抽象クラスなので、具体的にどういうデータをどうやって入力スプリットとするかはその子クラスで行う。
子クラスの1つとしてFileSplitクラスがあり、これは1つのファイル全体若しくは一部を入力スプリットとして取り扱う。

// 一部抜粋
public class FileSplit extends InputSplit implements Writable {
  private Path file;
  private long start;
  private long length;
  private String[] hosts;
  private SplitLocationInfo[] hostInfos;

  public FileSplit() {}

  /** Constructs a split with host information
   *
   * @param file the file name
   * @param start the position of the first byte in the file to process
   * @param length the number of bytes in the file to process
   * @param hosts the list of hosts containing the block, possibly null
   */
  public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }

  /** Constructs a split with host and cached-blocks information
  *
  * @param file the file name
  * @param start the position of the first byte in the file to process
  * @param length the number of bytes in the file to process
  * @param hosts the list of hosts containing the block
  * @param inMemoryHosts the list of hosts containing the block in memory
  */
 public FileSplit(Path file, long start, long length, String[] hosts,
     String[] inMemoryHosts) {
   this(file, start, length, hosts);
   hostInfos = new SplitLocationInfo[hosts.length];
   for (int i = 0; i < hosts.length; i++) {
     // because N will be tiny, scanning is probably faster than a HashSet
     boolean inMemory = false;
     for (String inMemoryHost : inMemoryHosts) {
       if (inMemoryHost.equals(hosts[i])) {
         inMemory = true;
         break;
       }
     }
     hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
   }
 }

  /** The file containing this split's data. */
  public Path getPath() { return file; }

  /** The position of the first byte in the file to process. */
  public long getStart() { return start; }

  /** The number of bytes in the file to process. */
  @Override
  public long getLength() { return length; }

  @Override
  public String toString() { return file + ":" + start + "+" + length; }

  ////////////////////////////////////////////
  // Writable methods
  ////////////////////////////////////////////

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, file.toString());
    out.writeLong(start);
    out.writeLong(length);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    file = new Path(Text.readString(in));
    start = in.readLong();
    length = in.readLong();
    hosts = null;
  }

  @Override
  public String[] getLocations() throws IOException {
    if (this.hosts == null) {
      return new String[]{};
    } else {
      return this.hosts;
    }
  }

  @Override
  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return hostInfos;
  }
}

InputSplitクラスで抽象メソッドとなっているメソッドの他、ファイルパスやファイルにおける入力スプリットの開始位置を返すメソッドを実装している。

その他、FileSplitクラスではWritableインターフェイスも実装している。
Hadoopではシリアライズ, デシリアライズが必要なオブジェクトはWritableインターフェイスを実装することになっていて、つまりこれを実装しているFileSplitクラス(入力スプリット)はシリアライズ, デシリアライズが可能ということになる。
入力スプリットは計算のため各TaskTrackerへ転送される必要があるはずなので、そのために入力スプリット用クラスがシリアライズ, デシリアライズ可能であることに特に違和感はない。

以下はWritableインターフェイスの実装。メソッドはバイト列への変換とそこからの復元用のメソッド2つだけ。

// 一部抜粋
public interface Writable {

  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

FileSplitクラスのwrite, readFieldsメソッドを読めば、それがどのようにシリアライズ, デシリアライズされるかを見ることが出来る。
シリアライズ後の結果は3フィールドしかなく、それぞれ入力スプリットが持つファイルパス, オフセット, データ長だけとなっている。

FileSplitクラスのコンストラクタは非常に単純なので、擬似的なFileSplitを生成してこのシリアライズの様子を確認することもできる。

import java.io.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitSerialization {
    public static void main(String[] args) throws IOException {
        String path = args[0];
        System.out.printf("Path: %s(%d)\n", path, path.length());

        FileSplit fs = new FileSplit(new Path(path), 0, 10, new String[0], new String[0]);

        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
        DataOutputStream      dataOut = new DataOutputStream(byteOut);

        fs.write(dataOut); // シリアライズ

        System.out.printf("Serialized byte size: %d\n", byteOut.toByteArray().length);
    }
}

HDFS上に適当な画像ファイル(13KBくらい)を作って実行してみる。

vagrant@hadoop:~/workspace$ hadoop fs -ls hdfs://localhost/
Found 1 items
-rw-r--r--   1 vagrant supergroup      13504 2016-01-10 18:05 hdfs://localhost/sample.png
vagrant@hadoop:~/workspace$ hadoop FileSplitSerialization hdfs://localhost/sample.png
Path: hdfs://localhost/sample.png(27)
Serialized byte size: 44
vagrant@hadoop:~/workspace$

パス長が27バイトでオフセットとデータ長がそれぞれlongで8バイトずつで合計43バイト。残りの1バイトは文字列に付随するnull文字か何かな気がするけどちゃんと確認してない。
シリアライズの結果にデータ自体は含まれないため、その結果はほとんどの場合数十バイトになると思う。
このことから「JobTrackerから入力スプリットを割り振る」というのは、実際の巨大であろうデータをTaskTrackerに直接転送するのではなく、パスやオフセットの情報のみを与えて、実際のデータの取得はTaskTracker上で行わせていそうだというのがぼんやりわかる。

ところでFileSplitのシリアライズ, デシリアライズではブロックの位置情報(hostsメンバ変数)を無視しているけど、これはTaskTracker上、つまりJobTrackerによってシリアライズされ割り振られた後では、もはやこの情報を必要としないということなのかな。

終わり

入力スプリットの成り立ちを読んだので、は入力スプリットを生成するInputFormatあたりを読む。