Spring Boot 2.0.5 整合 Elasticsearch 6.4.1 叢集
阿新 • • 發佈:2018-12-14
轉載請標明出處 https://blog.csdn.net/Amor_Leo/article/details/83012038 謝謝
ES叢集搭建 https://blog.csdn.net/Amor_Leo/article/details/83011372 兩個節點
ES叢集搭建 https://blog.csdn.net/Amor_Leo/article/details/83144739 三個節點
該文參考了 https://blog.csdn.net/chy2z/article/details/80461745
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.4.1</version> <!-- the transitive elasticsearch core is excluded here and declared explicitly (6.4.1) right below; presumably to keep the version from being overridden by dependency management - confirm --> <exclusions> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.4.1</version> </dependency> <dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>transport-netty4-client</artifactId> <version>6.4.1</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.4.1</version> </dependency> <!-- swagger2 API documentation generation --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.39</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
ES配置類
/** * @Auther: LHL * @Date: 2018/10/10 08:55 * @Description: ES 連線 配置 */ @Configuration public class ESConfig { private static final Logger LOGGER = LoggerFactory.getLogger(ESConfig.class); @Bean(name = "transportClient") public TransportClient transportClient() { LOGGER.info("Elasticsearch初始化開始。。。。。"); TransportClient transportClient = null; try { //es叢集連線 TransportAddress node = new TransportAddress ( InetAddress.getByName("192.168.0.128"), 9300 ); TransportAddress node1 = new TransportAddress ( InetAddress.getByName("192.168.0.128"), 9301 ); TransportAddress node2 = new TransportAddress ( InetAddress.getByName("192.168.0.128"), 9302 ); //es叢集配置(自定義配置) 連線自己安裝的叢集名稱 Settings settings = Settings.builder() .put("cluster.name", "ESCluster") //增加嗅探機制,找到ES叢集 如果報org.elasticsearch.transport.ReceiveTimeoutTransportException: 把他註釋掉 即可 .put("client.transport.sniff", true) .put("thread_pool.search.size", Integer.parseInt("5"))//增加執行緒池個數,暫時設為5 .build(); transportClient = new PreBuiltTransportClient(settings); transportClient.addTransportAddress(node); transportClient.addTransportAddress(node1); transportClient.addTransportAddress(node2); } catch (UnknownHostException e) { e.printStackTrace(); LOGGER.error("elasticsearch TransportClient create error!!", e); } return transportClient; } }
/** * @Auther: LHL * @Date: 2018/10/16 11:44 * @Description: ES批量操作初始化 */ @Configuration public class ESClient { private static final Logger LOGGER = LogManager.getLogger(ESClient.class); @Autowired private TransportClient transportClient; private static TransportClient client; /** * @PostContruct 是spring容器初始化的時候執行該方法 * @param: [] * @return: void * @auther: LHL * @date: 2018/10/16 14:19 */ @PostConstruct public void init() { client = this.transportClient; } @Bean("bulkProcessor") public BulkProcessor bulkProcessor() { LOGGER.info("bulkProcessor初始化開始。。。。。"); return BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long l, BulkRequest bulkRequest) { } @Override public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { } @Override public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { LOGGER.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable); } }).setBulkActions(10000) // 批量匯入個數 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // 滿5MB進行匯入 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 沖刷間隔 .setConcurrentRequests(3) // 併發數 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueSeconds(1), 3)) // 重試3次,間隔1s .build(); } }
實體類
/**
 * Simple document model (id, name, age, date) used by the insert examples.
 *
 * @author LHL
 */
public class EsModel {

    private String id;
    private String name;
    private Integer age;
    private Date date;

    /** @return the document id */
    public String getId() {
        return this.id;
    }

    /** @param id the document id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the age */
    public Integer getAge() {
        return this.age;
    }

    /** @param age the age */
    public void setAge(Integer age) {
        this.age = age;
    }

    /** @return the name */
    public String getName() {
        return this.name;
    }

    /** @param name the name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the date */
    public Date getDate() {
        return this.date;
    }

    /** @param date the date */
    public void setDate(Date date) {
        this.date = date;
    }

    /** Renders all four properties in declaration order of the original class (id, age, name, date). */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EsModel{");
        text.append("id='").append(id).append('\'');
        text.append(", age=").append(age);
        text.append(", name='").append(name).append('\'');
        text.append(", date=").append(date);
        text.append('}');
        return text.toString();
    }
}
/**
 * Book document used by the bulk indexing examples.
 */
public class Book {

    String id;
    String name;
    String message;
    Double price;
    Date creatDate;

    /**
     * Creates a fully populated book.
     *
     * @param id        document id
     * @param name      book name
     * @param message   free-text message
     * @param price     price
     * @param creatDate creation date
     */
    public Book(String id, String name, String message, Double price, Date creatDate) {
        this.id = id;
        this.name = name;
        this.message = message;
        this.price = price;
        this.creatDate = creatDate;
    }

    /** @return the document id */
    public String getId() {
        return this.id;
    }

    /** @param id the document id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the book name */
    public String getName() {
        return this.name;
    }

    /** @param name the book name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the price */
    public Double getPrice() {
        return this.price;
    }

    /** @param price the price */
    public void setPrice(Double price) {
        this.price = price;
    }

    /** @return the creation date */
    public Date getCreatDate() {
        return this.creatDate;
    }

    /** @param creatDate the creation date */
    public void setCreatDate(Date creatDate) {
        this.creatDate = creatDate;
    }
}
工具類
/**
 * Page of search results together with the derived pager information
 * (total page count and the window of page numbers to display).
 *
 * @author LHL
 */
public class EsPage {

    /** Maximum number of page numbers shown in the pager window. */
    private static final int WINDOW_SIZE = 10;

    /** Current page. */
    private int currentPage;

    /** Number of records per page. */
    private int pageSize;

    /** Total number of records. */
    private int recordCount;

    /** Records of the current page. */
    private List<Map<String, Object>> recordList;

    /** Total number of pages. */
    private int pageCount;

    /** First page number of the pager window (inclusive). */
    private int beginPageIndex;

    /** Last page number of the pager window (inclusive). */
    private int endPageIndex;

    /**
     * Takes the four required properties and derives the other three.
     *
     * @param currentPage current page
     * @param pageSize    records per page; a value of zero or less yields a page count of 0
     * @param recordCount total number of records
     * @param recordList  records of the current page
     */
    public EsPage(int currentPage, int pageSize, int recordCount, List<Map<String, Object>> recordList) {
        this.currentPage = currentPage;
        this.pageSize = pageSize;
        this.recordCount = recordCount;
        this.recordList = recordList;
        this.pageCount = computePageCount(recordCount, pageSize);

        if (pageCount <= WINDOW_SIZE) {
            // Few pages: show all of them.
            beginPageIndex = 1;
            endPageIndex = pageCount;
        } else {
            // Many pages: show 10 numbers around the current page (4 before + current + 5 after).
            beginPageIndex = currentPage - 4;
            endPageIndex = currentPage + 5;
            if (beginPageIndex < 1) {
                // fewer than 4 pages before the current one: show the first 10
                beginPageIndex = 1;
                endPageIndex = WINDOW_SIZE;
            }
            if (endPageIndex > pageCount) {
                // fewer than 5 pages after the current one: show the last 10
                endPageIndex = pageCount;
                beginPageIndex = pageCount - WINDOW_SIZE + 1;
            }
        }
    }

    /**
     * Ceiling division of record count by page size. Computed in {@code long}
     * so that {@code recordCount + pageSize - 1} cannot overflow, and guarded
     * against a non-positive page size, which previously caused an
     * {@link ArithmeticException} (division by zero) or a meaningless result.
     */
    private static int computePageCount(int recordCount, int pageSize) {
        if (pageSize <= 0) {
            return 0;
        }
        return (int) (((long) recordCount + pageSize - 1) / pageSize);
    }

    public int getCurrentPage() {
        return currentPage;
    }

    public void setCurrentPage(int currentPage) {
        this.currentPage = currentPage;
    }

    public int getPageSize() {
        return pageSize;
    }

    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    public int getRecordCount() {
        return recordCount;
    }

    public void setRecordCount(int recordCount) {
        this.recordCount = recordCount;
    }

    public List<Map<String, Object>> getRecordList() {
        return recordList;
    }

    public void setRecordList(List<Map<String, Object>> recordList) {
        this.recordList = recordList;
    }

    public int getPageCount() {
        return pageCount;
    }

    public void setPageCount(int pageCount) {
        this.pageCount = pageCount;
    }

    public int getBeginPageIndex() {
        return beginPageIndex;
    }

    public void setBeginPageIndex(int beginPageIndex) {
        this.beginPageIndex = beginPageIndex;
    }

    public int getEndPageIndex() {
        return endPageIndex;
    }

    public void setEndPageIndex(int endPageIndex) {
        this.endPageIndex = endPageIndex;
    }
}
/**
* @Auther: LHL
* @Date: 2018/10/10 10:20
* @Description:
*/
@Component
public class ElasticsearchUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtil.class);
@Autowired
private TransportClient transportClient;
@Autowired
private BulkProcessor bulkProcessor;
private static TransportClient client;
private static BulkProcessor bulk;
/**
 * Publishes the injected transport client to the static field so that the
 * static helper methods of this class can use it. Runs once after dependency
 * injection because of {@code @PostConstruct}.
 */
