1. 程式人生 > >elasticsearch選擇器聚合,分組返回聚合結果

elasticsearch選擇器聚合,分組返回聚合結果

package tianjun.cmcc.es;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse
; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.Script; import org.elasticsearch.search
.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange; import org.elasticsearch
.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder; import org.elasticsearch.transport.client.PreBuiltTransportClient; public class Funnel { private String[] ips = null; private Integer[] ports = null; private String clusterName=null; private TransportClient client = null; private Logger logger =Logger.getLogger(this.getClass()); public String[] getIps() { return ips; } public void setIps(String[] ips) { this.ips = ips; } public Integer[] getPorts() { return ports; } public void setPorts(Integer[] ports) { this.ports = ports; } public String getClusterName() { return clusterName; } public void setClusterName(String clusterName) { this.clusterName = clusterName; } public Funnel(){ } public Funnel(String[] ips,Integer[] ports){ initClient(ips, ports,null); } public Funnel(String[] ips,Integer[] ports,String clusterName){ initClient(ips, ports,clusterName); } private void initClient(String[] ips,Integer[] ports,String clusterName) { if(ips.length!=ports.length){ try { throw new Exception("IPs or Ports has error!"); } catch (Exception e) { logger.error(e.getMessage()); } }else{ if(clusterName == null || "".equals(clusterName)){ clusterName = "my-application"; } Settings settings =Settings.builder() .put("cluster.name",clusterName) .put("client.transport.sniff",true) .build(); client = new PreBuiltTransportClient(settings); for(int i=0;i<ips.length;i++){ try { client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ips[i].trim()), ports[i])); } catch (UnknownHostException e) { logger.error(e.getMessage()); } } } } public 
void close(){ if(client!=null){ client.close(); } } /** * 漏斗,查出例如a-b-c路徑的人數:即訪問了c同一時段也先後訪問了a和b的使用者數 * @param index 索引名 * @param type 文件名 * @param fromDate 篩選開始時間 * @param toDate 篩選結束時間 * @param pages 漏斗訪問路徑 * @return */ public Object getFunnelByUser(String index,String type,String fromDate,String toDate, String... pages){ if(pages.length<=0){ try { throw new Exception("please input you path."); } catch (Exception e) { logger.error(e.getMessage()); } } Map<String,String> result_map = new ConcurrentHashMap<>(); StringBuffer script = new StringBuffer(); SearchRequestBuilder resp = client.prepareSearch(index).setTypes(type).setSize(0).setPostFilter(QueryBuilders.termsQuery("tag", pages)); DateRangeAggregationBuilder dr_aggrb = AggregationBuilders.dateRange("limit_by_time").field("local_time_string").addRange(fromDate, toDate); TermsAggregationBuilder terms_aggrb = AggregationBuilders.terms("group_by_userId").field("user_id"); for(Integer i = 0 ; i < pages.length ; i++){ if(i==0){ result_map.put("a"+i.toString(),"tag_"+i+">min_user_time"); Integer j = i; script.append("params."+"a"+i.toString()+"<" +"params."+"b"+(++j).toString()); FilterAggregationBuilder filter_aggrb = AggregationBuilders.filter("tag_"+i, QueryBuilders.termQuery("tag", pages[i])) .subAggregation(AggregationBuilders.min("min_user_time").field("local_time_string")); terms_aggrb.subAggregation(filter_aggrb); }else if(i==pages.length-1){ result_map.put("b"+i.toString(), "tag_"+i+">max_user_time"); FilterAggregationBuilder filter_aggrb = AggregationBuilders.filter("tag_"+i, QueryBuilders.termQuery("tag", pages[i])) .subAggregation(AggregationBuilders.max("max_user_time").field("local_time_string")); terms_aggrb.subAggregation(filter_aggrb); }else{ result_map.put("a"+i.toString(), "tag_"+i+">min_user_time"); result_map.put("b"+i.toString(), "tag_"+i+">max_user_time"); Integer j = i; script.append(" && " + "params."+"a"+i.toString()+"<" +"params."+"b"+(++j).toString()); FilterAggregationBuilder filter_aggrb 
= AggregationBuilders.filter("tag_"+i, QueryBuilders.termQuery("tag", pages[i])) .subAggregation(AggregationBuilders.min("min_user_time").field("local_time_string")) .subAggregation(AggregationBuilders.max("max_user_time").field("local_time_string")); terms_aggrb.subAggregation(filter_aggrb); } } BucketSelectorPipelineAggregationBuilder pipe_selector = PipelineAggregatorBuilders.bucketSelector("result_buckets", result_map,new Script(script.toString())); terms_aggrb.subAggregation(pipe_selector); Long count = 0L ; // SearchResponse response = resp.addAggregation(dr_aggrb.subAggregation(terms_aggrb.includeExclude(new IncludeExclude(1, 2)).size(100000))) //// SearchResponse response = resp.addAggregation(dr_aggrb.subAggregation(terms_aggrb)) // .execute().actionGet(); // InternalDateRange ida = response.getAggregations().get("limit_by_time"); // Terms terms = ida.getBuckets().get(0).getAggregations().get("group_by_userId"); // count += terms.getBuckets().size(); //----------------------------------------------------------------------------------------- resp.addAggregation(dr_aggrb.subAggregation(terms_aggrb)); for( int i =0;i<50;i++){ terms_aggrb.includeExclude(new IncludeExclude(i, 50)).size(100000); SearchResponse response = null; response = resp.execute().actionGet(); InternalDateRange ida = response.getAggregations().get("limit_by_time"); Terms terms = ida.getBuckets().get(0).getAggregations().get("group_by_userId"); count += terms.getBuckets().size(); } // SearchResponse response = client.prepareSearch(index) // .setTypes(type) // .setSize(0) // .setPostFilter(QueryBuilders.termsQuery("tag", "a","b","c")) // .addAggregation( // AggregationBuilders.dateRange("limit_by_time") // .field("time") // .addRange("2016-01-01", "2017-12-12") // .subAggregation( // AggregationBuilders // .terms("group_by_userId") // .field("userId") // .subAggregation( // AggregationBuilders.filter("tag_a", QueryBuilders.termQuery("tag", "a")) //// 
.subAggregation(AggregationBuilders.max("max_user_time").field("time")) // .subAggregation(AggregationBuilders.min("min_user_time").field("time")) // ) // .subAggregation( // AggregationBuilders.filter("tag_b", QueryBuilders.termQuery("tag", "b")) // .subAggregation(AggregationBuilders.max("max_user_time").field("time")) // .subAggregation(AggregationBuilders.min("min_user_time").field("time")) // ) // .subAggregation( // AggregationBuilders.filter("tag_c", QueryBuilders.termQuery("tag", "c")) // .subAggregation(AggregationBuilders.max("max_user_time").field("time")) //// .subAggregation(AggregationBuilders.min("min_user_time").field("time")) // ) // .subAggregation( // PipelineAggregatorBuilders.bucketSelector("result_buckets", // result_map, // new Script("params.e<params.f && params.g<params.h")) // ) // .includeExclude(new IncludeExclude(1, 2)) // .size(10000) // ) // ) // .execute().actionGet(); // // // // InternalDateRange ida = response.getAggregations().get("limit_by_time"); // Terms terms = ida.getBuckets().get(0).getAggregations().get("group_by_userId"); // @SuppressWarnings("unchecked") // List<Bucket> buckets =(List<Bucket>) terms.getBuckets(); // close(); return count; } /** * 單頁面去重 * @param index 索引名 * @param type 文件名 * @param fromDate 篩選開始時間 * @param toDate 篩選結束時間 * @param page 單頁面 */ public Object countDistinct(String index,String type,String fromDate,String toDate, String page){ InternalCardinality ica =null; SearchResponse response = client.prepareSearch(index) .setTypes(type) .setSize(10) .setPostFilter(QueryBuilders.termsQuery("tag", page)) .addAggregation( AggregationBuilders.dateRange("limit_by_time") .field("local_time_string") .addRange(fromDate, toDate) .subAggregation( AggregationBuilders.cardinality("count_distinct_userId").field("user_id") ) ) .execute().actionGet(); InternalDateRange ida = response.getAggregations().get("limit_by_time"); if(ida.getBuckets().size()>0){ ica = 
ida.getBuckets().get(0).getAggregations().get("count_distinct_userId"); } if(ica!=null){ return ica.getValue(); }else{ return 0; } } }
package tianjun.cmccc.es;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import tianjun.cmcc.es.Funnel;


