Source code for persephone.rnn_ctc

""" An acoustic model with a LSTM/CTC architecture. """

import json
import os
from typing import Union, Dict, Any
from pathlib import Path

import numpy as np
import tensorflow as tf

from . import model

def lstm_cell(hidden_size: int):
    """ Wrapper function to create an LSTM cell. """

    # Peephole connections let the gates see the cell state; state_is_tuple
    # keeps the cell and hidden states as a (c, h) pair rather than a single
    # concatenated tensor.
    return tf.contrib.rnn.LSTMCell(
        hidden_size, use_peepholes=True, state_is_tuple=True)
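
# A hedged aside, not used by this module: on TF 1.x, tf.contrib.rnn.LSTMCell
# is interchangeable with the core tf.nn.rnn_cell.LSTMCell, so a contrib-free
# sketch of the same wrapper would look like this.
def _lstm_cell_no_contrib(hidden_size: int):
    """ Sketch only: the same cell via the non-contrib TF 1.x API. """
    return tf.nn.rnn_cell.LSTMCell(
        hidden_size, use_peepholes=True, state_is_tuple=True)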

class Model(model.Model):
    """ An acoustic model with an LSTM/CTC architecture. """
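
    # Architecture overview (constructed in __init__ below): num_layers
    # stacked bidirectional LSTM layers, a dense projection to vocab_size
    # logits, CTC loss, and CTC beam-search decoding.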

    def write_desc(self) -> None:
        """ Writes a description of the model to the exp_dir. """

        path = os.path.join(self.exp_dir, "model_description.txt")
        with open(path, "w") as desc_f:
            for key, val in self.__dict__.items():
                print("%s=%s" % (key, val), file=desc_f)

        json_path = os.path.join(self.exp_dir, "model_description.json")
        desc = {} #type: Dict[str, Any]
        # For use in decoding from a saved model.
        desc["topology"] = {
            "batch_x_name": self.batch_x.name, #type: ignore
            "batch_x_lens_name": self.batch_x_lens.name, #type: ignore
            "dense_decoded_name": self.dense_decoded.name #type: ignore
        }
        desc["model_type"] = str(self.__class__)
        for key, val in self.__dict__.items():
            if isinstance(val, int):
                desc[str(key)] = val
            elif isinstance(val, tf.Tensor):
                desc[key] = {
                    "type": "tf.Tensor",
                    "name": val.name, #type: ignore
                    "shape": str(val.shape), #type: ignore
                    "dtype": str(val.dtype), #type: ignore
                    "value": str(val),
                }
            elif isinstance(val, tf.SparseTensor): #type: ignore
                desc[key] = {
                    "type": "tf.SparseTensor",
                    "value": str(val), #type: ignore
                }
            else:
                desc[str(key)] = str(val)
        with open(json_path, "w") as json_desc_f:
            json.dump(desc, json_desc_f, skipkeys=True)
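
    # Hedged note: the "topology" entries written above record tensor names,
    # so a separately loaded graph can recover the decoding endpoints with,
    # e.g., graph.get_tensor_by_name(desc["topology"]["dense_decoded_name"]).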

    def __init__(self, exp_dir: Union[str, Path], corpus_reader,
                 num_layers: int = 3, hidden_size: int = 250,
                 beam_width: int = 100,
                 decoding_merge_repeated: bool = True) -> None:

        super().__init__(exp_dir, corpus_reader)

        if isinstance(exp_dir, Path):
            exp_dir = str(exp_dir)
        if not os.path.isdir(exp_dir):
            os.makedirs(exp_dir)

        # Increase the vocab size by 2 since we need an extra label for CTC
        # blanks and another extra for dynamic padding with zeros.
        vocab_size = corpus_reader.corpus.vocab_size + 2

        # Reset the graph.
        tf.reset_default_graph()

        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.beam_width = beam_width
        self.vocab_size = vocab_size

        # Initialize placeholders for feeding data to the model.
        self.batch_x = tf.placeholder(
            tf.float32, [None, None, corpus_reader.corpus.num_feats],
            name="batch_x")
        self.batch_x_lens = tf.placeholder(tf.int32, [None],
                                           name="batch_x_lens")
        self.batch_y = tf.sparse_placeholder(tf.int32)

        batch_size = tf.shape(self.batch_x)[0]

        layer_input = self.batch_x
        for i in range(num_layers):
            with tf.variable_scope("layer_%d" % i): #type: ignore
                cell_fw = lstm_cell(self.hidden_size)
                cell_bw = lstm_cell(self.hidden_size)

                (self.out_fw, self.out_bw), _ = tf.nn.bidirectional_dynamic_rnn(
                    cell_fw, cell_bw, layer_input, self.batch_x_lens,
                    dtype=tf.float32, time_major=False)

                # self.outputs_concat now has shape
                # [batch_num, time, hidden_size*2].
                self.outputs_concat = tf.concat(
                    (self.out_fw, self.out_bw), 2) #type: ignore
                # For feeding into the next layer.
                layer_input = self.outputs_concat

        self.outputs = tf.reshape(self.outputs_concat,
                                  [-1, self.hidden_size*2])

        # pylint: disable=no-member
        # Single-variable names are appropriate for weights and biases.
        # pylint: disable=invalid-name
        W = tf.Variable(tf.truncated_normal(
            [hidden_size*2, vocab_size],
            stddev=np.sqrt(2.0 / (2*hidden_size)))) #type: ignore
        b = tf.Variable(tf.zeros([vocab_size])) #type: ignore

        self.logits = tf.matmul(self.outputs, W) + b #type: ignore
        self.logits = tf.reshape(self.logits, [batch_size, -1, vocab_size])
        # pylint: disable=no-member
        # igormq made it time major, because of an optimization in ctc_loss.
        self.logits = tf.transpose(
            self.logits, (1, 0, 2), name="logits") #type: ignore

        # For lattice construction.
        self.log_softmax = tf.nn.log_softmax(self.logits)

        self.decoded, self.log_prob = tf.nn.ctc_beam_search_decoder(
            self.logits, self.batch_x_lens, beam_width=beam_width,
            merge_repeated=decoding_merge_repeated)

        # If we want to do manual PER decoding. decoded[0] is the best (0th)
        # hypothesis in an n-best list.
        self.dense_decoded = tf.sparse_tensor_to_dense(
            self.decoded[0], name="hyp_dense_decoded")
        self.dense_ref = tf.sparse_tensor_to_dense(self.batch_y)

        self.loss = tf.nn.ctc_loss(self.batch_y, self.logits,
                                   self.batch_x_lens,
                                   preprocess_collapse_repeated=False,
                                   ctc_merge_repeated=True)
        self.cost = tf.reduce_mean(self.loss)
        self.optimizer = tf.train.AdamOptimizer().minimize(
            self.cost) #type: ignore

        self.ler = tf.reduce_mean(tf.edit_distance(
            tf.cast(self.decoded[0], tf.int32), self.batch_y)) #type: ignore

        self.write_desc()
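
# A hedged usage sketch, not part of the original module: how the placeholders
# and decoding tensors defined above are typically fed. `acoustic_model` is
# assumed to be a constructed Model and `sess` a tf.Session whose variables
# have been initialized or restored; the batch shapes and the feature
# dimension of 41 are illustrative assumptions only.
def _example_decode(acoustic_model: "Model", sess: tf.Session) -> np.ndarray:
    """ Illustrative only: beam-search decode one synthetic batch. """
    # Two utterances of 100 and 80 frames; the last dimension must match
    # corpus_reader.corpus.num_feats used when the graph was built.
    feats = np.zeros((2, 100, 41), dtype=np.float32)
    feat_lens = np.array([100, 80], dtype=np.int32)
    # dense_decoded holds the best (0th) beam hypothesis per utterance,
    # zero-padded to the longest hypothesis in the batch.
    return sess.run(acoustic_model.dense_decoded,
                    feed_dict={acoustic_model.batch_x: feats,
                               acoustic_model.batch_x_lens: feat_lens})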