使用Hbase協作器(Coprocessor)同步資料到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)
阿新 • • 發佈:2018-11-28
參考 https://gitee.com/eminem89/Hbase-Observer-ElasticSearch 上面的程式碼,但是由於我的 es 版本是 2.4.0,和作者使用的版本不對應,導致功能無法正常使用,所以特此記錄修改後可正常使用的程式碼,以供參考。
程式碼如下:
maven pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.eminem.hbase</groupId>
    <artifactId>hbase-observer-elasticsearch</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>hbase-observer-elasticsearch</name>
    <url>http://maven.apache.org</url>

    <!-- CDH builds of hbase-server are only published in Cloudera's repository -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <elasticsearch.version>2.4.0</elasticsearch.version>
        <hbase-server.version>1.2.0-cdh5.8.2</hbase-server.version>
        <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
        <commons-logging.version>1.2</commons-logging.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase-server.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- skip tests when packaging the coprocessor jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <testFailureIgnore>true</testFailureIgnore>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- build the deployable assembly (per assembly.xml) at package time -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven-assembly-plugin.version}</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptors>
                        <descriptor>assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
專案目錄圖
參考原作者的專案基礎之上改動 https://gitee.com/eminem89/Hbase-Observer-ElasticSearch
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Bulk hbase data to ElasticSearch Class */ public class ElasticSearchBulkOperator { private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class); private static final int MAX_BULK_COUNT = 10000; private static BulkRequestBuilder bulkRequestBuilder = null; private static final Lock commitLock = new ReentrantLock(); private static ScheduledExecutorService scheduledExecutorService = null; static { // init es bulkRequestBuilder bulkRequestBuilder = ESClient.client.prepareBulk(); bulkRequestBuilder.setRefresh(true); // init thread pool and set size 1 scheduledExecutorService = Executors.newScheduledThreadPool(1); // create beeper thread( it will be sync data to ES cluster) // use a commitLock to protected bulk es as thread-save final Runnable beeper = new Runnable() { public void run() { commitLock.lock(); try { bulkRequest(0); } catch (Exception ex) { System.out.println(ex.getMessage()); LOG.error("Time Bulk " + ESClient.indexName + " index error : " + ex.getMessage()); } finally { commitLock.unlock(); } } }; // set time bulk task // set beeper thread(10 second to delay first execution , 30 second period between successive executions) scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS); } /** * shutdown time task immediately */ public static void shutdownScheduEx() { if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) { scheduledExecutorService.shutdown(); } } /** 
* bulk request when number of builders is grate then threshold * * @param threshold */ private static void bulkRequest(int threshold) { if (bulkRequestBuilder.numberOfActions() > threshold) { BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet(); if (!bulkItemResponse.hasFailures()) { bulkRequestBuilder = ESClient.client.prepareBulk(); } } } /** * add update builder to bulk * use commitLock to protected bulk as thread-save * @param builder */ public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { LOG.error(" update Bulk " + ESClient.indexName + " index error : " + ex.getMessage()); } finally { commitLock.unlock(); } } /** * add delete builder to bulk * use commitLock to protected bulk as thread-save * * @param builder */ public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { LOG.error(" delete Bulk " + ESClient.indexName + " index error : " + ex.getMessage()); } finally { commitLock.unlock(); } } }
ESClient.java
package org.eminem.hbase.observer6;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/**
 * Holds the ElasticSearch connection settings (populated by the observer
 * from coprocessor parameters) and the shared transport client.
 */
public class ESClient {

    private static final Log LOG = LogFactory.getLog(ESClient.class);

    // ElasticSearch cluster name
    public static String clusterName;
    // ElasticSearch node host
    public static String nodeHost;
    // ElasticSearch transport (TCP) port used by the Java API
    public static int nodePort;
    // target index name
    public static String indexName;
    // target type name
    public static String typeName;
    // shared client, created by initEsClient(); null until then
    public static Client client;

    /**
     * Dumps every static field of this class as "name=value" pairs, for
     * logging the effective configuration.
     *
     * @return comma-separated field dump
     */
    public static String getInfo() {
        List<String> fields = new ArrayList<String>();
        try {
            for (Field f : ESClient.class.getDeclaredFields()) {
                fields.add(f.getName() + "=" + f.get(null));
            }
        } catch (IllegalAccessException ex) {
            // fix: was LOG.error + printStackTrace; the logger is enough
            LOG.error(ex);
        }
        return StringUtils.join(",", fields);
    }

