import numpy as np
import copy
import math
from scipy import interpolate
from simple_pe.waveforms import parameter_bounds, waveform_modes
from simple_pe.detectors import noise_curves
from simple_pe.fstat import fstat_hm
from simple_pe import waveforms
from pesummary.utils.array import Array
from pesummary.utils.samples_dict import SamplesDict
from pesummary.gw.conversions.spins import opening_angle
from scipy.stats import ncx2, norm
from pycbc.filter import sigma
import tqdm
def _add_chi_align(data, **kwargs):
"""Add samples for chi_align. If spin_1z, spin_2z, mass_ratio are not
in data, samples for chi_align are not added to data
Parameters
----------
data: dict
dictionary of samples
"""
try:
data["chi_align"] = (
data["spin_1z"] + data["mass_ratio"]**(4. / 3) * data["spin_2z"]
) / (1 + data["mass_ratio"]**(4. / 3))
except KeyError:
pass
return data
def _component_spins_from_chi_align_chi_p(data, chip_to_spin1x=False, **kwargs):
    """Add samples for the component spins. If chi_align and mass ratio
    are not in data, z component spins are not added. If chi_p and component
    masses are not in data, x, y component spins are not added. Component spins
    are drawn from a uniform distribution, and conditioned on the
    provided chi_align, chi_p samples (except when `chip_to_spin1x=True`). This
    is an expensive operation and can likely be optimised.

    Parameters
    ----------
    data: dict
        dictionary of samples
    chip_to_spin1x: Bool, optional
        if True, set spin_1x=chi_p and all other in-plane spin components=0
    """
    from pesummary.utils.utils import draw_conditioned_prior_samples
    # work on a converted copy so the input dict is left untouched
    _data = SimplePESamples(data.copy())
    _data.generate_all_posterior_samples()
    if all(_ in _data.keys() for _ in ["chi_align", "mass_ratio"]):
        # draw uniform z-spins and keep those consistent with the
        # provided chi_align distribution
        s1z = np.random.uniform(-1, 1, len(_data["chi_align"]))
        s2z = np.random.uniform(-1, 1, len(_data["chi_align"]))
        conditioned = _add_chi_align(
            {"spin_1z": s1z, "spin_2z": s2z, "mass_ratio": _data["mass_ratio"]}
        )
        conditioned = draw_conditioned_prior_samples(
            _data, conditioned, ["chi_align"], {"chi_align": -1}, {"chi_align": 1},
            nsamples=len(_data["chi_align"])
        )
        _data["spin_1z"] = conditioned["spin_1z"]
        _data["spin_2z"] = conditioned["spin_2z"]
        # keep the input chi_align under a private name; replace the
        # public one with the conditioned values
        _data["_chi_align"] = _data["chi_align"]
        _data["chi_align"] = conditioned["chi_align"]
    if "chi_p" in _data.keys() and chip_to_spin1x:
        # short-cut: put the entire in-plane spin on the x-component of
        # the primary, all other in-plane components zero
        _total = len(_data["chi_p"])
        _data["spin_1x"] = _data["chi_p"]
        _data["spin_1y"] = np.zeros(_total)
        _data["spin_2x"] = np.zeros(_total)
        _data["spin_2y"] = np.zeros(_total)
    elif all(_ in _data.keys() for _ in ["chi_p", "mass_1", "mass_2"]):
        from pesummary.gw.conversions import chi_p
        # draw uniform in-plane spins and resampled masses, then keep
        # combinations consistent with the provided chi_p distribution
        s1x = np.random.uniform(-1, 1, int(1e5))
        s1y = np.random.uniform(-1, 1, int(1e5))
        s2x = np.random.uniform(-1, 1, int(1e5))
        s2y = np.random.uniform(-1, 1, int(1e5))
        m1 = np.random.choice(_data["mass_1"], replace=True, size=int(1e5))
        m2 = np.random.choice(_data["mass_2"], replace=True, size=int(1e5))
        conditioned = chi_p(m1, m2, s1x, s1y, s2x, s2y)
        conditioned = draw_conditioned_prior_samples(
            _data, {"chi_p": conditioned}, ["chi_p"], {"chi_p": 0.}, {"chi_p": 1},
            nsamples=len(_data["chi_p"])
        )
        # NOTE(review): only "chi_p" is handed to
        # draw_conditioned_prior_samples here, yet the spin components
        # are read back from its return value below — confirm that it
        # returns the component spins as well
        _data["spin_1x"] = conditioned["spin_1x"]
        _data["spin_1y"] = conditioned["spin_1y"]
        _data["spin_2x"] = conditioned["spin_2x"]
        _data["spin_2y"] = conditioned["spin_2y"]
        _data["_chi_p"] = _data["chi_p"]
        _data["chi_p"] = conditioned["chi_p"]
    # recompute spin magnitudes from the components and clip to the
    # physical bound a <= 1
    for num in range(1, 3, 1):
        if all(f"spin_{num}{comp}" in _data.keys() for comp in ["x", "y", "z"]):
            _data[f"a_{num}"] = np.sqrt(
                _data[f"spin_{num}x"]**2 + _data[f"spin_{num}y"]**2 +
                _data[f"spin_{num}z"]**2
            )
            _data[f"a_{num}"][_data[f"a_{num}"] > 1.] = 1.
    return _data
[docs]
def convert(*args, **kwargs):
    """Wrapper around pesummary's `convert` that first sanitises a dict
    of samples (promoting scalars to length-1 arrays and flattening
    nested arrays) and then adds chi_align samples to the result."""
    from pesummary.gw.conversions import convert as _convert
    if isinstance(args[0], dict):
        samples = args[0]
        for name, vals in samples.items():
            if not isinstance(vals, (np.ndarray, list)):
                samples[name] = np.array([vals])
            elif isinstance(vals[0], (np.ndarray, list)):
                samples[name] = Array(np.array(vals).flatten())
        args = (samples,)
    return _add_chi_align(_convert(*args, **kwargs))
[docs]
class SimplePESamples(SamplesDict):
"""
Class for holding Simple PE Samples, and generating PE distributions
"""
def __init__(self, *args, logger_warn="warn", autoscale=True):
    """
    Initialize as a SamplesDict, promoting scalar entries to length-1
    lists and flattening nested array-valued entries.
    """
    if isinstance(args[0], dict):
        sanitised = {}
        for key, item in args[0].items():
            if isinstance(item, (float, int, np.number)):
                # scalar: wrap in a length-1 list
                sanitised[key] = [item]
            elif isinstance(item[0], (np.ndarray, list)):
                # nested sequence: flatten to a 1d Array
                sanitised[key] = Array(np.array(item).flatten())
            else:
                sanitised[key] = item
        args = (sanitised,)
    SamplesDict.__init__(self, *args, logger_warn, autoscale)
