gstlal  1.4.1
multirate_datasource.py
1 # Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import sys
28 import os
29 import optparse
30 import math
31 import numpy
32 
33 import gi
34 gi.require_version('Gst', '1.0')
35 from gi.repository import GObject, Gst, GstAudio
36 GObject.threads_init()
37 Gst.init(None)
38 
39 import lal
40 from ligo import segments
41 
42 from gstlal import bottle
43 from gstlal import pipeparts
44 from gstlal import reference_psd
45 from gstlal import datasource
46 
47 __doc__ = """
48 
49 **Review Status**
50 
51 +------------------------------------------------+------------------------------------------+------------+
52 | Names | Hash | Date |
53 +================================================+==========================================+============+
54 | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e | 2014-06-18 |
55 +------------------------------------------------+------------------------------------------+------------+
56 """
57 
58 # a macro to switch between a conventional whitener and a fir whitener below
59 try:
60  if int(os.environ["GSTLAL_FIR_WHITEN"]):
61  FIR_WHITENER = True
62  else:
63  FIR_WHITENER = False
64 except KeyError as e:
65  print >> sys.stderr, "You must set the environment variable GSTLAL_FIR_WHITEN to either 0 or 1. 1 enables causal whitening. 0 is the traditional acausal whitening filter"
66  raise
67 
68 
69 def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = 1 * Gst.SECOND, zero_pad = None, width = 64, unit_normalize = True, statevector = None, dqvector = None, fir_whiten_reference_psd = None):
70  """
71  Build pipeline stage to whiten and downsample h(t).
72 
73  * pipeline: the gstreamer pipeline to add this to
74  * src: the gstreamer element that will be providing data to this
75  * rates: a list of the requested sample rates, e.g., [512,1024].
76  * instrument: the instrument to process
77  * psd: a psd frequency series
78  * psd_fft_length: length of fft used for whitening
79  * ht_gate_threshold: gate h(t) if it crosses this value
80  * veto_segments: segments to mark as gaps after whitening
81  * track_psd: decide whether to dynamically track the spectrum or use the fixed spectrum provided
82  * width: type convert to either 32 or 64 bit float
83  * fir_whiten_reference_psd: when using FIR whitener, use this PSD to define desired desired phase response
84 
85  **Gstreamer graph describing this function**
86 
87  .. graphviz::
88 
89  digraph mkbasicsrc {
90  rankdir = LR;
91  compound=true;
92  node [shape=record fontsize=10 fontname="Verdana"];
93  edge [fontsize=8 fontname="Verdana"];
94 
95  capsfilter1 ;
96  audioresample ;
97  capsfilter2 ;
98  reblock ;
99  whiten ;
100  audioconvert ;
101  capsfilter3 ;
102  "segmentsrcgate()" [label="segmentsrcgate() \\n [iff veto segment list provided]", style=filled, color=lightgrey];
103  tee ;
104  audioamplifyr1 ;
105  capsfilterr1 ;
106  htgater1 [label="htgate() \\n [iff ht gate specified]", style=filled, color=lightgrey];
107  tee1 ;
108  audioamplifyr2 ;
109  capsfilterr2 ;
110  htgater2 [label="htgate() \\n [iff ht gate specified]", style=filled, color=lightgrey];
111  tee2 ;
112  audioamplify_rn ;
113  capsfilter_rn ;
114  htgate_rn [style=filled, color=lightgrey, label="htgate() \\n [iff ht gate specified]"];
115  tee ;
116 
117  // nodes
118 
119  "<src>" -> capsfilter1 -> audioresample;
120  audioresample -> capsfilter2;
121  capsfilter2 -> reblock;
122  reblock -> whiten;
123  whiten -> audioconvert;
124  audioconvert -> capsfilter3;
125  capsfilter3 -> "segmentsrcgate()";
126  "segmentsrcgate()" -> tee;
127 
128  tee -> audioamplifyr1 [label="Rate 1"];
129  audioamplifyr1 -> capsfilterr1;
130  capsfilterr1 -> htgater1;
131  htgater1 -> tee1 -> "<return> 1";
132 
133  tee -> audioamplifyr2 [label="Rate 2"];
134  audioamplifyr2 -> capsfilterr2;
135  capsfilterr2 -> htgater2;
136  htgater2 -> tee2 -> "<return> 2";
137 
138  tee -> audioamplify_rn [label="Rate N"];
139  audioamplify_rn -> capsfilter_rn;
140  capsfilter_rn -> htgate_rn;
141  htgate_rn -> tee_n -> "<return> 3";
142  }
143 
144  """
145  #
146  # input sanity checks
147  #
148 
149  if psd is None and not track_psd:
150  raise ValueError("must enable track_psd when psd is None")
151  if int(psd_fft_length) != psd_fft_length:
152  raise ValueError("psd_fft_length must be an integer")
153  psd_fft_length = int(psd_fft_length)
154 
155  #
156  # set default whitener zero-padding if needed
157  #
158 
159  if zero_pad is None:
160  if FIR_WHITENER:
161  # in this configuration we are not asking the
162  # whitener to reassemble an output time series
163  # (that we care about) so we disable zero-padding
164  # to get the most information from the whitener's
165  # FFT blocks.
166  zero_pad = 0
167  elif psd_fft_length % 4:
168  raise ValueError("default whitener zero-padding requires psd_fft_length to be multiple of 4")
169  else:
170  zero_pad = psd_fft_length // 4
171 
172  #
173  # down-sample to highest of target sample rates. we include a caps
174  # filter upstream of the resampler to ensure that this is, infact,
175  # *down*-sampling. if the source time series has a lower sample
176  # rate than the highest target sample rate the resampler will
177  # become an upsampler, and the result will likely interact poorly
178  # with the whitener as it tries to ampify the non-existant
179  # high-frequency components, possibly adding significant numerical
180  # noise to its output. if you see errors about being unable to
181  # negotiate a format from this stage in the pipeline, it is because
182  # you are asking for output sample rates that are higher than the
183  # sample rate of your data source.
184  #
185 
186  head = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw, rate=[%d,MAX]" % max(rates))
187  head = pipeparts.mkinterpolator(pipeline, head)
188  head = pipeparts.mkaudioconvert(pipeline, head)
189  head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_hoft" % (instrument, max(rates)))
190 
191  #
192  # optionally add vetoes
193  #
194 
195  # FIXME NOTE The pre whitening vetoes are only applied via the
196  # deglitcher if they are impulse like. What that means is that they
197  # have to be shorter than 1s.
198  # This should be revisted for O3.
199  if veto_segments is not None:
200  short_veto_segments = segments.segmentlist([seg for seg in veto_segments if abs(seg) < 1.0]).protract(0.25).coalesce()
201  head = pipeparts.mkdeglitcher(pipeline, head, short_veto_segments)
202 
203  #
204  # construct whitener.
205  #
206 
207  if FIR_WHITENER:
208  head = pipeparts.mktee(pipeline, head)
209  whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "lal_whiten_%s" % instrument)
210  pipeparts.mkfakesink(pipeline, whiten)
211 
212  # high pass filter
213  kernel = reference_psd.one_second_highpass_kernel(max(rates), cutoff = 12)
214  block_stride = block_duration * max(rates) // Gst.SECOND
215  assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
216  head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2)
217 
218  # FIR filter for whitening kernel
219  head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + max(rates) * psd_fft_length, dtype=numpy.float64), latency = 0)
220 
221  # compute whitening kernel from PSD
222  def set_fir_psd(whiten, pspec, firelem, psd_fir_kernel):
223  psd_data = numpy.array(whiten.get_property("mean-psd"))
224  psd = lal.CreateREAL8FrequencySeries(
225  name = "psd",
226  epoch = lal.LIGOTimeGPS(0),
227  f0 = 0.0,
228  deltaF = whiten.get_property("delta-f"),
229  sampleUnits = lal.Unit(whiten.get_property("psd-units")),
230  length = len(psd_data)
231  )
232  psd.data.data = psd_data
233  kernel, latency, sample_rate = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd)
234  kernel, phase = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate)
235  firelem.set_property("kernel", kernel)
236  firkernel = reference_psd.PSDFirKernel()
237  if fir_whiten_reference_psd is not None:
238  assert fir_whiten_reference_psd.f0 == 0.
239  # interpolate the reference phase PSD if its
240  # resolution doesn't match what we'll eventually
241  # require it to be.
242  if psd_fft_length != round(1. / fir_whiten_reference_psd.deltaF):
243  fir_whiten_reference_psd = reference_psd.interpolate_psd(fir_whiten_reference_psd, 1. / psd_fft_length)
244  # confirm that the reference phase PSD's Nyquist is
245  # sufficiently high, then reduce it to the required
246  # Nyquist if needed.
247  assert (psd_fft_length * max(rates)) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low"
248  if (psd_fft_length * max(rates)) // 2 + 1 < len(fir_whiten_reference_psd.data.data):
249  fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * max(rates)) // 2 + 1)
250  # set the reference phase PSD
251  firkernel.set_phase(fir_whiten_reference_psd)
252  whiten.connect_after("notify::mean-psd", set_fir_psd, head, firkernel)
253 
254  # Gate after gaps. the queue sizes on the control inputs
255  # need only be large enough to hold the state vector
256  # streams until they are required. the streams will be
257  # consumed immediately when needed, so there is no risk
258  # that these queues add to the latency, so make them
259  # generously large.
260  # FIXME the -max(rates) extra padding is for the high pass
261  # filter: NOTE it also needs to be big enough for the
262  # downsampling filter, but that is typically smaller than the
263  # HP filter (192 samples at Qual 9)
264  # FIXME: this first queue should not be needed. what is
265  # going on!?
266  if statevector is not None or dqvector is not None:
267  head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * (psd_fft_length + 2))
268  if statevector is not None:
269  head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1))
270  if dqvector is not None:
271  head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1))
272  head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_fir" % instrument)
273  #head = pipeparts.mknxydumpsinktee(pipeline, head, filename = "after_mkfirbank.txt")
274  else:
275  # FIXME: we should require fir_whiten_reference_psd to be
276  # None in this code path for safety, but that's hard to do
277  # since the calling code would need to know what
278  # environment variable is being used to select the mode,
279  # and we don't want to be duplicating that code all over
280  # the place
281 
282  #
283  # add a reblock element. the whitener's gap support isn't
284  # 100% yet and giving it smaller input buffers works around
285  # the remaining weaknesses (namely that when it sees a gap
286  # buffer large enough to drain its internal history, it
287  # doesn't know enough to produce a short non-gap buffer to
288  # drain its history followed by a gap buffer, it just
289  # produces one huge non-gap buffer that's mostly zeros).
290  # this is not required in the FIR-whitener case because
291  # there we don't use the whitener's output time series for
292  # anything.
293  #
294 
295  head = pipeparts.mkreblock(pipeline, head, block_duration = block_duration)
296 
297  head = whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "lal_whiten_%s" % instrument)
298  # make the buffers going downstream smaller, this can
299  # really help with RAM
300  head = pipeparts.mkreblock(pipeline, head, block_duration = block_duration)
301 
302  #
303  # enable/disable PSD tracking
304  #
305 
306  whiten.set_property("psd-mode", 0 if track_psd else 1)
307 
308  #
309  # install signal handler to retrieve \Delta f and f_{Nyquist}
310  # whenever they are known and/or change, resample the user-supplied
311  # PSD, and install it into the whitener.
312  #
313 
314  if psd is not None:
315  def psd_units_or_resolution_changed(elem, pspec, psd):
316  # make sure units are set, compute scale factor
317  units = lal.Unit(elem.get_property("psd-units"))
318  if units == lal.DimensionlessUnit:
319  return
320  scale = float(psd.sampleUnits / units)
321  # get frequency resolution and number of bins
322  delta_f = elem.get_property("delta-f")
323  n = int(round(elem.get_property("f-nyquist") / delta_f) + 1)
324  # interpolate, rescale, and install PSD
325  psd = reference_psd.interpolate_psd(psd, delta_f)
326  elem.set_property("mean-psd", psd.data.data[:n] * scale)
327  whiten.connect_after("notify::f-nyquist", psd_units_or_resolution_changed, psd)
328  whiten.connect_after("notify::delta-f", psd_units_or_resolution_changed, psd)
329  whiten.connect_after("notify::psd-units", psd_units_or_resolution_changed, psd)
330 
331  #
332  # convert to desired precision
333  #
334 
335  head = pipeparts.mkaudioconvert(pipeline, head)
336  if width == 64:
337  head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
338  elif width == 32:
339  head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
340  else:
341  raise ValueError("invalid width: %d" % width)
342  head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, max(rates)))
343 
344  #
345  # optionally add vetoes
346  #
347 
348  if veto_segments is not None:
349  long_veto_segments = segments.segmentlist(veto_segments).protract(0.25).coalesce()
350  head = pipeparts.mkdeglitcher(pipeline, head, long_veto_segments)
351 
352  #
353  # optional gate on whitened h(t) amplitude. attack and hold are
354  # made to be 1/4 second or 1 sample, whichever is larger
355  #
356 
357  # FIXME: this could be omitted if ht_gate_threshold is None, but
358  # we need to collect whitened h(t) segments, however something
359  # could be done to collect those if these gates aren't here.
360  ht_gate_window = max(max(rates) // 4, 1) # samples
361  head = datasource.mkhtgate(pipeline, head, threshold = ht_gate_threshold if ht_gate_threshold is not None else float("+inf"), hold_length = ht_gate_window, attack_length = ht_gate_window, name = "%s_ht_gate" % instrument)
362  # emit signals so that a user can latch on to them
363  head.set_property("emit-signals", True)
364 
365  #
366  # tee for highest sample rate stream
367  #
368 
369  head = {max(rates): pipeparts.mktee(pipeline, head)}
370 
371  #
372  # down-sample whitened time series to remaining target sample rates
373  # while applying an amplitude correction to adjust for low-pass
374  # filter roll-off. we also scale by \sqrt{original rate / new
375  # rate}. this is done to preserve the square magnitude of the time
376  # series --- the inner product of the time series with itself.
377  # really what we want is for
378  #
379  # \int v_{1}(t) v_{2}(t) \diff t
380  # \approx \sum v_{1}(t) v_{2}(t) \Delta t
381  #
382  # to be preserved across different sample rates, i.e. for different
383  # \Delta t. what we do is rescale the time series and ignore
384  # \Delta t, so we put 1/2 factor of the ratio of the \Delta t's
385  # into the h(t) time series here, and, later, another 1/2 factor
386  # into the template when it gets downsampled.
387  #
388  # by design, the output of the whitener is a unit-variance time
389  # series. however, downsampling it reduces the variance due to the
390  # removal of some frequency components. we require the input to
391  # the orthogonal filter banks to be unit variance, therefore a
392  # correction factor is applied via an audio amplify element to
393  # adjust for the reduction in variance due to the downsampler.
394  #
395 
396  for rate in sorted(set(rates))[:-1]:
397  # downsample. make sure each output stream is unit
398  # normalized, otherwise the audio resampler removes power
399  # according to the rate difference and filter rolloff
400  if unit_normalize:
401  # NOTE the interpolator is about as good as the
402  # audioresampler at quality 10, hence the 10.
403  head[rate] = pipeparts.mkaudioamplify(pipeline, head[max(rates)], 1. / math.sqrt(pipeparts.audioresample_variance_gain(10, max(rates), rate)))
404  else:
405  head[rate] = head[max(rates)]
406  head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkinterpolator(pipeline, head[rate]), caps = "audio/x-raw, rate=%d" % rate)
407  head[rate] = pipeparts.mkchecktimestamps(pipeline, head[rate], "%s_timestamps_%d_whitehoft" % (instrument, rate))
408 
409  head[rate] = pipeparts.mktee(pipeline, head[rate])
410 
411  #
412  # done. return value is a dictionary of tee elements indexed by
413  # sample rate
414  #
415 
416  return head
417