import os
import pickle
import string

import bcolz
import gensim
import nltk
import numpy as np
import torch

from .nlp_utils import compact_text

nltk.download('stopwords')
nltk.download('punkt')  # required by word_tokenize / sent_tokenize
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize

stop_words = set(stopwords.words('english'))
eps = 10e-8  # small constant (1e-7) guarding against division by zero
def get_init_hidden(bsz, hidden_size, use_cuda):
    """Return a zeroed (h_0, c_0) pair for initializing an LSTM state."""
    h_0 = torch.zeros(bsz, hidden_size)
    c_0 = torch.zeros(bsz, hidden_size)
    if use_cuda:
        h_0, c_0 = h_0.cuda(), c_0.cuda()
    return h_0, c_0
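
# Illustrative sketch (not part of the module): the zero state pair feeds an
# LSTM cell of matching hidden size; shapes are assumed (batch, hidden).
#
#   cell = torch.nn.LSTMCell(input_size=300, hidden_size=128)
#   h, c = get_init_hidden(bsz=4, hidden_size=128, use_cuda=False)
#   h, c = cell(torch.zeros(4, 300), (h, c))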
def similarity(w1, w2, w2v):
    """Cosine similarity between the embeddings of w1 and w2.

    Out-of-vocabulary words fall back to a 300-d zero vector; if only one
    word is OOV, its zero vector is truncated to match the other vector's
    dimensionality (embedding sizes differ across the supported embeddings).
    """
    try:
        vec1 = w2v[w1]
    except KeyError:
        vec1 = np.zeros((300,))
    try:
        vec2 = w2v[w2]
    except KeyError:
        vec2 = np.zeros((300,))
    if np.sum(vec1) == 0 and np.sum(vec2) != 0:
        vec1 = vec1[:len(vec2)]
    elif np.sum(vec1) != 0 and np.sum(vec2) == 0:
        vec2 = vec2[:len(vec1)]
    unit_vec1 = vec1 / (np.linalg.norm(vec1) + eps)
    unit_vec2 = vec2 / (np.linalg.norm(vec2) + eps)
    return np.dot(unit_vec1, unit_vec2)
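
# Illustrative sketch: with any dict-like `w2v` mapping words to vectors,
# `similarity` returns the cosine of the angle between the two embeddings;
# OOV words fall back to a zero vector, giving a score near 0.
#
#   toy_w2v = {'kitchen': np.ones(300), 'stove': np.ones(300)}
#   similarity('kitchen', 'stove', toy_w2v)   # ~1.0 (identical directions)
#   similarity('kitchen', 'xyzzy', toy_w2v)   # ~0.0 ('xyzzy' is OOV)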
def normalize(state, remove_articles=True):
    """Lowercase, strip punctuation, and tokenize a state string.

    When remove_articles is True, stopwords are filtered out as well.
    """
    state = state.lower()
    out = state.translate(str.maketrans('', '', string.punctuation))
    if remove_articles:
        out = word_tokenize(out)
        s_ws = [w for w in out if w not in stop_words]
    else:
        s_ws = word_tokenize(out)
    return s_ws
def get_thresholded(sim_dict, t=0.2):
    """Keep the words whose similarity score is at least the threshold t."""
    return [k for k, v in sim_dict.items() if v >= t]


def statistics_score(sim_dict, kind='avg'):
    """Aggregate the similarity scores: mean if kind == 'avg', else max."""
    scores = list(sim_dict.values())
    return np.mean(scores) if kind == 'avg' else np.max(scores)
def correlate_state(s_ws, object_list, w2v, mean=True):
    """Score each state token against a list (or weighted dict) of object words.

    Returns {token: aggregated similarity}, where the aggregate over the
    object words is the mean (mean=True) or the max (mean=False). When
    object_list is a dict, its values weight the individual similarities.
    """
    sim_dict = {}
    if isinstance(object_list, list):
        for w in s_ws:
            sim_list = [similarity(w_obj, w, w2v) for w_obj in object_list]
            sim_dict[w] = np.mean(sim_list) if mean else np.max(sim_list)
    elif isinstance(object_list, dict):
        for w in s_ws:
            sim_list = [val * similarity(w_obj, w, w2v)
                        for w_obj, val in object_list.items()]
            sim_dict[w] = np.mean(sim_list) if mean else np.max(sim_list)
    return sim_dict
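
# Illustrative sketch: score each state token against a list of expert/object
# words; with mean=False each token keeps its best match.
#
#   toy_w2v = {'stove': np.ones(300), 'oven': np.ones(300), 'sky': -np.ones(300)}
#   correlate_state(['stove', 'sky'], ['oven'], toy_w2v, mean=False)
#   # -> {'stove': ~1.0, 'sky': ~-1.0}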
class BootstrapFilter:
    def __init__(self, threshold=0.3, filter_sentence=False):
        self.threshold = threshold
        self.load_cc_embeddings()
        self.load_bs_action_token()  # loads the bootstrap action tokens
        self.filter_sent = filter_sentence

    def load_cc_embeddings(self):
        """Load ConceptNet Numberbatch (300-d) vectors cached with bcolz."""
        cc_path = os.path.expanduser('~/Data/nlp/conceptNet')
        # Raw source: numberbatch-en-19.08.txt, pre-converted into the bcolz
        # vector cache and pickle index files below.
        rootdir = '{0}/glove.dat'.format(cc_path)
        words_file = '{0}/CC_words.pkl'.format(cc_path)
        idx_file = '{0}/CC_idx.pkl'.format(cc_path)
        with open(words_file, 'rb') as f:
            words = pickle.load(f)
        with open(idx_file, 'rb') as f:
            word2idx = pickle.load(f)
        vectors = bcolz.open(rootdir)
        self.w2v = {w: vectors[word2idx[w]] for w in words}


# Domain-relevant episodic state pruning
class CREST(BootstrapFilter):
    def __init__(self, threshold=0.3, embeddings='cnet'):  # 'cnet' | 'glove' | 'word2vec'
        self.threshold = threshold
        print('##' * 30)
        print('Using embedding : ', embeddings)
        print('##' * 30)
        if embeddings == 'cnet':
            self.load_cc_embeddings()
        elif embeddings == 'glove':
            self.load_glove_embeddings()
        elif embeddings == 'word2vec':
            self.load_w2v_embeddings()
        else:
            raise ValueError('Unsupported embeddings: {0}'.format(embeddings))
    def load_glove_embeddings(self):
        """Load GloVe 6B (100-d) vectors cached with bcolz."""
        embed_size = 100
        glove_path = os.path.expanduser('~/Data/nlp/glove/glove.6B')
        rootdir = '{0}/6B.{1}.dat'.format(glove_path, embed_size)
        words_file = '{0}/6B.{1}_words.pkl'.format(glove_path, embed_size)
        idx_file = '{0}/6B.{1}_idx.pkl'.format(glove_path, embed_size)
        vectors = bcolz.open(rootdir)[:]
        with open(words_file, 'rb') as f:
            words = pickle.load(f)
        with open(idx_file, 'rb') as f:
            word2idx = pickle.load(f)
        self.w2v = {w: vectors[word2idx[w]] for w in words}

    def load_w2v_embeddings(self):
        """Load pretrained word2vec vectors in the binary Google format."""
        self.w2v = gensim.models.KeyedVectors.load_word2vec_format(
            'data/Googlemodel.bin', binary=True)
    def prune_state(self, noisy_string, expert_words, return_details=False, add_prefix=True):
        """Drop tokens of noisy_string whose best similarity to any expert
        word falls below the threshold; return the pruned string (and,
        optionally, the per-token score dict)."""

        def get_scores(noisy_string_x, mean=True):
            s_wsx = normalize(noisy_string_x)
            return correlate_state(s_wsx, object_list=expert_words,
                                   w2v=self.w2v, mean=mean)

        # Each token is scored by its best match (max) over the expert words.
        sim_dict = get_scores(noisy_string, mean=False)
        final_words = get_thresholded(sim_dict, t=self.threshold)
        if add_prefix:
            final_str = '-= Unknown =- ' + ' '.join(final_words)
        else:
            final_str = ' '.join(final_words)
        if return_details:
            return final_str, sim_dict
        return final_str
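
if __name__ == '__main__':
    # Minimal usage sketch: prune a noisy textual observation down to the
    # tokens most related to a set of task-relevant "expert" words. Assumes
    # the module is importable as part of its package and that the GloVe
    # caches referenced in load_glove_embeddings() exist locally; the noisy
    # text and expert words below are made-up examples.
    crest = CREST(threshold=0.3, embeddings='glove')
    noisy = ('You are in the kitchen. There is a stove, a rusty old painting '
             'on the wall, and a window overlooking the garden.')
    pruned, scores = crest.prune_state(noisy,
                                       expert_words=['cook', 'stove', 'oven'],
                                       return_details=True, add_prefix=False)
    print(pruned)   # cooking-related tokens survive the threshold
    print(scores)   # per-token best similarity to the expert words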