Project Training - Day 2
Last updated: the evening of July 23, 2021
MapReduce
Overview
- When MapReduce processes data, it first splits the input (Split). A split is a logical division; what it actually divides is the amount of work.
- Once the work is divided, each split is handed to one MapTask for processing.
Components
Serialization
- In MapReduce, data needs to be serialized, and MapReduce provides its own serialization mechanism: the class of any object that needs to be serialized must implement the Writable interface.
- During serialization, MapReduce does not allow a property to be null (see the sketch after this list).
- Case study: compute the total traffic used by each person (file: flow.txt)
- Exercise: compute each person's average score (file: score.txt)
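A minimal illustration of why a null field breaks serialization (a hypothetical snippet, not part of the exercises; DataOutput.writeUTF rejects a null String, which is why the storage classes in the answers initialize their String fields to the empty string):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullFieldDemo {
    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        String city = null;   // a field that was never set
        out.writeUTF(city);   // throws NullPointerException - the record cannot be serialized
    }
}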
Partitioning
- Partitioning is used to classify data.
- In practice, data is classified according to the specified requirements, and each class is sent to its own partition.
- Case study: compute each person's total traffic, grouped by region (file: flow.txt)
- MapReduce numbers the partitions, starting from 0 and increasing by 1.
- Each class needs its own ReduceTask: however many classes there are, that many ReduceTasks must be created.
- Exercise: compute each person's total score for each month (directory: score)
Sorting
- MapReduce automatically sorts the data by key; the default order is the natural ordering.
- To specify a custom sort order, the class used in the key position must implement Comparable; since the data also has to be serialized, it should implement WritableComparable.
- Case study: sort the earlier total-traffic results in descending order
- Exercise: sort by month in ascending order; within the same month, sort by profit in descending order (file: profit.txt)
Answers
Serialization
Compute the total traffic used by each person
Data (flow.txt): records have the form 18642971356 shanghai david 4132 4121 (phone number, city, name, upstream traffic, downstream traffic).
Upload it to the /txt directory on HDFS.
Data storage class implementation:
package com.quosimodo.serialflow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Flow implements Writable {
private int upFlow; // upstream traffic
private int downFlow; // downstream traffic
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
// Serialization method
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(upFlow);
out.writeInt(downFlow);
}
// Deserialization method
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readInt();
this.downFlow = in.readInt();
}
}
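The write and readFields methods above can be sanity-checked locally against an in-memory buffer, with no cluster involved (an illustrative snippet; the class name and sample values are assumptions, not part of the assignment):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowRoundTrip {
    public static void main(String[] args) throws IOException {
        Flow original = new Flow();
        original.setUpFlow(4132);
        original.setDownFlow(4121);

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh object
        Flow copy = new Flow();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy.getUpFlow() + " " + copy.getDownFlow()); // 4132 4121
    }
}

If the printed values match what was written, write and readFields handle the fields in the same order, which is what MapReduce relies on when it ships Flow objects between tasks.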
Mapper class implementation:
package com.quosimodo.serialflow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SerialFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
// key: the input key, i.e. the byte offset of this line in the file
// value: the input value, i.e. the line of text to process
// context: the job context, used to write out results
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 18642971356 shanghai david 4132 4121
// Split the fields
String[] arr = value.toString().split(" ");
// Wrap the traffic values in a Flow object
Flow f = new Flow();
f.setUpFlow(Integer.parseInt(arr[3]));
f.setDownFlow(Integer.parseInt(arr[4]));
// Write out the result, keyed by name
context.write(new Text(arr[2]), f);
}
}
Reducer class implementation:
package com.quosimodo.serialflow;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// The input key/value types match the Mapper's output types
public class SerialFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
// key: the key written by the Mapper
// values: an iterator over all values the Mapper wrote for this key
@Override
protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
// Total traffic for this person
int sum = 0;
// Add up the traffic across all records
for (Flow value : values) {
sum += value.getUpFlow() + value.getDownFlow();
}
context.write(key, new IntWritable(sum));
}
}
Driver class implementation (configures and launches the job):
package com.quosimodo.serialflow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SerialFlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// Set up the configuration
Configuration conf = new Configuration();
// Create the job
Job job = Job.getInstance(conf);
// Set the entry-point class
job.setJarByClass(SerialFlowDriver.class);
// Set the Mapper
job.setMapperClass(SerialFlowMapper.class);
// Set the Reducer
job.setReducerClass(SerialFlowReducer.class);
// Set the Mapper's output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
// Set the Reducer's output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the input path
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/txt/flow.txt"));
// Set the output path - it must not already exist
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/serial_flow"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
"hadoop" is the hostname mapped to the VM's IP in the hosts file, as set up earlier.
After the job runs, the results appear under the /result directory on HDFS.
Compute each person's average score
Data (score.txt): records have the form Bob 90 64 92 83 82 95 (a name followed by the scores for each subject).
Upload it to the /txt directory on HDFS.
Data storage class (holds the scores for every subject):
package com.quosimodo.average_score;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Score implements Writable {
private List<Integer> scores;
public List<Integer> getScores() {
return scores;
}
public void setScores(List<Integer> scores) {
this.scores = scores;
}
// Serialization method
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(scores.size());
for (Integer score : scores) {
out.writeInt(score);
}
}
// Deserialization method
@Override
public void readFields(DataInput in) throws IOException {
int len = in.readInt();
this.scores = new ArrayList<>(len);
for (int i=0; i<len; i++) {
this.scores.add(in.readInt());
}
}
}
Mapper class implementation:
package com.quosimodo.average_score;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SerialScoreMapper
extends Mapper<LongWritable, Text, Text, Score> {
// key: the input key, i.e. the byte offset of this line in the file
// value: the input value, i.e. the line of text to process
// context: the job context, used to write out results
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Bob 90 64 92 83 82 95
// Split the fields
String[] arr = value.toString().split(" ");
// Wrap the scores in a Score object
Score s = new Score();
List<Integer> scores = new ArrayList<Integer>(arr.length - 1);
for (int i = 1; i < arr.length; i++) {
scores.add(Integer.parseInt(arr[i]));
}
s.setScores(scores);
// Write out the result, keyed by name
context.write(new Text(arr[0]), s);
}
}
Reducer class implementation:
package com.quosimodo.average_score;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.List;
// The input key/value types match the Mapper's output types
public class SerialScoreReducer extends Reducer<Text, Score, Text, Text> {
// key: the key written by the Mapper
// values: an iterator over all values the Mapper wrote for this key
@Override
protected void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
// Running totals across all subjects
long sum = 0;
int count = 0;
for (Score value : values) {
List<Integer> scores = value.getScores();
for (Integer score : scores ) {
sum += score;
count++;
}
}
// Use floating-point division so the fractional part is not lost
double avg = (double) sum / count;
// Keep two decimal places
DecimalFormat df = new DecimalFormat("0.00");
String str = df.format(avg);
context.write(key, new Text(str));
}
}
Driver class implementation (configures and launches the job):
package com.quosimodo.average_score;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SerialScoreDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// Set up the configuration
Configuration conf = new Configuration();
// Create the job
Job job = Job.getInstance(conf);
// Set the entry-point class
job.setJarByClass(SerialScoreDriver.class);
// Set the Mapper
job.setMapperClass(SerialScoreMapper.class);
// Set the Reducer
job.setReducerClass(SerialScoreReducer.class);
// Set the Mapper's output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Score.class);
// Set the Reducer's output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Set the input path
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/txt/score.txt"));
// Set the output path - it must not already exist
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/average_score"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
"hadoop" is the hostname mapped to the VM's IP in the hosts file, as set up earlier.
After the job runs, the results appear under the /result directory on HDFS.
Partitioning
Compute each person's total traffic, grouped by region
Data (flow.txt): the same data as in the serialization exercise.
Upload it to the /txt directory on HDFS.
Data storage class:
package com.quosimodo.partflow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Flow implements Writable {
private String city = "";
private int upFlow;
private int downFlow;
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(city);
out.writeInt(upFlow);
out.writeInt(downFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.city = in.readUTF();
this.upFlow = in.readInt();
this.downFlow = in.readInt();
}
}
Partitioner class:
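A minimal sketch of a CityPartitioner that fits the Driver below; the concrete city list is an assumption (only shanghai and hangzhou appear in the sample records), and the number of branches must match the three ReduceTasks set in the Driver:

package com.quosimodo.partflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each record to a partition based on the city carried in the value.
// The city names here are assumptions - adjust them to the cities in flow.txt
// and keep the branch count in sync with job.setNumReduceTasks(3).
public class CityPartitioner extends Partitioner<Text, Flow> {
    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        String city = value.getCity();
        if ("shanghai".equals(city))
            return 0;
        if ("hangzhou".equals(city))
            return 1;
        return 2; // any remaining city
    }
}

The returned partition numbers start at 0 and stay below the ReduceTask count, matching the numbering rule described in the Partitioning section above.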
Mapper class:
package com.quosimodo.partflow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 15432697314 hangzhou jack 4558 7474
// Split the fields
String[] arr = value.toString().split(" ");
// Wrap the fields in a Flow object
Flow f = new Flow();
f.setCity(arr[1]);
f.setUpFlow(Integer.parseInt(arr[3]));
f.setDownFlow(Integer.parseInt(arr[4]));
// Write out the result, keyed by name
context.write(new Text(arr[2]), f);
}
}
Reducer class:
package com.quosimodo.partflow;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// Sum up the total traffic
for (Flow value : values) {
sum += value.getUpFlow() + value.getDownFlow();
}
context.write(key, new IntWritable(sum));
}
}
Driver class:
package com.quosimodo.partflow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartFlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(PartFlowDriver.class);
job.setMapperClass(PartFlowMapper.class);
job.setReducerClass(PartFlowReducer.class);
// Set the Partitioner class
job.setPartitionerClass(CityPartitioner.class);
// Set the number of ReduceTasks - one per partition
job.setNumReduceTasks(3);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/txt/flow.txt"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/part_flow"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
After the job runs, the output files appear under the /result directory on HDFS; with three ReduceTasks, there is one part-r output file per partition.
Compute each person's total score for each month
Data (directory: score): records have the form 1 zhang 89 (month, name, score).
Upload the files to the /txt/score directory on HDFS.
Data storage class:
package com.quosimodo.partscore;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Score implements Writable {
private int month;
private int score;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(month);
out.writeInt(score);
}
@Override
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.score = in.readInt();
}
}
Partitioner class:
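A minimal sketch of a MonthPartitioner that fits the Driver below, assuming it lives in the partscore package and that the score directory covers months 1 through 3, matching the three ReduceTasks set in the Driver:

package com.quosimodo.partscore;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each record to a partition based on the month carried in the value.
// Assumes months are numbered 1..3; keep this in sync with job.setNumReduceTasks(3).
public class MonthPartitioner extends Partitioner<Text, Score> {
    @Override
    public int getPartition(Text key, Score value, int numPartitions) {
        // Month 1 -> partition 0, month 2 -> partition 1, month 3 -> partition 2
        return value.getMonth() - 1;
    }
}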
Mapper class:
package com.quosimodo.partscore;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 zhang 89
String[] arr = value.toString().split(" ");
// Wrap the fields in a Score object
Score s = new Score();
s.setMonth(Integer.parseInt(arr[0]));
s.setScore(Integer.parseInt(arr[2]));
// Write out the result, keyed by name
context.write(new Text(arr[1]), s);
}
}
Reducer class:
package com.quosimodo.partscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Score value : values) {
sum += value.getScore();
}
context.write(key, new IntWritable(sum));
}
}
Driver class:
package com.quosimodo.partscore;
import com.quosimodo.partflow.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartScoreDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(PartScoreDriver.class);
job.setMapperClass(PartScoreMapper.class);
job.setReducerClass(PartScoreReducer.class);
job.setPartitionerClass(MonthPartitioner.class);
job.setNumReduceTasks(3);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Score.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/txt/score/"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/part_score"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
After the job runs, the output files appear under the /result directory on HDFS.
Sorting
Sort the earlier total-traffic results in descending order
Data: this job reads the output of the earlier total-traffic job, which is already under /result/serial_flow on HDFS, so nothing new needs to be uploaded.
Data storage class:
package com.quosimodo.sortflow;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Flow implements WritableComparable<Flow> {
private String name = "";
private int totalFlow;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getTotalFlow() {
return totalFlow;
}
public void setTotalFlow(int totalFlow) {
this.totalFlow = totalFlow;
}
// Sort in descending order of each person's total traffic; break ties by name
// so that two people with the same total are not merged into a single key
@Override
public int compareTo(Flow o) {
int r = o.totalFlow - this.totalFlow;
if (r != 0)
return r;
return this.name.compareTo(o.name);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(totalFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.totalFlow = in.readInt();
}
}
Mapper class:
package com.quosimodo.sortflow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortFlowMapper extends Mapper<LongWritable, Text, Flow, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// adair 33129
// Split the fields - the previous job's output separates key and value with a tab
String[] arr = value.toString().split("\t");
// Wrap the fields in a Flow object
Flow f = new Flow();
f.setName(arr[0]);
f.setTotalFlow(Integer.parseInt(arr[1]));
// Write out the result; the Flow object is the key, so it drives the sort
context.write(f, NullWritable.get());
}
}
Reducer class:
package com.quosimodo.sortflow;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortFlowReducer extends Reducer<Flow, NullWritable, Text, IntWritable> {
@Override
protected void reduce(Flow key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new Text(key.getName()), new IntWritable(key.getTotalFlow()));
}
}
Driver class:
package com.quosimodo.sortflow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SortFlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SortFlowDriver.class);
job.setMapperClass(SortFlowMapper.class);
job.setReducerClass(SortFlowReducer.class);
job.setMapOutputKeyClass(Flow.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/result/serial_flow"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/sort_flow"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
"hadoop" is the hostname mapped to the VM's IP in the hosts file, as set up earlier.
After the job runs, the results appear under the /result directory on HDFS.
Sort by month in ascending order; within the same month, sort by profit in descending order
Data (profit.txt): records have the form 2 rose 345 (month, name, profit).
Upload it to the /txt directory on HDFS.
Data storage class:
package com.quosimodo.sortprofit;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Profit implements WritableComparable<Profit> {
private int month;
private String name = "";
private int profit;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getProfit() {
return profit;
}
public void setProfit(int profit) {
this.profit = profit;
}
@Override
public int compareTo(Profit o) {
// Sort by month first, ascending
int r = this.getMonth() - o.getMonth();
if (r != 0)
return r;
// Within the same month, sort by profit in descending order
r = o.getProfit() - this.getProfit();
if (r != 0)
return r;
// Break remaining ties by name so that two records with the same month and
// profit are not merged into a single key during the reduce phase
return this.name.compareTo(o.name);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(month);
out.writeUTF(name);
out.writeInt(profit);
}
@Override
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.name = in.readUTF();
this.profit = in.readInt();
}
@Override
public String toString() {
return "Profit{" +
"month=" + month +
", name='" + name + '\'' +
", profit=" + profit +
'}';
}
}
Mapper class:
package com.quosimodo.sortprofit;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortProfitMapper extends Mapper<LongWritable, Text, Profit, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 2 rose 345
String[] arr = value.toString().split(" ");
// Wrap the fields in a Profit object
Profit p = new Profit();
p.setMonth(Integer.parseInt(arr[0]));
p.setName(arr[1]);
p.setProfit(Integer.parseInt(arr[2]));
context.write(p, NullWritable.get());
}
}
Reducer class:
package com.quosimodo.sortprofit;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortProfitReducer extends Reducer<Profit, NullWritable, Profit, NullWritable> {
@Override
protected void reduce(Profit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
Driver class:
package com.quosimodo.sortprofit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SortProfitDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SortProfitDriver.class);
job.setMapperClass(SortProfitMapper.class);
job.setReducerClass(SortProfitReducer.class);
job.setOutputKeyClass(Profit.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job,
new Path("hdfs://hadoop:9000/txt/profit.txt"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/result/sort_profit"));
// Submit the job and wait for completion
job.waitForCompletion(true);
}
}
"hadoop" is the hostname mapped to the VM's IP in the hosts file, as set up earlier.
After the job runs, the results appear under the /result directory on HDFS.