31 gi.require_version(
'Gst',
'1.0')
32 from gi.repository
import GObject
33 from gi.repository
import Gst
36 from gstlal
import pipeparts
37 from gstlal
import pipeio
38 from gstlal
import simplehandler
41 GObject.threads_init()
45 if sys.byteorder ==
"little":
60 def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
62 samplesperbuffer = int(round(buffer_length * rate))
63 head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * 2) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
64 head = pipeparts.mkcapsfilter(pipeline, head,
"audio/x-raw, format=Z%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate))
65 head = pipeparts.mktogglecomplex(pipeline, head)
67 head = pipeparts.mkprogressreport(pipeline, head,
"src")
71 def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
74 head = pipeparts.mkfakeLIGOsrc(pipeline, instrument =
"H1", channel_name =
"LSC-STRAIN")
76 samplesperbuffer = int(round(buffer_length * rate))
77 head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * channels) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
78 head = pipeparts.mkcapsfilter(pipeline, head,
"audio/x-raw, format=F%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels))
80 head = pipeparts.mkprogressreport(pipeline, head,
"src")
84 def add_gaps(pipeline, head, buffer_length, rate, test_duration, gap_frequency = None, gap_threshold = None, control_dump_filename = None):
85 if gap_frequency
is None:
87 samplesperbuffer = int(round(buffer_length * rate))
88 control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, volume = 1, blocksize = 4 * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length))),
"audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
89 if control_dump_filename
is not None:
90 control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
91 control = pipeparts.mkqueue(pipeline, control)
92 return pipeparts.mkgate(pipeline, head, control = control, threshold = gap_threshold)
95 def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
96 src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
98 src = pipeparts.mktaginject(pipeline, src, tags)
99 return add_gaps(pipeline, src, buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename)
102 def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
103 src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
105 src = pipeparts.mktaginject(pipeline, src, tags)
106 return pipeparts.mktogglecomplex(pipeline, add_gaps(pipeline, pipeparts.mktogglecomplex(pipeline, src), buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename))
118 def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
119 print >>sys.stderr,
"=== Running Test %s ===" % name
120 mainloop = GObject.MainLoop()
121 pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs)
122 handler = simplehandler.Handler(mainloop, pipeline)
123 if segment
is not None:
124 if pipeline.set_state(Gst.State.PAUSED) == Gst.StateChangeReturn.FAILURE:
125 raise RuntimeError(
"pipeline failed to enter PLAYING state")
126 pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), Gst.SeekFlags.FLUSH, Gst.SeekType.SET, segment[0].ns(), Gst.SeekType.SET, segment[1].ns())
127 if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
128 raise RuntimeError(
"pipeline failed to enter PLAYING state")
141 def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
142 input_arrays = list(input_arrays)
145 pipeline = Gst.Pipeline(name = name)
147 head = pipeparts.mkgeneric(pipeline,
None,
"appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))
148 def need_data(elem, arg, (input_arrays, rate)):
150 arr = input_arrays.pop(0)
151 elem.set_property(
"caps", pipeio.caps_from_array(arr, rate))
152 buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate)
153 elem.emit(
"push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate))
154 return Gst.FlowReturn.OK
156 elem.emit(
"end-of-stream")
157 return Gst.FlowReturn.EOS
158 head.connect(
"need-data", need_data, (input_arrays, rate))
160 head = elemfunc(pipeline, head, **elemfunc_kwargs)
162 head = pipeparts.mkappsink(pipeline, head)
163 def appsink_get_array(elem, output_arrays):
164 output_arrays.append(pipeio.array_from_audio_sample(elem.emit(
"pull-sample")))
165 return Gst.FlowReturn.OK
167 head.connect(
"new-sample", appsink_get_array, output_arrays)
168 build_and_run((
lambda *args, **kwargs: pipeline), name)