1. 程式人生 > >java操作elasticsearch

java操作elasticsearch

upd 都是 2.4 port nlp mcr .json 不存在 artifacts

Elasticsearch是一個搜索引擎,建立在Lucene之上

集群 (cluster)

  代表一個集群,集群中有多個節點,其中有一個為主節點,這個主節點是可以通過選舉產生的,主從節點是對於集群內部來說的。
  es的一個概念就是去中心化,字面上理解就是無中心節點,這是對於集群外部來說的,因為從外部來看es集群,在邏輯上是個整體,
  你與任何一個節點的通信和與整個es集群通信是等價的。

節點(node)

  每一個運行實例稱為一個節點,每一個運行實例既可以在同一機器上,也可以在不同的機器上.所謂運行實例,就是一個服務器進程.
  在測試環境內,可以在一臺服務器上運行多個服務器進程,在生產環境建議每臺服務器運行一個服務器進程

索引(index)

  這裏的索引是名詞不是動詞,在elasticsearch裏面支持多個索引。類似於關系數據庫裏面每一個服務器可以支持多個數據庫一樣。
  在每一索引下面又支持多種類型,類似於關系數據庫裏面的一個數據庫可以有多張表。但是本質上和關系數據庫有很大的區別。

分片(shards) 

  把一個索引分解為多個小的索引,每一個小的索引叫做分片。分片後就可以把各個分片分配到不同的節點中,構成分布式搜索
  分片的數量只能在索引創建前指定,並且索引創建後不能更改

副本(replicas)

  副本的作用一是提高系統的容錯性,當個某個節點某個分片損壞或丟失時可以從副本中恢復。二是提高es的查詢效率,es會自動對搜索請求進行負載均衡

recovery

  代表數據恢復或叫數據重新分布,es在有節點加入或退出時會根據機器的負載對索引分片進行重新分配,掛掉的節點重新啟動時也會進行數據恢復。

river

  代表es的一個數據源,也是其它存儲方式(如:數據庫)同步數據到es的一個方法。它是以插件方式存在的一個es服務,通過讀取river中的數據並把它索引到es中,
        
   官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的,river這個功能將會在後面的文件中重點說到。

gateway

  代表es索引的持久化存儲方式,es默認是先把索引存放到內存中,當內存滿了時再持久化到硬盤。當這個es集群關閉再重新啟動時就會從gateway中讀取索引數據。
        
    es支持多種類型的gateway,有本地文件系統(默認),分布式文件系統,Hadoop的HDFS和amazon的s3雲存儲服務。

    
---將各種集群狀態信息、索引配置信息等全部持久存放在網關中

discovery.zen

  代表es的自動發現節點機制,es是一個基於p2p的系統,它先通過廣播尋找存在的節點,再通過多播協議來進行節點之間的通信,同時也支持點對點的交互。

Transport

  代表es內部節點或集群與客戶端的交互方式,默認內部是使用tcp協議進行交互,同時它支持http協議(json格式)、thrift、servlet、
  memcached、zeroMQ等的傳輸協議(通過插件方式集成)。

索引(Index)

  ElaticSearch將數據存放在一個或多個索引當中。一個索引相當於一個數據庫,裏面存放用戶文檔數據。在底層,ElasticSearch實際上還是
   使用Lucene完成讀寫數據的操作,ElasticSearch索引是由一個或多個Lucene索引組成,所以ES中的分片或副本實際上就是一個Lucene索引。
 

文檔(Document)

  文檔是ES中主要的實體,所有ES的查詢都是基於存放在ES中文檔資源的查詢。每個文檔都是由各種域(Field)組成,每個域(Field)有一個名
  稱和一個或多個值構成。實際上,從用戶的角度看,一個ES文檔就是一個JSON對象。

映射(Mapping)

  映射用於定義文檔域的屬性,這些屬性包括分詞器,字段類型,存儲類型等。對於沒有定義的字段類型的屬性,ES可以自動通過其字段值進行識別。

類型(Type)

  ES中每個文檔必須有一個類型定義。這裏的類型相當於數據庫當中的表,類型定義了字段映射(類似數據庫表結構),
  這樣一來,每個索引可以包含多種文檔類型,而每種文檔類型定義一種映射關系。

路由(Routing)

  ES給每個文檔建索引後,通過路由可以算出所查的文檔處在哪個分片上,因為在建立索引之初使用公式:shard = hash(routting) % number_of_pr
  imary_shards進行文檔分配。routing值是一個任意的字符串,默認是文檔的ID,通過人工指定就可以控制文檔存放在哪個shard的位置了。

索引別名(Index Alias)

  索引別名相當於快捷方式或軟鏈接,可以指向一個或多個索引,甚至可以指向帶路由的分片。

近實時性 near realtime (nrt)

  Elasticsearch是一個近實時性的搜索平臺,所以對於剛建過的索引文件進行查詢時需要一個輕微的等待時間(通常為1秒)。

