1. 程式人生 > >Spring Boot2.0.5整合ElasticSearch6.4.1 叢集

Spring Boot 2.0.5 整合 Elasticsearch 6.4.1 叢集

轉載請標明出處:https://blog.csdn.net/Amor_Leo/article/details/83012038 ,謝謝。
ES 叢集搭建(兩個節點):https://blog.csdn.net/Amor_Leo/article/details/83011372
ES 叢集搭建(三個節點):https://blog.csdn.net/Amor_Leo/article/details/83144739
本文參考了:https://blog.csdn.net/chy2z/article/details/80461745

pom.xml

    <!-- Inherit plugin and dependency management from the Spring Boot 2.0.5.RELEASE parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Build-wide settings: UTF-8 sources/reports and Java 8. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring MVC / embedded web server -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Elasticsearch transport client 6.4.1. The core "elasticsearch" artifact is excluded
             here and declared explicitly right below with the same version.
             NOTE(review): presumably this keeps the parent's managed Elasticsearch version from
             being selected; verify against the effective POM. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.4.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Elasticsearch core, pinned to the same version as the transport client. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.4.1</version>
        </dependency>
        <!-- Netty4 transport module for the client, same version as above. -->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>6.4.1</version>
        </dependency>
        <!-- REST high-level client, same version as above. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.4.1</version>
        </dependency>
        <!-- swagger2: API documentation generation -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- Annotation processor for configuration metadata; optional, so not passed on transitively. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- fastjson -->
        <!-- NOTE(review): 1.2.39 is an old fastjson release; check the published security
             advisories for this version before using it outside a demo. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
        <!-- Apache Commons Lang 3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <!-- Test-scope starter (not packaged with the application). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Spring Boot Maven plugin; version comes from the parent POM. -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

ES配置類

/**
 * Spring configuration that creates the Elasticsearch {@link TransportClient} bean
 * used to talk to the cluster.
 *
 * <p>Author: LHL, 2018/10/10.
 */
@Configuration
public class ESConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(ESConfig.class);

    /**
     * Builds a transport client connected to the three transport ports
     * (9300, 9301, 9302) of the host configured below.
     *
     * @return a connected transport client, never {@code null}
     * @throws IllegalStateException if the configured host name cannot be resolved;
     *         failing here avoids handing a {@code null} client to the rest of the application
     */
    @Bean(name = "transportClient")
    public TransportClient transportClient() {
        LOGGER.info("Elasticsearch初始化開始。。。。。");

        // Resolve the host before creating the client so that a resolution failure
        // cannot leave a half-configured client behind.
        InetAddress host;
        try {
            host = InetAddress.getByName("192.168.0.128");
        } catch (UnknownHostException e) {
            LOGGER.error("elasticsearch TransportClient create error!!", e);
            throw new IllegalStateException("Cannot resolve Elasticsearch host", e);
        }

        // Cluster settings; cluster.name must match the name of the installed cluster.
        Settings settings = Settings.builder()
                .put("cluster.name", "ESCluster")
                // Sniffing lets the client discover the other nodes of the cluster. If it causes
                // org.elasticsearch.transport.ReceiveTimeoutTransportException, remove this line.
                .put("client.transport.sniff", true)
                // Search thread pool size, provisionally 5.
                .put("thread_pool.search.size", 5)
                .build();

        TransportClient transportClient = new PreBuiltTransportClient(settings);
        for (int port : new int[] {9300, 9301, 9302}) {
            transportClient.addTransportAddress(new TransportAddress(host, port));
        }
        return transportClient;
    }
}
/**
 * Spring configuration that creates the shared {@link BulkProcessor} used for
 * batched writes to Elasticsearch.
 *
 * <p>Author: LHL, 2018/10/16.
 */
@Configuration
public class ESClient {

    private static final Logger LOGGER = LogManager.getLogger(ESClient.class);

    @Autowired
    private TransportClient transportClient;

    private static TransportClient client;


    /**
     * Copies the injected transport client into the static {@code client} field
     * once dependency injection has finished ({@code @PostConstruct} callback).
     */
    @PostConstruct
    public void init() {
        client = this.transportClient;
    }

    /**
     * Creates the bulk processor bean.
     *
     * <p>A batch is flushed when it reaches 10000 actions, 5 MB, or every 5 seconds,
     * whichever comes first. Up to 3 bulk requests may be in flight concurrently.
     *
     * @return the configured bulk processor
     */
    @Bean("bulkProcessor")
    public BulkProcessor bulkProcessor() {
        LOGGER.info("bulkProcessor初始化開始。。。。。");
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                // nothing to prepare before a bulk is sent
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                // A bulk call can complete while individual items failed; surface those
                // failures instead of dropping them silently.
                if (bulkResponse.hasFailures()) {
                    LOGGER.error("bulk {} finished with failures: {}", l, bulkResponse.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                // The throwable is passed as a trailing argument without its own placeholder
                // so that the logger records the stack trace.
                LOGGER.error("{} data bulk failed", bulkRequest.numberOfActions(), throwable);
            }

        }).setBulkActions(10000)  // flush after this many actions
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // flush once 5 MB are buffered
                .setFlushInterval(TimeValue.timeValueSeconds(5))  // periodic flush interval
                .setConcurrentRequests(3)  // concurrent bulk requests
                // up to 3 retries with exponential back-off starting at 1s
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueSeconds(1), 3))
                .build();
    }
}

實體類

/**
 * Simple mutable bean with an id, an age, a name and a date.
 *
 * <p>Author: LHL, 2018/10/10.
 */
public class EsModel {

    private String id;
    private Integer age;
    private String name;
    private Date date;

    // ---- getters ----

    /** @return the id */
    public String getId() {
        return this.id;
    }

    /** @return the age */
    public Integer getAge() {
        return this.age;
    }

    /** @return the name */
    public String getName() {
        return this.name;
    }

    /** @return the date */
    public Date getDate() {
        return this.date;
    }

    // ---- setters ----

    /** @param id the new id */
    public void setId(String id) {
        this.id = id;
    }