@PostConstruct
public void init() {
    ElasticsearchUtil.client = transportClient;
}
/**
 * Publishes the injected bulk processor to the static field so that the
 * static helper methods of this class can use it. Runs once after dependency
 * injection because of {@code @PostConstruct}.
 */
@PostConstruct
public void initBulk() {
    ElasticsearchUtil.bulk = bulkProcessor;
}
/**
 * Creates the index (when it does not exist yet) and applies the book mapping
 * to the given type. All text fields currently use the {@code standard}
 * analyzer; {@code ik_smart} / {@code ik_max_word} can be substituted once the
 * IK plugin is installed.
 *
 * @param indexName index name
 * @param esTpye    mapping type name
 * @return {@code true} when the put-mapping request was acknowledged
 * @throws IllegalStateException if the mapping document cannot be built
 */
public static boolean createIndex(String indexName, String esTpye) {
    XContentBuilder mapping;
    try {
        mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject("properties")
                // field name -> type and analyzer
                .startObject("id").field("type", "text").field("analyzer", "standard").endObject()
                .startObject("name").field("type", "text").field("analyzer", "standard").endObject()
                .startObject("message").field("type", "text").field("analyzer", "standard").endObject()
                .startObject("price").field("type", "float").endObject()
                .startObject("creatDate").field("type", "date").endObject()
                .endObject()
                .endObject();
    } catch (IOException e) {
        // Previously swallowed, which let a null mapping reach the put-mapping request.
        throw new IllegalStateException("Failed to build mapping for index [" + indexName + "]", e);
    }

    // Only create the index when it is missing; creating an existing index throws.
    if (isIndexExist(indexName)) {
        LOGGER.info("Index [{}] already exists, only the mapping is applied", indexName);
    } else {
        client.admin().indices().prepareCreate(indexName).execute().actionGet();
    }

    PutMappingRequest putmap = Requests.putMappingRequest(indexName).type(esTpye).source(mapping);
    PutMappingResponse indexresponse = client.admin().indices().putMapping(putmap).actionGet();
    LOGGER.info("執行建立成功?" + indexresponse.isAcknowledged());
    return indexresponse.isAcknowledged();
}
/**
 * Creates an index without an explicit mapping.
 *
 * @param index index name
 * @return {@code true} when the index was created and the request acknowledged;
 *         {@code false} when the index already exists (previously this case
 *         fell through to the create call, which throws for an existing index)
 */