def __setitem__(self, key, value):
    """Store `value` under `key`, coercing it to a pesummary Array and
    keeping the parameter list and underlying samples array in sync."""
    _value = value
    # promote scalars to length-1 arrays so everything is array-like
    if isinstance(_value, (float, int, np.number)):
        _value = [_value]
    if not isinstance(_value, Array):
        _value = Array(_value)
    if not _value.ndim:
        _value = Array([_value])
    # skip SamplesDict.__setitem__ and store directly in the dict
    super(SamplesDict, self).__setitem__(key, _value)
    try:
        if key not in self.parameters:
            # new parameter: record its name and append its samples to
            # the stored samples structure
            self.parameters.append(key)
            try:
                # cond is True when a single 1d set of scalar samples
                # is currently stored (rather than a 2d stack)
                cond = (
                    np.array(self.samples).ndim == 1 and isinstance(
                        self.samples[0], (float, int, np.number)
                    )
                )
            except Exception:
                cond = False
            if cond and isinstance(self.samples, np.ndarray):
                self.samples = np.append(self.samples, _value)
            elif cond and isinstance(self.samples, list):
                self.samples.append(_value)
            else:
                # general case: stack the new row onto the 2d samples
                self.samples = np.vstack([self.samples, _value])
            self._update_latex_labels()
    except (AttributeError, TypeError):
        # parameters/samples not yet initialised (e.g. during __init__)
        pass
@property
def neff(self):
    """Effective number of samples, (sum w)^2 / sum(w^2); equals the
    raw sample count when no weights are stored."""
    if "weight" not in self.keys():
        return self.number_of_samples
    w = self["weight"]
    return (np.sum(w)) ** 2 / np.sum(w ** 2)
[docs]
def update(self, dictionary):
    """Store every key/value pair from `dictionary` via __setitem__,
    so values are coerced and the samples array stays in sync."""
    for key in dictionary:
        self[key] = dictionary[key]
def _update_latex_labels(self):
    """Extend the inherited latex labels with simple-pe specific ones."""
    super(SimplePESamples, self)._update_latex_labels()
    extra_labels = {
        "chi_align": r"$\chi_{A}$",
        "distance": r"$d_{L}$",
    }
    self._latex_labels.update(extra_labels)
[docs]
def generate_all_posterior_samples(self, function=None, **kwargs):
    """Convert the stored samples with a conversion function.

    Parameters
    ----------
    function: func, optional
        function to use when converting posterior samples. Must take a
        dictionary as input and return a dictionary of converted posterior
        samples. Default `simple_pe.param_est.convert`
    **kwargs: dict, optional
        All additional kwargs passed to function
    """
    if function is None:
        function = convert
    # clip spin magnitudes to the physical bound a <= 1 before converting
    for magnitude in ("a_1", "a_2"):
        if magnitude in self.keys():
            self[magnitude][self[magnitude] > 1.] = 1
    return super(SimplePESamples, self).generate_all_posterior_samples(
        function=function, **kwargs
    )
[docs]
def add_fixed(self, name, value):
    """
    generate an additional parameter called 'name' with constant 'value'

    :param name: the name of the parameter
    :param value: its value
    """
    try:
        self[name] = np.ones(self.number_of_samples) * value
    except TypeError:
        # no samples stored yet, so number_of_samples is unusable;
        # store the bare value instead
        self[name] = value
[docs]
def generate_snrs(
    self, psd, f_low, approximant, template_parameters, snrs,
    localization_method
):
    """Calculate the network sensitivity, alignment and left/right
    circularly polarized SNRs over the stored sky positions and add
    them to the samples.

    :param psd: dictionary of PSDs keyed on detector name
    :param f_low: low frequency cutoff
    :param approximant: waveform approximant
    :param template_parameters: dictionary containing the template
        chirp_mass, symmetric_mass_ratio and chi_align
    :param snrs: dictionary with per-ifo values under 'ifo_snr' and
        'ifo_time' and the network SNR under 'network'
    :param localization_method: 'fullsky' (evaluate at every sample's
        sky position) or 'average' (average over 1000 random sky points)
    """
    from simple_pe.detectors import calc_reach_bandwidth, Network
    from simple_pe.localization.event import Event
    # build a network object holding each detector's horizon and
    # frequency moments for the template parameters
    net = Network(threshold=10.)
    for ifo, p in psd.items():
        hor, f_mean, f_band = calc_reach_bandwidth(
            [
                template_parameters['chirp_mass'],
                template_parameters['symmetric_mass_ratio']
            ], template_parameters["chi_align"], approximant, p, f_low,
            mass_configuration="chirp"
        )
        net.add_ifo(ifo, hor, f_mean, f_band, bns_range=False,
                    loc_thresh=4.)
    if all(len(np.unique(self[_])) == 1 for _ in ["ra", "dec"]):
        # fixed sky position: a single evaluation suffices
        _ra = self["ra"][:1]
        _dec = self["dec"][:1]
    elif localization_method == "fullsky":
        _ra = self["ra"]
        _dec = self["dec"]
    elif localization_method == "average":
        # downsample to 1000 sky points to average over
        inds = np.random.choice(np.arange(len(self["ra"])), size=int(1e3))
        _ra = self["ra"][inds]
        _dec = self["dec"][inds]
    # NOTE(review): if localization_method is neither 'fullsky' nor
    # 'average' (and ra/dec vary), _ra/_dec are never assigned and the
    # next line raises NameError — confirm upstream validation
    total = len(_ra)
    f_sig = np.zeros(total)
    alpha_net = np.zeros_like(f_sig)
    snr_left = np.zeros_like(f_sig)
    snr_right = np.zeros_like(f_sig)
    for i, (_r, _d) in tqdm.tqdm(enumerate(zip(_ra, _dec)), total=total):
        # use per-sample chirp mass only when iterating over every
        # sample; otherwise fall back to the template value
        if total != self.number_of_samples:
            mc = template_parameters["chirp_mass"]
        else:
            mc = self["chirp_mass"][i]
        ee = Event.from_snrs(
            net, snrs["ifo_snr"], snrs["ifo_time"], mc, _r, _d
        )
        ee.calculate_sensitivity()
        f_sig[i] = ee.sensitivity
        alpha_net[i] = ee.alpha_net()
        snr_left[i] = np.linalg.norm(ee.projected_snr('left'))
        snr_right[i] = np.linalg.norm(ee.projected_snr('right'))
    if total == 1:
        # single sky point: broadcast the one value to all samples
        self["f_sig"] = np.ones_like(self["chirp_mass"]) * f_sig[0]
        self["alpha_net"] = np.ones_like(self["chirp_mass"]) * alpha_net[0]
        self["left_snr"] = np.ones_like(self["chirp_mass"]) * snr_left[0]
        self["right_snr"] = np.ones_like(self["chirp_mass"]) * snr_right[0]
        self["not_left"] = np.ones_like(self["chirp_mass"]) * np.sqrt(
            snrs["network"]**2 - snr_left[0]**2
        )
        self["not_right"] = np.ones_like(self["chirp_mass"]) * np.sqrt(
            snrs["network"]**2 - snr_right[0]**2
        )
    elif localization_method == "fullsky":
        # per-sample values
        self["left_snr"] = snr_left
        self["right_snr"] = snr_right
        self["not_left"] = np.sqrt(snrs["network"]**2 - snr_left**2)
        self["not_right"] = np.sqrt(snrs["network"]**2 - snr_right**2)
        self["f_sig"] = f_sig
        self["alpha_net"] = alpha_net
    else:
        # 'average': broadcast sky-averaged values to all samples
        self["left_snr"] = np.ones_like(self["chirp_mass"]) * np.mean(snr_left)
        self["right_snr"] = np.ones_like(self["chirp_mass"]) * np.mean(snr_right)
        self["not_left"] = np.ones_like(self["chirp_mass"]) * np.sqrt(
            np.min(snrs['network']**2 - snr_left**2)
        )
        self["not_right"] = np.ones_like(self["chirp_mass"]) * np.sqrt(
            np.min(snrs['network']**2 - snr_right**2)
        )
        self["f_sig"] = np.ones_like(self["chirp_mass"]) * np.mean(f_sig)
        self["alpha_net"] = np.ones_like(self["chirp_mass"]) * np.mean(alpha_net)
