import os
import numpy as np
from hmmlearn import hmm
from urllib.parse import unquote, parse_qsl
import re
import joblib
import nltk
# "Use black to find black": train on malicious samples only
# Minimum length of a parameter value worth processing
MIN_LEN = 6
# Number of hidden states
N = 10
# Threshold on the maximum-likelihood (log-probability) score
T = -200
# Data extraction and feature extraction. Rather than char-level (single-character)
# features, domain knowledge is used to treat specific phrases as the basic unit of
# featurization, i.e. a token-splitting scheme.
# Example payloads the tokenizer must handle:
# <IFRAME SRC=http://ha.ckers.org/scriptlet.html <
# ';alert(String.fromCharCode(88,83,83))//\';alert(String.fromCharCode(88,83,83))//";alert(String.fromCharCode(88,83,83))
# //\";alert(String.fromCharCode(88,83,83))//-->">'>
tokens_pattern = r'''(?x)
 "[^"]+"            # quoted string, e.g. "xxxx"
|http://\S+         # URL, e.g. http://xxxx
|\w+>               # closing fragment, e.g. script>
|<\w+>              # complete tag, e.g. <script>
|<\w+               # opening tag prefix, e.g. <IFRAME
|\w+\([^<]+\)       # function call, e.g. alert(String.fromCharCode(88,83,83))
|\w+                # bare word
'''
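# Illustrative sketch (not executed by the pipeline): what nltk.regexp_tokenize
# yields for sample payloads under the pattern above:
# >>> nltk.regexp_tokenize('<IFRAME SRC=http://u', tokens_pattern)
# ['<IFRAME', 'SRC', 'http://u']
# >>> nltk.regexp_tokenize('<script>alert(8)</script>', tokens_pattern)
# ['<script>', 'alert(8)', 'script>']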
# Exclude non-ASCII noise: only keep values made of printable ASCII characters
def ischeck(str1):
    for c in str1:
        if ord(c) > 127 or ord(c) < 31:
            return False
    return True
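# Quick sanity check (sketch): printable-ASCII payloads pass, anything containing
# multi-byte or control characters is filtered out:
# >>> ischeck('alert(document.cookie)')
# True
# >>> ischeck('警告')
# False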
# Data preprocessing
def preprocessing(str1):
    result = []
    line = str1.strip('\n')
    line = unquote(line)  # URL-decode
    if len(line) >= MIN_LEN:  # skip short values
        # only process the query parameters
        params = parse_qsl(line, keep_blank_values=True)
        for k, line in params:
            if ischeck(line) and len(line) >= N:  # note: reuses N (=10) as a minimum value length
                line, _ = re.subn(r'\d+', "8", line)  # replace numeric constants with 8
                line, _ = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:=]+', "http://u", line)  # normalize URLs to http://u
                line, _ = re.subn(r'\/\*.*?\*\/', "", line)  # strip /*...*/ comments (non-greedy)
                tokens = nltk.regexp_tokenize(line, tokens_pattern)  # token split
                result += tokens
    if result:
        return result
    return False
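# End-to-end sketch of the normalization chain on one query string
# (URL-decode -> parameter split -> constant substitution -> tokenization):
# >>> preprocessing('q=%3Cscript%3Ealert(123)%3C/script%3E')
# ['<script>', 'alert(8)', 'script>']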
# Build the vocabulary (word bag)
def load_wordbag(filename, max_size=100):
    tokens_list = []
    index_wordbag = 1  # word-bag index
    wordbag = {}  # word bag
    with open(filename) as f:
        for line in f:
            tokens = preprocessing(line)
            if tokens:
                tokens_list += tokens
    fredist = nltk.FreqDist(tokens_list)  # corpus-wide token frequencies
    # Dimensionality reduction: keep only the max_size most frequent tokens; all
    # other tokens are later mapped to the single out-of-vocabulary code (-1).
    # most_common() is used because FreqDist.keys() is not frequency-ordered.
    for localkey, _ in fredist.most_common(max_size):
        if localkey not in wordbag:  # skip duplicates (most_common keys are unique)
            wordbag[localkey] = index_wordbag
            index_wordbag += 1
    return wordbag
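# The returned word bag is a plain {token: index} mapping with indices starting
# at 1; hypothetical contents for an XSS corpus might look like:
# {'<script>': 1, 'alert(8)': 2, 'script>': 3, 'http://u': 4, ...}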
# Training
def train(filename, wordbag):
    # Seed with one dummy observation so np.concatenate has a starting row; its
    # length entry keeps the per-sequence boundaries consistent for hmmlearn.
    x = [[-1]]
    x_lens = [1]
    with open(filename) as f:
        for line in f:
            words = preprocessing(line)
            if words:
                vers = []
                for word in words:
                    # Encode tokens by vocabulary index; tokens outside the
                    # vocabulary are not encoded and get the OOV code -1
                    if word in wordbag:
                        vers.append([wordbag[word]])
                    else:
                        vers.append([-1])
                np_vers = np.array(vers)
                x = np.concatenate([x, np_vers])
                x_lens.append(len(np_vers))
    ghmm = hmm.GaussianHMM(n_components=N, covariance_type="full", n_iter=100)
    ghmm.fit(x, x_lens)
    os.makedirs("export/model", exist_ok=True)  # ensure the output directory exists
    joblib.dump(ghmm, "export/model/hmm-xss-train_2.pkl")
    return ghmm
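# Data-layout sketch for hmmlearn: all sequences are stacked into a single
# (n_samples, 1) array plus per-sequence lengths. Two encoded payloads
# [3, 7, 2] and [3, 5] (hypothetical indices) would end up as:
#   x      = [[-1], [3], [7], [2], [3], [5]]   # leading [-1] is the seed row
#   x_lens = [1, 3, 2]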
# Testing
def test(filename, wordbag):
    # Load the trained model from the pickle file
    ghmm = joblib.load("export/model/hmm-xss-train_2.pkl")
    with open(filename) as f:
        for line in f:
            words = preprocessing(line)
            if words:
                vers = []
                for word in words:
                    # Keep the same encoding as in training
                    if word in wordbag:
                        vers.append([wordbag[word]])
                    else:
                        vers.append([-1])
                np_vers = np.array(vers)
                pro = ghmm.score(np_vers)
                if pro >= T:
                    print("SCORE:(%f) XSS_URL: %s" % (pro, line.strip()))
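# Scoring sketch: ghmm.score(np_vers) returns the log-likelihood of the encoded
# sequence. Since training used XSS samples only, sequences resembling the
# training data tend to score above T and get flagged:
# >>> ghmm.score(np.array([[3], [7], [2]])) >= T   # hypothetical encoding
# True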
def main():
    xss = "data/XSS/xss-200000.txt"
    # Build the frequency-based vocabulary
    wordbag = load_wordbag(xss, 2000)
    # "Use black to find black": train the HMM on malicious samples and save it
    train(xss, wordbag)
    # Score the test samples
    test('data/XSS/test-sample.txt', wordbag)

if __name__ == '__main__':
    main()