public static boolean createIndex(String index) {
    if (isIndexExist(index)) {
        LOGGER.info("Index [{}] already exists, nothing created", index);
        return false;
    }
    CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).execute().actionGet();
    LOGGER.info("執行建立成功?" + indexresponse.isAcknowledged());
    return indexresponse.isAcknowledged();
}
/**
 * Deletes an index.
 *
 * @param index index name
 * @return {@code true} when the deletion was acknowledged; {@code false} when
 *         the index does not exist (previously this case fell through to the
 *         delete call, which throws for a missing index) or the request was
 *         not acknowledged
 */
public static boolean deleteIndex(String index) {
    if (!isIndexExist(index)) {
        LOGGER.info("Index is not exits!");
        return false;
    }
    DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
    boolean acknowledged = dResponse.isAcknowledged();
    if (acknowledged) {
        LOGGER.info("delete index " + index + " successfully!");
    } else {
        LOGGER.info("Fail to delete index " + index);
    }
    return acknowledged;
}
/**
 * Checks whether an index exists and logs the outcome.
 *
 * @param index index name
 * @return {@code true} when the index exists
 */
public static boolean isIndexExist(String index) {
    IndicesExistsRequest existsRequest = new IndicesExistsRequest(index);
    boolean exists = client.admin().indices().exists(existsRequest).actionGet().isExists();
    String outcome = exists ? "] is exist!" : "] is not exist!";
    LOGGER.info("Index [" + index + outcome);
    return exists;
}
/**
 * Indexes a document under an explicit id.
 *
 * @param jsonObject document to store
 * @param index      index name (comparable to a database)
 * @param type       type name (comparable to a table)
 * @param id         document id
 * @return the id reported by the index response
 */
public static String addData(JSONObject jsonObject, String index, String type, String id) {
    IndexResponse indexed = client.prepareIndex(index, type, id)
            .setSource(jsonObject)
            .execute()
            .actionGet();
    int httpStatus = indexed.status().getStatus();
    String storedId = indexed.getId();
    LOGGER.info("addData response status:{},id:{}", httpStatus, storedId);
    return storedId;
}
/**
 * Indexes a document under a generated id: a random UUID without dashes,
 * in upper case.
 *
 * @param jsonObject document to store
 * @param index      index name (comparable to a database)
 * @param type       type name (comparable to a table)
 * @return the id reported by the index response
 */
public static String addData(JSONObject jsonObject, String index, String type) {
    String generatedId = UUID.randomUUID().toString().replace("-", "").toUpperCase();
    return addData(jsonObject, index, type, generatedId);
}
/**
 * Deletes a single document by id and logs the response status.
 *
 * @param index index name (comparable to a database)
 * @param type  type name (comparable to a table)
 * @param id    document id
 */
public static void deleteDataById(String index, String type, String id) {
    DeleteResponse deleted = client.prepareDelete(index, type, id).get();
    int httpStatus = deleted.status().getStatus();
    LOGGER.info("deleteDataById response status:{},id:{}", httpStatus, deleted.getId());
}
/**
 * Partially updates a document by id and waits for the result.
 *
 * <p>The original implementation discarded the future returned by
 * {@code client.update(...)}, so the method returned before the update had
 * happened and any failure was lost. The call now blocks until the update
 * completes, which means a failed update surfaces as the exception thrown by
 * {@code actionGet()}.
 *
 * @param jsonObject fields to merge into the document
 * @param index      index name (comparable to a database)
 * @param type       type name (comparable to a table)
 * @param id         document id
 */
public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(index).type(type).id(id).doc(jsonObject);
    int status = client.update(updateRequest).actionGet().status().getStatus();
    LOGGER.info("updateDataById response status:{},id:{}", status, id);
}
/**
 * Fetches a single document by id.
 *
 * @param index  index name (comparable to a database)
 * @param type   type name (comparable to a table)
 * @param id     document id
 * @param fields comma separated list of fields to return; all fields when empty
 * @return the source map returned by the get response
 */
public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
    GetRequestBuilder lookup = client.prepareGet(index, type, id);
    if (!StringUtils.isEmpty(fields)) {
        String[] includes = fields.split(",");
        lookup.setFetchSource(includes, null);
    }
    return lookup.get().getSource();
}
/**
 * Indexes the given books with bulk requests of at most 100000 actions each.
 *
 * <p>Fixes over the original implementation:
 * <ul>
 *   <li>a new bulk builder is started after every intermediate flush, so
 *       documents that were already sent are not sent again by later flushes;</li>
 *   <li>the flush is driven by the number of queued actions instead of
 *       parsing the book id as an integer, which failed for non-numeric ids;</li>
 *   <li>an empty or {@code null} list no longer executes an empty bulk request;</li>
 *   <li>problems are reported through the logger instead of stdout/stderr.</li>
 * </ul>
 *
 * @param index    index name
 * @param type     type name
 * @param bookList books to index; may be empty or {@code null}
 */
public static void bulkAddDocument(String index, String type, List<Book> bookList) {
    if (bookList == null || bookList.isEmpty()) {
        LOGGER.info("bulkAddDocument called without documents, nothing to insert");
        return;
    }
    final int flushThreshold = 100000;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    for (Book book : bookList) {
        try {
            bulkRequest.add(client.prepareIndex(index, type, book.getId())
                    .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("id", book.getId())
                            .field("name", book.getName())
                            .field("creatDate", book.getCreatDate())
                            .field("price", book.getPrice())
                            .field("message", book.getMessage())
                            .endObject()));
        } catch (IOException e) {
            // Best effort as before: skip the document that cannot be serialised.
            LOGGER.error("Failed to serialise book [{}], skipping it", book.getId(), e);
            continue;
        }
        if (bulkRequest.numberOfActions() >= flushThreshold) {
            executeBulk(bulkRequest);
            bulkRequest = client.prepareBulk();
        }
    }
    if (bulkRequest.numberOfActions() > 0) {
        executeBulk(bulkRequest);
    }
}

