gstlal  1.4.1
firbank_test_01.py
1 #!/usr/bin/env python
2 # Copyright (C) 2009,2010 Kipp Cannon
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; either version 2 of the License, or (at your
7 # option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
12 # Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import itertools
28 import numpy
29 from gstlal import pipeparts
30 import cmp_nxydumps
31 import test_common
32 
33 
34 #
35 # =============================================================================
36 #
37 # Pipelines
38 #
39 # =============================================================================
40 #
41 
42 
43 #
44 # is the firbank element an identity transform when given a unit impulse?
45 # in and out timeseries should be identical modulo start/stop transients
46 #
47 
48 
49 def firbank_test_01(pipeline, name, width, time_domain, gap_frequency):
50  #
51  # try changing these. test should still work!
52  #
53 
54  rate = 2048 # Hz
55  gap_frequency = gap_frequency # Hz
56  gap_threshold = 0.8 # of 1
57  buffer_length = 1.0 # seconds
58  test_duration = 10.0 # seconds
59  fir_length = 21 # samples
60  latency = (fir_length - 1) / 2 # samples, in [0, fir_length)
61 
62  #
63  # build pipeline
64  #
65 
66  head = test_common.gapped_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = "%s_control.dump" % name)
67  head = tee = pipeparts.mktee(pipeline, head)
68 
69  fir_matrix = numpy.zeros((1, fir_length), dtype = "double")
70  fir_matrix[0, (fir_matrix.shape[1] - 1) - latency] = 1.0
71 
72  head = pipeparts.mkfirbank(pipeline, head, fir_matrix = fir_matrix, latency = latency, time_domain = time_domain)
73  head = pipeparts.mkchecktimestamps(pipeline, head)
74  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
75  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee), "%s_in.dump" % name)
76 
77  #
78  # done
79  #
80 
81  return pipeline
82 
83 
84 #
85 # does the firbank work with input and output streams that have differnt
86 # numbers of channels
87 #
88 
89 
90 def firbank_test_02(pipeline, name, width, time_domain):
91  # 1 channel goes into firbank
92  head = test_common.test_src(pipeline, buffer_length = 10.0, rate = 16384, width = width, channels = 1, test_duration = 200.0, wave = 5, verbose = True)
93  # 200 channels come out
94  head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.ones((200, 1)), time_domain = time_domain)
95  pipeparts.mkfakesink(pipeline, head)
96 
97  #
98  # done
99  #
100 
101  return pipeline
102 
103 
104 #
105 # =============================================================================
106 #
107 # Main
108 #
109 # =============================================================================
110 #
111 
112 
113 flags = cmp_nxydumps.COMPARE_FLAGS_EXACT_GAPS | cmp_nxydumps.COMPARE_FLAGS_ZERO_IS_GAP | cmp_nxydumps.COMPARE_FLAGS_ALLOW_STARTSTOP_MISALIGN
114 
115 for gap_frequency, width, time_domain in itertools.product(
116  (153.0, 13.0, 0.13),
117  (32, 64),
118  (True, False)
119  ):
120  name = "firbank_test_01_%d%s_%.2f" % (width, ("TD" if time_domain else "FD"), gap_frequency)
121  test_common.build_and_run(firbank_test_01, name, width = width, time_domain = time_domain, gap_frequency = gap_frequency)
122  if width == 64:
123  cmp_nxydumps.compare("%s_in.dump" % name, "%s_out.dump" % name, flags = flags)
124  else:
125  cmp_nxydumps.compare("%s_in.dump" % name, "%s_out.dump" % name, flags = flags, sample_fuzz = 1e-6)
126 
127 
128 test_common.build_and_run(firbank_test_02, "firbank_test_02a", width = 64, time_domain = True)
def compare(filename1, filename2, args, kwargs)
def build_and_run(pipelinefunc, name, segment=None, pipelinefunc_kwargs)
Definition: test_common.py:118
def gapped_test_src(pipeline, buffer_length=1.0, rate=2048, width=64, channels=1, test_duration=10.0, wave=5, freq=0, gap_frequency=None, gap_threshold=None, control_dump_filename=None, tags=None, is_live=False, verbose=True)
Definition: test_common.py:95
def test_src(pipeline, buffer_length=1.0, rate=2048, width=64, channels=1, test_duration=10.0, wave=5, freq=0, is_live=False, verbose=True)
Definition: test_common.py:71