    /** @param age the new age */
    public void setAge(Integer age) {
        this.age = age;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param date the new date */
    public void setDate(Date date) {
        this.date = date;
    }

    /**
     * Renders all four fields in the form
     * {@code EsModel{id='..', age=.., name='..', date=..}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EsModel{");
        text.append("id='").append(id).append('\'');
        text.append(", age=").append(age);
        text.append(", name='").append(name).append('\'');
        text.append(", date=").append(date);
        text.append('}');
        return text.toString();
    }
}

/**
 * Book document with an id, a name, a message, a price and a creation date.
 */
public class Book {

    String id;
    String name;
    String message;
    Double price;
    Date creatDate;

    /**
     * Creates a book with all fields set.
     *
     * @param id        document id
     * @param name      book name
     * @param message   free-text message
     * @param price     price
     * @param creatDate creation date
     */
    public Book(String id, String name, String message, Double price, Date creatDate) {
        this.id = id;
        this.name = name;
        this.message = message;
        this.price = price;
        this.creatDate = creatDate;
    }

    /** @return the document id */
    public String getId() {
        return this.id;
    }

    /** @param id the new document id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the book name */
    public String getName() {
        return this.name;
    }

    /** @param name the new book name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message */
    public String getMessage() {
        return this.message;
    }

    /** @param message the new message */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the price */
    public Double getPrice() {
        return this.price;
    }

    /** @param price the new price */
    public void setPrice(Double price) {
        this.price = price;
    }

    /** @return the creation date */
    public Date getCreatDate() {
        return this.creatDate;
    }

    /** @param creatDate the new creation date */
    public void setCreatDate(Date creatDate) {
        this.creatDate = creatDate;
    }
}

工具類

/**
 * One page of search results plus the derived paging information
 * (total page count and the window of page numbers to display).
 *
 * <p>Author: LHL, 2018/10/10.
 */
public class EsPage {

    /** Maximum number of page links shown in the page-number window. */
    private static final int WINDOW_SIZE = 10;

    /** Current page number. */
    private int currentPage;
    /** Number of records per page. */
    private int pageSize;
    /** Total number of records. */
    private int recordCount;
    /** Records of the current page. */
    private List<Map<String, Object>> recordList;
    /** Total number of pages. */
    private int pageCount;
    /** First page number of the window (inclusive). */
    private int beginPageIndex;
    /** Last page number of the window (inclusive). */
    private int endPageIndex;

    /**
     * Takes the four required values and derives the other three
     * ({@code pageCount}, {@code beginPageIndex}, {@code endPageIndex}).
     *
     * <p>A non-positive {@code pageSize} or {@code recordCount} yields zero pages
     * instead of failing with a division by zero or a negative page count.
     *
     * @param currentPage current page number
     * @param pageSize    records per page
     * @param recordCount total number of records
     * @param recordList  records of the current page
     */
    public EsPage(int currentPage, int pageSize, int recordCount, List<Map<String, Object>> recordList) {
        this.currentPage = currentPage;
        this.pageSize = pageSize;
        this.recordCount = recordCount;
        this.recordList = recordList;
        this.pageCount = ceilDiv(recordCount, pageSize);

        if (pageCount <= WINDOW_SIZE) {
            // Few enough pages: show all of them.
            beginPageIndex = 1;
            endPageIndex = pageCount;
        } else {
            // Show 10 pages around the current one: 4 before, the current page, 5 after.
            beginPageIndex = currentPage - 4;
            endPageIndex = currentPage + 5;
            if (beginPageIndex < 1) {
                // Fewer than 4 pages before the current one: show the first 10 pages.
                beginPageIndex = 1;
                endPageIndex = WINDOW_SIZE;
            }
            if (endPageIndex > pageCount) {
                // Fewer than 5 pages after the current one: show the last 10 pages.
                endPageIndex = pageCount;
                beginPageIndex = pageCount - WINDOW_SIZE + 1;
            }
        }
    }

    /**
     * Ceiling division of {@code total} by {@code size}, computed in {@code long}
     * so that {@code total + size - 1} cannot overflow.
     *
     * @return the number of pages, or 0 when either argument is not positive
     */
    private static int ceilDiv(int total, int size) {
        if (total <= 0 || size <= 0) {
            return 0;
        }
        return (int) (((long) total + size - 1) / size);
    }

    public int getCurrentPage() {
        return currentPage;
    }

    public void setCurrentPage(int currentPage) {
        this.currentPage = currentPage;
    }

    public int getPageSize() {
        return pageSize;
    }

    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    public int getRecordCount() {
        return recordCount;
    }

    public void setRecordCount(int recordCount) {
        this.recordCount = recordCount;
    }

    public List<Map<String, Object>> getRecordList() {
        return recordList;
    }

    public void setRecordList(List<Map<String, Object>> recordList) {
        this.recordList = recordList;
    }

    public int getPageCount() {
        return pageCount;
    }

    public void setPageCount(int pageCount) {
        this.pageCount = pageCount;
    }

    public int getBeginPageIndex() {
        return beginPageIndex;
    }

    public void setBeginPageIndex(int beginPageIndex) {
        this.beginPageIndex = beginPageIndex;
    }

    public int getEndPageIndex() {
        return endPageIndex;
    }

    public void setEndPageIndex(int endPageIndex) {
        this.endPageIndex = endPageIndex;
    }

}

/**
 * @Auther: LHL
 * @Date: 2018/10/10 10:20
 * @Description:
 */
@Component
public class ElasticsearchUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtil.class);

    @Autowired
    private TransportClient transportClient;

    @Autowired
    private BulkProcessor bulkProcessor;

    private static TransportClient client;

    private static BulkProcessor bulk;

    /**
     * Lifecycle callback ({@code @PostConstruct}): publishes the injected transport
     * client to the static {@code client} field used by the static helper methods.
     */
    @PostConstruct
    public void init() {
        ElasticsearchUtil.client = transportClient;
    }

    /**
     * Lifecycle callback ({@code @PostConstruct}): publishes the injected bulk
     * processor to the static {@code bulk} field used by the static helper methods.
     */
    @PostConstruct
    public void initBulk() {
        ElasticsearchUtil.bulk = bulkProcessor;
    }