[docs]
def generate_theta_jn(self, theta_dist='uniform', snr_left=0.,
                      snr_right=0., overwrite=False):
    """
    generate theta JN points with the desired distribution and
    include in the SimplePESamples

    :param theta_dist: the distribution to use for theta.
        Currently, supports 'uniform', 'left_circ',
        'right_circ', 'left_right'
    :param snr_left: left snr
    :param snr_right: right snr
    :param overwrite: if True, then overwrite existing values,
        otherwise don't
    """
    if 'theta_jn' in self.keys() and overwrite:
        print('Overwriting theta_jn values')
        self.pop('theta_jn')
    if 'cos_theta_jn' in self.keys() and overwrite:
        print('Overwriting cos_theta_jn values')
        self.pop('cos_theta_jn')
    if ('theta_jn' in self.keys()) or ('cos_theta_jn' in self.keys()):
        print('Did not overwrite theta_jn and cos_theta_jn samples')
        return
    npts = self.number_of_samples
    if theta_dist == 'uniform':
        cos_theta = np.random.uniform(-1, 1, npts)
    else:
        if theta_dist == 'left_circ':
            n_left = npts
        elif theta_dist == 'right_circ':
            n_left = 0
        elif theta_dist == 'left_right':
            # fraction of left samples is the logistic function
            # 1 / (1 + exp(0.5 * (snr_right^2 - snr_left^2))).
            # Evaluate it in this stable form rather than as
            # exp(0.5 snr_l^2) / (exp(0.5 snr_l^2) + exp(0.5 snr_r^2)),
            # which overflows to inf (giving nan) for SNRs above ~38.
            log_ratio = 0.5 * (snr_right ** 2 - snr_left ** 2)
            if log_ratio >= 0:
                frac_left = np.exp(-log_ratio) / (1. + np.exp(-log_ratio))
            else:
                frac_left = 1. / (1. + np.exp(log_ratio))
            n_left = int(npts * frac_left)
        else:
            print("only implemented for 'uniform', "
                  "'left_circ', 'right_circ', 'left_right'")
            return
        n_right = npts - n_left
        # power-law distributed cos(theta), peaked towards face-on for
        # right-circular and face-off for left-circular samples
        cos_theta_r = 2 * np.random.power(1 + 6, n_right) - 1
        cos_theta_l = 1 - 2 * np.random.power(1 + 6, n_left)
        cos_theta = np.concatenate((cos_theta_l, cos_theta_r))
    theta = np.arccos(cos_theta)
    self['theta_jn'] = theta
    self['cos_theta_jn'] = cos_theta
[docs]
def generate_distance(self, fiducial_distance, fiducial_sigma,
                      psd, f_low, interp_directions, interp_points=5,
                      approximant="IMRPhenomXPHM", overwrite=False,
                      sigma_22_grid=None):
    """
    generate distance points using the existing theta_JN samples and
    fiducial distance. Interpolate sensitivity over the parameter space

    :param fiducial_distance: distance for a fiducial set of parameters
    :param fiducial_sigma: the range for a fiducial set of parameters
    :param psd: the PSD to use
    :param f_low: low frequency cutoff
    :param interp_directions: directions to interpolate
    :param interp_points: number of points to interpolate alpha_lm
    :param approximant: waveform approximant
    :param overwrite: if True, then overwrite existing values,
        otherwise don't
    :param sigma_22_grid: optional precomputed (sigma_grid, pts) tuple,
        as returned by interpolate_sigma; if None the grid is calculated
    """
    if 'theta_jn' not in self.keys():
        print('Require theta_jn values to calculate distances')
        return
    if 'distance' in self.keys() and overwrite:
        print('Overwriting distance values')
        self.pop('distance')
    if 'distance' not in self.keys():
        # tau = tan(theta/2), folded so theta and (pi - theta) are
        # treated identically (left/right circular symmetry)
        tau = np.tan(np.minimum(self['theta_jn'],
                                np.pi - self['theta_jn']) / 2)
        maxs = dict((k, self.maximum[k]) for k in interp_directions)
        mins = dict((k, self.minimum[k]) for k in interp_directions)
        # hold all non-interpolated parameters fixed at their mean
        fixed_pars = {k: v[0] for k, v in self.mean.items()
                      if k not in interp_directions}
        fixed_pars['distance'] = 1.0
        # ensure we don't wind up with unphysical spins
        if 'chi_p' in fixed_pars.keys():
            fixed_pars['chi_p'] = 0.
        if 'chi_p2' in fixed_pars.keys():
            fixed_pars['chi_p2'] = 0.
        if sigma_22_grid is None:
            sigma_grid, pts = interpolate_sigma(maxs, mins, fixed_pars, psd,
                                                f_low, interp_points,
                                                approximant)
        else:
            sigma_grid, pts = sigma_22_grid
        # interpolate sigma at each sample's parameters
        sigma_int = interpolate.interpn(pts, sigma_grid,
                                        np.array([self[k] for k in
                                                  interp_directions]).T)
        # scale the fiducial distance by the relative sensitivity and
        # the inclination-dependent amplitude factor (1 + tau^2)^-2
        self['distance'] = fiducial_distance * sigma_int/fiducial_sigma / \
            (1 + tau**2) ** 2
    else:
        print('Did not overwrite distance samples')
