web安全之機器學習入門——3.1 KNN/k近鄰算法
阿新 • • 發佈:2019-04-08
數據收集 完成 整合 ada set acc eat true orm
目錄
sklearn.neighbors.NearestNeighbors
參數/方法
基礎用法
用於監督學習
檢測異常操作(一)
檢測異常操作(二)
檢測rootkit
檢測webshell
sklearn.neighbors.NearestNeighbors
參數:
方法:
基礎用法
# Basic sklearn.neighbors.NearestNeighbors usage: index six 2-D points and
# query each point's 2 nearest neighbours. Every point's first neighbour is
# itself, at distance 0.
print(__doc__)

from sklearn.neighbors import NearestNeighbors
import numpy as np

X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])

# algorithm may be 'ball_tree', 'kd_tree', 'brute' or 'auto'.
nbrs = NearestNeighbors(n_neighbors=2, algorithm='ball_tree').fit(X)

# indices: row ids of the neighbours; distances: the matching distances.
distances, indices = nbrs.kneighbors(X)
print('鄰節點距離\n', distances)
print('鄰節點\n', indices)

# Sparse adjacency matrix of the neighbour graph, shown dense.
print(nbrs.kneighbors_graph(X).toarray())
鄰節點距離 [[ 0. 1. ] [ 0. 1. ] [ 0. 1.41421356] [ 0. 1. ] [ 0. 1. ] [ 0. 1.41421356]] 鄰節點 [[0 1] [1 0] [2 1] [3 4] [4 3] [5 4]] 可視化結果 [[ 1. 1. 0. 0. 0. 0.] [ 1. 1. 0. 0. 0. 0.] [ 0. 1. 1. 0. 0. 0.] [ 0. 0. 0. 1. 1. 0.] [ 0. 0. 0. 1. 1. 0.] [ 0. 0. 0. 0. 1. 1.]]
用於監督學習
sklearn.neighbors.KNeighborsClassifier
使用很簡單,三步:1)創建KNeighborsClassifier對象,2)調用fit函數,3)調用predict/predict_proba函數進行預測。
#predict返回概率最大的預測值
#predict_proba返回的是一個n行k列的數組, 第i行j列上的數值是模型預測第i個預測樣本為某個標簽的概率,並且每一行的概率和為1。
# Supervised KNN in three steps: build a KNeighborsClassifier,
# fit it on labelled samples, then call predict / predict_proba.
from sklearn.neighbors import KNeighborsClassifier

samples = [[0], [1], [2], [3]]
targets = [0, 0, 1, 1]

classifier = KNeighborsClassifier(n_neighbors=3)
classifier.fit(samples, targets)

# predict: the single label with the highest probability.
print(classifier.predict([[1.1]]))
# predict_proba: one probability per class; each row sums to 1.
print(classifier.predict_proba([[0.9]]))
[0] [[ 0.66666667 0.33333333]]
檢測異常操作(一)
# -*- coding:utf-8 -*-
# Detect anomalous shell operations (part 1) with KNN on the Schonlau
# masquerade dataset: every 100 commands form one operation sequence,
# summarised by three scalar features.
import numpy as np
import nltk
from nltk.probability import FreqDist
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn import metrics

# Number of sequences used for training (the remaining 150 - N are tested).
N = 100


def load_user_cmd(filename):
    """Load and clean one user's command file.

    Strips the trailing '\\n' from every command and groups each run of 100
    commands into one operation sequence.

    Returns:
        cmd_set: list of 100-command sequences (list of lists).
        dist_max: set of the 50 commands FreqDist lists first.
        dist_min: set of the 50 commands FreqDist lists last.
    """
    cmd_set = []
    dist = []
    with open(filename) as f:
        i = 0
        x = []
        for line in f:
            line = line.strip('\n')
            x.append(line)
            dist.append(line)
            i += 1
            if i == 100:
                cmd_set.append(x)
                x = []
                i = 0
    # NOTE(review): "most/least frequent 50" relies on the iteration order of
    # FreqDist.keys(); on modern nltk this is frequency order -- confirm.
    fdist = list(FreqDist(dist).keys())
    dist_max = set(fdist[0:50])
    dist_min = set(fdist[-50:])
    return cmd_set, dist_max, dist_min