    /**
     * Creates the index if it does not exist yet and puts the book mapping on it.
     *
     * <p>The text fields currently use the {@code standard} analyzer; the original
     * notes list {@code ik_smart} and {@code ik_max_word} as alternatives.
     *
     * @param indexName index to create
     * @param esTpye    mapping type name
     * @return {@code true} if the put-mapping request was acknowledged
     * @throws IllegalStateException if the mapping JSON cannot be built
     */
    public static boolean createIndex(String indexName, String esTpye) {
        // Build the mapping first: failing here must not leave an index without mapping behind.
        XContentBuilder mapping;
        try {
            mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    // field name, field type, analyzer
                    .startObject("id").field("type", "text").field("analyzer", "standard").endObject()
                    .startObject("name").field("type", "text").field("analyzer", "standard").endObject()
                    // analyzer alternatives: ik_smart, ik_max_word, standard
                    .startObject("message").field("type", "text").field("analyzer", "standard").endObject()
                    .startObject("price").field("type", "float").endObject()
                    .startObject("creatDate").field("type", "date").endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to build mapping for index [" + indexName + "]", e);
        }
        // Only create the index when it is missing; creating an existing index fails.
        if (!isIndexExist(indexName)) {
            client.admin().indices().prepareCreate(indexName).execute().actionGet();
        }
        // index: index name, type: type name
        PutMappingRequest putmap = Requests.putMappingRequest(indexName).type(esTpye).source(mapping);
        PutMappingResponse indexresponse = client.admin().indices().putMapping(putmap).actionGet();
        LOGGER.info("執行建立成功?" + indexresponse.isAcknowledged());
        return indexresponse.isAcknowledged();
    }

