import numpy as np
import theano
import theano.tensor as tt
from . import layer
from .layer import InputLayer, ElasticLayer, ColorLayer
from .layer import ConvLayer, PoolLayer, MeanLayer, DropOutLayer
from .layer import HiddenLayer, AuxConcatLayer
from .layer import SoftmaxLayer, CenteredOutLayer, SoftAuxLayer, HingeLayer
# theano.config.optimizer = 'fast_compile'
# theano.config.exception_verbosity = "high"
# ########################### Helper Functions #################################
from functools import reduce
from operator import mul
def get_layers_info(layers):
    """Return a printable, multi-line summary of a layer specification.

    Args:
        layers: iterable of entries whose item ``[0]`` is the layer type
            name and whose item ``[1]`` is a mapping of that layer's
            arguments.

    Returns:
        str: for each layer, a ``"\\n<name> : "`` header followed by one
        ``"\\n\\t<key> : \\t<value>"`` line per argument, in the mapping's
        iteration order. An empty specification yields ``""``.
    """
    parts = []
    for lyr in layers:
        name, args = lyr[0], lyr[1]
        parts.append('\n{} : '.format(name))
        parts.extend('\n\t{} : \t{}'.format(key, args[key]) for key in args)
    return "".join(parts)
def get_wts_info(wts, detailed=False):
    """Summarize the weight arrays of every layer as a printable string.

    Args:
        wts: iterable (list or generator) with one entry per layer; each
            entry is an iterable of array-like weights exposing ``shape``
            and ``dtype`` (and ``min``/``mean``/``max`` when ``detailed``).
        detailed: when true, append the minimum, mean and maximum of each
            weight array to its line.

    Returns:
        str: one ``"Layer <i>:"`` header per layer, one line per weight
        array giving its shape, dtype and element count, and a final line
        with the total element count using thousands separators.
    """
    parts = []
    n_wts = 0
    for l, ww in enumerate(wts):
        parts.append("\nLayer {}:".format(l))
        for w in ww:
            # Start the product at 1 so a 0-d (scalar) array, whose shape
            # is the empty tuple, counts as one weight instead of making
            # reduce() raise TypeError on an empty sequence.
            n_ww = reduce(mul, w.shape, 1)
            n_wts += n_ww
            parts.append('\n\t {} {} ❲{}❳'.format(w.shape, w.dtype, n_ww))
            if detailed:
                parts.append(" ❲{:.2e}, {:.2e}, {:.2e}❳".format(
                    w.min(), w.mean(), w.max()))
    parts.append('\n\nTotal Number of Weights : {:,}'.format(n_wts))
    return "".join(parts)
def get_training_params_info(training_params):
    """Return the training parameters as a printable, key-sorted listing.

    Args:
        training_params: mapping of parameter name to value; the keys must
            be mutually orderable because they are passed to ``sorted``.

    Returns:
        str: the header ``"Training Parameters:"`` followed by one
        ``"\\n\\t<key> : \\t<value>"`` line per parameter, sorted by key.
    """
    parts = ["Training Parameters:"]
    parts.extend('\n\t{} : \t{}'.format(key, training_params[key])
                 for key in sorted(training_params))
    return "".join(parts)