java操作elastic:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <runSuite>**/MainTestSuite.class</runSuite>
        <elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
        <elasticsearch.plugin.site>true</elasticsearch.plugin.site>
        <elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm>
        <elasticsearch.version>5.6.2</elasticsearch.version>
        <elasticsearch.rest.version>5.5.2</elasticsearch.rest.version>
        <slf4j.version>1.7.7</slf4j.version>
        <elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
    </properties>
    
    <repositories>
        <repository>
            <id>elasticsearch-releases</id>
            <url>https://artifacts.elastic.co/maven</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.0.2.RELEASE</version>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>
            
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>${elasticsearch.rest.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
            
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
            
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
            
        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>
        
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
         
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.15</version>
        </dependency>
        
        <!-- LOGGING begin -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- common-logging 實際調用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- java.util.logging 實際調用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- LOGGING end -->
                         
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>  
            <groupId>junit</groupId>  
            <artifactId>junit</artifactId>  
            <version>4.12</version>  
            <scope>test</scope>  
        </dependency>
    </dependencies>

EsQuery

public class EsQuery {
    
    protected static Logger logger = LoggerFactory.getLogger(EsQuery.class);
    
    /**
     * 集群狀態
     */
    public void clusterStatus(){
        ClusterAdminClient clusterAdminClient = ElasticUtil.getClusterClient().admin().cluster();
        ClusterHealthResponse healths = clusterAdminClient.prepareHealth().get();
        String clusterName = healths.getClusterName();
        int numberOfDataNodes = healths.getNumberOfDataNodes();
        int numberOfNodes = healths.getNumberOfNodes();
        ClusterHealthStatus status = healths.getStatus();
        System.out.println("集群名稱:"+clusterName);
        System.out.println("數據節點:"+numberOfDataNodes);
        System.out.println("正常節點:"+numberOfNodes);
        System.out.println("狀態值:"+status.name());
    }
    
    /**
     * 判斷索引庫是否存在
     * @param indexName
     * @return
     */
    public boolean isIndexExists(String indexName) {
        IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
        IndicesExistsResponse inExistsResponse = ElasticUtil.getClusterClient().admin().indices()
                .exists(inExistsRequest).actionGet();
        return inExistsResponse.isExists();
    }
    
    /**
     * 創建索引 indexName 索引名稱
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean createIndex(String indexName){
        if(isIndexExists(indexName)){
            return false;
        }
        CreateIndexResponse response = ElasticUtil.getClusterClient().admin().indices().prepareCreate(indexName).execute().actionGet();
        if(response.isAcknowledged()){
            return true;
        }
        return false;
    }
    

    /**
     * 刪除索引庫  
     * @param indexName
     * @return
     */
    public boolean dropIndex(String indexName) {
        if (!isIndexExists(indexName)) { 
            return false;
        } else {
            DeleteIndexResponse dResponse = ElasticUtil.getClusterClient().admin().indices().prepareDelete(indexName).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        }
    }
    
    /**
     * 采用standard分詞器-默認*/
    public boolean addType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,創建失敗...");
        }
        return false;
    }
    
    /**
     * 采用IK分詞器
     */
    public boolean addIKType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .startObject("properties")
                    .startObject("poi_id").field("type","integer").endObject()
                    .startObject("poi_title").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_address").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_tags").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_phone").field("type","text").endObject()
                    .endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,創建失敗...");
        }
        return false;
    }
     
    
    /**
     * 添加或修改數據  設置自己的id
     * @param id
     * @param json
     */
    public static String insertOrUpdate(String indexName,String typeName,String id,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName,id).setSource(json).execute().actionGet();
        return response.getId();
    }
    

    /**
     * 添加或修改數據  使用隨機id
     * @param json
     */
    public String insertOrUpdate(String indexName,String typeName,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName).setSource(json).execute().actionGet();
        return response.getId();
    }
    
    /**
     * 通過id查詢單條數據
     * @param id
     * @return
     */
    public GetResponse getResourceById(String indexName,String typeName,String id){
        GetResponse response = ElasticUtil.getClusterClient().prepareGet(indexName,typeName, id).get();
        //Map<String, Object> source = response.getSource();
        return response;
    }
    
    /**
     * 刪除數據
     * @param id
     */
    public void deleteResourceByIds(String indexName,String typeName,String[] ids) {
        if(ids==null||ids.length<1){
            return;
        }
        for(String id :ids){
            ElasticUtil.getClusterClient().prepareDelete(indexName, typeName, id)
            .execute().actionGet();
            System.out.println("刪除id: "+id);
        }
        System.out.println("delete over..");
    }
    
    /**
     * 查詢 index/type 數據
     * @param indexName
     */
    public static void simpleQuery(String indexName,String typeName){
        SearchRequestBuilder prepareSearch = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null && !"".equals(typeName.trim())){
            prepareSearch.setTypes(typeName);
        }
        //        prepareSearch.setFrom(1).setSize(10);
        SearchResponse response = prepareSearch.execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
    /**
     * 通過field字段過濾索引庫
     * @param indexName
     * @param typeName
     * @param field
     * @param value
     */
    public void matchFieldQuery(String indexName,String typeName,String field,String value){
        QueryBuilder qb = QueryBuilders.matchQuery(field,value);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
    /**
     * 通過多個field字段過濾索引庫
     * @param indexName
     * @param typeName
     * @param field1
     * @param field2
     * @param value
     */
    public void multiFieldMatchQuery(String indexName,String typeName,String field1,String field2,String value){
        QueryBuilder qb = QueryBuilders.multiMatchQuery(value,field1, field2);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb) .execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
    }
    
    /**
     * 通過id 獲取多條數據*/
    public void idsQuery(String indexName,String typeName,String[] ids){
        IdsQueryBuilder qb = QueryBuilders.idsQuery().addIds(ids);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
}

java操作elasticsearch