    /**
     * Creates an index without an explicit mapping.
     *
     * @param index index name
     * @return {@code true} if the creation was acknowledged; {@code false} if the index
     *         already exists (nothing is created in that case) or the request was not acknowledged
     */
    public static boolean createIndex(String index) {
        // isIndexExist already logs the outcome; creating an existing index would fail.
        if (isIndexExist(index)) {
            return false;
        }
        CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).execute().actionGet();
        LOGGER.info("執行建立成功?" + indexresponse.isAcknowledged());
        return indexresponse.isAcknowledged();
    }

    /**
     * Deletes an index.
     *
     * @param index index name
     * @return {@code true} if the deletion was acknowledged; {@code false} if the index
     *         does not exist (no delete request is sent) or the request was not acknowledged
     */
    public static boolean deleteIndex(String index) {
        if (!isIndexExist(index)) {
            LOGGER.info("Index is not exits!");
            // Nothing to delete; sending the request anyway would fail.
            return false;
        }
        DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
        if (dResponse.isAcknowledged()) {
            LOGGER.info("delete index " + index + "  successfully!");
        } else {
            LOGGER.info("Fail to delete index " + index);
        }
        return dResponse.isAcknowledged();
    }

    /**
     * Checks whether an index exists and logs the outcome.
     *
     * @param index index name
     * @return {@code true} if the index exists
     */
    public static boolean isIndexExist(String index) {
        boolean exists = client.admin().indices()
                .exists(new IndicesExistsRequest(index))
                .actionGet()
                .isExists();
        String outcome = exists ? "] is exist!" : "] is not exist!";
        LOGGER.info("Index [" + index + outcome);
        return exists;
    }

    /**
     * Indexes a document under the given id.
     *
     * @param jsonObject document content
     * @param index      index name (comparable to a database)
     * @param type       type name (comparable to a table)
     * @param id         document id
     * @return the id reported in the index response
     */
    public static String addData(JSONObject jsonObject, String index, String type, String id) {
        IndexResponse indexed = client.prepareIndex(index, type, id)
                .setSource(jsonObject)
                .execute()
                .actionGet();
        String storedId = indexed.getId();
        LOGGER.info("addData response status:{},id:{}", indexed.status().getStatus(), storedId);
        return storedId;
    }

    /**
     * Indexes a document under a generated id: a random UUID with the dashes
     * removed, in upper case.
     *
     * @param jsonObject document content
     * @param index      index name (comparable to a database)
     * @param type       type name (comparable to a table)
     * @return the id reported in the index response
     */
    public static String addData(JSONObject jsonObject, String index, String type) {
        String generatedId = UUID.randomUUID().toString().replace("-", "").toUpperCase();
        return addData(jsonObject, index, type, generatedId);
    }

    /**
     * Deletes a single document by id and logs the response status.
     *
     * @param index index name (comparable to a database)
     * @param type  type name (comparable to a table)
     * @param id    document id
     */
    public static void deleteDataById(String index, String type, String id) {
        DeleteResponse deleted = client.prepareDelete(index, type, id).get();
        int httpStatus = deleted.status().getStatus();
        LOGGER.info("deleteDataById response status:{},id:{}", httpStatus, deleted.getId());
    }

    /**
     * Partially updates a document by id and waits for the result.
     *
     * <p>The request is awaited so that a failure is thrown to the caller instead of
     * being lost in an unobserved future.
     *
     * @param jsonObject fields to merge into the document
     * @param index      index name (comparable to a database)
     * @param type       type name (comparable to a table)
     * @param id         document id
     */
    public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(index).type(type).id(id).doc(jsonObject);
        int status = client.update(updateRequest).actionGet().status().getStatus();
        LOGGER.info("updateDataById response status:{},id:{}", status, id);
    }

    /**
     * Fetches a single document by id.
     *
     * @param index  index name (comparable to a database)
     * @param type   type name (comparable to a table)
     * @param id     document id
     * @param fields comma-separated fields to return; when empty, no source filter is applied
     * @return the source map of the get response
     */
    public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
        GetRequestBuilder lookup = client.prepareGet(index, type, id);
        boolean restrictFields = StringUtils.isNotEmpty(fields);
        if (restrictFields) {
            String[] includes = fields.split(",");
            lookup.setFetchSource(includes, null);
        }
        return lookup.get().getSource();
    }

    /**
     * Indexes a list of books using bulk requests of at most 100000 actions each.
     *
     * <p>Batches are cut by the number of queued actions, so ids need not be numeric.
     * A fresh bulk builder is started after every flush so that documents are not sent
     * twice, and no request is sent when nothing is queued. A book whose JSON cannot be
     * built is logged and skipped.
     *
     * @param index    index name
     * @param type     type name
     * @param bookList books to index; {@code null} or empty does nothing
     */
    public static void bulkAddDocument(String index, String type, List<Book> bookList) {
        if (bookList == null || bookList.isEmpty()) {
            return;
        }
        final int batchSize = 100000;
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (Book book : bookList) {
            try {
                bulkRequest.add(client.prepareIndex(index, type, book.getId())
                        .setSource(XContentFactory.jsonBuilder()
                                .startObject()
                                .field("id", book.getId())
                                .field("name", book.getName())
                                .field("creatDate", book.getCreatDate())
                                .field("price", book.getPrice())
                                .field("message", book.getMessage())
                                .endObject()
                        )
                );
            } catch (IOException e) {
                LOGGER.error("Failed to serialize book [{}] for bulk indexing", book.getId(), e);
                continue;
            }
            if (bulkRequest.numberOfActions() >= batchSize) {
                executeBulk(bulkRequest);
                // Start over with an empty builder; the old one still holds the sent actions.
                bulkRequest = client.prepareBulk();
            }
        }
        if (bulkRequest.numberOfActions() > 0) {
            executeBulk(bulkRequest);
        }
    }

    /** Sends one bulk request, waits for it, and logs item failures and the status. */
    private static void executeBulk(BulkRequestBuilder bulkRequest) {
        BulkResponse responses = bulkRequest.execute().actionGet();
        if (responses.hasFailures()) {
            LOGGER.error("bulk error:{}", responses.buildFailureMessage());
        }
        LOGGER.info("insert status?" + responses.status());
    }


    /**
     * Queues an index request for every book on the shared bulk processor.
     * Sending is left to the bulk processor's own flush settings.
     *
     * <p>A book whose JSON cannot be built is logged and skipped.
     *
     * @param indexName index name
     * @param type      type name
     * @param bookList  books to index
     */
    public static void bulkProcessorAdd(String indexName, String type, List<Book> bookList) {
        bookList.stream().parallel().forEach(
                book -> {
                    try {
                        bulk.add(new IndexRequest(indexName, type, book.getId()).source(XContentFactory.jsonBuilder()
                                .startObject()
                                .field("id", book.getId())
                                .field("name", book.getName())
                                .field("creatDate", book.getCreatDate())
                                .field("price", book.getPrice())
                                .field("message", book.getMessage())
                                .endObject()));
                    } catch (IOException e) {
                        // Log through the logger (with stack trace) instead of stderr.
                        LOGGER.error("Failed to serialize book [{}] for bulk indexing", book.getId(), e);
                    }
                }
        );
    }

    /**
     * Deletes several documents with one bulk request.
     *
     * @param index index name
     * @param type  type name
     * @param ids   ids of the documents to delete
     * @return the bulk response
     */
    public static BulkResponse bulkDeleteDocument(String index, String type, List<String> ids) {
        BulkRequestBuilder deletions = client.prepareBulk();
        for (String docId : ids) {
            deletions.add(client.prepareDelete(index, type, docId));
        }
        return deletions.get();
    }

    /**
     * Runs a match-all query on the index and returns the first 10 hits,
     * sorted by {@code price} ascending, as JSON source strings.
     *
     * @param indexName index name
     * @return source strings of the hits
     */
    public static List<String> searchAll(String indexName) {
        List<String> sources = new ArrayList<>();
        SearchResponse found = client.prepareSearch(indexName)
                .setQuery(matchAllQuery())
                .setFrom(0)
                .setSize(10)
                .addSort("price", SortOrder.ASC)
                .get();
        for (SearchHit hit : found.getHits().getHits()) {
            sources.add(hit.getSourceAsString());
        }
        return sources;
    }

    /**
     * Analyzed search with optional highlighting and sorting, returning one page.
     *
     * <p>{@code startPage} is a 1-based page number and is converted to a document offset
     * ({@code (startPage - 1) * pageSize}, never below 0). While the requested window ends
     * within the first 10000 hits a normal from/size search is used. Beyond that the
     * results are walked with the scroll API and only the requested window is collected.
     *
     * @param index          index name
     * @param type           type name; several types may be given comma-separated
     * @param startPage      current page number (1-based)
     * @param pageSize       records per page
     * @param query          query to execute
     * @param fields         comma-separated fields to return; empty means no source filter
     * @param sortField      field to sort ascending by; empty means no explicit sort
     * @param highlightField field to highlight; empty means no highlighting
     * @return the requested page, or {@code null} if the search did not return status 200
     */
    public static EsPage searchDataPage(String index, String type, int startPage, int pageSize, QueryBuilder query, String fields, String sortField, String highlightField) {
        // from + size of a normal search must stay within this many hits.
        final int maxResultWindow = 10000;

        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        // Fields to return, comma-separated (all fields when empty).
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.ASC);
        }
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.preTags("<span style='color:red' >");
            highlightBuilder.postTags("</span>");
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        // Ask Elasticsearch to include the score explanation for every hit.
        searchRequestBuilder.setExplain(true);
        searchRequestBuilder.setQuery(query);

        long offset = Math.max(0L, ((long) startPage - 1) * pageSize);
        boolean deepPaging = offset + pageSize > maxResultWindow;
        if (deepPaging) {
            searchRequestBuilder
                    .setScroll(TimeValue.timeValueMinutes(1))
                    .setSize(maxResultWindow);
        } else {
            // QUERY_THEN_FETCH    QUERY_AND_FETCH  DFS_QUERY_THEN_FETCH
            searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
            searchRequestBuilder
                    .setFrom((int) offset)
                    .setSize(pageSize);
        }
        // The printed request can be replayed in Elasticsearch head or Kibana.
        LOGGER.info("\n{}", searchRequestBuilder);
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
        long totalHits = searchResponse.getHits().totalHits;
        LOGGER.debug("共查詢到[{}]條資料,處理資料條數[{}]", totalHits, searchResponse.getHits().getHits().length);
        if (searchResponse.status().getStatus() != 200) {
            return null;
        }
        List<Map<String, Object>> sourceList = deepPaging
                ? collectScrollWindow(searchResponse, offset, pageSize, highlightField)
                : setSearchResponse(searchResponse, highlightField);
        // EsPage stores the count as int; clamp instead of overflowing.
        int recordCount = (int) Math.min(totalHits, (long) Integer.MAX_VALUE);
        return new EsPage(startPage, pageSize, recordCount, sourceList);
    }

    /**
     * Walks a scroll, starting with the hits of the given first response, and collects
     * the hits at positions {@code [offset, offset + pageSize)}. The scroll context is
     * cleared afterwards.
     */
    private static List<Map<String, Object>> collectScrollWindow(SearchResponse response, long offset, int pageSize, String highlightField) {
        List<Map<String, Object>> window = new ArrayList<Map<String, Object>>();
        long position = 0;
        try {
            while (window.size() < pageSize && response.getHits().getHits().length > 0) {
                for (SearchHit hit : response.getHits().getHits()) {
                    if (position >= offset && window.size() < pageSize) {
                        window.add(getResultMap(hit, highlightField));
                    }
                    position++;
                }
                if (window.size() >= pageSize) {
                    break;
                }
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(TimeValue.timeValueMinutes(1))
                        .execute()
                        .actionGet();
            }
        } finally {
            ClearScrollRequest request = new ClearScrollRequest();
            request.addScrollId(response.getScrollId());
            client.clearScroll(request);
        }
        return window;
    }
    /**
     * Scroll-based deep paging demo: runs a fixed match query on {@code name},
     * sorted by {@code price} ascending, and returns the requested page.
     *
     * <p>The page is cut from the list returned by {@code disposeScrollResult} after
     * skipping {@code (startPage - 1 - 10000 / pageSize) * pageSize} entries; the original
     * notes state that this is meant for pages from 1001 onwards.
     *
     * @param indexName      index name
     * @param esType         type name
     * @param startPage      current page number
     * @param pageSize       records per page
     * @param highlightField field to highlight
     * @return the requested page
     */
    public static EsPage deepPageing(String indexName, String esType, int startPage, int pageSize, String highlightField) {
        System.out.println("scroll 模式啟動!");
        long startedAt = System.currentTimeMillis();

        // Initial search; it also opens the scroll context.
        BoolQueryBuilder nameQuery = QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("name", "名"));
        //nameQuery.filter(QueryBuilders.rangeQuery("price").from("1").to("999821"));
        SearchRequestBuilder scrollSearch = client.prepareSearch(indexName)
                .setTypes(esType)
                .setQuery(nameQuery)
                .addSort("price", SortOrder.ASC)
                .setScroll(TimeValue.timeValueMinutes(1))
                .setSize(10000);
        HighlightBuilder highlighter = new HighlightBuilder()
                .preTags("<span style='color:red' >")
                .postTags("</span>")
                .field(highlightField);
        SearchResponse firstBatch = scrollSearch
                .highlighter(highlighter)
                .setExplain(true)
                .get();

        long totalHits = firstBatch.getHits().totalHits;
        List<Map<String, Object>> scrolled = disposeScrollResult(firstBatch, highlightField);
        int toSkip = (startPage - 1 - (10000 / pageSize)) * pageSize;
        List<Map<String, Object>> pageRows = scrolled.stream()
                .parallel()
                .skip(toSkip)
                .limit(pageSize)
                .collect(Collectors.toList());

        long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println("耗時: " + elapsedMillis + "ms");
        System.out.println("耗時: " + elapsedMillis / 1000 + "s");
        System.out.println("查詢" + totalHits + "條資料");
        return new EsPage(startPage, pageSize, (int) totalHits, pageRows);
    }

    /**
     * Drains a scroll and converts every fetched hit to a result map.
     *
     * <p>NOTE(review): the hits of the response passed in are not added to the result;
     * collection starts with the next scroll batch. The callers' skip arithmetic
     * (subtracting {@code 10000 / pageSize} pages) relies on this, so change both together.
     *
     * @param response       first response of a scroll search
     * @param highlightField field whose highlight replaces the source value; may be empty
     * @return result maps of all hits fetched through the scroll
     */
    private static   List<Map<String, Object>>   disposeScrollResult(SearchResponse response ,String highlightField){
        List<Map<String, Object>> collected = new ArrayList<Map<String, Object>>();
        SearchResponse batch = response;
        // Keep fetching while the last batch still contained hits.
        while (batch.getHits().getHits().length > 0) {
            batch = client.prepareSearchScroll(batch.getScrollId())
                    // keep-alive of the scroll context
                    .setScroll(TimeValue.timeValueMinutes(1))
                    .get();
            for (SearchHit hit : batch.getHits().getHits()) {
                collected.add(getResultMap(hit, highlightField));
            }
        }
        // Release the scroll context.
        ClearScrollRequest clearRequest = new ClearScrollRequest();
        clearRequest.addScrollId(batch.getScrollId());
        client.clearScroll(clearRequest);
        return collected;
    }

    /**
     * Analyzed search with optional sorting and highlighting, returning a plain list.
     *
     * @param index          index name
     * @param type           type name; several types may be given comma-separated
     * @param query          query to execute
     * @param size           maximum number of hits; ignored when {@code null} or not positive
     * @param fields         comma-separated fields to return; empty means no source filter
     * @param sortField      field to sort ascending by; empty means no explicit sort
     * @param highlightField field to highlight; empty means no highlighting
     * @return result maps of the hits, or {@code null} if the status is not 200
     */
    public static List<Map<String, Object>> searchListData(String index, String type, QueryBuilder query, Integer size, String fields, String sortField, String highlightField) {
        SearchRequestBuilder request = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            String[] typeNames = type.split(",");
            request.setTypes(typeNames);
        }
        if (StringUtils.isNotEmpty(highlightField)) {
            // Highlight the requested field.
            request.highlighter(new HighlightBuilder().field(highlightField));
        }
        request.setQuery(query);
        if (StringUtils.isNotEmpty(fields)) {
            String[] includes = fields.split(",");
            request.setFetchSource(includes, null);
        }
        request.setFetchSource(true);
        if (StringUtils.isNotEmpty(sortField)) {
            request.addSort(sortField, SortOrder.ASC);
        }
        boolean limited = size != null && size > 0;
        if (limited) {
            request.setSize(size);
        }
        // The printed request can be replayed in Elasticsearch head or Kibana.
        LOGGER.info("\n{}", request);
        SearchResponse found = request.get();
        long totalHits = found.getHits().totalHits;
        long length = found.getHits().getHits().length;
        LOGGER.info("共查詢到[{}]條資料,處理資料條數[{}]", totalHits, length);
        if (found.status().getStatus() != 200) {
            return null;
        }
        return setSearchResponse(found, highlightField);
    }

    /**
     * Converts every hit of a search response to a result map
     * (see {@code getResultMap} for the highlight handling).
     *
     * @param searchResponse search response
     * @param highlightField highlighted field; may be empty
     * @return one result map per hit, in hit order
     */
    private static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
        for (int i = 0; i < hits.length; i++) {
            rows.add(getResultMap(hits[i], highlightField));
        }
        return rows;
    }

    /**
     * Returns the source map of a hit with the document id added under {@code "id"} and,
     * when a highlight exists for {@code highlightField}, the field value replaced by the
     * highlighted text.
     *
     * <p>A hit without a highlight for the field keeps its original value (previously this
     * caused a {@code NullPointerException}). All highlight fragments are concatenated
     * instead of keeping only the last one.
     *
     * @param hit            search hit
     * @param highlightField field whose highlight replaces the source value; may be empty
     * @return the hit's source map
     */
    private  static   Map<String, Object>  getResultMap(SearchHit hit,String highlightField){
        Map<String, Object> source = hit.getSourceAsMap();
        source.put("id", hit.getId());
        if (StringUtils.isNotEmpty(highlightField) && hit.getHighlightFields().containsKey(highlightField)) {
            Text[] fragments = hit.getHighlightFields().get(highlightField).getFragments();
            if (fragments != null && fragments.length > 0) {
                StringBuilder highlighted = new StringBuilder();
                for (Text fragment : fragments) {
                    highlighted.append(fragment.string());
                }
                // The highlighted text overrides the plain value.
                source.put(highlightField, highlighted.toString());
            }
        }
        return source;
    }

    /**
     * Analyzer smoke test: analyzes a fixed sample text against the {@code entity} index
     * with the {@code standard} analyzer (the original notes list {@code ik_smart} and
     * {@code ik_max_word} as alternatives).
     *
     * @return the produced terms, each followed by a CRLF line break
     */
    public static String ik() {
        AnalyzeRequest analyzeRequest = new AnalyzeRequest("entity")
                .text("書名")
                .analyzer("standard");  //ik_smart  ik_max_word  standard
        List<AnalyzeResponse.AnalyzeToken> tokens = client.admin().indices()
                .analyze(analyzeRequest)
                .actionGet()
                .getTokens();
        StringBuilder stringBuilder = new StringBuilder();
        for (AnalyzeResponse.AnalyzeToken token : tokens) {
            // A real line break; the previous literal emitted the characters \r\n as text.
            stringBuilder.append(token.getTerm()).append("\r\n");
        }
        return stringBuilder.toString();
    }
}