[docs]
def jitter_distance(self, net_snr, response_sigma=0.):
    """
    jitter distance values based upon existing distances
    jitter due to SNR and variation of network response

    :param net_snr: the network SNR
    :param response_sigma: standard deviation of network response (over sky)
    """
    if 'distance' not in self.keys():
        print('Need existing distance values before jittering them')
        return
    # fractional distance uncertainty: SNR measurement error combined
    # with the sky-dependent variation of the network response
    std = np.sqrt(net_snr ** -2 + response_sigma ** 2)
    # draw 10x more scale factors than needed so that enough survive
    # the positivity cut and the rejection sampling below
    d_scale = norm.rvs(1, std, 10 * self.number_of_samples)
    d_scale = d_scale[d_scale > 0]
    # rejection-sample to a uniform-in-volume (d^3) distribution
    d_weight = d_scale ** 3
    d_weight /= d_weight.max()
    keep = (d_weight > np.random.uniform(0, 1, len(d_weight)))
    d_keep = d_scale[keep][:self.number_of_samples]
    # explicit length check replaces the previous bare `except:`, which
    # silently swallowed *any* error raised by the multiplication
    if len(d_keep) < self.number_of_samples:
        print('Failed to generate enough samples')
        print('Not performing distance jitter')
        return
    self['distance'] *= d_keep
[docs]
def generate_chi_p(self, chi_p_dist='uniform', overwrite=False):
    """
    generate chi_p points with the desired distribution and include in the
    existing samples dict

    :param chi_p_dist: the distribution to use for chi_p.
        Currently supports 'uniform' and 'isotropic_on_sky'
    :param overwrite: if True, then overwrite existing values,
        otherwise don't
    """
    param = "chi_eff" if "chi_eff" in self.keys() else "chi_align"
    # remove samples outside the physical parameter bounds first
    self.trim_unphysical()
    if chi_p_dist == 'uniform':
        # uniform up to the largest in-plane spin compatible with the
        # maximum aligned spin (total spin magnitude <= sqrt(0.99))
        chi_p_samples = np.random.uniform(0, np.sqrt(0.99 - self.maximum[param] ** 2),
                                          self.number_of_samples)
    elif chi_p_dist == "isotropic_on_sky":
        from pesummary.gw.conversions import chi_p as _chi_p, q_from_eta, \
            m1_from_mchirp_q, m2_from_mchirp_q
        # draw spin magnitudes and tilts, then compute the implied chi_p
        a_1 = np.random.uniform(0, np.sqrt(0.99 - self.maximum[param] ** 2),
                                self.number_of_samples)
        a_2 = np.random.uniform(0, np.sqrt(0.99 - self.maximum[param] ** 2),
                                self.number_of_samples)
        tilt_1 = np.arccos(np.random.uniform(0, 1, self.number_of_samples))
        tilt_2 = np.arccos(np.random.uniform(0, 1, self.number_of_samples))
        # NOTE(review): with the usual convention (tilt measured from
        # the orbital angular momentum) the in-plane component would be
        # a * sin(tilt) and the aligned one a * cos(tilt); here the two
        # appear swapped — confirm this is intentional
        spin_1x = a_1 * np.cos(tilt_1)
        spin_1y = np.zeros_like(spin_1x)
        spin_1z = a_1 * np.sin(tilt_1)
        spin_2x = a_2 * np.cos(tilt_2)
        spin_2y = np.zeros_like(spin_2x)
        spin_2z = a_2 * np.sin(tilt_2)
        # draw masses uniformly over the sampled ranges
        chirp_mass = np.random.uniform(
            self.minimum["chirp_mass"], self.maximum["chirp_mass"],
            self.number_of_samples
        )
        mass_ratio = np.random.uniform(
            q_from_eta(self.minimum["symmetric_mass_ratio"]),
            q_from_eta(self.maximum["symmetric_mass_ratio"]),
            self.number_of_samples
        )
        mass_1 = m1_from_mchirp_q(chirp_mass, mass_ratio)
        mass_2 = m2_from_mchirp_q(chirp_mass, mass_ratio)
        chi_p_samples = _chi_p(mass_1, mass_2, spin_1x, spin_1y,
                               spin_2x, spin_2y)
    else:
        print("only implemented for 'uniform' and 'isotropic_on_sky'")
        return
    if 'chi_p' in self.keys() and overwrite:
        print('Overwriting chi_p values')
        self.pop('chi_p')
    if 'chi_p' not in self.keys():
        self['chi_p'] = chi_p_samples
    else:
        print('Did not overwrite chi_p samples')
[docs]
def generate_spin_z(self, overwrite=False):
    """
    Generate z-component spins from chi_eff or chi_align

    :param overwrite: if True, then overwrite existing values,
        otherwise don't
    """
    if not any(_ in self.keys() for _ in ['chi_eff', 'chi_align']):
        print("Need to have 'chi_align' in samples")
        return
    if 'spin_1z' in self.keys() and overwrite:
        print('Overwriting spin_1z values')
        self.pop('spin_1z')
    if 'spin_2z' in self.keys() and overwrite:
        print('Overwriting spin_2z values')
        self.pop('spin_2z')
    param = "chi_eff" if "chi_eff" in self.keys() else "chi_align"
    if ('spin_1z' in self.keys()) or ('spin_2z' in self.keys()):
        print('Did not overwrite spin_1z and spin_2z samples')
        return
    # place the full aligned spin on each component; no x, y spins
    aligned = self[param]
    self['spin_1z'] = aligned
    self['spin_2z'] = aligned