public class FunnelTest {

    /** Shared wrapper, created once in {@link #init()} and released in {@link #close()}. */
    private static Funnel funnel = null;

    /** Connects to the hard-coded test cluster before any test runs. */
    @BeforeClass
    public static void init() {
        String[] ips = {"172.23.27.89", "172.23.27.125", "172.23.27.126"};
        Integer[] ports = {9300, 9300, 9300};
        funnel = new Funnel(ips, ports);
    }

    /**
     * Three-step funnel Album_Pic -> Album_Camera -> Album_Camera_OneKeyUpload between
     * 2017-09-01 and 2017-09-11. Only prints the resulting user count; nothing is asserted.
     */
    @Test
    public void test1() {
        Object obj = funnel.getFunnelByUser("event", "t_dws_trace_event_dtl",
                "2017-09-01", "2017-09-11",
                "Album_Pic", "Album_Camera", "Album_Camera_OneKeyUpload");
        System.out.println(obj);
    }

    /**
     * Two-step funnel Album_Pic -> Album_Camera. Only prints the resulting user count.
     *
     * <p>NOTE(review): fromDate and toDate are both "2017-09-01". Elasticsearch range buckets
     * exclude the "to" value, so this window is probably empty and the count likely always 0.
     * Confirm whether "2017-09-02" was intended.
     */
    @Test
    public void test2() {
        Object obj = funnel.getFunnelByUser("event", "t_dws_trace_event_dtl",
                "2017-09-01", "2017-09-01",
                "Album_Pic", "Album_Camera");
        System.out.println(obj);
    }

    /** Distinct users of the Album_Pic page between 2017-09-10 and 2017-09-11; prints the count. */
    @Test
    public void test3() {
        Object obj = funnel.countDistinct("event", "t_dws_trace_event_dtl",
                "2017-09-10", "2017-09-11", "Album_Pic");
        System.out.println(obj);
    }

    /**
     * Releases the client. Guarded because JUnit 4 runs this even when {@link #init()} failed,
     * in which case {@code funnel} is still null and an NPE would hide the real error.
     */
    @AfterClass
    public static void close() {
        if (funnel != null) {
            funnel.close();
        }
    }

}