HBase and Flume Integration Programming

By | January 31, 2019

1. Download the source (src) package from the official Apache Flume website and extract it. The module to import is flume-ng-sinks → flume-ng-hbase-sink.

2. Edit SimpleAsyncHbaseEventSerializer: make a copy, rename it to MySimpleAsyncHbaseEventSerializer, and modify getActions(). For example:

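  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      try {
        // payloadColumn holds the configured column names, payload holds the event body;
        // both are comma-separated, e.g. "datetime,userid,..." and "20190131120000,u001,..."
        String[] columns = new String(this.payloadColumn, Charsets.UTF_8).split(",");
        String[] values = new String(this.payload, Charsets.UTF_8).split(",");

        // Skip malformed events: every column needs exactly one value,
        // and the row key needs at least datetime and userid
        if (columns.length != values.length || values.length < 2) {
          return actions;
        }

        // Build the row key once per event so all columns land in the same row
        String datetime = values[0];
        String userid = values[1];
        byte[] rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);

        // One PutRequest per column: table, row key, column family, qualifier, value
        for (int i = 0; i < columns.length; i++) {
          byte[] colColumn = columns[i].getBytes(Charsets.UTF_8);
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          actions.add(new PutRequest(table, rowKey, cf, colColumn, colValue));
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }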
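To check the column/value mapping of getActions() without a running Flume agent or HBase, the parsing logic can be exercised on its own. The class below is a hypothetical stand-alone sketch: PayloadSplitSketch and its Cell type are made-up names, with Cell standing in for asynchbase's PutRequest. It assumes, as the serializer does, that the event body is comma-separated with datetime first and userid second, and it takes the timestamp as a parameter so the result is reproducible. The sample column names and payload are invented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch of the parsing in MySimpleAsyncHbaseEventSerializer.getActions(),
// with no Flume or asynchbase dependency. Cell stands in for asynchbase's PutRequest.
public class PayloadSplitSketch {

  // One HBase cell: row key, column qualifier, value
  public static final class Cell {
    public final String rowKey;
    public final String column;
    public final String value;

    public Cell(String rowKey, String column, String value) {
      this.rowKey = rowKey;
      this.column = column;
      this.value = value;
    }

    @Override
    public String toString() {
      return rowKey + " / " + column + " = " + value;
    }
  }

  // Assumes both strings are comma-separated and that values[0] = datetime, values[1] = userid
  public static List<Cell> toCells(String payloadColumn, String payload, long millis) {
    List<Cell> cells = new ArrayList<>();
    String[] columns = payloadColumn.split(",");
    String[] values = payload.split(",");
    if (columns.length != values.length || values.length < 2) {
      return cells; // malformed event: nothing is written
    }
    // Same layout as getKfkRowKey (userid + datetime + millis), built once per event
    String rowKey = values[1] + values[0] + millis;
    for (int i = 0; i < columns.length; i++) {
      cells.add(new Cell(rowKey, columns[i], values[i]));
    }
    return cells;
  }

  public static void main(String[] args) {
    // Made-up sample data: three configured columns and one matching event body
    List<Cell> cells = toCells("datetime,userid,searchname",
        "20190131120000,u001,hbase", 1548907200000L);
    for (Cell cell : cells) {
      System.out.println(cell);
    }
  }
}
```

Computing the key once per event keeps every column of an event in one row. If the key were regenerated per column, the System.currentTimeMillis() part of getKfkRowKey could change between iterations and split a single event across several rows.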

The row key can be customized by editing SimpleRowKeyGenerator and adding a new method (getKfkRowKey below):

package org.apache.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.UUID;

public class SimpleRowKeyGenerator {

  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
  }

  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
  }

  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
  }

  // Custom row key: userid + datetime + current time in milliseconds
  public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }
}
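getKfkRowKey puts userid first, then datetime, then the current time in milliseconds. Because HBase orders rows by unsigned lexicographic byte comparison, this layout keeps one user's rows together and, assuming datetime uses a fixed-width format such as yyyyMMddHHmmss, in chronological order. The sketch below illustrates that under those assumptions; KfkRowKeySketch is a hypothetical class that reproduces the key layout with an explicit timestamp and sorts keys the way HBase would.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: same layout as SimpleRowKeyGenerator.getKfkRowKey
// (userid + datetime + millis), but with the timestamp passed in so results are reproducible.
public class KfkRowKeySketch {

  public static byte[] kfkRowKey(String userid, String datetime, long millis) {
    return (userid + datetime + millis).getBytes(StandardCharsets.UTF_8);
  }

  // HBase orders row keys by unsigned lexicographic byte comparison
  public static int compareRowKeys(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int x = a[i] & 0xff;
      int y = b[i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return a.length - b.length;
  }

  // Sorts keys in HBase row order and decodes them for display
  public static List<String> sortedKeys(List<byte[]> keys) {
    List<byte[]> copy = new ArrayList<>(keys);
    copy.sort(KfkRowKeySketch::compareRowKeys);
    List<String> out = new ArrayList<>();
    for (byte[] key : copy) {
      out.add(new String(key, StandardCharsets.UTF_8));
    }
    return out;
  }

  public static void main(String[] args) {
    // Made-up users and timestamps
    List<byte[]> keys = Arrays.asList(
        kfkRowKey("u002", "20190131120000", 1L),
        kfkRowKey("u001", "20190131130000", 2L),
        kfkRowKey("u001", "20190131120000", 3L));
    sortedKeys(keys).forEach(System.out::println);
  }
}
```

Since the parts are concatenated without a separator, the keys are only unambiguous if userid has a fixed width (or a delimiter is inserted); otherwise a prefix scan for user u1 also returns the rows of u12.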

3. Make the newly edited serializer the default:

Edit AsyncHBaseSink, search for SimpleAsyncHbaseEventSerializer, and replace it with MySimpleAsyncHbaseEventSerializer, as follows:

    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
      // Must be the exact fully qualified name of the renamed serializer class
      eventSerializerType =
          "org.apache.flume.sink.hbase.MySimpleAsyncHbaseEventSerializer";
      logger.info("No serializer defined, Will use default");
    }
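The fallback above only stores a class name. As far as I can tell, AsyncHBaseSink then loads that name reflectively (via Class.forName) and instantiates it, so the string has to match the renamed class's fully qualified name exactly; a mismatch surfaces as a ClassNotFoundException when the sink is configured. The sketch below shows that mechanism in isolation. EventSerializer, MySerializer, and SerializerLoadingSketch are hypothetical stand-ins, not Flume's actual types.

```java
// Hypothetical sketch of resolving a serializer class name with a default fallback.
// The types below are stand-ins so the example runs without Flume on the classpath.
public class SerializerLoadingSketch {

  public interface EventSerializer {
    String describe();
  }

  public static class MySerializer implements EventSerializer {
    @Override
    public String describe() {
      return "custom serializer";
    }
  }

  // Binary name of the default class, e.g. "SerializerLoadingSketch$MySerializer"
  static final String DEFAULT_SERIALIZER = MySerializer.class.getName();

  public static EventSerializer load(String eventSerializerType)
      throws ReflectiveOperationException {
    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
      eventSerializerType = DEFAULT_SERIALIZER; // fall back to the built-in default
    }
    // Throws ClassNotFoundException if the name does not match a class on the classpath
    Class<? extends EventSerializer> clazz =
        Class.forName(eventSerializerType).asSubclass(EventSerializer.class);
    return clazz.getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(load(null).describe());
    try {
      load("org.example.DoesNotExist"); // hypothetical, deliberately wrong name
    } catch (ClassNotFoundException e) {
      System.out.println("Class not found: " + e.getMessage());
    }
  }
}
```

Patching the default is one option; the sink's serializer property in the agent configuration can also name the custom class explicitly, which avoids rebuilding the sink jar for that part.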

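4. Package the module as a jar, rename it to match the corresponding flume-ng-hbase-sink jar in Flume's lib directory, copy it into lib, and overwrite the original jar.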
