国产另类ts人妖一区二区_欧美肥老太做爰视频_快穿高h肉_国产欧美综合在线

當前位置: 首頁 / 技術(shù)干貨 / 正文
數(shù)據(jù)采集工具之Flume 的攔截器

2023-03-15

攔截器    interceptor 事件

  在Flume運行過程中 ,Flume有能力在運行階段修改/刪除Event,這是通過攔截器(Interceptors)來實現(xiàn)的。攔截器有下面幾個特點:

  攔截器需要實現(xiàn)org.apache.flume.interceptor.Interceptor接口。

  攔截器可以修改或刪除事件基于開發(fā)者在選擇器中選擇的任何條件。

  攔截器采用了責任鏈模式,多個攔截器可以按指定順序攔截。

  一個攔截器返回的事件列表被傳遞給鏈中的下一個攔截器。

  如果一個攔截器需要刪除事件,它只需要在返回的事件集中不包含要刪除的事件即可。

  一、系統(tǒng)內(nèi)置攔截器

  Timestamp Interceptor :時間戳攔截器,將當前時間戳(毫秒)加入到events header中,key名字為:timestamp,值為當前時間戳。用的不是很多

  Host Interceptor:主機名攔截器。將運行Flume agent的主機名或者IP地址加入到events header中,key名字為:host(也可自定義)

  Static Interceptor:靜態(tài)攔截器,用于在events header中加入一組靜態(tài)的key和value。

  二、內(nèi)置攔截器的使用

  2.1. Timestamp+HTTP+File+HDFS

  通過時間攔截器,數(shù)據(jù)源為HTTP,傳送的通道模式是FileChannel,最后輸出的目的地為HDFS

  采集方案

  a1.sources = r1

  a1.channels = c1

  a1.sinks = s1

  a1.sources.r1.type=http

  a1.sources.r1.bind = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

  a1.sources.r1.handler.nickname = JSON props

  a1.sources.r1.interceptors=i1 i2 i3

  a1.sources.r1.interceptors.i1.type=timestamp

  #如果攔截器中已經(jīng)有了時間戳,直接替換成現(xiàn)在的

  a1.sources.r1.interceptors.i1.preserveExisting=false

  a1.sources.r1.interceptors.i2.type=host

  a1.sources.r1.interceptors.i2.preserveExisting=false

  a1.sources.r1.interceptors.i2.useIP=true

  a1.sources.r1.interceptors.i2.hostHeader=hostname

  a1.sources.r1.interceptors.i3.type=static

  a1.sources.r1.interceptors.i3.preserveExisting=false

  a1.sources.r1.interceptors.i3.key=hn

  a1.sources.r1.interceptors.i3.value=qianfeng01

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/%H%M

  a1.sinks.s1.hdfs.filePrefix=%{hostname}

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1

  a1.sinks.s1.channel=c1

  啟動 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

  測試數(shù)據(jù)

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"hn":"qianfeng01","pwd":"123456"},"body":"this is my content qianfeng01"}]' http://qianfeng01:6666

  三、自定義攔截器

  為了提高Flume的擴展性,用戶可以自己定義一個攔截器, 對每一組的item_type和active_time都過濾出相應(yīng)的HOST和USERID

  處理數(shù)據(jù)樣例:

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  結(jié)果樣例:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  3.1. pom.xml

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.48</version>
  </dependency>