###############################################################################
# The Neural Network
###############################################################################
class NeuralNet():
    """A stack of layers held as two parallel lists: training and test.

    Every layer in the specification is instantiated once for training
    (``tr_layers``, fed from the symbolic input ``x``) and its
    ``TestVersion`` is stored at the same index in ``te_layers`` (fed from
    ``test_x``). The ``get_*_model`` methods compile Theano functions over
    these graphs.
    """

    def __init__(self, layers, training_params, allwts=None):
        """Build the training and test version of every layer.

        Args:
            layers: sequence of ``(layer_type_name, layer_args)`` pairs.
                Each name is resolved with ``getattr`` on the ``layer``
                module; the first must resolve to ``InputLayer``,
                ``ElasticLayer`` or ``ColorLayer``.
            training_params: dict of training settings. Read here:
                ``'BATCH_SZ'``, ``'INIT_LEARNING_RATE'``,
                ``'EPOCHS_TO_HALF_RATE'``, ``'SEED'`` (only when ``allwts``
                is None) and ``'CUR_EPOCH'``. NOTE: the dict is kept by
                reference and ``'CUR_EPOCH'`` is set to 0 in place when
                it is missing.
            allwts: optional weights of a previously trained network,
                indexed by layer position. When given, no random number
                generator is created.

        Raises:
            AssertionError: if the first layer has the wrong type, or more
                than one layer takes an auxiliary input.
        """
        # Either we have a random seed or the WTS for each layer from a
        # previously trained NeuralNet
        if allwts is None:
            self.rand_gen = np.random.RandomState(training_params['SEED'])
        else:
            self.rand_gen = None

        self.tr_prms = training_params
        self.layers = layers
        self.allwts = allwts
        self.tr_layers = []
        self.te_layers = []
        self.batch_sz = training_params['BATCH_SZ']
        self.num_layers = 0

        # Symbolic variables for the data
        self.x = tt.tensor4('x')
        self.test_x = tt.tensor4('test_x')
        self.y = tt.ivector('y')

        # Input Layer
        input_layer_type = getattr(layer, layers[0][0])
        assert input_layer_type in (InputLayer, ElasticLayer, ColorLayer), \
            "First layer needs to be Input or Elastic or Color Layer"
        self.tr_layers.append(input_layer_type(self.x, rand_gen=self.rand_gen,
                                               **layers[0][1]))
        self.te_layers.append(self.tr_layers[0].TestVersion(self.test_x))
        self.num_layers += 1

        # Rest of the layers
        while self.num_layers < len(layers):
            self.append_next_layer()

        # Handle Auxiliary input
        for tr_layer, te_layer in zip(self.tr_layers, self.te_layers):
            if type(tr_layer) in (AuxConcatLayer, SoftAuxLayer):
                assert not hasattr(self, 'aux_inpt_tr'), "Multiple Aux Inputs"
                self.aux_inpt_tr = tr_layer.aux_inpt
                self.aux_inpt_te = te_layer.aux_inpt

        # Set Epoch and learning rate
        if 'CUR_EPOCH' not in training_params:
            training_params['CUR_EPOCH'] = 0

        # np.asarray(..., dtype=...) is the replacement for np.cast[...],
        # which was removed in NumPy 2.0; both yield a 0-d floatX array.
        self.cur_learn_rate = theano.shared(
            np.asarray(0.0, dtype=theano.config.floatX))
        self.set_rate()

    def append_next_layer(self):
        """Instantiate layer number ``self.num_layers`` and append it.

        The training layer is built on the previous training layer's
        output, its ``TestVersion`` on the previous test layer's output,
        and ``self.num_layers`` is incremented. Weights come from
        ``self.allwts`` at the same index when ``self.allwts`` is truthy.

        Raises:
            NotImplementedError: if the layer type is not handled here.
        """
        layer_type, layer_args = self.layers[self.num_layers]
        prev_tr_layer = self.tr_layers[self.num_layers - 1]
        prev_te_layer = self.te_layers[self.num_layers - 1]
        wts = self.allwts[self.num_layers] if self.allwts else None
        tr_inpt = prev_tr_layer.output
        te_inpt = prev_te_layer.output

        curr_layer_type = getattr(layer, layer_type)
        print(layer_type)

        if curr_layer_type in (ElasticLayer, ColorLayer,
                               ConvLayer, PoolLayer, MeanLayer):
            # Map count and size are read from the layer before a dropout
            # layer when the immediate predecessor is a DropOutLayer.
            if type(prev_tr_layer) is DropOutLayer:
                use_tr_layer = self.tr_layers[self.num_layers - 2]
            else:
                use_tr_layer = prev_tr_layer

            num_prev_maps = use_tr_layer.num_maps
            prev_out_sz = use_tr_layer.out_sz

        if curr_layer_type in (ElasticLayer, ColorLayer):
            # These two are always taken from the previous layer, so any
            # values in the spec are dropped. NOTE: this edits the spec
            # dict in place, i.e. the entry stored in self.layers.
            layer_args.pop("num_maps", None)
            layer_args.pop("img_sz", None)

            curr_layer = curr_layer_type(tr_inpt,
                                         num_maps=num_prev_maps,
                                         img_sz=prev_out_sz,
                                         rand_gen=self.rand_gen,
                                         **layer_args)

        elif curr_layer_type is ConvLayer:
            curr_layer = ConvLayer(tr_inpt,
                                   wts,
                                   self.rand_gen,
                                   self.batch_sz,
                                   num_prev_maps,
                                   prev_out_sz,
                                   **layer_args)

        elif curr_layer_type in (PoolLayer, MeanLayer):
            curr_layer = curr_layer_type(tr_inpt,
                                         num_maps=num_prev_maps,
                                         in_sz=prev_out_sz,
                                         **layer_args)

        elif curr_layer_type is DropOutLayer:
            curr_layer = DropOutLayer(tr_inpt,
                                      self.rand_gen,
                                      prev_tr_layer.n_out,
                                      **layer_args)

        elif curr_layer_type in (AuxConcatLayer, HiddenLayer,
                                 SoftmaxLayer, SoftAuxLayer, HingeLayer):
            # These layers receive their input flattened to two dimensions.
            te_inpt = te_inpt.flatten(2)
            curr_layer = curr_layer_type(tr_inpt.flatten(2),
                                         wts,
                                         self.rand_gen,
                                         prev_tr_layer.n_out,
                                         **layer_args)

        elif curr_layer_type is CenteredOutLayer:
            # CenteredOutLayer
            #   Can be LOGIT or RBF
            #   Needs Wts for hidden layer
            #     (can be generated randomly as nPrevLayerUnits x nFeatures)
            #   Needs CENTERS as a matrix or as a tuple
            #     (nClasses x nFeatures) to convert from Features to
            #     Class probs.
            if wts:
                # Stored weights carry the centers at index 3; only the
                # first two entries are passed on as the layer's weights.
                centers = wts[3]
                wts = wts[:2]
            else:
                centers = None

            te_inpt = te_inpt.flatten(2)
            curr_layer = CenteredOutLayer(tr_inpt.flatten(2),
                                          wts, centers,
                                          self.rand_gen,
                                          prev_tr_layer.n_out,
                                          **layer_args)

        else:
            raise NotImplementedError("Unknown Layer Type" + layer_type)

        self.tr_layers.append(curr_layer)
        self.te_layers.append(curr_layer.TestVersion(te_inpt))
        self.num_layers += 1

    def get_trin_model(self, x_data, y_data, aux_data=None):
        """Compile the Theano function that runs one training batch.

        Args:
            x_data: inputs; sliced symbolically by batch index
                (presumably a Theano shared variable - confirm at caller).
            y_data: targets, sliced the same way.
            aux_data: auxiliary inputs; required when the network has an
                auxiliary-input layer.

        Returns:
            A ``theano.function`` taking a batch index and returning the
            cost plus the last training layer's ``features`` and
            ``logprob``, applying every layer's updates as a side effect.

        Raises:
            AssertionError: if auxiliary data is needed but not supplied.
        """
        # cost, training params, gradients, updates to those params
        print('Compiling training function...')
        cost = self.tr_layers[-1].cost(self.y)
        for lyr in self.tr_layers:
            cost += lyr.get_wtcost()

        updates = []
        for lyr in self.tr_layers:
            updates += lyr.get_updates(cost, self.cur_learn_rate)

        indx = tt.lscalar('training batch index')
        bth_sz = self.tr_prms['BATCH_SZ']
        givens = {
            self.x: x_data[indx * bth_sz:(indx + 1) * bth_sz],
            self.y: y_data[indx * bth_sz:(indx + 1) * bth_sz], }

        if hasattr(self, 'aux_inpt_tr'):
            assert aux_data is not None, "Auxillary data not supplied"
            givens[self.aux_inpt_tr] = aux_data[
                indx * bth_sz:(indx + 1) * bth_sz]

        return theano.function([indx],
                               [cost,
                                self.tr_layers[-1].features,
                                self.tr_layers[-1].logprob],
                               updates=updates,
                               givens=givens)

    def get_test_model(self, x_data, y_data, aux_data=None, preds_feats=False):
        """Compile the Theano function that evaluates one test batch.

        Args:
            x_data: inputs, sliced symbolically by batch index.
            y_data: targets, sliced the same way.
            aux_data: auxiliary inputs; required when the network has an
                auxiliary-input layer.
            preds_feats: when true, the last test layer's
                ``features_and_predictions()`` are appended to the outputs.

        Returns:
            A ``theano.function`` taking a batch index and returning the
            last test layer's ``sym_and_oth_err_rate`` outputs (plus
            features and predictions when requested).

        Raises:
            AssertionError: if auxiliary data is needed but not supplied.
        """
        print('Compiling testing function... ')
        idx = tt.lscalar('test batch index')
        bth_sz = self.tr_prms['BATCH_SZ']
        givens = {
            self.test_x: x_data[idx * bth_sz:(idx + 1) * bth_sz],
            self.y: y_data[idx * bth_sz:(idx + 1) * bth_sz]}

        if hasattr(self, 'aux_inpt_te'):
            assert aux_data is not None, "Auxillary data not supplied"
            givens[self.aux_inpt_te] = \
                aux_data[idx * bth_sz:(idx + 1) * bth_sz]

        outputs = self.te_layers[-1].sym_and_oth_err_rate(self.y)
        if preds_feats:
            outputs += self.te_layers[-1].features_and_predictions()

        return theano.function([idx],
                               outputs,
                               givens=givens)

    def takes_aux(self):
        """Return True if the network has an auxiliary test input."""
        return hasattr(self, 'aux_inpt_te')

    def get_data_test_model(self, go_nuts=False):
        """Compile a test function that takes the data as direct arguments.

        Args:
            go_nuts: when true, the output of every test layer is appended
                to the returned values.

        Returns:
            A ``theano.function`` whose inputs are ``test_x`` (and the
            auxiliary test input when the network takes one) and whose
            outputs are the last test layer's
            ``features_and_predictions()``, optionally followed by every
            test layer's output.
        """
        print('Compiling full test function...')
        inputs = [self.test_x]
        if self.takes_aux():
            inputs += [self.aux_inpt_te]

        outputs = list(self.te_layers[-1].features_and_predictions())
        if go_nuts:
            print("going nuts...")
            for lyr in self.te_layers:
                outputs.append(lyr.output)

        return theano.function(inputs, outputs)

    def get_init_params(self):
        """Return the keyword arguments needed to rebuild this network.

        Returns:
            dict with keys ``"layers"``, ``"training_params"`` and
            ``"allwts"`` (the current ``get_wts()`` of each training
            layer), matching the parameters of ``__init__``.
        """
        return {"layers": self.layers,
                "training_params": self.tr_prms,
                "allwts": [l.get_wts() for l in self.tr_layers]}

    def set_rate(self):
        """Set the shared learning rate from the current epoch.

        The rate is ``INIT_LEARNING_RATE / (1 + CUR_EPOCH /
        EPOCHS_TO_HALF_RATE)``, all read from the training parameters.
        """
        self.cur_learn_rate.set_value(
            self.tr_prms['INIT_LEARNING_RATE'] /
            (1 + self.tr_prms['CUR_EPOCH'] /
             self.tr_prms['EPOCHS_TO_HALF_RATE']))

    def inc_epoch_set_rate(self):
        """Advance the epoch counter by one and refresh the learning rate."""
        self.tr_prms['CUR_EPOCH'] += 1
        self.set_rate()

    def get_epoch(self):
        """Return the current epoch stored in the training parameters."""
        return self.tr_prms['CUR_EPOCH']

    def __str__(self):
        """List the training layers, the test layers and all parameters."""
        prmstr = '; '.join([', '.join([str(prm) for prm in lyr.params])
                            for lyr in self.tr_layers])
        return \
            '\nTrain Layers\n\t' + \
            '\n\t'.join([str(l) for l in self.tr_layers]) + \
            '\nTest Layers\n\t' + \
            '\n\t'.join([str(l) for l in self.te_layers]) + \
            '\nParams ' + prmstr

    def get_layers_info(self):
        """Return the module-level ``get_layers_info`` summary of the spec."""
        return get_layers_info(self.layers)

    def get_wts_info(self, detailed=False):
        """Return the module-level ``get_wts_info`` summary of all weights."""
        return get_wts_info((l.get_wts() for l in self.tr_layers), detailed)

    def get_training_params_info(self):
        """Return the module-level summary of the training parameters."""
        return get_training_params_info(self.tr_prms)