[docs]
def generate_prec_spin(self, overwrite=False):
    """
    Generate component spins from chi_eff/chi_align/spin_z and chi_p

    :param overwrite: if True, then overwrite existing values,
        otherwise don't
    """
    param = "chi_eff" if "chi_eff" in self.keys() else "chi_align"
    if 'chi_p' not in self.keys() and "chi_p2" not in self.keys():
        # aligned-spin only: spins along z, all in-plane quantities zero
        self["spin_1z"] = self[param]
        self["spin_2z"] = self[param]
        self["a_1"] = np.abs(self["spin_1z"])
        self["a_2"] = np.abs(self["spin_2z"])
        # tilt is 0 for positive z-spin, pi for negative
        self["tilt_1"] = np.arccos(np.sign(self["spin_1z"]))
        self["tilt_2"] = np.arccos(np.sign(self["spin_2z"]))
        for param in [
            "phi_12", "phi_jl", "beta", "spin_1x", "spin_1y",
            "spin_2x", "spin_2y"
        ]:
            self[param] = np.zeros_like(self["spin_1z"])
        return
    if ('spin_1z' in self.keys()) and ('spin_2z' in self.keys()):
        s1z = self['spin_1z']
        s2z = self['spin_2z']
    elif ('spin_1z' in self.keys()) or ('spin_2z' in self.keys()):
        print("Need to specify both 'spin_1z' and 'spin_2z'"
              "(not just one) or else chi_align/chi_eff")
        return
    elif param not in self.keys():
        print("Need to specify aligned spin component, "
              "please give either 'chi_eff', 'chi_align' or components")
        return
    else:
        # no individual z-spins: put the aligned spin on both objects
        s1z = self[param]
        s2z = self[param]
    if "chi_p2" in self.keys():
        if "chi_p" in self.keys():
            print('Both chi_p and chi_p2 in samples, using chi_p')
        else:
            # chi_p2 can go slightly negative through sampling noise;
            # clip at zero before taking the square root
            if sum(self['chi_p2'] < 0):
                print('negative values of chi_p2, smallest = %.2g' % min(self['chi_p2']) )
                print('setting equal 0')
                self['chi_p2'][self['chi_p2'] < 0] = 0
            self['chi_p'] = np.sqrt(self['chi_p2'])
    for k in ['a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_12', 'phi_jl']:
        if k in self.keys():
            if overwrite:
                print('Overwriting %s values' % k)
                self.pop(k)
            else:
                print('%s already in samples, not overwriting' % k)
                return
    # put the entire in-plane spin on the primary
    self['a_1'] = np.sqrt(self["chi_p"] ** 2 + s1z ** 2)
    # limit a_1 < 1
    #self['a_1'][self['a_1'] > 1.] = 1
    self['tilt_1'] = np.arctan2(self["chi_p"], s1z)
    self['a_2'] = np.abs(s2z)
    # limit a_2 < 1
    #self['a_2'][self['a_2'] > 1.] = 1.
    self['tilt_2'] = np.arccos(np.sign(s2z))
    self.add_fixed('phi_12', 0.)
    self.add_fixed('phi_jl', 0.)
[docs]
def trim_unphysical(self, maxs=None, mins=None, set_to_bounds=False):
    """
    Trim unphysical points from SimplePESamples

    :param maxs: the maximum permitted values of the physical parameters
    :param mins: the minimum physical values of the physical parameters
    :param set_to_bounds: move points that lie outside physical space to
        the boundary of allowed space
    :return physical_samples: SamplesDict with points outside the param max
        and min given
    """
    if mins is None:
        mins = parameter_bounds.param_mins
    if maxs is None:
        maxs = parameter_bounds.param_maxs
    if not set_to_bounds:
        # drop every sample that has any parameter outside its bounds
        keep = np.ones(self.number_of_samples, bool)
        for d, v in self.items():
            if d in maxs:
                keep *= (v <= maxs[d])
            if d in mins:
                keep *= (v >= mins[d])
        # re-initialise in place with only the retained samples
        self.__init__(self[keep])
    else:
        # clip out-of-range values to the boundary, keeping the number
        # of samples fixed
        for d, v in self.items():
            if d in maxs:
                self[d][v >= maxs[d]] = maxs[d]
            if d in mins:
                self[d][v <= mins[d]] = mins[d]
            # keep the underlying 2d samples array in sync
            ind = self.parameters.index(d)
            self.samples[ind] = self[d]
[docs]
def calculate_rho_lm(self, psd, f_low, net_snr, modes, interp_directions,
                     interp_points=5, approximant="IMRPhenomXPHM",
                     alpha_lm_grid=None):
    """
    Calculate the higher mode SNRs

    :param psd: the PSD to use
    :param f_low: low frequency cutoff
    :param net_snr: the network SNR
    :param modes: modes for which to calculate SNR
    :param interp_directions: directions to interpolate
    :param interp_points: number of points to interpolate alpha_lm
    :param approximant: waveform approximant
    :param alpha_lm_grid: optional precomputed (alpha_grid, pts) tuple,
        as returned by interpolate_alpha_lm; if None the grid is calculated
    """
    maxs = dict((k, self.maximum[k]) for k in interp_directions)
    mins = dict((k, self.minimum[k]) for k in interp_directions)
    # hold all non-interpolated parameters fixed at their mean
    fixed_pars = {k: v[0] for k, v in self.mean.items()
                  if k not in interp_directions}
    if alpha_lm_grid is None:
        alpha_grid, pts = interpolate_alpha_lm(maxs, mins, fixed_pars, psd,
                                               f_low, interp_points,
                                               modes, approximant)
    else:
        alpha_grid, pts = alpha_lm_grid
    for m in modes:
        # relative amplitude of mode m at each sample's parameters
        alpha = interpolate.interpn(pts, alpha_grid[m],
                                    np.array([self[k] for k in
                                              interp_directions]).T)
        if "polarization" in self.keys():
            # combine plus and cross contributions, weighted by the
            # polarization angle
            self['rho_' + m] = net_snr * alpha * (
                np.cos(2 * self['polarization']) *
                fstat_hm.amp[m + '+'](self["theta_jn"]) /
                fstat_hm.amp['22+'](self["theta_jn"]) +
                np.sin(2 * self['polarization']) *
                fstat_hm.amp[m + 'x'](self["theta_jn"]) /
                fstat_hm.amp['22x'](self["theta_jn"]))
        else:
            # no polarization samples: plus polarization only
            self['rho_' + m] = net_snr * alpha * \
                fstat_hm.amp[m + '+'](self["theta_jn"]) / \
                fstat_hm.amp['22+'](self["theta_jn"])