/** Executes one bulk request and logs its status and any item failures. */
private static void executeBulk(BulkRequestBuilder bulkRequest) {
    BulkResponse responses = bulkRequest.execute().actionGet();
    if (responses.hasFailures()) {
        LOGGER.error("bulk error:" + responses.buildFailureMessage());
    }
    LOGGER.info("insert status?" + responses.status());
}
/**
 * Queues the given books on the shared bulk processor. The processor decides
 * when the queued requests are actually sent, so documents may not be
 * searchable yet when this method returns.
 *
 * <p>Serialisation failures are now reported through the logger (previously
 * {@code printStackTrace}); the affected document is skipped as before. A
 * {@code null} list is treated like an empty one.
 *
 * @param indexName index name
 * @param type      type name
 * @param bookList  books to index; may be empty or {@code null}
 */
public static void bulkProcessorAdd(String indexName, String type, List<Book> bookList) {
    if (bookList == null || bookList.isEmpty()) {
        return;
    }
    bookList.stream().parallel().forEach(book -> {
        try {
            bulk.add(new IndexRequest(indexName, type, book.getId()).source(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("id", book.getId())
                    .field("name", book.getName())
                    .field("creatDate", book.getCreatDate())
                    .field("price", book.getPrice())
                    .field("message", book.getMessage())
                    .endObject()));
        } catch (IOException e) {
            LOGGER.error("Failed to serialise book [{}], skipping it", book.getId(), e);
        }
    });
}
/**
 * Deletes the documents with the given ids in a single bulk request.
 *
 * @param index index name
 * @param type  type name
 * @param ids   ids of the documents to delete
 * @return the bulk response
 */
public static BulkResponse bulkDeleteDocument(String index, String type, List<String> ids) {
    BulkRequestBuilder deletions = client.prepareBulk();
    for (String id : ids) {
        deletions.add(client.prepareDelete(index, type, id));
    }
    return deletions.get();
}
/**
 * Runs a match-all query sorted by {@code price} ascending and returns the
 * first 10 hits as JSON source strings.
 *
 * @param indexName index name
 * @return source strings of the hits
 */
public static List<String> searchAll(String indexName) {
    SearchResponse found = client.prepareSearch(indexName)
            .setQuery(matchAllQuery())
            .setFrom(0)
            .setSize(10)
            .addSort("price", SortOrder.ASC)
            .execute()
            .actionGet();
    List<String> sources = new ArrayList<>();
    found.getHits().forEach(hit -> sources.add(hit.getSourceAsString()));
    return sources;
}
/**
 * Analysed (full-text) search with optional highlighting and sorting, returned as a page.
 * <p>
 * Two strategies are used: when {@code startPage > 10000 - pageSize} the results are
 * collected through the scroll API, otherwise a plain from/size search is executed.
 * <p>
 * NOTE(review): {@code startPage} is used inconsistently. The branch condition and
 * {@code setFrom(startPage)} treat it as a record offset, while the skip arithmetic in the
 * scroll branch and the {@code EsPage} constructor treat it as a page number. The intended
 * meaning must be confirmed before this method is changed.
 *
 * @param index index name
 * @param type type name; several types may be passed separated by commas
 * @param startPage current page (see the review note above)
 * @param pageSize number of records per page
 * @param query query condition
 * @param fields fields to return, comma separated (all fields when empty)
 * @param sortField sort field (ascending)
 * @param highlightField field to highlight
 * @return the result page, or {@code null} when the response status is not 200
 */
public static EsPage searchDataPage(String index, String type, int startPage, int pageSize, QueryBuilder query, String fields, String sortField, String highlightField) {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
if (StringUtils.isNotEmpty(type)) {
searchRequestBuilder.setTypes(type.split(","));
}
// fields to return, comma separated (all fields when empty)
if (StringUtils.isNotEmpty(fields)) {
searchRequestBuilder.setFetchSource(fields.split(","), null);
}// sort field
if (StringUtils.isNotEmpty(sortField)) {
searchRequestBuilder.addSort(sortField, SortOrder.ASC);
}// highlighting
if (StringUtils.isNotEmpty(highlightField)) {
HighlightBuilder highlightBuilder = new HighlightBuilder();
// tag placed before a highlighted fragment
highlightBuilder.preTags("<span style='color:red' >");
// tag placed after a highlighted fragment
highlightBuilder.postTags("</span>");
// field to highlight
highlightBuilder.field(highlightField);
searchRequestBuilder.highlighter(highlightBuilder);
}
// The original comment says this controls sorting by relevance.
// NOTE(review): setExplain(true) looks like it requests score explanations rather than a sort order - confirm.
searchRequestBuilder.setExplain(true);
// With pageSize 10 the scroll branch is taken for startPage > 9990 (10000 - pageSize), with 20 for > 9980, with 50 for > 9950
// deep paging TODO
if (startPage > (10000-pageSize)) {
searchRequestBuilder.setQuery(query);
searchRequestBuilder
.setScroll(TimeValue.timeValueMinutes(1))
.setSize(10000);
// the logged request can be replayed in Elasticsearch head or Kibana
LOGGER.info("\n{}", searchRequestBuilder);
// execute the search
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
long totalHits = searchResponse.getHits().totalHits;
if (searchResponse.status().getStatus() == 200){
// iterate over the remaining batches with the scroll id
// NOTE(review): disposeScrollResult fetches the next batch before reading any hits, so the hits of this
// first response are not part of 'result'; the (10000/pageSize) term below appears to compensate for that.
List<Map<String, Object>> result = disposeScrollResult(searchResponse, highlightField);
// NOTE(review): skip() rejects negative arguments; the expression is negative when startPage <= 10000/pageSize.
List<Map<String, Object>> sourceList = result.stream().parallel().skip((startPage - 1- (10000/pageSize)) * pageSize).limit(pageSize).collect(Collectors.toList());
return new EsPage(startPage, pageSize, (int) totalHits, sourceList);
}
} else {// shallow paging
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
// alternatives: QUERY_THEN_FETCH QUERY_AND_FETCH DFS_QUERY_THEN_FETCH
// the match-all query is immediately replaced by the caller's query on the next line
searchRequestBuilder.setQuery(matchAllQuery());
searchRequestBuilder.setQuery(query);
// paging
searchRequestBuilder
.setFrom(startPage)
.setSize(pageSize);
// the logged request can be replayed in Elasticsearch head or Kibana
LOGGER.info("\n{}", searchRequestBuilder);
// execute the search
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
long totalHits = searchResponse.getHits().totalHits;
long length = searchResponse.getHits().getHits().length;
LOGGER.debug("共查詢到[{}]條資料,處理資料條數[{}]", totalHits, length);
if (searchResponse.status().getStatus() == 200) {
// convert the hits to maps
List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField);
return new EsPage(startPage, pageSize, (int) totalHits, sourceList);
}
}
return null;
}
/**
 * Deep paging through the scroll API for a fixed match query on {@code name},
 * sorted by {@code price} ascending. Timing information is printed to stdout.
 *
 * <p>NOTE(review): the number of skipped records is
 * {@code (startPage - 1 - 10000 / pageSize) * pageSize}; for pageSize 10 that is
 * non-negative from page 1001 on, matching the original remark that paging
 * starts at page 1001.
 *
 * @param indexName      index name
 * @param esType         type name
 * @param startPage      page number
 * @param pageSize       records per page
 * @param highlightField field to highlight
 * @return the requested page
 */
