鍍金池/ 教程/ 大數(shù)據(jù)/ MapReduce - 讀取數(shù)據(jù)
YARN - Failover
MapReduce - Shuffle
Hadoop配置
MapReduce - 讀取數(shù)據(jù)
Hadoop - MapReduce
YARN - Container
Hadoop 測試
MapReduce - Mapper
Hadoop - 參考
YARN - ResourceManager
HDFS - 可靠性
HDFS - 讀文件
Hadoop - 簡介
YARN - ApplicationMaster
Hadoop - IO
Hadoop 監(jiān)控
MapReduce - 編程
Hadoop - YARN
Hadoop - HDFS
HDFS - 命令工具
HDFS - 寫文件
YARN - NodeManager
Hadoop安裝

MapReduce - 讀取數(shù)據(jù)

通過InputFormat決定讀取的數(shù)據(jù)的類型,然后拆分成一個(gè)個(gè)InputSplit,每個(gè)InputSplit對應(yīng)一個(gè)Map處理,RecordReader讀取InputSplit的內(nèi)容給Map

InputFormat

決定讀取數(shù)據(jù)的格式,可以是文件或數(shù)據(jù)庫等

功能

  1. 驗(yàn)證作業(yè)輸入的正確性,如格式等
  2. 將輸入文件切割成邏輯分片(InputSplit),一個(gè)InputSplit將會被分配給一個(gè)獨(dú)立的Map任務(wù)
  3. 提供RecordReader實(shí)現(xiàn),讀取InputSplit中的"K-V對"供Mapper使用

方法

List getSplits(): 獲取由輸入文件計(jì)算出輸入分片(InputSplit),解決數(shù)據(jù)或文件分割成片問題

RecordReader <k,v>createRecordReader():</k,v> 創(chuàng)建#x5EFA;RecordReader,從InputSplit中讀取數(shù)據(jù),解決讀取分片中數(shù)據(jù)問題

類結(jié)構(gòu)

http://wiki.jikexueyuan.com/project/hadoop/images/mapreduce-inputformat.png" alt="" />

TextInputFormat: 輸入文件中的每一行就是一個(gè)記錄,Key是這一行的byte offset,而value是這一行的內(nèi)容

KeyValueTextInputFormat: 輸入文件中每一行就是一個(gè)記錄,第一個(gè)分隔符字符切分每行。在分隔符字符之前的內(nèi)容為Key,在之后的為Value。分隔符變量通過key.value.separator.in.input.line變量設(shè)置,默認(rèn)為(\t)字符。

NLineInputFormat: 與TextInputFormat一樣,但每個(gè)數(shù)據(jù)塊必須保證有且只有N行,mapred.line.input.format.linespermap屬性,默認(rèn)為1

SequenceFileInputFormat: 一個(gè)用來讀取字符流數(shù)據(jù)的InputFormat,<key,value>為用戶自定義的。字符流數(shù)據(jù)是Hadoop自定義的壓縮的二進(jìn)制數(shù)據(jù)格式。它用來優(yōu)化從一個(gè)MapReduce任務(wù)的輸出到另一個(gè)MapReduce任務(wù)的輸入之間的數(shù)據(jù)傳輸過程。</key,value>

InputSplit

代表一個(gè)個(gè)邏輯分片,并沒有真正存儲數(shù)據(jù),只是提供了一個(gè)如何將數(shù)據(jù)分片的方法

Split內(nèi)有Location信息,利于數(shù)據(jù)局部化

一個(gè)InputSplit給一個(gè)單獨(dú)的Map處理

public abstract class InputSplit {
      /**
       * 獲取Split的大小,支持根據(jù)size對InputSplit排序.
       */
      public abstract long getLength() throws IOException, InterruptedException;

      /**
       * 獲取存儲該分片的數(shù)據(jù)所在的節(jié)點(diǎn)位置.
       */
      public abstract String[] getLocations() throws IOException, InterruptedException;
}

RecordReader

將InputSplit拆分成一個(gè)個(gè)<key,value>對給Map處理,也是實(shí)際的文件讀取分隔對象</key,value>

