Hadoopのコードを雑に読む - InputFormat編

created: 2016/01/13 06:27

前回は入力スプリットがどういう構成をしているかを見たので、今回は入力スプリットを生成するInputFormatについて読んでいく。
コードやそのバージョンは前回と同様のものを参照する。

InputFormatクラス

InputFormatクラスには以下の2つの抽象メソッドが定義されている。

public abstract class InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   */
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

  /**
   * Create a record reader for a given split. The framework will call
   */
  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

createRecordReaderは入力スプリットの生成には関係無いので今回は無視して、getSplitsメソッド関係のコードを読んでいく。
定義を見れば分かる通り、このメソッドは前回読んだInputSplitクラス(入力スプリット)を返すように子クラスで実装されなければならない。

FileInputFormatクラス

getSplitsの具体的な実装を見るため、子クラスであるFileInputFormatクラスのコードを読むことにする。
このクラスは入力元データにファイルを取るようなInputFormatのベースとなるクラスで、これを親クラスとして様々なInputFormatが定義されている。
以下はFileInputFormatクラスのコードから抜粋した説明文。

A base class for file-based InputFormats.

FileInputFormat is the base class for all file-based
InputFormats. This provides a generic implementation of
getSplits(JobContext).
Subclasses of FileInputFormat can also override the
isSplitable(JobContext, Path) method to ensure input-files are
not split-up and are processed as a whole by Mappers.

isSplitableというメソッドにも言及していて、これを子クラスでオーバーライドすることでファイルが複数の入力スプリットに分割されることを抑制できるという旨の説明がある。
このメソッドもgetSplits内で使用される。

それじゃあ実装を読む。ざっくり流れだけでもわかればいいかな。

// ログ出力等のデバッグ用コードは除外
public List<InputSplit> getSplits(JobContext job) throws IOException {

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

  return splits;
}

protected boolean isSplitable(JobContext context, Path filename) {
  return true;
}

protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
  return new FileSplit(file, start, length, hosts, inMemoryHosts);
}

protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
  for (int i = 0 ; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
      return i;
    }
  }
  BlockLocation last = blkLocations[blkLocations.length -1];
  long fileLength = last.getOffset() + last.getLength() -1;
  throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")");
}

流れとしては、まずlistStatusメソッドで入力元データとなるファイルの一覧(FileStatus)を取得する。
FileStatusはファイルに関する情報(パス, ファイルサイズ, ファイルか, ディレクトリか)を取り扱うクラスで、ちょっとこのコードまで読もうとするとHDFSに関連するコードまでうろうろしないといけなくなるので、この辺りはドキュメントを読んで流す。

その後、それぞれのファイルについて

  • 分割が許可される(isSplittableが真)なら分割
  • 分割が許可されないならファイルをそのまま入力スプリットとする

という処理を行う。
子クラスでオーバーライドされない限りisSplittableは真を返す。
入力スプリットを生成しているmakeSplitメソッドは、単にFileSplitクラスのコンストラクタへ引数を渡しているだけである。InputSplitは非常に小さいデータのみを持つことは前回読んだ。

ファイルを入力スプリットへ分割する際は、まだ入力スプリットとしていない部分のデータ長をスプリットサイズで除算した結果がSPLIT_SLOPで定義される値より大きい限りループで分割し、最後に残ったデータ長の部分を入力スプリット1つとしてファイル全体を分割する。

SPLIT_SLOPの値は、1ではなく1.1でハードコードされていて、

private static final double SPLIT_SLOP = 1.1;   // 10% slop

これはつまり、whileを抜けた後のbytesRemainingがsplitSizeよりも最大10%大きい可能性があることになる(が、総数はSPLIT_SLOPを1とした場合と比較して1つ少なくなる)
多少入力スプリットのサイズが設定値をオーバーしても入力スプリットの総数を小さくしたいという方針なのかな。

