Running MapReduce Programs on a Highly Available Hadoop Platform
1. Overview
Recently, some readers have asked: how do you run a MapReduce program on a Hadoop platform configured for HA? For people just entering the Hadoop world this question does come up. If you think about it, anyone with a solid programming background will think of automatic reconnection, and automatic client-side failover is exactly what lets our MapReduce programs keep running when a NameNode goes down. Today I will describe how to do this with the Hadoop Java API.
2. Introduction
The code is attached directly below, with comments throughout.
2.1 Java API for Accessing HDFS under HA
The code is as follows:
package cn.hdfs.mr.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * @author dengjie
 * @date March 24, 2015
 * @description Lists the HDFS root directory through the HA nameservice.
 */
public class DFS {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://cluster1"); // the NameNode URI; the HDFS nameservice is cluster1
        conf.set("dfs.nameservices", "cluster1"); // declare the nameservice cluster1
        conf.set("dfs.ha.namenodes.cluster1", "nna,nns"); // cluster1 has two NameNodes: nna and nns
        conf.set("dfs.namenode.rpc-address.cluster1.nna", "10.211.55.26:9000"); // RPC address of nna
        conf.set("dfs.namenode.rpc-address.cluster1.nns", "10.211.55.27:9000"); // RPC address of nns
        conf.set("dfs.client.failover.proxy.provider.cluster1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); // automatic failover implementation
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf); // obtain the file system handle
            FileStatus[] list = fs.listStatus(new Path("/")); // file statuses under the root directory
            for (FileStatus file : list) {
                System.out.println(file.getPath().getName()); // print each entry name
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fs != null) {
                    fs.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
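Hard-coding the HA settings like this is fine for a demo, but in practice you usually load them from the cluster's own configuration files. Below is a minimal sketch of that variant, assuming the client machine has copies of the cluster's core-site.xml and hdfs-site.xml; the /etc/hadoop/conf paths are placeholders of mine, not from the original post:

package cn.hdfs.mr.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DFSFromConfigFiles {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Load the HA settings (fs.defaultFS, dfs.nameservices, RPC addresses,
        // failover proxy provider) from the cluster config files instead of
        // calling conf.set(...) key by key. Paths are hypothetical.
        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
        try (FileSystem fs = FileSystem.get(conf)) {
            for (FileStatus file : fs.listStatus(new Path("/"))) {
                System.out.println(file.getPath().getName());
            }
        }
    }
}

This keeps client code free of cluster addresses, so a NameNode move only requires updating the config files.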
Next, here is the Java API code for running a MapReduce program.
2.2 Java API for Running a MapReduce Program
Taking WordCount as the example, the code is as follows:
package cn.jpush.hdfs.mr.example;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.jpush.hdfs.utils.ConfigUtils;

/**
 * @author dengjie
 * @date November 29, 2014
 * @description WordCount is the classic MapReduce example, the Hadoop
 *              equivalent of "hello world". It splits the words out of a
 *              file, then shuffles and sorts them (the map phase), aggregates
 *              the counts (the reduce phase), and finally writes the result
 *              to HDFS. That is the basic flow.
 */
public class WordCount {

    private static Logger log = LoggerFactory.getLogger(WordCount.class);

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /*
         * Source file: a b b
         *
         * After map:
         *
         * a 1
         * b 1
         * b 1
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); // read the whole line
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken()); // split on whitespace
                context.write(word, one); // emit each word with a count of 1
            }
        }
    }

    /*
     * Before reduce:
     *
     * a 1
     * b 1
     * b 1
     *
     * After reduce:
     *
     * a 1
     * b 2
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://cluster1");
        conf.set("dfs.nameservices", "cluster1");
        conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
        conf.set("dfs.namenode.rpc-address.cluster1.nna", "10.211.55.26:9000");
        conf.set("dfs.namenode.rpc-address.cluster1.nns", "10.211.55.27:9000");
        conf.set("dfs.client.failover.proxy.provider.cluster1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        long random1 = new Random().nextLong(); // randomize the output directory
        log.info("random1 -> " + random1);
        Job job1 = new Job(conf, "word count");
        job1.setJarByClass(WordCount.class);
        job1.setMapperClass(TokenizerMapper.class); // the map class
        job1.setCombinerClass(IntSumReducer.class); // the combiner class
        job1.setReducerClass(IntSumReducer.class); // the reduce class
        job1.setOutputKeyClass(Text.class); // output key type
        job1.setOutputValueClass(IntWritable.class); // output value type
        FileInputFormat.addInputPath(job1, new Path("/home/hdfs/test/hello.txt")); // input path
        FileOutputFormat.setOutputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random1))); // output path
        System.exit(job1.waitForCompletion(true) ? 0 : 1); // exit once the MR job finishes
    }
}
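Note that the example imports cn.jpush.hdfs.utils.ConfigUtils, which the post does not show. A minimal stand-in so the code compiles might look like the sketch below; the constant's value is my assumption, inferred from the output path (hdfs://cluster1/output/result/...) that appears in the run log in the next section:

package cn.jpush.hdfs.utils;

/**
 * Hypothetical stand-in for the ConfigUtils class referenced by WordCount;
 * the original post does not include its source.
 */
public class ConfigUtils {

    public static class HDFS {
        // %s is replaced with a random long so each run gets a fresh output directory
        public static final String WORDCOUNT_OUT = "/output/result/%s";
    }
}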
3. Run Results
Part of the run log is attached below:
[Job.main] - Running job: job_local551164419_0001
2015-03-24 11:52:09 INFO [LocalJobRunner.Thread-12] - OutputCommitter set in config null
2015-03-24 11:52:09 INFO [LocalJobRunner.Thread-12] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2015-03-24 11:52:10 INFO [LocalJobRunner.Thread-12] - Waiting for map tasks
2015-03-24 11:52:10 INFO [LocalJobRunner.LocalJobRunner Map Task Executor #0] - Starting task: attempt_local551164419_0001_m_000000_0
2015-03-24 11:52:10 INFO [ProcfsBasedProcessTree.LocalJobRunner Map Task Executor #0] - ProcfsBasedProcessTree currently is supported only on Linux.
2015-03-24 11:52:10 INFO [Task.LocalJobRunner Map Task Executor #0] - Using ResourceCalculatorProcessTree : null
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - Processing split: hdfs://cluster1/home/hdfs/test/hello.txt:0+24
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - (EQUATOR) 0 kvi 26214396(104857584)
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - mapreduce.task.io.sort.mb: 100
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - soft limit at 83886080
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - bufstart = 0; bufvoid = 104857600
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - kvstart = 26214396; length = 6553600
2015-03-24 11:52:10 INFO [LocalJobRunner.LocalJobRunner Map Task Executor #0] -
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - Starting flush of map output
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - Spilling map output
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - bufstart = 0; bufend = 72; bufvoid = 104857600
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - kvstart = 26214396(104857584); kvend = 26214352(104857408); length = 45/6553600
2015-03-24 11:52:10 INFO [MapTask.LocalJobRunner Map Task Executor #0] - Finished spill 0
2015-03-24 11:52:10 INFO [Task.LocalJobRunner Map Task Executor #0] - Task:attempt_local551164419_0001_m_000000_0 is done. And is in the process of committing
2015-03-24 11:52:10 INFO [LocalJobRunner.LocalJobRunner Map Task Executor #0] - map
2015-03-24 11:52:10 INFO [Task.LocalJobRunner Map Task Executor #0] - Task 'attempt_local551164419_0001_m_000000_0' done.
2015-03-24 11:52:10 INFO [LocalJobRunner.LocalJobRunner Map Task Executor #0] - Finishing task: attempt_local551164419_0001_m_000000_0
2015-03-24 11:52:10 INFO [LocalJobRunner.Thread-12] - map task executor complete.
2015-03-24 11:52:10 INFO [LocalJobRunner.Thread-12] - Waiting for reduce tasks
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - Starting task: attempt_local551164419_0001_r_000000_0
2015-03-24 11:52:10 INFO [ProcfsBasedProcessTree.pool-6-thread-1] - ProcfsBasedProcessTree currently is supported only on Linux.
2015-03-24 11:52:10 INFO [Task.pool-6-thread-1] - Using ResourceCalculatorProcessTree : null
2015-03-24 11:52:10 INFO [ReduceTask.pool-6-thread-1] - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@1197414
2015-03-24 11:52:10 INFO [MergeManagerImpl.pool-6-thread-1] - MergerManager: memoryLimit=1503238528, maxSingleShuffleLimit=375809632, mergeThreshold=992137472, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2015-03-24 11:52:10 INFO [EventFetcher.EventFetcher for fetching Map Completion Events] - attempt_local551164419_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2015-03-24 11:52:10 INFO [LocalFetcher.localfetcher#1] - localfetcher#1 about to shuffle output of map attempt_local551164419_0001_m_000000_0 decomp: 50 len: 54 to MEMORY
2015-03-24 11:52:10 INFO [InMemoryMapOutput.localfetcher#1] - Read 50 bytes from map-output for attempt_local551164419_0001_m_000000_0
2015-03-24 11:52:10 INFO [MergeManagerImpl.localfetcher#1] - closeInMemoryFile -> map-output of size: 50, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory -> 50
2015-03-24 11:52:10 INFO [EventFetcher.EventFetcher for fetching Map Completion Events] - EventFetcher is interrupted.. Returning
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - 1 / 1 copied.
2015-03-24 11:52:10 INFO [MergeManagerImpl.pool-6-thread-1] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2015-03-24 11:52:10 INFO [Merger.pool-6-thread-1] - Merging 1 sorted segments
2015-03-24 11:52:10 INFO [Merger.pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 46 bytes
2015-03-24 11:52:10 INFO [MergeManagerImpl.pool-6-thread-1] - Merged 1 segments, 50 bytes to disk to satisfy reduce memory limit
2015-03-24 11:52:10 INFO [MergeManagerImpl.pool-6-thread-1] - Merging 1 files, 54 bytes from disk
2015-03-24 11:52:10 INFO [MergeManagerImpl.pool-6-thread-1] - Merging 0 segments, 0 bytes from memory into reduce
2015-03-24 11:52:10 INFO [Merger.pool-6-thread-1] - Merging 1 sorted segments
2015-03-24 11:52:10 INFO [Merger.pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 46 bytes
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - 1 / 1 copied.
2015-03-24 11:52:10 INFO [deprecation.pool-6-thread-1] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2015-03-24 11:52:10 INFO [Task.pool-6-thread-1] - Task:attempt_local551164419_0001_r_000000_0 is done. And is in the process of committing
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - 1 / 1 copied.
2015-03-24 11:52:10 INFO [Task.pool-6-thread-1] - Task attempt_local551164419_0001_r_000000_0 is allowed to commit now
2015-03-24 11:52:10 INFO [FileOutputCommitter.pool-6-thread-1] - Saved output of task 'attempt_local551164419_0001_r_000000_0' to hdfs://cluster1/output/result/-3636988299559297154/_temporary/0/task_local551164419_0001_r_000000
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - reduce > reduce
2015-03-24 11:52:10 INFO [Task.pool-6-thread-1] - Task 'attempt_local551164419_0001_r_000000_0' done.
2015-03-24 11:52:10 INFO [LocalJobRunner.pool-6-thread-1] - Finishing task: attempt_local551164419_0001_r_000000_0
2015-03-24 11:52:10 INFO [LocalJobRunner.Thread-12] - reduce task executor complete.
2015-03-24 11:52:10 INFO [Job.main] - Job job_local551164419_0001 running in uber mode : false
2015-03-24 11:52:10 INFO [Job.main] - map 100% reduce 100%
2015-03-24 11:52:10 INFO [Job.main] - Job job_local551164419_0001 completed successfully
2015-03-24 11:52:10 INFO [Job.main] - Counters: 35
    File System Counters
        FILE: Number of bytes read=462
        FILE: Number of bytes written=466172
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=48
        HDFS: Number of bytes written=24
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=2
        Map output records=12
        Map output bytes=72
        Map output materialized bytes=54
        Input split bytes=105
        Combine input records=12
        Combine output records=6
        Reduce input groups=6
        Reduce shuffle bytes=54
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=13
        Total committed heap usage (bytes)=514850816
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=24
    File Output Format Counters
        Bytes Written=24
The input file is as follows:
a a c v d d
a d d s s x
The Reduce result, as shown below:

[Figure: Reduce result. Given the input above, the expected counts are a 3, c 1, d 4, s 2, v 1, x 1.]
4. Summary
We can verify that the code works by following these steps:
- Make sure NNA is in the active state and NNS is in the standby state. Note that the DN (DataNode) processes must all be running normally.
- Then run the WordCount program and see whether it produces the word counts.
- If the steps above succeed and the counts come out, continue to the next step; if not, track down the error first, then continue.
- Next, kill the NameNode process on the NNA node; NNS should then transition from standby to active (you can confirm this with hdfs haadmin -getServiceState nns).
- Then run the WordCount program again and see whether it still produces the counts; if it does, the code works as intended (see the sketch after this list).
That is the entire verification procedure.
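To watch the client-side failover itself, a small loop like the one below can help: it repeatedly lists the HDFS root through the cluster1 nameservice, so you can kill the active NameNode mid-run and see whether the listing keeps working against the other NameNode. This is a minimal sketch of mine under the same configuration assumptions as the examples above, not part of the original post:

package cn.hdfs.mr.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FailoverWatch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same HA settings as the DFS example above.
        conf.set("fs.defaultFS", "hdfs://cluster1");
        conf.set("dfs.nameservices", "cluster1");
        conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
        conf.set("dfs.namenode.rpc-address.cluster1.nna", "10.211.55.26:9000");
        conf.set("dfs.namenode.rpc-address.cluster1.nns", "10.211.55.27:9000");
        conf.set("dfs.client.failover.proxy.provider.cluster1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        try (FileSystem fs = FileSystem.get(conf)) {
            for (int i = 0; i < 60; i++) { // about a minute: time enough to kill a NameNode
                // Each listing is an RPC call; if the active NameNode dies, the
                // ConfiguredFailoverProxyProvider retries against the other one.
                int entries = fs.listStatus(new Path("/")).length;
                System.out.println("round " + i + ": / has " + entries + " entries");
                Thread.sleep(1000L);
            }
        }
    }
}

Run it, kill the NameNode process on the active node partway through (for example, find the pid with jps and kill it), and the loop should pause briefly during failover and then keep printing.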
5. Closing Remarks
That's all for this post. If you run into problems during verification, you can discuss them in the group or send me an email, and I will do my best to answer. May we encourage each other!
Email: smartloli.org@gmail.com
Twitter: https://twitter.com/smartloli
QQ group (Hadoop discussion community 1): 424769183
QQ group (Kafka Isn't Hard to Learn): 825943084
Friendly reminder: when applying to join a group, please state your reason (name + company/school) so the admins can approve the request. Thanks!
Love life, enjoy coding, and may we encourage each other!
Author: 哥不是小萝莉
Source: https://www.cnblogs.com/smartloli/p/4362688.html