def get_user_cmd_feature(user_cmd_set, dist_max, dist_min):
    """Turn each 100-command sequence into a 3-feature scalar vector.

    f1: number of distinct commands in the sequence.
    f2: overlap of the sequence's 10 head commands with dist_max (<= 10).
    f3: overlap of the sequence's 10 tail commands with dist_min (<= 10).
    KNN needs scalar inputs, so the overlaps are measured as set
    intersection sizes.
    """
    user_cmd_feature = []
    for cmd_block in user_cmd_set:
        f1 = len(set(cmd_block))
        fdist = list(FreqDist(cmd_block).keys())
        f2 = len(set(fdist[0:10]) & set(dist_max))
        f3 = len(set(fdist[-10:]) & set(dist_min))
        user_cmd_feature.append([f1, f2, f3])
    return user_cmd_feature


def get_label(filename, index=0):
    """Read one column of the label file.

    Each row of label.txt holds one whitespace-separated 0/1 flag per user;
    column `index` selects the user of interest. Returns a list of ints
    (0 normal, 1 anomalous).
    """
    x = []
    with open(filename) as f:
        for line in f:
            line = line.strip('\n')
            x.append(int(line.split()[index]))
    return x


if __name__ == '__main__':
    user_cmd_set, user_cmd_dist_max, user_cmd_dist_min = load_user_cmd("../data/MasqueradeDat/User3")
    user_cmd_feature = get_user_cmd_feature(user_cmd_set, user_cmd_dist_max, user_cmd_dist_min)
    labels = get_label("../data/MasqueradeDat/label.txt", 2)
    # The first 50 sequences are known-normal, so y has length 50 + 100 = 150.
    y = [0] * 50 + labels

    x_train = user_cmd_feature[0:N]
    y_train = y[0:N]
    x_test = user_cmd_feature[N:150]
    y_test = y[N:150]

    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict = neigh.predict(x_test)

    # Percentage of test sequences predicted correctly.
    score = np.mean(y_test == y_predict) * 100
    print('y_test\n', y_test)
    print('y_predict\n', y_predict)
    print('score\n', score)
    print('classification_report(y_test, y_predict)\n', classification_report(y_test, y_predict))
    print('metrics.confusion_matrix(y_test, y_predict)\n', metrics.confusion_matrix(y_test, y_predict))
y_test [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] y_predict [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] score 100.0 classification_report(y_test, y_predict) precision recall f1-score support 0 1.00 1.00 1.00 30 avg / total 1.00 1.00 1.00 30 metrics.confusion_matrix(y_test, y_predict) [[30]]
檢測異常操作(二)
上例只比較了最頻繁和最不頻繁的操作命令,這次我們全量比較。
# -*- coding:utf-8 -*-
# Detect anomalous shell operations (part 2): instead of comparing only the
# most/least frequent commands, vectorise every sequence against the FULL
# command vocabulary (set-of-words), then cross-validate a KNN classifier.
import sys
import urllib
#import urlparse
import re
#from hmmlearn import hmm
import numpy as np
#from sklearn.externals import joblib  # removed in sklearn >= 0.23 and unused here
#import HTMLParser
import nltk
import csv
import matplotlib.pyplot as plt
from nltk.probability import FreqDist
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.neighbors import KNeighborsClassifier
# sklearn.cross_validation was removed in sklearn 0.20; model_selection
# provides the same cross_val_score.
from sklearn.model_selection import cross_val_score

# Number of sequences used for training (the remaining 150 - N are tested).
N = 90


def load_user_cmd_new(filename):
    """Load one user's commands, cleaned of trailing newlines.

    Returns:
        cmd_list: list of 100-command sequences (list of lists).
        fdist: the de-duplicated command vocabulary (FreqDist keys view).
    """
    cmd_list = []
    dist = []
    with open(filename) as f:
        i = 0
        x = []
        for line in f:
            line = line.strip('\n')
            x.append(line)
            dist.append(line)
            i += 1
            if i == 100:
                cmd_list.append(x)
                x = []
                i = 0
    fdist = FreqDist(dist).keys()
    return cmd_list, fdist


def get_user_cmd_feature_new(user_cmd_list, dist):
    """Vectorise each sequence with the set-of-words model.

    v[i] is 1 when vocabulary word i occurs in the sequence, else 0.
    """
    # Hoist conversions out of the loop: the original rebuilt list(dist) and
    # list(cmd_list) on every inner iteration (accidental O(n^2)).
    vocabulary = list(dist)
    user_cmd_feature = []
    for cmd_list in user_cmd_list:
        cmd_words = set(cmd_list)
        v = [1 if word in cmd_words else 0 for word in vocabulary]
        user_cmd_feature.append(v)
    return user_cmd_feature