public static EsPage deepPageing(String indexName, String esType, int startPage, int pageSize, String highlightField) {
    System.out.println("scroll 模式啟動!");
    long startedAt = System.currentTimeMillis();

    // initial search that opens the scroll context
    BoolQueryBuilder nameQuery = QueryBuilders.boolQuery();
    nameQuery.must(QueryBuilders.matchQuery("name", "名"));

    HighlightBuilder highlighter = new HighlightBuilder();
    highlighter.preTags("<span style='color:red' >");
    highlighter.postTags("</span>");
    highlighter.field(highlightField);

    SearchRequestBuilder scrollSearch = client.prepareSearch(indexName);
    scrollSearch.setTypes(esType);
    scrollSearch.setQuery(nameQuery);
    scrollSearch.addSort("price", SortOrder.ASC);
    scrollSearch.setScroll(TimeValue.timeValueMinutes(1));
    scrollSearch.setSize(10000);
    scrollSearch.highlighter(highlighter);
    scrollSearch.setExplain(true);
    SearchResponse firstBatch = scrollSearch.get();

    long totalHits = firstBatch.getHits().totalHits;
    List<Map<String, Object>> collected = disposeScrollResult(firstBatch, highlightField);
    int toSkip = (startPage - 1 - (10000 / pageSize)) * pageSize;
    List<Map<String, Object>> pageRecords = collected.stream()
            .parallel()
            .skip(toSkip)
            .limit(pageSize)
            .collect(Collectors.toList());

    long elapsed = System.currentTimeMillis() - startedAt;
    System.out.println("耗時: " + elapsed + "ms");
    System.out.println("耗時: " + elapsed / 1000 + "s");
    System.out.println("查詢" + totalHits + "條資料");
    return new EsPage(startPage, pageSize, (int) totalHits, pageRecords);
}
/**
 * Follows a scroll to its end and collects the hits of every batch fetched
 * here, then asks the cluster to clear the scroll.
 *
 * <p>NOTE(review): the next batch is requested before any hit is read, so the
 * hits of the response passed in are not part of the returned list. Callers in
 * this class subtract {@code 10000 / pageSize} pages when slicing the result,
 * which appears to rely on exactly that behaviour.
 *
 * @param response       response of the initial scroll search
 * @param highlightField field whose highlighted text replaces the source value
 * @return the hits of all follow-up batches as maps
 */
private static List<Map<String, Object>> disposeScrollResult(SearchResponse response ,String highlightField){
    List<Map<String, Object>> collected = new ArrayList<>();
    SearchResponse batch = response;
    while (batch.getHits().getHits().length != 0) {
        // fetch the next batch, keeping the scroll context alive for another minute
        batch = client.prepareSearchScroll(batch.getScrollId())
                .setScroll(TimeValue.timeValueMinutes(1))
                .get();
        SearchHit[] hits = batch.getHits().getHits();
        for (int i = 0; i < hits.length; i++) {
            collected.add(getResultMap(hits[i], highlightField));
        }
    }
    ClearScrollRequest clearRequest = new ClearScrollRequest();
    clearRequest.addScrollId(batch.getScrollId());
    client.clearScroll(clearRequest);
    return collected;
}
/**
 * Analysed (full-text) search with optional sorting and highlighting,
 * returned as a plain list.
 *
 * @param index          index name
 * @param type           type name; several types may be passed separated by commas
 * @param query          query condition
 * @param size           maximum number of hits; ignored when {@code null} or not positive
 * @param fields         fields to return, comma separated (all fields when empty)
 * @param sortField      sort field (ascending)
 * @param highlightField field to highlight
 * @return the hits as maps, or {@code null} when the response status is not 200
 */
public static List<Map<String, Object>> searchListData(String index, String type, QueryBuilder query, Integer size, String fields, String sortField, String highlightField) {
    SearchRequestBuilder search = client.prepareSearch(index);
    if (!StringUtils.isEmpty(type)) {
        search.setTypes(type.split(","));
    }
    if (!StringUtils.isEmpty(highlightField)) {
        search.highlighter(new HighlightBuilder().field(highlightField));
    }
    search.setQuery(query);
    if (!StringUtils.isEmpty(fields)) {
        search.setFetchSource(fields.split(","), null);
    }
    search.setFetchSource(true);
    if (!StringUtils.isEmpty(sortField)) {
        search.addSort(sortField, SortOrder.ASC);
    }
    if (size != null && size > 0) {
        search.setSize(size);
    }
    // the logged request can be replayed in Elasticsearch head or Kibana
    LOGGER.info("\n{}", search);

    SearchResponse found = search.get();
    long totalHits = found.getHits().totalHits;
    long length = found.getHits().getHits().length;
    LOGGER.info("共查詢到[{}]條資料,處理資料條數[{}]", totalHits, length);

    if (found.status().getStatus() != 200) {
        return null;
    }
    return setSearchResponse(found, highlightField);
}
/**
 * Converts every hit of a search response into its result map.
 *
 * @param searchResponse search response
 * @param highlightField field whose highlighted text replaces the source value
 * @return one map per hit, in hit order
 */
