34 gi.require_version(
'Gst',
'1.0')
35 from gi.repository
import GObject
36 from gi.repository
import Gst
37 GObject.threads_init()
41 from glue
import iterutils
42 from ligo
import segments
43 from gstlal
import pipeio
44 from lal
import LIGOTimeGPS
45 from lal.utils
import CacheEntry
48 if sys.byteorder ==
"little":
54 __author__ =
"Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>, Drew Keppel <drew.keppel@ligo.org>" 88 def mkgeneric(pipeline, src, elem_type_name, **properties):
89 if "name" in properties:
90 elem = Gst.ElementFactory.make(elem_type_name, properties.pop(
"name"))
92 elem = Gst.ElementFactory.make(elem_type_name,
None)
93 for name, value
in properties.items():
94 elem.set_property(name.replace(
"_",
"-"), value)
96 if isinstance(src, Gst.Pad):
97 src.get_parent_element().link_pads(src, elem,
None)
110 A class that manages the task of watching for and connecting to new 111 source pads by name. The inputs are an element, the name of the 112 source pad to watch for on that element, and the sink pad (on a 113 different element) to which the source pad should be linked when it 116 The "pad-added" signal of the element will be used to watch for new 117 pads, and if the "no-more-pads" signal is emitted by the element 118 before the requested pad has appeared ValueError is raised. 120 def __init__(self, element, srcpadname, sinkpad):
121 no_more_pads_handler_id = element.connect(
"no-more-pads", self.
no_more_pads, srcpadname)
122 assert no_more_pads_handler_id > 0
123 pad_added_data = [srcpadname, sinkpad, no_more_pads_handler_id]
124 pad_added_handler_id = element.connect(
"pad-added", self.
pad_added, pad_added_data)
125 assert pad_added_handler_id > 0
126 pad_added_data.append(pad_added_handler_id)
129 def pad_added(element, pad, (srcpadname, sinkpad, no_more_pads_handler_id, pad_added_handler_id)):
130 if pad.get_name() == srcpadname:
131 element.handler_disconnect(no_more_pads_handler_id)
132 element.handler_disconnect(pad_added_handler_id)
136 def no_more_pads(element, srcpadname):
137 raise ValueError(
"<%s>: no pad named '%s'" % (element.get_name(), srcpadname))
148 Connect a handler for the pad-added signal of the 149 framecpp_channeldemux element elem, and when a pad is added 150 to the element if the pad's name appears as a key in the 151 units_dict dictionary that pad's units property will be set 152 to the string value associated with that key in the 157 >>> framecpp_channeldemux_set_units(elem, {"H1:LSC-STRAIN": "strain"}) 159 NOTE: this is a work-around to address the problem that 160 most (all?) frame files do not have units set on their 161 channel data, whereas downstream consumers of the data 162 might require information about the units. The demuxer 163 provides the units as part of a tag event, and 164 framecpp_channeldemux_set_units() can be used to override 165 the values, thereby correcting absent or incorrect units 173 def pad_added(element, pad, units_dict):
174 name = pad.get_name()
175 if name
in units_dict:
176 pad.set_property(
"units", units_dict[name])
181 Utility to watch for missing data. Pad probes are used to collect 182 the times spanned by buffers, these are compared to a segment list 183 defining the intervals of data the stream is required to have. If 184 any intervals of data are found to have been skipped or if EOS is 185 seen before the end of the segment list then a ValueError exception 188 There are two ways to use this tool. To directly install a segment 189 list monitor on a single pad use the .set_probe() class method. 190 For elements with dynamic pads, the class can be allowed to 191 automatically add monitors to pads as they become available by 192 using the element's pad-added signal. In this case initialize an 193 instance of the class with the element and a dictionary of segment 194 lists mapping source pad name to the segment list to check that 195 pad's output against. 197 In both cases a jitter parameter sets the maximum size of a skipped 198 segment that will be ignored (for example, to accomodate round-off 199 error in element timestamp computations). The default is 1 ns. 210 def __init__(self, elem, seglists, jitter = LIGOTimeGPS(0, 1)):
218 def pad_added(self, element, pad, seglists):
219 name = pad.get_name()
227 def set_probe(cls, pad, seglist, jitter = LIGOTimeGPS(0, 1)):
229 seglist = segments.segmentlist(seglist)
231 data = [seglist, jitter,
None]
233 probe_id = data[2] = pad.add_probe(Gst.PadProbeType.DATA_DOWNSTREAM, cls.
probe, data)
237 def probe(pad, probeinfo, (seglist, jitter, probe_id)):
238 if probeinfo.type & Gst.PadProbeType.BUFFER:
239 obj = probeinfo.get_buffer()
240 if not obj.mini_object.flags & Gst.BufferFlags.GAP:
243 seglist -= segments.segmentlist([segments.segment((LIGOTimeGPS(0, obj.pts), LIGOTimeGPS(0, obj.pts + obj.duration)))])
246 iterutils.inplace_filter(
lambda seg: abs(seg) > jitter, seglist)
249 preceding = segments.segment((segments.NegInfinity, LIGOTimeGPS(0, obj.pts)))
250 if seglist.intersects_segment(preceding):
251 raise ValueError(
"%s: detected missing data: %s" % (pad.get_name(), seglist & segments.segmentlist([preceding])))
252 elif probeinfo.type & Gst.PadProbeType.EVENT_DOWNSTREAM
and probeinfo.get_event().type == Gst.EventType.EOS:
254 pad.remove_probe(probe_id)
257 iterutils.inplace_filter(
lambda seg: abs(seg) > jitter, seglist)
259 raise ValueError(
"%s: at EOS detected missing data: %s" % (pad.get_name(), seglist))
272 >>> filesinkelem.connect("notify::timestamp", framecpp_filesink_ldas_path_handler, (".", 5)) 275 timestamp = elem.get_property(
"timestamp") // Gst.SECOND
278 leading_digits = timestamp // 10**int(math.log10(timestamp) + 1 - dir_digits)
281 instrument = elem.get_property(
"instrument")
282 frame_type = elem.get_property(
"frame-type")
285 path = os.path.join(outpath,
"%s-%s-%d" % (instrument, frame_type, leading_digits))
286 if not os.path.exists(path):
288 elem.set_property(
"path", path)
293 Translate an element message posted by the multifilesink element 294 inside a framecpp_filesink bin into a lal.utils.CacheEntry object 295 describing the file being written by the multifilesink element. 298 start = LIGOTimeGPS(0, message.get_structure()[
"timestamp"])
299 end = start + LIGOTimeGPS(0, message.get_structure()[
"duration"])
303 parent = message.src.get_parent()
306 return CacheEntry(parent.get_property(
"instrument"), parent.get_property(
"frame-type"), segments.segment(start, end),
"file://localhost%s" % os.path.abspath(message.get_structure()[
"filename"]))
318 def mkchannelgram(pipeline, src, **properties):
319 return mkgeneric(pipeline, src,
"lal_channelgram", **properties)
322 def mkspectrumplot(pipeline, src, **properties):
323 return mkgeneric(pipeline, src,
"lal_spectrumplot", **properties)
326 def mkhistogram(pipeline, src):
327 return mkgeneric(pipeline, src,
"lal_histogramplot")
331 def mksegmentsrc(pipeline, segment_list, blocksize = 4096 * 1 * 1, invert_output = False):
334 return mkgeneric(pipeline,
None,
"lal_segmentsrc", blocksize = blocksize, segment_list = segments.segmentlist(segments.segment(a.ns(), b.ns())
for a, b
in segment_list), invert_output = invert_output)
339 return mkgeneric(pipeline,
None,
"lal_cachesrc", location = location, use_mmap = use_mmap, **properties)
342 def mklvshmsrc(pipeline, shm_name, **properties):
343 return mkgeneric(pipeline,
None,
"gds_lvshmsrc", shm_name = shm_name, **properties)
346 def mkframexmitsrc(pipeline, multicast_group, port, **properties):
347 return mkgeneric(pipeline,
None,
"gds_framexmitsrc", multicast_group = multicast_group, port = port, **properties)
350 def mkigwdparse(pipeline, src, **properties):
351 return mkgeneric(pipeline, src,
"framecpp_igwdparse", **properties)
355 def mkuridecodebin(pipeline, uri, caps = "application/x-igwd-frame,framed=true", **properties):
356 return mkgeneric(pipeline,
None,
"uridecodebin", uri = uri, caps =
None if caps
is None else Gst.Caps.from_string(caps), **properties)
359 def mkframecppchanneldemux(pipeline, src, **properties):
360 return mkgeneric(pipeline, src,
"framecpp_channeldemux", **properties)
363 def mkframecppchannelmux(pipeline, channel_src_map, units = None, seglists = None, **properties):
364 elem = mkgeneric(pipeline,
None,
"framecpp_channelmux", **properties)
365 if channel_src_map
is not None:
366 for channel, src
in channel_src_map.items():
367 for srcpad
in src.srcpads:
368 if srcpad.link(elem.get_request_pad(channel)) == Gst.PadLinkReturn.OK:
370 if units
is not None:
372 if seglists
is not None:
377 def mkframecppfilesink(pipeline, src, message_forward = True, **properties):
378 post_messages = properties.pop(
"post_messages",
True)
379 elem = mkgeneric(pipeline, src,
"framecpp_filesink", message_forward = message_forward, **properties)
383 elem.get_by_name(
"multifilesink").set_property(
"post-messages", post_messages)
388 def mkmultifilesink(pipeline, src, next_file = 0, sync = False, async = False, **properties):
389 return mkgeneric(pipeline, src,
"multifilesink", next_file = next_file, sync = sync, async = async, **properties)
392 def mkndssrc(pipeline, host, instrument, channel_name, channel_type, blocksize = 16384 * 8 * 1, port = 31200):
395 return mkgeneric(pipeline,
None,
"ndssrc", blocksize = blocksize, port = port, host = host, channel_name =
"%s:%s" % (instrument, channel_name), channel_type = channel_type)
400 return mkgeneric(pipeline, src,
"capsfilter", caps = Gst.Caps.from_string(caps))
405 return mkgeneric(pipeline, src,
"capssetter", caps = Gst.Caps.from_string(caps), **properties)
410 return mkgeneric(pipeline, src,
"lal_statevector", **properties)
415 return mkgeneric(pipeline, src,
"taginject", tags = tags)
420 return mkgeneric(pipeline,
None,
"audiotestsrc", **properties)
424 def mkfakesrc(pipeline, instrument, channel_name, blocksize = None, volume = 1e-20, is_live = False, wave = 9, rate = 16384, **properties):
425 if blocksize
is None:
428 blocksize = 1 * rate * 8
429 return mktaginject(pipeline,
mkcapsfilter(pipeline,
mkaudiotestsrc(pipeline, samplesperbuffer = blocksize / 8, wave = wave, volume = volume, is_live = is_live, **properties),
"audio/x-raw, format=F64%s, rate=%d" % (BYTE_ORDER, rate)),
"instrument=%s,channel-name=%s,units=strain" % (instrument, channel_name))
434 properties.update((name, val)
for name, val
in ((
"kernel", kernel), (
"latency", latency))
if val
is not None)
435 return mkgeneric(pipeline, src,
"audiofirfilter", **properties)
442 return mkgeneric(pipeline, src,
"audioiirfilter", a = a, b = b)
447 return mkgeneric(pipeline, src,
"lal_shift", **properties)
450 def mkfakeLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
451 properties = {
"blocksize": blocksize}
452 properties.update((name, val)
for name, val
in ((
"instrument", instrument), (
"channel_name", channel_name))
if val
is not None)
453 return mkgeneric(pipeline,
None,
"lal_fakeligosrc", **properties)
456 def mkfakeadvLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
457 properties = {
"blocksize": blocksize}
458 properties.update((name, val)
for name, val
in ((
"instrument", instrument), (
"channel_name", channel_name))
if val
is not None)
459 return mkgeneric(pipeline,
None,
"lal_fakeadvligosrc", **properties)
462 def mkfakeadvvirgosrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
463 properties = {
"blocksize": blocksize}
464 if instrument
is not None:
465 properties[
"instrument"] = instrument
466 if channel_name
is not None:
467 properties[
"channel_name"] = channel_name
468 return mkgeneric(pipeline,
None,
"lal_fakeadvvirgosrc", **properties)
473 return mkgeneric(pipeline, src,
"progressreport", do_query =
False, name = name)
478 return mkgeneric(pipeline, src,
"lal_simulation", xml_location = filename)
483 return mkgeneric(pipeline, src,
"audiochebband", lower_frequency = lower_frequency, upper_frequency = upper_frequency, poles = poles)
488 return mkgeneric(pipeline, src,
"audiocheblimit", cutoff = cutoff, mode = mode, poles = poles, type = type, ripple = ripple)
493 return mkgeneric(pipeline, src,
"audioamplify", clipping_method = 3, amplification = amplification)
498 return mkgeneric(pipeline, src,
"lal_audioundersample")
503 return mkgeneric(pipeline, src,
"audioresample", **properties)
508 return mkgeneric(pipeline, src,
"lal_interpolator", **properties)
512 def mkwhiten(pipeline, src, psd_mode = 0, zero_pad = 0, fft_length = 8, average_samples = 64, median_samples = 7, **properties):
513 return mkgeneric(pipeline, src,
"lal_whiten", psd_mode = psd_mode, zero_pad = zero_pad, fft_length = fft_length, average_samples = average_samples, median_samples = median_samples, **properties)
518 return mkgeneric(pipeline, src,
"tee")
522 def mkadder(pipeline, srcs, sync = True, mix_mode = "sum", **properties):
523 elem = mkgeneric(pipeline,
None,
"lal_adder", sync = sync, mix_mode = mix_mode, **properties)
531 def mkmultiplier(pipeline, srcs, sync = True, mix_mode = "product", **properties):
532 return mkadder(pipeline, srcs, sync = sync, mix_mode = mix_mode, **properties)
537 return mkgeneric(pipeline, src,
"queue", **properties)
541 def mkdrop(pipeline, src, drop_samples = 0):
542 return mkgeneric(pipeline, src,
"lal_drop", drop_samples = drop_samples)
547 return mkgeneric(pipeline, src,
"lal_nofakedisconts", silent = silent)
551 def mkfirbank(pipeline, src, latency = None, fir_matrix = None, time_domain = None, block_stride = None):
552 properties = dict((name, value)
for name, value
in zip((
"latency",
"fir_matrix",
"time_domain",
"block_stride"), (latency, fir_matrix, time_domain, block_stride))
if value
is not None)
553 return mkgeneric(pipeline, src,
"lal_firbank", **properties)
556 def mktdwhiten(pipeline, src, latency = None, kernel = None, taper_length = None):
559 if taper_length
is None and kernel
is not None:
560 taper_length = len(kernel) // 4
561 properties = dict((name, value)
for name, value
in zip((
"latency",
"kernel",
"taper_length"), (latency, kernel, taper_length))
if value
is not None)
562 return mkgeneric(pipeline, src,
"lal_tdwhiten", **properties)
565 def mkiirbank(pipeline, src, a1, b0, delay, name=None):
566 properties = dict((name, value)
for name, value
in ((
"name", name), (
"delay_matrix", delay))
if value
is not None)
568 properties[
"a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
570 properties[
"b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
571 elem = mkgeneric(pipeline, src,
"lal_iirbank", **properties)
576 def mkcudaiirbank(pipeline, src, a1, b0, delay, name=None):
577 properties = dict((name, value)
for name, value
in ((
"name", name), (
"delay_matrix", delay))
if value
is not None)
579 properties[
"a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
581 properties[
"b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
582 elem = mkgeneric(pipeline, src,
"cuda_iirbank", **properties)
587 def mkcudamultiratespiir(pipeline, src, bank_struct, bank_id=0, name=None):
588 properties = dict((name, value)
for name, value
in ((
"name", name), (
"spiir_bank", bank_struct), (
"bank_id", bank_id))
if value
is not None)
589 elem = mkgeneric(pipeline, src,
"cuda_multiratespiir", **properties)
593 def mktrim(pipeline, src, initial_offset = None, final_offset = None, inverse = None):
594 properties = dict((name, value)
for name, value
in zip((
"initial-offset",
"final-offset",
"inverse"), (initial_offset,final_offset,inverse))
if value
is not None)
595 return mkgeneric(pipeline, src,
"lal_trim", **properties)
598 def mkmean(pipeline, src, **properties):
599 return mkgeneric(pipeline, src,
"lal_mean", **properties)
602 def mkabs(pipeline, src, **properties):
603 return mkgeneric(pipeline, src,
"abs", **properties)
606 def mkpow(pipeline, src, **properties):
607 return mkgeneric(pipeline, src,
"pow", **properties)
612 return mkgeneric(pipeline, src,
"lal_reblock", **properties)
617 if weights
is not None:
618 return mkgeneric(pipeline, src,
"lal_sumsquares", weights = weights)
620 return mkgeneric(pipeline, src,
"lal_sumsquares")
624 def mkgate(pipeline, src, threshold = None, control = None, **properties):
625 if threshold
is not None:
626 elem = mkgeneric(pipeline,
None,
"lal_gate", threshold = threshold, **properties)
628 elem = mkgeneric(pipeline,
None,
"lal_gate", **properties)
629 for peer, padname
in ((src,
"sink"), (control,
"control")):
630 if isinstance(peer, Gst.Pad):
631 peer.get_parent_element().link_pads(peer, elem, padname)
632 elif peer
is not None:
633 peer.link_pads(
None, elem, padname)
637 def mkbitvectorgen(pipeline, src, bit_vector, **properties):
638 return mkgeneric(pipeline, src,
"lal_bitvectorgen", bit_vector = bit_vector, **properties)
643 if matrix
is not None:
644 return mkgeneric(pipeline, src,
"lal_matrixmixer", matrix = matrix)
646 return mkgeneric(pipeline, src,
"lal_matrixmixer")
651 return mkgeneric(pipeline, src,
"lal_togglecomplex")
655 def mkautochisq(pipeline, src, autocorrelation_matrix = None, mask_matrix = None, latency = 0, snr_thresh=0):
657 if autocorrelation_matrix
is not None:
659 "autocorrelation_matrix": pipeio.repack_complex_array_to_real(autocorrelation_matrix),
661 "snr_thresh": snr_thresh
663 if mask_matrix
is not None:
664 properties[
"autocorrelation_mask_matrix"] = mask_matrix
665 return mkgeneric(pipeline, src,
"lal_autochisq", **properties)
670 return mkgeneric(pipeline, src,
"fakesink", sync =
False, async =
False)
674 def mkfilesink(pipeline, src, filename, sync = False, async = False):
675 return mkgeneric(pipeline, src,
"filesink", sync = sync, async = async, buffer_mode = 2, location = filename)
680 if segment
is not None:
681 elem = mkgeneric(pipeline, src,
"lal_nxydump", start_time = segment[0].ns(), stop_time = segment[1].ns())
683 elem = mkgeneric(pipeline, src,
"lal_nxydump")
687 def mknxydumpsinktee(pipeline, src, *args, **properties):
688 t =
mktee(pipeline, src)
693 def mkblcbctriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
696 elem = mkgeneric(pipeline, snr,
"lal_blcbctriggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
701 def mktriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
704 elem = mkgeneric(pipeline, snr,
"lal_triggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
709 def mktriggerxmlwritersink(pipeline, src, filename):
710 return mkgeneric(pipeline, src,
"lal_triggerxmlwriter", sync =
False, async =
False, location = filename)
715 return mkgeneric(pipeline, src,
"wavenc")
720 return mkgeneric(pipeline, src,
"vorbisenc")
723 def mkcolorspace(pipeline, src):
724 return mkgeneric(pipeline, src,
"ffmpegcolorspace")
727 def mktheoraenc(pipeline, src, **properties):
728 return mkgeneric(pipeline, src,
"theoraenc", **properties)
731 def mkmpeg4enc(pipeline, src, **properties):
732 return mkgeneric(pipeline, src,
"ffenc_mpeg4", **properties)
735 def mkoggmux(pipeline, src):
736 return mkgeneric(pipeline, src,
"oggmux")
739 def mkavimux(pipeline, src):
740 return mkgeneric(pipeline, src,
"avimux")
745 elem = mkgeneric(pipeline, src,
"audioconvert")
746 if caps_string
is not None:
753 return mkgeneric(pipeline, src,
"audiorate", **properties)
757 def mkflacenc(pipeline, src, quality = 0, **properties):
758 return mkgeneric(pipeline, src,
"flacenc", quality = quality, **properties)
761 def mkogmvideosink(pipeline, videosrc, filename, audiosrc = None, verbose = False):
762 src = mkcolorspace(pipeline, videosrc)
763 src =
mkcapsfilter(pipeline, src,
"video/x-raw-yuv, format=(fourcc)I420")
764 src = mktheoraenc(pipeline, src, border = 2, quality = 48, quick =
False)
765 src = mkoggmux(pipeline, src)
766 if audiosrc
is not None:
773 def mkvideosink(pipeline, src):
774 return mkgeneric(pipeline, mkcolorspace(pipeline, src),
"autovideosink")
779 return mkgeneric(pipeline,
mkqueue(pipeline, src),
"autoaudiosink")
782 def mkplaybacksink(pipeline, src, amplification = 0.1):
784 Gst.ElementFactory.make(
"audioconvert",
None),
785 Gst.ElementFactory.make(
"capsfilter",
None),
786 Gst.ElementFactory.make(
"audioamplify",
None),
787 Gst.ElementFactory.make(
"audioconvert",
None),
788 Gst.ElementFactory.make(
"queue",
None),
789 Gst.ElementFactory.make(
"autoaudiosink",
None)
791 elems[1].set_property(
"caps", Gst.Caps.from_string(
"audio/x-raw, format=F32%s" % BYTE_ORDER))
792 elems[2].set_property(
"amplification", amplification)
793 elems[4].set_property(
"max-size-time", 1 * Gst.SECOND)
795 Gst.element_link_many(src, *elems)
798 def mkdeglitcher(pipeline, src, segment_list):
799 return mkgeneric(pipeline, src,
"lal_deglitcher", segment_list = segments.segmentlist(segments.segment(a.ns(), b.ns())
for a, b
in segment_list))
805 def mkappsink(pipeline, src, max_buffers = 1, drop = False, sync = False, async = False, **properties):
806 return mkgeneric(pipeline, src,
"appsink", sync = sync, async = async, emit_signals =
True, max_buffers = max_buffers, drop = drop, **properties)
810 def __init__(self, appsink_new_buffer, appsinks = []):
811 self.
lock = threading.Lock()
821 for elem
in appsinks:
824 def add_sink(self, pipeline, src, drop = False, **properties):
825 return self.
attach(
mkappsink(pipeline, src, drop = drop, **properties))
829 connect this AppSync's signal handlers to the given appsink 830 element. the element's max-buffers property will be set to 831 1 (required for AppSync to work). 834 raise ValueError(
"duplicate appsinks %s" % repr(appsink))
835 appsink.set_property(
"max-buffers", 1)
837 assert handler_id > 0
839 assert handler_id > 0
840 handler_id = appsink.connect(
"eos", self.
eos_handler)
841 assert handler_id > 0
845 def new_preroll_handler(self, elem):
850 elem.emit(
"pull-preroll")
851 return Gst.FlowReturn.OK
853 def new_sample_handler(self, elem):
858 self.
appsinks[elem] = elem.get_last_sample().get_buffer().pts
862 def eos_handler(self, elem):
871 for internal use. must be called with lock held. 878 timestamps = [(t, e)
for e, t
in self.
appsinks.items()
if e
not in self.
at_eos or t
is not None]
882 return Gst.FlowReturn.EOS
888 timestamp, elem_with_oldest = min(timestamps)
892 if timestamp
is None:
893 return Gst.FlowReturn.OK
898 self.
appsinks[elem_with_oldest] =
None 904 add a signal handler to write a pipeline graph upon receipt of the 905 first trigger buffer. the caps in the pipeline graph are not fully 906 negotiated until data comes out the end, so this version of the graph 907 shows the final formats on all links 909 def __init__(self, pipeline, appsinks, basename, verbose = False):
911 self.
filestem =
"%s.%s" % (basename,
"TRIGGERS")
916 for sink
in appsinks:
920 def execute(self, elem):
925 elem.disconnect(handler_id)
926 return Gst.FlowReturn.OK
929 def mkchecktimestamps(pipeline, src, name = None, silent = True, timestamp_fuzz = 1):
930 return mkgeneric(pipeline, src,
"lal_checktimestamps", name = name, silent = silent, timestamp_fuzz = timestamp_fuzz)
935 return mkgeneric(pipeline, src,
"lal_peak", n = n)
938 def mkitac(pipeline, src, n, bank, autocorrelation_matrix = None, mask_matrix = None, snr_thresh = 0, sigmasq = None):
941 "bank_filename": bank,
942 "snr_thresh": snr_thresh
944 if autocorrelation_matrix
is not None:
945 properties[
"autocorrelation_matrix"] = pipeio.repack_complex_array_to_real(autocorrelation_matrix)
946 if mask_matrix
is not None:
947 properties[
"autocorrelation_mask"] = mask_matrix
948 if sigmasq
is not None:
949 properties[
"sigmasq"] = sigmasq
950 return mkgeneric(pipeline, src,
"lal_itac", **properties)
952 def mktrigger(pipeline, src, n, autocorrelation_matrix = None, mask_matrix = None, snr_thresh = 0, sigmasq = None, max_snr = False):
955 "snr_thresh": snr_thresh,
958 if autocorrelation_matrix
is not None:
959 properties[
"autocorrelation_matrix"] = pipeio.repack_complex_array_to_real(autocorrelation_matrix)
960 if mask_matrix
is not None:
961 properties[
"autocorrelation_mask"] = mask_matrix
962 if sigmasq
is not None:
963 properties[
"sigmasq"] = sigmasq
964 return mkgeneric(pipeline, src,
"lal_trigger", **properties)
966 def mklatency(pipeline, src, name = None, silent = False):
967 return mkgeneric(pipeline, src,
"lal_latency", name = name, silent = silent)
969 def mklhocoherentnull(pipeline, H1src, H2src, H1_impulse, H1_latency, H2_impulse, H2_latency, srate):
970 elem = mkgeneric(pipeline,
None,
"lal_lho_coherent_null", block_stride = srate, H1_impulse = H1_impulse, H2_impulse = H2_impulse, H1_latency = H1_latency, H2_latency = H2_latency)
971 for peer, padname
in ((H1src,
"H1sink"), (H2src,
"H2sink")):
972 if isinstance(peer, Gst.Pad):
973 peer.get_parent_element().link_pads(peer, elem, padname)
974 elif peer
is not None:
975 peer.link_pads(
None, elem, padname)
978 def mkcomputegamma(pipeline, dctrl, exc, cos, sin, **properties):
979 elem = mkgeneric(pipeline,
None,
"lal_compute_gamma", **properties)
980 for peer, padname
in ((dctrl,
"dctrl_sink"), (exc,
"exc_sink"), (cos,
"cos"), (sin,
"sin")):
981 if isinstance(peer, Gst.Pad):
982 peer.get_parent_element().link_pads(peer, elem, padname)
983 elif peer
is not None:
984 peer.link_pads(
None, elem, padname)
987 def mkbursttriggergen(pipeline, src, **properties):
988 return mkgeneric(pipeline, src,
"lal_bursttriggergen", **properties)
990 def mkodctodqv(pipeline, src, **properties):
991 return mkgeneric(pipeline, src,
"lal_odc_to_dqv", **properties)
993 def mktcpserversink(pipeline, src, **properties):
996 return mkgeneric(pipeline, src,
"tcpserversink", sync =
True, sync_method =
"latest-keyframe", recover_policy =
"keyframe", unit_type =
"bytes", units_soft_max = 1024**3, **properties)
1000 """Calculate the output gain of GStreamer's stock audioresample element. 1002 The audioresample element has a frequency response of unity "almost" all the 1003 way up the Nyquist frequency. However, for an input of unit variance 1004 Gaussian noise, the output will have a variance very slighly less than 1. 1005 The return value is the variance that the filter will produce for a given 1006 "quality" setting and sample rate. 1008 @param den The denomenator of the ratio of the input and output sample rates 1009 @param num The numerator of the ratio of the input and output sample rates 1010 @return The variance of the output signal for unit variance input 1012 The following example shows how to apply the correction factor using an 1013 audioamplify element. 1015 >>> from gstlal.pipeutil import * 1016 >>> from gstlal.pipeparts import audioresample_variance_gain 1017 >>> from gstlal import pipeio 1019 >>> nsamples = 2 ** 17 1022 >>> def handoff_handler(element, buffer, pad, (quality, filt_len, num, den)): 1023 ... out_latency = numpy.ceil(float(den) / num * filt_len) 1024 ... buf = pipeio.array_from_audio_buffer(buffer).flatten() 1025 ... std = numpy.std(buf[out_latency:-out_latency]) 1026 ... print "quality=%2d, filt_len=%3d, num=%d, den=%d, stdev=%.2f" % ( 1027 ... quality, filt_len, num, den, std) 1029 >>> for quality in range(11): 1030 ... pipeline = Gst.Pipeline() 1031 ... correction = 1/numpy.sqrt(audioresample_variance_gain(quality, num, den)) 1032 ... elems = mkelems_in_bin(pipeline, 1033 ... ('audiotestsrc', {'wave':'gaussian-noise','volume':1}), 1034 ... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,format=F64LE,rate=%d' % num)}), 1035 ... ('audioresample', {'quality':quality}), 1036 ... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,width=F64LE,rate=%d' % den)}), 1037 ... ('audioamplify', {'amplification':correction,'clipping-method':'none'}), 1038 ... ('fakesink', {'signal-handoffs':True, 'num-buffers':1}) 1040 ... filt_len = elems[2].get_property('filter-length') 1041 ... elems[0].set_property('samplesperbuffer', 2 * filt_len + nsamples) 1042 ... if elems[-1].connect_after('handoff', handoff_handler, (quality, filt_len, num, den)) < 1: 1043 ... raise RuntimeError 1045 ... if pipeline.set_state(Gst.State.PLAYING) is not Gst.State.CHANGE_ASYNC: 1046 ... raise RuntimeError 1047 ... if not pipeline.get_bus().poll(Gst.MessageType.EOS, -1): 1048 ... raise RuntimeError 1050 ... if pipeline.set_state(Gst.State.NULL) is not Gst.StateChangeReturn.SUCCESS: 1051 ... raise RuntimeError 1053 quality= 0, filt_len= 8, num=2, den=1, stdev=1.00 1054 quality= 1, filt_len= 16, num=2, den=1, stdev=1.00 1055 quality= 2, filt_len= 32, num=2, den=1, stdev=1.00 1056 quality= 3, filt_len= 48, num=2, den=1, stdev=1.00 1057 quality= 4, filt_len= 64, num=2, den=1, stdev=1.00 1058 quality= 5, filt_len= 80, num=2, den=1, stdev=1.00 1059 quality= 6, filt_len= 96, num=2, den=1, stdev=1.00 1060 quality= 7, filt_len=128, num=2, den=1, stdev=1.00 1061 quality= 8, filt_len=160, num=2, den=1, stdev=1.00 1062 quality= 9, filt_len=192, num=2, den=1, stdev=1.00 1063 quality=10, filt_len=256, num=2, den=1, stdev=1.00 1070 0.7224862140943990596,
1071 0.7975021342935247892,
1072 0.8547537598970208483,
1073 0.8744072146753004704,
1074 0.9075294214410336568,
1075 0.9101523813406768859,
1076 0.9280549396020538744,
1077 0.9391809530012216189,
1078 0.9539276644089494939,
1079 0.9623083437067311285,
1080 0.9684700588501590213
1084 0.7539740617648067467,
1085 0.8270076656536116122,
1086 0.8835072979478705291,
1087 0.8966758456219333651,
1088 0.9253434087537378838,
1089 0.9255866674042573239,
1090 0.9346487800036394900,
1091 0.9415331868209220190,
1092 0.9524608799160205752,
1093 0.9624372769883490220,
1094 0.9704505626409354324
1111 This function needs the environment variable GST_DEBUG_DUMP_DOT_DIR 1112 to be set. The filename will be 1114 os.path.join($GST_DEBUG_DUMP_DOT_DIR, filestem + ".dot") 1116 If verbose is True, a message will be written to stderr. 1118 if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ:
1119 raise ValueError(
"cannot write pipeline, environment variable GST_DEBUG_DUMP_DOT_DIR is not set")
1120 Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, filestem)
1122 print >>sys.stderr,
"Wrote pipeline to %s" % os.path.join(os.environ[
"GST_DEBUG_DUMP_DOT_DIR"],
"%s.dot" % filestem)
def mkstatevector(pipeline, src, properties)
Adds a lal_statevector element to a pipeline with useful default properties.
def __init__(self, elem, units_dict)
def mkfakesink(pipeline, src)
Adds a fakesink element to a pipeline with useful default properties.
def mknofakedisconts(pipeline, src, silent=True)
Adds a lal_nofakedisconts element to a pipeline with useful default properties.
def mknxydumpsink(pipeline, src, filename, segment=None)
Adds a lal_nxydump element to a pipeline with useful default properties.
def new_sample_handler(self, elem)
def framecpp_filesink_cache_entry_from_mfs_message(message)
def attach(self, appsink)
def mkaudiorate(pipeline, src, properties)
Adds a audiorate element to a pipeline with useful default properties.
def mkmatrixmixer(pipeline, src, matrix=None)
Adds a lal_matrixmixer element to a pipeline with useful default properties.
def mkfirbank(pipeline, src, latency=None, fir_matrix=None, time_domain=None, block_stride=None)
Adds a lal_firbank element to a pipeline with useful default properties.
def mkappsink(pipeline, src, max_buffers=1, drop=False, sync=False, async=False, properties)
Adds a appsink element to a pipeline with useful default properties.
def mkautoaudiosink(pipeline, src)
Adds a autoaudiosink element to a pipeline with useful default properties.
def pad_added(self, element, pad, seglists)
def write_dump_dot(pipeline, filestem, verbose=False)
def mkinterpolator(pipeline, src, properties)
Adds a lal_interpolator element to a pipeline with useful default properties.
def mkresample(pipeline, src, properties)
Adds a audioresample element to a pipeline with useful default properties.
def mkfilesink(pipeline, src, filename, sync=False, async=False)
Adds a filesink element to a pipeline with useful default properties.
def mkreblock(pipeline, src, properties)
Adds a lal_reblock element to a pipeline with useful default properties.
def mktaginject(pipeline, src, tags)
Adds a taginject element to a pipeline with useful default properties.
def mkaudioamplify(pipeline, src, amplification)
Adds a audioamplify element to a pipeline with useful default properties.
def pad_added(element, pad, srcpadname, sinkpad, no_more_pads_handler_id, pad_added_handler_id)
def mkqueue(pipeline, src, properties)
Adds a queue element to a pipeline with useful default properties.
def eos_handler(self, elem)
def mkaudiotestsrc(pipeline, properties)
Adds a audiotestsrc element to a pipeline with useful default properties.
def mkautochisq(pipeline, src, autocorrelation_matrix=None, mask_matrix=None, latency=0, snr_thresh=0)
Adds a lal_autochisq element to a pipeline with useful default properties.
def mkcapssetter(pipeline, src, caps, properties)
Adds a capssetter element to a pipeline with useful default properties.
def mklalcachesrc(pipeline, location, use_mmap=True, properties)
Adds a lal_cachesrc element to a pipeline with useful default properties.
A class that manages the task of watching for and connecting to new source pads by name...
def mkinjections(pipeline, src, filename)
Adds a lal_simulation element to a pipeline with useful default properties.
def mkfakesrc(pipeline, instrument, channel_name, blocksize=None, volume=1e-20, is_live=False, wave=9, rate=16384, properties)
see documentation for mktaginject() mkcapsfilter() and mkaudiotestsrc()
def mkiirfilter(pipeline, src, a, b)
Adds a audioiirfilter element to a pipeline with useful default properties.
def mkaudiocheblimit(pipeline, src, cutoff, mode=0, poles=8, type=1, ripple=0.25)
Adds a audiocheblimit element to a pipeline with useful default properties.
def set_probe(cls, pad, seglist, jitter=LIGOTimeGPS(0, 1))
def mkwhiten(pipeline, src, psd_mode=0, zero_pad=0, fft_length=8, average_samples=64, median_samples=7, properties)
Adds a lal_whiten element to a pipeline with useful default properties.
def mkfirfilter(pipeline, src, kernel, latency, properties)
Adds a audiofirfilter element to a pipeline with useful default properties.
def mkuridecodebin(pipeline, uri, caps="application/x-igwd-frame, framed=true", properties)
Adds a uridecodebin element to a pipeline with useful default properties.
def mkprogressreport(pipeline, src, name)
Adds a progress_report element to a pipeline with useful default properties.
def new_preroll_handler(self, elem)
def mkpeak(pipeline, src, n)
Adds a lal_peak element to a pipeline with useful default properties.
def audioresample_variance_gain(quality, num, den)
def mkaudiochebband(pipeline, src, lower_frequency, upper_frequency, poles=8)
Adds a audiochebband element to a pipeline with useful default properties.
def mkmultiplier(pipeline, srcs, sync=True, mix_mode="product", properties)
Adds a lal_adder element to a pipeline configured for synchronous "product" mode mixing.
def mkaudioconvert(pipeline, src, caps_string=None)
Adds a audioconvert element to a pipeline with useful default properties.
def mkflacenc(pipeline, src, quality=0, properties)
Adds a flacenc element to a pipeline with useful default properties.
def no_more_pads(element, srcpadname)
def mkvorbisenc(pipeline, src)
Adds a vorbisenc element to a pipeline with useful default properties.
def mktogglecomplex(pipeline, src)
Adds a lal_togglecomplex element to a pipeline with useful default properties.
def mkadder(pipeline, srcs, sync=True, mix_mode="sum", properties)
Adds a lal_adder element to a pipeline configured for synchronous "sum" mode mixing.
def framecpp_filesink_ldas_path_handler(elem, pspec, outpath, dir_digits)
def mkgate(pipeline, src, threshold=None, control=None, properties)
Adds a lal_gate element to a pipeline with useful default properties.
def mkwavenc(pipeline, src)
Adds a wavenc element to a pipeline with useful default properties.
def mkmultifilesink(pipeline, src, next_file=0, sync=False, async=False, properties)
Adds a multifilesink element to a pipeline with useful default properties.
def mkaudioundersample(pipeline, src)
Adds a lal_audioundersample element to a pipeline with useful default properties. ...
def mkdrop(pipeline, src, drop_samples=0)
Adds a lal_whiten element to a pipeline with useful default properties.
def mksumsquares(pipeline, src, weights=None)
Adds a lal_sumsquares element to a pipeline with useful default properties.
def mksegmentsrc(pipeline, segment_list, blocksize=4096 *1 *1, invert_output=False)
Adds a lal_segmentsrc element to a pipeline with useful default properties.
def mkshift(pipeline, src, properties)
Adds a lal_shift element to a pipeline with useful default properties.
def pad_added(element, pad, units_dict)
def probe(pad, probeinfo, seglist, jitter, probe_id)
def mktee(pipeline, src)
Adds a tee element to a pipeline with useful default properties.
def mkcapsfilter(pipeline, src, caps)
Adds a capsfilter element to a pipeline with useful default properties.
def pull_buffers(self, elem)