Controller 測試類

/**
 * Demo REST controller that exercises the {@code ElasticsearchUtil} helpers.
 * All endpoints live under {@code /test} and, unless stated otherwise, operate
 * on the {@code entity} index and the {@code book} type.
 *
 * @author LHL
 * @since 2018/10/10
 */
@RestController
@RequestMapping("/test")
public class TestController {

    /**
     * Index used by the endpoints ({@code deleteIndex} takes its own name from the request).
     */
    private final String indexName = "entity";

    /**
     * Mapping type used for every document.
     */
    private final String esType = "book";

    /**
     * Liveness check.
     *
     * @return a fixed greeting string
     */
    @GetMapping("/hello")
    public String helloTest() {
        return "Holle World";
    }

    /**
     * Creates the demo index if it does not exist yet.
     * Example: {@code PUT http://127.0.0.1:8080/test/createIndex}
     *
     * @param request  unused
     * @param response unused
     * @return a message telling whether the index was created or already existed
     */
    @PutMapping("/createIndex")
    public String createIndex(HttpServletRequest request, HttpServletResponse response) {
        if (ElasticsearchUtil.isIndexExist(indexName)) {
            return "索引已經存在";
        }
        ElasticsearchUtil.createIndex(indexName);
        return "索引建立成功";
    }

