Source code for scm.glompo.analysis.hsic

import os
import sys
import tempfile as tf
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import psutil
from tqdm import tqdm

from .kernels import BaseKernel, ConjunctiveGaussianKernel, GaussianKernel

__all__ = ("HSIC", "HSICResult")


class HSICResult:
    """Result object of a single HSIC calculation produced by :meth:`HSIC.compute`.

    Can only be created by :meth:`.HSIC.compute` or :meth:`load`.
    """

    @property
    def hsic(self) -> np.ndarray:
        """:math:`d` vector of raw HSIC calculation results.

        If :attr:`n_bootstraps` is larger than one, this represents the mean over the bootstraps.

        :See Also:
            :attr:`hsic_std`
            :attr:`sensitivities`
        """
        return self._hsic.mean(0)

    @property
    def hsic_std(self) -> Optional[np.ndarray]:
        """:math:`d` vector of the standard deviation of raw HSIC calculation results across all the bootstraps.

        :See Also:
            :attr:`hsic`
            :attr:`sensitivities_std`
        """
        return self._hsic.std(0)

    @property
    def sensitivities(self) -> np.ndarray:
        r""":math:`d` vector of normalised HSIC calculation results.

        These are the main sensitivity results a user should be concerned with.
        If :attr:`n_bootstraps` is larger than one, this represents the mean over the bootstraps.

        Sensitivities are defined such that:

        .. math::

            \sum S_d = 1

        .. math::

            0 \leq S_d \leq 1

        :See Also:
            :attr:`hsic`
            :attr:`sensitivities_std`
        """
        return self._s_mean

    @property
    def sensitivities_std(self) -> np.ndarray:
        """:math:`d` vector of the standard deviation of normalised HSIC calculation results across all the bootstraps.

        :See Also:
            :attr:`hsic_std`
            :attr:`sensitivities`
        """
        return self._s.std(0)

    @property
    def n_bootstraps(self) -> int:
        """Number of times the HSIC calculation was performed with different sub-samples of the data."""
        return self._n_boot

    @property
    def n_samples(self) -> int:
        """Number of items in the sub-samples of the data used in each bootstrap."""
        return self._n_sample

    @property
    def n_factors(self) -> int:
        """Number of factors analyzed."""
        return self._d

    @property
    def sampling_with_replacement(self) -> bool:
        """Whether sampling was done with or without replacement from the available data.

        False, if all available data was used and automatically True if more samples were used than the number
        available in the data set.
        """
        return self._replace

    @property
    def inputs_kernel(self) -> str:
        """Name of the kernel applied to the input-space data."""
        return self._in_kernel

    @property
    def inputs_kernel_parameters(self) -> Dict[str, Any]:
        """Parameters for the input kernel."""
        return self._in_kernel_params

    @property
    def outputs_kernel(self) -> str:
        """Name of the kernel applied to the output-space data."""
        return self._out_kernel

    @property
    def outputs_kernel_parameters(self) -> Dict[str, Any]:
        """Parameters for the output kernel."""
        return self._out_kernel_params

    @property
    def order_factors(self) -> np.ndarray:
        """Returns factor indices in descending order of their influence on the outputs.

        The *positions* in the array are the rankings, the *contents* of the array are the factor indices.
        This is the inverse of :meth:`ranking`.

        :Returns:

            numpy.ndarray
                :math:`d` vector of order factors.

        :See Also:
            :attr:`ranking`
        """
        return self._s_mean.argsort()[-1::-1]

    @property
    def ranking(self) -> np.ndarray:
        """Returns the ranking of each factor being analyzed.

        The *positions* in the array are the factor indices, the *contents* of the array are rankings such that
        1 is the most influential factor and :math:`d` is the least influential.
        This is the inverse of :meth:`order_factors`.

        :Returns:

            numpy.ndarray
                :math:`d` vector of rankings.

        :See Also:
            :attr:`order_factors`
        """
        return self.order_factors.argsort(0) + 1
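    # Illustrative sketch (not part of the original source): how ``order_factors``
    # and ``ranking`` relate for a hypothetical 3-factor result with sensitivities
    # [0.2, 0.5, 0.3]:
    #
    #   result.order_factors  ->  array([1, 2, 0])   # factor 1 is the most influential
    #   result.ranking        ->  array([3, 1, 2])   # factor 0 is ranked 3rd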
    @classmethod
    def load(cls, path: Union[Path, str]) -> Union["HSICResult", "HSICResultWithReweight"]:
        """Load a calculation result from file.

        :Parameters:

            path
                Path to saved result file

        :See Also:
            :meth:`save`
        """
        path = str(path)
        data = np.load(path, allow_pickle=True)

        if "_has_reweight" in data and data["_has_reweight"]:
            obj = object.__new__(HSICResultWithReweight)
        else:
            obj = object.__new__(HSICResult)

        for k, v in data.items():
            setattr(obj, k, v)

        # Remove ndarray nesting for some items
        # todo consider using pickle instead of numpy to persist this object so this unnesting is not needed
        obj._in_kernel = obj._in_kernel.item()
        obj._out_kernel = obj._out_kernel.item()
        if obj._out_kernel == "SigmoidKernel":
            # Hack to automatically rename deprecated sigmoid kernel to new name
            obj._out_kernel = "ConjunctiveGaussianKernel"
        obj._in_kernel_params = obj._in_kernel_params.item()
        obj._out_kernel_params = obj._out_kernel_params.item()
        obj._replace = obj._replace.item()
        if obj.labels.size == 1 and obj.labels.item() is None:
            obj.labels = None
        if hasattr(obj, "_has_reweight"):
            obj._has_reweight = obj._has_reweight.item()
        if hasattr(obj, "_metadata"):
            obj._metadata = obj._metadata.item()

        return obj
    @classmethod
    def _from_compute(
        cls,
        hsic: np.ndarray,
        input_kernel: BaseKernel,
        output_kernel: BaseKernel,
        n_bootstraps: int,
        n_sample: int,
        replace: bool,
        has_reweight: bool,
        targets: Optional[np.ndarray],
        g: Optional[float],
        dg: Optional[np.ndarray],
        weights: Optional[np.ndarray],
        labels: Optional[Sequence[str]],
    ) -> Union["HSICResult", "HSICResultWithReweight"]:
        """Processes new calculation results from :meth:`.HSIC.compute`."""
        if has_reweight:
            obj = object.__new__(HSICResultWithReweight)
        else:
            obj = object.__new__(HSICResult)

        obj._metadata = None
        obj._hsic = hsic
        obj._in_kernel = type(input_kernel).__name__
        obj._out_kernel = type(output_kernel).__name__
        obj._n_sample = n_sample
        obj._n_boot = n_bootstraps
        obj._d = obj._hsic.shape[1]
        obj._in_kernel_params = {k: getattr(input_kernel, k) for k in input_kernel.PARAMETERS}
        obj._out_kernel_params = {k: getattr(output_kernel, k) for k in output_kernel.PARAMETERS}
        obj._replace = replace
        obj._has_reweight = has_reweight
        obj._targets = targets
        obj._g = g
        obj._dg = dg
        obj._weights = weights
        obj.labels = np.array(labels) if labels is not None else None

        s = np.clip(hsic, 0, None)
        s /= s.sum(1, keepdims=True)
        obj._s = s
        obj._s_mean = s.mean(0)

        return obj

    def __init__(self, *args, **kwargs):
        raise RuntimeError("Cannot create HSICResult directly, please see HSIC.compute() or HSICResult.load().")
    def __str__(self, top_n: int = 5, width: int = 50) -> str:
        """Returns a summary of the result and its settings."""
        rec = f"{'HSIC Result':^{width}}\n"
        rec += "=" * width + "\n"

        fmt = f"{{0:<{width // 2}}}{{1:>{width // 2}}}\n"

        if self._metadata:
            rec += fmt.format("Calculation Description:", self._metadata)
            rec += "-" * width + "\n"

        rec += fmt.format("No. Factors", self.n_factors)
        rec += fmt.format("No. Samples", self.n_samples)
        rec += fmt.format("No. Bootstraps", self.n_bootstraps)
        rec += fmt.format("Sample with replacement", str(self.sampling_with_replacement))
        rec += fmt.format("Includes reweight calc.", "Yes" if self._has_reweight else "No")
        if self._has_reweight:
            std = f"\u00b1{self.g_std:.03}" if self.n_bootstraps > 1 else ""
            rec += fmt.format("Sensitivity Imbalance (g)", f"{self.g:.03}" + std)
        rec += "-" * width + "\n"

        rec += fmt.format("Inputs Kernel", self.inputs_kernel)
        for k, v in self.inputs_kernel_parameters.items():
            rec += fmt.format(f" {k}", v)

        rec += fmt.format("Outputs Kernel", self.outputs_kernel)
        for k, v in self.outputs_kernel_parameters.items():
            rec += fmt.format(f" {k}", v)
        rec += "-" * width + "\n"

        order = np.argsort(self.sensitivities)[-1::-1]
        top_n = min(top_n, self.n_factors)
        if top_n == -1:
            rec += "Factor Rankings:\n"
        else:
            rec += f"Top {top_n} Factors:\n"
            order = order[:top_n]

        for i, fact in enumerate(order, 1):
            label = self.labels[fact][:18] if self.labels is not None else f"Parameter_{fact:03}"
            if self.n_bootstraps > 1:
                rec += f" {{0:3d}}. {{1:>{width - 20}}} {{2:>.3f}}\u00b1{{3:>.3f}}\n".format(
                    i, label, self.sensitivities[fact], self.sensitivities_std[fact]
                )
            else:
                rec += f" {{0:3d}}. {{1:>{width - 14}}} {{2:>.3f}}\n".format(i, label, self.sensitivities[fact])

        rec += "=" * width + "\n"

        return rec
    def save(self, path: Union[Path, str] = "hsiccalc.npz"):
        """Saves the result to file.

        Uses the numpy ``'.npz'`` format to save the result attributes (see ``numpy.savez``).

        :Parameters:

            path
                Path to file in which the result will be saved.
        """
        path = str(path)
        np.savez(path, **self.__dict__)
    def plot_sensitivities(
        self, path: Union[None, Path, str] = "hsicresult.png", plot_top_n: Optional[int] = None
    ) -> plt.Figure:
        """Create a detailed graphic of the :attr:`sensitivities` results.

        :Parameters:

            path
                Optional file location in which to save the image.
                If not provided the image is not saved and only returned.

            plot_top_n
                The number of factors to include in the plot.
                Only the ``plot_top_n`` most influential factors are included.

        :Returns:

            matplotlib.figure.Figure
                Figure instance allowing the user to further tweak and change the plot as desired.
        """
        path = Path(path) if path is not None else None
        d = min(plot_top_n, self._d) if plot_top_n else self.sensitivities.size

        fig, ax = plt.subplots()
        fig: plt.Figure
        ax: plt.Axes
        fig.set_size_inches(5, np.clip(15 / 20 * d, 2, 15))

        order = np.argsort(self.sensitivities)[-1::-1][:d]
        labs = self.labels[order] if self.labels is not None else np.arange(self._d)[order]
        y = np.arange(labs.size)

        for i in range(self.n_bootstraps):
            ax.scatter(self._s[i, order], y, marker=".", color="k")
        ax.scatter(self.sensitivities[order], y, marker="|", color="r")

        ax.set_yticks(range(labs.size))
        ax.set_yticklabels(labs)
        ax.margins(y=1 / d)
        ax.set_xlabel(r"$S_d$")
        ax.set_xlim(0, 1)

        fig.tight_layout()
        if path:
            fig.savefig(path)

        return fig
    def plot_grouped_sensitivities(
        self,
        path: Union[None, Path, str] = "trend.png",
        squash_threshold: float = 0.0,
        _seed_ax: Optional[plt.Axes] = None,
    ) -> plt.Figure:
        """Create a pie chart of the :attr:`sensitivities` result per factor group.

        Assumes ``labels`` for the parameters take the format: `group:factor_name`.

        :Parameters:

            path
                Optional file location in which to save the image.
                If not provided the image is not saved and only returned.

            squash_threshold
                If a group's sensitivity falls below this value it will be added to the 'Other' wedge of the plot.

        :Returns:

            matplotlib.figure.Figure
                Figure instance allowing the user to further tweak and change the plot as desired.
        """
        grouped = {"Other": 0}
        labels = self.labels if self.labels is not None else [str(i) for i in range(self.n_factors)]

        for l, s in zip(labels, self.sensitivities):
            g = l.split(":")[0]
            grouped[g] = grouped.get(g, 0) + s

        for g, s in grouped.copy().items():
            if s < squash_threshold and g != "Other":
                grouped["Other"] += s
                del grouped[g]

        if _seed_ax:
            ax = _seed_ax
        else:
            fig, ax = plt.subplots(figsize=(13.5, 3))
            fig: plt.Figure
            ax: plt.Axes

        ordered_keys = ["Other"] + [k for _, k in sorted(zip(grouped.values(), grouped.keys())) if k != "Other"]
        ordered_vals = [grouped[k] for k in ordered_keys]

        ax.pie(ordered_vals, labels=ordered_keys, autopct="%1.1f%%", startangle=90, counterclock=True, normalize=False)

        if _seed_ax:
            return

        fig.tight_layout()
        if path:
            fig.savefig(path)

        return fig
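    # Illustrative sketch (assumed labels, not part of the original source): the
    # ``group:factor_name`` convention used by ``plot_grouped_sensitivities``.
    # With labels such as
    #
    #   ["bond:r0", "bond:k", "angle:theta0"]
    #
    # the sensitivities of "bond:r0" and "bond:k" are summed into a single "bond"
    # wedge, "angle:theta0" forms an "angle" wedge, and any group whose total falls
    # below ``squash_threshold`` is folded into the "Other" wedge.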
    def _plot_sensitivity_summary(
        self,
        path: Union[None, Path, str] = "sensitivity_summary.png",
        squash_threshold: float = 0.0,
        width: float = 13.5,
        height: float = 3,
    ) -> plt.Figure:
        """Combines :meth:`plot_grouped_sensitivities` and :meth:`plot_sensitivity_trends` into a single plot."""
        fig, ax = plt.subplots(1, 2, figsize=(width, height))
        fig: plt.Figure
        ax: List[plt.Axes]

        self.plot_sensitivity_trends(None, _seed_ax=ax[0])
        self.plot_grouped_sensitivities(None, squash_threshold=squash_threshold, _seed_ax=ax[1])

        fig.tight_layout()
        if path:
            fig.savefig(path)

        return fig
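# Illustrative usage sketch (assumption, not part of the original module): saving,
# re-loading and summarising a result produced by ``HSIC.compute``. ``result`` and
# the file name are hypothetical.
#
#   result.save("hsiccalc.npz")
#   result = HSICResult.load("hsiccalc.npz")
#   print(result)                                      # settings and top-factor summary
#   result.plot_sensitivities("hsicresult.png", plot_top_n=10)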
class HSICResultWithReweight(HSICResult):
    @property
    def g(self) -> float:
        """Metric of the sensitivity imbalance (:math:`g`).

        :math:`g \\geq 0` such that :math:`g = 0` means all sensitivities are on target.
        If :attr:`~.HSICResult.n_bootstraps` is larger than one, this represents the mean over the bootstraps.
        """
        return self._g.mean(0)

    @property
    def g_std(self) -> float:
        """Standard deviation of :math:`g` across all the bootstraps.

        Returns ``None`` if a reweight calculation was not run.
        """
        return self._g.std(0)

    @property
    def dg(self) -> np.ndarray:
        """:math:`n_{bootstraps} \\times m` matrix of gradients of :math:`g` with respect to the :math:`m` weights
        in the data set.
        """
        return self._dg

    @property
    def weights(self) -> np.ndarray:
        """Original :math:`m` length vector of weights used in the loss function.

        Returns ``None`` if a reweight calculation was not run.
        """
        return self._weights

    @property
    def targets(self) -> np.ndarray:
        """:math:`d` length vector of target sensitivity values for each parameter.

        Returns ``None`` if a reweight calculation was not run.
        """
        return self._targets

    @property
    def n_residuals(self) -> int:
        """Returns the number of residuals contributing to the loss function."""
        return len(self.weights)
    def plot_reweight(
        self,
        path: Union[None, Path, str] = "reweightresult.png",
        group_by: Optional[Sequence[int]] = None,
        labels: Optional[Sequence[str]] = None,
    ) -> plt.Figure:
        """Create a graphic of the reweight results.

        :Parameters:

            path
                Optional file location in which to save the image.
                If not provided the image is not saved and only returned.

            group_by
                Sequence of integer indices grouping individual loss function contributions by training set item.
                For example, all the forces in a single data set 'Forces' item.

            labels
                Sequence of data set item names.

        :Returns:

            matplotlib.figure.Figure
                Figure instance allowing the user to further tweak and change the plot as desired.
        """
        group_by = np.array(group_by) if group_by is not None else np.arange(self.n_residuals)
        labels = labels if labels else set(group_by)

        fig, ax = plt.subplots(1, 2, figsize=(15, np.clip(15 / 40 * len(labels), 2, 15)), sharey="all")
        fig: plt.Figure
        ax: List[plt.Axes]

        ids = np.unique(group_by)
        suggested_weights, stats = self.suggest_weights(group_by, True)
        dg_mean = self.dg.mean(0)

        # Gradient results
        ax[0].scatter(dg_mean, group_by, marker=".", color="k", label="Raw Results")
        ax[0].scatter(stats["medians"], ids, marker="|", color="r", label="Group Median")
        ax[0].scatter(stats["means"], ids, marker="|", color="g", label="Group Mean")

        std = f"\u00B1{self.g_std:.03}" if self.n_bootstraps > 1 else ""
        ax[0].set_title(f"$g(w)=${self.g:.03}{std} (Target: 0)")
        ax[0].set_ylabel("Data Set Item")
        ax[0].set_yticks(ids)
        ax[0].set_yticklabels(labels if labels else ids)
        ax[0].set_xlabel("$dg/dw$\n<--- Increase Weight | Decrease Weight --->")

        lim = np.abs(dg_mean).max()
        ax[0].set_xlim(-lim, lim)
        ax[0].axvline(0, color="k", zorder=0)
        ax[0].invert_yaxis()
        ax[0].margins(y=1 / len(labels))
        ax[0].legend(fontsize=10)

        # Suggested Weights
        ax[1].scatter(stats["original_weights"], ids, marker="o", color="r", label="Original")
        ax[1].scatter(suggested_weights, ids, marker="x", color="g", label="Suggested")
        ax[1].set_xlabel("Weights")
        ax[1].margins(y=1 / len(labels))
        ax[1].legend(fontsize=10)

        fig.tight_layout()
        if path:
            fig.savefig(path)

        return fig
    def suggest_weights(
        self, group_by: Optional[Sequence[int]] = None, return_stats: bool = False
    ) -> Union[np.ndarray, Tuple[np.ndarray, Dict[str, np.ndarray]]]:
        """Suggest new weights based on the reweight calculation results.

        If :attr:`~.HSICResult.n_bootstraps` is larger than one, uses the mean :attr:`dg` value over the bootstraps.

        :Parameters:

            group_by
                Sequence of integer indices grouping individual loss function contributions by training set item.
                For example, all the forces in a single data set 'Forces' item.

            return_stats
                If ``True``, also return a dictionary of the per-group medians, means, standard deviations and
                original weights.
        """
        dg_mean = self.dg.mean(0)

        # Group results and calculate suggested weights
        group_by = np.array(group_by) if group_by is not None else np.arange(self.n_residuals)

        meds = []
        means = []
        stds = []
        original_weights = []
        suggested_weights = []

        # Process groups in sorted order of their indices
        for idx in np.unique(group_by):
            vals = dg_mean[group_by == idx]
            meds.append(np.median(vals))
            means.append(vals.mean())
            stds.append(vals.std())
            original_weights.append(self.weights[group_by == idx].sum())

        change_weight = np.abs(meds) > 0.005  # todo: should this be an option? Is it universally appropriate?
        n_change = change_weight.sum()
        for i, w in enumerate(original_weights):
            if change_weight[i]:
                new_weight = w - self.g / n_change / meds[i]
                new_weight = max(0.1, new_weight)  # todo: should this be an option? Is it universally appropriate?
                suggested_weights.append(new_weight)
            else:
                suggested_weights.append(w)

        if return_stats:
            return np.array(suggested_weights), {
                "medians": np.array(meds),
                "means": np.array(means),
                "stds": np.array(stds),
                "original_weights": np.array(original_weights),
            }

        return np.array(suggested_weights)
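    # Illustrative sketch (hypothetical data, not part of the original source):
    # ``group_by`` assigns each of the ``m`` loss contributions to a training set
    # item. For two items contributing 3 and 2 residuals respectively:
    #
    #   group_by = [0, 0, 0, 1, 1]
    #   new_weights = result.suggest_weights(group_by)   # one suggested weight per item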
    def save_reweight_summary(
        self,
        path: Union[Path, str] = "reweightresult.csv",
        group_by: Optional[Sequence[int]] = None,
        labels: Optional[Sequence[str]] = None,
    ):
        """Save a summary of the reweight calculation to a text file.

        :Parameters:

            path
                Path to file in which the summary will be saved.

            group_by
                Sequence of integer indices grouping individual loss function contributions by training set item.
                For example, all the forces in a single data set 'Forces' item.

            labels
                Sequence of data set item names.
        """
        path = Path(path)

        suggested_weights, stats = self.suggest_weights(group_by, True)
        group_by = np.array(group_by) if group_by is not None else np.arange(self.n_residuals)
        labels = labels if labels else np.unique(group_by)  # Keep label order aligned with the grouped statistics

        with path.open("w") as file:
            file.write("#dataset_item dg_median dg_mean dg_std original_weight suggested_weight\n")
            for i, lab in enumerate(labels):
                file.write(
                    f"{lab} "
                    f"{stats['medians'][i]} "
                    f"{stats['means'][i]} "
                    f"{stats['stds'][i]} "
                    f"{stats['original_weights'][i]} "
                    f"{suggested_weights[i]}\n"
                )
class HSIC:
    """Implementation of the Hilbert-Schmidt Independence Criterion for global sensitivity analysis.

    Estimates the HSIC via the unbiased estimator of Song et al. (2012).

    :Parameters:

        x
            :math:`n \\times d` matrix of :math:`d`-dimensional vectors in the input space.
            These are samples of the variables you would like to know the sensitivity for.

        y
            :math:`n` length vector of outputs corresponding to the input samples in ``x``.
            These are the responses of some function against which the sensitivity is measured.

        labels
            Optional list of factor names.

        x_bounds
            :math:`d \\times 2` matrix of min-max pairs for every factor in ``x`` to scale the values between
            0 and 1.
            Defaults to ``'auto'`` which takes the limits as the min and max values of the sample data.
            To use no scaling set this to ``None``.

    :Notes:

        ``y`` is automatically scaled by taking the logarithm of the values and then scaling by the minimum and
        maximum to a range between zero and one. This makes selecting kernel parameters easier, and deals with
        order-of-magnitude problems which often arise during reparameterization.

    :References:

        Song, L., Smola, A., Gretton, A., Bedo, J., & Borgwardt, K. (2012). Feature Selection via Dependence
        Maximization. Journal of Machine Learning Research, 13, 1393–1434. https://doi.org/10.5555/2188385.2343691

        Gretton, A., Bousquet, O., Smola, A., & Schölkopf, B. (2005). Measuring Statistical Dependence with
        Hilbert-Schmidt Norms. In Lecture Notes in Computer Science: Vol. 3734 LNAI (pp. 63–77). Springer, Berlin,
        Heidelberg. https://doi.org/10.1007/11564089_7

        Gretton, A., Borgwardt, K. M., Rasch, M. J., Smola, A., & Schölkopf, B. (2012). A kernel two-sample test.
        Journal of Machine Learning Research, 13(25), 723–773. http://jmlr.org/papers/v13/gretton12a.html

        Spagnol, A., Le Riche, R., & Da Veiga, S. (2019). Global Sensitivity Analysis for Optimization with
        Variable Selection. SIAM/ASA Journal on Uncertainty Quantification, 7(2), 417–443.
        https://doi.org/10.1137/18M1167978

        Da Veiga, S. (2015). Global sensitivity analysis with dependence measures. Journal of Statistical
        Computation and Simulation, 85(7), 1283–1305. https://doi.org/10.1080/00949655.2014.945932
    """

    def __init__(
        self,
        x: np.ndarray,
        y: np.ndarray,
        labels: Optional[Sequence[str]] = None,
        x_bounds: Union[str, np.ndarray, None] = "auto",
    ):
        self._x_raw = x.copy()
        if isinstance(x_bounds, str) and x_bounds == "auto":
            self.x = (x - x.min(0)) / (x.max(0) - x.min(0))
        elif x_bounds is not None:
            x_bounds = np.array(x_bounds)
            self.x = (x - x_bounds[:, 0]) / (x_bounds[:, 1] - x_bounds[:, 0])
        else:
            self.x = x.copy()

        self._y_raw = y.copy()
        self.y = np.log(y)
        self._y_min = self.y.min()
        self._y_max = self.y.max()
        self.y = (self.y - self._y_min) / (self._y_max - self._y_min)

        self.n, self.d = x.shape
        self.labels = labels
    def compute(
        self,
        inputs_kernel: Optional[BaseKernel] = None,
        outputs_kernel: Optional[BaseKernel] = None,
        n_bootstraps: int = 1,
        n_sample: int = -1,
        replace: bool = False,
    ) -> HSICResult:
        """Calculates the HSIC for each input factor.

        The larger the HSIC value for a parameter, the more sensitive ``y`` is to changes in the corresponding
        parameter in ``x``.

        :Parameters:

            inputs_kernel
                Instance of :class:`.BaseKernel` which will be applied to ``x``.
                Defaults to :class:`!GaussianKernel`.

            outputs_kernel
                Instance of :class:`.BaseKernel` which will be applied to ``y``.
                Defaults to :class:`!ConjunctiveGaussianKernel`.

            n_bootstraps
                Number of repeats of the calculation with different sub-samples from the data set.
                A small spread from a large number of bootstraps provides confidence on the estimation of the
                sensitivity.

            n_sample
                Number of vectors in ``x`` to use in the calculation.
                Defaults to -1 which uses all available points.

            replace
                If ``True``, samples from ``x`` are drawn with replacement, and vice versa.
                This only has an effect if ``n_sample`` does not exceed ``n``; if more samples are requested than
                are available, sampling with replacement is used by necessity.

        :Returns:

            :class:`.HSICResult`
                Object containing the results of the HSIC calculation and the calculation settings.
        """
        return self._compute(
            residuals=None,
            targets=None,
            error_weights=None,
            error_sigma=None,
            inputs_kernel=inputs_kernel,
            outputs_kernel=outputs_kernel,
            n_bootstraps=n_bootstraps,
            n_sample=n_sample,
            replace=replace,
            max_cache_size=None,
            run_reweight=False,
        )
    def compute_with_reweight(
        self,
        residuals: np.ndarray,
        targets: np.ndarray,
        error_weights: np.ndarray,
        error_sigma: np.ndarray,
        inputs_kernel: Optional[BaseKernel] = None,
        outputs_kernel: Optional[BaseKernel] = None,
        n_bootstraps: int = 1,
        n_sample: int = -1,
        replace: bool = False,
        max_cache_size: Optional[int] = None,
    ) -> HSICResultWithReweight:
        """Computes the sensitivity of the HSIC to the weights applied to the construction of an error function.

        This calculation is only applicable in very particular conditions:

        #. :math:`X \\in \\mathbb{R}^{n \\times d}` represents :math:`n` samples of the :math:`d`-dimensional
           inputs to a function which produces :math:`m` outputs for each input vector.

        #. These outputs are the :math:`n \\times m` ``predictions`` matrix (:math:`P`).

        #. The ``predictions`` matrix can be condensed to the :math:`n` length vector ``y`` by an 'error'
           function:

           .. math::

               \\mathbf{y} = \\sum^m_{i=1} w_i \\left(\\frac{P_i - r_i}{\\sigma_i}\\right)^2

        #. :math:`\\mathbf{r}`, :math:`\\mathbf{w}` and :math:`\\mathbf{\\sigma}` are :math:`m` length vectors of
           ``reference``, ``error_weights``, and ``error_sigma`` values respectively.

        #. This function returns :math:`\\frac{dg}{d\\mathbf{w}}`, which is a measure of how much changes in
           :math:`\\mathbf{w}` (the 'weights' in the error function) will affect :math:`g`, which in turn
           measures how close the HSIC sensitivities are to the ``targets`` sensitivities.

        #. :math:`g` is defined such that :math:`g \\geq 0`. :math:`g = 0` implies the sensitivities are perfect.

        :Parameters:

            residuals
                :math:`n \\times m` matrix of error values between predictions and references.

            targets
                :math:`d` length boolean vector. If an element is ``True``, one would like the corresponding
                parameter to show sensitivity.
                Can also be a vector of real values such that the sum of the elements is 1. This allows custom
                sensitivities to be targeted for each parameter.

            error_weights
                :math:`m` length vector of error function 'weights' for which the sensitivity will be measured.

            error_sigma
                :math:`m` length vector of error function 'standard error' values.

            inputs_kernel
                See :meth:`compute`. Only a single kernel instance is allowed in this function.

            outputs_kernel
                See :meth:`compute`.

            n_bootstraps
                Number of repeats of the calculation with different sub-samples from the data set.
                A small spread from a large number of bootstraps provides confidence on the estimation of the
                sensitivity.

            n_sample
                Number of vectors in ``x`` to use in the calculation.
                Defaults to -1 which uses all available points.

            replace
                If ``True``, samples from ``x`` are drawn with replacement, and vice versa.
                This only has an effect if ``n_sample`` does not exceed ``n``; if more samples are requested than
                are available, sampling with replacement is used by necessity.

            max_cache_size
                Maximum amount of disk space (in bytes) the program may use to store matrices and speed-up the
                calculation.
                Defaults to the maximum size of the temporary directory on the system.

        :Returns:

            :class:`.HSICResultWithReweight`
                Object containing the results of the HSIC calculation and the calculation settings.

        .. warning::

            Efforts have been made to reduce the memory footprint of the calculation, but it can become very
            large, very quickly. This calculation is also significantly slower than the normal sensitivity
            calculation.
        """
        return self._compute(
            residuals=residuals,
            targets=targets,
            error_weights=error_weights,
            error_sigma=error_sigma,
            inputs_kernel=inputs_kernel,
            outputs_kernel=outputs_kernel,
            n_bootstraps=n_bootstraps,
            n_sample=n_sample,
            replace=replace,
            max_cache_size=max_cache_size,
            run_reweight=True,
        )
    def _compute(
        self,
        residuals: Optional[np.ndarray],
        targets: Optional[np.ndarray],
        error_weights: Optional[np.ndarray],
        error_sigma: Optional[np.ndarray],
        inputs_kernel: Optional[BaseKernel],
        outputs_kernel: Optional[BaseKernel],
        n_bootstraps: int,
        n_sample: int,
        replace: bool,
        max_cache_size: Optional[int],
        run_reweight: bool,
    ) -> Union[HSICResult, HSICResultWithReweight]:
        # Process inputs
        if n_sample > self.n:
            replace = True
        n_sample = n_sample if n_sample > 0 else self.n

        if inputs_kernel is None:
            inputs_kernel = GaussianKernel()

        if outputs_kernel is None:
            outputs_kernel = ConjunctiveGaussianKernel()

        assert n_bootstraps > 0
        all_x = np.arange(self.n)
        boot_ids = np.array([np.random.choice(all_x, n_sample, replace=replace) for _ in range(n_bootstraps)])

        d = self.d
        triu_ids = np.triu_indices(n_sample, 1)

        # HSIC Coefficients
        coeff_pre = 1 / (n_sample * (n_sample - 3))
        coeff_t2 = 1 / ((n_sample - 1) * (n_sample - 2))
        coeff_t3 = -2 / (n_sample - 2)

        # Pre-allocation
        hsic = np.zeros((n_bootstraps, d))

        # Reweight specific tasks
        t = None
        if run_reweight:
            residuals = np.array(residuals)
            targets = np.array(targets)
            w = np.array(error_weights)
            m = residuals.shape[1]

            # Setup targets
            if targets.dtype == bool:
                t = np.zeros(d)
                t[targets] = 1 / targets.sum()
            else:
                t = targets

            dhsic = np.zeros((n_bootstraps, d, m))

            # Error Function
            df = residuals / error_sigma[None]
            df **= 2  # n x m
            f = w * df
            f = f.sum(1)  # n

            if not np.allclose(f, self._y_raw):
                warnings.warn(
                    "The total loss calculated from 'residuals' does not match the total loss given. "
                    "Please check the data and verify that a SSE loss function was used."
                )

            # Scale f to f_bar
            # (This is essential for the conjunctive-Gaussian kernel which relies on scaled data)
            f_bar = np.log(f)
            f_bar = (f_bar - self._y_min) / (self._y_max - self._y_min)
            df_bar = 1 / (self._y_max - self._y_min) * (1 / f[:, None]) * df
        else:
            f_bar = self.y
            m = 1

        # Setup Progress Bar
        pbar = tqdm(
            file=sys.stdout,
            bar_format="[ {percentage:3.0f}%|{bar:8}] {n_fmt}/{total_fmt}, {elapsed}<{remaining}, {rate_fmt}{postfix}",
            total=n_bootstraps * (m * d + int(run_reweight)),  # Only cache if reweight calculation is run
            unit_scale=True,
        )

        for b in range(n_bootstraps):
            x = self.x[boot_ids[b]]
            y = f_bar[boot_ids[b]]
            poststr = f"Boot {b + 1:03} / {n_bootstraps:03} "

            # Output kernel
            L = outputs_kernel(y)
            L_sum = L.sum()

            cache = "none"
            if run_reweight:
                # Caching is only needed for the reweight calculation.
                # d << m (usually there are far fewer dimensions than training set items), so we try to
                # pre-calculate the input kernels and store them in memory.
                # If they are too large, try to hold them temporarily on disk.
                # If still too large, we will just have to recalculate them over and over again.
                est_cache_space = d * n_sample * (n_sample - 1) // 2 * 8
                buffer_space = 10 * n_sample * n_sample * 8  # Reserve some memory for later calculation
                mem_avail = psutil.virtual_memory().available

                if mem_avail > (est_cache_space + buffer_space):
                    K_cache = np.empty((d, n_sample * (n_sample - 1) // 2), dtype=float)
                    pbar.set_postfix_str(poststr + "Caching to memory", refresh=True)
                    for i in range(d):
                        K = inputs_kernel(x[:, i])
                        K_cache[i] = K[triu_ids]  # Store only upper tri since matrices symmetric
                        del K
                    cache = "memory"
                else:
                    tmpdir = tf.TemporaryDirectory()
                    try:
                        if max_cache_size is not None and est_cache_space > max_cache_size:
                            raise OSError

                        pbar.set_postfix_str(poststr + "Caching to disk", refresh=True)
                        for i in range(d):
                            K = inputs_kernel(x[:, i])
                            np.save(tmpdir.name + os.sep + str(i) + ".npy", K[triu_ids])
                        cache = "disk"
                    except OSError:
                        tmpdir.cleanup()
                        cache = "none"
                        pbar.set_postfix_str(poststr + "Unable to cache results")

                pbar.update()

            pbar.set_postfix_str(poststr + "HSIC Calc", refresh=True)

            # Loop over training set items
            for j in range(m):  # m == 1 if reweight is not run
                if run_reweight:
                    # Placed outside loop to avoid unneeded recalculations
                    dL = outputs_kernel(y, df_bar[boot_ids[b], j])
                    dL_sum = dL.sum()

                for i in range(d):
                    if cache == "memory":
                        K = np.zeros((n_sample, n_sample))
                        K[triu_ids] = K_cache[i]
                        K = K.T
                        K[triu_ids] = K_cache[i]
                    elif cache == "disk":
                        K = np.zeros((n_sample, n_sample))
                        load = np.load(tmpdir.name + os.sep + str(i) + ".npy")
                        K[triu_ids] = load
                        K = K.T
                        K[triu_ids] = load
                    else:  # cache == 'none'
                        K = inputs_kernel(x[:, i])

                    K_sum = K.sum()

                    # Avoid repeating this calc which is not a function of j but is here so K is only extracted once
                    if j == 0:
                        # HSIC calculation
                        KL = K @ L
                        t1 = np.trace(KL)  # Term 1: tr(KL)
                        t2 = K_sum * L_sum  # Term 2: 1K11L1
                        t3 = np.sum(KL)  # Term 3: 1KL1
                        del KL
                        hsic[b, i] = coeff_pre * (t1 + coeff_t2 * t2 + coeff_t3 * t3)

                    if run_reweight:
                        # dHSIC Calculation
                        KdL = K @ dL
                        t1 = np.trace(KdL)  # Term 1: tr(KL)
                        t2 = K_sum * dL_sum  # Term 2: 1K11L1
                        t3 = np.sum(KdL)  # Term 3: 1KL1
                        del KdL
                        dhsic[b, i, j] = coeff_pre * (t1 + coeff_t2 * t2 + coeff_t3 * t3)

                    del K
                    pbar.update()

            # Close cache if used
            if cache == "disk":
                tmpdir.cleanup()

        # Close Progress Bar Nicely
        pbar.refresh()
        pbar.close()

        # Calculate S
        hsic = np.clip(hsic, 0, None)
        hsic_tot = hsic.sum(1)
        s = hsic / hsic_tot[:, None]

        g = None
        dg = None
        if run_reweight:
            ds = (hsic_tot[:, None, None] * dhsic - hsic[:, :, None] * dhsic) / (hsic_tot[:, None, None] ** 2)
            core = s - t
            g = np.sum(core**2, 1)
            dg = np.sum(2 * ds * core[:, :, None], 1)

        return HSICResult._from_compute(
            hsic=hsic,
            input_kernel=inputs_kernel,
            output_kernel=outputs_kernel,
            n_bootstraps=n_bootstraps,
            n_sample=n_sample,
            replace=replace,
            has_reweight=run_reweight,
            targets=t,
            g=g,
            dg=dg,
            weights=error_weights,
            labels=self.labels,
        )
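# Illustrative end-to-end sketch (assumption, not part of the original module): a
# minimal sensitivity analysis with the default kernels, assuming ``x_samples`` (n x d),
# ``y_losses`` (n) and ``factor_names`` already exist.
#
#   hsic = HSIC(x_samples, y_losses, labels=factor_names, x_bounds="auto")
#   result = hsic.compute(n_bootstraps=20, n_sample=1000)
#   print(result.ranking)                     # 1 = most influential factor
#   result.plot_sensitivities("hsicresult.png")
#   result.save("hsiccalc.npz")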