def get_label(filename, index=0):
    """Read one 0/1 column of the label file (column `index` = user)."""
    x = []
    with open(filename) as f:
        for line in f:
            line = line.strip('\n')
            x.append(int(line.split()[index]))
    return x


if __name__ == '__main__':
    user_cmd_list, dist = load_user_cmd_new("../data/MasqueradeDat/User3")
    print("len(dist):%d" % len(dist))
    print("dist:%s" % dist)

    user_cmd_feature = get_user_cmd_feature_new(user_cmd_list, dist)
    labels = get_label("../data/MasqueradeDat/label.txt", 2)
    # The first 50 sequences are known-normal, so y has length 150.
    y = [0] * 50 + labels

    x_train = user_cmd_feature[0:N]
    y_train = y[0:N]
    x_test = user_cmd_feature[N:150]
    y_test = y[N:150]

    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict = neigh.predict(x_test)

    # 10-fold cross-validation over the full feature set.
    print(cross_val_score(neigh, user_cmd_feature, y, n_jobs=-1, cv=10))
len(dist):107 dist:dict_keys([‘Xsession‘, ‘sed‘, ‘grep‘, ‘wc‘, ‘date‘, ‘uname‘, ‘true‘, ‘xsetroot‘, ‘cpp‘, ‘sh‘, ‘xrdb‘, ‘cat‘, ‘stty‘, ‘basename‘, ‘ksh‘, ‘tail‘, ‘xmodmap‘, ‘ls‘, ‘hostname‘, ‘netstat‘, ‘netscape‘, ‘xterm‘, ‘sccs‘, ‘get‘, ‘diff‘, ‘more‘, ‘.java_wr‘, ‘expr‘, ‘dirname‘, ‘egrep‘, ‘java‘, ‘make‘, ‘mailx‘, ‘pq‘, ‘bdiff‘, ‘delta‘, ‘ex‘, ‘rm‘, ‘javac‘, ‘mkdir‘, ‘man‘, ‘od‘, ‘ln‘, ‘cfe‘, ‘ugen‘, ‘as1‘, ‘driver‘, ‘ld_‘, ‘readacct‘, ‘touch‘, ‘bc‘, ‘sendmail‘, ‘seecalls‘, ‘FvwmPage‘, ‘GoodStuf‘, ‘fvwm‘, ‘xdm‘, ‘chmod‘, ‘id‘, ‘nawk‘, ‘getopt‘, ‘lp‘, ‘find‘, ‘FIFO‘, ‘generic‘, ‘pr‘, ‘postprin‘, ‘file‘, ‘post‘, ‘awk‘, ‘getpgrp‘, ‘LOCK‘, ‘gethost‘, ‘download‘, ‘tcpostio‘, ‘UNLOCK‘, ‘rmdir‘, ‘tcppost‘, ‘cpio‘, ‘xargs‘, ‘gzip‘, ‘jar‘, ‘nslookup‘, ‘rlogin‘, ‘xhost‘, ‘admin‘, ‘runnit‘, ‘gs‘, ‘ppost‘, ‘hpost‘, ‘tracerou‘, ‘unpack‘, ‘col‘, ‘telnet‘, ‘ptelnet‘, ‘tset‘, ‘logname‘, ‘matlab‘, ‘launchef‘, ‘MediaMai‘, ‘a.out‘, ‘dbx‘, ‘dbxpcs‘, ‘mimencod‘, ‘sim301bS‘, ‘sim301bK‘, ‘ps‘]) [ 1. 1. 0.93333333 1. 1. 1. 1. 1. 0.93333333 0.92857143]
檢測Rootkit(三)
Rootkit是一種特殊的惡意軟件，它的功能是在安裝目標上隱藏自身及指定的文件、進程和網絡鏈接等信息；Rootkit一般都和木馬、後門等其他惡意程序結合使用。
基於KDD 99的樣本數據,嘗試使用KNN算法識別基於telnet連接的Rootkit行為,檢測流程如下所示。
# -*- coding:utf-8 -*-
# Detect Rootkit behaviour over telnet connections with KNN, using the
# KDD 99 intrusion-detection dataset (41 features + 1 label per record).
# sklearn.cross_validation was removed in sklearn 0.20; model_selection
# provides the same cross_val_score.
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier


def load_kdd99(filename):
    """Load KDD 99 records: one comma-separated feature list per line.

    The dataset ships mostly cleaned, so only newline stripping and
    splitting are needed.
    """
    x = []
    with open(filename) as f:
        for line in f:
            line = line.strip('\n')
            x.append(line.split(','))
    return x


