fasttext
Created Tuesday 12 March 2019
fastText is very similar to the CBOW model: it also has an input layer, then a hidden layer, and then an output layer.
fastText is well suited to short-text classification.
All words are mapped to vectors; the vectors are linearly transformed into a feature layer, the maximum likelihood objective is computed from that feature layer, and the feature layer is then mapped to the labels.
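In formulas (a minimal sketch matching the TensorFlow code further down; $x_{w_i}$ denotes the embedding of the $i$-th word of the sentence and $W$ the label embedding matrix, both trained jointly):

$h = \sum_{i=1}^{n} x_{w_i}, \qquad \hat{y} = \mathrm{softmax}(h W^{\top}), \qquad L = -\sum_{c} y_c \log \hat{y}_c$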
fastText = a flexible use of word2vec's CBOW + hierarchical softmax (h-softmax).
The flexibility shows up in two places:
Output layer: word2vec's output layer corresponds to every term, and the model computes which term has the highest probability; fastText's output layer corresponds to the classification labels. Either way, whatever the output layer corresponds to, its vectors are not kept or used afterwards.
Input layer: word2vec's input layer is the terms inside a context window; fastText's input is the content of the whole sentence, including the terms as well as their n-grams.
The essential difference between the two lies in how h-softmax is used.
The goal of word2vec is to obtain word vectors, which are ultimately taken from the input layer; the h-softmax on the output side also produces a set of vectors, but they are all discarded in the end and never used.
fastText, on the other hand, makes full use of h-softmax's classification ability: it traverses all leaf nodes of the classification tree and finds the label (one or N of them) with the highest probability.
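As a quick illustration of the same idea outside these notes' code: the official fasttext Python package wraps this whole pipeline; loss='hs' selects hierarchical softmax and wordNgrams adds the n-gram features mentioned above. The file name and query sentence below are made-up placeholders; training data is expected in the "__label__<class> tokenized text ..." format.

import fasttext

# train.txt: one example per line, e.g. "__label__Sports federer wins the final ..."
model = fasttext.train_supervised(input='train.txt', dim=10, epoch=10, wordNgrams=2, loss='hs')
labels, probs = model.predict("federer wins the grand slam final")
print(labels, probs)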
import collections
import math
import os
import random
import sys, getopt
from random import shuffle
from collections import namedtuple

import numpy as np
import tensorflow as tf
Dataset = namedtuple('Dataset','sentences labels')
num_classes = 3
learning_rate = 0.01
num_epochs = 10
embedding_dim = 10
label_to_id = {'World': 0, 'Entertainment': 1, 'Sports': 2}
unknown_word_id = 0
def create_label_vec(label):
    # Generate a label vector for a given classification label.
    label_vec = [0] * num_classes
    label_vec[label_to_id[label]] = 1
    return label_vec
def tokenize(sens):
    # Tokenize a given sentence into a sequence of tokens.
    import nltk
    return nltk.word_tokenize(sens)
def map_token_seq_to_word_id_seq(token_seq, word_to_id):
    # Map every token in a sentence to its word id.
    return [map_word_to_id(word_to_id, word) for word in token_seq]

def map_word_to_id(word_to_id, word):
    # Map each word to its id; unseen words fall back to the $UNK$ id.
    if word in word_to_id:
        return word_to_id[word]
    else:
        return word_to_id['$UNK$']
def build_vocab(sens_file_name):
    data = []
    with open(sens_file_name) as f:
        for line in f.readlines():
            tokens = tokenize(line)
            data.extend(tokens)
    # Reserve id 0 for the unknown-word token $UNK$ (matches unknown_word_id above).
    count = [['$UNK$', 0]]
    sorted_counts = collections.Counter(data).most_common()
    count.extend(sorted_counts)
    word_to_id = dict()
    for word, _ in count:
        word_to_id[word] = len(word_to_id)
    print('size of vocabulary is %s. ' % len(word_to_id))
    return word_to_id
def read_labeled_dataset(sens_file_name, label_file_name, word_to_id):
    sens_file = open(sens_file_name)
    label_file = open(label_file_name)
    data = []
    for label in label_file:
        label = label.strip()
        sens = sens_file.readline()
        word_id_seq = map_token_seq_to_word_id_seq(tokenize(sens), word_to_id)
        data.append((word_id_seq, create_label_vec(label)))
    print("read %d sentences from %s ." % (len(data), sens_file_name))
    sens_file.close()
    label_file.close()
    return data
def read_dataset(sens_file_name, word_to_id):
    sens_file = open(sens_file_name)
    data = []
    for sens in sens_file:
        word_id_seq = map_token_seq_to_word_id_seq(tokenize(sens), word_to_id)
        data.append(word_id_seq)
    print("read %d sentences from %s ." % (len(data), sens_file_name))
    sens_file.close()
    return data
def train_eval(word_to_id, train_dataset, dev_dataset, test_dataset):
    print('train and eval start')
    num_words = len(word_to_id)
    # Initialize the placeholders and Variables.
    # label tensor (one-hot vector of the correct class)
    correct_label = tf.placeholder(tf.float32, shape=[num_classes])
    # sentence tensor (sequence of word ids, variable length)
    input_sens = tf.placeholder(tf.int32, shape=[None])
    # Build the word embeddings as embeddings1 and the label embeddings as embeddings2, both with embedding_dim dimensions.
    embeddings1 = tf.Variable(tf.random_uniform([num_words, embedding_dim], -1.0 / embedding_dim, 1.0 / embedding_dim))  # num_words * 10
    embeddings2 = tf.Variable(tf.random_uniform([num_classes, embedding_dim], -1.0 / embedding_dim, 1.0 / embedding_dim))  # 3 * 10
    with tf.Session() as sess:
        # Retrieve the rows of the embedding matrix for the words of the sentence.
        embed1 = tf.nn.embedding_lookup(embeddings1, input_sens)  # n * 10
        # Sum the word embeddings across the sentence to get its representation.
        tmp_m1 = tf.reduce_sum(embed1, 0)  # 10
        # Reshape the tensor into a single-row matrix.
        sum_rep1 = tf.reshape(tmp_m1, [1, embedding_dim])  # 1 * 10
        # embed2 = tf.nn.embedding_lookup(embeddings2, correct_label)
        # tmp_m2 = tf.reduce_sum(embed2, 0)
        # sum_rep2 = tf.reshape(tmp_m2, [num_classes, embedding_dim])
        # y : estimated probability of each class (maximum likelihood via softmax)
        y = tf.nn.softmax(tf.matmul(sum_rep1, embeddings2, transpose_b=True))  # 1 * 3
        cross_entropy = tf.reduce_mean(-tf.reduce_sum(correct_label * tf.log(y), reduction_indices=[1]))  # scalar
        # Evaluation code; y is the estimated probability vector of each class.
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(correct_label, 0))
        accuracy = tf.cast(correct_prediction, tf.float32)
        prediction = tf.cast(tf.argmax(y, 1), tf.int32)
        # Build the SGD optimizer, then initialize all variables.
        train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(cross_entropy)
        sess.run(tf.global_variables_initializer())
        for epoch in range(num_epochs):
            print('epoch start')
            shuffle(train_dataset)
            for data in train_dataset:
                train_step.run(feed_dict={input_sens: data[0], correct_label: data[1]})
            # The following line computes the accuracy on the development dataset in each epoch.
            print('Epoch %d : %s .' % (epoch, compute_accuracy(accuracy, input_sens, correct_label, dev_dataset)))
        print('Accuracy on the test set : %s.' % compute_accuracy(accuracy, input_sens, correct_label, test_dataset))
        # input_sens is the placeholder of an input sentence.
        test_results = predict(prediction, input_sens, test_dataset)
    return test_results
def compute_accuracy(accuracy, input_sens, correct_label, eval_dataset):
    num_correct = 0
    for (sens, label) in eval_dataset:
        num_correct += accuracy.eval(feed_dict={input_sens: sens, correct_label: label})
    print('#correct sentences is %s ' % num_correct)
    return num_correct / len(eval_dataset)
def predict(prediction, input_sens, test_dataset):
    test_results = []
    for (sens, label) in test_dataset:
        test_results.append(prediction.eval(feed_dict={input_sens: sens}))
    return test_results
def write_result_file(test_results, result_file):
    with open(result_file, mode='w') as f:
        for r in test_results:
            f.write("%d\n" % r)
def main(argv):
    trainSensFile = ''
    trainLabelFile = ''
    devSensFile = ''
    devLabelFile = ''
    testSensFile = ''
    testLabelFile = ''
    testResultFile = ''
    try:
        opts, args = getopt.getopt(argv, "hd:", ["dataFolder="])
    except getopt.GetoptError:
        print('fastText.py -d <dataFolder>')
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print('fastText.py -d <dataFolder>')
            sys.exit()
        elif opt in ("-d", "--dataFolder"):
            trainSensFile = os.path.join(arg, 'sentences_train.txt')
            devSensFile = os.path.join(arg, 'sentences_dev.txt')
            testSensFile = os.path.join(arg, 'sentences_test.txt')
            trainLabelFile = os.path.join(arg, 'labels_train.txt')
            devLabelFile = os.path.join(arg, 'labels_dev.txt')
            testLabelFile = os.path.join(arg, 'labels_test.txt')
            testResultFile = os.path.join(arg, 'test_results.txt')
        else:
            print("unknown option %s ." % opt)
    # Build the vocabulary from the training sentences only.
    word_to_id_train = build_vocab(trainSensFile)
    train_dataSet = read_labeled_dataset(trainSensFile, trainLabelFile, word_to_id_train)
    dev_dataSet = read_labeled_dataset(devSensFile, devLabelFile, word_to_id_train)
    test_dataSet = read_labeled_dataset(testSensFile, testLabelFile, word_to_id_train)
    test_results = train_eval(word_to_id_train, train_dataSet, dev_dataSet, test_dataSet)
    write_result_file(test_results, testResultFile)
if __name__ == "__main__":
    main(sys.argv[1:])
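Assuming the data folder contains sentences_train.txt / labels_train.txt, sentences_dev.txt / labels_dev.txt and sentences_test.txt / labels_test.txt (the folder name below is a placeholder), the script would be run as:

python fastText.py -d ./data

It writes the predicted label ids for the test sentences to test_results.txt inside that folder.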