
Basic Usage of MapReduce in Hadoop


Basic Usage of MapReduce

Add Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
        <scope>test</scope>
    </dependency>
</dependencies>

WordCount

The classic WordCount job: count the occurrences of each word in the following text.

vi mapReduce.txt

MapReduce is a programming
paradigm that enables
massive scalability across
hundreds or thousands of
servers in a Hadoop cluster.
As the processing component,
MapReduce is the heart of Apache Hadoop.
The term "MapReduce" refers to two separate
and distinct tasks that Hadoop programs perform.

Create an HDFS Directory

hdfs dfs -mkdir /mapReduce/input

Upload to HDFS

hdfs dfs -put mapReduce.txt /mapReduce/input

Define the Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN    : type of K1, the line offset  (LongWritable)
 * VALUEIN  : type of V1, one line of text (Text)
 * KEYOUT   : type of K2, a single word    (Text)
 * VALUEOUT : type of V2, the constant 1   (LongWritable)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * The framework splits the input into key-value pairs (K1, V1); the custom
     * map logic turns each pair into new key-value pairs (K2, V2):
     *
     * K1  V1                          K2       V2
     * 0   hello world       ===>      hello    1
     * 11  hello mapReduce             world    1
     *
     * @param key     K1
     * @param value   V1
     * @param context the MapReduce context object
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // Split each line into words
        String row = value.toString();
        String[] words = row.split(" ");
        // Emit (word, 1) for every word
        for (String word : words) {
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}

Define the Reducer

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN    : type of K2, a single word                  (Text)
 * VALUEIN  : type of V2, the element type of the values (LongWritable)
 * KEYOUT   : type of K3, a single word                  (Text)
 * VALUEOUT : type of V3, the word's total count         (LongWritable)
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Turns the grouped (K2, V2) pairs into (K3, V3):
     *
     * new K2    new V2               K3      V3
     * hello     <1,1>      ===>      hello   2
     * world     <1,1,1>              world   3
     *
     * @param key     K2
     * @param values  V2
     * @param context the MapReduce context object
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        LongWritable longWritable = new LongWritable();
        // Accumulate the occurrences of each word
        for (LongWritable value : values) {
            count += value.get();
        }
        // Write the total to the context
        longWritable.set(count);
        context.write(key, longWritable);
    }
}

Define the Job

Option 1:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(super.getConf(), "mapreduce-test");
        // When packaged and run on a cluster, the following must be set so the framework can locate the main class
//        job.setJarByClass(WordCountJob.class);

        // Set the input format and where to read from
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node001:9000/mapReduce/input"));

        // Set the Mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the map-phase output types (K2, V2)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Shuffle phase: use the defaults
        // Set the Reducer class
        job.setReducerClass(WordCountReducer.class);
        // Set the reduce-phase output types (K3, V3)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Set the output format and where to write to
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node001:9000/mapReduce/output"));

        // Submit the job and wait for completion
        boolean waitForCompletion = job.waitForCompletion(true);
        return waitForCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new WordCountJob(), args);
        // A non-zero exit code signals abnormal termination
        System.exit(run);
    }
}

Option 2:

Copy core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, and related files to the project's resources directory.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Load the configuration files from the classpath
        Configuration configuration = new Configuration(true);
        // Run in local mode
        configuration.set("mapreduce.framework.name", "local");
        // Create the job
        Job job = Job.getInstance(configuration);
        // Set the main class
        job.setJarByClass(WordCountJob.class);
        // Set the job name
        job.setJobName("wordcount-" + System.currentTimeMillis());
        // Set the number of reduce tasks
        job.setNumReduceTasks(2);
        // Set the input path
        FileInputFormat.setInputPaths(job, new Path("/mapReduce/input"));
        // Set the output path
        FileOutputFormat.setOutputPath(job, new Path("/mapReduce/output_" + System.currentTimeMillis()));
        // Set the map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Set the reduce output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}

How to Run MapReduce Jobs

Running on Linux

1. Package the project as a jar (e.g. wordcount.jar) and upload it to the Linux server.

2. On the Linux server, run: hadoop jar wordcount.jar xx.xx.xx.WordCountJob

Running Locally on Windows

1. Copy core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, and related Hadoop configuration files to the project's resources directory.

2. In the driver, force local mode: configuration.set("mapreduce.framework.name", "local");

Running on the Linux Server

Package the Jar and Run

hadoop jar wordcount.jar cn.ybzy.mapreduce.WordCountJob

bash-4.1# hadoop jar mapreduce.jar cn.ybzy.mapreduce.WordCountJob
22/02/27 09:05:54 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
22/02/27 09:05:54 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
22/02/27 09:05:55 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
22/02/27 09:05:55 INFO input.FileInputFormat: Total input paths to process : 1
22/02/27 09:05:55 INFO mapreduce.JobSubmitter: number of splits:1
22/02/27 09:05:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1736835367_0001
22/02/27 09:05:56 INFO mapreduce.Job: The url to track the job: http://node001:8080/
22/02/27 09:05:56 INFO mapreduce.Job: Running job: job_local1736835367_0001
22/02/27 09:05:56 INFO mapred.LocalJobRunner: OutputCommitter set in config null
22/02/27 09:05:56 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:56 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
22/02/27 09:05:56 INFO mapred.LocalJobRunner: Waiting for map tasks
22/02/27 09:05:56 INFO mapred.LocalJobRunner: Starting task: attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:56 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:56 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
22/02/27 09:05:56 INFO mapred.MapTask: Processing split: hdfs://node001:9000/mapReduce/input/mapReduce.txt:0+291
22/02/27 09:05:56 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
22/02/27 09:05:56 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
22/02/27 09:05:56 INFO mapred.MapTask: soft limit at 83886080
22/02/27 09:05:56 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
22/02/27 09:05:56 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
22/02/27 09:05:57 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
22/02/27 09:05:57 INFO mapred.LocalJobRunner:
22/02/27 09:05:57 INFO mapred.MapTask: Starting flush of map output
22/02/27 09:05:57 INFO mapred.MapTask: Spilling map output
22/02/27 09:05:57 INFO mapred.MapTask: bufstart = 0; bufend = 643; bufvoid = 104857600
22/02/27 09:05:57 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214224(104856896); length = 173/6553600
22/02/27 09:05:57 INFO mapred.MapTask: Finished spill 0
22/02/27 09:05:57 INFO mapred.Task: Task:attempt_local1736835367_0001_m_000000_0 is done. And is in the process of committing
22/02/27 09:05:57 INFO mapred.LocalJobRunner: map
22/02/27 09:05:57 INFO mapred.Task: Task 'attempt_local1736835367_0001_m_000000_0' done.
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Finishing task: attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:57 INFO mapred.LocalJobRunner: map task executor complete.
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Waiting for reduce tasks
22/02/27 09:05:57 INFO mapred.LocalJobRunner: Starting task: attempt_local1736835367_0001_r_000000_0
22/02/27 09:05:57 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/02/27 09:05:57 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
22/02/27 09:05:57 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@16e20f40
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=333971456, maxSingleShuffleLimit=83492864, mergeThreshold=220421168, ioSortFactor=10, memToMemMergeOutputsThreshold=10
22/02/27 09:05:57 INFO reduce.EventFetcher: attempt_local1736835367_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
22/02/27 09:05:57 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1736835367_0001_m_000000_0 decomp: 733 len: 737 to MEMORY
22/02/27 09:05:57 INFO reduce.InMemoryMapOutput: Read 733 bytes from map-output for attempt_local1736835367_0001_m_000000_0
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 733, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->733
22/02/27 09:05:57 INFO mapreduce.Job: Job job_local1736835367_0001 running in uber mode : false
22/02/27 09:05:57 INFO mapreduce.Job:  map 100% reduce 0%
22/02/27 09:05:57 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
22/02/27 09:05:57 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
22/02/27 09:05:57 INFO mapred.Merger: Merging 1 sorted segments
22/02/27 09:05:57 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 719 bytes
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merged 1 segments, 733 bytes to disk to satisfy reduce memory limit
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merging 1 files, 737 bytes from disk
22/02/27 09:05:57 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
22/02/27 09:05:57 INFO mapred.Merger: Merging 1 sorted segments
22/02/27 09:05:57 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 719 bytes
22/02/27 09:05:57 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:57 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
22/02/27 09:05:58 INFO mapred.Task: Task:attempt_local1736835367_0001_r_000000_0 is done. And is in the process of committing
22/02/27 09:05:58 INFO mapred.LocalJobRunner: 1 / 1 copied.
22/02/27 09:05:58 INFO mapred.Task: Task attempt_local1736835367_0001_r_000000_0 is allowed to commit now
22/02/27 09:05:58 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1736835367_0001_r_000000_0' to hdfs://node001:9000/mapReduce/output/_temporary/0/task_local1736835367_0001_r_000000
22/02/27 09:05:58 INFO mapred.LocalJobRunner: reduce > reduce
22/02/27 09:05:58 INFO mapred.Task: Task 'attempt_local1736835367_0001_r_000000_0' done.
22/02/27 09:05:58 INFO mapred.LocalJobRunner: Finishing task: attempt_local1736835367_0001_r_000000_0
22/02/27 09:05:58 INFO mapred.LocalJobRunner: reduce task executor complete.
22/02/27 09:05:58 INFO mapreduce.Job:  map 100% reduce 100%
22/02/27 09:05:58 INFO mapreduce.Job: Job job_local1736835367_0001 completed successfully
22/02/27 09:05:58 INFO mapreduce.Job: Counters: 35
        File System Counters
                FILE: Number of bytes read=1864
                FILE: Number of bytes written=551289
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=582
                HDFS: Number of bytes written=343
                HDFS: Number of read operations=13
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Map-Reduce Framework
                Map input records=9
                Map output records=44
                Map output bytes=643
                Map output materialized bytes=737
                Input split bytes=120
                Combine input records=0
                Combine output records=0
                Reduce input groups=38
                Reduce shuffle bytes=737
                Reduce input records=44
                Reduce output records=38
                Spilled Records=88
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=22
                Total committed heap usage (bytes)=488112128
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=291
        File Output Format Counters
                Bytes Written=343
bash-4.1#

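To inspect the result, read back the files under the output directory (path per the Job configuration above; with a single reduce task there is one result file):

hdfs dfs -cat /mapReduce/output/part-r-00000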

Fixing a Job That Hangs

Option 1

A common explanation online is that YARN does not have enough memory to schedule the job; modify yarn-site.xml to increase the resource limits.

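A sketch of the commonly adjusted properties; the values are illustrative and must be tuned to the machine's actual memory:

<!-- yarn-site.xml: illustrative values -->
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
</property>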

Option 2

Modify mapred-site.xml, replacing the setting that submits jobs to YARN with one that runs them locally.
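A commonly cited form of this change (the host and port are illustrative): change

<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

to

<property>
    <name>mapreduce.job.tracker</name>
    <value>hdfs://node001:8001</value>
    <final>true</final>
</property>

With mapreduce.framework.name no longer set to yarn, the job falls back to the local runner, so this sidesteps the YARN resource problem rather than fixing it.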

Partitioning (Partitioner)

A Partitioner decides which reduce task receives each map output record; by default, HashPartitioner distributes records by the hash of the key modulo the number of reduce tasks.

Count the words in the mapReduce.txt file above and partition the results.

Define the Partitioner

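A minimal sketch of such a partitioner, assuming a two-way split on the word's first letter (the split rule is illustrative):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner<KEY, VALUE> runs on the map output (K2, V2) and decides
 * which reduce task each record is sent to.
 */
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text key, LongWritable value, int numReduceTasks) {
        String word = key.toString().toLowerCase();
        // Illustrative rule: words starting with a-m go to partition 0, the rest to partition 1
        int partition = (!word.isEmpty() && word.charAt(0) >= 'a' && word.charAt(0) <= 'm') ? 0 : 1;
        // Stay in range if fewer reduce tasks are configured
        return partition % numReduceTasks;
    }
}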

Use the Partitioner

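Register it in the Job alongside the other settings shown earlier (sketch):

// Route map output (K2, V2) through the custom partitioner
job.setPartitionerClass(WordCountPartitioner.class);
// One reduce task per partition -> two output files
job.setNumReduceTasks(2);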

Test

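Package and run as before, then list the output; with two reduce tasks there is one result file per partition (jar and class names follow the earlier run):

hadoop jar wordcount.jar cn.ybzy.mapreduce.WordCountJob
hdfs dfs -ls /mapReduce/output
hdfs dfs -cat /mapReduce/output/part-r-00000
hdfs dfs -cat /mapReduce/output/part-r-00001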

Serialization and Sorting

Sort the data (letters and numbers) in a text file.

Prepare the Data

Create sort.txt

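A sample of the shape this example assumes, one letter and one number per line (illustrative data):

a 1
a 9
b 3
a 7
b 8
b 10
a 5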

Define MyPairWritable

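A minimal sketch of the composite key, assuming the letter/number layout above; it sorts by the letter first and then by the number, via compareTo, during the shuffle's sort phase:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop keys must implement WritableComparable so they can be
 * serialized across the network and compared while sorting.
 */
public class MyPairWritable implements WritableComparable<MyPairWritable> {

    private String first = "";
    private int second;

    // A no-arg constructor is required for Hadoop's reflection-based deserialization
    public MyPairWritable() {
    }

    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readUTF();
        second = in.readInt();
    }

    @Override
    public int compareTo(MyPairWritable other) {
        // Sort by letter first, then by number
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : Integer.compare(second, other.second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}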

Define SortMapper

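A sketch of the mapper: it parses each line into the composite key and emits NullWritable as the value, letting the shuffle's sort do the ordering:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, MyPairWritable, NullWritable> {

    private final MyPairWritable pair = new MyPairWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line looks like "a 1": a letter, a space, a number
        String[] parts = value.toString().split(" ");
        pair.set(parts[0], Integer.parseInt(parts[1]));
        context.write(pair, NullWritable.get());
    }
}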

Define SortReducer

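A sketch of the reducer: the keys arrive already sorted, so it just writes them back out:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<MyPairWritable, NullWritable, MyPairWritable, NullWritable> {

    @Override
    protected void reduce(MyPairWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Duplicate pairs are grouped under one key; emit once per occurrence
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}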

Define the Job

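A sketch of the driver, wired like the WordCount jobs above; the input and output paths are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortJob {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        Job job = Job.getInstance(configuration, "sort");
        job.setJarByClass(SortJob.class);

        // Assumed paths: sort.txt uploaded under /mapReduce/sort_input
        FileInputFormat.setInputPaths(job, new Path("/mapReduce/sort_input"));
        FileOutputFormat.setOutputPath(job, new Path("/mapReduce/sort_output"));

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(MyPairWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(MyPairWritable.class);
        job.setOutputValueClass(NullWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}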

Test

Package the code, upload it, and run it.

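For example (the jar name and paths follow the assumptions in the sketches above); the output lists the pairs ordered by letter and then by number:

hadoop jar sort.jar cn.ybzy.mapreduce.SortJob
hdfs dfs -cat /mapReduce/sort_output/part-r-00000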

Counters

Hadoop Built-in Counters

MapReduce task counters: org.apache.hadoop.mapreduce.TaskCounter
File system counters: org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat counters: org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat counters: org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
Job counters: org.apache.hadoop.mapreduce.JobCounter

Via the Context Object

Define a counter by obtaining it from the context object, then use it to record events.

Use a context-based counter to count how many records the Map stage reads, as sketched below.

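A sketch based on the WordCountMapper from earlier; the counter group and name strings are illustrative:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fetch (or create) a counter by group and name, and count every input record
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        // Original word-count logic
        Text text = new Text();
        LongWritable one = new LongWritable(1);
        for (String word : value.toString().split(" ")) {
            text.set(word);
            context.write(text, one);
        }
    }
}

The MR_COUNT group then appears alongside the built-in groups in the counter summary printed at the end of the job.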

Via an Enum

Counters can also be defined with an enum type.

Use an enum-based counter to count how many records the Reduce stage reads, as sketched below.

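A sketch based on the WordCountReducer from earlier; the enum and its constant are illustrative names:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Each enum constant becomes a named counter in the job's counter output
    public enum MyCounter {
        REDUCE_INPUT_RECORDS
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            // Count every value the reducer reads, via the enum-based counter
            context.getCounter(MyCounter.REDUCE_INPUT_RECORDS).increment(1L);
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

The total is reported under the enum's fully qualified class name in the job's counter summary.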

Combiner

A Combiner performs a preliminary local merge of each map task's output, reducing the volume of data transferred between map and reduce nodes and improving network I/O performance.

Prepare the Data

Count the words in a text file; the sample can reuse the mapReduce.txt content from earlier.

Define a Custom Combiner

A custom Combiner extends Reducer and overrides the reduce method.

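For word count, combining is the same aggregation as reducing, so a sketch can mirror the WordCountReducer:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Runs on the map side after each map task: sums the per-word 1s locally
 * so fewer records cross the network to the reducers.
 */
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}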

Set the Combiner on the Job

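One line in the driver (class name per the sketch above; because the sum is associative, WordCountReducer.class would also work as the combiner here):

job.setCombinerClass(MyCombiner.class);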

Test

Word count without the combiner:

In the job counters, Combine input records and Combine output records are both 0, as in the full log shown earlier.

Word count with the combiner:

With the combiner registered, Combine input records rises to the map output record count, Combine output records drops to roughly the number of distinct words per map task, and Reduce shuffle bytes shrinks accordingly.
Source: https://juejin.cn/post/7097121027850764319