def get_rootkit2andNormal(x):
    """Filter telnet records labelled rootkit./normal. and featurise them.

    Keeps columns 9..20 of each matching record, converted to floats.

    Returns:
        w: list of 12-element float feature vectors.
        y: matching labels (1 = rootkit., 0 = normal.).
    """
    v = []
    w = []
    y = []
    for x1 in x:
        # Column 41 is the label, column 2 the service name.
        if (x1[41] in ['rootkit.', 'normal.']) and (x1[2] == 'telnet'):
            y.append(1 if x1[41] == 'rootkit.' else 0)
            # Columns 9-20 are the features chosen as Rootkit-related.
            v.append(x1[9:21])
    for x1 in v:
        w.append([float(x2) for x2 in x1])
    return w, y


if __name__ == '__main__':
    v = load_kdd99("../data/kddcup99/corrected")
    x, y = get_rootkit2andNormal(v)
    # Train and validate with 10-fold cross-validation.
    clf = KNeighborsClassifier(n_neighbors=3)
    print(cross_val_score(clf, x, y, n_jobs=-1, cv=10))
[ 0.9 0.9 1. 1. 1. 0.77777778 1. 1. 1. 1. ]
檢測Webshell(四)
使用ADFA-LD數據集中webshell相關數據,ADFA-LD數據集中記錄下了系統調用序列(比如A,B,C),然後使用數字標識每一個系統調用(1,2,3),這時(1,2,3)就轉換成了一個序列向量。
以下是系統調用的順序抽象成序列向量的過程
# -*- coding:utf-8 -*-
# Detect webshells with KNN on the ADFA-LD dataset: each sample is a line of
# numeric system-call ids, vectorised with CountVectorizer (bag of words).
import re
import os
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
# sklearn.cross_validation was removed in sklearn 0.20; model_selection
# provides the same cross_val_score.
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier


def load_one_file(filename):
    """Return the first line of a trace file with its newline stripped."""
    with open(filename) as f:
        line = f.readline()
        line = line.strip('\n')
    return line


def load_adfa_training_files(rootdir):
    """Load the normal ADFA-LD samples; every file gets label 0."""
    x = []
    y = []
    for name in os.listdir(rootdir):
        path = os.path.join(rootdir, name)
        if os.path.isfile(path):
            x.append(load_one_file(path))
            y.append(0)
    return x, y


def dirlist(path, allfile):
    """Recursively collect every file path under `path` into allfile."""
    for filename in os.listdir(path):
        filepath = os.path.join(path, filename)
        if os.path.isdir(filepath):
            dirlist(filepath, allfile)
        else:
            allfile.append(filepath)
    return allfile


def load_adfa_webshell_files(rootdir):
    """Pick the webshell attack samples out of the attack set; label 1.

    NOTE(review): the pattern mixes '/' with an escaped '\\' before
    'UAD-W', so it only matches paths as os.path.join builds them on
    Windows -- adjust the separator on other platforms.
    """
    x = []
    y = []
    for file in dirlist(rootdir, []):
        if re.match(r"../data/ADFA-LD/Attack_Data_Master/Web_Shell_\d+\\UAD-W*", file):
            x.append(load_one_file(file))
            y.append(1)
    return x, y


if __name__ == '__main__':
    x1, y1 = load_adfa_training_files("../data/ADFA-LD/Training_Data_Master/")
    x2, y2 = load_adfa_webshell_files("../data/ADFA-LD/Attack_Data_Master/")
    x = x1 + x2
    y = y1 + y2

    # Turn the space-separated system-call ids into count vectors.
    vectorizer = CountVectorizer(min_df=1)
    x = vectorizer.fit_transform(x).toarray()

    clf = KNeighborsClassifier(n_neighbors=3)
    scores = cross_val_score(clf, x, y, n_jobs=-1, cv=10)
    print(scores)
    print(np.mean(scores))
[ 0.95833333 0.94791667 0.97916667 0.96842105 0.96842105 0.84210526 0.97894737 0.98947368 0.9787234 0.9787234 ] 0.959023189623
參考:
web安全之機器學習入門——2.機器學習概述
scikit-learn K近鄰法類庫使用小結
predict predict_proba區別的小例子
基於用戶行為動態變化的內部威脅檢測方法
web安全之機器學習入門——3.1 KNN/k近鄰算法