問題

大量小文件如何處理

CombineFileInputFormat可以將若干個(gè)Split打包成一個(gè),目的是避免過多的Map任務(wù)(因?yàn)镾plit的數(shù)目決定了Map的數(shù)目,大量的Mapper Task創(chuàng)建銷毀開銷將是巨大的)

怎么計(jì)算split的

通常一個(gè)split就是一個(gè)block(FileInputFormat僅僅拆分比block大的文件),這樣做的好處是使得Map可以在存儲有當(dāng)前數(shù)據(jù)的節(jié)點(diǎn)上運(yùn)行本地的任務(wù),而不需要通過網(wǎng)絡(luò)進(jìn)行跨節(jié)點(diǎn)的任務(wù)調(diào)度

通過mapred.min.split.size, mapred.max.split.size, block.size來控制拆分的大小

如果mapred.min.split.size大于block size,則會將兩個(gè)block合成到一個(gè)split,這樣有部分block數(shù)據(jù)需要通過網(wǎng)絡(luò)讀取

如果mapred.max.split.size小于block size,則會將一個(gè)block拆成多個(gè)split,增加了Map任務(wù)數(shù)(Map對split進(jìn)行計(jì)算?#x5E76;且上報(bào)結(jié)果,關(guān)閉當(dāng)前計(jì)算打開新的split均需要耗費(fèi)資源)

先獲取文件在HDFS上的路徑和Block信息,然后根據(jù)splitSize對文件進(jìn)行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默認(rèn)splitSize 就等于blockSize的默認(rèn)值(64m)

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // 首先計(jì)算分片的最大和最小值。這兩個(gè)值將會用來計(jì)算分片的大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
            // 獲取該文件所有的block信息列表[hostname, offset, length]
              BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            // 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割
              if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // 計(jì)算分片大小
                // 即 Math.max(minSize, Math.min(maxSize, blockSize));
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                // 循環(huán)分片。
                // 當(dāng)剩余數(shù)據(jù)與分片大小比值大于Split_Slop時(shí),繼續(xù)分片, 小于等于時(shí),停止分片
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                      splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                      bytesRemaining -= splitSize;
                }
                // 處理余下的數(shù)據(jù)
                if (bytesRemaining != 0) {
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
                }
            } else {
                // 不可split,整塊返回
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
            }
        } else {
            // 對于長度為0的文件,創(chuàng)建空Hosts列表,返回
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }

    // 設(shè)置輸入文件數(shù)量
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
}

分片間的數(shù)據(jù)如何處理

split是根據(jù)文件大小分割的,而一般處理是根據(jù)分隔符進(jìn)行分割的,這樣勢必存在一條記錄橫跨兩個(gè)split

http://wiki.jikexueyuan.com/project/hadoop/images/mapreduce-split.png" alt="" />

解決辦法是只要不是第一個(gè)split,都會遠(yuǎn)程讀取一條記錄。不是第一個(gè)split的都忽略到第一條記錄

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    // initialize函數(shù)即對LineRecordReader的一個(gè)初始化
    // 主要是計(jì)算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經(jīng)過壓縮的情況等
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // 打開文件,并定位到分片讀取的起始位置
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        boolean skipFirstLine = false;
        if (codec != null) {
            // 文件是壓縮文件的話,直接打開文件
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            // 只要不是第一個(gè)split,則忽略本split的第一行數(shù)據(jù)
            if (start != 0) {
                skipFirstLine = true;
                --start;
                // 定位到偏移位置,下&#x#x6B21;讀取就會從偏移位置開始
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            // 忽略第一行數(shù)據(jù),重新定位start
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);// key即為偏移量
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // 讀取的數(shù)據(jù)長度為0,則說明已讀完
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            // 讀取的數(shù)據(jù)長度小于最大行長度,也說明已讀取完畢
            if (newSize < maxLineLength) {
                break;
            }
            // 執(zhí)行到此處,說明該行數(shù)據(jù)沒讀完,繼續(xù)讀入
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}
上一篇:Hadoop 監(jiān)控下一篇:Hadoop - 簡介