[docs]
def calculate_rho_2nd_pol(self, a_net, net_snr):
    """
    Calculate the SNR in the second polarization

    :param a_net: network sensitivity to x polarization (in DP frame)
    :param net_snr: the network SNR
    """
    theta = self['theta_jn']
    pol_factor = 2 * a_net / (1 + a_net ** 2)
    self['rho_not_right'] = net_snr * np.tan(theta / 2) ** 4 * pol_factor
    self['rho_not_left'] = (
        net_snr * np.tan((np.pi - theta) / 2) ** 4 * pol_factor
    )
    # doesn't make sense for either to exceed the network SNR, so cap
    # them; handle array-valued and scalar net_snr separately
    for key in ('rho_not_right', 'rho_not_left'):
        excess = self[key] > net_snr
        if np.shape(net_snr) == np.shape(theta):
            self[key][excess] = net_snr[excess]
        else:
            self[key][excess] = net_snr
[docs]
def calculate_rho_p(self, psd, f_low, net_snr, interp_directions,
                    interp_points=5, approximant="IMRPhenomXP",
                    beta_22_grid=None):
    """
    Calculate the precession SNR

    :param psd: the PSD to use
    :param f_low: low frequency cutoff
    :param net_snr: the network SNR
    :param interp_directions: directions to interpolate
    :param interp_points: number of points to interpolate the opening angle
    :param approximant: waveform approximant
    :param beta_22_grid: optional precomputed (beta_grid, pts) tuple,
        as returned by interpolate_opening; if None the grid is calculated
    """
    maxs = dict((k, [self[k].max()]) for k in interp_directions)
    mins = dict((k, [self[k].min()]) for k in interp_directions)
    # hold all non-interpolated parameters fixed at their mean
    fixed_pars = {k: v[0] for k, v in self.mean.items()
                  if k not in interp_directions}
    if beta_22_grid is None:
        beta_grid, pts = interpolate_opening(maxs, mins, fixed_pars, psd,
                                             f_low, interp_points, approximant)
    else:
        beta_grid, pts = beta_22_grid
    # opening angle at each sample's parameters
    self['beta'] = interpolate.interpn(pts, beta_grid,
                                       np.array([self[k] for k
                                                 in interp_directions]).T)
    # fold theta_jn so left and right circular configurations match
    t_over_2 = np.minimum(self['theta_jn'], np.pi - self['theta_jn'])/2
    # leading-order precession SNR
    self['rho_p'] = net_snr * 4 * np.tan(self['beta'] / 2) * \
        np.tan(t_over_2)
[docs]
def calculate_hm_prec_probs(self, hm_snr=None, prec_snr=None,
                            snr_2pol=None, overlaps=None):
    """
    Calculate probability weights for each sample from the observed
    higher-mode, precession and second-polarization SNRs.

    :param hm_snr: dictionary of measured SNRs in higher modes
    :param prec_snr: measured precession SNR
    :param snr_2pol: the SNR in the second polarization
    :param overlaps: dictionary of the measured overlaps between modes
    """
    weights = np.ones(self.number_of_samples)
    if hm_snr is not None:
        # map NaN SNRs to 0. The previous code called
        # np.nan_to_num(hm_snr, 0.), which (a) passed 0. to the `copy`
        # argument rather than `nan` and (b) cannot be applied to a
        # dict (the result would have no .items())
        hm_snr = {lm: np.nan_to_num(snr) for lm, snr in hm_snr.items()}
        for lm, snr in hm_snr.items():
            # non-central chi^2 with 2 dof, non-centrality snr^2
            rv = ncx2(2, snr ** 2)
            if overlaps is not None:
                over = overlaps[lm]
            else:
                over = 0.
            p = rv.pdf(self['rho_' + lm] ** 2 * (1 - over ** 2))
            self['p_' + lm] = p/p.max()
            weights *= self['p_' + lm]
    if prec_snr is not None:
        # map a NaN precession SNR to 0 (see note above)
        prec_snr = np.nan_to_num(prec_snr)
        rv = ncx2(2, prec_snr ** 2)
        if overlaps is not None:
            over = overlaps['prec']
        else:
            over = 0.
        p = rv.pdf(self['rho_p'] ** 2 * (1 - over ** 2))
        self['p_p'] = p / p.max()
        weights *= self['p_p']
    if snr_2pol is not None:
        # take the most favourable of the (left/right) polarizations
        self.add_fixed('p_2pol', 0)
        for pol, snr in snr_2pol.items():
            rv = ncx2(2, snr ** 2)
            self['p_' + pol] = rv.pdf(self['rho_' + pol] ** 2)
            self['p_2pol'] = np.maximum(self['p_2pol'], self['p_' + pol])
        self['p_2pol'] /= self['p_2pol'].max()
        weights *= self['p_2pol']
    self['weight'] = weights
