1. 程式人生 > >Python資料預處理之---統計學的t檢驗,卡方檢驗以及均值,中位數等

Python資料預處理之---統計學的t檢驗,卡方檢驗以及均值,中位數等

Python資料預處理過程:利用統計學對資料進行檢驗,對連續屬性檢驗正態分佈,針對正態分佈屬性繼續使用t檢驗檢驗方差齊次性,針對非正態分佈使用Mann-Whitney檢驗。針對分類變數進行卡方檢驗(涉及三種卡方的檢驗:Pearson卡方,校準卡方,精準卡方)等。

具體的程式碼中註釋很詳細:

def output_statistics_info_self(data_df, category_feats, continue_feats, target,logger,nan_value=-1,info_more=True):
    '''
    Function:輸出最全的資料的描述資訊
    Parameters:
        data_df:DataFrame the source data
        category_feats:list
        continue_feats:list
        target:the classification target,as the y
        nan_value:default -1,represent the nan value need to be filled
        info_more:default True,output the whole info;False:output the part info for client and paper
    Return:
        DataFrame
    '''
    sample_size = data_df.shape[0]
    # 判斷target是二分類還是多分類(三分類及其以上)
    target_values=list(data_df[target].value_counts().index)
    logger.info('%s取值%s'%(target,target_values))
    task_type=len(target_values)
    total_describe_list=[]

    # 警告:做單因素分析之前必須要異常值檢查,排出非數字的異常值,否則報錯
    # data_df[continue_feats]=data_df[continue_feats].applymap(float)
    # data_df[category_feats]=data_df[category_feats].applymap(float)

    # 針對二分類任務
    if task_type==2:
        # 針對連續屬性
        # 先檢驗連續屬性是否是正態分佈其次再檢驗是否方差齊次,才能使用獨立t檢驗
        for col in continue_feats:
            logger.info('------%s--------'%col)
            col_series=data_df[data_df[col]!=nan_value][col]
            col_count=col_series.count()

            vals=[col,'連續',col_count]
            # 檢驗連續屬性是否符合正態分佈
            p_value=norm_distribution_test(sample_size,col_series)
            # 如果p_value>0.05 正態分佈,使用獨立t檢驗,檢驗連續屬性在兩組樣本方差相同的情況下它們的均值是否相同
            condition0=(data_df[target] == target_values[0]) & (data_df[col] != nan_value)
            condition1=(data_df[target] == target_values[1]) & (data_df[col] != nan_value)

            if p_value > 0.05:
                logger.info('%s符合正態分佈'%col)
                # 使用levene檢驗方差齊次
                stat,pval=levene(data_df[condition0][col].values,data_df[condition1][col].values)
                if pval>0.05:
                    # p值大於0.05,認為兩總體具有方差齊性。
                    t_stat, pvalue = ttest_ind(data_df[condition0][col].values,data_df[condition1][col].values,
                                equal_var=True)
                else:
                    # 兩總體方差不齊
                    t_stat, pvalue = ttest_ind(data_df[condition0][col].values,data_df[condition1][col].values,
                                equal_var=False)
                pvalue=round(pvalue,3)
                if pvalue==0:
                    pvalue='<0.001'
                vals.extend(['是_%s'%p_value,'ttest',t_stat,pvalue,''])

            #非正態分佈的二分類使用Mann-Whitney U test檢驗
            else:
                logger.info('%s不符合正態分佈'%col)
                m_stat, pvalue = mannwhitneyu(
                                    data_df[condition0][col].values,data_df[condition1][col].values,
                                    use_continuity=False,alternative='two-sided'
                                    )
                pvalue=round(pvalue,3)
                if pvalue==0:
                    pvalue='<0.001'
                vals.extend(['否_%s'%p_value,'Mann',m_stat,pvalue,''])


            # 對連續變數輸出均值±標準差
            # 並在括號中附上IQR值(75%分位點-25%分位點的值),檢視連續屬性中間部分是否集中或者分散
            target0_col_iqr=round(iqr(x=data_df[condition0][col].values,nan_policy='omit'),3)
            target_0_mean_std="%.2f±%.2f (%s)" %(data_df[condition0][col].mean(),
                        data_df[condition0][col].std(),target0_col_iqr)

            target1_col_iqr=round(iqr(x=data_df[condition1][col].values,nan_policy='omit'),3)
            target_1_mean_std="%.2f±%.2f (%s)" %(data_df[condition1][col].mean(),
                        data_df[condition1][col].std(),target1_col_iqr)

            vals.extend([target_0_mean_std,target_1_mean_std])
            total_describe_list.append(vals)

        # 針對分類變數使用"卡方檢驗"
        for col in category_feats:
            logger.info('#######%s######'%col)
            col_series=data_df[data_df[col]!=nan_value][col]
            col_count=col_series.count()
            col_count_ser=col_series.value_counts()
            vals=[col,'分類',col_count,'','卡方']

            data_kf = data_df[data_df[col]!=nan_value][[col,target]]
            cross_table = data_kf.groupby([col, target])[target].count().unstack()
            cross_table.fillna(0,inplace=True)
            logger.info(cross_table)

            if len(col_count_ser)==2:
                stat,pvalue=foursquare_chi_test(cross_table,col_count)
                vals.extend([stat,pvalue,''])
            else:
                stat,pvalue,iswarning=not_foursquare_chi_test(cross_table)
                vals.extend([stat,pvalue,iswarning])
            vals.extend(['',''])
            total_describe_list.append(vals)

            # 針對分類變數輸出各個類別的target比例
            for col_kind in col_count_ser.index:
                logger.info('col_kind:%s'%col_kind)
                col_kind_percent=['%s_%s'%(col,col_kind),'','','','','','','']
                for v in target_values:
                    col_kind_percent.append("%d(%.1f%%)" %
                                        (data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0],
                                        data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0] /
                                        data_df[((data_df[col]!=nan_value)&(data_df[target] == v))].shape[0]*100))
                total_describe_list.append(col_kind_percent)

    # 針對三分類或者多分類
    elif task_type>=3:
         # "先判斷是否方差齊次,才能使用獨立t檢驗"
        for col in continue_feats:
            logger.info('----!!!--%s--------'%col)
            col_series=data_df[data_df[col]!=nan_value][col]
            col_count=col_series.count()
            vals=[col,'連續',col_count]
            p_value=norm_distribution_test(sample_size,col_series)
            if p_value > 0.05:#正態分佈
                # 1-way ANOVA:原假設:兩個或多個group擁有相同的均值
                # 使用的前提條件:1、樣本獨立,2、每個樣本都來源於正態分佈群體,3、每個group方差齊次(方差相同)
                # 以上條件不滿足時:使用Kruskal-Wallis H-test
                df = data_df[[col, target]]
                # 排出填補的那些值
                df = df[df[col] != nan_value]
                stat, pvalue = f_oneway(
                                 df[df[target] == target_values[0]][col].values,
                                 df[df[target] == target_values[1]][col].values,
                                 df[df[target] == target_values[2]][col].values
                                 )
                pvalue=round(pvalue,3)
                if pvalue==0:
                    pvalue='<0.001'
                vals.extend(['是_%s'%p_value,'anova',round(stat,3),pvalue])

            else:
                # 非正態分佈
                # Compute the Kruskal-Wallis H-test for independent samples
                df = data_df[[col, target]]
                df = data_df[data_df[col] != nan_value]
                stat, pvalue = kruskalwallis(df[df[target] == target_values[0]][col].values,
                                            df[df[target] == target_values[1]][col].values,
                                            df[df[target] == target_values[2]][col].values)
                pvalue=round(pvalue,3)
                if pvalue==0:
                    pvalue='<0.001'
                vals.extend(['否_%s'%p_value,'kruskal',round(stat,3),pvalue])


            # 對連續變數輸出均值±標準差,以及IQR值
            condition0=(data_df[target] == target_values[0]) & (data_df[col] != nan_value)
            target0_col_iqr=round(iqr(x=data_df[condition0][col].values,nan_policy='omit'),3)
            target_0_mean_std="%.2f±%.2f (%s)" %(data_df[condition0][col].mean(),
                        data_df[condition0][col].std(),target0_col_iqr)

            condition1=(data_df[target] == target_values[1]) & (data_df[col] != nan_value)
            target1_col_iqr=round(iqr(x=data_df[condition1][col].values,nan_policy='omit'),3)
            target_1_mean_std="%.2f±%.2f (%s)" %(data_df[condition1][col].mean(),
                        data_df[condition1][col].std(),target1_col_iqr)

            condition2=(data_df[target] == target_values[2]) & (data_df[col] != nan_value)
            target2_col_iqr=round( iqr(x=data_df[condition2][col].values,nan_policy='omit'),3)
            target_2_mean_std="%.2f±%.2f (%s)" %(data_df[condition2][col].mean(),
                        data_df[condition2][col].std(),target2_col_iqr)

            vals.extend([target_0_mean_std,target_1_mean_std,target_2_mean_std])
            total_describe_list.append(vals)

        for col in category_feats:
            logger.info('#######%s######'%col)
            col_series=data_df[data_df[col]!=nan_value][col]
            col_count=col_series.count()
            vals=[col,'分類',col_count,'','卡方']
            data_kf = data_df[data_df[col] != nan_value][[col, target]]
            cross_table = data_kf.groupby([col, target])[target].count().unstack()
            cross_table.fillna(0,inplace=True)
            logger.info(cross_table)

            stat,pvalue,iswarning=not_foursquare_chi_test(cross_table)
            vals.extend([stat,pvalue,iswarning])
            vals.extend(['','',''])
            total_describe_list.append(vals)

            # 對類別屬性輸出各類別的比例
            for col_kind in col_series.index:
                logger.info('col_kind:%s'%col_kind)
                if col_kind!=nan_value:
                    col_kind_percent=['%s_%s'%(col,col_kind),'','','','','','']
                    for v in target_values:
                        col_kind_percent.append("%d(%.2f)" %
                                        (data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0],
                                        data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0] /
                                        data_df[data_df[col] == col_kind].shape[0]))
                    total_describe_list.append(col_kind_percent)

    columns = ['屬性','屬性類別','有效值','是否正態分佈', '檢驗方法', '統計量', 'pvalue','卡方warning']
    for v in target_values:
        columns.append("target_{0}".format(v))
    total_describe_df = pd.DataFrame(total_describe_list, columns=columns)

    # 輸出額外的更多詳細資訊
    if info_more==True:
        # 新增缺失情況統計,缺失情況、最小值、最大值、均值、標準差
        info_add_list=[]
        for col in continue_feats+category_feats:
            col_series=data_df[data_df[col]!=nan_value][col]
            miss_count=data_df[data_df[col]==nan_value][col].count()
            if miss_count==0:
                _miss=''
            else:
                miss_ratio=round(miss_count/sample_size*100,2)
                _miss='%s(%.1f%%)'%(miss_count,miss_ratio)
            vals_info=[col,_miss]
            if col in continue_feats:
                vals_info.extend([
                        round(col_series.min(), 2),round(col_series.max(), 2),
                        round(col_series.mean(), 2),round(col_series.std(), 2),
                        round(iqr(x=col_series.values,nan_policy='omit'),2)
                    ]
                )
            elif col in category_feats:
                vals_info.extend(['','','','',''])
            info_add_list.append(vals_info)

        add_columns=['屬性','缺失情況','最小值','最大值','均值','標準差','IQR']
        info_add_df= pd.DataFrame(info_add_list, columns=add_columns)
        total_describe_df=total_describe_df.merge(info_add_df,on='屬性',how='outer')

    return total_describe_df


