主要记录了Hadoop各个组件的基本原理,处理过程和关键的知识点等,包括HDFS、YARN、MapReduce等。
本教程内容来源于 PennyWong
更新日期 | 更新内容 |
---|---|
2015-5-7 | Hadoop文档 |
硬件问题:复制数据解决(RAID)
分析需要从不同的硬盘读取数据:MapReduce
而Hadoop提供了
1.可靠的共享存储(分布式存储) 2.抽象的分析接口(分布式分析)
不能使用一台机器进行处理的数据
大数据的核心是样本=总体
1.数据分布在多台机器
可靠性:每个数据块都复制到多个节点
性能:多个节点同时处理数据
2.计算随数据走
网络IO速度 << 本地磁盘IO速度,大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行)
代码向数据迁移,避免大规模数据时,造成大量数据迁移的情况,尽量让一段数据的计算发生在同一台机器上
3.串行IO取代随机IO
传输时间 << 寻道时间,一般数据写入后不再修改
Hadoop可运行于一般的商用服务器上,具有高容错、高可靠性、高扩展性等特点
特别适合写一次,读多次的场景
内部各个节点基本都是采用Master-Woker架构
Hadoop Distributed File System,分布式文件系统
Block数据块;
基本存储单位,一般大小为64M(配置大的块主要是因为:
1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;
2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;
3)对数据块进行读写,减少建立网络的连接成本)
一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小
基本的读写单位,类似于磁盘的页,每次都是读写一个块
HDFS2.x以后的block默认128M
NameNode
存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小
一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件
数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)
Secondary NameNode
DataNode
保存具体的block数据
负责数据的读写操作和复制操作
DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息
1.客户端将文件写入本地磁盘的 HDFS Client 文件中
2.当临时文件大小达到一个 block 大小时,HDFS client 通知 NameNode,申请写入文件
3.NameNode 在 HDFS 的文件系统中创建一个文件,并把该 block id 和要写入的 DataNode 的列表返回给客户端
4.客户端收到这些信息后,将临时文件写入 DataNodes
5.文件写完后(客户端关闭),NameNode 提交文件(这时文件才可见,如果提交前,NameNode 垮掉,那文件也就丢失了。fsync:只保证数据的信息写到 NameNode 上,但并不保证数据已经被写到DataNode 中)
Rack aware(机架感知)
通过配置文件指定机架名和 DNS 的对应关系
假设复制参数是3,在写入文件时,会在本地的机架保存一份数据,然后在另外一个机架内保存两份数据(同机架内的传输速度快,从而提高性能)
整个 HDFS 的集群,最好是负载平衡的,这样才能尽量利用集群的优势
HDFS 的可靠性主要有以下几点:
可以在 hdfs-site.xml 中设置复制因子指定副本数量
所有数据块都可副本
DataNode 启动时,遍历本地文件系统,产生一份 HDFS 数据块和本地文件的对应关系列表 (blockreport) 汇报给 Namenode
HDFS 的"机架感知",通过节点之间发送一个数据包,来感应它们是否在同一个机架
一般在本机架放一个副本,在其他机架再存放一个副本,这样可以防止机架失效时丢失数据,也可以提高带宽利用率
NameNode 周期性从 DataNode 接受心跳信息和块报告
NameNode 根据块报告验证元数据
没有按时发送心跳的 DataNode 会被标记为宕机,不会再给他任何 I/O 请求
如果 DataNode 失效造成副本数量下降,并且低于预先设定的值,NameNode 会检测出这些数据库,并在合适的时机重新复制
引发重新复制的原因还包括数据副本本身损坏,磁盘错误,复制因子被增大等
NameNode 启动时会先经过一个 "安全模式" 阶段
安全模式阶段不会产生数据写
在此阶段 NameNode 收集各个 DataNode 的报告, 当数据块达到最小副本数以上时,会被认为是"安全"的
在一定比例(可设置) 的数据块被确定为"安全" 后 ,在过若干时间,安全模式结束
当检测到副本数不足的数据块时,该块会被复制,直到达到最小副本数
在文件创立时,每个数据块都产生效验和
效验和会作为单独一个隐藏文件保存在命名空间下
客户端获取数据时可以检查效验和是否相同,从而发现数据块是否损坏
如果正在读取的数据块损坏,则可以继续读取其他副本
删除文件时,其实是放入回收站 /trash
回收站里的文件是可以快速恢复的
可以设置一个时间值,当回收站里文件的存放时间超过了这个值,就被彻底删除,并且释放占用的数据块
映像文件和事物日志是 NameNode 的核心数据.可以配置为拥有多个副本
副本会降低 NameNode 的处理速度,但增加安全性
NameNode 依然是单点,如果发生故障要手工切换
fsck: 检查文件的完整性
start-balancer.sh: 重新平衡HDFS
hdfs dfs -copyFromLocal 从本地磁盘复制文件到HDFS
此架构会有以下问题:
总的来说就是单点问题和资源利用率问题
YARN就是将 JobTracker 的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理和一个每个作业的管理(ApplicationMaster) ResourceManager 和 NodeManager 提供了计算资源的分配和管理,而 ApplicationMaster 则完成应用程序的运行
YARN架构下形成了一个通用的资源管理平台和一个通用的应用计算平台,避免了旧架构的单点问题和资源利用率问题,同时也让在其上运行的应用不再局限于 MapReduce 形式
1. Job submission
从ResourceManager 中获取一个Application ID 检查作业输出配置,计算输入分片 拷贝作业资源(job jar、配置文件、分片信息)到 HDFS,以便后面任务的执行
2. Job initialization
ResourceManager 将作业递交给 Scheduler(有很多调度算法,一般是根据优先级)Scheduler 为作业分配一个 Container,ResourceManager 就加载一个 application master process 并交给 NodeManager。
管理 ApplicationMaster 主要是创建一系列的监控进程来跟踪作业的进度,同时获取输入分片,为每一个分片创建一个 Map task 和相应的 reduce task Application Master 还决定如何运行作业,如果作业很小(可配置),则直接在同一个 JVM 下运行
3. Task assignment
ApplicationMaster 向 Resource Manager 申请资源(一个个的Container,指定任务分配的资源要求)一般是根据 data locality 来分配资源
4. Task execution
ApplicationMaster 根据 ResourceManager 的分配情况,在对应的 NodeManager 中启动 Container 从 HDFS 中读取任务所需资源(job jar,配置文件等),然后执行该任务
5. Progress and status update
定时将任务的进度和状态报告给 ApplicationMaster Client 定时向 ApplicationMaster 获取整个任务的进度和状态
6. Job completion
Client定时检查整个作业是否完成 作业完成后,会清空临时文件、目录等
负责全局的资源管理和任务调度,把整个集群当成计算资源池,只关注分配,不管应用,且不负责容错
Node节点下的Container管理
ContainerManager: 接收RPC请求(启动、停止),资源本地化(下载应用需要的资源到本地,根据需要共享这些资源)
PUBLIC: /filecache
PRIVATE: /usercache//filecache
APPLICATION: /usercache//appcache//(在程序完成后会被删除)
ContainersLauncher: 加载或终止Container
单个作业的资源管理和任务监控
具体功能描述:
ApplicationMaster可以是用任何语言编写的程序,它和ResourceManager和NodeManager之间是通过ProtocolBuf交互,以前是一个全局的JobTracker负责的,现在每个作业都一个,可伸缩性更强,至少不会因为作业太多,造成JobTracker瓶颈。同时将作业的逻辑放到一个独立的ApplicationMaster中,使得灵活性更加高,每个作业都可以有自己的处理方式,不用绑定到MapReduce的处理模式上
如何计算资源需求
一般的MapReduce是根据block数量来定Map和Reduce的计算数量,然后一般的Map或Reduce就占用一个Container
如何发现数据的本地化
数据本地化是通过HDFS的block分片信息获取的
可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和zookeeper来ֹ实现HA
一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)
Map输出格式和Reduce输入格式一定是相同的
MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中
记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内.
在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择.
ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。
reduce采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。
输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。类似记录读取,自定义输出格式不在本书范围。
通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map
决定读取数据的格式,可以是文件或数据库等
List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题
TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容
KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为( )字符。
NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1
SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,<key,value>为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。</key,value>
代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法
Split内有Location信息,利于数据局部化
一个InputSplit给一个单独的Map处理
public abstract class InputSplit { /** * 获取Split的大小,支持根据size对InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取存储该分片的数据所在的节点位置. */ public abstract String[] getLocations() throws IOException, InterruptedException;}
将InputSplit拆分成一个个<key,value>对给Map处理,也是实际的文件读取分隔对象</key,value>
CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)
通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度
通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小
如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取
如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)
先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)
public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 获取该文件所有的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 计算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循环分片。 // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 处理余下的数据 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整块返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 对于长度为0的文件,创建空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 设置输入文件数量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits;}
split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split
解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录
public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函数即对LineRecordReader的一个初始化 // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打开文件,并定位到分片读取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是压缩文件的话,直接打开文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一个split,则忽略本split的第一行数据 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下次读取就会从偏移位置开始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行数据,重新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即为偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 读取的数据长度为0,则说明已读完 if (newSize == 0) { break; } pos += newSize; // 读取的数据长度小于最大行长度,也说明已读取完毕 if (newSize < maxLineLength) { break; } // 执行到此处,说明该行数据没读完,继续读入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }}
主要是读取InputSplit的每一个Key,Value对并进行处理
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 预处理,仅在map task启动时运行一次 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 对于InputSplit中的每一对<key, value>都会运行一次 */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * 扫尾工作,比如关闭流等 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * map task的驱动器 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }}public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private RecordReader<KEYIN, VALUEIN> reader; private InputSplit split; /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); }}
对Map的结果进行排序并传输到Reduce进行处理 Map的结果并不是直接存放到硬盘,而是利用缓存做一些预排序处理 Map会调用Combiner,压缩,按key进行分区、排序等,尽量减少结果的大小 每个Map完成后都会通知Task,然后Reduce就可以进行处理
当Map程序开始产生结果的时候,并不是直接写到文件的,而是利用缓存做一些排序方面的预处理操作
每个Map任务都有一个循环内存缓冲区(默认100MB),当缓存的内容达到80%时,后台线程开始将内容写到文件,此时Map任务可以继续输出结果,但如果缓冲区满了,Map任务则需要等待
写文件使用round-robin方式。在写入文件之前,先将数据按照Reduce进行分区。对于每一个分区,都会在内存中根据key进行排序,如果配置了Combiner,则排序后执行Combiner(Combine之后可以减少写入文件和传输的数据)
每次结果达到缓冲区的阀值时,都会创建一个文件,在Map结束时,可能会产生大量的文件。在Map完成前,会将这些文件进行合并和排序。如果文件的数量超过3个,则合并后会再次运行Combiner(1、2个文件就没有必要了)
如果配置了压缩,则最终写入的文件会先进行压缩,这样可以减少写入和传输的数据
一旦Map完成,则通知任务管理器,此时Reduce就可以开始复制结果数据
Map的结果文件都存放到运行Map任务的机器的本地硬盘中
如果Map的结果很少,则直接放到内存,否则写入文件中
同时后台线程将这些文件进行合并和排序到一个更大的文件中(如果文件是压缩的,则需要先解压)
当所有的Map结果都被复制和合并后,就会调用Reduce方法
Reduce结果会写入到HDFS中
一般的原则是给shuffle分配尽可能多的内存,但前提是要保证Map、Reduce任务有足够的内存
对于Map,主要就是避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值
对于Reduce,主要是把Map的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘。默认情况下,所有的内存都是分配给Reduce方法的,如果Reduce方法不怎么消耗内存,可以mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0
在任务监控中可通过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的
对于IO方面,可以Map的结果可以使用压缩,同时增大buffer size(io.file.buffer.size,默认4kb)
属性 | 默认值 | 描述 |
---|---|---|
io.sort.mb | 100 | 映射输出分类时所使用缓冲区的大小. |
io.sort.record.percent | 0.05 | 剩余空间用于映射输出自身记录.在1.X发布后去除此属性.随机代码用于使用映射所有内存并记录信息. |
io.sort.spill.percent | 0.80 | 针对映射输出内存缓冲和记录索引的阈值使用比例. |
io.sort.factor | 10 | 文件分类时合并流的最大数量。此属性也用于reduce。通常把数字设为100. |
min.num.spills.for.combine | 3 | 组合运行所需最小溢出文件数目. |
mapred.compress.map.output | false | 压缩映射输出. |
mapred.map.output.compression.codec | DefaultCodec | 映射输出所需的压缩解编码器. |
mapred.reduce.parallel.copies | 5 | 用于向reducer传送映射输出的线程数目. |
mapred.reduce.copy.backoff | 300 | 时间的最大数量,以秒为单位,这段时间内若reducer失败则会反复尝试传输 |
io.sort.factor | 10 | 组合运行所需最大溢出文件数目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 随机复制阶段映射输出缓冲器的堆栈大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用于启动合并输出进程和磁盘传输的映射输出缓冲器的阀值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用于启动合并输出和磁盘传输进程的映射输出的阀值数目。小于等于0意味着没有门槛,而溢出行为由 mapred.job.shuffle.merge.percent单独管理. |
mapred.job.reduce.input.buffer.percent | 0.0 | 用于减少内存映射输出的堆栈大小比例,内存中映射大小不得超出此值。若reducer需要较少内存则可以提高该值. |
export LIBJARS=$MYLIB/commons-lang-2.3.jar, hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS
hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner The dependent libraries are now included inside the application JAR file
一般还是上面的好,指定依赖可以利用Public Cache,如果是包含依赖,则每次都需要拷贝
序列化是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。反序列化是指将字节流转回结构化对象的逆过程。
序列化用于分布式数据处理的两大领域:进程间通信和永久存储
在Hadoop中,系统中多个节点进程间的通信是通过“远程过程调用”(RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下:
1.紧凑
紧凑格式能充分利用网络带宽(数据中心最稀缺的资源)
2.快速
进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。
3.可扩展
为了满足新的需求,协议不断变化。所以在控制客户端和服务期的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增加新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。
4.支持互操作
对于系统来说,希望能够支持以不同语言写的客户端与服务器交互,所以需要设计一种特定的格式来满足这一需求。
Writable 接口
Writable 接口定义了两个方法:一个将其状态写入 DataOutput 二进制流,另一个从 DataInput二进制流读取状态。
BytesWritable
BytesWritable 是对二进制数据数组的封装。它的序列化格式为一个指定所含数据字节数的整数域(4字节),后跟数据内容的本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03和05)
NullWritable
NullWritable 是 writable 的特殊类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符。
ObjectWritable和GenericWritable
ObjectWritable 是对 java 基本类型(String,enum,Writable,null或这些类型组成的数组)的一个通用封装。它在 Hadoop RPC 中用于对方法的参数和返回类型进行封装和解封装。
Wriable集合类
io 软件包共有6个 Writable 集合类,分别是 ArrayWritable,ArrayPrimitiveWritable,TwoDArrayWritable,MapWritable,SortedMapWritable以及EnumMapWritable
ArrayWritable 和 TwoDArrayWritable 是对 Writeble 的数组和两维数组(数组的数组)的实现。ArrayWritable 或 TwoDArrayWritable 中所有元素必须是同一类的实例。ArrayWritable 和 TwoDArrayWritable 都有get() ,set() 和 toArray()方法,toArray() 方法用于新建该数组的一个浅拷贝。
ArrayPrimitiveWritable 是对 Java 基本数组类型的一个封装。调用 set() 方法时,可以识别相应组件类型,因而无需通过继承该类来设置类型。
序列化框架
尽管大多数 Mapreduce 程序使用的都是 Writable 类型的键和值,但这并不是 MapReduce API 强制要求使用的。事实上,可以使用任何类型,只要能有一个机制对每个类型进行类型与二进制表示的来回转换就可以。
为了支持这个机制,Hadoop 有一个针对可替换序列化框架的 API 。序列化框架用一个 Serialization 实现来表示。Serialization 对象定义了从类型到 Serializer 实例(将对象转换为字节流)和 Deserializer 实例(将字节流转换为对象)的映射方式。
序列化IDL
还有许多其他序列化框架从不同的角度来解决问题:不通过代码来定义类型,而是使用接口定义语言以不依赖与具体语言的方式进行声明。由此,系统能够为其他语言生成模型,这种形式能有效提高互操作能力。它们一般还会定义版本控制方案。
两个比较流行的序列化框架 Apache Thrift 和Google的Protocol Buffers,常常用作二进制数据的永久存储格式。Mapreduce 格式对该类的支持有限,但在 Hadoop 内部,部分组件仍使用上述两个序列化框架来实现 RPC 和数据交换。
基于文件的数据结构
对于某些应用,我们需要一种特殊的数据结构来存储自己的数据。对于基于 Mapreduce 的数据处理,将每个二进制数据大对象单独放在各自的文件中不能实现可扩展性,所以 Hadoop 为此开发了很多更高层次的容器。
关于 SequenceFile 。
考虑日志文件,其中每一行文本代表一条日志记录。纯文本不适合记录二进制类型的数据。在这种情况下,Hadoop 的 SequenceFile 类非常合适,为二进制键值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键,以及值可以是 Writable 类型。
SequenceFile 也可以作为小文件的容器。HDFS和Mapreduce 是针对大文件优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。
SequnceFile的写操作
通过 createWriter()静态方法可以创建 SequenceFile 对象,并返回 SequnceFile.Writer 实例。该静态方法有多个重载版本,但都需要制定待写入的数据流,Configuration 对象,以及键和值的类型。存储在 SequenceFIle 中的键和值并不一定是 Writable 类型。只要能够被 Sertialization 序列化和反序列化,任何类型都可以。
SequenceFile的读操作
从头到尾读取顺序文件不外乎创建 SequenceFile.reader 实例后反复调用 next() 方法迭代读取记录。读取的是哪条记录与使用的序列化框架有关。如果使用的是 Writable 类型,那么通过键和值作为参数的 next() 方法可以将数据流的下一条键值对读入变量中。
1.通过命令行接口显示 SequenceFile。
hadoop fs 命令有一个 -text 选项可以以文本形式显示顺序文件。该选项可以查看文件的代码,由此检测出文件的类型并将其转换为相应的文本。该选项可以识别 gzip 压缩文件,顺序文件和 Avro 数据文件;否则,假设输入为纯文本文件。
2. SequenceFile 的排序和合并。
Mapreduce 是对多个顺序文件进行排序(或合并)最有效的方法。Mapreduce 本身是并行的,并且可由你制定使用多少个 reducer 。例如,通过制定一个 reducer ,可以得到一个输出文件。
3.SequenceFile 的格式。
顺序文件由文件头和随后的一条或多条记录组成。顺序文件的前三个字节为 SEQ,紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值的名称,数据压缩细节,用户定义的元数据以及同步标识。同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中,位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识。
关于MapFile
MapFile 是已经排过序的 SequenceFile ,它有索引,所以可以按键查找。索引本身就是一个 SequenceFile ,包含 map 中的一小部分键。由于索引能够加载进内存,因此可以提供对主数据文件的快速查找。主数据文件则是另一个 SequenceFIle ,包含了所有的 map 条目,这些条目都按照键顺序进行了排序。
其他文件格式和面向列的格式
顺序文件和 map 文件是 Hadoop 中最早的,但并不是仅有的二进制文件格式,事实上,对于新项目而言,有更好的二进制格式可供选择。
Avro 数据文件在某些方面类似顺序文件,是面向大规模数据处理而设计的。但是 Avro 数据文件又是可移植的,它们可以跨越不同的编程语言使用。顺序文件,map 文件和 Avro 数据文件都是面向行的格式,意味着每一行的值在文件中是连续存储的。在面向列的格式中,文件中的行被分割成行的分片,然后每个分片以面向列的形式存储:首先存储每行第一列的值,然后是每行第2列的值,如此以往。
能够减少磁盘的占用空间和网络传输的量,并加速数据在网络和磁盘上的传输。
Hadoop 应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小,格式和所用的工具有关。比较各种压缩算法的压缩比和性能(从高到低):
1. 使用容器文件格式,例如顺序文件, Avro 数据文件。 ORCF 了说 Parquet 文件
2. 使用支持切分的压缩格式,例如 bzip2 或者通过索引实现切分的压缩格式,例子如LZO。
3. 在应用中将文件中切分成块,并使用任意一种他所格式为每个数据块建立压缩文件(不论它是否支持切分)。在这种情况下,需要合理选择数据大小,以确保压缩后的数据块的大小近似于HDFS块的大小。
4. 存储未经压缩的文件。
重点:压缩和拆分一般是冲突的(压缩后的文件的 block 是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时 Map 任务就无法处理,所以对于这些压缩,Hadoop 往往是直接使用一个 Map 任务处理整个文件的分析)。对大文件不可使用不支持切分整个文件的压缩格式,会失去数据的特性,从而造成 Mapreduce 应用效率低下。
Map 的输出结果也可以进行压缩,这样可以减少 Map 结果到 Reduce 的传输的数据量,加快传输速率。
在 Mapreduce 中使用压缩
FileOutputFormat.setCompressOutput(job,true);FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
如果输出生成顺序文件,可以设置 mapreduce.output.fileoutputformat.compress.type 属性来控制限制使用压缩格式。默认值是RECORD,即针对每条记录进行压缩。如果将其改为BLOCK,将针对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。
MRUnit单元测试Mapper和Reducer类在内存上独立运行, PipelineMapReduceDriver单线程运行.
LocalJobRunner单线程运行, 且仅有一个 Reducer能够启动conf.set("mapred.job.tracker", "local"); conf.set("fs.default.name", "file:////"); FileSystem fs = FileSystem.getLocal(conf);
MiniMRCluster, MiniYarnCluster, MiniDFSCluster 多线程
Hadoop官网:http://hadoop.apache.org/
确保 network 网络已经配置好,使用Xftp等类似工具进行上传,把 hadoop-2.7.5.tar.gz 上传到 /opt/hadoop 目录内。上传完成后,在 master 主机上执行以下代码:
cd /opt/hadoop
进入/opt/hadoop目录后,执行解压缩命令:
tar -zxvf hadoop-2.7.5.tar.gz
回车后系统开始解压,屏幕会不断滚动解压过程,执行成功后,系统在hadoop目录自动创建hadoop-2.7.5子目录。
然后修改文件夹名称为“hadoop”,即hadoop安装目录,执行修改文件夹名称命令:
mv hadoop-2.7.5 hadoop
注意:也可用Xftp查看相应目录是否存在,确保正确完成。
我们进入安装目录,查看一下安装文件,如果显示如图文件列表,说明压缩成功。
1.修改主机名称,我这里创建了三个虚拟主机,分别命名node-1,node-2,node-3,进入 network 文件删掉里面的内容直接写上主机名就可以了
vi /etc/sysconfig/network
2.映射 IP 和主机名,之后 reboot 重启主机
[root@node-1 redis-cluster]# vim /etc/hosts192.168.0.1 node-1192.168.0.2 node-2192.168.0.3 node-3
3.检测防火墙(要关闭防火墙),不同系统的防火墙关闭方式不一样,以下做个参考即可
1.service iptables stop 关闭2.chkconfig iptables off 强制关闭 除非手动开启不然不会自动开启3.chkconfig iptanles 查看4.service iptables status 查看防火墙状态
4.ssh 免密码登录
输入命令:ssh-keygen -t rsa 然后点击四个回车键,如下图所示:
然后通过 ssh-copy-id 对应主机 IP,之后通过“ssh 主机名/IP” 便可以不输入密码即可登录相应的主机系统
上传 hadoop 安装包解压后进入 hadoop-2.7.6/etc/hadoop 目录下
以下所有的 <property> </property> 都是写在各自相应配置文件末尾的 <configuration> 标签里面
第一个 hadoop-env.sh
vi hadoop-env.shexport JAVA_HOME=/usr/java/jdk1.8.0_171 #JAVA_HOME写上自己jdk 的安装路径
第二个 :core-site.xml
vi <strong>core-site.xml</strong>
<!-- 指定Hadoop所使用的文件系统schema(URI),HDFS的老大(NameNode)的地址 --><property> <name>fs.defaultFS</name> <value>hdfs://node-1:9000</value></property><!-- 定Hadoop运行是产生文件的存储目录。默认 --><property> <name>hadoop.tmp.dir</name> <value>/export/data/hddata</value></property>
第三个: hdfs-site.xml
vi <strong>hdfs-site.xml</strong>
<!-- 指定HDFS副本的数量,不修改默认为3个 --><property> <name>dfs.replication</name> <value>2</value></property><!-- dfs的SecondaryNameNode在哪台主机上 --><property> <name>dfs.namenode.secondary.http-address</name> <value>node-2:50090</value></property>
第四个: mapred-site.xml
mv mapred-site.xml.template mapred-site.xmlvi mapred-site.xml<!-- 指定MapReduce运行是框架,这里指定在yarn上,默认是local --><property> <name>mapreduce.framework.name</name> <value>yarn</value></property>
第五个:yarn-site.xml
vi <strong>yarn-site.xml</strong>
<!-- 指定yarn的老大ResourceManager的地址 --><property> <name>yarn.resourcemanager.hostname</name> <value>node-1</value></property><!-- NodeManager上运行的附属服务。需要配置成mapreduce_shuffle,才可以运行MapReduce程序默认值 --><property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value></property>
第六个:slaves 文件,里面写上从节点所在的主机名字
vi slavesnode-1node-2node-3
第七个:将 Hadoop 添加到环境变量
vi /etc/profileexport HADOOP_HOME=/export/server/hadoop-2.7.6export PATH=$HADOOP_HOME/bin:$PATH
然后将 hadoop 安装包远程发送给各个副计算机
scp -r /export/server/hadoop-2.7.6/ root@node-2:/export/server/scp -r /export/server/hadoop-2.7.6/ root@node-3:/export/server/
把配置好的环境变量也要远程发送给各个副计算机
scp -r /etc/profile root@node-2:/etc/scp -r /etc/profile root@node-3:/etc/
然后试所有的计算机环境变量生效
source /etc/profile
关于 hadoop的 配置说明
在 hadoop 官网,左下角点击 Documentation 点击相应的版本进去,拉到最左下角有 ***-default.xml
***-default.xml 是默认的配置,如果用户没有修改这些选项将会生效
***-site.xml 是用户自定义的配置
site的配置选项优先级大于 default 的,如果 site 里面有就会覆盖 default 里面的
Log yarn.log-aggregation-enable=true如果显示错误,则日志存储在节点管理器运行节点上。当聚集启用时所有日志进行汇总,任务完成后转移到HDFS。
Hadoop集群性能监控Ganglia, Nagios
使用Hadoop工具 Ambari管理集群
//www.51coolma.cn/hadoop/hadoop_big_data_overview.html
主要记录了Hadoop各个组件的基本原理,处理过程和关键的知识点等,包括HDFS、YARN、MapReduce等。
本教程内容来源于 PennyWong
更新日期 | 更新内容 |
---|---|
2015-5-7 | Hadoop文档 |
硬件问题:复制数据解决(RAID)
分析需要从不同的硬盘读取数据:MapReduce
而Hadoop提供了
1.可靠的共享存储(分布式存储) 2.抽象的分析接口(分布式分析)
不能使用一台机器进行处理的数据
大数据的核心是样本=总体
1.数据分布在多台机器
可靠性:每个数据块都复制到多个节点
性能:多个节点同时处理数据
2.计算随数据走
网络IO速度 << 本地磁盘IO速度,大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行)
代码向数据迁移,避免大规模数据时,造成大量数据迁移的情况,尽量让一段数据的计算发生在同一台机器上
3.串行IO取代随机IO
传输时间 << 寻道时间,一般数据写入后不再修改
Hadoop可运行于一般的商用服务器上,具有高容错、高可靠性、高扩展性等特点
特别适合写一次,读多次的场景
内部各个节点基本都是采用Master-Woker架构
Hadoop Distributed File System,分布式文件系统
Block数据块;
基本存储单位,一般大小为64M(配置大的块主要是因为:
1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;
2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;
3)对数据块进行读写,减少建立网络的连接成本)
一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小
基本的读写单位,类似于磁盘的页,每次都是读写一个块
HDFS2.x以后的block默认128M
NameNode
存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小
一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件
数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)
Secondary NameNode
DataNode
保存具体的block数据
负责数据的读写操作和复制操作
DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息
1.客户端将文件写入本地磁盘的 HDFS Client 文件中
2.当临时文件大小达到一个 block 大小时,HDFS client 通知 NameNode,申请写入文件
3.NameNode 在 HDFS 的文件系统中创建一个文件,并把该 block id 和要写入的 DataNode 的列表返回给客户端
4.客户端收到这些信息后,将临时文件写入 DataNodes
5.文件写完后(客户端关闭),NameNode 提交文件(这时文件才可见,如果提交前,NameNode 垮掉,那文件也就丢失了。fsync:只保证数据的信息写到 NameNode 上,但并不保证数据已经被写到DataNode 中)
Rack aware(机架感知)
通过配置文件指定机架名和 DNS 的对应关系
假设复制参数是3,在写入文件时,会在本地的机架保存一份数据,然后在另外一个机架内保存两份数据(同机架内的传输速度快,从而提高性能)
整个 HDFS 的集群,最好是负载平衡的,这样才能尽量利用集群的优势
HDFS 的可靠性主要有以下几点:
可以在 hdfs-site.xml 中设置复制因子指定副本数量
所有数据块都可副本
DataNode 启动时,遍历本地文件系统,产生一份 HDFS 数据块和本地文件的对应关系列表 (blockreport) 汇报给 Namenode
HDFS 的"机架感知",通过节点之间发送一个数据包,来感应它们是否在同一个机架
一般在本机架放一个副本,在其他机架再存放一个副本,这样可以防止机架失效时丢失数据,也可以提高带宽利用率
NameNode 周期性从 DataNode 接受心跳信息和块报告
NameNode 根据块报告验证元数据
没有按时发送心跳的 DataNode 会被标记为宕机,不会再给他任何 I/O 请求
如果 DataNode 失效造成副本数量下降,并且低于预先设定的值,NameNode 会检测出这些数据库,并在合适的时机重新复制
引发重新复制的原因还包括数据副本本身损坏,磁盘错误,复制因子被增大等
NameNode 启动时会先经过一个 "安全模式" 阶段
安全模式阶段不会产生数据写
在此阶段 NameNode 收集各个 DataNode 的报告, 当数据块达到最小副本数以上时,会被认为是"安全"的
在一定比例(可设置) 的数据块被确定为"安全" 后 ,在过若干时间,安全模式结束
当检测到副本数不足的数据块时,该块会被复制,直到达到最小副本数
在文件创立时,每个数据块都产生效验和
效验和会作为单独一个隐藏文件保存在命名空间下
客户端获取数据时可以检查效验和是否相同,从而发现数据块是否损坏
如果正在读取的数据块损坏,则可以继续读取其他副本
删除文件时,其实是放入回收站 /trash
回收站里的文件是可以快速恢复的
可以设置一个时间值,当回收站里文件的存放时间超过了这个值,就被彻底删除,并且释放占用的数据块
映像文件和事物日志是 NameNode 的核心数据.可以配置为拥有多个副本
副本会降低 NameNode 的处理速度,但增加安全性
NameNode 依然是单点,如果发生故障要手工切换
fsck: 检查文件的完整性
start-balancer.sh: 重新平衡HDFS
hdfs dfs -copyFromLocal 从本地磁盘复制文件到HDFS
此架构会有以下问题:
总的来说就是单点问题和资源利用率问题
YARN就是将 JobTracker 的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理和一个每个作业的管理(ApplicationMaster) ResourceManager 和 NodeManager 提供了计算资源的分配和管理,而 ApplicationMaster 则完成应用程序的运行
YARN架构下形成了一个通用的资源管理平台和一个通用的应用计算平台,避免了旧架构的单点问题和资源利用率问题,同时也让在其上运行的应用不再局限于 MapReduce 形式
1. Job submission
从ResourceManager 中获取一个Application ID 检查作业输出配置,计算输入分片 拷贝作业资源(job jar、配置文件、分片信息)到 HDFS,以便后面任务的执行
2. Job initialization
ResourceManager 将作业递交给 Scheduler(有很多调度算法,一般是根据优先级)Scheduler 为作业分配一个 Container,ResourceManager 就加载一个 application master process 并交给 NodeManager。
管理 ApplicationMaster 主要是创建一系列的监控进程来跟踪作业的进度,同时获取输入分片,为每一个分片创建一个 Map task 和相应的 reduce task Application Master 还决定如何运行作业,如果作业很小(可配置),则直接在同一个 JVM 下运行
3. Task assignment
ApplicationMaster 向 Resource Manager 申请资源(一个个的Container,指定任务分配的资源要求)一般是根据 data locality 来分配资源
4. Task execution
ApplicationMaster 根据 ResourceManager 的分配情况,在对应的 NodeManager 中启动 Container 从 HDFS 中读取任务所需资源(job jar,配置文件等),然后执行该任务
5. Progress and status update
定时将任务的进度和状态报告给 ApplicationMaster Client 定时向 ApplicationMaster 获取整个任务的进度和状态
6. Job completion
Client定时检查整个作业是否完成 作业完成后,会清空临时文件、目录等
负责全局的资源管理和任务调度,把整个集群当成计算资源池,只关注分配,不管应用,且不负责容错
Node节点下的Container管理
ContainerManager: 接收RPC请求(启动、停止),资源本地化(下载应用需要的资源到本地,根据需要共享这些资源)
PUBLIC: /filecache
PRIVATE: /usercache//filecache
APPLICATION: /usercache//appcache//(在程序完成后会被删除)
ContainersLauncher: 加载或终止Container
单个作业的资源管理和任务监控
具体功能描述:
ApplicationMaster可以是用任何语言编写的程序,它和ResourceManager和NodeManager之间是通过ProtocolBuf交互,以前是一个全局的JobTracker负责的,现在每个作业都一个,可伸缩性更强,至少不会因为作业太多,造成JobTracker瓶颈。同时将作业的逻辑放到一个独立的ApplicationMaster中,使得灵活性更加高,每个作业都可以有自己的处理方式,不用绑定到MapReduce的处理模式上
如何计算资源需求
一般的MapReduce是根据block数量来定Map和Reduce的计算数量,然后一般的Map或Reduce就占用一个Container
如何发现数据的本地化
数据本地化是通过HDFS的block分片信息获取的
可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和zookeeper来ֹ实现HA
一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)
Map输出格式和Reduce输入格式一定是相同的
MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中
记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内.
在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择.
ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。
reduce采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。
输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。类似记录读取,自定义输出格式不在本书范围。
通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map
决定读取数据的格式,可以是文件或数据库等
List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题
TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容
KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为( )字符。
NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1
SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,<key,value>为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。</key,value>
代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法
Split内有Location信息,利于数据局部化
一个InputSplit给一个单独的Map处理
public abstract class InputSplit { /** * 获取Split的大小,支持根据size对InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 获取存储该分片的数据所在的节点位置. */ public abstract String[] getLocations() throws IOException, InterruptedException;}
将InputSplit拆分成一个个<key,value>对给Map处理,也是实际的文件读取分隔对象</key,value>
CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)
通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度
通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小
如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取
如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)
先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)
public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 获取该文件所有的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 计算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循环分片。 // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 处理余下的数据 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整块返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 对于长度为0的文件,创建空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 设置输入文件数量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits;}
split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split
解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录
public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函数即对LineRecordReader的一个初始化 // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打开文件,并定位到分片读取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是压缩文件的话,直接打开文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一个split,则忽略本split的第一行数据 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下次读取就会从偏移位置开始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行数据,重新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即为偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 读取的数据长度为0,则说明已读完 if (newSize == 0) { break; } pos += newSize; // 读取的数据长度小于最大行长度,也说明已读取完毕 if (newSize < maxLineLength) { break; } // 执行到此处,说明该行数据没读完,继续读入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }}
主要是读取InputSplit的每一个Key,Value对并进行处理
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 预处理,仅在map task启动时运行一次 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 对于InputSplit中的每一对<key, value>都会运行一次 */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * 扫尾工作,比如关闭流等 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * map task的驱动器 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }}public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private RecordReader<KEYIN, VALUEIN> reader; private InputSplit split; /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); }}
对Map的结果进行排序并传输到Reduce进行处理 Map的结果并不是直接存放到硬盘,而是利用缓存做一些预排序处理 Map会调用Combiner,压缩,按key进行分区、排序等,尽量减少结果的大小 每个Map完成后都会通知Task,然后Reduce就可以进行处理
当Map程序开始产生结果的时候,并不是直接写到文件的,而是利用缓存做一些排序方面的预处理操作
每个Map任务都有一个循环内存缓冲区(默认100MB),当缓存的内容达到80%时,后台线程开始将内容写到文件,此时Map任务可以继续输出结果,但如果缓冲区满了,Map任务则需要等待
写文件使用round-robin方式。在写入文件之前,先将数据按照Reduce进行分区。对于每一个分区,都会在内存中根据key进行排序,如果配置了Combiner,则排序后执行Combiner(Combine之后可以减少写入文件和传输的数据)
每次结果达到缓冲区的阀值时,都会创建一个文件,在Map结束时,可能会产生大量的文件。在Map完成前,会将这些文件进行合并和排序。如果文件的数量超过3个,则合并后会再次运行Combiner(1、2个文件就没有必要了)
如果配置了压缩,则最终写入的文件会先进行压缩,这样可以减少写入和传输的数据
一旦Map完成,则通知任务管理器,此时Reduce就可以开始复制结果数据
Map的结果文件都存放到运行Map任务的机器的本地硬盘中
如果Map的结果很少,则直接放到内存,否则写入文件中
同时后台线程将这些文件进行合并和排序到一个更大的文件中(如果文件是压缩的,则需要先解压)
当所有的Map结果都被复制和合并后,就会调用Reduce方法
Reduce结果会写入到HDFS中
一般的原则是给shuffle分配尽可能多的内存,但前提是要保证Map、Reduce任务有足够的内存
对于Map,主要就是避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值
对于Reduce,主要是把Map的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘。默认情况下,所有的内存都是分配给Reduce方法的,如果Reduce方法不怎么消耗内存,可以mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0
在任务监控中可通过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的
对于IO方面,可以Map的结果可以使用压缩,同时增大buffer size(io.file.buffer.size,默认4kb)
属性 | 默认值 | 描述 |
---|---|---|
io.sort.mb | 100 | 映射输出分类时所使用缓冲区的大小. |
io.sort.record.percent | 0.05 | 剩余空间用于映射输出自身记录.在1.X发布后去除此属性.随机代码用于使用映射所有内存并记录信息. |
io.sort.spill.percent | 0.80 | 针对映射输出内存缓冲和记录索引的阈值使用比例. |
io.sort.factor | 10 | 文件分类时合并流的最大数量。此属性也用于reduce。通常把数字设为100. |
min.num.spills.for.combine | 3 | 组合运行所需最小溢出文件数目. |
mapred.compress.map.output | false | 压缩映射输出. |
mapred.map.output.compression.codec | DefaultCodec | 映射输出所需的压缩解编码器. |
mapred.reduce.parallel.copies | 5 | 用于向reducer传送映射输出的线程数目. |
mapred.reduce.copy.backoff | 300 | 时间的最大数量,以秒为单位,这段时间内若reducer失败则会反复尝试传输 |
io.sort.factor | 10 | 组合运行所需最大溢出文件数目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 随机复制阶段映射输出缓冲器的堆栈大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用于启动合并输出进程和磁盘传输的映射输出缓冲器的阀值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用于启动合并输出和磁盘传输进程的映射输出的阀值数目。小于等于0意味着没有门槛,而溢出行为由 mapred.job.shuffle.merge.percent单独管理. |
mapred.job.reduce.input.buffer.percent | 0.0 | 用于减少内存映射输出的堆栈大小比例,内存中映射大小不得超出此值。若reducer需要较少内存则可以提高该值. |
export LIBJARS=$MYLIB/commons-lang-2.3.jar, hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS
hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner The dependent libraries are now included inside the application JAR file
一般还是上面的好,指定依赖可以利用Public Cache,如果是包含依赖,则每次都需要拷贝
序列化是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。反序列化是指将字节流转回结构化对象的逆过程。
序列化用于分布式数据处理的两大领域:进程间通信和永久存储
在Hadoop中,系统中多个节点进程间的通信是通过“远程过程调用”(RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下:
1.紧凑
紧凑格式能充分利用网络带宽(数据中心最稀缺的资源)
2.快速
进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。
3.可扩展
为了满足新的需求,协议不断变化。所以在控制客户端和服务期的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增加新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。
4.支持互操作
对于系统来说,希望能够支持以不同语言写的客户端与服务器交互,所以需要设计一种特定的格式来满足这一需求。
Writable 接口
Writable 接口定义了两个方法:一个将其状态写入 DataOutput 二进制流,另一个从 DataInput二进制流读取状态。
BytesWritable
BytesWritable 是对二进制数据数组的封装。它的序列化格式为一个指定所含数据字节数的整数域(4字节),后跟数据内容的本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03和05)
NullWritable
NullWritable 是 writable 的特殊类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符。
ObjectWritable和GenericWritable
ObjectWritable 是对 java 基本类型(String,enum,Writable,null或这些类型组成的数组)的一个通用封装。它在 Hadoop RPC 中用于对方法的参数和返回类型进行封装和解封装。
Wriable集合类
io 软件包共有6个 Writable 集合类,分别是 ArrayWritable,ArrayPrimitiveWritable,TwoDArrayWritable,MapWritable,SortedMapWritable以及EnumMapWritable
ArrayWritable 和 TwoDArrayWritable 是对 Writeble 的数组和两维数组(数组的数组)的实现。ArrayWritable 或 TwoDArrayWritable 中所有元素必须是同一类的实例。ArrayWritable 和 TwoDArrayWritable 都有get() ,set() 和 toArray()方法,toArray() 方法用于新建该数组的一个浅拷贝。
ArrayPrimitiveWritable 是对 Java 基本数组类型的一个封装。调用 set() 方法时,可以识别相应组件类型,因而无需通过继承该类来设置类型。
序列化框架
尽管大多数 Mapreduce 程序使用的都是 Writable 类型的键和值,但这并不是 MapReduce API 强制要求使用的。事实上,可以使用任何类型,只要能有一个机制对每个类型进行类型与二进制表示的来回转换就可以。
为了支持这个机制,Hadoop 有一个针对可替换序列化框架的 API 。序列化框架用一个 Serialization 实现来表示。Serialization 对象定义了从类型到 Serializer 实例(将对象转换为字节流)和 Deserializer 实例(将字节流转换为对象)的映射方式。
序列化IDL
还有许多其他序列化框架从不同的角度来解决问题:不通过代码来定义类型,而是使用接口定义语言以不依赖与具体语言的方式进行声明。由此,系统能够为其他语言生成模型,这种形式能有效提高互操作能力。它们一般还会定义版本控制方案。
两个比较流行的序列化框架 Apache Thrift 和Google的Protocol Buffers,常常用作二进制数据的永久存储格式。Mapreduce 格式对该类的支持有限,但在 Hadoop 内部,部分组件仍使用上述两个序列化框架来实现 RPC 和数据交换。
基于文件的数据结构
对于某些应用,我们需要一种特殊的数据结构来存储自己的数据。对于基于 Mapreduce 的数据处理,将每个二进制数据大对象单独放在各自的文件中不能实现可扩展性,所以 Hadoop 为此开发了很多更高层次的容器。
关于 SequenceFile 。
考虑日志文件,其中每一行文本代表一条日志记录。纯文本不适合记录二进制类型的数据。在这种情况下,Hadoop 的 SequenceFile 类非常合适,为二进制键值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键,以及值可以是 Writable 类型。
SequenceFile 也可以作为小文件的容器。HDFS和Mapreduce 是针对大文件优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。
SequnceFile的写操作
通过 createWriter()静态方法可以创建 SequenceFile 对象,并返回 SequnceFile.Writer 实例。该静态方法有多个重载版本,但都需要制定待写入的数据流,Configuration 对象,以及键和值的类型。存储在 SequenceFIle 中的键和值并不一定是 Writable 类型。只要能够被 Sertialization 序列化和反序列化,任何类型都可以。
SequenceFile的读操作
从头到尾读取顺序文件不外乎创建 SequenceFile.reader 实例后反复调用 next() 方法迭代读取记录。读取的是哪条记录与使用的序列化框架有关。如果使用的是 Writable 类型,那么通过键和值作为参数的 next() 方法可以将数据流的下一条键值对读入变量中。
1.通过命令行接口显示 SequenceFile。
hadoop fs 命令有一个 -text 选项可以以文本形式显示顺序文件。该选项可以查看文件的代码,由此检测出文件的类型并将其转换为相应的文本。该选项可以识别 gzip 压缩文件,顺序文件和 Avro 数据文件;否则,假设输入为纯文本文件。
2. SequenceFile 的排序和合并。
Mapreduce 是对多个顺序文件进行排序(或合并)最有效的方法。Mapreduce 本身是并行的,并且可由你制定使用多少个 reducer 。例如,通过制定一个 reducer ,可以得到一个输出文件。
3.SequenceFile 的格式。
顺序文件由文件头和随后的一条或多条记录组成。顺序文件的前三个字节为 SEQ,紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值的名称,数据压缩细节,用户定义的元数据以及同步标识。同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中,位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识。
关于MapFile
MapFile 是已经排过序的 SequenceFile ,它有索引,所以可以按键查找。索引本身就是一个 SequenceFile ,包含 map 中的一小部分键。由于索引能够加载进内存,因此可以提供对主数据文件的快速查找。主数据文件则是另一个 SequenceFIle ,包含了所有的 map 条目,这些条目都按照键顺序进行了排序。
其他文件格式和面向列的格式
顺序文件和 map 文件是 Hadoop 中最早的,但并不是仅有的二进制文件格式,事实上,对于新项目而言,有更好的二进制格式可供选择。
Avro 数据文件在某些方面类似顺序文件,是面向大规模数据处理而设计的。但是 Avro 数据文件又是可移植的,它们可以跨越不同的编程语言使用。顺序文件,map 文件和 Avro 数据文件都是面向行的格式,意味着每一行的值在文件中是连续存储的。在面向列的格式中,文件中的行被分割成行的分片,然后每个分片以面向列的形式存储:首先存储每行第一列的值,然后是每行第2列的值,如此以往。
能够减少磁盘的占用空间和网络传输的量,并加速数据在网络和磁盘上的传输。
Hadoop 应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小,格式和所用的工具有关。比较各种压缩算法的压缩比和性能(从高到低):
1. 使用容器文件格式,例如顺序文件, Avro 数据文件。 ORCF 了说 Parquet 文件
2. 使用支持切分的压缩格式,例如 bzip2 或者通过索引实现切分的压缩格式,例子如LZO。
3. 在应用中将文件中切分成块,并使用任意一种他所格式为每个数据块建立压缩文件(不论它是否支持切分)。在这种情况下,需要合理选择数据大小,以确保压缩后的数据块的大小近似于HDFS块的大小。
4. 存储未经压缩的文件。
重点:压缩和拆分一般是冲突的(压缩后的文件的 block 是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时 Map 任务就无法处理,所以对于这些压缩,Hadoop 往往是直接使用一个 Map 任务处理整个文件的分析)。对大文件不可使用不支持切分整个文件的压缩格式,会失去数据的特性,从而造成 Mapreduce 应用效率低下。
Map 的输出结果也可以进行压缩,这样可以减少 Map 结果到 Reduce 的传输的数据量,加快传输速率。
在 Mapreduce 中使用压缩
FileOutputFormat.setCompressOutput(job,true);FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
如果输出生成顺序文件,可以设置 mapreduce.output.fileoutputformat.compress.type 属性来控制限制使用压缩格式。默认值是RECORD,即针对每条记录进行压缩。如果将其改为BLOCK,将针对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。
MRUnit单元测试Mapper和Reducer类在内存上独立运行, PipelineMapReduceDriver单线程运行.
LocalJobRunner单线程运行, 且仅有一个 Reducer能够启动conf.set("mapred.job.tracker", "local"); conf.set("fs.default.name", "file:////"); FileSystem fs = FileSystem.getLocal(conf);
MiniMRCluster, MiniYarnCluster, MiniDFSCluster 多线程
Hadoop官网:http://hadoop.apache.org/
确保 network 网络已经配置好,使用Xftp等类似工具进行上传,把 hadoop-2.7.5.tar.gz 上传到 /opt/hadoop 目录内。上传完成后,在 master 主机上执行以下代码:
cd /opt/hadoop
进入/opt/hadoop目录后,执行解压缩命令:
tar -zxvf hadoop-2.7.5.tar.gz
回车后系统开始解压,屏幕会不断滚动解压过程,执行成功后,系统在hadoop目录自动创建hadoop-2.7.5子目录。
然后修改文件夹名称为“hadoop”,即hadoop安装目录,执行修改文件夹名称命令:
mv hadoop-2.7.5 hadoop
注意:也可用Xftp查看相应目录是否存在,确保正确完成。
我们进入安装目录,查看一下安装文件,如果显示如图文件列表,说明压缩成功。
1.修改主机名称,我这里创建了三个虚拟主机,分别命名node-1,node-2,node-3,进入 network 文件删掉里面的内容直接写上主机名就可以了
vi /etc/sysconfig/network
2.映射 IP 和主机名,之后 reboot 重启主机
[root@node-1 redis-cluster]# vim /etc/hosts192.168.0.1 node-1192.168.0.2 node-2192.168.0.3 node-3
3.检测防火墙(要关闭防火墙),不同系统的防火墙关闭方式不一样,以下做个参考即可
1.service iptables stop 关闭2.chkconfig iptables off 强制关闭 除非手动开启不然不会自动开启3.chkconfig iptanles 查看4.service iptables status 查看防火墙状态
4.ssh 免密码登录
输入命令:ssh-keygen -t rsa 然后点击四个回车键,如下图所示:
然后通过 ssh-copy-id 对应主机 IP,之后通过“ssh 主机名/IP” 便可以不输入密码即可登录相应的主机系统
上传 hadoop 安装包解压后进入 hadoop-2.7.6/etc/hadoop 目录下
以下所有的 <property> </property> 都是写在各自相应配置文件末尾的 <configuration> 标签里面
第一个 hadoop-env.sh
vi hadoop-env.shexport JAVA_HOME=/usr/java/jdk1.8.0_171 #JAVA_HOME写上自己jdk 的安装路径
第二个 :core-site.xml
vi <strong>core-site.xml</strong>
<!-- 指定Hadoop所使用的文件系统schema(URI),HDFS的老大(NameNode)的地址 --><property> <name>fs.defaultFS</name> <value>hdfs://node-1:9000</value></property><!-- 定Hadoop运行是产生文件的存储目录。默认 --><property> <name>hadoop.tmp.dir</name> <value>/export/data/hddata</value></property>
第三个: hdfs-site.xml
vi <strong>hdfs-site.xml</strong>
<!-- 指定HDFS副本的数量,不修改默认为3个 --><property> <name>dfs.replication</name> <value>2</value></property><!-- dfs的SecondaryNameNode在哪台主机上 --><property> <name>dfs.namenode.secondary.http-address</name> <value>node-2:50090</value></property>
第四个: mapred-site.xml
mv mapred-site.xml.template mapred-site.xmlvi mapred-site.xml<!-- 指定MapReduce运行是框架,这里指定在yarn上,默认是local --><property> <name>mapreduce.framework.name</name> <value>yarn</value></property>
第五个:yarn-site.xml
vi <strong>yarn-site.xml</strong>
<!-- 指定yarn的老大ResourceManager的地址 --><property> <name>yarn.resourcemanager.hostname</name> <value>node-1</value></property><!-- NodeManager上运行的附属服务。需要配置成mapreduce_shuffle,才可以运行MapReduce程序默认值 --><property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value></property>
第六个:slaves 文件,里面写上从节点所在的主机名字
vi slavesnode-1node-2node-3
第七个:将 Hadoop 添加到环境变量
vi /etc/profileexport HADOOP_HOME=/export/server/hadoop-2.7.6export PATH=$HADOOP_HOME/bin:$PATH
然后将 hadoop 安装包远程发送给各个副计算机
scp -r /export/server/hadoop-2.7.6/ root@node-2:/export/server/scp -r /export/server/hadoop-2.7.6/ root@node-3:/export/server/
把配置好的环境变量也要远程发送给各个副计算机
scp -r /etc/profile root@node-2:/etc/scp -r /etc/profile root@node-3:/etc/
然后试所有的计算机环境变量生效
source /etc/profile
关于 hadoop的 配置说明
在 hadoop 官网,左下角点击 Documentation 点击相应的版本进去,拉到最左下角有 ***-default.xml
***-default.xml 是默认的配置,如果用户没有修改这些选项将会生效
***-site.xml 是用户自定义的配置
site的配置选项优先级大于 default 的,如果 site 里面有就会覆盖 default 里面的
Log yarn.log-aggregation-enable=true如果显示错误,则日志存储在节点管理器运行节点上。当聚集启用时所有日志进行汇总,任务完成后转移到HDFS。
Hadoop集群性能监控Ganglia, Nagios
使用Hadoop工具 Ambari管理集群
//www.51coolma.cn/hadoop/hadoop_big_data_overview.html