private static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
    SearchHit[] hits = searchResponse.getHits().getHits();
    List<Map<String, Object>> rows = new ArrayList<>();
    for (int i = 0; i < hits.length; i++) {
        rows.add(getResultMap(hits[i], highlightField));
    }
    return rows;
}
/**
 * Returns the source map of a hit, enriched with the hit id under {@code "id"}
 * and, when available, with the highlighted text replacing the plain value of
 * {@code highlightField}.
 *
 * <p>Fixes over the original implementation: a hit that carries no highlight
 * entry for the field no longer causes a {@link NullPointerException}, and an
 * empty fragment array no longer overwrites the source value with {@code null}.
 * As before, only the last fragment is used when several are returned.
 *
 * @param hit            search hit
 * @param highlightField field to take the highlighted text from; may be empty
 * @return the (mutated) source map of the hit
 */
private static Map<String, Object> getResultMap(SearchHit hit,String highlightField){
    Map<String, Object> source = hit.getSourceAsMap();
    source.put("id", hit.getId());
    if (StringUtils.isNotEmpty(highlightField) && hit.getHighlightFields().containsKey(highlightField)) {
        Text[] fragments = hit.getHighlightFields().get(highlightField).getFragments();
        if (fragments != null && fragments.length > 0) {
            // the highlighted text overrides the plain source value
            source.put(highlightField, fragments[fragments.length - 1].string());
        }
    }
    return source;
}
/**
 * Analyzer smoke test: analyses a fixed text against the {@code entity} index
 * with the {@code standard} analyzer ({@code ik_smart} / {@code ik_max_word}
 * are the alternatives named by the original author) and returns the produced
 * terms, each followed by the separator string.
 *
 * @return the concatenated terms
 */
