1. 程式人生 > >web安全之機器學習入門——3.1 KNN/k近鄰算法

web安全之機器學習入門——3.1 KNN/k近鄰算法

數據收集 完成 整合 ada set acc eat true orm

目錄

sklearn.neighbors.NearestNeighbors

參數/方法

基礎用法

用於監督學習

檢測異常操作(一)

檢測異常操作(二)

檢測rootkit

檢測webshell


sklearn.neighbors.NearestNeighbors

參數:

技術分享圖片

方法:

技術分享圖片


基礎用法

print(__doc__)

from sklearn.neighbors import NearestNeighbors
import numpy as np

# Six sample points: two clusters around (-2, -1) and (2, 1).
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
# Fit an (unsupervised) 2-nearest-neighbour index using a ball tree.
nbrs = NearestNeighbors(n_neighbors=2, algorithm='ball_tree').fit(X)
# indices: ids of each point's neighbours; distances: the matching distances.
distances, indices = nbrs.kneighbors(X)
print('鄰節點距離\n', distances)
print('鄰節點\n', indices)
# Adjacency matrix of the k-NN graph (1 where points are neighbours).
print(nbrs.kneighbors_graph(X).toarray())
鄰節點距離
 [[ 0.          1
. ] [ 0. 1. ] [ 0. 1.41421356] [ 0. 1. ] [ 0. 1. ] [ 0. 1.41421356]] 鄰節點 [[0 1] [1 0] [2 1] [3 4] [4 3] [5 4]] 可視化結果 [[ 1. 1. 0. 0. 0. 0.] [ 1. 1. 0. 0. 0. 0.] [ 0. 1. 1. 0. 0. 0.] [ 0. 0. 0. 1. 1. 0
.] [ 0. 0. 0. 1. 1. 0.] [ 0. 0. 0. 0. 1. 1.]]


用於監督學習

sklearn.neighbors.KNeighborsClassifier

使用很簡單,三步:1)創建KNeighborsClassifier對象,2)調用fit函數,3)調用predict/predict_proba函數進行預測。

#predict返回概率最大的預測值
#predict_proba返回的是一個n行k列的數組, 第i行j列上的數值是模型預測第i個預測樣本為某個標簽的概率,並且每一行的概率和為1。
from sklearn.neighbors import KNeighborsClassifier

x=[[0],[1],[2],[3]]
y=[0,0,1,1]
neigh = KNeighborsClassifier(n_neighbors=3)
neigh.fit(x,y)
print(neigh.predict([[1.1]]))
print(neigh.predict_proba([[0.9]]))
[0]
[[ 0.66666667  0.33333333]]


檢測異常操作(一)

# -*- coding:utf-8 -*-
import numpy as np

import nltk
from nltk.probability import FreqDist
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn import metrics

# Size of the training slice; the remaining 150 - N command blocks are the test set.
N=100

"""
數據收集和數據清洗(清洗換行符\n)
從scholaon數據集的user3文件導入信息;一百條命令組成一個列表x[],最終組成二維列表cmd_set[[]];
返回二維列表,最頻繁50條命令,和最不頻繁50條命令
"""
def load_user_cmd(filename):
    cmd_set=[]
    dist_max=[]
    dist_min=[]
    dist=[]
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip(\n)
            x.append(line)
            dist.append(line)
            i+=1
            if i == 100:
                cmd_set.append(x)
                x=[]
                i=0

    fdist = list(FreqDist(dist).keys())
    dist_max=set(fdist[0:50])
    dist_min = set(fdist[-50:])
    return cmd_set,dist_max,dist_min

