import numpy as np
import random as rnd
import copy
from simple_pe import detectors
from simple_pe.localization import loc, sky_loc
from pesummary.gw.conversions.mass import mchirp_from_m1_m2
from astropy.time import Time
from scipy.optimize import brentq
from scipy.special import logsumexp
##################################################################
# Helper functions
##################################################################
[docs]
def snr_projection(f_sig, method):
"""
Function to calculate the SNR projection matrix p for a given set
of detector responses, f_sig
Parameters
----------
f_sig: array
a Nx2 array of detector responses [F+, Fx] x sigma
method: str
the way we project (one of "time", "coh", "left", "right")
Returns
-------
p: array
SNR projection matrix
"""
if len(f_sig) == 1:
# single detector, so projection is identity
p = np.identity(len(f_sig))
elif method == "time":
p = np.identity(len(f_sig))
elif method == "coh":
M = np.zeros((2, 2))
for f in f_sig:
M += np.outer(f, f)
p = np.inner(np.inner(f_sig, np.linalg.inv(M)), f_sig)
elif method == "right":
cf = np.array([complex(f[0], f[1]) for f in f_sig])
p = np.outer(cf.conjugate(), cf) / np.inner(cf.conjugate(), cf)
elif method == "left":
cf = np.array([complex(f[0], f[1]) for f in f_sig])
p = np.outer(cf, cf.conjugate()) / np.inner(cf, cf.conjugate())
else:
raise NameError("Invalid projection method: %s" % method)
return p
##################################################################
# Class to store event information
##################################################################
[docs]
class Event(object):
"""
Class to hold the details of the event. This contains the sky location,
orientation, mass of the event. The Event class can also have details of
the active GW network, including the detectors, sensivitities and SNRs
added. These can be used to calculate the localization of the event.
"""
def __init__(self, dist, ra, dec, phi, psi, cosi, mchirp, t_gps):
"""
Initialize event.
Parameters
----------
dist: float
distance to event
ra: float
right ascension
dec: float
declination
phi: float
coalescence phase
psi: float
polarization
cosi: float
cos of inclination angle
mchirp: float
chirp mass
t_gps: float
GPS time of event (coalescence time)
"""
self.D = float(dist)
self.ra = float(ra)
self.dec = float(dec)
self.psi = float(psi)
self.phi = float(phi)
self.cosi = float(cosi)
self.mchirp = float(mchirp)
t = Time(t_gps, format='gps')
self.gps = float(t.gps)
self.gmst = float(t.sidereal_time('mean', 'greenwich').rad)
self.xyz = detectors.xyz(self.ra - self.gmst, self.dec)
self.ifos = []
self.mirror = False
self.mirror_xyz = None
self.detected = False
self.sensitivity = None
self.mirror_sensitivity = None
self.mirror_dec = None
self.mirror_ra = None
self.snrsq = None
self.localized = None
self.found = None
self.threshold = None
self.localization = {}
self.mirror_loc = {}
self.area = {}
self.patches = {}
[docs]
@classmethod
def from_params(cls, params):
"""
Give a set of parameters, as used in the first 2 years paper,
and use these to initialize an event
Parameters
----------
params: dict
parameters in form used by first 2 years paper
"""
try:
t = Time(params['gps'], format='gps')
except:
t = Time(params["MJD"], format='mjd')
return cls(dist=params["distance"],
ra=np.radians(params["RAdeg"]),
dec=np.radians(params["DEdeg"]),
phi=np.radians(params["coa-phase"]),
psi=np.radians(params["polarization"]),
cosi=np.cos(np.radians(params["inclination"])),
mchirp=mchirp_from_m1_m2(params["mass1"], params["mass2"]),
t_gps=t.gps,
)
[docs]
@classmethod
def random_values(cls, d_max=1000, mass=1.4, t_gps=1000000000):
"""
Generate an event with random distance, orientation at given time and
mass
Parameters
----------
d_max: float
maximum distance
mass: float
component mass (assumed equal mass)
t_gps: float
GPS time of event
"""
return cls(dist=rnd.uniform(0, 1) ** (1. / 3) * d_max,
ra=rnd.uniform(0, 2 * np.pi),
dec=np.arcsin(rnd.uniform(-1, 1)),
psi=rnd.uniform(0, 2 * np.pi),
phi=rnd.uniform(0, 2 * np.pi),
cosi=rnd.uniform(-1, 1),
mchirp=mchirp_from_m1_m2(mass, mass),
t_gps=t_gps
)
[docs]
@classmethod
def from_snrs(cls, net, snrs, times, mchirp, ra=None, dec=None):
"""
Give a network with SNR and time in each detector and use this
to populate the event information. If ra and dec are provided,
they are used. If not, then sky location is inferred from the time
of arrival in different detectors. For fewer than 3 detectors,
sky location is chosen arbitrarily among possible points.
Parameters
----------
net: network.Network
a network with SNR and time for each detector
snrs: dict
the complex snr in each detector
times: dict
the time in each detector
mchirp: float
the chirp mass of the event
ra: float
the right ascenscion of the source (optional)
dec: float
the declination of the source (optional)
"""
for i in net.ifos:
getattr(net, i).snr = snrs[i]
getattr(net, i).time = times[i]
f_band = {i: getattr(net, i).f_band for i in net.ifos}
if (ra is None) and (dec is None):
ra, dec = sky_loc.localization_from_timing(net.ifos, times, f_band)
elif (ra is not None) and (dec is not None):
pass
else:
raise ValueError(
"Please either provide an estimate for both 'ra' and 'dec', "
"or neither."
)
ev = cls(dist=0.,
ra=ra,
dec=dec,
phi=0.,
psi=0.,
cosi=0.,
mchirp=mchirp,
t_gps=np.mean(list(times.values())),
)
ev.add_network(net)
return ev
[docs]
def add_network(self, network):
"""
Calculate the sensitivities and SNRs for the various detectors in
network
Parameters
----------
network: network.Network
an object containing details of the network
"""
self.threshold = network.threshold
self.found = 0
self.localized = 0
self.snrsq = 0
for ifo in network.ifos:
i = getattr(network, ifo)
# don't use Numpy's RNG here as messes up seeding for networks
if rnd.random() < i.duty_cycle:
det = copy.deepcopy(i)
setattr(self, ifo, det)
# calculate SNR (if not already given)
if det.snr is None:
det.calculate_snr(self)
else:
det.calculate_sensitivity(self)
s = abs(det.snr)
if s > det.found_thresh:
self.found += 1
if s > det.loc_thresh:
self.snrsq += s ** 2
# add the details to the event
self.ifos.append(ifo)
# only count one ET and one Hanford detector in localization
hs = sum([i in self.ifos for i in ['H1', 'H2']])
ets = sum([i in self.ifos for i in ['E1', 'E2', 'E3',
'ETdet1', 'ETdet2']])
self.localized = len(self.ifos) - max(hs - 1, 0) - max(ets - 1, 0)
if self.found >= 2 and self.snrsq > self.threshold ** 2:
self.detected = True
[docs]
def get_data(self, data):
"""
Get the relevant data for each detector and return it as an array
Parameters
----------
data: str
string giving name of data field
Returns
-------
np.array
with the data (for all ifos)
"""
return np.array([getattr(getattr(self, i), data) for i in self.ifos])
[docs]
def get_fsig(self, mirror=False):
"""
Get F_plus/F_cross multiplied by sigma (sensitivity) for each detector
Parameters
----------
mirror: boolean
indicating whether we are considering the mirror location
Returns
-------
np.array
with the sensitivities of the detectors
"""
return np.array([getattr(self, i).get_fsig(mirror) for i in self.ifos])
[docs]
def get_f(self, mirror=False):
"""
Get the network sensitivity to plus and cross in dominant polarization
Parameters
----------
mirror: boolean
indicating whether we are considering the mirror location
Returns
-------
np.array
length 2 array containing F_+, F_x response
"""
m = np.zeros((2, 2))
fsig = self.get_fsig(mirror)
for f in fsig:
m += np.outer(f, f)
eigs = np.linalg.eig(m)[0]
if len(fsig) == 1:
# with 1-detector have no sensitivity to x
# numerically can get negative
eigs[eigs < 0] = 0
f_pc = np.sqrt(eigs)
f_pc.sort()
return f_pc[::-1]
[docs]
def alpha_net(self, mirror=False):
"""
Get the relative network sensitivity to the second polarization
Parameters
----------
mirror: boolean
indicating whether we are considering the mirror location
Returns
-------
float
value of alpha_network
"""
fp, fc = self.get_f(mirror)
return fc/fp
[docs]
def get_snr(self, dt_i=None):
"""
Calculate the snr for each detector at the requested time offset
Parameters
----------
dt_i: np.array
time shift to be applied in each detector
Returns
-------
np.array
the complex snr for each detector
"""
z = np.array([getattr(self, i).snr for i in self.ifos])
if dt_i is not None:
f_mean = self.get_data("f_mean")
f_band = self.get_data("f_band")
z *= (1 - 2 * np.pi ** 2 * (f_mean ** 2 + f_band ** 2) * dt_i ** 2
+ 2.j * np.pi * dt_i * f_mean)
return z
[docs]
def projected_snr(self, method, mirror=False, dt_i=None):
"""
Calculate the projected SNR for a given method at either original or
mirror sky location
Parameters
----------
method: str
localization method to use
mirror: boolean
indicating whether we are considering the mirror location
dt_i: time shift to be applied in each detector
Returns
-------
np.array
the complex snr for each detector
"""
f_sig = self.get_fsig(mirror)
p = snr_projection(f_sig, method)
zp = np.inner(self.get_snr(dt_i), p)
return zp
[docs]
def calculate_mirror(self):
"""
Calculate the mirror location and detector sensitivity there
"""
if len(self.ifos) == 3:
location = self.get_data("location")
x = location[1] - location[0]
y = location[2] - location[0]
normal = np.cross(x, y)
normal /= np.linalg.norm(normal)
self.mirror_xyz = self.xyz - 2 * np.inner(self.xyz, normal) * normal
mra, mdec = detectors.phitheta(self.mirror_xyz)
mra += self.gmst
self.mirror_ra = mra % (2 * np.pi)
self.mirror_dec = mdec
self.mirror = True
for i in self.ifos:
getattr(self, i).calculate_mirror_sensitivity(self)
[docs]
def calculate_sensitivity(self):
"""
Calculate the network sensitivity to the event, given the sky
location and masses.
"""
self.sensitivity = np.linalg.norm(self.get_fsig())
if self.mirror:
self.mirror_sensitivity = np.linalg.norm(self.get_fsig(mirror=True))
[docs]
def localization_factors(self, method, mirror=False):
"""
Calculate all the localization factors for a given source
and network of detectors, given the complex snr, sensitivity,
bandwidth, mean frequency, location of the detectors.
Definition of terms given in equations (B.5) and (B.6) of
"Localization of transient gravitational
wave sources: beyond triangulation", Class. Quantum Grav. 35 (2018)
105002.
Parameters
----------
method: string
localization method to use
mirror: boolean
indicating whether we are considering the mirror location
Returns
-------
b_i: np.array
the localization factor B_i (equation (B.5))
c_ij: np.array
the localization matrix C_ij (equation (B.6))
c_i: np.array
the localization factor c_i (equation (B.17))
c: float
the localization factor c (equation (B.17))
"""
f_mean = self.get_data("f_mean")
f_band = self.get_data("f_band")
# Calculate bar(f_sq)
f_sq = (f_mean ** 2 + f_band ** 2)
z = self.get_snr()
# calculate projection:
f_sig = self.get_fsig(mirror)
p = snr_projection(f_sig, method)
# work out the localization factors
k_i = 4 * np.pi ** 2 * np.real(np.sum(np.outer(f_sq * z.conjugate(), z)
* p, axis=1))
k_ij = 4 * np.pi ** 2 * np.real(np.outer(f_mean * z.conjugate(),
f_mean * z) * p)
c_ij = k_i * np.eye(len(k_i)) - k_ij
c_i = np.sum(c_ij, axis=1)
c = np.sum(c_i)
b_i = 4 * np.pi * np.imag(np.sum(np.outer(f_mean * z.conjugate(), z)
* p, axis=1))
return b_i, c_ij, c_i, c
[docs]
def localize(self, method, mirror=False, p=0.9):
"""
Calculate localization of source at given probability with
chosen method
Parameters
----------
method: str
localization method to use
mirror: boolean
indicating whether we are considering the mirror location
p: float
probability region to calculate (default 0.9)
"""
if mirror:
self.mirror_loc[method] = loc.Localization(method, self,
mirror, p)
else:
self.localization[method] = loc.Localization(method, self,
mirror, p)
[docs]
def combined_loc(self, method):
"""
Calculate the area from original and mirror locations for the given
method
Parameters
----------
method: str
localization method to use
"""
patches = 1
p = self.localization[method].p
a0 = - self.localization[method].area / np.log(1. - p)
if self.mirror:
if method == "marg":
drho2 = 2 * (self.localization[method].like -
self.mirror_loc[method].like)
else:
drho2 = (self.localization[method].snr ** 2 -
self.mirror_loc[method].snr ** 2)
prob_ratio = self.mirror_loc[method].p / p
a_ratio = self.mirror_loc[method].area / \
self.localization[method].area
if drho2 > 2 * (np.log(prob_ratio) +
np.log(1 + p * a_ratio) - np.log(1 - p)):
a = - np.log(1 - p * (1 + a_ratio * prob_ratio *
np.exp(-drho2 / 2))) * a0
else:
patches = 2
a = a0 * ((1 + a_ratio) * (-np.log(1 - p) + np.log(1 + a_ratio)
- np.log(1 + a_ratio * prob_ratio *
np.exp(-drho2 / 2)))
- a_ratio * (drho2 / 2 - np.log(prob_ratio)))
if np.isnan(a):
print("for method %s: we got a nan for the area" % method)
if not self.mirror or np.isnan(a):
a = - np.log(1. - p) * a0
patches = 1
self.patches[method] = patches
self.area[method] = a
[docs]
def marg_loc(self, mirror=False, p=0.9):
"""
Calculate the marginalized localization. Method described
in "Localization of transient gravitational
wave sources: beyond triangulation", Class. Quantum Grav. 35 (2018).
Parameters
----------
mirror: boolean
indicating whether we are considering the mirror location
p: float
probability region to calculate (default=0.9)
"""
if mirror:
localize = "mirror_loc"
else:
localize = "localization"
localization = getattr(self, localize)
if (localization["coh"].snr ** 2 - localization["right"].snr ** 2 < 2) \
or (localization["coh"].snr ** 2 - localization["left"].snr **
2 < 2):
# set coherent to zero if we don't trust it:
localization["coh"].like = 0
keys = ["left", "right", "coh"]
r_max = 1.1 * np.sqrt(np.nanmax([localization[k].area for k in keys])
/ np.pi)
r_min = 0.9 * np.sqrt(np.nanmin([localization[k].area for k in keys])
/ np.pi)
r = brentq(f, r_min, r_max, (keys, localization, p))
localization["marg"] = loc.Localization("marg", self, mirror, p,
area=np.pi * r ** 2)
localization["marg"].like = logsumexp([localization[k].like +
np.log(localization[k].area) -
np.log(-2 * np.pi *
np.log(1 -
localization[
k].p)) for k in keys])
localization["marg"].like -= np.log(localization["marg"].area) - \
np.log(-2 * np.pi * np.log(1 - p))
[docs]
def localize_all(self, p=0.9, methods=None):
"""
Calculate localization with given set of methods.
Parameters
----------
p: float
probability region to calculate
methods: list
A list of localization methods to use from 'time', 'coh', 'left',
'right', 'marg'. Default is to calculate all.
"""
if methods is None:
methods = ["time", "coh", "left", "right", "marg"]
if 'marg' in methods:
marg = True
methods.remove('marg')
else:
marg = False
self.calculate_mirror()
self.calculate_sensitivity()
for method in methods:
self.localize(method, mirror=False, p=p)
if self.mirror:
self.localize(method, mirror=True, p=p)
if marg:
methods.append('marg')
self.marg_loc(p=p)
if self.mirror:
self.marg_loc(mirror=True, p=p)
for method in methods:
self.combined_loc(method)
[docs]
def f(r, keys, localization, p):
"""
Function used in marginalization of localizations
TODO: Fix documentation, and check if this function is used
Parameters
----------
r: float
radius
keys: list
the different localization methods to consider
localization: loc.Localization
object storing localization information
p: float
probability
Returns
-------
float
f
"""
f = 0
lmax = max([localization[k].like for k in keys])
for k in keys:
s2 = localization[k].area / (-2 * np.pi * np.log(1 - localization[k].p))
f += np.exp(localization[k].like - lmax) * s2 * \
(1 - p - np.exp(-r ** 2 / (2 * s2)))
return f