[docs]
def interpolate_opening(param_max, param_min, fixed_pars, psd, f_low,
                        grid_points, approximant):
    """
    generate interpolating functions for the amplitudes of the opening angle

    :param param_max: A dictionary containing the maximum value of
        each parameter
    :param param_min: A dictionary containing the minimum value of
        each parameter
    :param fixed_pars: the fixed parameters needed to generate the waveform
    :param psd: the psd to use in calculating mean frequency,
        used for opening angle
    :param f_low: the low frequency cutoff to use
    :param grid_points: number of points to interpolate opening angle
    :param approximant: the waveform approximant to use
    :return opening: array of opening angle values interpolated across the grid
    :return pts: set of points used in each direction
    """
    dirs = param_max.keys()
    # regular grid with grid_points samples per direction
    pts = [np.linspace(param_min[d][0], param_max[d][0], grid_points)
           for d in dirs]
    grid_dict = dict(zip(dirs, np.array(np.meshgrid(*pts, indexing='ij'))))
    grid_samples = SimplePESamples({k: i.flatten()
                                    for k, i in grid_dict.items()})
    for k, i in fixed_pars.items():
        grid_samples.add_fixed(k, i)
    grid_samples.add_fixed('f_ref', 0)
    grid_samples.add_fixed('phase', 0)
    # if "chi_p2" in grid_samples.keys() and "chi_p" in grid_samples.keys():
    #     grid_samples.pop("chi_p")
    grid_samples.generate_prec_spin()
    # need to use set_to_bounds=True so we do not modify the grid
    grid_samples.trim_unphysical(set_to_bounds=True)
    grid_samples.generate_all_posterior_samples(disable_remnant=True)
    beta = np.zeros(grid_samples.number_of_samples)
    for i in tqdm.tqdm(range(grid_samples.number_of_samples),
                       desc="calculating opening angle on grid"):
        sample = grid_samples[i:i+1]
        param = "chi_eff" if "chi_eff" in sample.keys() else "chi_align"
        # mean signal frequency, used as the reference frequency for
        # the opening angle below
        _, f_mean, _ = noise_curves.calc_reach_bandwidth(
            [sample["mass_1"], sample["mass_2"]], sample[param], approximant,
            psd, f_low, thresh=8., mass_configuration="component"
        )
        beta[i] = opening_angle(
            sample["mass_1"], sample["mass_2"], sample["phi_jl"],
            sample["tilt_1"], sample["tilt_2"], sample["phi_12"],
            sample["a_1"], sample["a_2"],
            f_mean * np.ones_like(sample["mass_1"]),
            sample["phase"])
    # reshape the flat results back onto the grid
    return beta.reshape(list(grid_dict.values())[0].shape), pts
[docs]
def interpolate_sigma(param_max, param_min, fixed_pars, psd, f_low, grid_points,
                      approximant):
    """
    generate interpolating function for sigma (the waveform
    normalisation) over a regular parameter grid

    :param param_max: A dictionary containing the maximum value of
        each parameter
    :param param_min: A dictionary containing the minimum value of
        each parameter
    :param fixed_pars: A dictionary containing values of fixed parameters
    :param psd: the PSD to use
    :param f_low: low frequency cutoff
    :param grid_points: number of points per direction in the grid
    :param approximant: waveform approximant
    :return sig: array of sigma values across the grid
    :return pts: set of points used in each direction
    """
    from simple_pe.waveforms import waveform
    dirs = param_max.keys()
    # regular grid with grid_points samples per direction
    pts = [np.linspace(param_min[d][0], param_max[d][0], grid_points)
           for d in dirs]
    grid_dict = dict(zip(dirs, np.array(np.meshgrid(*pts, indexing='ij'))))
    grid_samples = SimplePESamples({k: i.flatten() for k, i in grid_dict.items()})
    for k, i in fixed_pars.items():
        grid_samples.add_fixed(k, i)
    sig = np.zeros(grid_samples.number_of_samples)
    for i in tqdm.tqdm(range(grid_samples.number_of_samples),
                       desc="calculating sigma on grid"):
        sample = grid_samples[i:i+1]
        h = waveform.make_waveform(sample, psd.delta_f, f_low, len(psd),
                                   approximant)
        sig[i] = sigma(h, psd, low_frequency_cutoff=f_low,
                       high_frequency_cutoff=psd.sample_frequencies[-1])
    # reshape the flat results back onto the grid
    return sig.reshape(list(grid_dict.values())[0].shape), pts
[docs]
def interpolate_alpha_lm(param_max, param_min, fixed_pars, psd, f_low,
                         grid_points, modes, approximant):
    """
    generate interpolating functions for the amplitudes of the lm multipoles

    :param param_max: A dictionary containing the maximum value of
        each parameter
    :param param_min: A dictionary containing the minimum value of
        each parameter
    :param fixed_pars: A dictionary containing values of fixed parameters
    :param psd: the PSD to use
    :param f_low: low frequency cutoff
    :param grid_points: number of points per direction in the grid
    :param modes: waveform modes to calculate
    :param approximant: waveform approximant
    :return alpha: dictionary of alpha[lm] values interpolated across the grid
    :return pts: set of points used in each direction
    """
    dirs = param_max.keys()
    # regular grid with grid_points samples per direction
    pts = [np.linspace(param_min[d][0], param_max[d][0], grid_points)
           for d in dirs]
    grid_dict = dict(zip(dirs, np.array(np.meshgrid(*pts, indexing='ij'))))
    grid_samples = SimplePESamples({k: i.flatten()
                                    for k, i in grid_dict.items()})
    for k, i in fixed_pars.items():
        grid_samples.add_fixed(k, i)
    # aligned-spin components are required by the amplitude calculation
    grid_samples.generate_spin_z()
    grid_samples.generate_all_posterior_samples(disable_remnant=True)
    alpha = {}
    for m in modes:
        alpha[m] = np.zeros(grid_samples.number_of_samples)
    for i in tqdm.tqdm(range(grid_samples.number_of_samples),
                       desc="calculating alpha_lm on grid"):
        sample = grid_samples[i:i+1]
        # amplitude of each mode relative to the dominant 22 mode
        a, _ = waveform_modes.calculate_alpha_lm_and_overlaps(sample['mass_1'],
                                                              sample['mass_2'],
                                                              sample['spin_1z'],
                                                              sample['spin_2z'],
                                                              psd, f_low,
                                                              approximant, modes,
                                                              dominant_mode='22'
                                                              )
        for m, al in alpha.items():
            al[i] = a[m]
    # reshape the flat results back onto the grid
    for k, i in alpha.items():
        alpha[k] = i.reshape(list(grid_dict.values())[0].shape)
    return alpha, pts