"""
特征化
將load_user_cmd函數的輸出作為輸入;
以100個命令為統計單元,作為一個操作序列,去重後的操作命令個數作為特征;(函數FreqDist會統計每個單詞的頻度,重新整合成一個+1維度的新的列表)
KNN只能以標量作為輸入參數,所以需要將f2和f3表量化,最簡單的方式就是和統計的最頻繁使用的前50個命令以及最不頻繁使用的前50個命令計算重合程度。
返回一個150×3的列表;3裏的0:不重復單詞的個數,1:最頻繁單詞重合程度<=min{10,50},2最不頻繁單詞重合程度<=min{10,50}
"""
def get_user_cmd_feature(user_cmd_set,dist_max,dist_min):
    user_cmd_feature=[]
    for cmd_block in user_cmd_set:
        f1=len(set(cmd_block))
        fdist = list(FreqDist(cmd_block).keys())
        f2=fdist[0:10]
        f3=fdist[-10:]
        f2 = len(set(f2) & set(dist_max))
        f3=len(set(f3) & set(dist_min))
        x=[f1,f2,f3]
        user_cmd_feature.append(x)
    return user_cmd_feature

"""
訓練模型
導入標識文件,100×50,正常命令為0,異常命令為1;
從標識文件中加載針對操作序列正確/異常的標識
返回一個容量為150的list 0/1數值,(只要這一行有1)
"""
def get_label(filename,index=0):
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip(\n)#清空每行的\n
            x.append(int(line.split()[index]))#????
    return x

if __name__ == '__main__':
    # Load user3's commands, then build features and labels.
    user_cmd_set,user_cmd_dist_max,user_cmd_dist_min=load_user_cmd("../data/MasqueradeDat/User3")
    user_cmd_feature=get_user_cmd_feature(user_cmd_set,user_cmd_dist_max,user_cmd_dist_min)
    labels=get_label("../data/MasqueradeDat/label.txt",2)
    # The first 50 sequences are known-normal; labels cover the remaining 100,
    # so y has length 150.
    y=[0]*50+labels

    x_train=user_cmd_feature[0:N]
    y_train=y[0:N]

    x_test=user_cmd_feature[N:150]
    y_test=y[N:150]

    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict=neigh.predict(x_test)

    # Percentage of correct predictions (elementwise numpy comparison).
    score=np.mean(y_test==y_predict)*100

    print('y_test\n',y_test)
    print('y_predict\n',y_predict)
    print('score\n',score)

    print('classification_report(y_test, y_predict)\n',classification_report(y_test, y_predict))

    print('metrics.confusion_matrix(y_test, y_predict)\n',metrics.confusion_matrix(y_test, y_predict))
y_test
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
y_predict
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
score
 100.0
classification_report(y_test, y_predict)
              precision    recall  f1-score   support

          0       1.00      1.00      1.00        30

avg / total       1.00      1.00      1.00        30

metrics.confusion_matrix(y_test, y_predict)
 [[30]]


檢測異常操作(二)

上例只比較了最頻繁和最不頻繁的操作命令,這次我們全量比較。

# -*- coding:utf-8 -*-

import sys
import urllib
#import urlparse
import re
#from hmmlearn import hmm
import numpy as np
from sklearn.externals import joblib
#import HTMLParser
import nltk
import csv
import matplotlib.pyplot as plt
from nltk.probability import FreqDist
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn import cross_validation

#測試樣本數
N=90

"""
數據搜集和數據清洗(清洗換行符)
返回cmd_list:150×100的二維列表和fdist:去重的字符串集
"""
def load_user_cmd_new(filename):
    cmd_list=[]
    dist=[]
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip(\n)
            x.append(line)
            dist.append(line)
            i+=1
            if i == 100:
                cmd_list.append(x)
                x=[]
                i=0

    fdist = FreqDist(dist).keys()
    return cmd_list,fdist

"""
特征化
使用詞集將操作命令向量化
"""
def get_user_cmd_feature_new(user_cmd_list,dist):
    user_cmd_feature=[]

    for cmd_list in user_cmd_list:
        v=[0]*len(dist)
        for i in range(0,len(dist)):
            if list(dist)[i] in list(cmd_list):
                v[i]+=1
        user_cmd_feature.append(v)
    return user_cmd_feature

