# histlite.py
"""Calculate and plot histograms, easily."""
import copy
import datetime
import inspect
import itertools
import os
import warnings
import numpy as np
from scipy import interpolate, ndimage, optimize, stats
try:
import matplotlib as mpl
import matplotlib.pyplot as plt
except:
mpl = plt = None
from . import version
try:
from . import heal
except:
pass
eps = np.finfo(float).eps
xrange = range
# 1 - 2 * stats.norm.sf(1)
_containment_1sigma = 0.6826894921370859
[docs]def reindex(a, order):
"""Rearrange the axes of a multidimensional array.
:type a: ndarray
:param a: the input array
:type order: sequence of int
:param order: the axis that should wind up in each ordinal position
:return: reindexed array
Note: this is useful for implementing :meth:`Hist.sum`, etc., but you probably should
prefer ``np.swapaxes``, possibly using multiple applications, instead.
"""
cur = list(range(len(a.shape)))
assert (sorted(order) == cur)
for i_dest, i_source in enumerate(order):
i_cur = cur.index(i_source)
cur[i_dest], cur[i_cur] = cur[i_cur], cur[i_dest]
a = np.swapaxes(a, i_dest, i_cur)
return a
[docs]def unreindex(a, order):
"""Reverse the effects of :meth:`reindex`.
:type a: ndarray
:param a: the already reindexed array
:type order: sequence of int
:param order: order previously applied to :meth:`reindex`
:return: unreindexed array
Note: this is useful for implementing :meth:`Hist.sum`, etc., but you probably should
prefer ``np.swapaxes``, possibly using multiple applications, instead.
"""
cur = list(range(len(a.shape)))
assert (sorted(order) == cur), '{} vs {}'.format(sorted(order), cur)
for i_source, i_dest in enumerate(order):
i_cur = cur.index(i_source)
cur[i_dest], cur[i_cur] = cur[i_cur], cur[i_dest]
a = np.swapaxes(a, i_dest, i_cur)
return a
def breakout_1d(a):
return np.r_[a[0], a, a[-1]]
def breakout_2d(a):
z = np.zeros((a.shape[0] + 2, a.shape[1] + 2))
z[1:-1, 1:-1] = a
z[0, 1:-1] = a[0]
z[-1, 1:-1] = a[-1]
z[1:-1, 0] = a[:,0]
z[1:-1, -1] = a[:,-1]
for (i, j) in [(0, 0), (0, -1), (-1, 0), (-1, -1)]:
z[i, j] = a[i,j]
return z
[docs]class Hist(object):
"""A histogram."""
def __init__(self, bins, values, errors=None, data=None, weights=None):
"""Construct a :class:`Hist`.
:type bins: 2D array-like
:param bins: the bin edges
:type values: n_dim array-like
:param values: the counts
:type errors: n_dim array-like
:param values: the per-bin errors if given; otherwise NaN is assumed.
:type data: tuple of array-like
:param data: the source data for the histogram: n_dim tuples of
n_sample arrays
:type weights: array-like
:param weights: the weights of the source datapoints: an array of
length n_sample
"""
self._values = np.asarray(values)
if len(self.values.shape) == 1:
try:
bins_array = np.asarray(bins)
if len(bins_array) == self.values.shape[0] + 1:
bins = (bins,)
except:
pass
self._bins = list(map(np.asarray, bins))
if errors is not None:
self._errors = np.asarray(errors)
else:
self._errors = None
if len(self.values.shape):
self._errors = np.empty(self.values.shape)
self._errors[:] = np.nan
else:
self._errors = np.array(np.nan)
self._data = data
self._weights = weights
self._n_dim = len(self.bins)
self._range = [(b[0], b[-1]) for b in self.bins]
# plan to lazy-compute log and centers
self._log = None
self._centers = None
def copy(self):
b, v, e = map(copy.deepcopy, (self._bins, self._values, self._errors))
d, w = self._data, self._weights
return Hist(b, v, e, data=d, weights=w)
@property
def zeronans(self):
self = self.copy()
self._values[self._values == 0] = np.nan
return self
def to_finite(self, nan=0, inf=np.max, minf=np.min):
self = self.copy()
masks = np.isnan(self.values), self.values == np.inf, self.values == -np.inf
values = self.values
for (mask, k) in zip(masks, (nan, inf, minf)):
if callable(k):
v = k(values[~mask])
else:
v = k
values[mask] = v
return self
def _determine_log(self):
log = []
for dim in xrange(self.n_dim):
diffs = np.diff(self.bins[dim])
if 0 in self.bins[dim] \
or isinstance(self.bins[dim][0], datetime.datetime) \
or isinstance(self.bins[dim][0], np.datetime64):
dim_log = False
else:
ratios = self.bins[dim][1:] / self.bins[dim][:-1]
nlog = len(np.unique(ratios))
nlin = len(np.unique(diffs))
dim_log = (nlog < nlin)
log.append(dim_log)
self._log = np.array(log)
def _determine_centers(self):
log = self.log
centers = []
for dim in xrange(self.n_dim):
if log[dim]:
ratios = self.bins[dim][1:] / self.bins[dim][:-1]
centers.append(self.bins[dim][:-1] * np.sqrt(ratios))
else:
diffs = np.diff(self.bins[dim])
centers.append(self.bins[dim][:-1] + diffs / 2)
self._centers = list(map(np.asarray, centers))
# given construction properties of the Hist
@property
def n_dim(self):
"""The number of dimensions in the histogram."""
return self._n_dim
@property
def bins(self):
"""A list of bin edge arrays, one for each dimension."""
return self._bins
@property
def n_bins(self):
"""A list of the number of bins in each dimension."""
return list(map(len, self.centers))
@property
def range(self):
"""A list of(min_value,max_value) tuples for each dimension."""
return self._range
@property
def values(self):
"""An nD array of bin values(sum of weights in each bin)."""
return self._values
@property
def errors(self):
"""An nD array of bin errors(sqrt(sum(squares of weights))) in each
bin."""
return self._errors
@property
def data(self):
"""The data used to construct the histogram(if given upon
construction)."""
return self._data
@property
def weights(self):
"""The weights used to construct the histogram(if given upon
construction)."""
return self._weights
# inferred construction properties of the Hist
@property
def log(self):
"""A list of bools describing whether each dimenion is binned linearly
or logarithmically."""
if self._log is None:
self._determine_log()
return self._log
@property
def centers(self):
"""A list of bin center arrays for each dimension."""
if self._centers is None:
self._determine_centers()
return self._centers
@property
def widths(self):
"""A list of bin width arrays for each dimension."""
return list(map(np.diff, self.bins))
@property
def volumes(self):
"""An nD array of bin volumes(product of bin widths in
each dimension)."""
if self.n_dim == 0:
return 0
elif self.n_dim == 1:
return np.diff(self.bins[0])
else:
return np.prod(np.meshgrid(*map(np.diff, self.bins)), axis=0).T
[docs] def plotrange(self, N=100):
"""Get linspace or logspace along each binning axis."""
return tuple(
np.logspace(np.log10(mi), np.log10(ma), N) if log else np.linspace(mi, ma, N)
for ((mi, ma),log) in zip(self.range, self.log))
# bin-data access
[docs] def index(self, x, axis=0):
"""The bin index for value ``x`` in dimension ``axis``."""
def digitize(b, a):
return np.searchsorted(a, b, side='right')
shape = np.shape(x)
last_mask = x == self.range[axis][1]
last_i = self.n_bins[axis] - 1
if shape:
out = (digitize(x.ravel(), self.bins[axis]) - 1).reshape(shape)
out[last_mask] = last_i
return out
else:
if last_mask:
return last_i
else:
return int(digitize([x], self.bins[axis]) - 1)
[docs] def indices(self, *xs):
"""Get the indices for the specified coordinates."""
if len(xs) != self.n_dim:
raise TypeError('`n_dim` coordinates required')
return tuple(self.index(x, i) for (i,x) in enumerate(xs))
[docs] def ravel_indices(self, *xs):
"""Get the indices into values.ravel() for the specified coordinates.
Index of -1 indicates out-of-bounds
"""
indices = self.indices(*xs)
out_indices = np.zeros(len(indices[0]), dtype=int)
step = 1
bad_mask = np.zeros(len(indices[0]), dtype=bool)
for axis in xrange(self.n_dim - 1, -1, -1):
bad_mask |= ~ ( (self.range[axis][0] <= xs[axis])
& (xs[axis] < self.range[axis][1]))
out_indices += step * indices[axis]
step *= self.n_bins[axis]
out_indices[bad_mask] = -1
return out_indices
[docs] def get_value(self, *xs):
"""Get the counts value at the specified coordinates."""
return self.values[self.indices(*xs)]
[docs] def get_values(self, *xs):
"""Get the counts values at the specified lists of coordinates."""
indices = tuple([self.index(np.asarray(x),i) for (i,x) in enumerate(xs)])
return self.values[indices]
def __call__(self, *xs):
return self.get_values(*xs)
def eval(self, *a, **kw):
kw.setdefault('ndim', self.n_dim)
kw.setdefault('range', self.range)
return hist_from_eval(self.get_values, *a, **kw)
[docs] def get_error(self, *xs):
"""Get the error value at the specified coordinates."""
return self.errors[self.indices(*xs)]
[docs] def get_errors(self, *xs):
"""Get the error values at the specified lists of coordinates."""
return np.array([self.get_error(*x) for x in zip(*xs)])
def transform_bins(self, f, axes=[-1]):
bins = [
f(self.bins[axis])
if (axis in axes) or (axis - self.n_dim in axes)
else self.bins[axis]
for axis in xrange(self.n_dim)]
return Hist(bins, self.values, self.errors)
# sampling
[docs] def sample(self, n_samples=1, *values, **kw):
"""Draw n samples from the data.
:type n_samples: int
:param n_samples: the number of samples
Any given values select bins in the first len(values) dimensions, such
that sampling is done only from the remaining dimensions.
:return: tuple of arrays of length n_dim
"""
seed = kw.get('seed', np.random.seed())
random = kw.get('random', np.random.RandomState(seed))
if values:
return self[values].sample(n_samples)
cdf = np.cumsum(self.values.ravel()) / self.values.sum()
dice = random.uniform(0, 1, n_samples)
dice_bins = np.searchsorted(cdf, dice)
indices = np.unravel_index(dice_bins, self.values.shape)
lefts = [self.bins[i][indices[i]] for i in xrange(self.n_dim)]
rights = [self.bins[i][indices[i] + 1] for i in xrange(self.n_dim)]
outs = [left * ((right / left) ** random.uniform(0, 1, n_samples))
if self.log[i] else
left + (right - left) * random.uniform(0, 1, n_samples)
for (i, left,right)
in zip(xrange(self.n_dim), lefts, rights)]
return outs
# axis-wise operations
[docs] def cumsum(self, axes=[-1], normalize=False):
"""
Calculate the cumulative sum along specified axes(in order).
"""
values = self.values.copy()
axes = np.atleast_1d(axes)
sum_axes = [self.n_dim + i if i < 0 else i for i in axes]
for axis in sum_axes:
values = values.cumsum(axis=axis)
if normalize:
values = values / self.sum(axes).values
return Hist(self.bins, values)
[docs] def sum(self, axes=None, integrate=False):
"""Project the histogram onto a subset of its dimensions by summing
over ``axes``.
:type axes: sequence of int
:param axes: the axes along which to sum and thus the dimensions
no longer present in the resulting Hist.
:type integrate: bool
:param integrate: if True, evaluate sum of(value * width) rather than
just value.
:return: :class:`Hist`
"""
if axes is None:
axes = list(range(self.n_dim))
axes = np.atleast_1d(axes)
# get sum and keep axes
sum_axes = sorted([self.n_dim + i if i < 0 else i for i in axes])
keep_axes = [i for i in xrange(self.n_dim) if i not in sum_axes]
bins = [self.bins[i] for i in xrange(self.n_dim) if i not in sum_axes]
for axis in sum_axes:
if axis not in xrange(self.n_dim):
raise ValueError('histogram has no axis {0}'.format(axis))
# order new arrays so sum axes are last
axes_order = keep_axes + sum_axes
values = reindex(self.values.copy(), axes_order)
if self.errors is not None:
sq_errors = reindex(self.errors**2, axes_order)
else:
sq_errors = None
# sum over last axes until done summing
for axis in reversed(sum_axes):
axis_bins = self.bins[axis]
if integrate:
values *= np.diff(axis_bins)
if sq_errors is not None:
sq_errors *= np.diff(axis_bins)
values = values.sum(axis=-1)
if sq_errors is not None:
sq_errors = sq_errors.sum(axis=-1)
if self.errors is not None:
errors = np.sqrt(sq_errors)
else:
errors = None
return Hist(bins, values, errors)
[docs] def integrate(self, axes=None):
"""Project the histogram onto a subset of its dimensions by integrating
over ``axes``.
:type axes: sequence of int
:param axes: the axes along which to integrate and thus the dimensions
no longer present in the resulting Hist.
:return: :class:`Hist`
"""
return self.sum(axes=axes, integrate=True)
[docs] def project(self, axes=[-1], integrate=False):
"""Project the histogram onto a subset of its dimensions by summing
over axes other than those listed in ``axes``.
:type axes: sequence of int
:param axes: the axes along which NOT to sum or integrate, and thus the
dimensions no longer present in the resulting Hist.
:return: :class:`Hist`
"""
axes = [self.n_dim + i if i < 0 else i for i in axes]
return self.sum([i for i in xrange(self.n_dim) if i not in axes],
integrate=integrate)
[docs] def contain(self, axis, frac=_containment_1sigma):
"""Project the histogram onto a subset of its dimensions by taking the
containment interval along ``axis``.
:type axis: int
:param axis: the axis along which to measure containment and thus the
dimensions no longer present in the resulting Hist.
:type frac: float
:param frac: the containment interval, measured from
``self.range[axis][0]`` and moving to the "right"
:return: :class:`Hist`
"""
if axis < 0:
axis = len(self.bins) + axis
def weighted_containment(a, w):
as_ws = np.array(sorted(zip(a, w))).T
if np.sum(w) == 0:
m = np.nan
else:
m = as_ws[0][as_ws[1].cumsum() >= frac * w.sum()][0]
return m
def apply_weighted_containment(w):
return weighted_containment(self.centers[axis], w)
values = np.apply_along_axis(apply_weighted_containment, axis, self.values)
bins = [self.bins[i] for i in xrange(self.n_dim) if i != axis]
return Hist(bins, values)
[docs] def contain_project(self, axis,
frac=_containment_1sigma,
n_sigma=None,
):
"""
Project the histogram taking median along one dimension, with errorbars
reflecting the eliminated axis.
:type axis: int
:param axis: the axis along which to measure containment and thus the
dimensions no longer present in the resulting Hist.
:type frac: float
:param frac: the containment fraction, which will be centered on the
median value along the given axis
:type n_sigma: float
:param n_sigma: the containment fraction, specified as a number of
sigmas
:return: :class:`Hist`
If given, ``n_sigma`` overrides ``frac``.
"""
out = self.median(axis)
f = (1 - frac) / 2. if n_sigma is None else stats.norm.sf(n_sigma)
h_low = self.contain(axis, f)
h_high = self.contain(axis, 1 - f)
out._errors = np.vstack(((out - h_low).values, (h_high - out).values))
return out
[docs] def mean(self, axis=-1):
"""Project the histogram onto a subset of its dimensions by taking the mean along
``axis``.
:type axis: int
:param axis: the axis along which to find the median and thus the
dimensions no longer present in the resulting Hist.
:return: :class:`Hist`
"""
if axis < 0:
axis = len(self.bins) + axis
keep_axes = [i for i in xrange(self.n_dim) if i != axis]
reorder = keep_axes + [axis]
values = reindex(self.values, reorder)
values = (self.centers[axis] * values).sum(axis=-1) / values.sum(axis=-1)
bins = [self.bins[i] for i in keep_axes]
# TODO: can we calculate an error/interval as well?
return Hist(bins, values)
[docs] def normalize(self, axes=None, integrate=True, density=False):
"""Return a histogram normalized so the sums(or integrals) over the
given axes are unity.
:type axes: sequence of int, optional
:param axes: the axes that will sum(or integrate) to unity for the
normalized histogram
:type integrate: bool
:param integrate: if True, normalize so the integral is unity;
otherwise, normalize so the sum is unity
:type density: bool
:param density: if True, normalize so the integral is unity, but
*as though* the binning were linspaced, even if it is actually not.
This option supersedes the ``integrate`` argument.
:return: :class:`Hist`
The norm is found by summing over all axes other than the ones
specified, or by summing over all axes if ``axis`` is not given. Note
that setting ``density=True`` should obtain equivalent behavior to
``numpy.histogram(..., density=True)``.
"""
if axes is None:
axes = list(range(self.n_dim))
axes = np.atleast_1d(axes)
axes = sorted([self.n_dim + i if i < 0 else i for i in axes])
for axis in axes:
if axis not in xrange(self.n_dim):
raise ValueError('histogram has no axis {0}'.format(axis))
keep_axes = axes
lose_axes = [i for i in xrange(self.n_dim) if i not in axes]
if density:
integrate = False
hsum = 1. * self.sum(keep_axes, integrate=integrate)
hother = self.sum(lose_axes)
reorder = keep_axes + lose_axes
if density:
volumes = hother.volumes
else:
volumes = 1
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
values = unreindex(
reindex(self.values, reorder) / hsum.values, reorder
) / volumes
if self.errors is not None:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
errors = unreindex(
reindex(self.errors, reorder) / hsum.values, reorder
) / volumes
else:
errors = None
return Hist(self.bins, values, errors)
[docs] def rebin(self, axis, bins, tol=1e-4):
"""Produce coarser binning along the given axis.
:type axis: int
:param axis: the axis along which to rebin
:type bins: sequence
:param bins: the new bin edges
:type tol: float
:param tol: the absolute error between the given ``bins`` and ones
found in the original histogram
:return: :class:`Hist`
Each bin in bins should be contained in the existing bins, and the
endpoints should match. Tolerance for bin agreement is given as an
absolute error by `tol`.
"""
# validate bins
oldbins = self.bins[axis]
newbins = np.copy(np.sort(bins))
for (i, b) in enumerate(newbins):
misses = np.abs(b - oldbins)
j = np.argmin(misses)
closest = np.min(misses)
if closest > tol:
raise ValueError(
'{0} is not among current bin edges'.format(b))
newbins[i] = oldbins[j]
if newbins[0] != oldbins[0]:
raise ValueError(
'binning startpoint should match')
if newbins[-1] != oldbins[-1]:
raise ValueError(
'binning endpoint should match')
n_newbins = len(newbins) - 1
newbin_indices = np.digitize(oldbins, newbins)[:-1] - 1
def revalue_one(a):
return [np.sum(a[newbin_indices==i])
for i in xrange(n_newbins)]
def reerror_one(a):
return [np.sqrt(np.sum(a[newbin_indices==i]**2))
for i in xrange(n_newbins)]
values = np.apply_along_axis(revalue_one, axis, self.values)
if self.errors is not None:
errors = np.apply_along_axis(reerror_one, axis, self.errors)
else:
errors = None
bins = [self.bins[i] if i != axis else newbins
for i in xrange(self.n_dim)]
return Hist(bins, values, errors)
def __getitem__(self, sli):
def intypes(obj, types):
for t in types:
if isinstance(obj, t):
return True
return False
# handle non-tuple
if not isinstance(sli, tuple):
return self[sli,]
# check against excess dimensions
if len(sli) > self.n_dim:
raise TypeError('too many dimensions for this Hist')
# TODO: do we really need a copy here? seems like we shouldn't
#subh = copy.deepcopy(self)
subh = self
subsli = sli[0]
n_reductions = 0
for dim, subsli in enumerate(sli):
# could be a number/sequence, or slice
if intypes(subsli, (int, float, list, np.float32, np.ndarray)):
# cannot subscript a 1D Hist
if self.n_dim == 1:
raise TypeError('cannot reduce dimensionality of 1D Hist')
index = self.index(subsli, dim)
subh = subh.get_slice(index, dim - n_reductions)
else:
if subsli.start is None:
start = 0
else:
start = self.index(subsli.start, dim)
if subsli.stop is None:
stop = len(self.bins[dim])
else:
stop = self.index(subsli.stop, dim)
if subsli.stop not in self.bins[dim]:
stop += 1
indsli = slice(start, stop)
subh = subh.get_slice(indsli, dim - n_reductions)
if subh.n_dim < self.n_dim:
n_reductions += 1
return subh
def get_slice(self, index, axis=0):
slices = []
bins = []
log = []
for dim in xrange(self.n_dim):
if dim == axis:
slices.append(index)
if not isinstance(index, int):
if isinstance(index, slice):
if index.stop is not None:
bin_index = slice(
index.start, index.stop + 1, index.step)
else:
bin_index = index
bins.append(self.bins[axis][bin_index])
log.append(self.log[axis])
else:
n_axis_bins = len(self.bins[dim])
slices.append(slice(0, n_axis_bins))
bins.append(self.bins[dim])
log.append(self.log[dim])
values = self.values[tuple(slices)]
if self.errors is not None:
errors = self.errors[tuple(slices)]
else:
errors = None
return Hist(bins, values, errors)
@property
def T(self):
"""A transposed version of the Hist."""
bins = self.bins[::-1]
values = self.values.T
if self.errors is not None:
errors = self.errors.T
else:
errors = None
return Hist(bins, values, errors)
# fitting
[docs] def curve_fit(self, func, **kw):
"""
Fit a function to the histogram.
:type func: function
:param func: model function as in scipy.optimize.curve_fit().
:return: popt, pcov as in scipy.optimize.curve_fit()
This function unravels the values and bin centers into ``n_dim`` 1D
arrays which are then passed, along with any keyword arguments, to
scipy.optimize.curve_fit().
"""
ydata = np.ravel(self.values)
errors = self.errors
if errors is not None and not np.all(~np.isfinite(errors)):
#sigma = np.ravel(self.errors)
if len(errors.shape) == len(self.values.shape) + 1 \
and errors.shape[0] == 2:
errors = np.mean(errors, axis=0)
sigma = np.ravel(errors)
else:
sigma = np.ones(ydata.shape)
mask = (0 < sigma) & (sigma < np.inf)
ydata = ydata[mask]
sigma = sigma[mask]
if self.n_dim >= 2:
xdata = np.array(list(map(np.ravel, np.meshgrid(*self.centers, indexing='ij'))))
xdata = np.array([xd[mask] for xd in xdata])
else:
xdata = self.centers[0][mask]
# special case for scipy rvs functions
if hasattr(func, '__self__'):
shapes = func.__self__.shapes
if shapes:
n_args = 2 + len(shapes.split(','))
else:
n_args = 2
funcs = {
1: (lambda x, a: func(x, a)),
2: (lambda x, a,b: func(x, a,b)),
3: (lambda x, a,b,c: func(x, a,b,c)),
4: (lambda x, a,b,c,d: func(x, a,b,c,d)),
5: (lambda x, a,b,c,d,e: func(x, a,b,c,d,e)),
}
f = funcs[n_args]
else:
f = func
return optimize.curve_fit(f, xdata, ydata, sigma=sigma, **kw)
[docs] def spline_fit(self, log=False, floor=None, *a, **kw):
"""
Get a scipy spline fit to the histogram.
:type log: bool
:param log: whether to fit in log-value or linear-value
:type floor: float
:param floor: 10**floor is the arbitrary small number to stand in for
zeros when ``log`` is true. If not set, log10 of 0.1 times the
smallest nonzero value will be used.
:return: :class:`SplineFit`
This method produces a spline fit to the histogram values for 1D or 2D
histograms. The domain of the fitted spline will be the same as that
of the histogram.
"""
errors = self.errors
#if errors is not None and not np.all(~np.isfinite(errors)):
# errors = np.mean(np.atleast_2d(errors), axis=0)
if log and floor is None:
floor = np.log(0.1 * self.values[self.values > 0].min())
if self.n_dim == 1:
x = np.r_[self.bins[0][0], self.centers[0], self.bins[0][-1]]
if self.log[0]:
x = np.log10(x)
y = breakout_1d(self.values)
err = breakout_1d(errors)
if np.all(err == 0) or not np.any(np.isfinite(err)):
err = np.ones_like(err)
if log:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
logy = np.log(y)
err /= y
logy[y == 0] = floor
y = logy
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
mask = err > 0
spl = interpolate.UnivariateSpline(
x[mask], y[mask], 1 / err[mask], *a, **kw
)
return SplineFit(self, spl, self.log, log, floor)
elif self.n_dim == 2:
x = np.r_[self.bins[0][0], self.centers[0], self.bins[0][-1]]
if self.log[0]:
x = np.log10(x)
y = np.r_[self.bins[1][0], self.centers[1], self.bins[1][-1]]
if self.log[1]:
y = np.log10(y)
z = breakout_2d(self.values)
if log:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
logz = np.log(z)
logz[z == 0] = floor
z = logz
s = interpolate.RectBivariateSpline(
x, y, z, *a, **kw
)
return SplineFit(self, s, self.log, log, floor)
else:
args = [
np.r_[self.bins[i][0], self.centers[i], self.bins[i][-1]]
for i in xrange(self.n_dim)
]
for i in xrange(self.n_dim):
if self.log[i]:
args[i] = np.log10(args[i])
args = list(np.meshgrid(*args, indexing='ij'))
V = np.pad(self.values, 1, mode='edge')
if log:
V = np.log(V)
args.append(V)
args += a
s = interpolate.Rbf(*args, **kw)
return SplineFit(self, s, self.log, log, floor)
# smoothing
[docs] def gaussian_filter(self, *a, **kw):
"""
Smooth both values and errors with ``scipy.ndimage.gaussian_filter()``.
"""
values = ndimage.gaussian_filter(self.values, *a, **kw)
errors = ndimage.gaussian_filter(self.errors, *a, **kw)
return Hist(self.bins, values, errors)
# smoothing
[docs] def gaussian_filter1d(self, *a, **kw):
"""
Smooth both values and errors with ``scipy.ndimage.gaussian_filter1d()``.
"""
values = ndimage.gaussian_filter1d(self.values, *a, **kw)
errors = ndimage.gaussian_filter1d(self.errors, *a, **kw)
return Hist(self.bins, values, errors)
# Hist-Hist operations
[docs] def matches(self, other):
"""True if self and other have the same binning."""
#return True
for (sbins, obins) in zip(self.bins, other.bins):
n = len(sbins)
if len(obins) != n:
return False
deltas = np.abs(sbins - obins)
sdiff, odiff = np.diff(sbins), np.diff(obins)
diff = .5 * (sdiff + odiff)
diff_deltas = np.abs(sdiff - odiff)
ratios = np.maximum(deltas[:-1] / diff, deltas[1:] / diff)
if np.max(ratios) > 1e-5:
return False
return True
def assert_match(self, other):
if not self.matches(other):
raise ValueError('histograms do not have matching binning')
def __add__(a, b):
"""
Add two histograms with matching bins, or a scalar and a histogram.
"""
if isinstance(b, Hist):
a.assert_match(b)
values = a.values + b.values
if a.errors is not None and b.errors is not None:
errors = np.sqrt(a.errors**2 + b.errors**2)
else:
errors = None
return Hist(a.bins, values, errors)
else:
return Hist(a.bins, a.values + b, a.errors)
def __radd__(a, b):
"""Add a scalar and a histogram."""
return a + b
def __sub__(a, b):
"""
Subtract two histograms with matching bins, or a scalar from a
histogram.
"""
if isinstance(b, Hist):
a.assert_match(b)
values = a.values - b.values
if a.errors is not None and b.errors is not None:
errors = np.sqrt(a.errors**2 + b.errors**2)
else:
errors = None
return Hist(a.bins, values, errors)
else:
return Hist(a.bins, a.values - b, a.errors)
def __rsub__(a, b):
"""
Subtract a histogram from a scalar, returning a new histogram.
"""
return (-a) + b
def __mul__(a, b):
"""
Multiply two histograms with matching bins, or a scalar and a histogram.
"""
if isinstance(b, Hist):
a.assert_match(b)
values = a.values * b.values
if a.errors is not None and b.errors is not None:
errors = values * np.sqrt(
(a.errors / a.values)**2 + (b.errors / b.values)**2)
else:
errors = None
return Hist(a.bins, values, errors)
else:
return Hist(a.bins, b * a.values, abs(b) * a.errors)
def __rmul__(self, scalar):
"""
Multiply a scalar and a histogram.
"""
return self * scalar
def __div__(a, b):
"""
Divide two histograms with matching bins, or histogram by a scalar.
"""
if isinstance(b, Hist):
a.assert_match(b)
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
values = a.values / b.values
if a.errors is not None and b.errors is not None:
errors = values * np.sqrt(
(a.errors / a.values)**2 + (b.errors / b.values)**2)
else:
errors = None
return Hist(a.bins, values, errors)
else:
b = 1.0 * b
return Hist(a.bins, a.values / b, a.errors / b)
__truediv__ = __div__
def __pow__(a, b):
"""
Raise to a histogram with matching bins, or a scalar power.
"""
if isinstance(b, Hist):
a.assert_match(b)
values = a.values ** b.values
if a.errors is not None and b.errors is not None:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
errorsa = values * b.values * a.errors / a.values
errorsb = values * np.log(a.values) * b.errors
errors = np.sqrt(errorsa**2 + errorsb**2)
else:
errors = None
return Hist(a.bins, values, errors)
else:
values = a.values**b
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
errors = values * b * a.errors / a.values
return Hist(a.bins, values, errors)
def __rpow__(self, scalar):
"""
Raise a scalar to powers given by a histogram.
"""
values = scalar**self.values
errors = values * np.log(scalar) * self.errors
return Hist(self.bins, values, errors)
[docs] def efficiency(self, base_hist):
"""Get an efficiency Hist for this Hist divided by base_hist.
:type base_hist: :class:`Hist`
:param base_hist: The base histogram, of which this one should be a
subset.
This method differs from __div__ in the way that errors are propagated.
"""
keep = self
orig = base_hist
rej = orig - keep
use_errors = self.errors is not None and base_hist.errors is not None
if use_errors:
rej._errors = np.sqrt(base_hist.errors**2 - self.errors**2)
eff = keep / orig
nkeep = keep.values
nrej = rej.values
if use_errors:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
eff._errors = np.sqrt(
(nrej / (nkeep+nrej)**2 * keep.errors)**2
+ (nkeep / (nkeep+nrej)**2 * rej.errors)**2 )
return eff
# self operations
def __neg__(self):
"""
Get a histogram with opposite-signed values.
"""
return Hist(self.bins, -self.values, self.errors)
def abs(self):
return Hist(self.bins, np.abs(self.values), self.errors)
def exp(self):
values = np.exp(self.values)
errors = values * self.errors
return Hist(self.bins, values, errors)
def log_base(self, base):
mask = self.values > 0
values = np.nan * np.ones_like(self.values)
errors = np.nan * np.ones_like(self.values)
lb = np.log(base)
values[mask] = np.log(self.values[mask]) / lb
errors[mask] = self.errors[mask] / (self.values[mask] * lb)
return Hist(self.bins, values, errors)
def ln(self):
return self.log_base(np.e)
def log2(self):
return self.log_base(2)
def log10(self):
return self.log_base(10)
def log1p(self):
values = np.log1p(self.values) / lb
# TODO: check error propagation here
errors = self.errors / (self.values * lb)
return Hist(self.bins, values, errors)
def sqrt(self):
return self ** 0.5
# pretty
def __repr__(self):
out = ['Hist(']
# binning
for dim in xrange(self.n_dim):
out.append('{} bins in [{},{}], '.format(
self.n_bins[dim], self.range[dim][0], self.range[dim][1]
))
out.append('with {} {}'.format(
'sum' if self.n_dim else 'value',
self.values.sum()))
if self.n_dim:
out.append(', {} empty bins,'.format(np.sum(self.values == 0)))
out.append(' and {} non-finite values'.format(
np.sum(~np.isfinite(self.values))))
out.append(')')
return ''.join(out)
def _regularize_data(data):
# validate data
try:
try:
len(data[0])
except:
data = (data,)
n_dims = len(data)
n_samples = len(data[0])
for i in xrange(1, n_dims):
assert len(data[i]) == n_samples
except:
raise ValueError(
'could not interpret `data`(check for matching array lengths)')
return data, n_dims, n_samples
[docs]def hist(data, weights=None,
bins=10, range=None, log=False,
round_int_bins=False,
keep_data=False):
"""Factory function for :class:`Hist`.
:type data: array-like or tuple of array-like
:param data: the source data for the histogram: a tuple of array-like
each of length(number of samples), or just a single array of that
length
:type weights: array-like
:param weights: the weights of the source datapoints
:type bins: sequence or int, optional
:param bins: a numpy.histogramdd() bins specification
:type range: sequence, optional
:param range: a numpy.histogramdd() range specification
:type log: sequence or bool
:param log: if `bins` gives bin counts rather than edges, log=True causes
logarithmic bin edges to be chosen(can be given per-dimension)
:type keep_data: bool
:param keep_data: whether to keep the data and weights for the histogram
:return: the :class:`Hist`
"""
data, n_dims, n_samples = _regularize_data(data)
data = tuple(np.asarray(a) for a in data)
# regularize bins argument
if bins is not None:
if isinstance(bins, tuple):
bins = list(bins)
else:
try:
len(bins[0])
except:
bins = n_dims * [bins]
# regularize range argument
if range is None:
range = []
for b in bins:
if hasattr(b, '__iter__'):
range.append((np.min(b), np.max(b)))
else:
range.append(None)
if isinstance(range, tuple):
if len(range) == 2 and not (
isinstance(range[0], tuple) or isinstance(range[1], tuple)):
range = n_dims * [range]
# regularize log argument
try:
len(log)
except:
log = n_dims * [log]
# ensure weights
if weights is None:
weights = np.ones(n_samples)
# ignore non-finite values
good_idx = np.isfinite(weights)
for dim in xrange(n_dims):
good_idx *= np.isfinite(data[dim])
# ensure ranges
if np.sum(good_idx) == 0:
full_range = range
else:
full_range = [
(a[good_idx].min(),
a[good_idx].max())
for a in data]
log_range = [
(a[good_idx * (a > 0)].min(),
a[good_idx * (a > 0)].max())
if np.sum(good_idx * (a > 0))
else (np.nan, np.nan)
for a in data]
np_range = [r if r not in (None, (None,None)) else fr
for (r, fr) in zip(range, full_range)]
# get log bins if requested, but edges not specified
for dim in xrange(n_dims):
if log[dim]:
try:
len(bins[dim])
except:
if range and range[dim]:
a, b = list(map(np.log10, range[dim]))
else:
a, b = np.log10(log_range[dim])
bins[dim] = np.logspace(a, b, bins[dim] + 1)
np_range[dim] = (a,b)
# validate ranges
for dim in xrange(n_dims):
if sum(~np.isfinite(np_range[dim])):
raise ValueError(
'NaN found in range for dimension {0}'.format(dim))
data = tuple(a[good_idx] for a in data)
weights = weights[good_idx]
if bins:
if len(data) != len(bins):
raise ValueError(
'data dimensions({}) must match number of bin({})'
' specifications'.format(len(data), len(bins)))
if len(data) != len(range):
raise ValueError(
'data dimensions must match number of range specification')
# deal with int-valued axes
if round_int_bins:
for dim in xrange(n_dims):
if 'i' in data[dim].dtype.descr[0][1]:
xmin, xmax = orig_xmin, orig_xmax = orig_range = np_range[dim]
orig_n = bins[dim]
n = min(orig_n, xmax - xmin)
dx = lambda xmin, xmax, n: 1. * (xmax - xmin) / n
orig_dx = dx(orig_xmin, orig_xmax, orig_n)
while dx(xmin, xmax, n) % 1 > 1e-5:
xmax += 1
np_range[dim] = xmin, xmax
bins[dim] = int((xmax - xmin) / dx(xmin, xmax, n))
# build histogram
values, edges = np.histogramdd(data, weights=weights,
bins=bins, range=np_range)
errors = np.sqrt(np.histogramdd(data, weights=weights**2,
bins=bins, range=np_range)[0])
others = dict(data=data, weights=weights) if keep_data else {}
return Hist(edges, values, errors=errors, **others)
[docs]def hist_like(other, data, weights=None, keep_data=False):
"""Create a :class:`Hist` using the same binning as ``other``.
:type other: :class:`Hist`
:param other: the other Hist
:type data: array-like
:param data: the data to be histogrammed. For multidimensional
histograms, ``data`` will be transposed for input into
``np.histogramdd()``.
:type weights: array-like
:param weights: the weights
:return: the :class:`Hist`
"""
if len(np.shape(data)) > 1:
data = np.transpose(data)
bins = other.bins
values, bins = np.histogramdd(data, bins=bins, weights=weights)
if weights is not None:
sq_errors, bins = np.histogramdd(data, bins=bins, weights=weights**2)
errors = np.sqrt(sq_errors)
else:
errors = np.sqrt(values)
others = dict(data=data, weights=weights) if keep_data else {}
return Hist(bins, values, errors, **others)
[docs]def hist_like_indices(other, ravel_indices, weights=None):
"""
Create a :class:`Hist` using pre-computed per-sample indices from ``other``.
:type other: :class:`Hist`
:param other: the pre-existing histogram which defines the binning
:type ravel_indices: array-like
:param ravel_indices: result of a :meth:`Hist.ravel_indices` call for the
samples.
:type weights: array-like
:param weights: the weights of the source datapoints
"""
bins = copy.deepcopy(other.bins)
if weights is None:
weights = np.ones(ravel_indices.size)
mask = ravel_indices >= 0
values = np.bincount(
ravel_indices[mask], weights[mask], minlength=other.values.size)
errors = np.sqrt(np.bincount(
ravel_indices[mask], weights[mask]**2, minlength=other.values.size))
shape = other.values.shape
return Hist(bins, values.reshape(shape), errors.reshape(shape))
[docs]def hist_direct(data, weights=None, bins=None, range=None, keep_data=False):
"""Fast factory function for :class:`Hist`.
:type data: array-like
:param data: the data to be histogrammed. For multidimensional
histograms, ``data`` will be transposed for input into
``np.histogramdd()``.
:type weights: array-like
:param weights: the weights
:type bins: sequence or int, optional
:param bins: a ``np.histogramdd()`` bins specification
:type range: sequence, optional
:param bins: a ``np.histogramdd()`` range specification
:type keep_data: bool
:param keep_data: whether to keep the data and weights for the histogram
:return: the :class:`Hist`
This method creates a :class:`Hist` by calling ``np.histogramdd()`` as
directly as possible. Note that this requires a slightly more constrained
format for ``data`` compared with :meth:`hist`.
"""
if len(np.shape(data)) > 1:
data = np.transpose(data)
values, bins = np.histogramdd(
data, bins=bins, range=range, weights=weights)
if weights is not None:
sq_errors, bins = np.histogramdd(
data, bins=bins, range=range, weights=weights**2)
errors = np.sqrt(sq_errors)
else:
errors = np.sqrt(values)
others = dict(data=data, weights=weights) if keep_data else {}
return Hist(bins, values, errors, **others)
[docs]def hist_from_function(bins, func, *args, **kwargs):
"""
[Deprecated] Create a :class:`Hist` by evaluating ``func`` at the centers
of specified ``bins``.
:type bins: list of np.array
:param bins: the bin edges
:type func: function
:param func: the function to evaluate. it should return floats and respect
numpy broadcasting rules. if ``splat`` is true, it should take
``len(bins)`` (that is, n_dim) arguments; otherwise, it should accept
a single argument containing all values of all independent variables
:type err_func: function
:param err_func: if given, the function that returns the uncertainty at
each part of the parameter space
:type splat: bool
:param splat: determines the signature of ``func`` as described above
(default: True)
:return: the :class:`Hist`
Any additional positional or keyword arguments are passed to ``func``.
Note: This function is now deprecated; use :meth:`hist_from_eval` instead.
"""
warnings.warn(
'this function is deprecated; use hist_from_eval() instead',
DeprecationWarning)
err_func = kwargs.pop('err_func', None)
splat = kwargs.pop('splat', True)
bins = copy.deepcopy(bins)
shape = tuple(len(b) - 1 for b in bins)
dummy_hist = Hist(bins, np.zeros(shape))
if len(bins) > 1:
mesh_centers = list(np.meshgrid(*dummy_hist.centers, indexing='ij'))
else:
mesh_centers = [dummy_hist.centers[0]]
if splat:
pos_args = mesh_centers + list(args)
else:
pos_args = [np.array([mesh_centers])] + list(args)
values = func(*pos_args, **kwargs)
if err_func is not None:
errors = err_func(*pos_args, **kwargs)
else:
errors = None
return Hist(bins, values, errors)
[docs]def hist_slide(Ns, data, weights=None,
*args, **kwargs):
"""
Construct a "histogram" from ``N`` partially-overlapping Hist's.
:type Ns: int or sequence of int
:param Ns: number of sliding iterations, optionally per-axis
:type data: array-like or tuple of array-like
:param data: the source data for the histogram: a tuple of array-like
each of length(number of samples), or just a single array of that
length
:type weights: array-like
:param weights: the weights of the source datapoints
:type indices: sequence of array-like
:param indices: ravel_indices for each individual histogram, obtained from
a previous hist_slide call
:type get_indices: bool
:param get_indices: if True, obtain ravel_indices values for use in later
hist_slide calls
"""
indices = kwargs.pop('indices', None)
get_indices = kwargs.pop('get_indices', False)
out_indices = []
# get main ordinary hist
data, n_dims, n_samples = _regularize_data(data)
if indices is None:
h_main = hist(data, weights, *args, **kwargs)
else:
empty = [[1] for i in xrange(n_dims)]
h_main = hist(empty, *args, **kwargs)
h_main = hist_like_indices(h_main, indices[0], weights=weights)
if get_indices:
out_indices.append(h_main.ravel_indices(*data))
orig_bins = h_main.bins
centers = [ [c[-1]] for c in h_main.centers ]
# determine output shape
if isinstance(Ns, int):
Ns = [Ns if h_main.n_bins[axis] >= 2 else 1 for axis in xrange(n_dims)]
out_shape = tuple(
(h_main.values.shape[i] - 1) * N + 1 if N >= 2 else h_main.values.shape[i]
for (i, N) in enumerate(Ns)
)
# store values from h_main
values = np.zeros(out_shape)
errors = np.zeros(out_shape)
slices = tuple(
slice(0, out_shape[i], N)
for (i, N) in enumerate(Ns)
)
values[slices] = h_main.values
errors[slices] = h_main.errors
slide_ranges = [
np.arange(0, N)
for (i,N) in enumerate(Ns)
]
# slide bins to fill in remaining values
for (i_step, i_slides) in enumerate(itertools.product(*slide_ranges)):
bins = copy.deepcopy(orig_bins)
lasts = []
for axis in xrange(n_dims):
b = bins[axis]
lasts.append(out_shape[axis] - 1)
if Ns[axis] == 1:
continue
if i_slides[axis] == 0:
bins[axis] = b
lasts[-1] += 1
elif h_main.log[axis]:
bins[axis] = np.exp(
np.log(b[:-1])
+ 1.0 * i_slides[axis] / Ns[axis] * np.diff(np.log(b))
)
else:
bins[axis] = (
b[:-1]
+ 1.0 * i_slides[axis] / Ns[axis] * np.diff(b)
)
if indices is None:
h = hist_direct(data, weights=weights, bins=bins)
else:
empty = [[1] for i in xrange(n_dims)]
h = hist(empty, bins=bins)
h = hist_like_indices(h, indices[i_step+1], weights=weights)
if get_indices:
out_indices.append(h.ravel_indices(*data))
for axis in xrange(n_dims):
if len(centers[axis]) < out_shape[axis] - 1:
centers[axis] = np.unique(np.r_[centers[axis], h.centers[axis]])
slices = tuple(
slice(i_slide, lasts[i] if Ns[i] >= 2 else out_shape[i], Ns[i])
for (i, i_slide) in enumerate(i_slides)
)
values[slices] = h.values
errors[slices] = h.errors
# get non-overlapping binning
bins = []
for axis in xrange(n_dims):
if not Ns[axis] >= 2:
bins.append(orig_bins[axis])
else:
c = centers[axis]
if h_main.log[axis]:
log_c = np.log(c)
log_delta = np.diff(log_c)
log_delta = np.r_[log_delta, log_delta[-1]]
b = np.exp(np.r_[
log_c - .5 * log_delta, log_c[-1] + .5 * log_delta[-1]
])
else:
delta = np.diff(c)
delta = np.r_[delta, delta[-1]]
b = np.r_[c - .5 * delta, c[-1] + .5 * delta[-1]]
b[[0,-1]] = orig_bins[axis][[0,-1]]
bins.append(b)
if get_indices:
return Hist(bins, values, errors), out_indices
else:
return Hist(bins, values, errors)
[docs]def hist_bootstrap(N,
data, weights=None,
*args,
**kwargs):
"""
Like ``hist()``, but for N iterations of sampling from ``data`` with
replacement.
:type stacked: bool
:param stacked: if True, output is an ndim+1 dimensional Hist where the
first dimension has n_bins=``N`` and values are sorted along this first
dimension. otherwise output is ndim dimensional
:type errors: str
:param errors: 'original' to obtain errorbars from standard
(non-bootstrapped) Hist; 'bootstrap' to obtain errors from containment
interval along the Hist that would have been returned if
``stacked=True``. default is 'bootstrap'
:type frac: float in [0,1]
:param frac: fraction of samples to use in each iteration. default is 1.0
:type slide_Ns: float or sequence of float
:param slide_Ns: if given, use hist_slide() instead of hist(), with the
given number(s) of slided binnings
"""
data, n_dims, n_samples = _regularize_data(data)
stacked = kwargs.pop('stacked', False)
error_type = kwargs.pop('errors', 'original')
slide_Ns = kwargs.pop('slide_Ns', 1)
frac = kwargs.pop('frac', 1)
hs = []
all_i = np.arange(N)
for i in all_i:
mask = np.random.randint(0, n_samples, int(frac * n_samples))
data_i = tuple(d[mask] for d in data)
weights_i = None if weights is None else weights[mask]
hs.append(hist_slide(slide_Ns, data_i, weights_i, *args, **kwargs))
all_bins = [np.arange(N)] + hs[0].bins
shape = (N,) + tuple([len(b) - 1 for b in all_bins[1:]])
all_values = np.vstack([h.values for h in hs]).reshape(shape)
#errors = np.vstack([h.errors for h in hs])
all_values = np.sort(all_values, axis=0)
all_errors = None
h_stacked = Hist(all_bins, all_values, all_errors)
if stacked:
return h_stacked
ilow, imed, ihigh = np.array(N * stats.norm.cdf([-1,0,1]), dtype=int)
values = all_values[imed]
errors = np.vstack((values - all_values[ilow], all_values[ihigh] - values))
h = Hist(all_bins[1:], values, errors)
if error_type == 'original':
h_orig = hist_slide(slide_Ns, data, weights, *args, **kwargs)
h._errors = h_orig.errors
return h
elif error_type == 'bootstrap':
return h
else:
raise ValueError('`errors` must be one of "original" or "bootstrap"')
[docs]def hist_from_eval(f, vectorize=True, err_f=None, ndim=None, **kwargs):
"""
Create a :class:`Hist` by evaluating a function.
:type f: callable
:param f: the function to evaluate
:type ndim: int
:param ndim: number of arguments to function
:type vectorize: bool
:param vectorize: whether ``numpy.vectorize`` is needed to evaluate ``f``
over many sets of values
:type err_f: callable
:param err_f: if given, function to evaluate to obtain "errors"
All other keyword arguments define the binning the same as for
:meth:`hist`.
This function supersedes :meth:`hist_from_function`.
"""
# infer number of arguments
try:
spec = inspect.getargspec(f)
ismethod = inspect.ismethod(f)
except:
spec = inspect.getargspec(f.__call__)
ismethod = inspect.ismethod(f.__call__)
if ndim is None:
ndim = len(spec.args) - len(spec.defaults or [])
if ismethod:
ndim -= 1
# get empty histogram
if ndim <= 0:
raise ValueError('could not determine valid ndim; obtained {}'.format(ndim))
empty = [[] for i in xrange(ndim)]
h = hist(empty, **kwargs)
# vectorize if needed
if vectorize:
F = np.vectorize(f)
if err_f is not None:
err_F = np.vectorize(err_f)
else:
err_F = None
else:
F = f
err_F = err_f
# obtain values and possibly errors
errors = np.zeros(h.values.shape)
if ndim == 1:
X = h.centers[0]
values = F(X)
if err_F is not None:
values = err_F(X)
else:
X = np.meshgrid(*h.centers, indexing='ij')
values = F(*map(np.ravel, X)).reshape(X[0].shape)
if err_F is not None:
values = err_F(*map(np.ravel, X)).reshape(X[0].shape)
h._values = values
h._errors = errors
return h
def kde(data, weights=None, kernel=.01, normalize=True,
indices=None, get_indices=False, **kw):
data, n_dim, n_samples = _regularize_data(data)
if indices is None:
kw['bins'] = kw.pop('kde_bins', int(np.ceil(1000 / n_dim)))
h = hist(data, weights=weights, **kw)
else:
empty = [[1] for i in xrange(n_dim)]
h = hist(empty, **kw)
h = hist_like_indices(h, indices, weights=weights)
orig_kernel = kernel
if not np.shape(kernel):
kernel = h.n_dim * [kernel]
kernel_bins = [
kernel[axis] * h.n_bins[axis]
for axis in xrange(h.n_dim) ]
h = h.gaussian_filter(kernel_bins, truncate=20)
if normalize:
h = h.normalize(axes=None if normalize is True else normalize)
h.kernel, h.kernel_bins = orig_kernel, kernel_bins
if get_indices:
return h, h.ravel_indices(*data)
else:
return h
def profile(data, weights, f=hist, **kw):
# mean: weighted by value, divided by pure counting
if f is kde:
kw.setdefault('normalize', False)
h_count = f(data, **kw)
h_weight = f(data, weights=weights, **kw)
h_mean = h_weight / h_count
# error: stderr = rms deviation
dev = weights - h_mean(data)
h_dev2 = f(data, dev**2, **kw)
h_stddev = (h_dev2 / h_count**2).sqrt()
h_mean._errors = h_stddev.values
#if f is kde:
# h_mean._errors /= np.asarray(h_count.kernel_bins)
return h_mean
def save(h, filename):
np.save(filename, (h.bins, h.values, h.errors))
if len(filename) >= 5 and filename [-4:] != '.npy':
os.rename(filename + '.npy', filename)
def load(filename):
return Hist(*np.load(filename))
[docs]class SplineFit(object):
"""
Wrapper for spline fits to histograms.
"""
def __init__(self, hist, spline, bin_logs, log, floor):
self.bins = hist.bins
self.range = hist.range
self.spline = spline
self.bin_logs = bin_logs
self.log = log
self.floor = floor
@property
def n_dim(self):
return len(self.bins)
def __call__(self, *args):
"""
Get the spline-fitted values.
"""
if self.n_dim == 1:
assert len(args) == 1
X = np.atleast_1d(args[0])
mask = (self.bins[0][0] <= X) & (X <= self.bins[0][-1])
if self.bin_logs[0]:
X = np.log10(X)
out = np.zeros_like(X, dtype=float)
out[mask] = self.spline(X[mask])
out[~mask] = np.nan
if self.log:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
zero_mask = out <= self.floor
#out = 10**out
out = np.exp(out)
out[zero_mask] = 0
return out
elif self.n_dim == 2:
assert len(args) == 2
X = np.atleast_1d(args[0])
Y = np.atleast_1d(args[1])
if not X.shape == Y.shape:
raise ValueError(
'shapes of X({}) and Y({}) do not match'.format(
X.shape, Y.shape))
out = np.zeros_like(X, dtype=float)
mask = ((self.bins[0][0] <= X) & (X <= self.bins[0][-1])
& (self.bins[1][0] <= Y) & (Y <= self.bins[1][-1]))
if self.bin_logs[0]:
X = np.log10(X)
if self.bin_logs[1]:
Y = np.log10(Y)
if np.any(mask):
out[mask] = self.spline.ev(X[mask], Y[mask])
out[~mask] = np.nan
if self.log:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
zero_mask = out <= self.floor
#out = 10**out
out = np.exp(out)
out[zero_mask] = 0
return out
else:
assert len(args) == self.n_dim
args = list(map(np.atleast_1d, args))
assert len(set(map(np.shape, args))) == 1
out = np.empty(len(args[0]), dtype=float)
mask = np.product([
(self.bins[i][0] <= args[i]) & (args[i] <= self.bins[i][-1])
for i in xrange(self.n_dim)
], dtype=bool)
for i in xrange(self.n_dim):
if self.bin_logs[i]:
args[i] = np.log10(args[i])
if np.any(mask):
args_masked = [args[i][mask] for i in xrange(self.n_dim)]
out[mask] = self.spline(*args_masked)
out[~mask] = np.nan
if self.log:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
zero_mask = out <= self.floor
#out = 10**out
out = np.exp(out)
out[zero_mask] = 0
return out
#raise ValueError('>2D Hist spline fits are not thought to exist')
def eval(self, *a, **kw):
if not hasattr(self, 'range'):
self.range = [(b[0], b[-1]) for b in self.bins]
kw.setdefault('ndim', self.n_dim)
kw.setdefault('range', self.range)
return hist_from_eval(self, *a, **kw)
[docs]class LineStyle(object):
"""Style object for 1D :class:`Hist` s."""
def __init__(self,
line=True,
marker='None',
errorbars=False,
errorcaps=False,
errorbands=False,
xerrorbars=False,
crosses=False,
poisson=False,
**kwargs
):
"""Initialize a LineStyle.
:type line: bool
:param line: Whether to draw a line.
:type marker: str(optional)
:param marker: The marker specification
:type errorbars: bool
:param errorbars: Whether to draw errorbars.
:type errorcaps: bool
:param errorcaps: Whether to draw error bar caps.
:type errorbands: bool
:param errorbands: Whether to draw error bands.
:type xerrorbars: bool
:param xerrorbars: Whether to draw x errorbars.
:type crosses: bool
:param crosses: bool
:type poisson: bool
:param poisson: Whether to show Poisson errorbars.
All other keyword args are saved to be passed to
matplotlib.axes.Axes.errorbar(). Note that drawstyle should not be
specified; if line == True, then drawstyle='steps-post' will be used.
"""
self._kwargs = {}
self.line = True
self.marker = 'None'
self.errorbars = False
self.errorcaps = False
self.errorbands = False
self.xerrorbars = False
self.poisson = False
self.update(
line=line, marker=marker,
errorbars=errorbars, errorcaps=errorcaps,
errorbands=errorbands, xerrorbars=xerrorbars,
crosses=crosses, poisson=poisson,
**kwargs)
[docs] def update(self, **kwargs):
"""Update the keyword args with the given values."""
crosses = kwargs.pop('crosses', False)
if crosses:
kwargs['errorbars'] = True
kwargs['xerrorbars'] = True
kwargs['line'] = False
self.line = kwargs.pop('line', self.line)
self.marker = kwargs.pop('marker', self.marker)
self.errorbars = kwargs.pop('errorbars', self.errorbars)
self.errorcaps = kwargs.pop('errorcaps', self.errorcaps)
self.errorbands = kwargs.pop('errorbands', self.errorbands)
self.xerrorbars = kwargs.pop('xerrorbars', self.xerrorbars)
self.poisson = kwargs.pop('poisson', self.poisson)
self._kwargs.update(copy.deepcopy(kwargs))
[docs] def copy(self, **kwargs):
"""Get a copy of this LineStyle, updating the given keyword args.
All arguments accepted by the :class:`LineStyle` constructor may be
given, including line, markers, errorbars, errorcaps, and arbitrary
matplotlib arguments.
"""
out = copy.deepcopy(self)
out.update(**kwargs)
return out
@property
def line(self):
"""Whether to draw a line."""
return self._line
@line.setter
def line(self, line):
self._line = line
@property
def markers(self):
"""Whether to draw point markers."""
return self.marker not in ('None',)
@property
def marker(self):
"""The marker to use."""
return self._marker
@marker.setter
def marker(self, marker):
self._marker = marker
self._kwargs['marker'] = marker
@property
def errorbars(self):
"""Whether to draw error bars."""
return self._errorbars
@errorbars.setter
def errorbars(self, errorbars):
self._errorbars = errorbars
@property
def errorbands(self):
"""Whether to draw error bands."""
return self._errorbands
@errorbands.setter
def errorbands(self, errorbands):
self._errorbands = errorbands
@property
def errorcaps(self):
"""Whether to draw error bar caps."""
return self._errorcaps
@errorcaps.setter
def errorcaps(self, errorcaps):
self._errorcaps = errorcaps
if not self.errorcaps:
self._kwargs['capsize'] = 0
else:
self._kwargs.pop('capsize', None)
@property
def xerrorbars(self):
"""Whether to draw error bars."""
return self._xerrorbars
@xerrorbars.setter
def xerrorbars(self, xerrorbars):
self._xerrorbars = xerrorbars
@property
def poisson(self):
"""Whether to draw Poisson error bars."""
return self._poisson
@poisson.setter
def poisson(self, poisson):
self._poisson = poisson
@property
def kwargs(self):
"""Keyword args for matplotlib.axes.Axes.errorbar()."""
out = copy.deepcopy(self._kwargs)
if (not self.errorbars) and (not self.xerrorbars):
out['elinewidth'] = np.finfo(float).tiny
else:
out.pop('elinewidth', None)
return out
[docs]def plot1d(ax, h=None, style=None, **kwargs):
"""
Plot 1D :class:`Hist` ``h`` on matplotlib Axes ``ax``.
:type ax: matplotlib Axes
:param ax: if given, the axes on which to plot
:type h: :class:`Hist`
:param h: the 1D histogram
:type style: :class:`LineStyle`
:param style: the line style
Other keyword arguments are propagated to pyplot.errorbar() and
pyplot.plot() as appropriate.
"""
if h is None:
h = ax
ax = plt.gca()
if h.n_dim != 1:
raise TypeError('`h` must be a 1D Hist')
if style is None:
style = LineStyle()
style.update(**kwargs)
kw = copy.deepcopy(style.kwargs)
if 'markers' in kw:
del kw['markers']
legend_kw = dict(label=kw.get('label', ''))
# get errorband stuff out of the way early
bkw = {}
bkw['alpha'] = kw.pop('ebandalpha', .5) * kw.get('alpha', 1.0)
ebandlabel = kw.pop('ebandlabel', '')
n = h.n_bins[0]
x = h.centers[0]
y = h.values
err = h.errors
if style.poisson:
alpha = 2 * stats.norm.sf(1)
# see
# https://en.wikipedia.org/wiki/Poisson_distribution#Confidence_interval
low = np.where(y == 0, 0, stats.chi2.ppf(alpha / 2, 2 * y) / 2.)
high = stats.chi2.ppf(1 - alpha / 2, 2 * y + 2) / 2.
err = [y - low, high - y]
color = kw.get('color', None)
legend_line = False
if style.markers or style.errorbars or style.xerrorbars:
ekw = copy.deepcopy(kw)
ekw['linestyle'] = 'none'
if 'ls' in ekw:
del ekw['ls']
if 'drawstyle' in ekw:
del ekw['drawstyle']
ekw['color'] = color
ekw['label'] = ''
if color is None:
ekw.pop('color')
if style.xerrorbars:
ekw['xerr'] = [x - h.bins[0][:-1], h.bins[0][1:] - x]
else:
ekw['xerr'] = None
if style.errorbars:
ekw['yerr'] = err
else:
ekw['yerr'] = np.zeros_like(y)
if style.errorbars or style.xerrorbars:
eline = ax.errorbar(x, y, **ekw)[0]
else:
if 'capsize' in ekw:
del ekw['capsize']
if 'elinewidth' in ekw:
del ekw['elinewidth']
eline = ax.plot(x, y, **ekw)[0]
kw['color'] = color = eline.get_color()
legend_kw.update(kw)
legend_line = True
if style.line:
lkw = {}
def keep(*keys):
for key in keys:
if key in kw:
lkw[key] = kw[key]
lkw['drawstyle'] = kw.get('drawstyle', 'steps-post')
steps = lkw['drawstyle'] == 'steps-post'
keep('alpha', 'color', 'linewidth', 'lw', 'linestyle', 'ls')
line_x = h.bins[0] if steps else h.centers[0]
line_y = np.r_[y, y[-1]] if steps else y
lline = ax.plot(line_x, line_y, **lkw)[0]
lkw['color'] = color = lline.get_color()
legend_kw.update(lkw)
legend_line = True
else:
legend_kw['linestyle'] = 'none'
# weird hack to make legend work properly without disrupting the plot
if legend_line:
legend_args = [[np.nan],
[np.nan]]
if style.errorbars:
legend_args.append([0])
if style.xerrorbars:
legend_args.append([0])
out = ax.errorbar(*legend_args, **legend_kw)
if style.errorbands:
bkw['color'] = color
bkw['lw'] = 0
steps = 'steps-post' == kw.get('drawstyle', 'steps-post')
eshape = np.shape(err)
if len(eshape) == 2 and eshape[0] == 2:
merr = err[0]
perr = err[1]
else:
perr = merr = err
if steps:
ix = np.sort(np.r_[:n, 1:n+1])
band_x = h.bins[0][ix]
iy = np.sort(np.r_[:n, :n])
band_ymin = (y - merr)[iy]
band_ymax = (y + perr)[iy]
if style.line:
res = ax.fill_between(band_x, band_ymax, band_ymin, **bkw)
else:
for ib in xrange(0, 2 * n, 2):
res = ax.fill_between(
band_x[ib:ib+2], band_ymax[ib:ib+2], band_ymin[ib:ib+2],
**bkw
)
else:
band_x = x
band_ymin = y - merr
band_ymax = y + perr
res = ax.fill_between(band_x, band_ymax, band_ymin, **bkw)
if ebandlabel:
eband_kw = dict(
color=res.get_facecolor()[0],
linestyle='-', lw=8, label=ebandlabel
)
ax.plot([np.nan, np.nan], [np.nan, np.nan],
**eband_kw)
[docs]def fill_between(ax, h1, h2,
interpolate=False,
drawstyle='steps',
**kw):
"""
Fill the region between histograms h1 and h2.
:type ax: matplotlib Axes
:param ax: the axes on which to plot
:type h1: number or :class:`Hist`
:param h1: a number or the first histogram
:type h2: number or :class:`Hist`
:param h2: a number or the second histogram
:param where: see ``pyplot.fill_between()``.
:param interpolate: see ``pyplot.fill_between()``.
:type drawstyle: str
:param drawstyle: if 'line', plot smooth curves; otherwise, plot with
histogram steps
Other keyword arguments are passed to ``pyplot.fill_between()``.
"""
if ax is None:
fig, ax = plt.subplots()
if isinstance(h1, Hist) and isinstance(h2, Hist):
h1.assert_match(h2)
bins = h1.bins[0]
centers = h1.centers[0]
y1 = h1.values
y2 = h2.values
if h1.n_dim != 1 or h2.n_dim != 1:
raise TypeError('histogram arguments must be 1D Hists')
elif isinstance(h1, Hist):
bins = h1.bins[0]
centers = h1.centers[0]
y1 = h1.values
y2 = h2 * np.ones(h1.values)
if h1.n_dim != 1:
raise TypeError('histogram arguments must be 1D Hists')
elif isinstance(h2, Hist):
bins = h2.bins[0]
centers = h2.centers[0]
y1 = h1 * np.ones_like(h2.values)
y2 = h2.values
if h2.n_dim != 1:
raise TypeError('histogram arguments must be 1D Hists')
else:
raise TypeError('one of `h1` or `h2` must be of type Hist')
steps = drawstyle != 'line'
n = len(centers)
if steps:
ix = np.sort(np.r_[:n, 1:n+1])
band_x = bins[ix]
iy = np.sort(np.r_[:n, :n])
band_ymin = y1[iy]
band_ymax = y2[iy]
else:
band_x = centers
band_ymin = y1
band_ymax = y2
label = kw.pop('label', '')
res = ax.fill_between(band_x, band_ymax, band_ymin,
interpolate=interpolate, **kw)
if label:
kw['color'] = res.get_facecolor()[0]
ax.plot([np.nan, np.nan], [np.nan, np.nan],
lw=8, label=label, **kw)
return res
[docs]def stack1d(ax, hs,
colors=None,
labels=None,
kws=None,
interpolate=False,
drawstyle='steps',
ymin=0,
**morekw):
"""
Stack histograms ``hs`` using :func:`fill_between`.
:type ax: matplotlib Axes
:param ax: the axes on which to plot
:type hs: sequence of :class:`Hist`
:param hs: the histograms
:type colors: sequence
:param colors: the fill colors
:type labels: sequence of str
:param labels: the labels
:type kws: sequence of str to value mappings
:param kws: keyword arguments for individual fills
:param interpolate: see ``pyplot.fill_between()``
:type drawstyle: str
:param drawstyle: if 'line', plot smooth curves; otherwise, plot with
histogram steps
:type ymin: number
:param ymin: minimum value(useful for semilogy plots)
Other keyword arguments are passed to each :func:`fill_between` call.
"""
if ax is None:
fig, ax = plt.subplots()
if colors is None:
colors = ['C{}'.format(i % 10) for (i,_) in enumerate(hs)]
if labels is None:
labels = ['' for h in hs]
if kws is None:
kws = [{} for h in hs]
total = ymin
outs = []
for (h, kw, color, label) in zip(hs, kws, colors, labels):
new_total = total + h
kw.update(morekw)
outs.append(fill_between(ax, total, new_total,
interpolate=interpolate,
drawstyle=drawstyle,
color=color,
label=label,
**kw))
total = new_total
return outs
[docs]def plot2d(ax, h=None,
log=False,
cbar=False,
levels=None,
**kwargs):
""" Plot 1D :class:`Hist` ``h`` on ``ax`` on a color scale.
:type ax: matplotlib Axes
:param ax: The main axes on which to plot.
:type h: :class:`Hist`
:param h: The 2D histogram to plot.
:type log: bool
:param log: Whether to use a log color scale
:type cbar: bool
:param cbar: If true, draw colorbar.
:type levels: int or array of float
:param levels: if given, plot with contourf rather than pcolormesh. if
a number is given, automatically select that many levels between vmin
and vmax.
:type zmin: float
:param zmin: Minimum value to plot with color; bins below the minimum
value will be white.
:return: If cbar, a dict containing a matplotlib.collection.QuadMesh and a
matplotlib.colorbar.Colorbar as values; otherwise, just the QuadMesh.
Other keyword arguments are passed to ax.pcolormesh().
"""
if h is None:
h = ax
ax = plt.gca()
if h.n_dim != 2:
raise TypeError('`h` must be a 2D Hist')
plotvalues = h.values.T
zmin = kwargs.pop('zmin', None)
if log:
if zmin is None:
zmin = 0
H = np.ma.array(plotvalues)
else:
H = np.ma.array(plotvalues)
H.mask += ~np.isfinite(plotvalues)
if zmin is not None:
H.mask += (plotvalues <= zmin)
xbins = h.bins[0]
ybins = h.bins[1]
Bx, By = np.meshgrid(xbins, ybins)
vmin = kwargs.pop('vmin', H.min())
vmax = kwargs.pop('vmax', H.max())
clabel = kwargs.pop('clabel', None)
cticks = None
if log:
kwargs['norm'] = mpl.colors.LogNorm(vmin, vmax)
if levels is None:
if not log:
kwargs['vmin'], kwargs['vmax'] = vmin, vmax
pc = ax.pcolormesh(Bx, By, H, **kwargs)
else:
if isinstance(levels, int):
if log:
levels = np.logspace(np.log10(vmin), np.log10(vmax), levels + 1)
cticks = 10**np.arange(
np.ceil(np.log10(vmin)), np.floor(np.log10(vmax)) + .1 )
else:
levels = np.linspace(vmin, vmax, levels + 1)
fill = kwargs.pop('fill', True)
if fill:
pc = ax.contourf(Bx[:-1,:-1], By[:-1,:-1], np.clip(H, vmin, vmax), levels, **kwargs)
else:
pc = ax.contour(Bx[:-1,:-1], By[:-1,:-1], np.clip(H, vmin, vmax), levels, **kwargs)
if h.log[0]:
ax.set_xscale('log')
if h.log[1]:
ax.set_yscale('log')
ax.set_xlim(xbins.min(), xbins.max())
ax.set_ylim(ybins.min(), ybins.max())
if cbar:
cb_kw = cbar if hasattr(cbar, 'keys') else {}
if log:
cb = ax.figure.colorbar(
pc, ax=ax, format=mpl.ticker.LogFormatterMathtext(), **cb_kw)
else:
cb = ax.figure.colorbar(pc, ax=ax, **cb_kw)
if cticks is not None:
cb.set_ticks(cticks)
if clabel:
cb.set_label(clabel)
return dict(colormesh=pc, colorbar=cb)
else:
return pc
def label2d(ax, h,
fmt='',
**kw):
if ax is None:
fig, ax = plt.subplots()
for i in xrange(h.n_bins[0]):
for j in xrange(h.n_bins[1]):
ax.text(h.centers[0][i], h.centers[1][j],
format(h.values[i,j], fmt),
ha='center', va='center',
fontdict=kw)