import numpy as np
from pycbc.filter import match
from scipy import optimize
from scipy.stats import chi2
from simple_pe.param_est.pe import SimplePESamples
from pesummary.utils.samples_dict import SamplesDict
from simple_pe.waveforms import waveform
[docs]
class Metric:
"""
A class to store the parameter space metric at a point x in parameter space,
where the variations are given by dx_directions
:param x: dictionary with parameter values for initial point
:param dx_directions: a list of directions to vary
:param mismatch: the mismatch value to use when calculating metric
:param f_low: low frequency cutoff
:param psd: the power spectrum to use in calculating the match
:param approximant: the approximant generator to use
:param tolerance: tolerance for scaling vectors
:param prob: probability to enclose within ellipse
:param snr: snr of signal, used in scaling mismatch
"""
def __init__(self, x, dx_directions, mismatch, f_low, psd,
approximant="IMRPhenomD", tolerance=1e-2, prob=None, snr=None):
"""
"""
self.x = SimplePESamples(x)
if 'distance' not in self.x:
self.x['distance'] = np.ones_like(list(x.values())[0])
self.dx_directions = dx_directions
self.ndim = len(dx_directions)
self.dxs = SimplePESamples(SamplesDict(self.dx_directions, np.eye(self.ndim)))
self.mismatch = mismatch
self.f_low = f_low
self.psd = psd
self.approximant = approximant
self.tolerance = tolerance
self.prob = prob
self.snr = snr
if self.prob:
self.n_sigma = np.sqrt(chi2.isf(1 - self.prob, self.ndim))
if self.snr:
self.mismatch = self.n_sigma ** 2 / (2 * self.snr ** 2)
self.coordinate_metric = None
self.metric = None
self.evals = None
self.evec = None
self.err = None
self.projected_directions = None
self.projected_metric = None
self.projection = None
self.projected_mismatch = None
self.scale_dxs()
self.calculate_metric()
[docs]
def scale_dxs(self):
"""
This function scales the vectors dxs so that the mismatch between
a waveform at point x and one at x + dxs[i] is equal to the specified
mismatch, up to the specified tolerance.
"""
scale = np.zeros(self.ndim)
for i in range(self.ndim):
dx = self.dxs[i:i + 1]
scale[i] = scale_dx(self.x, dx, self.mismatch, self.f_low, self.psd,
self.approximant, self.tolerance)
self.dxs = SimplePESamples(SamplesDict(self.dx_directions, self.dxs.samples * scale))
[docs]
def calculate_metric(self):
"""
Calculate the metric using the existing basis vectors, stored as self.dxs.
"""
scaling = 1.
gij = np.zeros([self.ndim, self.ndim])
# diagonal components
# g_ii = 1 - 0.5 [m(dx_i) + m(-dx_i)]
for i in range(self.ndim):
gij[i, i] += average_mismatch(self.x, self.dxs[i:i + 1], scaling, self.f_low,
self.psd, self.approximant)
# off diagonal
# g_ij = 0.25 * [+ m(1/sqrt(2) (+ dx_i + dx_j)) + m(1/sqrt(2) (- dx_i - dx_j))
# - m(1/sqrt(2) (+ dx_i - dx_j)) - m(1/sqrt(2) (- dx_i + dx_j))]
for i in range(self.ndim):
for j in range(i + 1, self.ndim):
for s in ([1, -1]):
dx = {}
for k, vals in self.dxs.items():
dx[k] = (vals[i] + s * vals[j]) / np.sqrt(2)
gij[i, j] += 0.5 * s * average_mismatch(self.x, dx, scaling,
self.f_low, self.psd, self.approximant)
gij[j, i] = gij[i, j]
# this gives the metric in coordinates given by the dxs.
# Let's instead return the metric in physical parameter space
self.coordinate_metric = gij
self.physical_metric()
[docs]
def physical_metric(self):
"""
Calculate the metric in physical coordinates
"""
dx_inv = np.linalg.inv(self.dxs.samples)
self.metric = np.matmul(dx_inv.T, np.matmul(self.coordinate_metric, dx_inv))
[docs]
def calculate_evecs(self):
"""
Calculate the eigenvectors and eigenvalues of the metric gij
"""
self.evals, self.evec = np.linalg.eig(self.metric)
[docs]
def normalized_evecs(self):
"""
Return the eigenvectors normalized to give the desired mismatch
"""
if self.evec is None:
self.calculate_evecs()
# always force to be positive to avoid negatives in sqrt
self.evals[self.evals < 0] = np.abs(self.evals[self.evals < 0])
return SimplePESamples(SamplesDict(self.dx_directions, self.evec *
np.sqrt(self.mismatch / self.evals)))
[docs]
def calc_metric_error(self):
"""
We are looking for a metric and corresponding basis for which the
basis is orthogonal and whose normalization is given by the desired mismatch
This function checks for the largest error
"""
vgv = np.matmul(self.dxs.samples.T, np.matmul(self.metric, self.dxs.samples))
off_diag = np.max(abs(vgv[~np.eye(self.metric.shape[0], dtype=bool)]))
diag = np.max(abs(np.diag(vgv)) - self.mismatch)
self.err = max(off_diag, diag)
[docs]
def update_metric(self):
"""
Re-calculate the metric gij based on the matches obtained
for the eigenvectors of the original metric
"""
# calculate the eigendirections of the matrix
self.calculate_evecs()
self.dxs = SimplePESamples(SamplesDict(self.dx_directions, self.evec))
# scale so that they actually have the desired mismatch
self.scale_dxs()
self.calculate_metric()
[docs]
def iteratively_update_metric(self, max_iter=20, verbose=True):
"""
A method to re-calculate the metric gij based on the matches obtained
for the eigenvectors of the original metric
:param max_iter: maximum number of iterations
:param verbose: print information messages during update
"""
tol = float(self.tolerance * self.mismatch)
self.calc_metric_error()
op = 0
if verbose:
from tqdm import tqdm
base_desc = "Calculating the metric | iteration {} < {} | error {:.2g} > {:.2g}"
pbar = tqdm(
np.arange(max_iter), bar_format="{desc}",
desc=base_desc.format(op, max_iter, self.err, tol)
)
while (self.err > tol) and (op < max_iter):
self.update_metric()
self.calc_metric_error()
op += 1
if verbose:
pbar.set_description(base_desc.format(op, max_iter, self.err, tol))
pbar.update(1)
if self.err > tol:
pbar.set_description(
f"Failed to achieve requested tolerance. Requested: {tol:.2g} "
f"achieved {self.err:.2g}"
)
[docs]
def project_metric(self, projected_directions):
"""
Project out the unwanted directions of the metric
:param projected_directions: list of parameters that we want to keep
"""
self.projected_directions = projected_directions
kept = np.ones(self.ndim, dtype=bool)
for i, x in enumerate(self.dx_directions):
if x not in projected_directions:
kept[i] = False
# project out unwanted: g'_ab = g_ab - g_ai (ginv)^ij g_jb (where a,b run over kept, i,j over removed)
# matrix to calculate the values of the maximized directions: x^i = (ginv)^ij g_jb
ginv = np.linalg.inv(self.metric[~kept][:, ~kept])
self.projection = - np.matmul(ginv, self.metric[~kept][:, kept])
self.projected_metric = self.metric[kept][:, kept] + np.matmul(self.metric[kept][:, ~kept], self.projection)
if self.prob and self.snr:
# calculate equivalent mismatch given number of remaining dimensions
n_sigmasq = chi2.isf(1 - self.prob, len(projected_directions))
self.projected_mismatch = n_sigmasq / (2 * self.snr ** 2)
[docs]
def projected_evecs(self):
"""
Return the evecs normalized to give the desired mismatch
"""
evals, evec = np.linalg.eig(self.projected_metric)
return SimplePESamples(SamplesDict(self.projected_directions, evec * np.sqrt(self.mismatch / evals)))
[docs]
def generate_ellipse(self, npts=100, projected=False, scale=1.):
"""
Generate an ellipse of points of the stored mismatch. Scale the radius by a factor `scale'
If the metric is projected, and we know the snr, then scale the mismatch appropriately
for the number of projected dimensions.
:param npts: number of points in the ellipse
:param projected: use the projected metric if True, else use metric
:param scale: scaling to apply to the mismatch
:return ellipse_dict: SimplePESamples with ellipse of points
"""
if projected:
dx_dirs = self.projected_directions
n_evec = self.projected_evecs()
else:
dx_dirs = self.dx_directions
n_evec = self.normalized_evecs()
if len(dx_dirs) != 2:
print("We're expecting to plot a 2-d ellipse")
return -1
if projected and self.projected_mismatch:
scale *= np.sqrt(self.projected_mismatch / self.mismatch)
# generate points on a circle
phi = np.linspace(0, 2 * np.pi, npts)
xx = scale * np.cos(phi)
yy = scale * np.sin(phi)
pts = np.array([xx, yy])
# project onto eigendirections
dx = SimplePESamples(SamplesDict(dx_dirs, np.matmul(n_evec.samples, pts)))
# scale to be physical
alphas = np.zeros(npts)
for i in range(npts):
alphas[i] = waveform.check_physical(self.x, dx[i:i + 1], 1.)
ellipse_dict = SimplePESamples(SamplesDict(dx_dirs,
np.array([self.x[dx] for dx in dx_dirs]).reshape(
[2, 1]) + dx.samples * alphas))
return ellipse_dict
[docs]
def generate_match_grid(self, npts=10, projected=False, scale=1.):
"""
Generate a grid of points with extent governed by stored mismatch.
If the metric is projected, scale to the projected number of dimensions
Apply an overall scaling of scale
:param npts: number of points in each dimension of the grid
:param projected: use the projected metric if True, else use metric
:param scale: a factor by which to scale the extent of the grid
:return ellipse_dict: SimplePESamples with grid of points and match at each point
"""
if projected:
dx_dirs = self.projected_directions + [x for x in self.dx_directions if x not in self.projected_directions]
n_evec = self.projected_evecs()
else:
dx_dirs = self.dx_directions
n_evec = self.normalized_evecs()
if projected and self.projected_mismatch:
scale *= np.sqrt(self.projected_mismatch / self.mismatch)
grid = np.mgrid[-scale:scale:npts * 1j, -scale:scale:npts * 1j]
dx_data = np.tensordot(n_evec.samples, grid, axes=(1, 0))
if projected:
dx_extra = np.tensordot(self.projection, dx_data, axes=(1, 0))
dx_data = np.append(dx_data, dx_extra, 0)
dx = SimplePESamples(SamplesDict(dx_dirs, dx_data.reshape(len(dx_dirs), npts ** 2)))
h0 = waveform.make_waveform(self.x, self.psd.delta_f, self.f_low, len(self.psd), self.approximant)
m = np.zeros(dx.number_of_samples)
for i in range(dx.number_of_samples):
if waveform.check_physical(self.x, dx[i:i + 1], 1.) < 1:
m[i] = 0
else:
h1 = waveform.make_offset_waveform(self.x, dx[i:i + 1], 1.,
self.psd.delta_f, self.f_low, len(self.psd), self.approximant)
m[i] = match(h0, h1, self.psd, self.f_low, subsample_interpolation=True)[0]
matches = SimplePESamples(SamplesDict(dx_dirs + ['match'],
np.append(
dx.samples + np.array([self.x[dx] for dx in dx_dirs]).reshape(
[len(dx_dirs), 1]),
m.reshape([1, npts ** 2]), 0).reshape(
[len(dx_dirs) + 1, npts, npts])))
return matches
[docs]
def generate_posterior_grid(self, npts=10, projected=False, scale=None):
"""
Generate a grid of points with extent governed by requested mismatch
:param npts: number of points in each dimension of the grid
:param projected: use the projected metric if True, else use metric
:param scale: the scale to apply to the greatest extent of the grid
:return ellipse_dict: SimplePESamples with grid of points and match at each point
"""
if not self.snr:
print("need an SNR to turn matches into probabilities")
return -1
matches = self.generate_match_grid(npts, projected, scale)
post = np.exp(-self.snr ** 2 / 2 * (1 - matches['match'] ** 2))
grid_probs = SimplePESamples(SamplesDict(matches.keys() + ['posterior'],
np.append(matches.samples, post).reshape(
[len(matches.keys()) + 1, npts, npts])))
return grid_probs
[docs]
def generate_samples(self, npts=int(1e5), mins=None, maxs=None):
"""
Generate a given number of samples
:param npts: number of points to generate
:return phys_samples: SimplePESamples with samples
"""
sample_pts = self._get_samples(2 * npts, mins=mins, maxs=maxs)
while sample_pts.number_of_samples < npts:
extra_pts = self._get_samples(npts, mins=mins, maxs=maxs)
sample_pts = SimplePESamples(SamplesDict(sample_pts.keys(),
np.concatenate((sample_pts.samples.T, extra_pts.samples.T)).T))
if sample_pts.number_of_samples > npts:
sample_pts = sample_pts.downsample(npts)
return sample_pts
def _get_samples(self, npts=int(1e5), mins=None, maxs=None):
"""
Generate an ellipse of points of constant mismatch
:param npts: number of points to generate
:return phys_samples: SimplePESamples with samples
"""
pts = np.random.normal(0, 1, [npts, self.ndim])
sample_pts = SimplePESamples(SamplesDict(self.dx_directions,
(np.array([self.x[dx] for dx in self.normalized_evecs().keys()
]).reshape([len(self.dx_directions), 1])
+ np.matmul(pts, self.normalized_evecs().samples.T /
self.n_sigma).T)))
sample_pts.trim_unphysical(mins=mins, maxs=maxs)
return sample_pts
[docs]
def scale_match(m_alpha, alpha):
"""
A function to scale the match calculated at an offset alpha to the
match at unit offset
:param m_alpha: the match at an offset alpha
:param alpha: the value of alpha
:return m: the match at unit offset
"""
m = (alpha ** 2 - 1 + m_alpha) / alpha ** 2
return m
[docs]
def average_mismatch(x, dx, scaling, f_low, psd,
approximant="IMRPhenomD", verbose=False):
"""
This function calculates the average match for steps of +dx and -dx
It also takes care of times when one of the steps moves beyond the
edge of the physical parameter space
:param x : dictionary with the initial point
:param dx: dictionary with parameter variations
:param scaling: the scaling to apply to dx
:param f_low: low frequency cutoff
:param psd: the power spectrum to use in calculating the match
:param approximant: the approximant generator to use
:param verbose: print debugging information
:return m: The average match from steps of +/- scaling * dx
"""
a = {}
m = {}
h0 = waveform.make_waveform(x, psd.delta_f, f_low, len(psd), approximant)
for s in [1., -1.]:
a[s] = waveform.check_physical(x, dx, s * scaling)
try:
h = waveform.make_offset_waveform(x, dx, s * a[s] * scaling,
psd.delta_f, f_low, len(psd),
approximant)
m[s] = match(h0, h, psd, low_frequency_cutoff=f_low,
subsample_interpolation=True)[0]
except RuntimeError:
m[s] = 1e-5
if verbose:
print("Had to scale steps to %.2f, %.2f" % (a[-1], a[1]))
print("Mismatches %.3g, %.3g" % (1 - m[-1], 1 - m[1]))
if min(a.values()) < 1e-2:
if verbose:
print("we're really close to the boundary,"
"so down-weight match contribution")
mm = (2 - m[1] - m[-1]) / (a[1] ** 2 + a[-1] ** 2)
else:
mm = 1 - 0.5 * (scale_match(m[1], a[1]) + scale_match(m[-1], a[-1]))
return mm
[docs]
def scale_dx(x, dx, desired_mismatch, f_low, psd,
approximant="IMRPhenomD", tolerance=1e-2):
"""
This function scales the input vectors so that the mismatch between
a waveform at point x and one at x + v[i] is equal to the specified
mismatch, up to the specified tolerance.
:param x: dictionary with parameter values for initial point
:param dx: a dictionary with the initial change in parameters
:param desired_mismatch: the desired mismatch (1 - match)
:param f_low: low frequency cutoff
:param psd: the power spectrum to use in calculating the match
:param approximant: the approximant generator to use
:param tolerance: the maximum fractional error in the mismatch
:return scale: The required scaling of dx to achieve the desired mismatch
"""
for num in range(10):
try:
opt = optimize.root_scalar(
lambda a: average_mismatch(
x, dx, a, f_low, psd, approximant=approximant
) - desired_mismatch, bracket=np.array([0., 20.]) * (float(num) + 1),
method='brentq', rtol=tolerance
)
except ValueError:
continue
try:
scale = opt.root
except UnboundLocalError:
raise ValueError("Unable to scale the input vectors")
return scale
[docs]
def find_metric_and_eigendirections(x, dx_directions, snr, f_low, psd,
approximant="IMRPhenomD",
tolerance=0.05, max_iter=20):
"""
Calculate the eigendirections in parameter space, normalized to enclose a
90% confidence region at the requested SNR
:param x: dictionary with parameter values for initial point
:param dx_directions: list of parameters for which to calculate waveform variations
:param snr: the observed SNR
:param f_low: low frequency cutoff
:param psd: the power spectrum to use in calculating the match
:param approximant: the approximant to use
:param tolerance: the allowed error in the metric is (tolerance * mismatch)
:param max_iter: the maximum number of iterations
:return g: the mismatch metric in physical space
"""
# initial directions and initial spacing
g = Metric(x, dx_directions, 0, f_low, psd, approximant, tolerance,
prob=0.9, snr=snr)
g.iteratively_update_metric(max_iter)
return g