def foursquare_chi_test(cross_table,col_count):
    # 四格表卡方檢驗用於進行兩個率或兩個構成比的比較。
    # 要求樣本含量應大於40且每個格子中的理論頻數不應小於5。
    # 當樣本含量大於40但理論頻數有小於5的情況時卡方值需要校正,當樣本含量小於40時只能用確切概率法計算概率。
    # (1)所有的理論數T≥5並且總樣本量n≥40,用Pearson卡方進行檢驗。
    # (2)如果理論數T<5但T≥1,並且總樣本量n≥40,用連續性校正的卡方進行檢驗。
    # (3)如果有理論數T<1或n<40,則用Fisher’s檢驗。
    stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False)
    if col_count>=40 and expected.min()>=5:
        # Pearson卡方進行檢驗
        stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False)
    elif col_count>=40 and expected.min()<5 and expected.min()>=1:
        # 連續性校正的卡方進行檢驗
        stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=True)
    else:
        # 用Fisher’s檢驗
        stat,pvalue=fisher_exact(cross_table)
    stat=round(stat,3)
    pvalue=round(pvalue,3)
    if pvalue==0:
        pvalue='<0.001'
    return stat,pvalue


def not_foursquare_chi_test(cross_table):
    # 針對非四方表格的卡方檢驗
    # (1)如果rxc表格中最小的理論數<1,報警告
    # (2)如果rxc表格中最小的理論數<5的個數佔比超過>1/5,報警告
    # (3)其他情況下,使用Pearson檢驗
    iswarning=''
    stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False)
    if expected.min()<1 or len([v for v in expected.reshape(1,-1)[0] if v<5])/\
                        (expected.shape[0]*expected.shape[1])>0.2:
        iswarning='warning'

    stat=round(stat,3)
    pvalue=round(pvalue,3)
    if pvalue==0:
        pvalue='<0.001'
    return stat,pvalue,iswarning