def get_label(filename,index=0):
    """Load 0/1 labels from a whitespace-separated matrix file; `index`
    selects the column. Returns a list of ints, one per row."""
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            x.append( int(line.split()[index]))
    return x

if __name__ == '__main__':
    """
    Train the model on full-vocabulary (bag-of-words) features.
    """
    user_cmd_list,dist=load_user_cmd_new("../data/MasqueradeDat/User3")
    print( "len(dist):%d" % len(dist))
    print( "dist:%s" % dist)
    user_cmd_feature=get_user_cmd_feature_new(user_cmd_list,dist)
    labels=get_label("../data/MasqueradeDat/label.txt",2)
    # First 50 sequences are known-normal; y has length 150.
    y=[0]*50+labels

    x_train=user_cmd_feature[0:N]
    y_train=y[0:N]

    x_test=user_cmd_feature[N:150]
    y_test=y[N:150]

    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict=neigh.predict(x_test)

    # Validate with 10-fold cross validation.
    # NOTE(review): sklearn.cross_validation was removed in scikit-learn 0.20;
    # new code should use sklearn.model_selection instead.
    print(cross_validation.cross_val_score(neigh,user_cmd_feature, y, n_jobs=-1,cv=10))
len(dist):107
dist:dict_keys([Xsession, sed, grep, wc, date, uname, true, xsetroot, cpp, sh, xrdb, cat, stty, basename, ksh, tail, xmodmap, ls, hostname, netstat, netscape, xterm, sccs, get, diff, more, .java_wr, expr, dirname, egrep, java, make, mailx, pq, bdiff, delta, ex, rm, javac, mkdir, man, od, ln, cfe, ugen, as1, driver, ld_, readacct, touch, bc, sendmail, seecalls, FvwmPage, GoodStuf, fvwm, xdm, chmod, id, nawk, getopt, lp, find, FIFO, generic, pr, postprin, file, post, awk, getpgrp, LOCK, gethost, download, tcpostio, UNLOCK, rmdir, tcppost, cpio, xargs, gzip, jar, nslookup, rlogin, xhost, admin, runnit, gs, ppost, hpost, tracerou, unpack, col, telnet, ptelnet, tset, logname, matlab, launchef, MediaMai, a.out, dbx, dbxpcs, mimencod, sim301bS, sim301bK, ps])
[ 1.          1.          0.93333333  1.          1.          1.          1.
  1.          0.93333333  0.92857143]


檢測Rootkit(三)

Rootkit是一種特殊的惡意軟件,它的功能是在安裝目標上隱藏自身及指定的文件,進程和網絡鏈接等信息。比較常見的是,Rootkit一般都和木馬,後門等其他惡意程序結合使用。

基於KDD 99的樣本數據,嘗試使用KNN算法識別基於telnet連接的Rootkit行為,檢測流程如下所示。

技術分享圖片

# -*- coding:utf-8 -*-

from sklearn import cross_validation
from sklearn.neighbors import KNeighborsClassifier

"""
數據集已經完成了大部分的清洗工作;
41個特征描述
加載KDD 99數據集中的數據
"""
def load_kdd99(filename):
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip(\n)
            line=line.split(,)
            x.append(line)
    return x

"""
特征化
"""
def get_rootkit2andNormal(x):
    v=[]
    w=[]
    y=[]
    for x1 in x:
        if ( x1[41] in [rootkit.,normal.] ) and ( x1[2] == telnet ):
            if x1[41] == rootkit.:
                y.append(1)
            else:
                y.append(0)
            """
            挑選與Rootkit相關的特征作為樣本特征
            """
            x1 = x1[9:21]
            v.append(x1)
    for x1 in v :
        v1=[]
        for x2 in x1:
            v1.append(float(x2))
        w.append(v1)
    return w,y

