- 作者:老汪软件技巧
- 发表时间:2024-11-18 07:04
- 浏览量:
引子
随着移动互联网时代的到来,大数据时代 也随之而至。无数的信息流与数据流在各种系统和设备中涌动,如何高效地存储与处理这些海量数据,成为了当今技术领域的一大挑战。作为Apache旗下的分布式存储与计算框架,Hadoop 一直在大数据处理领域占有重要地位,凭借其强大的扩展性和可靠性,广泛应用于各类大规模数据处理任务。
本文将借鉴Hadoop的设计思想,使用Java实现其一大核心功能:MapReduce(分布式计算模型),以此展示如何通过并行计算解决海量数据处理问题。
认识Hadoop
既然是要借鉴设计,自然也就需要我们先对Hadoop来细细地“盘”一下,毕竟工欲善其事必先利其器。那么,就让我来用很多人都做过的图书管理系统来帮大家梳理一下。
1.Hadoop本身:图书馆管理系统
想象你是一家超大型图书馆的馆长,这个图书馆有成千上万的书籍,Hadoop就是一个强大的管理系统,可以帮助你有效地存储、管理和处理这些书籍的信息。
那么作为管理这些图书的Hadoop此时就面临着两个关键问题需要解决:
为了实现这两个目标,Hadoop就引入了HDFS和 MapReduce,它们分别负责存储和处理数据。
2.HDFS:图书馆的书架和仓库系统
HDFS(Hadoop Distributed File System)负责数据存储,就像图书馆中的书架和仓库系统,负责存储所有的书籍。
它的存储方式结合图书馆具有以下几个特点:
3.MapReduce:图书馆的任务分配系统
在图书馆的管理系统中,除了需要分布式存储书籍外,还需要对这些书籍进行查询、统计和分析工作。为了高效处理这些任务,图书馆采用了MapReduce来对任务进行分配。这个系统通过将任务拆分为多个步骤,并行分配给不同的管理员(节点),从而加快任务的执行速度。
MapReduce主要分为两个阶段:Map阶段和Reduce阶段。
3-1.Map阶段(映射阶段)
假设你想知道图书馆里每本书的借阅次数。图书馆不会让一个管理员去统计所有书籍的借阅信息,而是将统计任务分配给多个房间的管理员。每个管理员只负责统计自己房间内的书籍借阅情况,并生成一个中间结果。这就是 Map阶段:每个节点负责处理自己存储的数据,生成键值对结果。
对应到实际的Hadoop系统中,Map阶段会将大规模的数据集分成多个小块,由不同的节点并行处理。每个节点负责处理自己的一部分数据,并输出中间的键值对结果。
3-2.Reduce阶段(归约阶段)
当每个房间的管理员将统计结果交给馆长后,馆长会将这些结果汇总,得到整个图书馆的借阅统计信息。这就是 Reduce 阶段:汇总Map阶段生成的键值对,得到最终的统计结果。
在 Hadoop 中,Reduce 阶段会接收来自多个Map任务的中间结果,并对这些结果进行汇总或聚合,最终生成用户所需要的输出结果。
3-3.并行与容错
每个房间的管理员可以同时统计各自房间书籍的数量,如果某个房间管理员今天请假了没来,馆长也会为这个房间指定一个临时管理员来接手任务。
MapReduce的最大优势在于它的并行处理能力。由于每个节点可以独立地处理自己的一部分数据,整个任务可以被拆分为多个小任务并行执行,这极大提高了任务的处理速度。此外,若某个节点在执行任务时发生故障,MapReduce系统能够自动重新分配任务,确保整个作业的顺利完成。
而这些也是我们今天需要实现的点。
技术实现
Hadoop在本地安装后,可以以两种模式运行,分别是本地模式和伪分布式模式。在本地模式下,它会在单个 JVM 实例中运行,不依赖于 HDFS、YARN 或 MapReduce。所有的计算都在本地机器的文件系统上进行。因此,更适合我们此时的快速开发和测试。当然,别忘了引入相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-commonartifactId>
<version>3.3.6version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-mapreduce-client-coreartifactId>
<version>3.3.6version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-mapreduce-client-appartifactId>
<version>3.3.6version>
dependency>
dependencies>
实现MapReduce任务
首先,我们先通过Java实现一个简单的任务-统计一段文本中的单词出现次数。首先先来实现map接口,还记得我们前面提到的map阶段是各个节点处理自己的数据。在当前的任务下,就是对文本进行分词统计即可,代码如下:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\s+");
for (String w : words) {
word.set(w);
context.write(word, one);
}
}
}
map实现后,我们接下来继续reduce阶段,来汇总Mapper产生的中间结果,将相同单词的频次加起来。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
最后是编写我们的任务入口,负责配置并提交MapReduce作业
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountJob {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCountJob );
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word Count");
job.setJarByClass(WordCountJob.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我们准备一个txt文本,用我们的作业简单测试下效果,如下:
运行计数作业后,输出结果如下:
可以看到,我们这个简单的MapReduce任务就实现了。