</dependencies>

  3.2. 代碼實現(xiàn)

  /**

  * @Author 千鋒大數(shù)據(jù)教學團隊

  * @Company 千鋒好程序員大數(shù)據(jù)

  * @Description 自定義攔截器:對每一組的item_type和active_time都過濾出相應(yīng)的HOST和USERID

  */

  public class MyInterceptor implements Interceptor {

  @Override

  public void initialize() {

  //初始化方法,寫攔截器初始化時的業(yè)務(wù)

  }

  @Override

  public void close() {

  //關(guān)閉方法,寫攔截器關(guān)閉時的代碼

  }

  /**

  * 解析單條event

  * @param event

  * @return

  */

  @Override

  public Event intercept(Event event) {

  //輸入

  String inputeBody=null;

  //輸出

  byte[] outputBoday=null;

  //解析---這里定義對單條Event處理規(guī)則

  try {

  inputeBody=new String(event.getBody(), Charsets.UTF_8);

  ArrayListtemp = new ArrayList<>();

  JSONObject bodyObj = JSON.parseObject(inputeBody);

  //1)公共字段

  String host = bodyObj.getString("host");

  String user_id = bodyObj.getString("user_id");

  JSONArray data = bodyObj.getJSONArray("items");

  //2)Json數(shù)組=>every json obj

  for (Object item : data) {

  JSONObject itemObj = JSON.parseObject(item.toString());

  HashMap<string, object=""> fields = new HashMap<>();

  fields.put("host",host);

  fields.put("user_id",user_id);

  fields.put("item_type",itemObj.getString("item_type"));

  fields.put("active_time",itemObj.getLongValue("active_time"));

  temp.add(new JSONObject(fields).toJSONString());

  }

  //3)Json obj 拼接

  outputBoday=String.join("\n",temp).getBytes();

  }catch (Exception e){

  System.out.println("輸入數(shù)據(jù):"+inputeBody);

  e.printStackTrace();

  }

  event.setBody(outputBoday);

  return event;

  }

  /**

  * 解析一批event

  * @param events

  * @return

  */

  @Override

  public Listintercept(Listevents) {

  //輸出---一批Event

  ArrayListresult = new ArrayList<>();

  //輸入---一批Event

  try{

  for (Event event : events) {

  //一條條解析

  Event interceptedEvent = intercept(event);

  byte[] interceptedEventBody = interceptedEvent.getBody();

  if(interceptedEventBody.length!=0){

  String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);

  String[] multiEventArr = multiEvent.split("\n");

  for (String needEvent : multiEventArr) {

  SimpleEvent simpleEvent = new SimpleEvent();

  simpleEvent.setBody(needEvent.getBytes());

  result.add(simpleEvent);

  }

  }

  }

  }catch (Exception e){

  e.printStackTrace();

  }

  return result;

  }

  /**

  * 實現(xiàn)內(nèi)部類接口

  */

  public static class Builder implements Interceptor.Builder{

  @Override

  public Interceptor build() {

  return new MyInterceptor();

  }

  @Override

  public void configure(Context context) {

  }

  }

  }

  3.3. 打包上傳

  使用maven將攔截器打包,然后把此包和依賴的fastjson一起上傳到Flume lib目錄下

  3.4. 采集方案制定

  a1.sources = s1

  a1.channels = c1

  a1.sinks = r1

  a1.sources.s1.type = TAILDIR

  #文件以JSON格式記錄inode、絕對路徑和每個跟蹤文件的最后位置

  a1.sources.s1.positionFile = /root/flume/taildir_position.json

  #以空格分隔的文件組列表。每個文件組表示要跟蹤的一組文件

  a1.sources.s1.filegroups = f1

  #文件組的絕對路徑

  a1.sources.s1.filegroups.f1=/root/flume/data/.*log

  #是否添加存儲絕對路徑文件名的標題

  a1.sources.s1.fileHeader = true

  #使用自定義攔截器

  a1.sources.s1.interceptors = i1

  a1.sources.s1.interceptors.i1.type = flume.MyInterceptor$Builder

  a1.channels.c1.type = file

  a1.channels.c1.dataDirs = /root/flume/filechannle/dataDirs

  a1.channels.c1.checkpointDir = /root/flume/filechannle/checkpointDir

  a1.channels.c1.capacity = 1000

  a1.channels.c1.transactionCapacity = 100

  a1.sinks.r1.type = hdfs

  a1.sinks.r1.hdfs.path = hdfs://qianfeng01:8020/flume/spooldir

  a1.sinks.r1.hdfs.filePrefix =

  a1.sinks.r1.hdfs.round = true

  a1.sinks.r1.hdfs.roundValue = 10

  a1.sinks.r1.hdfs.roundUnit = minute

  a1.sinks.r1.hdfs.fileSuffix= .log

  a1.sinks.r1.hdfs.rollInterval=60

  a1.sinks.r1.hdfs.fileType=DataStream

  a1.sinks.r1.hdfs.writeFormat=Text

  a1.sources.s1.channels = c1

  a1.sinks.r1.channel = c1

  3.5. 啟動 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

  3.6. 測試數(shù)據(jù)

  [root@qianfeng01 ~]# vi my.sh

  #!/bin/bash

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  echo $log>> /root/flume/data/test.log

  [root@qianfeng01 ~]# bash my.sh

  執(zhí)行后我們希望得到是數(shù)據(jù)格式:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}


好程序員公眾號

  • · 剖析行業(yè)發(fā)展趨勢
  • · 匯聚企業(yè)項目源碼

好程序員開班動態(tài)

More+
  • HTML5大前端 <高端班>

    開班時間:2021-04-12(深圳)

    開班盛況

    開班時間:2021-05-17(北京)

    開班盛況
  • 大數(shù)據(jù)+人工智能 <高端班>

    開班時間:2021-03-22(杭州)

    開班盛況

    開班時間:2021-04-26(北京)

    開班盛況
  • JavaEE分布式開發(fā) <高端班>

    開班時間:2021-05-10(北京)

    開班盛況

    開班時間:2021-02-22(北京)

    開班盛況
  • Python人工智能+數(shù)據(jù)分析 <高端班>

    開班時間:2021-07-12(北京)

    預(yù)約報名

    開班時間:2020-09-21(上海)

    開班盛況
  • 云計算開發(fā) <高端班>

    開班時間:2021-07-12(北京)

    預(yù)約報名

    開班時間:2019-07-22(北京)

    開班盛況
IT培訓IT培訓
在線咨詢
IT培訓IT培訓
試聽
IT培訓IT培訓
入學教程
IT培訓IT培訓
立即報名
IT培訓

Copyright 2011-2023 北京千鋒互聯(lián)科技有限公司 .All Right 京ICP備12003911號-5 京公網(wǎng)安備 11010802035720號