if __name__ == '__main__':
    v=load_kdd99("../data/kddcup99/corrected")
    x,y=get_rootkit2andNormal(v)
    """
    Train the classifier.
    """
    clf = KNeighborsClassifier(n_neighbors=3)
    """
    Validate with 10-fold cross validation.
    """
    print(cross_validation.cross_val_score(clf, x, y, n_jobs=-1, cv=10))
[ 0.9         0.9         1.          1.          1.          0.77777778
  1.          1.          1.          1.        ]


檢測Webshell(四)

使用ADFA-LD數據集中webshell相關數據,ADFA-LD數據集中記錄下了系統調用序列(比如A,B,C),然後使用數字標識每一個系統調用(1,2,3),這時(1,2,3)就轉換成了一個序列向量。

以下是系統調用的順序抽象成序列向量的過程

技術分享圖片

# -*- coding:utf-8 -*-

import re
import os
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn import cross_validation
from sklearn.neighbors import KNeighborsClassifier


def load_one_flle(filename):
    """Return the first line of `filename` (one ADFA-LD syscall-number
    trace) with the trailing newline stripped.

    The misspelled name ('flle') is kept because callers use it."""
    with open(filename) as f:
        line=f.readline()
        line=line.strip('\n')
    return line

# Load the normal samples from ADFA-LD: every file directly under
# rootdir becomes one trace with label 0.  Returns (traces, labels).
def load_adfa_training_files(rootdir):
    x=[]
    y=[]
    # Iterate entries directly; the original bound os.listdir's result to
    # a variable named `list`, shadowing the builtin.
    for name in os.listdir(rootdir):
        path = os.path.join(rootdir, name)
        if os.path.isfile(path):
            x.append(load_one_flle(path))
            y.append(0)
    return x,y

# Recursively collect every file path under `path` into `allfile`
# (mutated in place) and return it.
def dirlist(path, allfile):
    for entry in os.listdir(path):
        full = os.path.join(path, entry)
        if os.path.isdir(full):
            dirlist(full, allfile)
        else:
            allfile.append(full)
    return allfile

# From the ADFA-LD attack set, keep only the webshell traces
# (Web_Shell_* directories); every matching file gets label 1.
def load_adfa_webshell_files(rootdir):
    x=[]
    y=[]
    allfile=dirlist(rootdir,[])
    for file in allfile:
        # Careful with separators here: '/' in the leading path but '\'
        # before UAD-W* — presumably a Windows-style path from the original
        # dataset layout; TODO confirm on the target platform.
        if re.match(r"../data/ADFA-LD/Attack_Data_Master/Web_Shell_\d+\\UAD-W*",file):
            x.append(load_one_flle(file))
            y.append(1)
    return x,y


if __name__ == '__main__':

    # Normal traces (label 0) plus webshell traces (label 1).
    x1,y1=load_adfa_training_files("../data/ADFA-LD/Training_Data_Master/")
    x2,y2=load_adfa_webshell_files("../data/ADFA-LD/Attack_Data_Master/")

    x=x1+x2
    y=y1+y2
    # Bag-of-words over the syscall-number sequences.
    vectorizer = CountVectorizer(min_df=1)
    x=vectorizer.fit_transform(x)
    x=x.toarray()
    clf = KNeighborsClassifier(n_neighbors=3)
    scores=cross_validation.cross_val_score(clf, x, y, n_jobs=-1, cv=10)
    print(scores)
    print(np.mean(scores))
[ 0.95833333  0.94791667  0.97916667  0.96842105  0.96842105  0.84210526
  0.97894737  0.98947368  0.9787234   0.9787234 ]
0.959023189623

參考:

web安全之機器學習入門——2.機器學習概述

scikit-learn K近鄰法類庫使用小結

predict predict_proba區別的小例子

基於用戶行為動態變化的內部威脅檢測方法

web安全之機器學習入門——3.1 KNN/k近鄰算法