Project Training: Day 5
This post was last updated: the evening of July 23, 2021
Flume
Sink
Avro Sink
The Avro Sink serializes data with Avro before writing it out; combined with an Avro Source, it can be used to build multi-hop, fan-in, and fan-out flows.
Multi-hop flow
First node configuration
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Second node configuration
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Third node configuration
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Startup command
flume-ng agent -n a1 -c $FLUME_HOME/conf -f duoji.conf -Dflume.root.logger=INFO,console
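It is easiest to start the agents from the last hop backwards (hadoop03, then hadoop02, then the first node), so that every Avro Sink finds a listening Avro Source when it connects. The chain can then be tested by feeding lines into the netcat source on the first node; hadoop01 below is only an assumption about where that first agent runs:
nc hadoop01 8090
Anything typed into the nc session should end up in the logger output on hadoop03.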
Fan-in flow
Configuration for the first and second nodes
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Third node configuration
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
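To test the fan-in flow, start the agent on hadoop03 first and then the two netcat agents. Assuming those two agents run on hadoop01 and hadoop02 (the hosts are not named above), data sent to either of them should show up merged in the single logger on hadoop03:
nc hadoop01 8090    # in one terminal
nc hadoop02 8090    # in another terminal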
Fan-out flow
First node configuration
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Configuration for the second and third nodes
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
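With the first-node configuration above, the source uses Flume's default replicating channel selector, so every event is copied to both c1 and c2 and therefore reaches both hadoop02 and hadoop03. As a minimal sketch of the alternative (the header name kind and its values are made up for illustration), a multiplexing selector on the first node would instead route events to different channels based on a header value:
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = kind
a1.sources.s1.selector.mapping.error = c1
a1.sources.s1.selector.default = c2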
Custom Sink
To build a custom Sink, define a class that implements the Sink interface; the class should also implement the Configurable interface so it can read properties from the configuration file. When implementing a Sink, pay attention to Flume's transaction handling: events are taken from the Channel inside a transaction that must be committed on success and rolled back on failure.
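Before the agent can load a custom component, the compiled class must be on Flume's classpath; a common approach is to package it as a jar and drop it into Flume's lib directory (the jar name below is just a placeholder):
cp authsink.jar $FLUME_HOME/lib/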
Configuration file
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# Configure the custom Sink - the fully qualified class name is required
a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink
a1.sinks.k1.path = /opt/flumedata
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Startup command
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsink.conf -Dflume.root.logger=INFO,console
Send a POST request
curl -X POST -d '[{"headers":{"class":"big data"},"body":"大数据实训"}]' http://hadoop:8090
Example
Flume->sink(Package)->AuthSink.java:
package cn.tedu.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
// Mimics the File Roll Sink: writes data to the specified directory
public class AuthSink extends AbstractSink implements Sink, Configurable {
    private String path;
    private PrintStream ps;
    // Read the configured property values
    @Override
    public void configure(Context context) {
        // Get the storage path specified by the user
        path = context.getString("path");
        // Check whether the user actually specified a storage path
        if (path == null)
            throw new IllegalArgumentException("The path property is not specified!!!");
    }
    // Start the Sink
    @Override
    public synchronized void start() {
        // Initialize the output stream
        try {
            ps = new PrintStream(path + "/" + System.currentTimeMillis());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }
    @Override
    public Status process() {
        // Get the Channel
        Channel c = this.getChannel();
        // Get a transaction
        Transaction t = c.getTransaction();
        // Start the transaction
        t.begin();
        // Event used to hold the data
        Event e;
        try {
            // Take data from the Channel
            while ((e = c.take()) != null) {
                // Write the headers section
                ps.println("headers:");
                Map<String, String> headers = e.getHeaders();
                for (Map.Entry<String, String> header : headers.entrySet()) {
                    ps.println("\t" + header.getKey() + "=" + header.getValue());
                }
                // Write the body section
                ps.println("body:");
                byte[] body = e.getBody();
                ps.println("\t" + new String(body, StandardCharsets.UTF_8));
            }
            // Commit the transaction
            t.commit();
            return Status.READY;
        } catch (Exception ex) {
            ex.printStackTrace();
            // Roll the transaction back
            t.rollback();
            return Status.BACKOFF;
        } finally {
            // Close the transaction
            t.close();
        }
    }
    // Stop the Sink
    @Override
    public synchronized void stop() {
        if (ps != null)
            ps.close();
    }
}
Flume->source(Package)->AuthSource.java:
package cn.tedu.flume.source;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Mimics the Sequence Generator Source
public class AuthSource extends AbstractSource implements EventDrivenSource, Configurable {
    private long step; // increment step
    private long end;  // upper bound of the sequence
    private ExecutorService es; // thread pool
    // Read the configured property values
    @Override
    public void configure(Context context) {
        // If the user specified a step, increment by it; otherwise the step defaults to 1
        step = context.getLong("step", 1L);
        // If the user specified an end value, count up to it; otherwise it defaults to Long.MAX_VALUE
        end = context.getLong("end", Long.MAX_VALUE);
    }
    // Start the Source
    @Override
    public synchronized void start() {
        // Initialize the thread pool
        es = Executors.newFixedThreadPool(5);
        // Get the channel processor that hands data to the Channel
        ChannelProcessor cp = this.getChannelProcessor();
        // Submit the generator task
        es.submit(new Add(step, end, cp));
    }
    @Override
    public synchronized void stop() {
        if (es != null)
            es.shutdown();
    }
}
class Add implements Runnable {
    private final long step;
    private final long end;
    private final ChannelProcessor cp;
    public Add(long step, long end, ChannelProcessor cp) {
        this.step = step;
        this.end = end;
        this.cp = cp;
    }
    @Override
    public void run() {
        for (long i = 0; i < end; i += step) {
            // Build the headers
            Map<String, String> headers = new HashMap<>();
            headers.put("time", System.currentTimeMillis() + "");
            // Build the body
            byte[] body = (i + "").getBytes(StandardCharsets.UTF_8);
            // Wrap the data in an Event object
            Event e = EventBuilder.withBody(body, headers);
            // Hand the Event to the Channel
            cp.processEvent(e);
        }
    }
}
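The post does not include a configuration for this custom source, so the following is only a minimal sketch: the type must be the fully qualified class name, and the step, end, and file name (authsource.conf) values are made-up examples that match the properties read in configure().
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# Configure the custom Source - the fully qualified class name is required
a1.sources.s1.type = cn.tedu.flume.source.AuthSource
a1.sources.s1.step = 5
a1.sources.s1.end = 100
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Startup command
flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsource.conf -Dflume.root.logger=INFO,console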