1. 程式人生 > 其它 >java讀取大檔案內容到Elasticsearch分析(手把手教你java處理超大csv檔案)

java讀取大檔案內容到Elasticsearch分析(手把手教你java處理超大csv檔案)

package com.example.demo;

import com.alibaba.fastjson.JSON;
import com.example.demo.entity.Entity;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/** * 讀取大檔案 * csv格式 * * @author lhb * @date 2021/11/11 * @since 1.0.0 */ @SpringBootTest public class ImportTest { @Autowired @Qualifier("client") private RestHighLevelClient restHighLevelClient; @Test void insert() {
     //csv檔案2G,63W條資料,十多個欄位 String filePath
= "D:\\file\\20211111.csv
"; LineIterator it = null; try { it = FileUtils.lineIterator(new File(filePath), "UTF-8"); } catch (IOException e) { e.printStackTrace(); } try { while (it.hasNext()) { String line = it.nextLine();
//System.out.println("line = " + line); //檔案是CSV檔案,CSV檔案中的每一列是用","隔開的,這樣就可以得到每一列的元素 String[] strArray = line.split(","); //有很長的空格,trim一下 String name = strArray[6].trim(); String code = strArray[8].trim(); String num = strArray[11].trim(); System.out.println(code + "==" + num); Entity entity = new Entity(); entity.setCode(code); if (Objects.equals("xxx", code)) { //跳過表頭 continue; } entity.setNum(Long.parseLong(num)); entity.setName(name); entity.setCreateTime(new Date()); String index = "index20211111"; singleInsert2(index, entity); } } finally { LineIterator.closeQuietly(it); } } @Test void batchInsert() { String filePath = "D:\\express\\20211111.csv"; LineIterator it = null; try { it = FileUtils.lineIterator(new File(filePath), "UTF-8"); } catch (IOException e) { e.printStackTrace(); } try { int i = 0; List<Entity> entities = new ArrayList<>(); while (it.hasNext()) { String line = it.nextLine(); //System.out.println("line = " + line); String[] strArray = line.split(","); String code = strArray[6].trim(); String name = strArray[8].trim(); String num = strArray[11].trim(); System.out.println(code + "==" + num); if (Objects.equals("xxx", code)) { //跳過表頭 continue; } Entity entity = new Entity(); entity.setCode(code); entity.setName(name); try { entity.setNum(Long.parseLong(num)); } catch (NumberFormatException e) { e.printStackTrace(); System.out.println("出錯的資料" + code + "==" + num); } entity.setCreateTime(new Date()); String index = "index20211111"; //批量插入 entities.add(entity); i++; if (i % 10000 == 0) { System.out.println("i = " + i); try { batchInsert2(index, entities); } catch (IOException e) { e.printStackTrace(); } //清空已經處理過的list entities.clear(); i = 0; } } } finally { LineIterator.closeQuietly(it); } } /** * 批量速度槓槓的 * * @param index * @param entities * @throws IOException */ public void batchInsert2(String index, List<Entity> entities) throws IOException { BulkRequest bulkRequest = new BulkRequest(index); System.out.println("entities.sz = " + entities.size()); 
for (Entity org : entities) { IndexRequest request = new IndexRequest(); request.source(JSON.toJSONString(org), XContentType.JSON); bulkRequest.add(request); } restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); } /** * 資料量大,超級慢 * * @param index * @param entity */ public void singleInsert2(String index, Entity entity) { IndexRequest request = new IndexRequest(index); request.source(JSON.toJSONString(entity), XContentType.JSON); try { IndexResponse index1 = restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } }
package com.example.demo.entity;

import lombok.Data;

import java.util.Date;

/**
 * One CSV row as indexed into Elasticsearch; the importer serializes it to JSON with fastjson.
 * Field names correspond to the properties declared in the index mapping.
 *
 * <p>Lombok {@code @Data} generates getters, setters, {@code equals}, {@code hashCode} and
 * {@code toString}; field declaration order determines the generated {@code toString} order.
 *
 * @author lhb
 * @date 2021/11/11
 * @since 1.0.0
 */
@Data
public class Entity {

    /**
     * Code (trimmed value of the code column of the CSV row).
     */
    private String code;
    /**
     * Name (trimmed value of the name column of the CSV row).
     */
    private String name;
    /**
     * Quantity parsed from the numeric column; may be {@code null} if the importer could not
     * parse the value — TODO confirm against the importer variant in use.
     */
    private Long num;
    /**
     * Time the row was parsed; the importer sets it to {@code new Date()} for every row.
     */
    private Date createTime;

}

建立索引對映(索引名稱需與程式中寫入的 index20211111 一致,否則資料會寫進以動態對映自動建立的索引),然後插入資料:

# Create the target index with an explicit mapping BEFORE running the importer.
# The name must match the index the importer writes to (index20211111); otherwise
# Elasticsearch creates that index on the fly with dynamic mapping and the
# keyword/long/date types declared below never apply to the imported data.
PUT index20211111
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      },
      "num": {
        "type": "long"
      },
      "createTime": {
        "type": "date"
      }
    }
  }
}

開始分析資料:

# Count every document in the index (the sample response below shows 630000).
GET index20211111/_count
{
  "query": {
    "match_all": {}
  }
}

#返回63w資料

{
"count" : 630000,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
}
}

# For the selected codes: exact hit count, per-code sum of num, and the overall sum of num.
# code is a keyword field, so the term values are written as strings.
GET index20211111/_search
{
  "size": 1,
  "track_total_hits": true,
  "query": {
    "constant_score": {
      "filter": {
        "terms": {
          "code": ["2222", "1111", "3333"]
        }
      }
    }
  },
  "aggs": {
    "per_code": {
      "terms": { "field": "code", "size": 200 },
      "aggs": {
        "num": { "sum": { "field": "num" } }
      }
    },
    "sum_num": {
      "sum": { "field": "num" }
    }
  }
}