2023-03-15
攔截器 interceptor 事件
在Flume運行過程中 ,Flume有能力在運行階段修改/刪除Event,這是通過攔截器(Interceptors)來實現(xiàn)的。攔截器有下面幾個特點:
攔截器需要實現(xiàn)org.apache.flume.interceptor.Interceptor接口。
攔截器可以修改或刪除事件基于開發(fā)者在選擇器中選擇的任何條件。
攔截器采用了責任鏈模式,多個攔截器可以按指定順序攔截。
一個攔截器返回的事件列表被傳遞給鏈中的下一個攔截器。
如果一個攔截器需要刪除事件,它只需要在返回的事件集中不包含要刪除的事件即可。
一、系統(tǒng)內(nèi)置攔截器
Timestamp Interceptor :時間戳攔截器,將當前時間戳(毫秒)加入到events header中,key名字為:timestamp,值為當前時間戳。用的不是很多
Host Interceptor:主機名攔截器。將運行Flume agent的主機名或者IP地址加入到events header中,key名字為:host(也可自定義)
Static Interceptor:靜態(tài)攔截器,用于在events header中加入一組靜態(tài)的key和value。
二、內(nèi)置攔截器的使用
2.1. Timestamp+HTTP+File+HDFS
通過時間攔截器,數(shù)據(jù)源為HTTP,傳送的通道模式是FileChannel,最后輸出的目的地為HDFS
采集方案
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=http
a1.sources.r1.bind = qianfeng01
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
a1.sources.r1.handler.nickname = JSON props
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
#如果攔截器中已經(jīng)有了時間戳,直接替換成現(xiàn)在的
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=qianfeng01
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
啟動 Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console
測試數(shù)據(jù)
[root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"hn":"qianfeng01","pwd":"123456"},"body":"this is my content qianfeng01"}]' http://qianfeng01:6666
三、自定義攔截器
為了提高Flume的擴展性,用戶可以自己定義一個攔截器, 對每一組的item_type和active_time都過濾出相應(yīng)的HOST和USERID
處理數(shù)據(jù)樣例:
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
結(jié)果樣例:
{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}
3.1. pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
</dependencies>
3.2. 代碼實現(xiàn)
/**
* @Author 千鋒大數(shù)據(jù)教學團隊
* @Company 千鋒好程序員大數(shù)據(jù)
* @Description 自定義攔截器:對每一組的item_type和active_time都過濾出相應(yīng)的HOST和USERID
*/
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
//初始化方法,寫攔截器初始化時的業(yè)務(wù)
}
@Override
public void close() {
//關(guān)閉方法,寫攔截器關(guān)閉時的代碼
}
/**
* 解析單條event
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
//輸入
String inputeBody=null;
//輸出
byte[] outputBoday=null;
//解析---這里定義對單條Event處理規(guī)則
try {
inputeBody=new String(event.getBody(), Charsets.UTF_8);
ArrayListtemp = new ArrayList<>();
JSONObject bodyObj = JSON.parseObject(inputeBody);
//1)公共字段
String host = bodyObj.getString("host");
String user_id = bodyObj.getString("user_id");
JSONArray data = bodyObj.getJSONArray("items");
//2)Json數(shù)組=>every json obj
for (Object item : data) {
JSONObject itemObj = JSON.parseObject(item.toString());
HashMap<string, object=""> fields = new HashMap<>();
fields.put("host",host);
fields.put("user_id",user_id);
fields.put("item_type",itemObj.getString("item_type"));
fields.put("active_time",itemObj.getLongValue("active_time"));
temp.add(new JSONObject(fields).toJSONString());
}
//3)Json obj 拼接
outputBoday=String.join("\n",temp).getBytes();
}catch (Exception e){
System.out.println("輸入數(shù)據(jù):"+inputeBody);
e.printStackTrace();
}
event.setBody(outputBoday);
return event;
}
/**
* 解析一批event
* @param events
* @return
*/
@Override
public Listintercept(Listevents) {
//輸出---一批Event
ArrayListresult = new ArrayList<>();
//輸入---一批Event
try{
for (Event event : events) {
//一條條解析
Event interceptedEvent = intercept(event);
byte[] interceptedEventBody = interceptedEvent.getBody();
if(interceptedEventBody.length!=0){
String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);
String[] multiEventArr = multiEvent.split("\n");
for (String needEvent : multiEventArr) {
SimpleEvent simpleEvent = new SimpleEvent();
simpleEvent.setBody(needEvent.getBytes());
result.add(simpleEvent);
}
}
}
}catch (Exception e){
e.printStackTrace();
}
return result;
}
/**
* 實現(xiàn)內(nèi)部類接口
*/
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
3.3. 打包上傳
使用maven將攔截器打包,然后把此包和依賴的fastjson一起上傳到Flume lib目錄下
3.4. 采集方案制定
a1.sources = s1
a1.channels = c1
a1.sinks = r1
a1.sources.s1.type = TAILDIR
#文件以JSON格式記錄inode、絕對路徑和每個跟蹤文件的最后位置
a1.sources.s1.positionFile = /root/flume/taildir_position.json
#以空格分隔的文件組列表。每個文件組表示要跟蹤的一組文件
a1.sources.s1.filegroups = f1
#文件組的絕對路徑
a1.sources.s1.filegroups.f1=/root/flume/data/.*log
#是否添加存儲絕對路徑文件名的標題
a1.sources.s1.fileHeader = true
#使用自定義攔截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = flume.MyInterceptor$Builder
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /root/flume/filechannle/dataDirs
a1.channels.c1.checkpointDir = /root/flume/filechannle/checkpointDir
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = hdfs://qianfeng01:8020/flume/spooldir
a1.sinks.r1.hdfs.filePrefix =
a1.sinks.r1.hdfs.round = true
a1.sinks.r1.hdfs.roundValue = 10
a1.sinks.r1.hdfs.roundUnit = minute
a1.sinks.r1.hdfs.fileSuffix= .log
a1.sinks.r1.hdfs.rollInterval=60
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Text
a1.sources.s1.channels = c1
a1.sinks.r1.channel = c1
3.5. 啟動 Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console
3.6. 測試數(shù)據(jù)
[root@qianfeng01 ~]# vi my.sh
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
echo $log>> /root/flume/data/test.log
[root@qianfeng01 ~]# bash my.sh
執(zhí)行后我們希望得到是數(shù)據(jù)格式:
{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}
{"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}
開班時間:2021-04-12(深圳)
開班盛況開班時間:2021-05-17(北京)
開班盛況開班時間:2021-03-22(杭州)
開班盛況開班時間:2021-04-26(北京)
開班盛況開班時間:2021-05-10(北京)
開班盛況開班時間:2021-02-22(北京)
開班盛況開班時間:2021-07-12(北京)
預(yù)約報名開班時間:2020-09-21(上海)
開班盛況開班時間:2021-07-12(北京)
預(yù)約報名開班時間:2019-07-22(北京)
開班盛況Copyright 2011-2023 北京千鋒互聯(lián)科技有限公司 .All Right 京ICP備12003911號-5 京公網(wǎng)安備 11010802035720號