资讯专栏INFORMATION COLUMN

分布式计算框架MapReduce

Tecode / 1828人阅读

1 MapReduce概念 和 MapReduce编程模型
什么是MapReduce

  • 源于Google的MapReduce论文(2004年12月)
  • Hadoop的MapReduce是Google论文的开源实现
  • MapReduce优点: 海量数据离线处理&易开发
  • MapReduce缺点: 实时流式计算

MapReduce分而治之的思想

  • 数钱实例:一堆钞票,各种面值分别是多少
  • 单点策略
  • 一个人数所有的钞票,数出各种面值有多少张
  • 分治策略
  • 每个人分得一堆钞票,数出各种面值有多少张
  • 汇总,每个人负责统计一种面值
  • 解决数据可以切割进行计算的应用

MapReduce编程分Map和Reduce阶段

  • 将作业拆分成Map阶段和Reduce阶段
  • Map阶段 Map Tasks 分:把复杂的问题分解为若干"简单的任务"
  • Reduce阶段: Reduce Tasks 合:reduce

MapReduce编程执行步骤

  • 准备MapReduce的输入数据
  • 准备Mapper数据
  • Shuffle
  • Reduce处理
  • 结果输出

编程模型

  • 借鉴函数式编程方式
  • 用户只需要实现两个函数接口:

Map(in_keyin_value)

--->(out_keyintermediate_value) list

Reduce(out_keyintermediate_value) list

--->out_value list

  • Word Count 词频统计案例

2 应用MRJob编写MapReduce代码
mrjob 简介

  • 使用python开发在Hadoop上运行的程序 mrjob是最简单的方式
  • mrjob程序可以在本地测试运行也可以部署到Hadoop集群上运行
  • 如果不想成为hadoop专家 但是需要利用Hadoop写MapReduce代码mrJob是很好的选择

mrjob 安装

  • 使用pip安装

pip install mrjob
mrjob实现WordCount

from mrjob.job import MRJob

class MRWordCount(MRJob):

    #每一行从line中输入
    def mapper(self _ line):
        for word in line.split():
            yield word1

    # word相同的 会走到同一个reduce
    def reducer(self word counts):
        yield word sum(counts)

if __name__ == __main__:
    MRWordCount.run()

运行WordCount代码

打开命令行 找到一篇文本文档 敲如下命令:

python mr_word_count.py my_file.txt

运行MRJOB的不同方式

1、内嵌(-r inline)方式

特点是调试方便,启动单一进程模拟任务执行状态和结果,默认(-r inline)可以省略,输出文件使用 > output-file 或-o output-file,比如下面两种运行方式是等价的

python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt

2、Hadoop(-r hadoop)方式

用于hadoop环境

python word_count.py -r hadoop hdfs:///test.txt -o  hdfs:///output

支持Hadoop运行调度控制参数,如:

1)指定Hadoop任务调度优先级(VERY_HIGH|HIGH)如:--jobconf mapreduce.job.priority=VERY_HIGH。

2)Map及Reduce任务个数限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5

3 mrjob 实现 topN统计(实验)
统计数据中出现次数最多的前n个数据

import sys
from mrjob.job import MRJobMRStep
import heapq

class TopNWords(MRJob):
    def mapper(self _ line):
        if line.strip() != "":
            for word in line.strip().split():
                yield word1

    #介于mapper和reducer之间,用于临时的将mapper输出的数据进行统计
    def combiner(self word counts):
        yield wordsum(counts)

    def reducer_sum(self word counts):
        yield None(sum(counts)word)

    #利用heapq将数据进行排序,将最大的2个取出
    def top_n_reducer(self_word_cnts):
        for cntword in heapq.nlargest(2word_cnts):
            yield wordcnt

    #实现steps方法用于指定自定义的mapper,comnbiner和reducer方法
    def steps(self):
        #传入两个step 定义了执行的顺序
        return [
            MRStep(mapper=self.mapper
                   combiner=self.combiner
                   reducer=self.reducer_sum)
            MRStep(reducer=self.top_n_reducer)
        ]

def main():
    TopNWords.run()

if __name__==__main__:
    main()

4 MapReduce原理详解
单机程序计算流程

输入数据--->读取数据--->处理数据--->写入数据--->输出数据

Hadoop计算流程

input data:输入数据

InputFormat:对数据进行切分,格式化处理

map:将前面切分的数据做map处理(将数据进行分类,输出(kv)键值对数据)

shuffle&sort:将相同的数据放在一起,并对数据进行排序处理

reduce:将map输出的数据进行hash计算,对每个map数据进行统计计算

OutputFormat:格式化输出数据
image.png

image.png

map:将数据进行处理

buffer in memory:达到80%数据时,将数据锁在内存上,将这部分输出到磁盘上

partitions:在磁盘上有很多"小的数据",将这些数据进行归并排序。

merge on disk:将所有的"小的数据"进行合并。

reduce:不同的reduce任务,会从map中对应的任务中copy数据,在reduce中同样要进行merge操作

5 MapReduce架构
MapReduce架构 1.X
JobTracker:负责接收客户作业提交,负责任务到作业节点上运行,检查作业的状态
TaskTracker:由JobTracker指派任务,定期向JobTracker汇报状态,在每一个工作节点上永远只会有一个TaskTracker
image.png

MapReduce2.X架构

ResourceManager:负责资源的管理,负责提交任务到NodeManager所在的节点运行,检查节点的状态
NodeManager:由ResourceManager指派任务,定期向ResourceManager汇报状态
{{image.png(uploading...)}}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/126017.html

相关文章

发表评论

0条评论

最新活动
阅读需要支付1元查看
<