31 gi.require_version(
'Gst',
'1.0')
32 gi.require_version(
'GstAudio',
'1.0')
33 gi.require_version(
'GstBase',
'1.0')
34 from gi.repository
import GObject
35 from gi.repository
import Gst
36 from gi.repository
import GstAudio
37 from gi.repository
import GstBase
40 GObject.threads_init()
44 __author__ =
"Kipp Cannon <kipp.cannon@ligo.org>" 75 A function to nicely format a timestamp for printing 77 if timestamp
is None or timestamp == Gst.CLOCK_TIME_NONE:
79 return "%d.%09d s" % (timestamp // Gst.SECOND, timestamp % Gst.SECOND)
def gst_buffer_flag_is_set(buf, flags):
    """
    Report whether every bit in flags is set on a buffer.

    buf is any object exposing its flag word as buf.mini_object.flags;
    flags is a bit mask (a single flag, or several OR'ed together).
    Returns True only when all bits of the mask are present in the
    buffer's flag word, so an empty mask (0) always yields True.
    """
    # Keep only the bits of the buffer's flag word selected by the
    # mask, then require that none of the requested bits were lost.
    masked = buf.mini_object.flags & flags
    return masked == flags
90 Gstreamer element that will verify that the timestamps agree with 91 incoming buffers based on tracking the buffer offsets. 94 "Timestamp Checker Pass-Through Element",
96 "Checks that timestamps and offsets of audio streams advance as expected and remain synchronized to each other",
105 GObject.TYPE_BOOLEAN,
107 "Only report errors.",
109 GObject.ParamFlags.READWRITE
114 "Number of nanoseconds of timestamp<-->offset discrepancy to accept before reporting it. Timestamp<-->offset discrepancies of 1/2 a sample or more are always reported.",
115 0, GObject.G_MAXINT64, 1,
116 GObject.ParamFlags.READWRITE
121 Gst.PadTemplate.new(
"sink",
122 Gst.PadDirection.SINK,
123 Gst.PadPresence.ALWAYS,
124 Gst.caps_from_string(
126 "rate = " + GstAudio.AUDIO_RATE_RANGE +
", " +
127 "channels = " + GstAudio.AUDIO_CHANNELS_RANGE +
", " +
128 "format = (string) { Z64LE, Z64BE, Z128LE, Z128BE }, " +
129 "layout = (string) interleaved, " +
130 "channel-mask = (bitmask) 0; " +
132 "rate = " + GstAudio.AUDIO_RATE_RANGE +
", " +
133 "channels = " + GstAudio.AUDIO_CHANNELS_RANGE +
", " +
134 "format = " + GstAudio.AUDIO_FORMATS_ALL +
", " +
135 "layout = (string) interleaved, " +
136 "channel-mask = (bitmask) 0" 139 Gst.PadTemplate.new(
"src",
140 Gst.PadDirection.SRC,
141 Gst.PadPresence.ALWAYS,
142 Gst.caps_from_string(
144 "rate = " + GstAudio.AUDIO_RATE_RANGE +
", " +
145 "channels = " + GstAudio.AUDIO_CHANNELS_RANGE +
", " +
146 "format = (string) { Z64LE, Z64BE, Z128LE, Z128BE }, " +
147 "layout = (string) interleaved, " +
148 "channel-mask = (bitmask) 0; " +
150 "rate = " + GstAudio.AUDIO_RATE_RANGE +
", " +
151 "channels = " + GstAudio.AUDIO_CHANNELS_RANGE +
", " +
152 "format = " + GstAudio.AUDIO_FORMATS_ALL +
", " +
153 "layout = (string) interleaved, " +
154 "channel-mask = (bitmask) 0" 161 super(lal_checktimestamps, self).
__init__()
162 self.set_passthrough(
True)
167 def do_set_property(self, prop, val):
168 if prop.name ==
"silent":
170 elif prop.name ==
"timestamp-fuzz":
173 raise AttributeError(prop.name)
176 def do_get_property(self, prop):
177 if prop.name ==
"silent":
179 if prop.name ==
"timestamp-fuzz":
181 raise AttributeError(prop.name)
184 def do_set_caps(self, incaps, outcaps):
185 info = GstAudio.AudioInfo()
186 if info.from_caps(incaps):
190 s = incaps.get_structure(0)
191 if not s
or s.get_name() !=
"audio/x-raw":
193 success, chnls = s.get_int(
"channels")
195 success, rate = s.get_int(
"rate")
198 fmt = s.get_string(
"format")
203 elif fmt ==
"Z128LE":
218 def check_time_offset_mismatch(self, buf):
221 if buf.offset != expected_offset:
222 print >>sys.stderr,
"%s: timestamp/offset mismatch%s: got offset %d, buffer timestamp %s corresponds to offset %d (error = %d samples)" % (self.get_property(
"name"), (
" at discontinuity" if gst_buffer_flag_is_set(buf, Gst.BufferFlags.DISCONT)
else ""), buf.offset,
printable_timestamp(buf.pts), expected_offset, buf.offset - expected_offset)
224 print >>sys.stderr,
"%s: timestamp/offset mismatch%s: got timestamp %s, buffer offset %d corresponds to timestamp %s (error = %d ns)" % (self.get_property(
"name"), (
" at discontinuity" if gst_buffer_flag_is_set(buf, Gst.BufferFlags.DISCONT)
else ""),
printable_timestamp(buf.pts), buf.offset,
printable_timestamp(expected_timestamp), buf.pts - expected_timestamp)
227 def do_transform_ip(self, buf):
228 if self.
t0 is None or gst_buffer_flag_is_set(buf, Gst.BufferFlags.DISCONT):
231 print >>sys.stderr,
"%s: initial timestamp = %s, offset = %d" % (self.get_property(
"name"),
printable_timestamp(buf.pts), buf.offset)
232 elif gst_buffer_flag_is_set(buf, Gst.BufferFlags.DISCONT):
255 print >>sys.stderr,
"%s: got offset %d expected %d (discont flag is %s)" % (self.get_property(
"name"), buf.offset, self.
next_offset,
"set" if gst_buffer_flag_is_set(buf, Gst.BufferFlags.DISCONT)
else "not set")
267 length = buf.offset_end - buf.offset
268 allowed_sizes = [length * self.
unit_size]
269 if gst_buffer_flag_is_set(buf, Gst.BufferFlags.GAP):
270 allowed_sizes.append(0)
271 if buf.get_size()
not in allowed_sizes:
272 print >>sys.stderr,
"%s: got buffer size %d, buffer length %d corresponds to size %d" % (self.get_property(
"name"), buf.get_size(), length, length * self.
unit_size)
285 return Gst.FlowReturn.OK
293 GObject.type_register(lal_checktimestamps)
295 __gstelementfactory__ = (
296 lal_checktimestamps.__name__,
def printable_timestamp(timestamp)
A function to nicely format a timestamp for printing.
GStreamer element that will verify that the timestamps agree with incoming buffers based on tracking ...
def check_time_offset_mismatch(self, buf)