国产另类ts人妖一区二区_欧美肥老太做爰视频_快穿高h肉_国产欧美综合在线

當前位置: 首頁 / 技術干貨 / 正文
數據采集工具之Flume的選擇器

2023-03-16

channel 事件 寫入   

數據采集工具之Flume的選擇器

  說明

  Flume中的Channel選擇器作用于source階段 ,是決定Source接受的特定事件寫入到哪個Channel的組件,他們告訴Channel處理器,然后由其將事件寫入到Channel。

  Agent中各個組件的交互

  由于Flume不是兩階段提交,事件被寫入到一個Channel,然后事件在寫入下一個Channel之前提交,如果寫入一個Channel出現異常,那么之前已經寫入到其他Channel的相同事件不能被回滾。當這樣的異常發生時,Channel處理器拋出ChannelException異常,事務失敗,如果Source試圖再次寫入相同的事件(大多數情況下,會再次寫入,只有Syslog,Exec等Source不能重試,因為沒有辦法生成相同的數據),重復的事件將寫入到Channel中,而先前的提交是成功的,這樣在Flume中就發生了重復。

  Channel選擇器的配置是通過Channel處理器完成的,Channel選擇器可以指定一組Channel是必須的,另一組的可選的。

  Flume分類兩種選擇器,如果Source配置中沒有指定選擇器,那么會自動使用復制Channel選擇器.

  ●replicating:該選擇器復制每個事件到通過Source的Channels參數指定的所有Channel中。

  ●multiplexing:是一種專門用于動態路由事件的Channel選擇器,通過選擇事件應該寫入到哪個Channel,基于一個特定的事件頭的值進行路由

  案例演示:replicating selector

  配置方案

  [root@qianfeng01 flumeconf]# vi rep.conf

  a1.sources = r1

  a1.channels = c1 c2

  a1.sinks = s1 s2

  a1.sources.r1.type=syslogtcp

  a1.sources.r1.host = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.selector.type=replicating

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.channels.c2.type=memory

  a1.channels.c2.capacity=1000

  a1.channels.c2.transactionCapacity=100

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

  a1.sinks.s1.hdfs.filePrefix=s1sink

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sinks.s2.type=hdfs

  a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

  a1.sinks.s2.hdfs.filePrefix=s2sink

  a1.sinks.s2.hdfs.fileSuffix=.log

  a1.sinks.s2.hdfs.inUseSuffix=.tmp

  a1.sinks.s2.hdfs.rollInterval=60

  a1.sinks.s2.hdfs.rollSize=1024

  a1.sinks.s2.hdfs.rollCount=10

  a1.sinks.s2.hdfs.idleTimeout=0

  a1.sinks.s2.hdfs.batchSize=100

  a1.sinks.s2.hdfs.fileType=DataStream

  a1.sinks.s2.hdfs.writeFormat=Text

  a1.sinks.s2.hdfs.round=true

  a1.sinks.s2.hdfs.roundValue=1

  a1.sinks.s2.hdfs.roundUnit=second

  a1.sinks.s2.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1 c2

  a1.sinks.s1.channel=c1

  a1.sinks.s2.channel=c2

  啟動agent的服務

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

  測試

  [root@qianfeng01 ~]# echo "hello world hello qianfeng" | nc qianfeng01 6666

  案例演示:Multiplexing selector

  配置方案

  [root@qianfeng01 flumeconf]# vi mul.conf

  a1.sources = r1

  a1.channels = c1 c2

  a1.sinks = s1 s2

  a1.sources.r1.type=http

  a1.sources.r1.bind = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.selector.type=multiplexing

  a1.sources.r1.selector.header = state

  a1.sources.r1.selector.mapping.USER = c1

  a1.sources.r1.selector.mapping.ORDER = c2

  a1.sources.r1.selector.default = c1

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.channels.c2.type=memory

  a1.channels.c2.capacity=1000

  a1.channels.c2.transactionCapacity=100

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

  a1.sinks.s1.hdfs.filePrefix=s1sink

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sinks.s2.type=hdfs

  a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

  a1.sinks.s2.hdfs.filePrefix=s2sink

  a1.sinks.s2.hdfs.fileSuffix=.log

  a1.sinks.s2.hdfs.inUseSuffix=.tmp

  a1.sinks.s2.hdfs.rollInterval=60

  a1.sinks.s2.hdfs.rollSize=1024

  a1.sinks.s2.hdfs.rollCount=10

  a1.sinks.s2.hdfs.idleTimeout=0

  a1.sinks.s2.hdfs.batchSize=100

  a1.sinks.s2.hdfs.fileType=DataStream

  a1.sinks.s2.hdfs.writeFormat=Text

  a1.sinks.s2.hdfs.round=true

  a1.sinks.s2.hdfs.roundValue=1

  a1.sinks.s2.hdfs.roundUnit=second

  a1.sinks.s2.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1 c2

  a1.sinks.s1.channel=c1

  a1.sinks.s2.channel=c2

  啟動Agent的服務

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

  測試

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://qianfeng01:6666

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://qianfeng01:6666


好程序員公眾號

  • · 剖析行業發展趨勢
  • · 匯聚企業項目源碼

好程序員開班動態

More+
  • HTML5大前端 <高端班>

    開班時間:2021-04-12(深圳)

    開班盛況

    開班時間:2021-05-17(北京)

    開班盛況
  • 大數據+人工智能 <高端班>

    開班時間:2021-03-22(杭州)

    開班盛況

    開班時間:2021-04-26(北京)

    開班盛況
  • JavaEE分布式開發 <高端班>

    開班時間:2021-05-10(北京)

    開班盛況

    開班時間:2021-02-22(北京)

    開班盛況
  • Python人工智能+數據分析 <高端班>

    開班時間:2021-07-12(北京)

    預約報名

    開班時間:2020-09-21(上海)

    開班盛況
  • 云計算開發 <高端班>

    開班時間:2021-07-12(北京)

    預約報名

    開班時間:2019-07-22(北京)

    開班盛況
IT培訓IT培訓
在線咨詢
IT培訓IT培訓
試聽
IT培訓IT培訓
入學教程
IT培訓IT培訓
立即報名
IT培訓

Copyright 2011-2023 北京千鋒互聯科技有限公司 .All Right 京ICP備12003911號-5 京公網安備 11010802035720號