from collections import defaultdict, Counter
import matplotlib.pyplot as plt
from keras.models import Model
import numpy as np
import copy
# from . import error_handler, standards
# from . import masks as msk
# from . import perturbation as ptn
# For Sphinx docs only:
import error_handler, standards
import masks as msk
import perturbation as ptn
[docs]class pudu:
[docs] def __init__(self, x, y, pf, model=None):
"""
`pudu` constructor.
:type x: list
:param x: Features (input) to be analyzed. Must have same format as
train and test descriptors with dientionality as
(batch, rows, columns, depth).
:type y: int, float
:param y: Targets (output) for `x`. It is a sclaar and not categorical
for easier inclusion of regression problems.
:type pf: function
:param pf: probability, or prediction, function of the algorithm. The input
must be `x` and the ouput a list of probabilities for each class (in
case of classification algorithm). If the default function does not
work this way (i.e.: needs a batch as input), it must be wrapped to
do so. Please refer to the documentation's examples to see specific
cases.
:type model: Keras model
:param model: Optional Keras model, only for `layer_activations` and
`unit_activation`.
"""
# Store the main parameters
self.x = x
self.y = y
self.pf = pf
# Optional parameters
self.model = model
# Main results
self.imp = None
self.spe = None
self.syn = None
# Activation results
self.lac = None # layer activation
self.uac = None # unit activations
self.fac = None # feature activations
self.icc = None
# Normalized results are calculated automatically so if the user needs them
self.imp_rel = None
self.spe_rel = None
self.syn_rel = None
self.lac_rel = None
self.uac_rel = None
# Some error handling
error_handler.for_constructor(model, x, y)
[docs] def importance(self, window=1, scope=None, evolution=None, padding='center', bias=0,
absolute=False, perturbation=ptn.Bidirectional(), mask=msk.All()):
"""
Calculates the importance vector for the input feature.
:type window: int
:param window: feature width to be changeg each time.
:type scope: tupple(int, int)
:param scope: Starting and ending point of the analysis for each feature.
If `None`, the all the vector is analysed.
:type evolution: int
:param evolution: feature width to be changeg each time.
:type padding: string
:param padding: Type of padding. If the legnth of `x` is not divisible
by `window` then padding is applyed. If `center`, then equal padding
to each side is applyed. If `right`, then paading to the right is
added and `window`starts from `0`. If `left`, padding to the left
is applyied and `window` ends at length `x`. If perfet `center` is
not possible, then ipadding left is added `1`.
:type absolute: bool
:param absolute: Weather or not the result is in absolute value or not. Default is `False`.
"""
error_handler.for_params(window, scope, padding, absolute, 0, None, None)
# Initial values
sh = np.array(self.x).shape
self.imp = np.zeros((sh[0], sh[1], sh[2], sh[3]))
scope, window, padd, evolution, total = standards.params_std(self.y, sh, scope, window, padding, evolution)
p0 = self.pf(self.x)
section = 1
row = padd[0][0] + scope[0][0]
while row <= scope[0][1] - padd[0][1] - window[0]:
col = padd[1][0] + scope[1][0]
while col <= scope[1][1] - padd[1][1] - window[1]:
mask_val = mask.apply(section, total)
if mask_val == 1:
x_copy = copy.deepcopy(self.x)
row_idx, col_idx = np.meshgrid(range(window[0]), range(window[1]), indexing='ij')
row_idx, col_idx = row_idx + row, col_idx + col
temp, temp2 = perturbation.apply(x_copy, row, col, window, bias)
if temp2 is None:
val = self.pf(temp) - p0
else:
val = (self.pf(temp2) + self.pf(temp) - 2*p0) / 2
if absolute:
val = abs(val)
if np.shape(val):
self.imp[0, row:row+window[0], col:col+window[1], 0] = val[evolution]
else:
self.imp[0, row:row+window[0], col:col+window[1], 0] = val
del x_copy, temp, temp2
else:
pass
section += 1
col += window[1]
row += window[0]
max_val, min_val = self.imp.max(), self.imp.min()
self.imp_rel = (self.imp - min_val) / (max_val - min_val)
[docs] def speed(self, window=1, scope=None, evolution=None, padding='center',
bias=0, absolute=False, mask=msk.All(),
perturbation=[ptn.Bidirectional(delta=.1), ptn.Bidirectional(delta=.2), ptn.Bidirectional(delta=.3)]):
"""
Calculates the gradient of the importance. In other words, the slope
of the curve formed by the importance at different values. This
indicates how fast a feature can change the result.
:type window: int
:param window: feature width to be changeg each time.
:type scope: tupple(int, int)
:param scope: Starting and ending point of the analysis for each feature.
If `None`, the all the vector is analysed.
:type evolution: int
:param evolution: feature width to be changeg each time.
:type padding: string
:param padding: Type of padding. If the legnth of `x` is not divisible
by `window` then padding is applyed. If `center`, then equal padding
to each side is applyed. If `right`, then paading to the right is
added and `window`starts from `0`. If `left`, padding to the left
is applyied and `window` ends at length `x`. If perfet `center` is
not possible, then ipadding left is added `1`.
:type absolute: bool
:param absolute: Weather or not the result is in absolute value or not. Default is `False`.
"""
error_handler.for_params(window, scope, padding, absolute, 0, None, None)
# Initial values
sh = np.array(self.x).shape
self.spe = np.zeros((sh[0], sh[1], sh[2], sh[3]))
scope, window, padd, evolution, total = standards.params_std(self.y, sh, scope, window, padding, evolution)
p0 = self.pf(self.x)
section = 1
row = padd[0][0] + scope[0][0]
while row <= scope[0][1] - padd[0][1] - window[0]:
col = padd[1][0] + scope[1][0]
while col <= scope[1][1] - padd[1][1] - window[1]:
x_copy = copy.deepcopy(self.x)
mask_val = mask.apply(section, total)
if mask_val == 1:
p = []
row_idx, col_idx = np.meshgrid(range(window[0]), range(window[1]), indexing='ij')
row_idx, col_idx = row_idx + row, col_idx + col
for j in perturbation:
temp = self.x.copy()
temp, temp2 = j.apply(x_copy, row, col, window, bias)
if temp2 is False:
val = self.pf(temp) - p0
else:
val = (self.pf(temp2) + self.pf(temp)) / 2
if absolute:
val = abs(val)
if np.shape(val):
val = val[evolution]
p.append(val)
var_x = [i for i in range(len(p))]
var_y = [i for i in p]
self.spe[0, row:row+window[0], col:col+window[1], 0] = np.polyfit(var_x, var_y, 1)[0]
else:
pass
section += 1
col += window[1]
row += window[0]
max_val, min_val = self.spe.max(), self.spe.min()
self.spe_rel = (self.spe - min_val) / (max_val - min_val)
[docs] def synergy(self, window=1, inspect=0, scope=None, absolute=False, bias=0,
evolution=None, padding='center', perturbation=ptn.Bidirectional(), mask=msk.All()):
"""
Calculates the synergy between features.
:type delta: float
:param delta: maximum variation to apply to each feature.
:type window: int
:param window: feature width to be changeg each time.
:type scope: tupple(int, int)
:param scope: Starting and ending point of the analysis for each feature.
If `None`, the all the vector is analysed.
:type evolution: int
:param evolution: feature width to be changeg each time.
:type padding: string
:param padding: Type of padding. If the legnth of `x` is not divisible
by `window` then padding is applyed. If `center`, then equal padding
to each side is applyed. If `right`, then paading to the right is
added and `window`starts from `0`. If `left`, padding to the left
is applyied and `window` ends at length `x`. If perfet `center` is
not possible, then ipadding left is added `1`.
:type absolute: bool
:param absolute: Weather or not the result is in absolute value or not. Default is `False`.
"""
error_handler.for_params(window, scope, padding, absolute, inspect, None, None)
# Initial values
sh = np.array(self.x).shape
self.syn = np.zeros((sh[0], sh[1], sh[2], sh[3]))
scope, window, padd, evolution, total = standards.params_std(self.y, sh, scope, window, padding, evolution)
# Position to range of the desired area to calculate synergy from
if len(np.array(inspect).shape) == 0:
if sh[1] == 1:
inspect = (0, int(window[1]*inspect + padd[1][0] + scope[1][0]))
else:
inspect = (window[0]*inspect + padd[0][0] + scope[0][0],
window[1]*inspect + padd[1][0] + scope[1][0])
x_copy = copy.deepcopy(self.x)
base, base2 = perturbation.apply(x_copy, inspect[0], inspect[1], window, bias)
pb0 = self.pf(base)
pb2 = self.pf(base2)
section = 1
row = padd[0][0] + scope[0][0]
while row <= scope[0][1] - padd[0][1] - window[0]:
col = padd[1][0] + scope[1][0]
while col <= scope[1][1] - padd[1][1] - window[1]:
x_copy = copy.deepcopy(self.x)
mask_val = mask.apply(section, total)
if mask_val == 1:
if inspect[0] == row and inspect[1] == col:
pass
else:
row_idx, col_idx = np.meshgrid(range(window[0]), range(window[1]), indexing='ij')
row_idx, col_idx = row_idx + row, col_idx + col
temp, temp2 = perturbation.apply(x_copy, row, col, window, bias)
if temp2 is False:
val = self.pf(temp) - pb0
else:
val = (self.pf(temp2) + self.pf(temp) - pb0 - pb2) / 2
if absolute:
val = abs(val)
if np.shape(val):
self.syn[0, row:row+window[0], col:col+window[1], 0] = val[evolution]
else:
self.syn[0, row:row+window[0], col:col+window[1], 0] = val
else:
pass
section += 1
col += window[1]
row += window[0]
max_val, min_val = self.syn.max(), self.syn.min()
self.syn_rel = (self.syn - min_val) / (max_val - min_val)
[docs] def reactivations(self, layer=0, slope=0, p=0.005, window=1, scope=None, bias=0,
padding='center', threshold=0, perturbation=ptn.Bidirectional(), mask=msk.All()):
"""
Counts the unit activations in the selected `layer` of a `Keras` model according
to change in the feature.
:type layer: int
:param layer: Position number within the keras model to be analyzed. Use `model.summary()`
to see exactly the position of the desired layer.
:type slope: float
:param slope: Default is 0 (`relu`).
:type window: int
:param window: feature width to be changeg each time.
:type scope: tupple(int, int)
:param scope: Starting and ending point of the analysis for each feature.
If `None`, the all the vector is analysed.
:type padding: string
:param padding: Type of padding. If the legnth of `x` is not divisible
by `window` then padding is applyed. If `center`, then equal padding
to each side is applyed. If `right`, then paading to the right is
added and `window`starts from `0`. If `left`, padding to the left
is applyied and `window` ends at length `x`. If perfet `center` is
not possible, then ipadding left is added `1`.
"""
error_handler.for_params(window, scope, padding, False, 0, layer, p)
# Initial values
sh = np.array(self.x).shape
d_temp = np.zeros((sh[0], sh[1], sh[2], sh[3]))
o_b, o_h, o_w, o_d = sh[0], sh[1], sh[2], sh[3]
scope, window, padd, evolution, total = standards.params_std(self.y, sh, scope, window, padding, None)
# Keras
layer_outputs = [layer.output for layer in self.model.layers]
activation_model = Model(inputs=self.model.input, outputs=layer_outputs)
x = np.squeeze(self.x, axis=0) if self.x.shape[1] == 1 else self.x
activations = activation_model.predict(x, verbose=0)
p0 = activations[layer] # raw activation values for the image
# index_array = [[] for _ in range(len(p0.flatten()))] # to store unit idx and coordinates
act_count = []
act_idx_count = 1
section = 1
row = padd[0][0] + scope[0][0]
while row <= scope[0][1] - padd[0][1] - window[0]:
col = padd[1][0] + scope[1][0]
while col <= scope[1][1] - padd[1][1] - window[1]:
x_copy = copy.deepcopy(self.x)
mask_val = mask.apply(section, total)
if mask_val == 1:
row_idx, col_idx = np.meshgrid(range(window[0]), range(window[1]), indexing='ij')
row_idx, col_idx = row_idx + row, col_idx + col
temp, temp2 = perturbation.apply(x_copy, row, col, window, bias)
if temp2 is None:
temp = np.squeeze(temp, axis=0) if temp.shape[1] == 1 else temp
activations = activation_model.predict(temp, verbose=0)
activations = activations[layer]
activations = activations-p0
else:
temp = np.squeeze(temp, axis=0) if temp.shape[1] == 1 else temp
temp2 = np.squeeze(temp2, axis=0) if temp2.shape[1] == 1 else temp2
activations = activation_model.predict(temp, verbose=0)
p1 = activations[layer]
activations = activation_model.predict(temp2, verbose=0)
p2 = activations[layer]
activations = (p1 + p2 - 2*p0) / 2
act_count.append(activations.flatten())
d_temp[0, row:row+window[0], col:col+window[1], 0] = act_idx_count
act_idx_count += 1
else:
pass
section += 1
col += window[1]
row += window[0]
"""
At this point 'act_count' has dims. (n, k), where n is the number
of times the kernel fits in the input and 'k' is the number of of units.
So now we calculate the quantiles and store the values above the quantile
and their indices (to what feature change they belong, basically).
Then we apply leakyrelu (relu as default)
Then we count the number of activations
"""
idx_w_vals = []
act_count = np.array(act_count)
act_count = np.transpose(act_count)
act_count = np.where(act_count > threshold, act_count, slope * act_count) # new
act_count = np.array(act_count)
quantiles = [np.quantile(i, p) for i in act_count]
quantiles = np.array(quantiles)
mask = (act_count != 0) & (act_count >= quantiles[:, np.newaxis])
i_vals, j_vals = np.nonzero(mask)
a_vals = act_count[i_vals, j_vals]
idx_w_vals = np.column_stack([i_vals, j_vals, a_vals]).tolist()
# 'acts' is the activation value, but if it is in this
# varibales then it is considered activated anyways
# here we have all the info we need:
units, feats, acts = np.transpose(idx_w_vals)
feats += 1
feats = np.array(feats).astype(int)
counts_feats = np.bincount(feats)
if len(counts_feats) < act_idx_count:
counts_feats = np.pad(counts_feats, (0, act_idx_count - len(counts_feats)), 'constant')
units = np.array(units).astype(int)
counts_units = np.bincount(units)
if len(counts_units) < len(quantiles):
counts_units = np.pad(counts_units, (0, len(quantiles) - len(counts_units)), 'constant')
# contar partes del mapping que no son zero (no padding)
feats = np.array(feats)
unique, counts = np.unique(feats, return_counts=True)
feats_counts = dict(zip(unique, counts))
feats_counts = defaultdict(int, feats_counts)
vfunc = np.vectorize(feats_counts.__getitem__)
mask = d_temp[0, :, :, 0] != 0
d_temp[0, :, :, 0][mask] = vfunc(d_temp[0, :, :, 0][mask])
self.fac = counts_feats
self.lac = d_temp # this shows the activation mapping, or activations per feature
max_val, min_val = d_temp.max(), d_temp.min()
self.lac_rel = (d_temp - min_val) / (1 if (max_val - min_val) == 0 else (max_val - min_val))
self.uac = counts_units # new
max_val, min_val = counts_units.max(), counts_units.min()
self.uac_rel = (counts_units - min_val) / (1 if (max_val - min_val) == 0 else (max_val - min_val))
# un_fe = defaultdict(list)
# for u, f in zip(units, feats):
# un_fe[u].append(f)
# un_fe = [un_fe[i] for i in range(max(units) + 1)]
# result = []
# for i, sublist in enumerate(un_fe):
# if sublist: # if not empty
# counts = Counter(sublist) # reps. of each number
# most_common_num, num_repetitions = counts.most_common(1)[0] # most reps.
# result.append([i, most_common_num, num_repetitions])
return feats, units
[docs] def relatable(self, layer=0, slope=0, p=0.005, window=1, scope=None, bias=0,
padding='center', threshold=0, perturbation=ptn.Bidirectional(), mask=msk.All()):
"""
This function generates an activation report for each set of coordinates in `x` and `y`.
:type layer: int
:param layer: Specifies the layer of the model for which the activation report is to be generated.
Default is 0.
:type slope: int or float
:param slope:
:type p: float
:param p: Specifies the p-value threshold for significance testing of activations. Default is 0.005.
:type window: int
:param window: Specifies the size of the window for the activation function. Default is 1.
:type scope: str
:param scope: Specifies the scope of the activations. Possible values are 'global' and 'local'.
Default is None.
:type padding: str
:param padding: Specifies the padding strategy for the activations. Default is 'center'.
:type kwargs: dict
:param kwargs: Additional keyword arguments passed to the activation function.
"""
# hay que cambiar self.x e .y para iterar. Se guarda y luego
# se cambia por cada integrante. Al final se vuelve a la forma
# original si se desea usar denuevo
s_x, s_y = self.x, self.y
master = []
for x, y in zip(s_x, s_y):
x = np.expand_dims(x, 0)
self.x, self.y = x, y
feats, units = self.reactivations(layer, slope, p, window, scope, bias, padding,
threshold, perturbation, mask)
master.extend((i, j) for i, j in zip(feats, units))
counts = Counter(master)
result = [[j, count, i] for (i, j), count in counts.items()]
# unit 'j' activates 'count' times with feature 'i'
result = np.transpose(result)
self.x, self.y = s_x, s_y
self.icc = result
def preview(self, window=1, scope=None, padding='center', axis=None, show_data=True,
title='Preview', xlabel='Feature', ylabel='Intensity', xticks=None,
yticks=[], cmap='Greens', font_size=15, figsize=(14, 4), bold=False,
mask = msk.All()):
"""
Plots an approximate preview of the sections, areas, or mask to be analyzed over the data
before executing. It is particularly useful to check if the parameters are
correct, especially if the user expects long runtimes.
:type feature: list
:param feature: feature analyzed or any that the user whant to plot against.
Normally you want it to be `self.x`.
:type window: int
:param window: feature width to be analyzed.
:type scope: tuple
:param scope: feature width to be analyzed.
:type padding: str
:param padding: Type of padding. If the legnth of `x` is not divisible
by `window` then padding is applyed. If `center`, then equal padding
to each side is applyed. If `right`, then paading to the right is
added and `window`starts from `0`. If `left`, padding to the left
is applyied and `window` ends at length `x`. If perfet `center` is
not possible, then ipadding left is added `1`.
:type mask: list
:param mask: Mask to be applied to the data. If `None`, no mask will be
applied.
:type axis: list
:param axis: X-axis for the plot. If `None`, it will show the pixel count.
:type show_data: bool
:param show_data: If `True`, it will plot the data. If `False`, it will
plot the mask.
:type title: str
:param title: Title of the plot.
:type xlabel: str
:param xlabel: X-axis label.
:type ylabel: str
:param ylabel: Y-axis label.
:type xticks: list
:param xticks: X-axis ticks.
:type yticks: list
:param yticks: Y-axis ticks.
:type cmap: str
:param cmap: Colormap to be used.
:type font_size: int
:param font_size: Font size for the plot.
:type figsize: tuple
:param figsize: Figure size for the plot.
:type bold: Boolean
:param bold: To make the limit lines bolder. Default is 'False'.
"""
error_handler.for_params(window, scope, padding, False, 0, None, None)
# Initial values
image = [] # this will be the preview image
sh = np.array(self.x).shape
d_temp = np.zeros((sh[0], sh[1], sh[2], sh[3]))
countour = np.zeros((sh[0], sh[1], sh[2], sh[3]))
scope, window, padd, evolution, total = standards.params_std(self.y, sh, scope, window, padding, None)
section = 1
row = padd[0][0] + scope[0][0]
while row <= scope[0][1] - padd[0][1] - window[0]:
col = padd[1][0] + scope[1][0]
while col <= scope[1][1] - padd[1][1] - window[1]:
mask_val = mask.apply(section, total)
if mask_val == 1:
countour[0, row, col, 0] = 1
d_temp[0, row:row+window[0], col:col+window[1], 0] = 1
if sh[1] > 1:
for i in range(sh[1]):
countour[0, i, col, 0] = 1
for i in range(sh[2]):
countour[0, row, i, 0] = 1
else:
pass
section += 1
col += window[1]
row += window[0]
dims = np.array(d_temp).shape
image = d_temp[0,:,:,0]
countour = countour[0,:,:,0]
feature = self.x
feature = np.array(feature)[0,:,:,0]
if dims[1] > 1:
rows, cols = dims[1], dims[2]
ext = [0, cols, 0, rows]
else:
rows, cols = 1, len(feature)
if axis is None:
axis = [i for i in range(len(feature[0]))]
ext = [0, len(feature[0]), min(feature[0]), max(feature[0])]
else:
ext = [min(axis), max(axis), min(feature[0]), max(feature[0])]
plt.rc('font', size=font_size)
plt.figure(figsize=figsize)
if dims[1] > 1 and show_data:
plt.imshow(feature, cmap='binary', aspect="auto",
interpolation='nearest', extent=ext, alpha=1)
elif dims[1] == 1 and show_data:
plt.plot(axis, feature[0], 'k')
# just to make bolder lines
if bold == True:
for i in range(len(countour)):
for j in range(len(countour[i])):
try:
if countour[i][j] == 1 and countour[i][j+1] == 0 and countour[i][j-1] == 0:
countour[i][j-1] = countour[i][j+1] = 1
if dims[1] > 1 and countour[i][j] == 1 and countour[i+1][j] == 0 and countour[i-1][j] == 0:
countour[i-1][j] = countour[i+1][j] = 1
except IndexError:
print('Index out of bounds. Proceed normally.')
plt.imshow(countour, cmap='binary', aspect="auto",
interpolation='nearest', extent=ext, alpha=1)
plt.imshow(image, cmap=cmap, aspect="auto",
interpolation='nearest', extent=ext, alpha=0.5)
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.yticks(yticks)
if xticks:
plt.xticks(axis, xticks, rotation='vertical')
plt.show()