gstlal  1.4.1
test_common.py
1 # Copyright (C) 2009--2011,2013 Kipp Cannon
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 #
18 # =============================================================================
19 #
20 # Preamble
21 #
22 # =============================================================================
23 #
24 
25 
26 import numpy
27 import sys
28 
29 
30 import gi
31 gi.require_version('Gst', '1.0')
32 from gi.repository import GObject
33 from gi.repository import Gst
34 
35 
36 from gstlal import pipeparts
37 from gstlal import pipeio
38 from gstlal import simplehandler
39 
40 
41 GObject.threads_init()
42 Gst.init(None)
43 
44 
45 if sys.byteorder == "little":
46  BYTE_ORDER = "LE"
47 else:
48  BYTE_ORDER = "BE"
49 
50 
51 #
52 # =============================================================================
53 #
54 # Utilities
55 #
56 # =============================================================================
57 #
58 
59 
60 def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
61  assert not width % 8
62  samplesperbuffer = int(round(buffer_length * rate))
63  head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * 2) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
64  head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=Z%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate))
65  head = pipeparts.mktogglecomplex(pipeline, head)
66  if verbose:
67  head = pipeparts.mkprogressreport(pipeline, head, "src")
68  return head
69 
70 
71 def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
72  assert not width % 8
73  if wave == "ligo":
74  head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
75  else:
76  samplesperbuffer = int(round(buffer_length * rate))
77  head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * channels) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
78  head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels))
79  if verbose:
80  head = pipeparts.mkprogressreport(pipeline, head, "src")
81  return head
82 
83 
84 def add_gaps(pipeline, head, buffer_length, rate, test_duration, gap_frequency = None, gap_threshold = None, control_dump_filename = None):
85  if gap_frequency is None:
86  return head
87  samplesperbuffer = int(round(buffer_length * rate))
88  control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, volume = 1, blocksize = 4 * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length))), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
89  if control_dump_filename is not None:
90  control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
91  control = pipeparts.mkqueue(pipeline, control)
92  return pipeparts.mkgate(pipeline, head, control = control, threshold = gap_threshold)
93 
94 
95 def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
96  src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
97  if tags is not None:
98  src = pipeparts.mktaginject(pipeline, src, tags)
99  return add_gaps(pipeline, src, buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename)
100 
101 
102 def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
103  src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
104  if tags is not None:
105  src = pipeparts.mktaginject(pipeline, src, tags)
106  return pipeparts.mktogglecomplex(pipeline, add_gaps(pipeline, pipeparts.mktogglecomplex(pipeline, src), buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename))
107 
108 
109 #
110 # =============================================================================
111 #
112 # Pipeline Builder
113 #
114 # =============================================================================
115 #
116 
117 
118 def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
119  print >>sys.stderr, "=== Running Test %s ===" % name
120  mainloop = GObject.MainLoop()
121  pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs)
122  handler = simplehandler.Handler(mainloop, pipeline)
123  if segment is not None:
124  if pipeline.set_state(Gst.State.PAUSED) == Gst.StateChangeReturn.FAILURE:
125  raise RuntimeError("pipeline failed to enter PLAYING state")
126  pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), Gst.SeekFlags.FLUSH, Gst.SeekType.SET, segment[0].ns(), Gst.SeekType.SET, segment[1].ns())
127  if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
128  raise RuntimeError("pipeline failed to enter PLAYING state")
129  mainloop.run()
130 
131 
132 #
133 # =============================================================================
134 #
135 # Push Arrays Through an Element
136 #
137 # =============================================================================
138 #
139 
140 
141 def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
142  input_arrays = list(input_arrays) # so we can modify it
143  output_arrays = []
144 
145  pipeline = Gst.Pipeline(name = name)
146 
147  head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))
148  def need_data(elem, arg, (input_arrays, rate)):
149  if input_arrays:
150  arr = input_arrays.pop(0)
151  elem.set_property("caps", pipeio.caps_from_array(arr, rate))
152  buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate)
153  elem.emit("push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate))
154  return Gst.FlowReturn.OK
155  else:
156  elem.emit("end-of-stream")
157  return Gst.FlowReturn.EOS
158  head.connect("need-data", need_data, (input_arrays, rate))
159 
160  head = elemfunc(pipeline, head, **elemfunc_kwargs)
161 
162  head = pipeparts.mkappsink(pipeline, head)
163  def appsink_get_array(elem, output_arrays):
164  output_arrays.append(pipeio.array_from_audio_sample(elem.emit("pull-sample")))
165  return Gst.FlowReturn.OK
166 
167  head.connect("new-sample", appsink_get_array, output_arrays)
168  build_and_run((lambda *args, **kwargs: pipeline), name)
169 
170  return output_arrays