    /**
     * Creates the demo index together with its type mapping if the index does not
     * exist yet. According to the original author's note the mapping assigns the IK
     * analyzer to some fields; the mapping itself is defined in
     * {@code ElasticsearchUtil.createIndex(String, String)}.
     *
     * @param request  unused
     * @param response unused
     * @return a message telling whether the index was created or already existed
     */
    @PutMapping("/createIndexTypeMapping")
    public String createIndexTypeMapping(HttpServletRequest request, HttpServletResponse response) {
        if (ElasticsearchUtil.isIndexExist(indexName)) {
            return "索引已經存在";
        }
        ElasticsearchUtil.createIndex(indexName, esType);
        return "索引建立成功";
    }

    /**
     * Deletes the index named in the request.
     *
     * @param indexName name of the index to delete (request parameter; shadows the field)
     * @param request   unused
     * @param response  unused
     * @return a success or failure message
     */
    @DeleteMapping("/deleteIndex")
    public String deleteIndex(String indexName, HttpServletRequest request, HttpServletResponse response) {
        if (StringUtils.isNotBlank(indexName)) {
            return ElasticsearchUtil.deleteIndex(indexName) ? "索引刪除成功" : "失敗";
        } else {
            // Without a name there is nothing to delete; answer instead of failing downstream.
            return "indexName為空";
        }
    }

    /**
     * Analyzer smoke test; delegates to {@code ElasticsearchUtil.ik()}.
     *
     * @return the analyzed terms as produced by the helper
     */
    @GetMapping("/getik")
    public String ikMapping() {
        return ElasticsearchUtil.ik();
    }