getBlockIndexメソッドは、現在のファイルオフセットがブロックの配列のインデックスの何番目に相当するかを計算するために使用される。

最後にlistStatusに目を通しておきたいけど、その前にそもそも入力元データとなるファイルはどうやって設定するのかも見ておく。
よくサンプルとして挙げられるWordCountでは、以下の用に入力元データのファイル名を設定する。

FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

addInputPathメソッドの実装はこう。

public static void addInputPath(Job job, Path path) throws IOException {
  Configuration conf = job.getConfiguration();
  path = path.getFileSystem(conf).makeQualified(path);
  String dirStr = StringUtils.escapeString(path.toString());
  String dirs = conf.get(INPUT_DIR);
  conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}

addInputPathは入力元データとなるファイルをジョブの設定の1つとして書き込む。
listStatusはこの設定を元にファイルのFileStatusを取得するので、listStatusの引数にはJobが渡されているというわけだ。

で、listStatusのコードはこう。

protected List<FileStatus> listStatus(JobContext job
                                      ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result = null;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  Stopwatch sw = new Stopwatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}

private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (int i=0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration());
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat: matches) {
        if (globStat.isDirectory()) {
          RemoteIterator<LocatedFileStatus> iter =
              fs.listLocatedStatus(globStat.getPath());
          while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(),
                    inputFilter);
              } else {
                result.add(stat);
              }
            }
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }

  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  return result;
}

public static Path[] getInputPaths(JobContext context) {
  String dirs = context.getConfiguration().get(INPUT_DIR, "");
  String [] list = StringUtils.split(dirs);
  Path[] result = new Path[list.length];
  for (int i = 0; i < list.length; i++) {
    result[i] = new Path(StringUtils.unEscapeString(list[i]));
  }
  return result;
}

ちょっと長い。
コードを見ると、listStatusによる列挙はマルチスレッドに対応していることがわかる。
起動するスレッド数は設定値によって変更可能になっているがデフォルトでは1、つまりsingleThreadedListStatusメソッドを通してファイルの一覧が取得されることになる。
あとはその中でパスにマッチするファイルを探してディレクトリだったら再帰して...という感じになっている。

その他のInputFormat

FileInputFormatは抽象クラスなので、FileInputFormatを継承してやっと使えるInputFormatが手に入る。
ただ、入力スプリットの生成に関して大部分の実装をFileInputFormatが請け負ってくれているため、それぞれの子クラスのコード量はFileInputFormatに比べるとかなり少ない。

ちなみにInputFormat(FileInputFormatと読み替えても良い)を継承したクラスが複数あるのは、「入力スプリットで割り振られたデータのどこをキー, 値としてMapperに渡していくか」のパターンを、Hadoopは色々提供してくれているからだ。
入力スプリットをテキストとして1行取り出してバイトオフセットをキー, 行の内容を値として渡してくれるTextInputFormatや、もうちょっと便利に、行の内容を区切り文字で区切って最初の要素をキー, 最後の次の要素を値としてくれるKeyValueTextInputFormat、他にも色々あるっぽい。

で、実際にキーと値を入力スプリットから切り出す実装となるクラスを、InputFormatは今回無視したcreateRecordReaderで決定する。
createRecordReaderはいわゆるRecordReader、コード上ではRecordReaderクラスの子クラスを返す。
K, VはそれぞれMapperに渡されるキー, 値の型だ。
ここでもう1回InputFormatの定義を読み返してみる。

public abstract class InputFormat<K, V> {

  public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

}

この定義から、InputFormatのK, Vの型が決まればMapperに渡されるキー, 値の型も決まることになる。
例えばWordCountのサンプルでも使われているTextInputFormatは以下の実装になっている。

// 全実装。短い
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}

この場合、Mapperが受け取るキー, 値の型はそれぞれLongWritable, Textになる。
初めてサンプル読んだ時はなんでそうなるのか全然分からなかった...

終わり

次はRecordReaderを読みたい。