public static String ik() {
    AnalyzeRequest analyzeRequest = new AnalyzeRequest("entity");
    analyzeRequest.text("書名");
    analyzeRequest.analyzer("standard");

    StringBuilder terms = new StringBuilder();
    client.admin().indices()
            .analyze(analyzeRequest)
            .actionGet()
            .getTokens()
            .forEach(token -> terms.append(token.getTerm()).append("\\r\\n"));
    return terms.toString();
}
}
Controller 測試類
/**
* @Auther: LHL
* @Date: 2018/10/10 10:12
* @Description:
*/
@RestController
@RequestMapping("/test")
public class TestController {
/**
 * Liveness endpoint returning a fixed greeting.
 *
 * @return the greeting text
 */
@GetMapping("/hello")
public String helloTest() {
    final String greeting = "Holle World";
    return greeting;
}
/**
* 測試索引
*/
private String indexName = "entity";
/**
* 型別
*/
private String esType = "book";
/**
 * Creates the test index unless it already exists.
 * Example: http://127.0.0.1:8080/test/createIndex
 *
 * @param request  current request (unused)
 * @param response current response (unused)
 * @return a message telling whether the index was created or already existed
 */
@PutMapping("/createIndex")
public String createIndex(HttpServletRequest request, HttpServletResponse response) {
    if (ElasticsearchUtil.isIndexExist(indexName)) {
        return "索引已經存在";
    }
    ElasticsearchUtil.createIndex(indexName);
    return "索引建立成功";
}
/**
 * Creates the test index together with the type mapping unless the index
 * already exists. The analyzers assigned to the fields are defined in
 * {@code ElasticsearchUtil.createIndex(String, String)}.
 *
 * @param request  current request (unused)
 * @param response current response (unused)
 * @return a message telling whether the index was created or already existed
 */
@PutMapping("/createIndexTypeMapping")
public String createIndexTypeMapping(HttpServletRequest request, HttpServletResponse response) {
    if (ElasticsearchUtil.isIndexExist(indexName)) {
        return "索引已經存在";
    }
    ElasticsearchUtil.createIndex(indexName, esType);
    return "索引建立成功";
}
/**
 * Deletes the index named by the request parameter. The parameter shadows the
 * controller's default index field.
 *
 * @param indexName name of the index to delete
 * @param request   current request (unused)
 * @param response  current response (unused)
 * @return a success or failure message
 */
@DeleteMapping("/deleteIndex")
public String deleteIndex(String indexName, HttpServletRequest request, HttpServletResponse response) {
    boolean deleted = ElasticsearchUtil.deleteIndex(indexName);
    return deleted ? "索引刪除成功" : "失敗";
}
/**
 * Analyzer smoke test endpoint.
 *
 * @return the terms produced by {@code ElasticsearchUtil.ik()}
 */
@GetMapping("getik")
public String ikMapping(){
    return ElasticsearchUtil.ik();
}
/**
 * Inserts a sample document built from a JSON object.
 *
 * <p>Fixes over the original implementation: the id pattern uses {@code HH}
 * (24-hour clock) instead of {@code hh}, which produced identical ids for the
 * same second in the morning and afternoon; the random name suffix is drawn
 * from an unseeded {@link Random}, because {@code new Random(100)} returned
 * the same number on every call.
 *
 * @return the id of the stored document
 */
@PostMapping("/insertJson")
public String insertJson() {
    Date now = new Date();
    JSONObject jsonOject = new JSONObject();
    jsonOject.put("id", DateFormatUtils.format(now, "yyyyMMddHHmmss"));
    jsonOject.put("age", 25);
    jsonOject.put("name", "j-" + new Random().nextInt());
    jsonOject.put("date", now);
    return ElasticsearchUtil.addData(jsonOject, indexName, esType, jsonOject.getString("id"));
}
/**
 * Inserts a sample document built from an {@code EsModel} instance.
 *
 * <p>Fixes over the original implementation: the id pattern uses {@code HH}
 * (24-hour clock) instead of {@code hh}, which produced identical ids for the
 * same second in the morning and afternoon; the random name suffix is drawn
 * from an unseeded {@link Random}, because {@code new Random(100)} returned
 * the same number on every call.
 *
 * @return the id of the stored document
 */
@PostMapping("/insertModel")
public String insertModel() {
    Date now = new Date();
    EsModel esModel = new EsModel();
    esModel.setId(DateFormatUtils.format(now, "yyyyMMddHHmmss"));
    esModel.setName("m-" + new Random().nextInt());
    esModel.setAge(30);
    esModel.setDate(now);
    JSONObject jsonObject = (JSONObject) JSONObject.toJSON(esModel);
    return ElasticsearchUtil.addData(jsonObject, indexName, esType, jsonObject.getString("id"));
}
/**
 * Deletes a document by id.
 *
 * @param id document id
 * @return a confirmation message, or a notice that the id is blank
 */
@DeleteMapping("/delete")
public String delete(String id) {
    if (StringUtils.isBlank(id)) {
        return "id為空";
    }
    ElasticsearchUtil.deleteDataById(indexName, esType, id);
    return "刪除id=" + id;
}
/**
 * Updates the document with the given id with fixed sample values.
 *
 * @param id document id
 * @return the id that was updated, or a notice that the id is blank
 */
@PutMapping("/update/{id}")
public String update(@PathVariable("id") String id) {
    if (StringUtils.isBlank(id)) {
        return "id為空";
    }
    JSONObject changes = new JSONObject();
    changes.put("id", id);
    changes.put("age", 31);
    changes.put("name", "修改");
    changes.put("date", new Date());
    ElasticsearchUtil.updateDataById(changes, indexName, esType, id);
    return "id=" + id;
}
/**
 * Fetches a document by id and returns it as JSON.
 * Example: http://127.0.0.1:8080/test/getData/id
 *
 * @param id document id
 * @return the document as JSON, or a notice that the id is blank
 */
@GetMapping("/getData/{id}")
public String getData(@PathVariable("id") String id) {
    if (StringUtils.isBlank(id)) {
        return "id為空";
    }
    Map<String, Object> document = ElasticsearchUtil.searchDataById(indexName, esType, id, null);
    return JSONObject.toJSONString(document);
}
/**
 * Full-text query on {@code name}. The phrase variant is switched off by the
 * local flag, so a plain match query is executed.
 *
 * @return up to 10 hits as JSON
 */
@GetMapping("/queryMatchData")
public String queryMatchData() {
    boolean matchPhrase = false;
    QueryBuilder nameCondition = matchPhrase
            ? QueryBuilders.matchPhraseQuery("name", "修改書名")
            : QueryBuilders.matchQuery("name", "修改書名");
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    boolQuery.must(nameCondition);
    List<Map<String, Object>> hits = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
    return JSONObject.toJSONString(hits);
}
/**
 * Wildcard query on {@code name.keyword}: {@code ?} matches exactly one
 * character, {@code *} matches zero or more characters.
 *
 * @return up to 10 hits as JSON
 */
@GetMapping("/queryWildcardData")
public String queryWildcardData() {
    QueryBuilder wildcard = QueryBuilders.wildcardQuery("name.keyword", "書名*2");
    return JSONObject.toJSONString(
            ElasticsearchUtil.searchListData(indexName, esType, wildcard, 10, null, null, null));
}
/**
 * Regular-expression query on {@code name.keyword}.
 *
 * @return up to 10 hits as JSON
 */
@GetMapping("/queryRegexpData")
public String queryRegexpData() {
    QueryBuilder regexp = QueryBuilders.regexpQuery("name.keyword", "書名[0-9]{1,7}");
    return JSONObject.toJSONString(
            ElasticsearchUtil.searchListData(indexName, esType, regexp, 10, null, null, null));
}
/**
 * Numeric range query on {@code price}: from 21.0 (inclusive) to 25.0 (exclusive).
 *
 * @return up to 10 hits as JSON
 */
@GetMapping("/queryIntRangeData")
public String queryIntRangeData() {
    QueryBuilder priceRange = QueryBuilders.rangeQuery("price")
            .from(21.0)
            .to(25.0)
            .includeLower(true)    // true: lower bound included, false: excluded
            .includeUpper(false);  // true: upper bound included, false: excluded
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().must(priceRange);
    List<Map<String, Object>> hits = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
    return JSONObject.toJSONString(hits);
}
/**
 * Date range query on {@code creatDate} with both bounds inclusive.
 *
 * @return up to 10 hits as JSON
 */
@GetMapping("/queryDateRangeData")
public String queryDateRangeData() {
    QueryBuilder createdBetween = QueryBuilders.rangeQuery("creatDate")
            .from("2018-10-17T02:03:08.829Z")
            .to("2018-10-17T02:03:09.727Z")
            .includeLower(true)
            .includeUpper(true);
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().must(createdBetween);
    List<Map<String, Object>> hits = ElasticsearchUtil.searchListData(indexName, esType, boolQuery, 10, null, null, null);
    return JSONObject.toJSONString(hits);
}
/**
 * Paged query with highlighting on {@code name} and sorting by {@code price}.
 * Combines a fixed date range on {@code creatDate} with a match filter on
 * {@code name}. The elapsed time is printed to stdout.
 *
 * @param startPage described by the original author as the record to start from, beginning at 0
 * @param pageSize  page size
 * @param context   text matched against {@code name}
 * @return the page as JSON, or a notice that a paging parameter is missing
 */
@GetMapping("/queryPage")
public String queryPage(String startPage, String pageSize ,String context) {
    if (StringUtils.isBlank(startPage) || StringUtils.isBlank(pageSize)) {
        return "startPage或者pageSize缺失";
    }
    long startedAt = System.currentTimeMillis();
    QueryBuilder createdBetween = QueryBuilders.rangeQuery("creatDate")
            .from("2018-10-17T02:03:08.829Z")
            .to("2018-10-17T02:03:09.727Z")
            .includeLower(true)
            .includeUpper(true);
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    boolQuery.must(createdBetween);
    boolQuery.filter(QueryBuilders.matchQuery("name", context));
    int from = Integer.parseInt(startPage);
    int size = Integer.parseInt(pageSize);
    EsPage page = ElasticsearchUtil.searchDataPage(indexName, esType, from, size, boolQuery, null, "price", "name");
    long elapsed = System.currentTimeMillis() - startedAt;
    System.out.println(elapsed / 1000 + "s");
    System.out.println(elapsed + "ms");
    return JSONObject.toJSONString(page);
}
/**
 * Deep paging endpoint backed by the scroll API. Only pages beyond the first
 * 10000 records can be served, i.e. {@code startPage} must be at least
 * {@code 10000 / pageSize + 1} (page 1001 for a page size of 10).
 *
 * <p>The original implementation only printed a hint when the page was too
 * small (with a bound that was off by one) and then ran the full scroll, which
 * ended in an {@link IllegalArgumentException} from a negative stream skip; a
 * page size of 0 ended in a division by zero. Both cases are now rejected
 * up front, before Elasticsearch is queried.
 *
 * @param startPage page number
 * @param pageSize  records per page, must be positive
 * @return the requested page
 * @throws IllegalArgumentException if {@code pageSize} is not positive or
 *         {@code startPage} lies within the first 10000 records
 */
@GetMapping("/deepPageing")
public EsPage deepPageing(int startPage, int pageSize){
    if (pageSize <= 0) {
        throw new IllegalArgumentException("pageSize must be positive, got " + pageSize);
    }
    int firstDeepPage = 10000 / pageSize + 1;
    if (startPage < firstDeepPage) {
        throw new IllegalArgumentException("startPage需要>=" + firstDeepPage);
    }
    long start = System.currentTimeMillis();
    EsPage result = ElasticsearchUtil.deepPageing(indexName, esType, startPage, pageSize, "name");
    long elapsed = System.currentTimeMillis() - start;
    System.out.println(elapsed / 1000 + "s");
    System.out.println(elapsed + "ms");
    return result;
}
/**
 * Generates 100000 sample books (ids 1..100000) and indexes them with plain
 * bulk requests. The elapsed time is printed to stdout.
 */
@PostMapping("addBulk")
public void addBulk(){
    final long sampleCount = 100000L;
    List<Book> samples = Stream.iterate(1, n -> n + 1)
            .limit(sampleCount)
            .parallel()
            .map(n -> new Book(String.valueOf(n), "書名" + n, "資訊" + n, Double.valueOf(n), new Date()))
            .collect(Collectors.toList());
    long startedAt = System.currentTimeMillis();
    ElasticsearchUtil.bulkAddDocument(indexName, esType, samples);
    long elapsed = System.currentTimeMillis() - startedAt;
    System.out.println(elapsed / 1000 + "s");
    System.out.println(elapsed + "ms");
}
/**
 * Generates 1000000 sample books (ids 1..1000000) and hands them to the bulk
 * processor. The elapsed time is printed to stdout.
 */
@PostMapping("/bulkProcessorAdd")
public void bulkProcessorAdd (){
    final long sampleCount = 1000000L;
    List<Book> samples = Stream.iterate(1, n -> n + 1)
            .limit(sampleCount)
            .parallel()
            .map(n -> new Book(String.valueOf(n), "書名" + n, "資訊" + n, Double.valueOf(n), new Date()))
            .collect(Collectors.toList());
    long startedAt = System.currentTimeMillis();
    ElasticsearchUtil.bulkProcessorAdd(indexName, esType, samples);
    long elapsed = System.currentTimeMillis() - startedAt;
    System.out.println(elapsed / 1000 + "s");
    System.out.println(elapsed + "ms");
}
/**
 * Deletes the documents with ids 1..100000 in one bulk request. Item failures
 * and the elapsed time are printed to stdout.
 *
 * @return the status of the bulk response
 */
@DeleteMapping("deleteBulk")
public RestStatus deleteBulk(){
    final long idCount = 100000L;
    List<String> ids = Stream.iterate(1, n -> n + 1)
            .limit(idCount)
            .parallel()
            .map(String::valueOf)
            .collect(Collectors.toList());
    long startedAt = System.currentTimeMillis();
    BulkResponse outcome = ElasticsearchUtil.bulkDeleteDocument(indexName, esType, ids);
    if (outcome.hasFailures()){
        System.out.println(outcome.buildFailureMessage());
    }
    long elapsed = System.currentTimeMillis() - startedAt;
    System.out.println(elapsed / 1000 + "s");
    System.out.println(elapsed + "ms");
    return outcome.status();
}
/**
 * Returns the hits of {@code ElasticsearchUtil.searchAll} for the test index.
 *
 * @return the source strings of the hits
 */
@GetMapping("/getAll")
public List<String> getAll(){
    List<String> sources = ElasticsearchUtil.searchAll(indexName);
    return sources;
}
}
Spring Boot 啟動報錯
- 錯誤資訊
[ main] o.a.catalina.core.AprLifecycleListener : An incompatible version [1.2.7] of the APR based Apache Tomcat Native library is installed, while Tomcat requires version [1.2.14]
- 在 tomcat倉庫上下載tomcat-native-1.2.14-ocsp-win32-bin.zip
- 解壓zip檔案
- 複製
- 32 位 tomcat-native-1.2.14-win32-bin\bin下的【tcnative-1.dll 、tcnative-1-src.pdb 】複製放在C:/Windows/system32/下
- 64 位 tomcat-native-1.2.14-win32-bin\bin\x64下的【tcnative-1.dll 、tcnative-1-src.pdb 】複製放在C:/Windows/system32/下
GitHub程式碼地址 https://github.com/412Aomr/ElasticSearchDemo