Code source de mlstatpy.ml._neural_tree_node

import numpy
import numpy.random as rnd
from scipy.special import expit, softmax, kl_div as kl_fct
from ._neural_tree_api import _TrainingAPI


[docs] class NeuralTreeNode(_TrainingAPI): """ One node in a neural network. :param weights: weights :param bias: bias, if None, draws a random number :param activation: activation function :param nodeid: node id :param tag: unused but to add information on how this node was created """ @staticmethod def _relu(x): "Relu function." return numpy.maximum(x, 0) @staticmethod def _leakyrelu(x): "Leaky Relu function." return numpy.maximum(x, 0) + numpy.minimum(x, 0) * 0.01 @staticmethod def _drelu(x): "Derivative of the Relu function." res = numpy.ones(x.shape, dtype=x.dtype) res[x < 0] = 0.0 return res @staticmethod def _dleakyrelu(x): "Derivative of the Leaky Relu function." res = numpy.ones(x.shape, dtype=x.dtype) res[x < 0] = 0.01 return res @staticmethod def _dsigmoid(x): "Derivativ of the sigmoid function." y = expit(x) return y * (1 - y) @staticmethod def _softmax(x): "Derivative of the softmax function." if len(x.shape) == 2: return softmax(x, axis=1) return softmax(x) @staticmethod def _dsoftmax(x): "Derivative of the softmax function." soft = softmax(x) grad = -soft @ soft.T diag = numpy.diag(soft) return diag + grad
[docs] @staticmethod def get_activation_function(activation): """ Returns the activation function. It returns a function *y=f(x)*. """ if activation == "softmax": return NeuralTreeNode._softmax if activation == "softmax4": return lambda x: NeuralTreeNode._softmax(x * 4) if activation in {"logistic", "expit", "sigmoid"}: return expit if activation == "sigmoid4": return lambda x: expit(x * 4) if activation == "relu": return NeuralTreeNode._relu if activation == "leakyrelu": return NeuralTreeNode._leakyrelu if activation == "identity": return lambda x: x raise ValueError(f"Unknown activation function '{activation}'.")
[docs] @staticmethod def get_activation_gradient_function(activation): """ Returns the activation function. It returns a function *y=f'(x)*. About the sigmoid: .. math:: \\begin{array}{rcl} f(x) &=& \\frac{1}{1 + e^{-x}} \\\\ f'(x) &=& \\frac{e^{-x}}{(1 + e^{-x})^2} = f(x)(1-f(x)) \\end{array} """ if activation == "softmax": return NeuralTreeNode._dsoftmax if activation == "softmax4": return lambda x: NeuralTreeNode._dsoftmax(x) * 4 if activation in {"logistic", "expit", "sigmoid"}: return NeuralTreeNode._dsigmoid if activation == "sigmoid4": return lambda x: NeuralTreeNode._dsigmoid(x) * 4 if activation == "relu": return NeuralTreeNode._drelu if activation == "leakyrelu": return NeuralTreeNode._dleakyrelu if activation == "identity": return lambda x: numpy.ones(x.shape, dtype=x.dtype) raise ValueError(f"Unknown activation gradient function '{activation}'.")
[docs] @staticmethod def get_activation_loss_function(activation): """ Returns a default loss function based on the activation function. It returns two functions *g=loss(x,y)*. """ if activation in {"logistic", "expit", "sigmoid", "sigmoid4"}: # regression + regularization return lambda x, y: (x - y) ** 2 if activation in {"softmax", "softmax4"}: cst = numpy.finfo(numpy.float32).eps # classification def kl_fct2(x, y): return kl_fct(x + cst, y + cst) return kl_fct2 if activation in {"identity", "relu", "leakyrelu"}: # regression return lambda x, y: (x - y) ** 2 raise ValueError(f"Unknown activation function '{activation}'.")
[docs] @staticmethod def get_activation_dloss_function(activation): """ Returns the derivative of the default loss function based on the activation function. It returns a function *df(x,y)/dw, df(w)/dw* where *w* are the weights. """ if activation in {"logistic", "expit", "sigmoid", "sigmoid4"}: # regression + regularization def dregrdx(x, y): return (x - y) * 2 return dregrdx if activation in {"softmax", "softmax4"}: # classification cst = numpy.finfo(numpy.float32).eps def dclsdx(x, y): return numpy.log(x + cst) - numpy.log(y + cst) return dclsdx if activation in {"identity", "relu", "leakyrelu"}: # regression def dregdx(x, y): return (x - y) * 2 return dregdx raise ValueError(f"Unknown activation function '{activation}'.")
def __init__(self, weights, bias=None, activation="sigmoid", nodeid=-1, tag=None): self.tag = tag if isinstance(weights, int): if activation.startswith("softmax"): weights = rnd.randn(2, weights) else: weights = rnd.randn(weights) if isinstance(weights, list): weights = numpy.array(weights) if len(weights.shape) == 1: self.n_outputs = 1 if bias is None: bias = rnd.randn() self.coef = numpy.empty(len(weights) + 1) self.coef[1:] = weights self.coef[0] = bias elif len(weights.shape) == 2: self.n_outputs = weights.shape[0] if bias is None: bias = rnd.randn(self.n_outputs) shape = list(weights.shape) shape[1] += 1 self.coef = numpy.empty(shape) self.coef[:, 1:] = weights self.coef[:, 0] = bias else: raise RuntimeError(f"Unexpected weights shape: {weights.shape}") self.activation = activation self.nodeid = nodeid self._set_fcts() def _set_fcts(self): self.activation_ = NeuralTreeNode.get_activation_function(self.activation) self.gradient_ = NeuralTreeNode.get_activation_gradient_function( self.activation ) self.losss_ = NeuralTreeNode.get_activation_loss_function(self.activation) self.dlossds_ = NeuralTreeNode.get_activation_dloss_function(self.activation) @property def input_weights(self): "Returns the weights." if self.n_outputs == 1: return self.coef[1:] return self.coef[:, 1:] @property def bias(self): "Returns the weights." if self.n_outputs == 1: return self.coef[0] return self.coef[:, 0] def __getstate__(self): "usual" return { "coef": self.coef, "activation": self.activation, "nodeid": self.nodeid, "n_outputs": self.n_outputs, "tag": self.tag, } def __setstate__(self, state): "usual" self.coef = state["coef"] self.activation = state["activation"] self.nodeid = state["nodeid"] self.n_outputs = state["n_outputs"] self.tag = state["tag"] self._set_fcts() def __eq__(self, obj): if self.coef.shape != obj.coef.shape: return False if any(xy[0] != xy[1] for xy in zip(self.coef.ravel(), obj.coef.ravel())): return False if self.activation != obj.activation: return False return True def __repr__(self): "usual" if len(self.coef.shape) == 1: return "%s(weights=%r, bias=%r, activation=%r)" % ( self.__class__.__name__, self.coef[1:], self.coef[0], self.activation, ) return "%s(weights=%r, bias=%r, activation=%r)" % ( self.__class__.__name__, self.coef[:, 1:], self.coef[:, 0], self.activation, ) def _predict(self, X): "Computes inputs of the activation function." if self.n_outputs == 1: return X @ self.coef[1:] + self.coef[0] if len(X.shape) == 2: return X @ self.coef[:, 1:].T + self.coef[:, 0] res = X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0] return res.ravel()
[docs] def predict(self, X): "Computes neuron outputs." y = self._predict(X) return self.activation_(y)
@property def ndim(self): "Returns the input dimension." if len(self.coef.shape) == 1: return self.coef.shape[0] - 1 return self.coef.shape[1] - 1 @property def ndim_out(self): "Returns the output dimension." if len(self.coef.shape) == 1: return 1 return self.coef.shape[0] @property def training_weights(self): "Returns the weights stored in the neuron." return self.coef.ravel()
[docs] def update_training_weights(self, X, add=True): """ Updates weights. :param X: training datasets :param add: addition or replace """ if add: self.coef += X.reshape(self.coef.shape) else: numpy.copyto(self.coef, X.reshape(self.coef.shape))
[docs] def fill_cache(self, X): """ Creates a cache with intermediate results. ``lX`` is the results before the activation function, ``aX`` is the results after the activation function, the prediction. """ cache = dict(lX=self._predict(X)) cache["aX"] = self.activation_(cache["lX"]) return cache
def _common_loss_dloss(self, X, y, cache=None): """ Common beginning to methods *loss*, *dlossds*, *dlossdw*. """ if cache is not None and "aX" in cache: act = cache["aX"] else: act = self.predict(X) return act
[docs] def loss(self, X, y, cache=None): """ Computes the loss. Returns a float. """ act = self._common_loss_dloss(X, y, cache=cache) if len(X.shape) == 1: return self.losss_(act, y) return self.losss_(act, y)
[docs] def dlossds(self, X, y, cache=None): """ Computes the loss derivative due to prediction error. """ act = self._common_loss_dloss(X, y, cache=cache) return self.dlossds_(act, y)
[docs] def gradient_backward(self, graddx, X, inputs=False, cache=None): """ Computes the gradients at point *X*. :param graddx: existing gradient against the inputs :param X: computes the gradient in X :param inputs: if False, derivative against the coefficients, otherwise against the inputs. :param cache: cache intermediate results :return: gradient """ if cache is None: cache = self.fill_cache(X) pred = cache["aX"] ga = self.gradient_(pred) if len(ga.shape) == 2: f = graddx @ ga else: f = graddx * ga if inputs: if len(self.coef.shape) == 1: rgrad = numpy.empty(X.shape) rgrad[:] = self.coef[1:] rgrad *= f else: rgrad = numpy.sum(self.coef[:, 1:] * f.reshape((-1, 1)), axis=0) return rgrad rgrad = numpy.empty(self.coef.shape) if len(self.coef.shape) == 1: rgrad[0] = 1 rgrad[1:] = X rgrad *= f else: rgrad[:, 0] = 1 rgrad[:, 1:] = X rgrad *= f.reshape((-1, 1)) return rgrad