
07 - Hive Advanced Queries: order by, group by

Notice: Please do not repost without my permission!

Hello everyone! The college entrance exam is just two days away. I'm originally a physics teacher; once I finish with this cohort of middle and high school students, I expect to start looking for a big data job. The skills I have so far are java + linux + mysql + hadoop + hive + hbase; I'm currently learning shell, and my plan for the rest of 2016 is to learn scala + spark. Wish me luck!

Today we'll study [Hive advanced queries: group by and order by syntax] together. Without further ado, let's get started.

1 Hive offers many advanced query operations, the main ones being:

group by        # group rows by key
order by        # global (total) sort
join            # join two tables
distribute by   # scatter rows into different files/reducers by a column
sort by         # sort the data within each reducer (a local sort)
cluster by      # combines the functionality of distribute by and sort by
union all       # combine several tables into one result set

Under the hood, all of these operations are implemented as MapReduce jobs.
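
Here is a minimal, hedged sketch of the three less familiar clauses; the table t(k string, v int) is hypothetical:

    -- rows with the same k are routed to the same reducer,
    -- and each reducer sorts only its own slice (a local sort)
    select k, v
    from t
    distribute by k
    sort by v desc;

    -- cluster by k is shorthand for: distribute by k sort by k
    select k, v from t cluster by k;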

2 A few simple aggregate operations

count: counting
    count(*)    count(1)    count(col)
sum: summation
    sum(a value convertible to a number) returns bigint
    sum(col) + cast(1 as bigint)
avg: average
    avg(a value convertible to a number) returns double
distinct: number of distinct values
    count(distinct col)
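
As a hedged sketch, one query can combine all of these; it targets the table M created below, whose col2 stores numbers as strings, hence the explicit casts:

    select count(*)                  as row_cnt,       -- all rows
           count(col2)               as non_null_cnt,  -- non-NULL values only
           sum(cast(col2 as bigint)) as total,         -- bigint sum
           avg(cast(col2 as bigint)) as mean,          -- returns double
           count(distinct col)       as distinct_keys  -- distinct key count
    from M;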

3 order by
This clause sorts the result set by one or more columns.
An example:

    select col1,other...
    from table
    where condition
    order by col1, col2 [asc|desc]

Notes on order by:
order by can take multiple columns to sort on; string columns default to lexicographic order;
order by performs a global sort;
order by requires a reduce stage and always runs with a single reducer, no matter how many reducers are configured.
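
Since everything funnels through that single reducer, a hedged rule of thumb is to pair order by with limit whenever only the top rows are needed, so the final output stays small (M is the table created just below):

    select col, col2
    from M
    order by col desc, col2 asc
    limit 10;   -- cap the globally sorted output at 10 rows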

Alright, next let's get hands-on: create a table named M.

hive> create table M(
    > col string,
    > col2 string
    > )
    > row format delimited fields terminated by '\t' 
    > lines terminated by '\n'
    > stored as textfile;
OK
Time taken: 0.283 seconds
hive> 

Load local data into table M:

hive> load data local inpath '/usr/host/M' into table M;
Copying data from file:/usr/host/M
Copying file: file:/usr/host/M
Loading data to table default.m
OK
Time taken: 0.721 seconds
hive> 

Next, run a query:

hive> select * from M;
OK
A   1
B   5
B   2
C   3
Time taken: 0.288 seconds
hive> select * from M order by col desc,col2 asc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 01:28:20,284 null map = 0%,  reduce = 0%
2016-06-06 01:28:40,233 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:43,409 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:44,480 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:45,560 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:46,621 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:47,676 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:48,753 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:49,831 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:50,918 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:51,987 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:53,041 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:54,137 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:55,198 null map = 100%,  reduce = 0%, Cumulative CPU 1.18 sec
2016-06-06 01:28:56,242 null map = 100%,  reduce = 100%, Cumulative CPU 1.86 sec
2016-06-06 01:28:57,284 null map = 100%,  reduce = 100%, Cumulative CPU 1.86 sec
2016-06-06 01:28:58,326 null map = 100%,  reduce = 100%, Cumulative CPU 1.86 sec
MapReduce Total cumulative CPU time: 1 seconds 860 msec
Ended Job = job_1465200327080_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
C   3
B   2
B   5
A   1
Time taken: 80.999 seconds
hive> 

Note: desc means descending, asc ascending. Clearly the col column is sorted in descending order and col2 in ascending order, which is why we see

B   2
B   5

One more thing: I've heard that in production people generally don't run order by inside Hive, because it is slow; instead, they compute summary results in Hive, export part of them to a relational database, and do the order by there, which is much faster. I think that's true: queries run much faster after importing into MySQL.
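
A hedged sketch of that pattern; the export directory is illustrative:

    -- aggregate in Hive first, so only the small summary leaves the cluster
    insert overwrite directory '/tmp/m_summary'
    select col, count(1) from M group by col;
    -- then load the exported files into MySQL (e.g. with LOAD DATA INFILE
    -- or a Sqoop export) and run the ORDER BY there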
4 group by

This clause groups rows by the values of the given columns; rows with equal values are grouped together.
An example:

select col1 [, col2], count(1), sel_expr (aggregate expressions)
from table
where condition
group by col1 [, col2]
[having]

Notes:
Any non-aggregated column in the select list must appear in the group by clause;
apart from the grouping columns, only aggregate expressions may appear;
group by can also take expressions, e.g. substr(col, 1, 2). A sketch follows below.
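
As a hedged sketch that obeys these rules, against the M table (the having threshold is illustrative):

    -- count rows per key, keeping only keys that occur more than once
    select col, count(1) as cnt
    from M
    group by col
    having count(1) > 1;   -- having filters on the aggregate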

Let's actually try it out:

hive> desc M;      
OK
col string  
col2    string  
Time taken: 0.28 seconds
hive> select col from M group by col;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 02:33:50,712 null map = 0%,  reduce = 0%
2016-06-06 02:34:12,802 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:13,911 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:15,018 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:16,099 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:17,315 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:18,452 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:19,558 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:20,612 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:21,699 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:22,804 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:23,870 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:24,937 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:25,978 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:27,075 null map = 100%,  reduce = 0%, Cumulative CPU 1.53 sec
2016-06-06 02:34:28,145 null map = 100%,  reduce = 100%, Cumulative CPU 2.33 sec
2016-06-06 02:34:29,255 null map = 100%,  reduce = 100%, Cumulative CPU 2.33 sec
MapReduce Total cumulative CPU time: 2 seconds 330 msec
Ended Job = job_1465200327080_0002
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A
B
C
Time taken: 63.381 seconds
hive> 

In fact, a group by statement can also be used to deduplicate.

hive> select distinct col from M;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
16/06/06 02:36:49 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 02:37:00,159 null map = 0%,  reduce = 0%
2016-06-06 02:37:18,943 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:20,203 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:21,344 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:23,459 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:24,554 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:25,589 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:26,660 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:27,735 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:28,815 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:29,906 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:30,989 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:32,054 null map = 100%,  reduce = 0%, Cumulative CPU 1.1 sec
2016-06-06 02:37:33,111 null map = 100%,  reduce = 100%, Cumulative CPU 1.96 sec
2016-06-06 02:37:34,223 null map = 100%,  reduce = 100%, Cumulative CPU 1.96 sec
MapReduce Total cumulative CPU time: 1 seconds 960 msec
Ended Job = job_1465200327080_0003
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A
B
C
Time taken: 55.682 seconds
select distinct col from M; -- same result as the statement above; it can be used for deduplication
hive> select col from m group by col,col2;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 02:38:48,837 null map = 0%,  reduce = 0%
2016-06-06 02:39:06,717 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:08,045 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:09,271 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:10,428 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:11,590 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:12,696 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:13,765 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:14,879 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:15,949 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:17,099 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:18,173 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:19,281 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:20,357 null map = 100%,  reduce = 0%, Cumulative CPU 1.24 sec
2016-06-06 02:39:21,420 null map = 100%,  reduce = 100%, Cumulative CPU 2.05 sec
2016-06-06 02:39:22,560 null map = 100%,  reduce = 100%, Cumulative CPU 2.05 sec
MapReduce Total cumulative CPU time: 2 seconds 50 msec
Ended Job = job_1465200327080_0004
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A
B
B
C
Time taken: 56.956 seconds
hive> 

5 Characteristics:
group by uses a reduce stage, so it is limited by the number of reducers; adjust that with the parameter mapred.reduce.tasks.
The number of output files equals the number of reducers, and each file's size depends on how much data its reducer processed.
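
A hedged sketch to observe this; the output directory is illustrative:

    set mapred.reduce.tasks=3;
    -- a group by running with 3 reducers writes 3 output files
    insert overwrite directory '/tmp/m_grouped'
    select col, count(1) from M group by col;
    -- /tmp/m_grouped should hold 3 files (000000_0 .. 000002_0), their
    -- sizes depending on how much data each reducer processed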

Problems: heavy network load; data skew. Tuning parameter: hive.groupby.skewindata

6 What is data skew?
A simple way to picture it: suppose one key accounts for a disproportionately large share of the data. With 10 reducers, 9 of them handle small amounts and finish quickly, while the last one receives a huge volume; the 9 then sit idle waiting for that one reducer to finish. In other words, the reducer handling that key takes an extremely long time.
Approach to a solution: Hive runs in stages, and the amount of data each map handles depends on the previous stage's reduce output, so distributing data evenly across the reducers is the fundamental way to resolve data skew.

set mapred.reduce.tasks=5;
select * from M order by col desc, col2 asc;

set hive.groupby.skewindata=true; -- avoids data skew; Total jobs becomes 2. This parameter does help: it launches two jobs so the load is spread out and skew is avoided

select country,count(1) as num from city1 group by country;

Let's verify:

hive> set hive.groupby.skewindata=true;
hive> select country,count(1) as num from city1 group by country;
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 03:03:27,536 null map = 0%,  reduce = 0%
2016-06-06 03:03:45,874 null map = 100%,  reduce = 0%
2016-06-06 03:04:00,051 null map = 100%,  reduce = 100%, Cumulative CPU 2.72 sec
2016-06-06 03:04:01,156 null map = 100%,  reduce = 100%, Cumulative CPU 2.72 sec
2016-06-06 03:04:02,280 null map = 100%,  reduce = 100%, Cumulative CPU 2.72 sec
MapReduce Total cumulative CPU time: 2 seconds 720 msec
Ended Job = job_1465200327080_0005
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
16/06/06 03:04:13 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 03:04:13 INFO Configuration.deprecation: mapred.system