Two Flume agents are first set up to collect the application service logs and push the data to a third Flume agent, which merges and preprocesses the logs. From there, two channels deliver the data to HBase and Kafka respectively. Flume fundamentals are assumed and not repeated here.
Three nodes are used: the agent2 and agent3 nodes run Flume to collect data from the application services and forward it to the agent1 node. Taking agent2 as the example, its configuration file is:

```
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1

agent2.sources.r1.type = exec
agent2.sources.r1.command = tail -F /opt/datas/weblogs.log
agent2.sources.r1.channels = c1

agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 10000
agent2.channels.c1.keep-alive = 5

agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = bigdata-pro01
agent2.sinks.k1.port = 5555
```
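With this file in place, the collector can be started on agent2 (and likewise on agent3) with Flume's standard launcher, for example `bin/flume-ng agent --conf conf --conf-file conf/agent2.properties --name agent2 -Dflume.root.logger=INFO,console`. The config file path here is illustrative; point it at wherever the file above was saved.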
On agent1, the merged stream is first wired to HBase only (the Kafka channel is added later):

```
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC
agent1.sources.r1.bind = bigdata-pro01
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

# Define a memory channel called hbaseC on agent1
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

# asynchbase is Flume's asynchronous HBase sink
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
```
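One prerequisite worth noting: the asynchbase sink writes into an existing table, so the table and column family from the configuration above should be created up front, e.g. in the HBase shell with `create 'weblogs', 'info'`.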
The stock serializer, SimpleAsyncHbaseEventSerializer, implements the AsyncHbaseEventSerializer interface. Its main fields and methods are:

- `byte[] table` — the name of the target HBase table
- `byte[] cf` — the column family name
- `byte[] payloadColumn`, `byte[] payload` — the column names and column values for the row
- `String rowPrefix` — the row key prefix
- `public void configure(Context context)` — called first when the service starts; it loads the serializer's settings from the external configuration file. `KeyType` is an enum from which the row key column type is resolved.
- `public List<PutRequest> getActions()` — once configuration is loaded, builds the HBase writes for the incoming data in real time; each write is sent to HBase as a `PutRequest`.
- `public void setEvent(Event event)` — receives each event from Flume in real time. `event.getBody()` returns one line of data, in the format: access time\tuser ID\t[query terms]\trank of the URL in the results\tuser's click order\tclicked URL.
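As a quick orientation, here is a condensed sketch of that lifecycle. The field names and the `"pCol"` default follow the stock SimpleAsyncHbaseEventSerializer source; `getActions()` is stubbed out here because the customized version appears in the next section:

```java
import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

public class SerializerLifecycleSketch implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;

    @Override
    public void initialize(byte[] table, byte[] cf) {
        // called by the sink with the table/columnFamily values from the config
        this.table = table;
        this.cf = cf;
    }

    @Override
    public void configure(Context context) {
        // called once at startup: load the column list from the sink configuration
        String pCol = context.getString("payloadColumn", "pCol");
        this.payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }

    @Override
    public void setEvent(Event event) {
        // called for every Flume event: getBody() is one raw log line
        this.payload = event.getBody();
    }

    @Override
    public List<PutRequest> getActions() {
        return new ArrayList<PutRequest>(); // real logic shown in the next section
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        return new ArrayList<AtomicIncrementRequest>(); // no counters needed here
    }

    @Override
    public void cleanUp() {
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }
}
```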
The log data first needs some format preprocessing:

Replace the tabs in the file with commas: `cat weblog.log | tr "\t" "," > weblog2.log`
Replace the spaces in the file with commas: `cat weblog2.log | tr " " "," > weblog3.log`

SimpleAsyncHbaseEventSerializer then needs secondary development; the result is the KfkAsyncHbaseEventSerializer class referenced in the agent1 configuration:

```java
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            // Parse the configured column names
            String[] columns = new String(this.payloadColumn).split(",");
            // Parse each line collected by Flume; HBase writes data column by column
            String[] values = new String(this.payload).split(",");
            // Data check: skip the line if fields and values do not match up
            if (columns.length != values.length) {
                return actions;
            }
            // access time
            String datetime = values[0];
            // user id
            String userid = values[1];
            // Build the business-specific row key once per line,
            // so all columns of the line share the same row
            rowKey = SimpleRowKeyGenerator.getMyRowKey(userid, datetime);
            for (int i = 0; i < columns.length; i++) {
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                // one PutRequest per column
                actions.add(new PutRequest(table, rowKey, cf, colColumn, colValue));
            }
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
```

The row key must be designed around the business. Unlike a relational database, where any existing column can be queried directly, HBase offers no relational-style queries: without the access pattern encoded into the key, records can only be retrieved by matching row key prefixes in order. A custom generator is therefore added to the SimpleRowKeyGenerator class:

```java
/**
 * Custom row key: userid + datetime + current timestamp
 * @param userid
 * @param datetime
 * @return the row key bytes
 * @throws UnsupportedEncodingException
 */
public static byte[] getMyRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}
```
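To make the prefix-matching point concrete, here is a minimal sketch (not from the original post) of querying one user's records back out of the weblogs table by row key prefix. It assumes the HBase 1.x+ client API, and the user id "12345" is made up:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class UserWeblogScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("weblogs"))) {
            Scan scan = new Scan();
            // row keys start with the userid, so a prefix scan returns
            // exactly one user's records
            scan.setRowPrefixFilter(Bytes.toBytes("12345"));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    System.out.println(Bytes.toString(r.getRow()));
                }
            }
        }
    }
}
```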
Finally, extend the agent1 configuration so the same source also feeds a Kafka channel and sink:

```
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata-pro01.kfk.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
agent1.sinks.kafkaSink.topic = test
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
# requiredAcks = 1: once the leader has received the data, it sends an acknowledgement back to the producer
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
```
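To verify that events are arriving in the `test` topic, a consumer can be attached to the same brokers. This is a minimal sketch, not part of the original setup: the post's sink uses 0.8-era settings, while the code below assumes a newer kafka-clients (2.0+) library on the verifying machine:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WeblogTopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        // same broker list as the kafkaSink configuration
        props.put("bootstrap.servers", "bigdata-pro01.kfk.com:9092");
        props.put("group.id", "weblog-check");      // group name is arbitrary
        props.put("auto.offset.reset", "earliest"); // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each value should be one comma-separated weblog line
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```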
Reprinted from: http://nyhli.baihongyu.com/