MapReduce Programming Model
这是我的 Hadoop
学习之旅的第二站,之前我分享了关于 HDFS
的架构和设计。
现在要来看一下如何利用 HDFS
来实现大数据业务程序。
在 Hadoop
问世之前,分布式计算也是存在的,但是没有通用的解决方案,只能专门处理某一类计算。
这大概是跟不使用任何Web框架和系统库来开发Web程序的感觉差不多。
MapReduce
既是一个编程模型,又是一个计算框架。
作为编程模型来说,MapReduce
并不是一个“神奇”的东西。但是,作为大数据计算框架来说,它的出现使得开发大数据应用的门槛降低了许多,就跟Web框架的应用一个道理。
大数据计算框架解决的是什么呢?
简单来说,就是:
- 如何基于分布式存储分配计算资源
- 如何调度计算任务
今天这篇文章先介绍一下作为编程模型的 MapReduce
。计算框架的介绍放在下一篇。
MarReduce - 编程模型
由于我们的数据是存放在分布式文件系统中,自然不能用传统的编程模型来完成任务了。
接下来我用3个代码例子来解释传统编程模型和分布式模型的区别,以经典的WordCount程序为例。
Example 1: Python语言单机版WordCount
我们可以创建一个 Hash Table
,然后遍历文本中的每一个单词。
如果在 Hash Table
中存在,就将 key(单词)的 value +1
;
否则将单词作为 key 添加到 Hash Table
中。
# %%
# 文本前期处理
strlList = """Hello World
Bye World
Hello World
Bye World"""
strlList = strlList.replace('\n', ' ').lower().split(' ')
countDict = {}
# %%
# 如果字典里有该单词则加1,否则添加入字典
for w in strlList:
if w in countDict:
countDict[w] += 1
else:
countDict[w] = 1
# %%
print(countDict)
# output:
# {'hello': 2, 'world': 4, 'bye': 2}
如果文本很长,我们担心不能把文本一次性加载到内存中,也可以使用 Generator
特性来完成迭代操作。但是这样将会花费很长的时间。这是第一个问题。
我们自然会想到可以通过多进程的方式来增加处理速度,对文本设置多个偏移量,并行地处理。
但是数据量大到一个机器无法装得下的情况下,就要考虑如何将数据分散地存放到一个集群中了。有一个经典的比喻:
如果有一块巨大的石头拉不动,你不会找一头巨型牛来拉,而会去找一群牛来一齐拉。
那么如何将数据分散地存放在一个集群中,并尽可能快地处理这些数据呢,这是第二个问题。
试想一下,如果让你来设计一个并行处理分散在多个服务器上的数据的系统,你会考虑哪些问题?
明显地,这需要结合数据源来设计,因为数据源(HDFS)是采用分布式设计的,所以像上面这样的单机版逻辑就不满足需求了。
Example 2: Shell脚本的MapReduce
现在我们来看一个复杂的,不用大数据框架来完成的MapReduce模型的计算任务。这是一个基于 Shell 脚本实现的程序。
这个示例来自这本书 - Hadoop: The Definitive Guide, 4th Edition
以下的示例,我们编写一个挖掘天气数据的程序。气象传感器每小时在全球许多地方收集数据,并收集大量的日志数据,这是使用MapReduce进行分析的理想选择,因为我们要处理所有数据,并且数据是半结构化(semi-structured)且记录导向的(record-oriented)。
资料格式
我们将使用的数据来自国家气候数据中心(National Climatic Data Center)(NCDC)。
数据是使用行导向的(line-oriented)ASCII格式存储的,其中每一行都是一条记录。该格式包含一组丰富的气象元素,其中许多是可选的或者具有可变的数据长度。
为简单起见,我们关注一些始终存在且宽度固定的基本元素,例如温度。
示例2-1显示了一个示例行,其中标注了一些显著的字段。该行已经被分为多行以显示每个字段;在实际的文件中,这些字典打包在同一行,没有分行。
示例2-1 NCDC记录的格式
0057
332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4
+51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000)
FM-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9
-0128 # air temperature (degrees Celsius x 10)
1 # quality code
-0139 # dew point temperature (degrees Celsius x 10)
1 # quality code
10268 # atmospheric pressure (hectopascals x 10)
1 # quality code
NOTE: 在一个文件中应该保存着多行这样的数据,我们可以看到这样一行数据中有气象站的代号、日期、时间、温度等重要信息。这一行信息的时间单位可能是一天,也可能是几个小时。
数据文件按日期和气象站来组织。从1901年到2001年,每年都有一个目录,每个目录都包含每个气象站的压缩文件以及该年的读书。例如,以下是1990年的第一个条目:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
NOTE: 例如
010010-99999-1990.gz
这个文件中应该是包含一个文件,该文件中包含多条示例2-1那样的数据。
99999 是某个气象站的编号。
由上面的例子推测,1990年,编号为99999的气象站可能产生很多个上面那样的gz
压缩文件,其中每个gz
的内容为多条示例2-1的数据。
有数万个气象站,因此整个数据集由大量的相对小的文件组成。通常来说,处理较少数量的相对较大的文件更容易,更高效,因此我们先对数据做了预处理,以便将每年的读数合并为一个文件。
资料预处理
由于上述的原因,我们需要先对这些大量的小文件先进行预处理,我们想将一年的数据压缩成一个文件。
我们使用一个剧有map
函数的程序来做这件事,不需要reduce
函数,因为我们不需要并行地合并,map
函数可以并行执行所有文件处理。
这段处理过程已经使用了Hadoop的MapReduce框架,所以在此就省略了,感兴趣的可以点击这里查看这个程序的源码:
MapReduce - streming接口示例
处理完成后的文件列表是这样的:
% ls -1 all/
1901.tar.bz2
1902.tar.bz2
1903.tar.bz2
...
2000.tar.bz2
每个tar
文件都包含所有气象站的当年的读数的文件,并使用gzip
压缩。(也就是说1901.tar.bz2
中保存的还是压缩文件,所以这个1901.tar.bz2
本身的bzip2
压缩是多余的)
% tar jxf 1901.tar.bz2
% ls 1901 | head
029070-99999-1901.gz
029500-99999-1901.gz
029600-99999-1901.gz
029720-99999-1901.gz
029810-99999-1901.gz
227070-99999-1901.gz
使用Unix工具分析数据
现在我们的需求是计算每一年中全球的最高气温。
看看用 shell 脚本要如何处理:
我们使用处理行导向的经典工具 - awk
。
参照示例2-2
示例2-2 一个从NCDC气象记录中逐年查找最高记录温度的程序
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done
该脚本循环遍历压缩的year
文件,首先打印年份,然后使用awk
处理每个文件。awk
脚本从数据提取两个字段:空气温度(air temperature)和质量代码(quality code)。
空气温度值通过加0变成整数。
接下来,进行测试以查看温度是否有效(9999这个值表示NCDC数据集中的缺失值)以及质量代码表示读数是可信的还是错误的。
如果读数正常,则将该值与到目前为止看到的最大值进行比较,如果找到新的最大值,则将更新该最大值。END
代码块将在awk
处理完所有行后被执行,并输出最大值。
脚本运行之后看起来像这样:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
源文件中的温度值放大了10倍,因此得出1901年的最高温度为31.7°C(本世纪初读数很少,所以这是合理的)。
计算这一个世纪每年的最高温,这个程序完全运行在一台EC2 High-CPU Extra Large实例上,一次运行了42分钟。
为了加快处理速度,我们需要并行运行程序的各个部分。从理论上讲,这很简单:我们可以使用机器上所有可用的硬件线程,在不同的过程中处理不同的年份。但是,这有一些问题。
首先,将工作分成相等大小的部分并不总是那么容易。在这种情况下,不同年份的文件大小会有很大差异,因此某些进程将比其他进程更早完成。即使他们接手进一步的工作,整个运行仍以最长的文件为主。尽管需要更多工作,但是更好的方法是将输入拆分为固定大小的块,并将每个块分配给一个进程。
NOTE: 抽象地考虑,这里的工作有计算和存储两层含义。所以大数据处理在收集原始数据的时候就要和传统编程方式下不一样了。
其次,将独立进程的结果合并可能需要进一步处理。在这种情况下,每年的结果与其他年份无关,可以通过合并所有结果并按年份排序来合并它们。如果使用固定大小的块方法,则组合会更加精细。对于此示例,特定年份的数据通常会分为几个块,每个块独立处理。我们将得出每个块的最高温度,因此最后一步是寻找每年这些最高温度中的最高温度。
在资料预处理的阶段我们使用了Hadoop的Map功能,上述的shell脚本则是相当于一个Reduce功能的程序。
第三,您仍然受到单台计算机处理能力的限制。如果您现有的处理器数量可以达到的最佳时间是20分钟,那就是这样了。您无法使其运行更快。此外,某些数据集超出了单台计算机的容量。当我们开始使用多台机器时,就会需要考虑许多其他因素了,主要是协调性和可靠性的范畴。谁负责整体工作?我们如何处理失败的进程?
因此,尽管并行处理是可行的,但在实践中却很棘手。使用像Hadoop
这样的框架来解决这些问题是一个很好的选择。
事实上,您会发现不得不开发一个像样的分布式系统来做这样的事。
Example 3: Java语言使用Hadoop的MapReduce
Hadoop
在并行处理上有优势,要使用Hadoop
,我们需要将查询(query)表示为MapReduce
作业(job)。经过一些本地的小规模测试之后,我们将能够在一组机器上运行它。
Map and Reduce
MapReduce
通过将处理分为两个阶段进行工作:Map阶段和Reduce阶段。
每个阶段都有键值对作为输入和输出,程序员可以选择其类型。程序员还需要指定用于Map和Reduce的函数 - map函数和reduce函数。
Map阶段的输入是原始NCDC数据。输入的数据是一个键值对(key-value pair)。
我们选择一种文本输入格式,该格式将数据集
(dataset)中的每一行作为文本类型,并作为值(value)处理。
键(key)是文件的开头到该行的开头的偏移量。
NOTE: 文件开头的偏移量是0;第1行有120个字符,那么第二行的偏移量则是121;以此类推。
我们的map函数很简单。我们提取年份和气温,因为这是我们唯一感兴趣的字段。在这种情况下,map函数只是数据准备阶段,以便reduce函数可以执行以下操作:
找出每年的最高温度。
map函数也是删除不良记录的好方法:在这里,我们可以过滤掉缺失的、可疑的或错误的温度。
为了可视化map的工作方式,请考虑以下输入数据提示示例(已经删除一些未使用的列以适配页面,用省略号表示):
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以键值对的形式呈现给map函数:
(0, 0067011990999991950051507004 … 9999999N9 + 00001 + 99999999999…)
(106, 0043011990999991950051512004 … 9999999N9 + 00221 + 99999999999…)
(212, 0043011990999991950051518004 … 9999999N9- 00111 + 99999999999…)
(318, 0043012650999991949032412004 … 0500001N9 + 01111 + 99999999999…)
(424, 0043012650999991949032418004 … 0500001N9 + 00781 + 99999999999…)
key是行号在文件中的偏移量,我们在map函数中将其忽略。map函数仅提取年份和气温(以粗体显示),并将其作为输出输出(温度值已解释为整数):
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
map函数的输出在发送给reduce函数之前,由MapReduce框架处理。此处理会根据Key来对此key-value对进行排序和分组。因此,继续该示例,我们的reduce函数将会看到以下输入:
(1949, [111, 78])
(1950, [0, 22, −11])
每年都会显示所有气温读数的列表,所有reduce函数现在要做的是遍历列表并获取最大读书:
(1949, 111)
(1950, 22)
这是最终输出:每年记录的最高全球温度。
NOTE: map 函数的输入主要是一个 <Key, Value> 对,在这个例子里,Value 是要统计的所有文本中的一行数据,Key 在一般计算中都不会用到。
整个数据流如图2-1所示。
该图的底部是一个Unix管道,它模仿了整个MapReduce流,当我们研究Hadoop Streaming时。以后研究Hadoop Streaming时,我们会在此看到。
图2-1。MapReduce逻辑数据流
JAVA MAPREDUCE
上面我们看了MapReduce程序工作的机制,下一步是要通过代码表达它。我们需要三件事:一个map函数,一个reduce函数以及一些代码以运行这个job。map函数由Mapper类来表示,该类声明一个抽象map()方法。示例2-3显示了我们的map函数的实现。
示例2-3 最高温度的Mapper示例
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
Mapper类是一个通用类型(Generic Type),具有四个形式参数以指定输入键(input key)、输入值(input
value)、输出键(output key)和输出值(output value)的类型。
对于本示例,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。
Hadoop不使用内置的Java类型,而是提供了自己的一组基本类型。这些类型针对网络序列化进行了优化。这些类型位于org.apache.hadoop.io
这个软件包。
在这里,我们使用LongWritable,其对应于一个Java Long,Text(如Java String),和IntWritable(如Java Integer)。
该map()方法传递了一个键和一个值。我们将Text包含输入行的值转换为Java String,然后使用其substring()方法提取我们感兴趣的列。
该map()方法还提供了一个Context实例,用于将输出写入到reduce()。在这个例子中,我们将年份(year)输出为一个Text对象(因为我们只将它当作一个key),然后我们用IntWritable来包装。
我们只有在温度值不为空并且其品质代码(quality code)表示气温读书为OK时才会输出一个记录。
与map函数相似,reduce函数使用Reducer类来实现,如下面的示例2-4。
示例2-4 计算最高温度的Reducer示例
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
同样,四个形式参数用于指定输入和输出类型,这一次是reduce函数。reduce函数的输入类型必须与map函数的输出类型匹配:Text和IntWritale。
在这个示例中,reduce函数的输出类型是Text和IntWritable,代表了年份和其最高温度,这是通过迭代温度并将每个温度与迄今为止找到的最高记录进行比较而得出的。
第三段代码运行这个MapReduce作业(请参见示例2-5)。
示例2-5 在天气数据集中查找最高温的应用
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我们需要遵循Job对象的规范来实现这个作业,使您可以控制作业的运行方式。当我们在Hadoop集群上运行此作业时,我们会将代码打包到一个JAR文件中(Hadoop将在集群中分发该文件)。
无需显示指定JAR文件的名称,我们可以在Job的setJarByClass()方法中传递一个类,Hadoop将通过查找包含此类的JAR文件来使用该类来找到相关的JAR文件。
构造Job对象后,我们指定输入和输出路径。通过调用FileInputFormat上的静态的addInputPath()方法,来指定输入路径,并且它可以是一个单个的文件,或者一个目录(这种情况下,输入将会由该目录的所有文件组成),或者是一个文件模式(正则表达式)。
顾名思义,addInputPath()可以被调用多次来添加多个路径。
输出路径(只有一个)是通过调用FileOutputFormat上的静态的setOutputPath()方法来指定的。
它指定了reduce函数输出文件的目录。
在运行作业之前,该目录应该存在,因为Hadoop会抱怨而不运行作业。此预防措施是为了防止数据丢失(意外地用一个作业的输出覆盖另一个长作业的输出会很烦人)。
接下来,我们通过setMapperClass()和setReducerClass()方法来指定map和reduce的类型。
该setOutputKeyClass()和setOutputValueClass()方法为reduce函数控制输出类型,并且必须和Reduce类的产出相匹配。
map输出类型默认为相同的类型,因此,如果mapper生成与reducer相同的类型,则不需要设置它们(就像我们的例子一样)。
但是,如果它们不同,则必须使用setMapOutputKeyClass()和setMapOutputValueClass()方法设置map输出类型。
输入类型是通过输入格式来控制的,由于我们使用的是默认的TextInputFormat,因此尚未明确设置输入格式。
在设置了定义map和reduce函数的类之后,我们就可以运行该作业了。在waitForCompletion()对方法Job提交作业并等待它完成。该方法的单个参数是指示是否生成详细输出的标志。如果为true,则作业将有关其进度的信息写入控制台。
waitForCompletion()方法的返回值是一个布尔值,指示成功(true)或失败(false),我们将其转换为程序的退出代码0或1。
小结
本文主要参考:
今天我们学习了MapReduce编程模型。这个模型既简单又强大,简单是因为它只包含Map和Reduce两个过程,强大之处在于它可以实现大数据领域几乎所有的计算需求。这也正是MapReduce这个模型令人着迷的地方。
以图2-1的这个查找最高温的程序为例,我们做的就是编写map函数,截取每行(key)中固定位置的字段作为输出给reduce的key(年份);截取温度(value)输出给reduce。
然后MapReduce框架会在数据输出给reduce之前,先帮我们进行一次reduce,也就是图中shuffle的阶段。
这个阶段会将相同的年份的value合并,原本的value是一个IntWritable,合并后会变为Iterable<IntWritable>
这么一个数组。
所以我们在reduce中需要实现业务逻辑就是遍历这个数组,取最大值。
可见,许多和key相关的操作,都是框架帮我们完成的,我们需要对MapReduce编程模型先加以了解,就可以按照它的规范完成自己的业务逻辑。