博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume与HBASE、Kafka集成
阅读量:4207 次
发布时间:2019-05-26

本文共 5027 字,大约阅读时间需要 16 分钟。

Flume与HBASE、Kafka集成相关配置

  这里首先设置两台Flume采集应用服务日志,将数据Push到第三台Flume进行日志合并、预处理。然后通过两个Channel分别将数据发送到HBASE和Kafka中。关于Flume基础可以参照。

  这里配置三台节点机器,其中agent2、agent3节点配置flume,用于从应用服务将数据采集到agent1节点。以agent2节点为例修改配置文件。

agent2.sources = r1agent2.channels = c1agent2.sinks = k1agent2.sources.r1.type = execagent2.sources.r1.command = tail -F /opt/datas/weblogs.logagent2.sources.r1.channels = c1agent2.channels.c1.type = memoryagent2.channels.c1.capacity = 10000agent2.channels.c1.transactionCapacity = 10000agent2.channels.c1.keep-alive = 5agent2.sinks.k1.type = avroagent2.sinks.k1.channel = c1agent2.sinks.k1.hostname = bigdata-pro01agent2.sinks.k1.port = 5555

Flume与HBASE集成

agent1.sources = r1agent1.channels = kafkaC hbaseCagent1.sinks = kafkaSink hbaseSinkagent1.sources.r1.type = avroagent1.sources.r1.channels = hbaseCagent1.sources.r1.bind = bigdata-pro01agent1.sources.r1.port = 5555agent1.sources.r1.threads = 5//Define a memory channel called hbaseC on agent1agent1.channels.hbaseC.type = memoryagent1.channels.hbaseC.capacity = 100000agent1.channels.hbaseC.transactionCapacity = 100000agent1.channels.hbaseC.keep-alive = 20agent1.sinks.hbaseSink.type = asynchbase//同步HBASEagent1.sinks.hbaseSink.table = weblogsagent1.sinks.hbaseSink.columnFamily = infoagent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializeragent1.sinks.hbaseSink.channel = hbaseCagent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

SimpleAsyncHbaseEventSerializer实现了AsyncHBaseEventSerializer接口。里面有几个变量以及方法:byte[] table——HBASE中Table的名称指定

   byte[] cf——列簇名称
   byte[] payload、byte[] payloadColumn——表中的列名和列值
   String rowPrefix——Rowkey的指定
   Public void configure(Context context)当服务启动之后,加载的时候,先调用这个方法。KeyType定义了一个枚举,最后就可以取得这个列的值。加载配置项的时候获取外部配置文件的一些值。
   Public List< Row > getAction()就是配置文件加载完成之后实时获取一些数据,并且进行对HBASE数据的写入。通过PutRequest这个类直接发送到HBASE中
   Public void setEvent(Event event)从Flume中实时拿取事件的数据。其中event.getBody()指的是一行一行数据

因为拿到的数据格式为:访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL。

需要对日志数据进行格式处理:
将文件中的tab更换成逗号:cat weblog.log|tr “\t” “,” > weblog2.log
将文件中的空格更换成逗号:cat weblog2.log|tr " " “,” > weblog3.log

//这里需要对SimpleAsyncHbaseEventSerializer进行二次开发 @Override    public List getActions() {
List actions = new ArrayList(); if (payloadColumn != null) {
byte[] rowKey; try {
//解析列字段 String[] columns = new String(this.payloadColumn).split(","); //解析Flume采集过来的每行的值,HBase是一列一列写入数据的 String[] values = new String(this.payload).split(","); for(int i=0;i < columns.length;i++){
byte[] colColumn = columns[i].getBytes(); byte[] colValue = values[i].getBytes(Charsets.UTF_8); //数据校验:字段和值是否对应 if(colColumn.length != colValue.length) break; //时间 String datetime = values[0].toString(); //用户id String userid = values[1].toString(); //根据业务自定义Rowkey rowKey = SimpleRowKeyGenerator.getMyRowKey(userid,datetime); //插入数据 PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumn, colValue); actions.add(putRequest); } } catch (Exception e) {
throw new FlumeException("Could not get row key!", e); } } return actions; }//在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法//rowKey在设计的时候一定要跟业务关联起来,因为HBASE不像关系型数据库,只要列有了就行,至于查询不需要事先在程序上做设计。但是对于HBASE,没有关联性的查询,只能通过RowKey前缀匹配的方式依次往后匹配进行查询/** * 自定义Rowkey * @param userid * @param datetime * @return * @throws UnsupportedEncodingException */ public static byte[] getMyRowKey(String userid,String datetime)throws UnsupportedEncodingException {
return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); }

Flume与Kafka集成

agent1.sources = r1agent1.channels = kafkaC hbaseCagent1.sinks = kafkaSink hbaseSinkagent1.sources.r1.type = avroagent1.sources.r1.channels = hbaseC kafkaCagent1.sources.r1.bind = bigdata-pro01.kfk.comagent1.sources.r1.port = 5555agent1.sources.r1.threads = 5agent1.channels.kafkaC.type = memoryagent1.channels.kafkaC.capacity = 100000agent1.channels.kafkaC.transactionCapacity = 100000agent1.channels.kafkaC.keep-alive = 20agent1.sinks.kafkaSink.channel = kafkaCagent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSinkagent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092agent1.sinks.kafkaSink.topic = testagent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181agent1.sinks.kafkaSink.requiredAcks = 1//producer节点拿到数据之后要给Leader节点,把数据给成功之后,会给生产端一个反馈agent1.sinks.kafkaSink.batchSize = 1agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

转载地址:http://nyhli.baihongyu.com/

你可能感兴趣的文章
1.idea中Maven创建项目及2.对idea中生命周期的理解3.pom文件夹下groupId、artifactId含义
查看>>
LeetCode-栈|双指针-42. 接雨水
查看>>
stdin,stdout,stderr详解
查看>>
Linux文件和设备编程
查看>>
文件描述符
查看>>
终端驱动程序:几个简单例子
查看>>
登录linux密码验证很慢的解决办法
查看>>
fcntl函数总结
查看>>
HTML条件注释
查看>>
Putty远程服务器的SSH经验
查看>>
内核态与用户态
查看>>
使用mingw(fedora)移植virt-viewer
查看>>
趣链 BitXHub跨链平台 (4)跨链网关“初介绍”
查看>>
C++ 字符串string操作
查看>>
MySQL必知必会 -- 了解SQL和MySQL
查看>>
MySQL必知必会 -- 使用MySQL
查看>>
MySQL必知必会 -- 数据检索
查看>>
MySQL必知必会 -- 排序检索数据 ORDER BY
查看>>
MySQL必知必会 -- 数据过滤
查看>>
MYSQL必知必会 -- 用通配符进行过滤
查看>>