gstlal  1.4.1
whiten_test_01.py
1 #!/usr/bin/env python
2 # Copyright (C) 2009,2010 Kipp Cannon
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; either version 2 of the License, or (at your
7 # option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
12 # Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import numpy
28 import sys
29 import gi
30 gi.require_version('Gst', '1.0')
31 from gi.repository import GObject, Gst
32 GObject.threads_init()
33 Gst.init(None)
34 
35 
36 from gstlal import pipeparts
37 import cmp_nxydumps
38 import test_common
39 
40 
41 #
42 # =============================================================================
43 #
44 # Pipelines
45 #
46 # =============================================================================
47 #
48 
49 
50 #
51 # is the whiten element an identity transform when given a unit PSD? in
52 # and out timeseries should be identical modulo FFT precision and start-up
53 # and shut-down transients.
54 #
55 
56 
57 def whiten_test_01a(pipeline, name):
58  #
59  # signal handler to construct a new unit PSD (with LAL's
60  # normalization) whenever the frequency resolution or Nyquist
61  # frequency changes. LAL's normalization is such that the integral
62  # of the PSD yields the variance in the time domain, therefore
63  #
64  # PSD = 1 / (n \Delta f)
65  #
66 
67  def psd_resolution_changed(elem, pspec, ignored):
68  delta_f = elem.get_property("delta-f")
69  f_nyquist = elem.get_property("f-nyquist")
70  n = int(round(f_nyquist / delta_f) + 1)
71  elem.set_property("mean-psd", numpy.ones((n,), dtype = "double") / (n * delta_f))
72 
73  #
74  # try changing these. test should still work!
75  #
76 
77  rate = 2048 # Hz
78  zero_pad = 0.0 # seconds
79  fft_length = 4.0 # seconds
80  buffer_length = 1.0 # seconds
81  test_duration = 100.0 # seconds
82 
83  #
84  # build pipeline
85  #
86 
87  head = test_common.test_src(pipeline, buffer_length = buffer_length, rate = rate, test_duration = test_duration, wave = 9)
88  head = pipeparts.mkgeneric(pipeline, head, "audiocheblimit", mode = 1, cutoff = 0.25)
89  head = tee = pipeparts.mktee(pipeline, head)
90  head = pipeparts.mkwhiten(pipeline, head, psd_mode = 1, zero_pad = zero_pad, fft_length = fft_length)
91  head.connect_after("notify::f-nyquist", psd_resolution_changed, None)
92  head.connect_after("notify::delta-f", psd_resolution_changed, None)
93  head = pipeparts.mkchecktimestamps(pipeline, head)
94  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
95  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee, max_size_time = int(fft_length * Gst.SECOND)), "%s_in.dump" % name)
96 
97  #
98  # done
99  #
100 
101  return pipeline
102 
103 
104 #
105 # does the whitener turn coloured Gaussian noise into zero-mean,
106 # unit-variance stationary white Gaussian noise?
107 #
108 
109 
110 def whiten_test_01b(pipeline, name):
111  #
112  # try changing these. test should still work!
113  #
114 
115  rate = 2048 # Hz
116  zero_pad = 0.0 # seconds
117  fft_length = 4.0 # seconds
118  buffer_length = 1.0 # seconds
119  test_duration = 10000.0 # seconds
120 
121  #
122  # build pipeline
123  #
124 
125  head = test_common.test_src(pipeline, buffer_length = buffer_length, rate = rate, test_duration = test_duration, wave = 6)
126  head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, zero_pad = zero_pad, fft_length = fft_length)
127  head = pipeparts.mkchecktimestamps(pipeline, head)
128  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
129 
130  #
131  # done
132  #
133 
134  return pipeline
135 
136 
137 #
138 # =============================================================================
139 #
140 # Main
141 #
142 # =============================================================================
143 #
144 
145 
146 test_common.build_and_run(whiten_test_01a, "whiten_test_01a")
147 test_common.build_and_run(whiten_test_01b, "whiten_test_01b")
148 
149 cmp_nxydumps.compare("whiten_test_01a_in.dump", "whiten_test_01a_out.dump", transients = (2.0, 2.0), sample_fuzz = 1e-2)
def compare(filename1, filename2, args, kwargs)
def build_and_run(pipelinefunc, name, segment=None, pipelinefunc_kwargs)
Definition: test_common.py:118
def test_src(pipeline, buffer_length=1.0, rate=2048, width=64, channels=1, test_duration=10.0, wave=5, freq=0, is_live=False, verbose=True)
Definition: test_common.py:71