摘要: 随着深度学习的快速发展,人们创建了一整套神经网络结构来解决各种各样的任务和问题。在英文分类基础上,中文文本分类的处理也同样重要...
人工智能学习离不开实践的验证,推荐大家可以多在FlyAI-AI竞赛服务平台多参加训练和竞赛,以此来提升自己的能力。FlyAI是为AI开发者提供数据竞赛并支持GPU离线训练的一站式服务平台。每周免费提供项目开源算法样例,支持算法能力变现以及快速的迭代算法模型。
目录
- 概述
- 数据集合
- 代码
- 结果展示
一、概述
在英文分类的基础上,再看看中文分类的,是一种10分类问题(体育,科技,游戏,财经,房产,家居等)的处理。
二、数据集合
数据集为新闻,总共有四个数据文件,在/data/cnews目录下,包括内容如下图所示测试集,训练集和验证集,和单词表(最后的单词表cnews.vocab.txt可以不要,因为训练可以自动产生)。数据格式:前面为类别,后面为描述内容。
训练数据地址:链接: https://pan.baidu.com/s/1ZHh98RrjQpG5Tm-yq73vBQ 提取码:2r04
其中训练集的格式:
vocab.txt的格式:每个字一行,其中前面加上PAD。
三、代码
3.1 数据采集cnews_loader.py
# coding: utf-8import sysfrom collections import Counterimport numpy as npimport tensorflow.contrib.keras as krif sys.version_info[0] > 2:is_py3 = Trueelse:reload(sys)sys.setdefaultencoding("utf-8")is_py3 = Falsedef native_word(word, encoding='utf-8'):"""如果在python2下面使用python3训练的模型,可考虑调用此函数转化一下字符编码"""if not is_py3:return word.encode(encoding)else:return worddef native_content(content):if not is_py3:return content.decode('utf-8')else:return contentdef open_file(filename, mode='r'):"""常用文件操作,可在python2和python3间切换.mode: 'r' or 'w' for read or write"""if is_py3:return open(filename, mode, encoding='utf-8', errors='ignore')else:return open(filename, mode)def read_file(filename):"""读取文件数据"""contents, labels = [], []with open_file(filename) as f:for line in f:try:label, content = line.strip().split('\t')if content:contents.append(list(native_content(content)))labels.append(native_content(label))except:passreturn contents, labelsdef build_vocab(train_dir, vocab_dir, vocab_size=5000):"""根据训练集构建词汇表,存储"""data_train, _ = read_file(train_dir)all_data = []for content in data_train:all_data.extend(content)counter = Counter(all_data)count_pairs = counter.most_common(vocab_size - 1)words, _ = list(zip(*count_pairs))# 添加一个 <PAD> 来将所有文本pad为同一长度words = ['<PAD>'] + list(words)open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n')def read_vocab(vocab_dir):"""读取词汇表"""# words = open_file(vocab_dir).read().strip().split('\n')with open_file(vocab_dir) as fp:# 如果是py2 则每个值都转化为unicodewords = [native_content(_.strip()) for _ in fp.readlines()]word_to_id = dict(zip(words, range(len(words))))return words, word_to_iddef read_category():"""读取分类目录,固定"""categories = ['体育', '财经', '房产', '家居', '教育', '科技', '时尚', '时政', '游戏', '娱乐']categories = [native_content(x) for x in categories]cat_to_id = dict(zip(categories, range(len(categories))))return categories, cat_to_iddef to_words(content, words):"""将id表示的内容转换为文字"""return ''.join(words[x] for x in content)def process_file(filename, word_to_id, cat_to_id, max_length=600):"""将文件转换为id表示"""contents, labels = read_file(filename)data_id, label_id = [], []for i in range(len(contents)):data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])label_id.append(cat_to_id[labels[i]])# 使用keras提供的pad_sequences来将文本pad为固定长度x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length)y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id)) # 将标签转换为one-hot表示return x_pad, y_paddef batch_iter(x, y, batch_size=64):"""生成批次数据"""data_len = len(x)num_batch = int((data_len - 1) / batch_size) + 1indices = np.random.permutation(np.arange(data_len))x_shuffle = x[indices]y_shuffle = y[indices]for i in range(num_batch):start_id = i * batch_sizeend_id = min((i + 1) * batch_size, data_len)yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id
3.2 模型搭建cnn_model.py
#!/usr/bin/python# -*- coding: utf-8 -*-import tensorflow as tfclass TRNNConfig(object):"""RNN配置参数"""# 模型参数embedding_dim = 64 # 词向量维度seq_length = 600 # 序列长度num_classes = 10 # 类别数vocab_size = 5000 # 词汇表达小num_layers= 2 # 隐藏层层数hidden_dim = 128 # 隐藏层神经元rnn = 'gru' # lstm 或 grudropout_keep_prob = 0.8 # dropout保留比例learning_rate = 1e-3 # 学习率batch_size = 128 # 每批训练大小num_epochs = 10 # 总迭代轮次print_per_batch = 100 # 每多少轮输出一次结果save_per_batch = 10 # 每多少轮存入tensorboardclass TextRNN(object):"""文本分类,RNN模型"""def __init__(self, config):self.config = config# 三个待输入的数据self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x')self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y')self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')self.rnn()def rnn(self):"""rnn模型"""def lstm_cell(): # lstm核return tf.contrib.rnn.BasicLSTMCell(self.config.hidden_dim, state_is_tuple=True)def gru_cell(): # gru核return tf.contrib.rnn.GRUCell(self.config.hidden_dim)def dropout(): # 为每一个rnn核后面加一个dropout层if (self.config.rnn == 'lstm'):cell = lstm_cell()else:cell = gru_cell()return tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=self.keep_prob)# 词向量映射with tf.device('/cpu:0'):embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim])embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x)with tf.name_scope("rnn"):# 多层rnn网络cells = [dropout() for _ in range(self.config.num_layers)]rnn_cell = tf.contrib.rnn.MultiRNNCell(cells, state_is_tuple=True)_outputs, _ = tf.nn.dynamic_rnn(cell=rnn_cell, inputs=embedding_inputs, dtype=tf.float32)last = _outputs[:, -1, :] # 取最后一个时序输出作为结果with tf.name_scope("score"):# 全连接层,后面接dropout以及relu激活fc = tf.layers.dense(last, self.config.hidden_dim, name='fc1')fc = tf.contrib.layers.dropout(fc, self.keep_prob)fc = tf.nn.relu(fc)# 分类器self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2')self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1) # 预测类别with tf.name_scope("optimize"):# 损失函数,交叉熵cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)self.loss = tf.reduce_mean(cross_entropy)# 优化器self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)with tf.name_scope("accuracy"):# 准确率correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
3.3 运行代码run_cnn.py
# coding: utf-8from __future__ import print_functionimport osimport sysimport timefrom datetime import timedeltaimport numpy as npimport tensorflow as tffrom sklearn import metricsfrom rnn_model import TRNNConfig, TextRNNfrom cnews_loader import read_vocab, read_category, batch_iter, process_file, build_vocabbase_dir = '../data/cnews'train_dir = os.path.join(base_dir, 'cnews.train.txt')test_dir = os.path.join(base_dir, 'cnews.test.txt')val_dir = os.path.join(base_dir, 'cnews.val.txt')vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')save_dir = '../checkpoints/textrnn'save_path = os.path.join(save_dir, 'best_validation') # 最佳验证结果保存路径def get_time_dif(start_time):"""获取已使用时间"""end_time = time.time()time_dif = end_time - start_timereturn timedelta(seconds=int(round(time_dif)))def feed_data(x_batch, y_batch, keep_prob):feed_dict = {model.input_x: x_batch,model.input_y: y_batch,model.keep_prob: keep_prob}return feed_dictdef evaluate(sess, x_, y_):"""评估在某一数据上的准确率和损失"""data_len = len(x_)batch_eval = batch_iter(x_, y_, 128)total_loss = 0.0total_acc = 0.0for x_batch, y_batch in batch_eval:batch_len = len(x_batch)feed_dict = feed_data(x_batch, y_batch, 1.0)y_pred_class,loss, acc = sess.run([model.y_pred_cls,model.loss, model.acc], feed_dict=feed_dict)total_loss += loss * batch_lentotal_acc += acc * batch_lenreturn y_pred_class,total_loss / data_len, total_acc / data_lendef train():print("Configuring TensorBoard and Saver...")# 配置 Tensorboard,重新训练时,请将tensorboard文件夹删除,不然图会覆盖tensorboard_dir = '../tensorboard/textrnn'if not os.path.exists(tensorboard_dir):os.makedirs(tensorboard_dir)tf.summary.scalar("loss", model.loss)tf.summary.scalar("accuracy", model.acc)merged_summary = tf.summary.merge_all()writer = tf.summary.FileWriter(tensorboard_dir)# 配置 Saversaver = tf.train.Saver()if not os.path.exists(save_dir):os.makedirs(save_dir)print("Loading training and validation data...")# 载入训练集与验证集start_time = time.time()x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length)x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length)time_dif = get_time_dif(start_time)print("Time usage:", time_dif)# 创建sessionsession = tf.Session()session.run(tf.global_variables_initializer())writer.add_graph(session.graph)print('Training and evaluating...')start_time = time.time()total_batch = 0 # 总批次best_acc_val = 0.0 # 最佳验证集准确率last_improved = 0 # 记录上一次提升批次require_improvement = 1000 # 如果超过1000轮未提升,提前结束训练flag = Falsefor epoch in range(config.num_epochs):print('Epoch:', epoch + 1)batch_train = batch_iter(x_train, y_train, config.batch_size)for x_batch, y_batch in batch_train:feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob)if total_batch % config.save_per_batch == 0:# 每多少轮次将训练结果写入tensorboard scalars = session.run(merged_summary, feed_dict=feed_dict)writer.add_summary(s, total_batch)if total_batch % config.print_per_batch == 0:# 每多少轮次输出在训练集和验证集上的性能feed_dict[model.keep_prob] = 1.0loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict)y_pred_class,loss_val, acc_val = evaluate(session, x_val, y_val) # todoif acc_val > best_acc_val:# 保存最好结果best_acc_val = acc_vallast_improved = total_batchsaver.save(sess=session, save_path=save_path)improved_str = '*'else:improved_str = ''time_dif = get_time_dif(start_time)msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \+ ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}'print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str))session.run(model.optim, feed_dict=feed_dict) # 运行优化total_batch += 1if total_batch - last_improved > require_improvement:# 验证集正确率长期不提升,提前结束训练print("No optimization for a long time, auto-stopping...")flag = Truebreak # 跳出循环if flag: # 同上breakdef test():print("Loading test data...")start_time = time.time()x_test, y_test = process_file(test_dir, word_to_id, cat_to_id, config.seq_length)session = tf.Session()session.run(tf.global_variables_initializer())saver = tf.train.Saver()saver.restore(sess=session, save_path=save_path) # 读取保存的模型print('Testing...')y_pred,loss_test, acc_test = evaluate(session, x_test, y_test)msg = 'Test Loss: {0:>6.2}, Test Acc: {1:>7.2%}'print(msg.format(loss_test, acc_test))batch_size = 128data_len = len(x_test)num_batch = int((data_len - 1) / batch_size) + 1y_test_cls = np.argmax(y_test, 1)y_pred_cls = np.zeros(shape=len(x_test), dtype=np.int32) # 保存预测结果for i in range(num_batch): # 逐批次处理start_id = i * batch_sizeend_id = min((i + 1) * batch_size, data_len)feed_dict = {model.input_x: x_test[start_id:end_id],model.keep_prob: 1.0}y_pred_cls[start_id:end_id] = session.run(model.y_pred_cls, feed_dict=feed_dict)# 评估print("Precision, Recall and F1-Score...")print(metrics.classification_report(y_test_cls, y_pred_cls, target_names=categories))# 混淆矩阵print("Confusion Matrix...")cm = metrics.confusion_matrix(y_test_cls, y_pred_cls)print(cm)time_dif = get_time_dif(start_time)print("Time usage:", time_dif)if __name__ == '__main__':print('Configuring RNN model...')config = TRNNConfig()if not os.path.exists(vocab_dir): # 如果不存在词汇表,重建build_vocab(train_dir, vocab_dir, config.vocab_size)categories, cat_to_id = read_category()words, word_to_id = read_vocab(vocab_dir)config.vocab_size = len(words)model = TextRNN(config)option='train'if option == 'train':train()else:test()
3.4 预测predict.py
# coding: utf-8from __future__ import print_functionimport osimport tensorflow as tfimport tensorflow.contrib.keras as krfrom rnn_model import TRNNConfig, TextRNNfrom cnews_loader import read_category, read_vocabtry:bool(type(unicode))except NameError:unicode = strbase_dir = '../data/cnews'vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')save_dir = '../checkpoints/textrnn'save_path = os.path.join(save_dir, 'best_validation') # 最佳验证结果保存路径class RnnModel:def __init__(self):self.config = TRNNConfig()self.categories, self.cat_to_id = read_category()self.words, self.word_to_id = read_vocab(vocab_dir)self.config.vocab_size = len(self.words)self.model = TextRNN(self.config)self.session = tf.Session()self.session.run(tf.global_variables_initializer())saver = tf.train.Saver()saver.restore(sess=self.session, save_path=save_path) # 读取保存的模型def predict(self, message):# 支持不论在python2还是python3下训练的模型都可以在2或者3的环境下运行content = unicode(message)data = [self.word_to_id[x] for x in content if x in self.word_to_id]feed_dict = {self.model.input_x: kr.preprocessing.sequence.pad_sequences([data], self.config.seq_length),self.model.keep_prob: 1.0}y_pred_cls = self.session.run(self.model.y_pred_cls, feed_dict=feed_dict)return self.categories[y_pred_cls[0]]if __name__ == '__main__':rnn_model = RnnModel()test_demo = ['三星ST550以全新的拍摄方式超越了以往任何一款数码相机','热火vs骑士前瞻:皇帝回乡二番战 东部次席唾手可得新浪体育讯北京时间3月30日7:00']for i in test_demo:print(rnn_model.predict(i))
四、结果展示
训练时长,接近2小时
相关代码可见:https://github.com/yifanhunter/textClassifier_chinese
有关于‘文本分类’的竞赛项目,大家可移步官网进行查看和参赛!
更多精彩内容请访问FlyAI-AI竞赛服务平台;为AI开发者提供数据竞赛并支持GPU离线训练的一站式服务平台;每周免费提供项目开源算法样例,支持算法能力变现以及快速的迭代算法模型。
挑战者,都在FlyAI!!!