Source code for persephone.corpus_reader

""" An CorpusReader class that interfaces with preprocessed corpora."""
from __future__ import generator_stop

import logging
import logging.config
from pathlib import Path
import pprint
import random
from typing import List, Sequence, Iterator

import numpy as np

from . import utils
from .config import ENCODING
from .exceptions import PersephoneException

logger = logging.getLogger(__name__) # type: ignore

[docs]class CorpusReader: """ Interfaces to the preprocessed corpora to read in train, valid, and test set features and transcriptions. This interface is common to all corpora. It is the responsibility of <corpora-name>.py to preprocess the data into a valid structure of <corpus-name>/[mam-train|mam-valid<seed>|mam-test]. """ rand = True
[docs] def __init__(self, corpus, num_train=None, batch_size=None, max_samples=None, rand_seed=0): """ Construct a new `CorpusReader` instance. corpus: The Corpus object that interfaces with a given corpus. num_train: The number of training instances from the corpus used. batch_size: The size of the batches to yield. If None, then it is num_train / 32.0. max_samples: The maximum length of utterances measured in samples. Longer utterances are filtered out. rand_seed: The seed for the random number generator. If None, then no randomization is used. """ self.corpus = corpus if max_samples: logger.critical("max_samples not yet implemented in CorpusReader") raise NotImplementedError("Not yet implemented.") # TODO This logic should be changed. The number of training instances # doesn't need to be divisible by batch size. The remainder can just go # in its own, smaller batch. if not num_train: if not batch_size: batch_size = 64 num_train = len(corpus.get_train_fns()[0]) num_batches = int(num_train / batch_size) num_train = int(num_batches * batch_size) self.num_train = num_train logger.info("Number of training utterances: {}".format(num_train)) logger.info("Batch size: {}".format(batch_size)) logger.info("Batches per epoch: {}".format(int(num_train/batch_size))) print("Number of training utterances: {}".format(num_train)) print("Batch size: {}".format(batch_size)) print("Batches per epoch: {}".format(int(num_train/batch_size))) if batch_size: self.batch_size = batch_size if num_train % batch_size != 0: logger.error("Number of training examples {} not divisible" " by batch size {}.".format(num_train, batch_size)) raise PersephoneException("Number of training examples {} not divisible" " by batch size {}.".format(num_train, batch_size)) else: # Dynamically change batch size based on number of training # examples. self.batch_size = int(num_train / 32.0) if self.batch_size > 64: # I was getting OOM errors when training with 4096 sents, as # the batch size jumped to 128 self.batch_size = 64 # For now we hope that training numbers are powers of two or # something... If not, crash before anything else happens. if num_train % self.batch_size != 0: logger.error("Number of training examples {} not divisible" " by batch size {}.".format(num_train, self.batch_size)) raise PersephoneException("Number of training examples {} not divisible" " by batch size {}.".format(num_train, batch_size)) random.seed(rand_seed) # Make a copy of the training prefixes, randomize their order, and take # a subset. Doing random selection of a subset of training now ensures # the selection of of training sentences is invariant between calls to # train_batch_gen() self.train_fns = list(zip(*corpus.get_train_fns())) if self.rand: random.shuffle(self.train_fns) self.train_fns = self.train_fns[:self.num_train]
def load_batch(self, fn_batch): """ Loads a batch with the given prefixes. The prefixes is the full path to the training example minus the extension. """ # TODO Assumes targets are available, which is how its distinct from # utils.load_batch_x(). These functions need to change names to be # clearer. inverse = list(zip(*fn_batch)) feat_fn_batch = inverse[0] target_fn_batch = inverse[1] batch_inputs, batch_inputs_lens = utils.load_batch_x(feat_fn_batch, flatten=False) batch_targets_list = [] for targets_path in target_fn_batch: with open(targets_path, encoding=ENCODING) as targets_f: target_indices = self.corpus.labels_to_indices(targets_f.readline().split()) batch_targets_list.append(target_indices) batch_targets = utils.target_list_to_sparse_tensor(batch_targets_list) return batch_inputs, batch_inputs_lens, batch_targets def make_batches(self, utterance_fns: Sequence[Path]) -> List[Sequence[Path]]: """ Group utterances into batches for decoding. """ return utils.make_batches(utterance_fns, self.batch_size) def train_batch_gen(self) -> Iterator: """ Returns a generator that outputs batches in the training data.""" if len(self.train_fns) == 0: raise PersephoneException("""No training data available; cannot generate training batches.""") # Create batches of batch_size and shuffle them. fn_batches = self.make_batches(self.train_fns) if self.rand: random.shuffle(fn_batches) for fn_batch in fn_batches: logger.debug("Batch of training filenames: %s", pprint.pformat(fn_batch)) yield self.load_batch(fn_batch) else: # Python 3.7 compatible way to mark generator as exhausted return def valid_batch(self): """ Returns a single batch with all the validation cases.""" valid_fns = list(zip(*self.corpus.get_valid_fns())) return self.load_batch(valid_fns) def test_batch(self): """ Returns a single batch with all the test cases.""" test_fns = list(zip(*self.corpus.get_test_fns())) return self.load_batch(test_fns) def untranscribed_batch_gen(self): """ A batch generator for all the untranscribed data. """ feat_fns = self.corpus.get_untranscribed_fns() fn_batches = self.make_batches(feat_fns) for fn_batch in fn_batches: batch_inputs, batch_inputs_lens = utils.load_batch_x(fn_batch, flatten=False) yield batch_inputs, batch_inputs_lens, fn_batch def human_readable_hyp_ref(self, dense_decoded, dense_y): """ Returns a human readable version of the hypothesis for manual inspection, along with the reference. """ hyps = [] refs = [] for i in range(len(dense_decoded)): ref = [phn_i for phn_i in dense_y[i] if phn_i != 0] hyp = [phn_i for phn_i in dense_decoded[i] if phn_i != 0] ref = self.corpus.indices_to_labels(ref) hyp = self.corpus.indices_to_labels(hyp) refs.append(ref) hyps.append(hyp) return hyps, refs def human_readable(self, dense_repr: Sequence[Sequence[int]]) -> List[List[str]]: """ Returns a human readable version of a dense representation of either or reference to facilitate simple manual inspection. """ transcripts = [] for dense_r in dense_repr: non_empty_phonemes = [phn_i for phn_i in dense_r if phn_i != 0] transcript = self.corpus.indices_to_labels(non_empty_phonemes) transcripts.append(transcript) return transcripts def __repr__(self): return ("%s(" % self.__class__.__name__ + "num_train=%s,\n" % repr(self.num_train) + "\tbatch_size=%s,\n" % repr(self.batch_size) + "\tcorpus=\n%s)" % repr(self.corpus)) def calc_time(self) -> None: """ Prints statistics about the the total duration of recordings in the corpus. """ def get_number_of_frames(feat_fns): """ fns: A list of numpy files which contain a number of feature frames. """ total = 0 for feat_fn in feat_fns: num_frames = len(np.load(feat_fn)) total += num_frames return total def numframes_to_minutes(num_frames): # TODO Assumes 10ms strides for the frames. This should generalize to # different frame stride widths, as should feature preparation. minutes = ((num_frames*10)/1000)/60 return minutes total_frames = 0 train_fns = [train_fn[0] for train_fn in self.train_fns] num_train_frames = get_number_of_frames(train_fns) total_frames += num_train_frames num_valid_frames = get_number_of_frames(self.corpus.get_valid_fns()[0]) total_frames += num_valid_frames num_test_frames = get_number_of_frames(self.corpus.get_test_fns()[0]) total_frames += num_test_frames print("Train duration: %0.3f" % numframes_to_minutes(num_train_frames)) print("Validation duration: %0.3f" % numframes_to_minutes(num_valid_frames)) print("Test duration: %0.3f" % numframes_to_minutes(num_test_frames)) print("Total duration: %0.3f" % numframes_to_minutes(total_frames))