[docs]
def calculate_interpolated_snrs(
    samples, psd, f_low, dominant_snr, modes, response_sigma,
    fiducial_sigma, dist_interp_dirs,
    hm_interp_dirs, prec_interp_dirs, interp_points, approximant,
    localization_method, **kwargs
):
    """Wrapper function to calculate the SNR in the (l,m) multipoles,
    the SNR in the second polarisation and the SNR in precession.

    Parameters
    ----------
    samples: simple_pe.param_est.pe.SimplePESamples
        table of posterior distributions
    psd: pycbc.types.frequencyseries
        frequency series containing the PSD
    f_low: float
        low frequency cut-off to use for SNR calculations
    dominant_snr: float
        SNR in the dominant 22 multipole
    modes: list
        list of higher order multipoles that you wish to calculate
        the SNR for
    response_sigma: float
        standard deviation of network response over sky region
    fiducial_sigma: float
        distance at which a face on signal would give SNR=8
    dist_interp_dirs: list
        directions to interpolate the distance
    hm_interp_dirs: list
        directions to interpolate the higher multipole SNR calculation
    prec_interp_dirs: list
        directions to interpolate the precession SNR calculation
    interp_points: int
        number of points to interpolate the SNRs
    approximant: str
        approximant to use when calculating the SNRs
    localization_method: str
        method to use when localizing the event. Must either be 'average'
        or 'fullsky'
    **kwargs: dict, optional
        may contain 'template_parameters', 'snrs', 'left_snr',
        'right_snr', plus precomputed 'sigma_22_grid', 'alpha_lm_grid'
        and 'beta_22_grid' interpolation grids
    """
    from simple_pe import io
    if not isinstance(samples, SimplePESamples):
        samples = SimplePESamples(samples)
    # harmonic-mean PSD across the detector network
    hm_psd = io.calculate_harmonic_mean_psd(psd)
    # generate required parameters if necessary
    template_parameters = kwargs.get("template_parameters", None)
    snrs = kwargs.get("snrs", None)
    if any(kwargs.get(f"{_}_snr", None) is None for _ in ["left", "right"]):
        # left/right circular SNRs not supplied: compute them (along
        # with f_sig and alpha_net) from the sky location
        samples.generate_snrs(
            psd=psd, f_low=f_low, approximant=approximant,
            template_parameters=template_parameters, snrs=snrs,
            localization_method=localization_method
        )
    if "theta_jn" not in samples.keys() and \
            kwargs.get("left_snr", None) is not None:
        samples.generate_theta_jn(
            'left_right', snr_left=kwargs.pop("left_snr"),
            snr_right=kwargs.pop("right_snr")
        )
    elif "theta_jn" not in samples.keys():
        samples.generate_theta_jn('uniform')
    # distance at which a face-on signal would give the network SNR
    samples["distance_face_on"] = samples["f_sig"] / snrs["network"]
    if "distance" not in samples.keys():
        samples.generate_distance(samples["distance_face_on"], fiducial_sigma,
                                  hm_psd, f_low, dist_interp_dirs,
                                  interp_points, approximant,
                                  sigma_22_grid=kwargs.get("sigma_22_grid", None))
        samples.jitter_distance(dominant_snr, response_sigma)
    if "chi_p" not in samples.keys() and "chi_p2" not in samples.keys():
        if waveforms.precessing_approximant(approximant):
            samples.generate_chi_p('isotropic_on_sky')
        else:
            # aligned-spin approximant: no in-plane spin
            samples.add_fixed("chi_p", 0.)
    samples.calculate_rho_lm(
        hm_psd, f_low, dominant_snr, modes, hm_interp_dirs, interp_points,
        approximant, alpha_lm_grid=kwargs.get("alpha_lm_grid", None)
    )
    samples.calculate_rho_2nd_pol(samples["alpha_net"], dominant_snr)
    # chi_p is required for the precession interpolation directions
    if (prec_interp_dirs is not None) and ("chi_p" in prec_interp_dirs) and \
            ("chi_p" not in samples.keys()):
        samples['chi_p'] = samples['chi_p2']**0.5
    if waveforms.precessing_approximant(approximant):
        samples.calculate_rho_p(
            hm_psd, f_low, dominant_snr, prec_interp_dirs, interp_points,
            approximant, beta_22_grid=kwargs.get("beta_22_grid", None)
        )
    else:
        # no precession SNR for aligned-spin approximants
        samples.add_fixed("rho_p", 0.)
    if ("chi_p2" in samples.keys()) and ("chi_p" not in samples.keys()):
        samples['chi_p'] = samples['chi_p2']**0.5
    return samples
[docs]
def reweight_based_on_observed_snrs(samples, **kwargs):
    """Resample a table of posterior distributions based on the observed
    SNR in the higher multipoles, precession and second polarisation.

    Parameters
    ----------
    samples: simple_pe.param_est.pe.SimplePESamples
        table containing posterior distributions
    **kwargs: dict, optional
        all kwargs passed to the samples.calculate_hm_prec_probs function
    """
    from pesummary.core.reweight import rejection_sampling
    if not isinstance(samples, SimplePESamples):
        samples = SimplePESamples(samples)
    # compute per-sample probability weights from the observed SNRs,
    # then rejection-sample against them
    samples.calculate_hm_prec_probs(**kwargs)
    weights = samples['weight']
    return rejection_sampling(samples, weights)
[docs]
def isotropic_spin_prior_weight(samples, dx_directions):
    """
    Re-weight points to a prior proportional to chi_p (1 - chi_p),
    rather than uniform in chi_p or chi_p2. This prior approximately
    matches the one arising from uniform priors on the spin magnitudes
    and orientations.

    Parameters
    ----------
    samples: SimplePESamples
        set of samples to reweight
    dx_directions: list
        directions used in generating samples

    Returns
    -------
    reweighted_samples: SimplePESamples
    """
    from pesummary.core.reweight import rejection_sampling
    if 'chi_p' in dx_directions:
        cp = samples['chi_p']
        weights = cp * (1 - cp)
    elif 'chi_p2' in dx_directions:
        weights = 1 - samples['chi_p2']**0.5
    else:
        # neither precession parameter sampled: uniform weights
        weights = np.ones_like(samples['chirp_mass'])
    return rejection_sampling(samples, weights)
[docs]
def component_mass_prior_weight(samples, dx_directions):
    """
    Re-weight points to uniform in mass ratio rather than symmetric
    mass ratio. Since the transformation is singular at equal mass we
    truncate close to equal mass (eta = 0.2499).

    Parameters
    ----------
    samples: SimplePESamples
        set of samples to re-weight
    dx_directions: list
        directions used in generating samples

    Returns
    -------
    reweighted_samples: SimplePESamples
    """
    from pesummary.core.reweight import rejection_sampling
    if 'symmetric_mass_ratio' not in dx_directions:
        # eta not sampled: uniform weights
        mass_weights = np.ones_like(samples['chirp_mass'])
    else:
        # Jacobian 1/sqrt(1 - 4 eta) diverges at equal mass; cap at 50
        # (equivalent to truncating at eta ~ 0.2499)
        jacobian = 1 / np.sqrt(1 - 4 * samples['symmetric_mass_ratio'])
        mass_weights = samples['chirp_mass'] * np.minimum(50, jacobian)
    return rejection_sampling(samples, mass_weights)