MapReduce是Hadoop中处理大数据的方法,是一个处理大数据的简单算法、编程泛型。虽然思想简单,但其实真正用起来还是有很多问题,不是所有的问题都可以像WordCount那样典型和直观, 有很多需要trick的地方。MapReduce的中心思想是分而治之,数据要松耦合,可以划分为小数据集并行处理,如果数据本身在计算上存在很强的依赖关系,就不要赶鸭子上架,用MapReduce了。
MapReduce编程中,最重要的是要抓住Map和Reduce的input和output,好的input和output可以降低实现的复杂度。最近,写了很多关于MapReduce的job,有倒排索引,统计,排序等。其中,对排序花费了一番功夫,MapReduce做WordCount很好理解,
Map input:[offset, text], output: [word, 1],
Reduce input: [word, 1], output: [word, totalcount],还可以设置Combiner进行优化。
但排序不同了,大量的文件记录,分配给map,然后reduce出来的文件分布在各个机器上,怎么保证有序呢?排序是算法中常考常用的,MapReduce做排序还需要理解一下MapReduce过程中,非常magic的过程Shuffle and Sort.[more…]
Shuffle and Sort过程解析
如上图,Shuffle的过程包括了Map端和Reduce端。
Map端
- Input Split分配给Map
- Map进行计算,输出[key, value]形式的output
- Map的输出结果缓存在内存里
- 内存中进行Partition,默认是HashPartitioner(采用取模hash (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks), 目的是将map的结果分给不同的reducer,有几个Partition,就有几个reducer,partition的数目可以在job启动时通过参数 “-Dmapreduc.job.reduces”设置(Hadoop 2.2.0), HashPartitioner可以让map的结果均匀的分给不同机器上的reducer,保证负载均衡。
- 内存中在Partition结束后,会对每个Partition中的结果按照key进行排序。
- 排序结束后,相同的key在一起了,如果设置过combiner,就合并数据,减少写入磁盘的记录数
- 当内存中buffer(default 100M)达到阈值(default 80%),就会把记录spill(即溢写)到磁盘中,优化Map时可以调大buffer的阈值,缓存更多的数据。
- 当磁盘中的spill文件数目在3(min.num.spills.for.combine)个(包括)以上, map的新的output会再次运行combiner,而如果磁盘中spill file文件就1~2个,就没有必要调用combiner,因为combiner大多数情况和reducer是一样的逻辑,可以在reduer端再计算。
- Map结束时会把spill出来的多个文件合并成一个,merge过程最多10(默认)个文件同时merge成一个文件,多余的文件分多次merge,merge过程是merge sort的算法。
- Map端shuffle完毕,数据都有序的存放在磁盘里,等待reducer来拿取
Reducer端
shuffle and sort的过程不仅仅在map端,别忘了reducer端还没拿数据呢,reduce job当然不能开启。
- Copy phase: reducer的后台进程(default 5个)到被Application Master (Hadoop 2.2), 或者之前的JobTracker 指定的机器上将map的output拷贝到本地,先拷贝到内存,内存满了就拷贝的磁盘。
- Sort phase(Merge phase): Reduer采用merge sort,将来自各个map的data进行merge, merge成有序的更大的文件。
- 如果设置过Combiner,merge过程可能会调用Combiner,调不调用要看在磁盘中产生的文件数目是否超过了设定的阈值。(这一点我还没有确认,但Combiner在Reducer端是可能调用。)
- Reduce phase: reduce job开始,输入是shuffle sort过程merge产生的文件。
MapReduce排序(Secondary Sort)案例
理解了Shuffle的过程后,我们可以着手开始做排序了。
1. 需求
现在我在HBase里面,存储了10万条文献的评论统计记录,每个文献的评论数目是2~20,每个文献评论统计记录在HBase的存储示例如下:
即表 “pb_stat_comments_count”的记录
[rowkey, column family, column qualifer, value] =
[literature row key, ‘info’, ‘count’, 12]
[literature row key, ‘info’, ‘avgts’, 1400815843854]
count:是每个文献的记录
avgts:是所有文献的平均评价时间戳
排序要求:将文献按照评价次数逆序排,次数大的靠前,次数相同的平均评价时间相同的靠前。
2. 思想
MapReduce在Map端排序时,都只按key排序,而现在我们的排序指标有两个字段count, avgts,组合的key,为了保证排序结果对于第二个字段有序,MapReduce里面叫做Secondary Sort,就是个名称而已,可以扩展为保证第三个字段有序,第四,第五…
- 首先,我们需要重新自定义一个组合Key,即我们将看到的示例中的SortKeyPair类, 包括count和avgts两个字段,并overwrite里面的compare to方法,按我们的需求,count大的靠前,count相同,avgts大的靠前。
- 其次,利用SortKeyPair,自定义一个SortComparatorClass给Map端排序时用。
- 再次,自定义PartitionClass.
我们单机伪分布式下,默认只有一个Partition,一个Reducer,Partition有无均可,但是分布式环境下,我们要求分布在各个reducer上的结果有序,即评论次数25的在reducer0上,610的在reducer 1上,1115在reducer2上,1620在reducer3上,如果采用取模hash,会造成各个reducer的结果顺序无法控制,因此我们不能用HashPartitioner。而自己的Partitioner则需要只按count进行partition,不管平均评价时间,我们这里把partition的主要的这个key叫做NaturalKey. - 最后,我们可以看到reduce端要进行merge,merge过程中,我们需要把相同的count分做一组,需要自定义GroupingComparatorClass。
否则如果是按整个组合key进行分组,每个组合key都是不同的,不能分到一个reduce调用中,reduce方法会被调用10万次(单机环境,一个reducer拿到所有10万条文献), 而如果按count分组,文献评论数目2~20,reduce方法只会被调用最多19次,减少了开销。
我们可以看到一个Reducer实例中,reduce方法会被调用多次,按道理是调用一次,但是因为Reducer过程会把数据分成多个组,reduce调用多次是可能的。(具体的需要再看看源码)
案例代码
1. 自定义组合key类:SortKeyPair.java
[java]
public class SortKeyPair implements WritableComparable<SortKeyPair> {
private int count = 0;
private long avgts = 0;
//要写一个默认构造函数,否则MapReduce的反射机制,无法创建该类报错
public SortKeyPair() {
}
/**
*
* @param count
* @param timestamp
* Average timestamp
*/
public SortKeyPair(int count, long avgts) {
super();
this.count = count;
this.avgts = avgts;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public long getAvgts() {
return avgts;
}
public void setAvgts(long avgts) {
this.avgts = avgts;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.count);
out.writeLong(this.avgts);
}
@Override
public void readFields(DataInput in) throws IOException {
this.count = in.readInt();
this.avgts = in.readLong();
}
/**
* We want sort in descending count and descending avgts, Java里面排序默认小的放前面,即返回-1的放前面,这里直接把小值返回1,就会被排序到后面了。
*/
@Override
public int compareTo(SortKeyPair o) {
int res = this.count < o.getCount() ? 1
: (this.count == o.getCount() ? 0 : -1);
if (res == 0) {
res = this.avgts < o.getAvgts() ? 1
: (this.avgts == o.getAvgts() ? 0 : -1);
}
return res;
}
//这个方法需要Overrride
@Override
public int hashCode() {
return Integer.MAX_VALUE - this.count;
}
@Override
public String toString() {
return this.count + "," + this.avgts;
}
// 这个方法,写不写都不会影响的,至少我测的是这样
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (this == obj) {
return true;
}
if (obj instanceof SortKeyPair) {
SortKeyPair s = (SortKeyPair) obj;
return this.count == s.getCount() && this.avgts == s.getAvgts();
} else {
return false;
}
}
}
[/java]
2. 自定义SortComparatorClass, 我命名为:CompositeKeyComparator.java
[java]
public class CompositeKeyComparator extends WritableComparator{
public CompositeKeyComparator () {
super(SortKeyPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
SortKeyPair s1 = (SortKeyPair)a;
SortKeyPair s2 = (SortKeyPair)b;
return s1.compareTo(s2);
}
}
[/java]
3. 自定义PartitionerClass, 我命名为NaturalKeyPartitioner.java
[java]
public class NaturalKeyPartitioner extends Partitioner<SortKeyPair, Text>{
@Override
public int getPartition(SortKeyPair key, Text value, int numPartitions) {
// % is hash partition, can't make sure bigger count go to reducer with small id.
int count = key.getCount();
if (count <= 5) {
return 0;
} else if (count > 5 && count <= 10) {
return 1;
} else if (count > 10 && count <= 15) {
return 2;
} else {
return 3;
}
}
}
[/java]
4. 自定义GroupingComparatorClass,
我命名为NaturalKeyGroupComparator.java
[java]
public class NaturalKeyGroupComparator extends WritableComparator {
public NaturalKeyGroupComparator() {
super(SortKeyPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
SortKeyPair s1 = (SortKeyPair) a;
SortKeyPair s2 = (SortKeyPair) b;
int res = s1.getCount() < s2.getCount() ? 1 : (s1.getCount() == s2
.getCount() ? 0 : -1);
return res;
}
}
[/java]
5. 定义Mapper,Reducer,并定义Job提交
[java]
public class SecondarySort {
/**
* It must be declared static, in case of reflection error
*
* @author lgrcyanny
*
*/
public static class SortMapper extends TableMapper<SortKeyPair, Text> {
// Map input [hbase row key, hbase result], output: [SortKeyPair, Text] (Text中包括了我们最后输出到文件的信息:literature row key, count, avgts)
@Override
protected void map(ImmutableBytesWritable key, Result rs,
Context context) throws IOException, InterruptedException {
String literature = Bytes.toString(rs.getRow());
int count = Integer.valueOf(Bytes.toString(rs.getValue(
Bytes.toBytes("info"), Bytes.toBytes("count"))));
long avgts = Long.valueOf(Bytes.toString(rs.getValue(
Bytes.toBytes("info"), Bytes.toBytes("avgts"))));
context.write(new SortKeyPair(count, avgts), new Text(literature
+ "," + count + "," + avgts));
}
}
public static class SortReducer extends
Reducer<SortKeyPair, Text, Text, Text> {
/**
* Now the key is max SortKeyPair in the list, we just dump the ordered
* items
* Reduce input: [SorKeyPair, list of text], Output: [text, text]
*/
@Override
protected void reduce(SortKeyPair key, Iterable<Text> items,
Context context) throws IOException, InterruptedException {
Iterator<Text> iterator = items.iterator();
while (iterator.hasNext()) {
context.write(null, iterator.next());// 因为value中已经包括了需要输出的信息,SortKeyPair的信息不需要输出,key设置为null即可。
}
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
// 因为数据源是HBase,因此需要使用HBase中的MapReduce启动设置,可以参考HBase官方网站
// 自己做测试,可以改成文件输入,看具体需求而定
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "Secondarysort");
job.setJarByClass(SecondarySort.class);
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
scan.setCaching(5000); // Default is 1, set 500 improve performance
scan.setCacheBlocks(false); // Close block cache for MR job
TableMapReduceUtil.initTableMapperJob("pb_stat_comments_count", scan,
SortMapper.class, SortKeyPair.class, Text.class, job);
job.setReducerClass(SortReducer.class);
// For secondary sort, 这里设置自定义排序的三个类
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupComparator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Reducer的输出,设置为文件输出
FileOutputFormat.setOutputPath(job, new Path("secondary-sort-res"));
long start = System.currentTimeMillis();
boolean res = job.waitForCompletion(true);
long end = System.currentTimeMillis();
if (res) {
System.out.println("Job done with time " + (end - start));
} else {
throw new IOException("Job exit with error.");
}
}
}
[/java]
总结
MapReduce做排序,麻烦的不是写Mapper和Reducer,而是自定义
CompositeKeyClass
SortComparatorClas
PartitionerClass
GroupingComparatorClass
利用好Shuffle这个很magic课程,可以实现很多奇妙的功能。
本次案例的代码在我的GitHub上。
这些都是个人的理解,如有不对之处,还请指正。
最后,本学期的云计算课到最后了才让我们去些MapReduce程序,之前做无关的MySQL的项目,然后迁移HBase,看着很高端,但是做那个网站到底有什么意义,这是云计算,不是前端计算,HBase in Action中也提到,HBase的设计和MySQL的设计是完全不同的,而MySQL我相信到这个阶段的同学们都会用,何必再次重复劳动呢?直接上HBase不就可以了,本末倒置是我对本学期的课程的失望和吐槽。课程内容多半是概念,涉及技术的有多少?看论文,看概念每一个研究生都可以做到,而我想我们缺的是实践上的指导,比如MapReduce,除了能讲讲WordCount,还能再深入点么?Shuffle的过程可以多说说么?Zookeeper里面的PAXOS算法可以跟我们说说么?学院有Hadoop的集群机器,可以让我们去玩玩不?而不是写那个无聊的网站。
我个人觉得让我们去写一些Hadoop的架构分析,看看源码,也比写那个无聊的网站强。最后9次作业,做的头大,重复劳动,体力劳动,最后还考试,说写的多分会高,有意思么?那么多作业还考试,数据库课程就两次作业+一个论文,但是我觉得我学了也复习了很多知识,比起那些大而空的概念,高到不知哪里去了。
我对云计算,Hadoop,HBase感兴趣,本学期学了很多,但不是课堂上学的,估计我最后对云计算的概念们,要靠下周背PPT了。 我对云计算有兴趣,但对本学期的课程没兴趣。吐槽都是不好的情绪,调整好心态,好好考试吧~BTW,下周还有一次作业。