    /**
     * Inserts a sample document built as a JSON object.
     *
     * @return the value returned by {@code ElasticsearchUtil.addData}
     */
    @PostMapping("/insertJson")
    public String insertJson() {
        Date now = new Date();
        JSONObject document = new JSONObject();
        // HH = 24-hour clock; the 12-hour "hh" produced identical ids for AM and PM.
        document.put("id", DateFormatUtils.format(now, "yyyyMMddHHmmss"));
        document.put("age", 25);
        // Unseeded generator: a fixed seed returned the same number on every request.
        document.put("name", "j-" + new Random().nextInt());
        document.put("date", now);
        return ElasticsearchUtil.addData(document, indexName, esType, document.getString("id"));
    }

    /**
     * Inserts a sample document built from an {@code EsModel} instance.
     *
     * @return the value returned by {@code ElasticsearchUtil.addData}
     */
    @PostMapping("/insertModel")
    public String insertModel() {
        Date now = new Date();
        EsModel esModel = new EsModel();
        // HH = 24-hour clock; the 12-hour "hh" produced identical ids for AM and PM.
        esModel.setId(DateFormatUtils.format(now, "yyyyMMddHHmmss"));
        // Unseeded generator: a fixed seed returned the same number on every request.
        esModel.setName("m-" + new Random().nextInt());
        esModel.setAge(30);
        esModel.setDate(now);
        JSONObject document = (JSONObject) JSONObject.toJSON(esModel);
        return ElasticsearchUtil.addData(document, indexName, esType, document.getString("id"));
    }

    /**
     * Deletes the document with the given id.
     *
     * @param id document id (request parameter)
     * @return a confirmation message, or a notice that the id is blank
     */
    @DeleteMapping("/delete")
    public String delete(String id) {
        if (StringUtils.isNotBlank(id)) {
            ElasticsearchUtil.deleteDataById(indexName, esType, id);
            return "刪除id=" + id;
        } else {
            return "id為空";
        }
    }

    /**
     * Overwrites the fields of the document with the given id with fixed sample values.
     *
     * @param id document id (path variable)
     * @return the id that was updated, or a notice that the id is blank
     */
    @PutMapping("/update/{id}")
    public String update(@PathVariable("id") String id) {
        if (StringUtils.isNotBlank(id)) {
            JSONObject document = new JSONObject();
            document.put("id", id);
            document.put("age", 31);
            document.put("name", "修改");
            document.put("date", new Date());
            ElasticsearchUtil.updateDataById(document, indexName, esType, id);
            return "id=" + id;
        } else {
            return "id為空";
        }
    }

    /**
     * Fetches a single document by id.
     * Example: {@code GET http://127.0.0.1:8080/test/getData/{id}}
     *
     * @param id document id (path variable)
     * @return the document serialized as JSON, or a notice that the id is blank
     */
    @GetMapping("/getData/{id}")
    public String getData(@PathVariable("id") String id) {
        if (StringUtils.isNotBlank(id)) {
            Map<String, Object> map = ElasticsearchUtil.searchDataById(indexName, esType, id, null);
            return JSONObject.toJSONString(map);
        } else {
            return "id為空";
        }
    }

    /**
     * Full-text query on the {@code name} field.
     *
     * @return up to 10 matching documents serialized as JSON
     */
    @GetMapping("/queryMatchData")
    public String queryMatchData() {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Flip to true to try a phrase match instead of a plain match.
        boolean matchPhrase = false;
        if (matchPhrase) {
            boolQuery.must(QueryBuilders.matchPhraseQuery("name", "修改書名"));
        } else {
            boolQuery.must(QueryBuilders.matchQuery("name", "修改書名"));
        }
        List<Map<String, Object>> list = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
        return JSONObject.toJSONString(list);
    }

    /**
     * Wildcard query on {@code name.keyword}: {@code ?} matches exactly one
     * character, {@code *} matches zero or more characters.
     *
     * @return up to 10 matching documents serialized as JSON
     */
    @GetMapping("/queryWildcardData")
    public String queryWildcardData() {
        QueryBuilder queryBuilder = QueryBuilders.wildcardQuery("name.keyword", "書名*2");
        List<Map<String, Object>> list = ElasticsearchUtil.searchListData(indexName, esType, queryBuilder, 10, null, null, null);
        return JSONObject.toJSONString(list);
    }

    /**
     * Regular-expression query on {@code name.keyword}.
     *
     * @return up to 10 matching documents serialized as JSON
     */
    @GetMapping("/queryRegexpData")
    public String queryRegexpData() {
        QueryBuilder queryBuilder = QueryBuilders.regexpQuery("name.keyword", "書名[0-9]{1,7}");
        List<Map<String, Object>> list = ElasticsearchUtil.searchListData(indexName, esType, queryBuilder, 10, null, null, null);
        return JSONObject.toJSONString(list);
    }