    /**
     * Initializes the shared transport client from the static settings.
     * On an unknown host the error is logged and {@code client} stays null.
     */
    public static void initEsClient() {
        try {
            // build settings with the configured cluster name
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", ESClient.clusterName).build();
            // connect to the configured ES node
            client = TransportClient
                    .builder()
                    .settings(settings)
                    .build()
                    .addTransportAddress(
                            new InetSocketTransportAddress(InetAddress
                                    .getByName(ESClient.nodeHost), ESClient.nodePort));
        } catch (UnknownHostException e) {
            LOG.error(e);
        }
    }

    /**
     * Closes the shared client if it was successfully created.
     * Fix: the original dereferenced {@code client} unconditionally and
     * threw a NullPointerException when initEsClient() had failed.
     */
    public static void closeEsClient() {
        LOG.info("-----------Close ES client--------------");
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Standalone connectivity check against a hard-coded node.
     * Fix: the original leaked the client and printed the success message
     * (with a typo) even when the connection setup had failed.
     */
    public static void main(String[] args) {
        TransportClient testClient = null;
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "myClusterName").build();
            testClient = TransportClient
                    .builder()
                    .settings(settings)
                    .build()
                    .addTransportAddress(
                            new InetSocketTransportAddress(InetAddress
                                    .getByName("192.168.1.181"), 9300));
            System.out.println("connect success");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } finally {
            if (testClient != null) {
                testClient.close();
            }
        }
    }
}
HbaseDataSyncEsObserver.java
package org.eminem.hbase.observer6;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
/**
 * HBase RegionObserver coprocessor that mirrors table mutations into
 * ElasticSearch: puts become upserts and deletes become delete requests,
 * both funnelled through {@link ElasticSearchBulkOperator}.
 */
public class HbaseDataSyncEsObserver extends BaseRegionObserver {

    private static final Log LOG = LogFactory.getLog(HbaseDataSyncEsObserver.class);

    /**
     * Reads the ES connection settings from the coprocessor parameters
     * (key=value pairs supplied when the observer is attached to a table).
     *
     * @param env coprocessor environment carrying the configuration
     */
    private static void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        ESClient.clusterName = conf.get("es_cluster");
        ESClient.nodeHost = conf.get("es_host");
        ESClient.nodePort = conf.getInt("es_port", -1);
        ESClient.indexName = conf.get("es_index");
        ESClient.typeName = conf.get("es_type");
    }

    /**
     * Coprocessor start hook: reads the configuration and opens the shared
     * ES client.
     *
     * @param e coprocessor environment
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        // read config
        readConfiguration(e);
        // init ES client
        ESClient.initEsClient();
        // fix: informational message was logged at ERROR level
        LOG.info("------observer init EsClient ------" + ESClient.getInfo());
    }

    /**
     * Coprocessor stop hook: closes the ES client and stops the periodic
     * bulk-flush task.
     *
     * @param e coprocessor environment
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // close es client
        ESClient.closeEsClient();
        // shutdown time task
        ElasticSearchBulkOperator.shutdownScheduEx();
    }

    /**
     * Called after the client stores a value: converts the Put's cells into
     * a document and queues an upsert to ES, keyed by the row key.
     *
     * @param e          observer context
     * @param put        the mutation that was applied
     * @param edit       WAL edit (unused)
     * @param durability durability setting (unused)
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // fix: new String(byte[]) used the platform default charset;
        // Bytes.toString decodes the row key as UTF-8 on every JVM
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> infoJson = new HashMap<String, Object>();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    // NOTE(review): qualifiers from all families share one map,
                    // so a qualifier repeated across families is overwritten
                    json.put(key, value);
                }
            }
            // nest all columns under an "info" field in the ES document
            infoJson.put("info", json);
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(ESClient.indexName, ESClient.typeName, indexId).setDocAsUpsert(true).setDoc(infoJson));
        } catch (Exception ex) {
            // fix: include the throwable so the stack trace is logged
            LOG.error("observer put a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }

    /**
     * Called after the client deletes a value: queues a matching delete
     * request to ES, keyed by the row key.
     *
     * @param e          observer context
     * @param delete     the delete that was applied
     * @param edit       WAL edit (unused)
     * @param durability durability setting (unused)
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        // fix: decode the row key as UTF-8 instead of the platform charset
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(ESClient.indexName, ESClient.typeName, indexId));
        } catch (Exception ex) {
            LOG.error("observer delete a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }
}
部署方式參考:https://blog.csdn.net/fxsdbt520/article/details/53884338