    /**
     * Numeric range query on {@code price}: 21.0 (inclusive) to 25.0 (exclusive).
     *
     * @return up to 10 matching documents serialized as JSON
     */
    @GetMapping("/queryIntRangeData")
    public String queryIntRangeData() {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.rangeQuery("price").from(21.0).to(25.0)
                .includeLower(true)    // true: lower bound included, false: excluded
                .includeUpper(false)); // true: upper bound included, false: excluded
        List<Map<String, Object>> list = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
        return JSONObject.toJSONString(list);
    }

    /**
     * Date range query on {@code creatDate} with both bounds inclusive.
     *
     * @return up to 10 matching documents serialized as JSON
     */
    @GetMapping("/queryDateRangeData")
    public String queryDateRangeData() {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.rangeQuery("creatDate").from("2018-10-17T02:03:08.829Z").to("2018-10-17T02:03:09.727Z").includeLower(true).includeUpper(true));
        List<Map<String, Object>> list = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
        return JSONObject.toJSONString(list);
    }

    /**
     * Paged query combining a date range, a match filter on {@code name},
     * sorting and highlighting. Elapsed time is printed to standard output.
     *
     * @param startPage paging start value handed to {@code ElasticsearchUtil.searchDataPage}
     *                  (the original note says it starts from 0)
     * @param pageSize  page size
     * @param context   text matched against the {@code name} field
     * @return the page serialized as JSON, or a message describing the invalid input
     */
    @GetMapping("/queryPage")
    public String queryPage(String startPage, String pageSize, String context) {
        if (!(StringUtils.isNotBlank(startPage) && StringUtils.isNotBlank(pageSize))) {
            return "startPage或者pageSize缺失";
        }
        if (context == null) {
            // matchQuery rejects a null value; report it instead of failing with an exception.
            return "context缺失";
        }
        int from;
        int size;
        try {
            from = Integer.parseInt(startPage.trim());
            size = Integer.parseInt(pageSize.trim());
        } catch (NumberFormatException e) {
            return "startPage或者pageSize必須為整數";
        }
        long start = System.currentTimeMillis();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.rangeQuery("creatDate").from("2018-10-17T02:03:08.829Z").to("2018-10-17T02:03:09.727Z").includeLower(true).includeUpper(true))
                .filter(QueryBuilders.matchQuery("name", context));
        EsPage page = ElasticsearchUtil.searchDataPage(indexName, esType, from, size, boolQuery, null, "price", "name");
        printElapsed(start);
        return JSONObject.toJSONString(page);
    }

    /**
     * Deep paging demo; delegates to {@code ElasticsearchUtil.deepPageing}.
     * A hint is printed when {@code startPage} is below {@code 10000 / pageSize};
     * the request is still executed in that case. Elapsed time is printed to
     * standard output.
     *
     * @param startPage page to start from
     * @param pageSize  page size, must be positive
     * @return the page returned by the helper
     * @throws IllegalArgumentException if {@code pageSize} is not positive
     */
    @GetMapping("/deepPageing")
    public EsPage deepPageing(int startPage, int pageSize) {
        if (pageSize <= 0) {
            // Guards the division below against ArithmeticException and rejects meaningless sizes.
            throw new IllegalArgumentException("pageSize必須大於0: " + pageSize);
        }
        int firstDeepPage = 10000 / pageSize;
        if (startPage < firstDeepPage) {
            System.out.println("startPage需要>=" + firstDeepPage);
        }
        long start = System.currentTimeMillis();
        EsPage result = ElasticsearchUtil.deepPageing(indexName, esType, startPage, pageSize, "name");
        printElapsed(start);
        return result;
    }

    /**
     * Bulk-inserts 100,000 generated books via {@code ElasticsearchUtil.bulkAddDocument}
     * and prints the elapsed time to standard output.
     */
    @PostMapping("/addBulk")
    public void addBulk() {
        List<Book> bookList = generateBooks(100000L);
        long start = System.currentTimeMillis();
        ElasticsearchUtil.bulkAddDocument(indexName, esType, bookList);
        printElapsed(start);
    }

    /**
     * Bulk-inserts 1,000,000 generated books via {@code ElasticsearchUtil.bulkProcessorAdd}
     * and prints the elapsed time to standard output.
     */
    @PostMapping("/bulkProcessorAdd")
    public void bulkProcessorAdd() {
        List<Book> bookList = generateBooks(1000000L);
        long start = System.currentTimeMillis();
        ElasticsearchUtil.bulkProcessorAdd(indexName, esType, bookList);
        printElapsed(start);
    }

    /**
     * Bulk-deletes the documents with ids "1" through "100000". Failures are
     * printed to standard output, followed by the elapsed time.
     *
     * @return the status of the bulk response
     */
    @DeleteMapping("/deleteBulk")
    public RestStatus deleteBulk() {
        // Sequential on purpose: Stream.iterate is an ordered, inherently sequential
        // source, so parallel() only adds buffering overhead here.
        List<String> idsList = Stream
                .iterate(1, i -> i + 1)
                .limit(100000L)
                .map(String::valueOf)
                .collect(Collectors.toList());
        long start = System.currentTimeMillis();
        BulkResponse bulkItemResponses = ElasticsearchUtil.bulkDeleteDocument(indexName, esType, idsList);
        if (bulkItemResponses.hasFailures()) {
            System.out.println(bulkItemResponses.buildFailureMessage());
        }
        printElapsed(start);
        return bulkItemResponses.status();
    }

    /**
     * Returns everything {@code ElasticsearchUtil.searchAll} yields for the demo index.
     *
     * @return the list of strings produced by the helper
     */
    @GetMapping("/getAll")
    public List<String> getAll() {
        return ElasticsearchUtil.searchAll(indexName);
    }

    /**
     * Generates {@code count} sample books numbered from 1, in ascending order.
     */
    private static List<Book> generateBooks(long count) {
        // Sequential on purpose: Stream.iterate is an ordered, inherently sequential
        // source, so parallel() only adds buffering overhead here.
        return Stream
                .iterate(1, i -> i + 1)
                .limit(count)
                .map(number -> new Book(String.valueOf(number), "書名" + number, "資訊" + number, Double.valueOf(number), new Date()))
                .collect(Collectors.toList());
    }

    /**
     * Prints the time elapsed since {@code startMillis}, first in whole seconds, then in milliseconds.
     */
    private static void printElapsed(long startMillis) {
        long elapsed = System.currentTimeMillis() - startMillis;
        System.out.println(elapsed / 1000 + "s");
        System.out.println(elapsed + "ms");
    }
}

Spring Boot 啟動報錯

  • 錯誤資訊
[           main] o.a.catalina.core.AprLifecycleListener   : An incompatible version [1.2.7] of the APR based Apache Tomcat Native library is installed, while Tomcat requires version [1.2.14]
  • 從 Tomcat 官方倉庫下載 tomcat-native-1.2.14-ocsp-win32-bin.zip
  • 解壓zip檔案
  • 依作業系統位數複製對應的檔案:
    • 32 位 tomcat-native-1.2.14-win32-bin\bin下的【tcnative-1.dll 、tcnative-1-src.pdb 】複製放在C:/Windows/system32/下
    • 64 位 tomcat-native-1.2.14-win32-bin\bin\x64下的【tcnative-1.dll 、tcnative-1-src.pdb 】複製放在C:/Windows/system32/下

GitHub程式碼地址 https://github.com/412Aomr/ElasticSearchDemo