gstlal  1.4.1
datasource.py
1 # Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 __doc__ = """
27 
28 A file that contains the datasource module code
29 
30 *Review Status*
31 
32 +-------------------------------------------------+------------------------------------------+------------+
33 | Names | Hash | Date |
34 +=================================================+==========================================+============+
35 | Florent, Sathya, Duncan Me., Jolien, Kipp, Chad | b3ef077fe87b597578000f140e4aa780f3a227aa | 2014-05-01 |
36 +-------------------------------------------------+------------------------------------------+------------+
37 
38 """
39 
40 
41 import optparse
42 import sys
43 import time
44 
45 import gi
46 gi.require_version('Gst', '1.0')
47 from gi.repository import GObject
48 from gi.repository import Gst
49 GObject.threads_init()
50 Gst.init(None)
51 
52 from gstlal import bottle
53 from gstlal import pipeparts
54 from glue.ligolw import utils as ligolw_utils
55 from glue.ligolw.utils import segments as ligolw_segments
56 from ligo import segments
57 import lal
58 from lal import LIGOTimeGPS
59 
60 
61 #
62 # Misc useful functions
63 #
64 
65 
def channel_dict_from_channel_list(channel_list):
    """
    Convert a list of "IFO=CHANNEL" strings into a dictionary that maps
    each instrument to its channel name.

    The list typically comes from an option parser option that uses the
    "append" action.  If an instrument appears more than once the last
    entry wins.  Each entry must contain exactly one "=", otherwise a
    ValueError is raised.

    Examples:

    >>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
    {'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}
    """
    channel_dict = {}
    for entry in channel_list:
        ifo, channel = entry.split("=")
        channel_dict[ifo] = channel
    return channel_dict
79 
def channel_dict_from_channel_list_with_node_range(channel_list):
    """
    Expand "START:STOP:IFO=CHANNEL,IFO=CHANNEL,..." strings into a
    dictionary keyed by zero-padded, four-character node number whose
    values are dictionaries mapping instrument to channel name.

    START is inclusive and STOP is exclusive.  The list typically comes
    from an option parser option that uses the "append" action.

    Examples:

    >>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0003:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4"])
    {'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}}
    """
    node_dict = {}
    for spec in channel_list:
        fields = spec.split(':')
        first_node, stop_node = int(fields[0]), int(fields[1])
        for node in range(first_node, stop_node):
            # build a fresh dictionary for every node so that the
            # per-node entries never alias each other
            channels = {}
            for pair in fields[2].split(','):
                ifo, channel = pair.split("=")
                channels[ifo] = channel
            node_dict[str(node).zfill(4)] = channels
    return node_dict
99 
def pipeline_channel_list_from_channel_dict(channel_dict, ifos = None, opt = "channel-name"):
    """
    Build a string of channel name options from a dictionary keyed by ifo.

    The first instrument is written as a bare "IFO=CHANNEL " value; every
    further instrument is written as "--<opt>=IFO=CHANNEL ".  Note the
    trailing space after every entry.

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as
    an argument that encodes the other instances of the option.

    - override --channel-name with a different option by setting opt.
    - restrict (and order) the ifo keys used from channel_dict by
      setting ifos; the default is every key of channel_dict.

    Examples:

    >>> pipeline_channel_list_from_channel_dict({'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'})
    'H1=LSC-STRAIN --channel-name=H2=SOMETHING-ELSE '

    >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])
    'H1=LSC-STRAIN '

    >>> pipeline_channel_list_from_channel_dict({'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}, opt="test-string")
    'H1=LSC-STRAIN --test-string=H2=SOMETHING-ELSE '
    """
    if ifos is None:
        ifos = channel_dict.keys()

    pieces = []
    for ifo in ifos:
        assignment = "%s=%s " % (ifo, channel_dict[ifo])
        if pieces:
            # only the entries after the first carry the option name
            assignment = "--%s=%s" % (opt, assignment)
        pieces.append(assignment)
    return "".join(pieces)
133 
def pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node = 0, ifos = None, opt = "channel-name"):
    """
    Build a string of channel name options for one node from a dictionary
    keyed by zero-padded node number whose values are dictionaries keyed
    by ifo.

    node is converted to a string and zero-padded to four characters
    before it is looked up in channel_dict.  The first instrument is
    written as a bare "IFO=CHANNEL " value; every further instrument is
    written as "--<opt>=IFO=CHANNEL ".

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as
    an argument that encodes the other instances of the option.

    - override --channel-name with a different option by setting opt.
    - restrict (and order) the ifo keys used by setting ifos; the
      default is every key of the selected node's dictionary.

    Examples:

    >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}}, node=0)
    'H1=LSC-STRAIN --channel-name=H2=SOMETHING-ELSE '

    >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"])
    'H1=LSC-STRAIN '

    >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}}, node=0, opt="test-string")
    'H1=LSC-STRAIN --test-string=H2=SOMETHING-ELSE '
    """
    node_key = str(node).zfill(4)
    if ifos is None:
        ifos = channel_dict[node_key].keys()

    pieces = []
    for ifo in ifos:
        assignment = "%s=%s " % (ifo, channel_dict[node_key][ifo])
        if pieces:
            # only the entries after the first carry the option name
            assignment = "--%s=%s" % (opt, assignment)
        pieces.append(assignment)
    return "".join(pieces)
168 
def injection_dict_from_channel_list_with_node_range(injection_list):
    """
    Expand "START:STOP:FILENAME" strings into a dictionary that maps each
    zero-padded, four-character bin number in [START, STOP) to the
    injection file name.

    The list typically comes from an option parser option that uses the
    "append" action.

    Examples:

    >>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"])
    {'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}
    """
    injection_dict = {}
    for spec in injection_list:
        fields = spec.split(':')
        first_bin, stop_bin = int(fields[0]), int(fields[1])
        for bin_number in range(first_bin, stop_bin):
            injection_dict[str(bin_number).zfill(4)] = fields[2]
    return injection_dict
187 
188 
# Default state vector bits keyed by instrument.  Each value is a
# two-element list: [on bits, off bits].
state_vector_on_off_dict = {
    "H1" : [0x7, 0x160],
    "H2" : [0x7, 0x160],
    "L1" : [0x7, 0x160],
    "V1" : [0x67, 0x100]
}


# Default DQ vector bits keyed by instrument.  Each value is a two-element
# list: [on bits, off bits].
dq_vector_on_off_dict = {
    "H1" : [0x7, 0x0],
    "H2" : [0x7, 0x0],
    "L1" : [0x7, 0x0],
    "V1" : [0x7, 0x0]
}


def _bits_from_string(bits):
    """
    Parse a bit mask given as text: base 16 if it starts with "0x",
    base 10 otherwise.
    """
    return int(bits, 16) if bits.startswith("0x") else int(bits)


def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict = state_vector_on_off_dict):
    """
    Produce a dictionary (keyed by detector) of [on bits, off bits] lists
    from "IFO=bits" strings provided on the command line.

    Takes default values from module level datasource.state_vector_on_off_dict
    if state_vector_on_off_dict is not given.  The dictionary of defaults
    is never modified:  the overrides are applied to a copy and that copy
    is returned, so repeated calls always start from the same defaults.

    Inputs must be given as base 10 or 16 integers; base 16 values must
    be prefixed with "0x".

    An instrument named in on_bit_list that has no default gets an off
    value of 0.  An instrument named only in off_bit_list that has no
    default raises KeyError.

    Examples:

    >>> on_bit_list = ["V1=7", "H1=7", "L1=7"]
    >>> off_bit_list = ["V1=256", "H1=352", "L1=352"]
    >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list) == {'H2': [7, 352], 'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
    True

    >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, {})
    {'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}

    >>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]
    >>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]
    >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, {})
    {'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
    """
    # copy the defaults, including the per-instrument lists, so that
    # neither the module-level dictionary nor the caller's dictionary is
    # altered by the overrides applied below
    bit_dict = dict((ifo, list(bits)) for ifo, bits in state_vector_on_off_dict.items())

    for line in on_bit_list:
        ifo, bits = line.strip().split("=", 1)
        bits = _bits_from_string(bits)
        try:
            bit_dict[ifo][0] = bits
        except KeyError:
            bit_dict[ifo] = [bits, 0]

    for line in off_bit_list:
        ifo, bits = line.strip().split("=", 1)
        # every instrument given on bits already has an entry;  an
        # instrument with neither on bits nor a default raises KeyError
        bit_dict[ifo][1] = _bits_from_string(bits)

    return bit_dict
246 
247 
def state_vector_on_off_list_from_bits_dict(bit_dict):
    """
    Produce a tuple of two command line strings, (on bits, off bits),
    from a dictionary of [on, off] state vector bits keyed by detector.

    In each string the first detector is written as a bare "IFO=bits "
    value and every further detector as "--state-vector-on-bits=IFO=bits "
    (respectively "--state-vector-off-bits=IFO=bits ").

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as
    an argument that encodes the other instances of the option.

    Examples:

    >>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "V1":[0x67, 0x100]}
    >>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)
    ('H1=7 --state-vector-on-bits=V1=103 ', 'H1=352 --state-vector-off-bits=V1=256 ')
    """
    on_pieces = []
    off_pieces = []
    for ifo in bit_dict:
        # only the entries after the first carry the option name
        on_prefix = "--state-vector-on-bits=" if on_pieces else ""
        off_prefix = "--state-vector-off-bits=" if off_pieces else ""
        on_pieces.append("%s%s=%s " % (on_prefix, ifo, bit_dict[ifo][0]))
        off_pieces.append("%s%s=%s " % (off_prefix, ifo, bit_dict[ifo][1]))

    return "".join(on_pieces), "".join(off_pieces)
275 
276 
277 
# framexmit (address, port) pairs.  The outer key names a set of
# addresses (only "CIT" is defined here), the inner key is the instrument.
framexmit_ports = {
    "CIT": dict(
        H1 = ("224.3.2.1", 7096),
        L1 = ("224.3.2.2", 7097),
        V1 = ("224.3.2.3", 7098),
    ),
}
293 
294 
def framexmit_dict_from_framexmit_list(framexmit_list):
    """
    Convert a list of "IFO=ADDRESS:PORT" strings into a dictionary that
    maps each ifo to an (address, port) tuple, with the port converted to
    an integer.

    The list typically comes from an option parser option that uses the
    "append" action.

    Examples:

    >>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"])
    {'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097), 'V1': ('224.3.2.3', 7098)}
    """
    framexmit_dict = {}
    for entry in framexmit_list:
        ifo, location = entry.split("=")
        address, port_text = location.split(':')
        framexmit_dict[ifo] = (address, int(port_text))
    return framexmit_dict
313 
314 
def framexmit_list_from_framexmit_dict(framexmit_dict, ifos = None, opt = "framexmit-addr"):
    """
    Build a string of framexmit address options from a dictionary of
    (address, port) pairs keyed by ifo.

    The first instrument is written as a bare "IFO=ADDRESS:PORT " value;
    every further instrument is written as "--<opt>=IFO=ADDRESS:PORT ".

    - override --framexmit-addr with a different option by setting opt.
    - restrict (and order) the ifo keys used from framexmit_dict by
      setting ifos; the default is every key of framexmit_dict.

    Examples:

    >>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)})
    'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '
    """
    if ifos is None:
        ifos = framexmit_dict.keys()

    pieces = []
    for ifo in ifos:
        assignment = "%s=%s:%s " % (ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1])
        if pieces:
            # only the entries after the first carry the option name
            assignment = "--%s=%s" % (opt, assignment)
        pieces.append(assignment)
    return "".join(pieces)
334 
335 
def pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags = Gst.SeekFlags.FLUSH):
    """
    Send a seek event to every source element of pipeline for a given
    gps_start_time and gps_end_time, with optional flags.

    The pipeline must be in the READY state (see the comment in the body
    for the reason);  ValueError is raised otherwise.

    @param pipeline the pipeline whose source elements are to be seeked
    @param gps_start_time start time as LIGOTimeGPS, double or float; None or -1 means "not set"
    @param gps_end_time end time as LIGOTimeGPS, double or float; None or -1 means "not set"
    @param flags the Gst.SeekFlags passed to each element's seek() method
    """
    def seek_args_for_gps(gps_time):
        """!
        Convenience routine to convert a GPS time to a seek type and a
        GStreamer timestamp.
        """

        if gps_time is None or gps_time == -1:
            return (Gst.SeekType.NONE, -1) # -1 == Gst.CLOCK_TIME_NONE
        elif hasattr(gps_time, 'ns'):
            return (Gst.SeekType.SET, gps_time.ns())
        else:
            # int() rather than long():  long() does not exist on
            # Python 3, and on Python 2 int() promotes to long by
            # itself when the value requires it
            return (Gst.SeekType.SET, int(float(gps_time) * Gst.SECOND))

    start_type, start_time = seek_args_for_gps(gps_start_time)
    stop_type, stop_time = seek_args_for_gps(gps_end_time)

    # FIXME:  should seek whole pipeline, but there are several
    # problems preventing us from doing that.
    #
    # because the framecpp demuxer has no source pads until decoding
    # begins, the bottom halves of pipelines start out disconnected
    # from the top halves of pipelines, which means the seek events
    # (which are sent to sink elements) don't make it all the way to
    # the source elements.  dynamic pipeline building will not fix the
    # problem because the demuxer does not carry the "SINK" flag so even
    # though it starts with only a sink pad and no source pads it still
    # won't be sent the seek event.  gstreamer's own demuxers must
    # somehow have a solution to this problem, but I don't know what it
    # is.  I notice that many implement the send_event() method
    # override, and it's possible that's part of the solution.
    #
    # seeking the pipeline can only be done in the PAUSED state.  the
    # GstBaseSrc baseclass seeks itself to 0 when changing to the
    # paused state, and the preroll is performed before the seek event
    # we send to the pipeline is processed, so the preroll occurs with
    # whatever random data a seek to "0" causes source elements to
    # produce.  for us, when processing GW data, this leads to the
    # whitener element's initial spectrum estimate being initialized
    # from that random data, and a non-zero chance of even getting
    # triggers out of it, all of which is very bad.
    #
    # the only way we have at the moment to solve both problems --- to
    # ensure seek events arrive at source elements and to work around
    # GstBaseSrc's initial seek to 0 --- is to send seek events
    # directly to the source elements ourselves before putting the
    # pipeline into the PAUSED state.  the elements are happy to
    # receive seek events in the READY state, and GstBaseSrc updates its
    # current segment using that seek so that when it transitions to
    # the PAUSED state and does its initial seek it seeks to our
    # requested time, not to 0.
    #
    # So:  this function needs to be called with the pipeline in the
    # READY state in order to guarantee the data stream starts at the
    # requested start time, and does not get prerolled with random
    # data.  For safety we include a check of the pipeline's current
    # state.
    #
    # if in the future we find some other solution to these problems
    # the story might change and the pipeline state required on entry
    # into this function might change.

    #pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)

    if pipeline.current_state != Gst.State.READY:
        raise ValueError("pipeline must be in READY state")

    for elem in pipeline.iterate_sources():
        elem.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)
412 
413 
class GWDataSourceInfo(object):
    """
    Hold the data associated with data source command lines.
    """

    def __init__(self, options):
        """!
        Initialize a GWDataSourceInfo class instance from command line
        options specified by append_options().

        @param options object carrying, as attributes, the values of the options that append_options() adds to a parser

        Raises ValueError if the options are missing required values or
        are mutually inconsistent.
        """

        # All recognised values of --data-source
        self.data_sources = set(("frames", "framexmit", "lvshm", "nds", "white", "silence", "AdvVirgo", "LIGO", "AdvLIGO"))
        # The data sources that are live:  these must not be given GPS
        # start/end times, all others require them
        self.live_sources = set(("framexmit", "lvshm"))
        assert self.live_sources <= self.data_sources

        # Sanity check the options
        if options.data_source not in self.data_sources:
            # sorted so that the message is reproducible
            raise ValueError("--data-source must be one of %s" % ", ".join(sorted(self.data_sources)))
        if options.data_source == "frames" and options.frame_cache is None:
            raise ValueError("--frame-cache must be specified when using --data-source=frames")
        if not options.channel_name:
            raise ValueError("must specify at least one channel in the form --channel-name=IFO=CHANNEL-NAME")
        if options.frame_segments_file is not None and options.data_source != "frames":
            raise ValueError("can only give --frame-segments-file if --data-source=frames")
        if options.frame_segments_name is not None and options.frame_segments_file is None:
            raise ValueError("can only specify --frame-segments-name if --frame-segments-file is given")
        if options.data_source == "nds" and (options.nds_host is None or options.nds_port is None):
            raise ValueError("must specify --nds-host and --nds-port when using --data-source=nds")

        # Dictionary of channel names keyed by instrument
        self.channel_dict = channel_dict_from_channel_list(options.channel_name)

        # Dictionary of shared memory partition names keyed by
        # instrument:  built-in defaults overridden by
        # --shared-memory-partition
        self.shm_part_dict = {"H1": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
        if options.shared_memory_partition is not None:
            self.shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) )

        # Shared memory settings taken verbatim from the options
        self.shm_assumed_duration = options.shared_memory_assumed_duration
        self.shm_block_size = options.shared_memory_block_size # NOTE: should this be incorporated into options.block_size? currently only used for offline data sources

        # Dictionary of framexmit (address, port) pairs keyed by
        # instrument.  Start from a copy of the defaults so that
        # --framexmit-addr overrides stay local to this instance
        # instead of altering the module-level framexmit_ports table
        self.framexmit_addr = dict(framexmit_ports["CIT"])
        if options.framexmit_addr is not None:
            self.framexmit_addr.update( framexmit_dict_from_framexmit_list(options.framexmit_addr) )
        self.framexmit_iface = options.framexmit_iface

        # Analysis segment:  None for live sources, otherwise
        # [gps_start_time, gps_end_time)
        self.seg = None

        if options.gps_start_time is not None:
            if options.gps_end_time is None:
                raise ValueError("must provide both --gps-start-time and --gps-end-time")
            if options.data_source in self.live_sources:
                raise ValueError("cannot set --gps-start-time or --gps-end-time with %s" % " or ".join("--data-source=%s" % src for src in sorted(self.live_sources)))
            try:
                start = LIGOTimeGPS(options.gps_start_time)
            except ValueError:
                raise ValueError("invalid --gps-start-time '%s'" % options.gps_start_time)
            try:
                end = LIGOTimeGPS(options.gps_end_time)
            except ValueError:
                raise ValueError("invalid --gps-end-time '%s'" % options.gps_end_time)
            if start >= end:
                raise ValueError("--gps-start-time must be < --gps-end-time: %s < %s" % (options.gps_start_time, options.gps_end_time))

            self.seg = segments.segment(start, end)
        elif options.gps_end_time is not None:
            raise ValueError("must provide both --gps-start-time and --gps-end-time")
        elif options.data_source not in self.live_sources:
            raise ValueError("--gps-start-time and --gps-end-time must be specified when --data-source not one of %s" % ", ".join(sorted(self.live_sources)))

        # Frame segments keyed by instrument
        if options.frame_segments_file is not None:
            # Read the named segment lists from the segments file
            self.frame_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, contenthandler=ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce()
            if self.seg is not None:
                # Clip frame segments to seek segment if it
                # exists (not required, just saves some
                # memory and I/O overhead)
                self.frame_segments = segments.segmentlistdict((instrument, seglist & segments.segmentlist([self.seg])) for instrument, seglist in self.frame_segments.items())
        else:
            # No segments file:  one entry per instrument, set to None
            self.frame_segments = segments.segmentlistdict((instrument, None) for instrument in self.channel_dict)

        # Dictionaries of state vector and DQ vector channel names keyed
        # by instrument:  built-in defaults overridden by
        # --state-channel-name and --dq-channel-name
        self.state_channel_dict = { "H1": "LLD-DQ_VECTOR", "H2": "LLD-DQ_VECTOR","L1": "LLD-DQ_VECTOR", "V1": "LLD-DQ_VECTOR" }
        self.dq_channel_dict = { "H1": "DMT-DQ_VECTOR", "H2": "DMT-DQ_VECTOR","L1": "DMT-DQ_VECTOR", "V1": "DMT-DQ_VECTOR" }

        if options.state_channel_name is not None:
            self.state_channel_dict.update( channel_dict_from_channel_list( options.state_channel_name ) )

        if options.dq_channel_name is not None:
            self.dq_channel_dict.update( channel_dict_from_channel_list( options.dq_channel_name ) )

        # Dictionaries of [on bits, off bits] keyed by instrument.  The
        # helper is handed copies of the module-level defaults so that
        # command line overrides cannot leak into those defaults
        self.state_vector_on_off_bits = state_vector_on_off_dict_from_bit_lists(options.state_vector_on_bits, options.state_vector_off_bits, dict((ifo, list(bits)) for ifo, bits in state_vector_on_off_dict.items()))
        self.dq_vector_on_off_bits = state_vector_on_off_dict_from_bit_lists(options.dq_vector_on_bits, options.dq_vector_off_bits, dict((ifo, list(bits)) for ifo, bits in dq_vector_on_off_dict.items()))

        # Frame cache file name
        self.frame_cache = options.frame_cache
        # Block size to use when reading data
        self.block_size = options.block_size
        # The selected data source
        self.data_source = options.data_source
        # Injection file name
        self.injection_filename = options.injections

        # The NDS attributes are only set when the NDS source is selected
        if options.data_source == "nds":
            self.nds_host = options.nds_host
            self.nds_port = options.nds_port
            self.nds_channel_type = options.nds_channel_type
532 
533 
def append_options(parser):
    """
    Append generic data source options to an OptionParser object in order
    to have consistent and unified command lines and parsing throughout the
    project for applications that read GW data.

    @param parser the optparse.OptionParser to which the "Data source options" group is added

    -   --data-source [string]
            Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO].

    -   --block-size [int] (bytes)
            Data block size to read in bytes. Default 16384 * 8 * 512 which is 512 seconds of double
            precision data at 16384 Hz. This parameter is only used if --data-source is one of
            white, silence, AdvVirgo, LIGO, AdvLIGO, nds.

    -   --frame-cache [filename]
            Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
            This is required iff --data-source=frames

    -   --gps-start-time [int] (seconds)
            Set the start time of the segment to analyze in GPS seconds.
            Required unless --data-source is lvshm or framexmit

    -   --gps-end-time [int] (seconds)
            Set the end time of the segment to analyze in GPS seconds.
            Required unless --data-source is lvshm or framexmit

    -   --injections [filename]
            Set the name of the LIGO light-weight XML file from which to load injections (optional).

    -   --channel-name [string]
            Set the name of the channels to process.
            Can be given multiple times as --channel-name=IFO=CHANNEL-NAME

    -   --nds-host [hostname]
            Set the remote host or IP address that serves nds data.
            This is required iff --data-source is nds

    -   --nds-port [portnumber]
            Set the port of the remote host that serves nds data, default = 31200.
            This is required iff --data-source is nds

    -   --nds-channel-type [string] type
            Set the NDS channel type, default = online.
            FIXME please document the allowed values

    -   --framexmit-addr [string]
            Set the address of the framexmit service.  Can be given
            multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port

    -   --framexmit-iface [string]
            Set the address of the framexmit interface.

    -   --state-channel-name [string]
            Set the name of the state vector channel.
            This channel will be used to control the flow of data via the on/off bits.
            Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME

    -   --dq-channel-name [string]
            Set the name of the data quality channel.
            This channel will be used to control the flow of data via the on/off bits.
            Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME

    -   --shared-memory-partition [string]
            Set the name of the shared memory partition for a given instrument.
            Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME

    -   --shared-memory-assumed-duration [int]
            Set the assumed span of files in seconds. Default = 4 seconds.

    -   --shared-memory-block-size [int]
            Set the byte size to read per buffer. Default = 4096 bytes.

    -   --frame-segments-file [filename]
            Set the name of the LIGO light-weight XML file from which to load frame segments.
            Optional iff --data-source is frames

    -   --frame-segments-name [string]
            Set the name of the segments to extract from the segment tables.
            Required iff --frame-segments-file is given

    -   --state-vector-on-bits [hex]
            Set the state vector on bits to process (optional).
            The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -   --state-vector-off-bits [hex]
            Set the state vector off bits to process (optional).
            The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -   --dq-vector-on-bits [hex]
            Set the DQ vector on bits to process (optional).
            The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -   --dq-vector-off-bits [hex]
            Set the DQ vector off bits to process (optional).
            The default is 0x0 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    **Typical usage case examples**

    1. Reading data from frames::

        --data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \\
        --channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \\
        --frame-segments-name=datasegments

    2. Reading data from a fake LIGO source::

        --data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \\
        --channel-name=H1=FAKE-STRAIN

    3. Reading online data via framexmit::

        --data-source=framexmit --channel-name=H1=FAKE-STRAIN

    4. Many other combinations possible, please add some!
    """
    group = optparse.OptionGroup(parser, "Data source options", "Use these options to set up the appropriate data source")
    group.add_option("--data-source", metavar = "source", help = "Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO].  Required.")
    group.add_option("--block-size", type = "int", metavar = "bytes", default = 16384 * 8 * 512, help = "Data block size to read in bytes. Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz).  This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
    group.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).  This is required iff --data-source=frames")
    group.add_option("--gps-start-time", metavar = "seconds", help = "Set the start time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit")
    group.add_option("--gps-end-time", metavar = "seconds", help = "Set the end time of the segment to analyze in GPS seconds.  Required unless --data-source is lvshm or framexmit")
    group.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
    group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process.  Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
    group.add_option("--nds-host", metavar = "hostname", help = "Set the remote host or IP address that serves nds data. This is required iff --data-source=nds")
    group.add_option("--nds-port", metavar = "portnumber", type = "int", default = 31200, help = "Set the port of the remote host that serves nds data. This is required iff --data-source=nds. Default = 31200.")
    # this help text used to be a copy of the --nds-port help text
    group.add_option("--nds-channel-type", metavar = "type", default = "online", help = "Set the NDS channel type. This is only used if --data-source=nds. Default = online.")
    group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service.  Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
    group.add_option("--framexmit-iface", metavar = "name", help = "Set the multicast interface address of the framexmit service.")
    group.add_option("--state-channel-name", metavar = "name", action = "append", help = "Set the name of the state vector channel.  This channel will be used to control the flow of data via the on/off bits.  Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME")
    group.add_option("--dq-channel-name", metavar = "name", action = "append", help = "Set the name of the data quality channel.  This channel will be used to control the flow of data via the on/off bits.  Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME")
    group.add_option("--shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument.  Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
    group.add_option("--shared-memory-assumed-duration", type = "int", default = 4, help = "Set the assumed span of files in seconds. Default = 4.")
    group.add_option("--shared-memory-block-size", type = "int", default = 4096, help = "Set the byte size to read per buffer. Default = 4096.")
    group.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments.  Optional iff --data-source=frames")
    group.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables.  Required iff --frame-segments-file is given")
    group.add_option("--state-vector-on-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.  Only currently has meaning for online (lvshm, framexmit) data.")
    group.add_option("--state-vector-off-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.  Only currently has meaning for online (lvshm, framexmit) data.")
    group.add_option("--dq-vector-on-bits", metavar = "bits", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.  Only currently has meaning for online (lvshm, framexmit) data.")
    # the default DQ vector off bits are 0x0, not 0x160 as this help
    # text used to claim
    group.add_option("--dq-vector-off-bits", metavar = "bits", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x0 for all detectors. Override with IFO=bits can be given multiple times.  Only currently has meaning for online (lvshm, framexmit) data.")
    parser.add_option_group(group)
677 
678 
def mksegmentsrcgate(pipeline, src, segment_list, invert_output = False, rate = 1, **kwargs):
    """
    Build a gate on src whose control stream is generated from a segment
    list.  Hook up your own input and output.

    A lal_segmentsrc is created from segment_list (invert_output is
    forwarded to it), its output is passed through a capsfilter with caps
    "audio/x-raw, rate=<rate>", and the result is used as the control
    input of a gate with threshold 1 placed on src.  The gate is
    returned.

    @param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate's name.

    Gstreamer graph describing this function:

    .. graphviz::

       digraph G {
         compound=true;
         node [shape=record fontsize=10 fontname="Verdana"];
         rankdir=LR;
         lal_segmentsrc;
         lal_gate;
         in [label="<src>"];
         out [label="<return value>"];
         in -> lal_gate -> out;
         lal_segmentsrc -> lal_gate;
       }

    """
    # control stream:  segment source, then caps pinned to the requested rate
    segment_stream = pipeparts.mksegmentsrc(pipeline, segment_list, invert_output = invert_output)
    control_caps = "audio/x-raw, rate=%d" % rate
    control_stream = pipeparts.mkcapsfilter(pipeline, segment_stream, caps = control_caps)

    # the gate itself;  caller-supplied keyword arguments go to mkgate()
    return pipeparts.mkgate(pipeline, src, threshold = 1, control = control_stream, **kwargs)
703 
704 
def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
    """
    All the conditionals and stupid pet tricks for reading real or
    simulated h(t) data in one place.

    Consult the append_options() function and the GWDataSourceInfo class

    This src in general supports only one instrument although
    GWDataSourceInfo contains dictionaries of multi-instrument things.  By
    specifying the instrument when calling this function you will get only
    a single instrument source.  A code wishing to have multiple basicsrcs
    will need to call this function for each instrument.

    Arguments:

    pipeline -- the pipeline handed to every pipeparts.mk*() call below.

    gw_data_source_info -- object describing the data source; its
    data_source attribute selects the branch taken and the other
    attributes read here (channel_dict, block_size, seg, frame_cache,
    frame_segments, shm_*, framexmit_*, nds_*, state/dq channel and bit
    dictionaries, injection_filename) parameterize the elements.

    instrument -- key used to index the per-instrument dictionaries, and
    prefix for element names and web routes.

    verbose -- if true, a progressreport element named
    "progress_src_<instrument>" is inserted.

    Returns the tuple (src, statevector, dqvector).  statevector and
    dqvector are None unless the data source is "framexmit" or "lvshm",
    in which case they are the tee elements carrying the state vector and
    DQ vector control streams.

    Raises ValueError if gw_data_source_info.data_source is not one of the
    data sources handled below.

    For "framexmit" and "lvshm" sources the following bottle routes are
    registered as a side effect:

      /<instrument>/statevector_{on,off,gap}.txt
      /<instrument>/dqvector_{on,off,gap}.txt
      /<instrument>/strain_add_drop.txt

    **Gstreamer Graph**

    .. graphviz::

       digraph mkbasicsrc {
         compound=true;
         node [shape=record fontsize=10 fontname="Verdana"];
         subgraph clusterfakesrc {
           fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO"];
           color=black;
           label="Possible path #1";
         }
         subgraph clusterframes {
           color=black;
           frames_0 [label="lalcachesrc: frames"];
           frames_1 [label ="framecppchanneldemux"];
           frames_2 [label ="queue"];
           frames_3 [label ="gate (if user provides segments)", style=filled, color=lightgrey];
           frames_4 [label ="audiorate"];
           frames_0 -> frames_1 -> frames_2 -> frames_3 ->frames_4;
           label="Possible path #2";
         }
         subgraph clusteronline {
           color=black;
           online_0 [label="lvshmsrc|framexmit"];
           online_1 [label ="framecppchanneldemux"];
           online_2a [label ="strain queue"];
           online_2b [label ="statevector queue"];
           online_3 [label ="statevector"];
           online_4 [label ="gate"];
           online_5 [label ="audiorate"];
           online_6 [label ="queue"];
           online_0 -> online_1;
           online_1 -> online_2a;
           online_1 -> online_2b;
           online_2b -> online_3;
           online_2a -> online_4;
           online_3 -> online_4 -> online_5 -> online_6;
           label="Possible path #3";
         }
         subgraph clusternds {
           nds_0 [label="ndssrc"];
           color=black;
           label="Possible path #4";
         }
         audioconv [label="audioconvert"];
         progress [label="progressreport (if verbose)", style=filled, color=lightgrey];
         sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey];
         queue [label="queue (if injections requested)", style=filled, color=lightgrey];

         // The connections
         fake_0 -> audioconv [ltail=clusterfakesrc];
         frames_4 -> audioconv [ltail=clusterframes];
         online_6 -> audioconv [ltail=clusteronline];
         nds_0 -> audioconv [ltail=clusternds];
         audioconv -> progress -> sim -> queue -> "?";
       }

    """
    # only the online (framexmit, lvshm) branch replaces these
    statevector = dqvector = None

    # NOTE: timestamp_offset is a hack to allow seeking with fake
    # sources, a real solution should be fixing the general timestamp
    # problem which would allow seeking to work properly
    if gw_data_source_info.data_source == "white":
        src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, volume = 1.0, timestamp_offset = int(gw_data_source_info.seg[0]) * Gst.SECOND)
    elif gw_data_source_info.data_source == "silence":
        src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, wave = 4, timestamp_offset = int(gw_data_source_info.seg[0]) * Gst.SECOND)
    elif gw_data_source_info.data_source == "LIGO":
        src = pipeparts.mkfakeLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
    elif gw_data_source_info.data_source == "AdvLIGO":
        src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
    elif gw_data_source_info.data_source == "AdvVirgo":
        src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
    elif gw_data_source_info.data_source == "frames":
        if instrument == "V1":
            # FIXME Hack because virgo often just uses "V" in
            # the file names rather than "V1".  We need to
            # sieve on "V"
            src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = "V")
        else:
            src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
        # build the "IFO:CHANNEL" names as a real list:  under Python 3
        # map() would hand the element a one-shot iterator instead
        channel_names = ["%s:%s" % item for item in gw_data_source_info.channel_dict.items()]
        demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, channel_list = channel_names)
        pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
        # allow frame reading and decoding to occur in a different
        # thread
        src = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * Gst.SECOND)
        pipeparts.src_deferred_link(demux, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_static_pad("sink"))
        if gw_data_source_info.frame_segments[instrument] is not None:
            # FIXME:  make segmentsrc generate segment samples
            # at the sample rate of h(t)?
            # FIXME:  make gate leaky when I'm certain that
            # will work.
            src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]), name = "%s_frame_segments_gate" % instrument)
            pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_static_pad("src"), gw_data_source_info.frame_segments[instrument])
        # FIXME:  remove this when pipeline can handle disconts
        src = pipeparts.mkaudiorate(pipeline, src, skip_to_first = True, silent = False)
    elif gw_data_source_info.data_source in ("framexmit", "lvshm"):
        # See https://wiki.ligo.org/DAC/ER2DataDistributionPlan#LIGO_Online_DQ_Channel_Specifica
        state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
        dq_vector_on_bits, dq_vector_off_bits = gw_data_source_info.dq_vector_on_off_bits[instrument]

        if gw_data_source_info.data_source == "lvshm":
            # FIXME make wait_time adjustable through web
            # interface or command line or both
            src = pipeparts.mklvshmsrc(pipeline, shm_name = gw_data_source_info.shm_part_dict[instrument], assumed_duration = gw_data_source_info.shm_assumed_duration, blocksize = gw_data_source_info.shm_block_size, wait_time = 120)
        elif gw_data_source_info.data_source == "framexmit":
            src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = gw_data_source_info.framexmit_iface, multicast_group = gw_data_source_info.framexmit_addr[instrument][0], port = gw_data_source_info.framexmit_addr[instrument][1], wait_time = 120)
        else:
            # impossible code path
            raise ValueError(gw_data_source_info.data_source)

        # 10 minutes of buffering, then demux
        src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 10)
        src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)

        # extract state vector and DQ vector and convert to
        # booleans
        statevector = pipeparts.mkstatevector(pipeline, None, required_on = state_vector_on_bits, required_off = state_vector_off_bits, name = "%s_state_vector" % instrument)
        dqvector = pipeparts.mkstatevector(pipeline, None, required_on = dq_vector_on_bits, required_off = dq_vector_off_bits, name = "%s_dq_vector" % instrument)
        pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), statevector.get_static_pad("sink"))
        pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), dqvector.get_static_pad("sink"))

        # publish the on/off/gap sample counters of both elements.
        # this must happen before the names statevector and dqvector
        # are rebound to the audiorate elements below, so that the
        # handlers query the elements created just above.
        for vector_name, elem in (("statevector", statevector), ("dqvector", dqvector)):
            for state in ("on", "off", "gap"):
                _route_sample_counter(instrument, vector_name, state, elem)

        # extract strain with 1 buffer of buffering
        strain = pipeparts.mkqueue(pipeline, None, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
        pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), strain.get_static_pad("sink"))
        pipeparts.framecpp_channeldemux_set_units(src, {"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]): "strain"})

        # fill in holes, skip duplicate data
        statevector = pipeparts.mkaudiorate(pipeline, statevector, skip_to_first = True, silent = False)
        dqvector = pipeparts.mkaudiorate(pipeline, dqvector, skip_to_first = True, silent = False)
        src = pipeparts.mkaudiorate(pipeline, strain, skip_to_first = True, silent = False)

        # FIXME don't hard code the sample rate
        @bottle.route("/%s/strain_add_drop.txt" % instrument)
        def strain_add(elem = src, rate = 16384):
            # elem is bound as a default argument so the handler keeps
            # referring to the strain audiorate element after src is
            # rebound below
            t = float(lal.UTCToGPS(time.gmtime()))
            add = elem.get_property("add")
            drop = elem.get_property("drop")
            return "%.9f %d %d" % (t, add // rate, drop // rate)

        # use state vector and DQ vector to gate strain.  the sizes
        # of the queues on the control inputs are not important.
        # they must be large enough to buffer the state vector
        # streams until they are needed, but the streams will be
        # consumed immediately when needed so there is no risk that
        # these queues actually fill up or add latency.  be
        # generous.
        statevector = pipeparts.mktee(pipeline, statevector)
        dqvector = pipeparts.mktee(pipeline, dqvector)
        src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, name = "%s_state_vector_gate" % instrument)
        src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, name = "%s_dq_vector_gate" % instrument)
    elif gw_data_source_info.data_source == "nds":
        src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type, blocksize = gw_data_source_info.block_size, port = gw_data_source_info.nds_port)
    else:
        raise ValueError("invalid data_source: %s" % gw_data_source_info.data_source)

    #
    # provide an audioconvert element to allow Virgo data (which is
    # single-precision) to be adapted into the pipeline
    #

    src = pipeparts.mkaudioconvert(pipeline, src)

    #
    # progress report
    #

    if verbose:
        src = pipeparts.mkprogressreport(pipeline, src, "progress_src_%s" % instrument)

    #
    # optional injections
    #

    if gw_data_source_info.injection_filename is not None:
        src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
        # let the injection code run in a different thread than the
        # whitener, etc.,
        src = pipeparts.mkqueue(pipeline, src, max_size_bytes = 0, max_size_buffers = 0, max_size_time = Gst.SECOND * 64)

    #
    # done
    #

    return src, statevector, dqvector


def _route_sample_counter(instrument, vector_name, state, elem):
    """
    Register the bottle route "/<instrument>/<vector_name>_<state>.txt".
    The handler replies with the current GPS time followed by the value
    of elem's "<state>-samples" property, formatted as "%.9f %d".
    """
    @bottle.route("/%s/%s_%s.txt" % (instrument, vector_name, state))
    def sample_counter(elem = elem, prop = "%s-samples" % state):
        t = float(lal.UTCToGPS(time.gmtime()))
        count = elem.get_property(prop)
        return "%.9f %d" % (t, count)
933 
934 
def mkhtgate(pipeline, src, control = None, threshold = 8.0, attack_length = 128, hold_length = 128, **kwargs):
    """
    A convenience function to provide thresholds on input data.  This can
    be used to remove large spikes / glitches etc.  Of course you can use
    it for other stuff by plugging whatever you want as input and output.

    If control is None the input is split with a tee and the tee serves
    both as the gate's control stream and as the source of the data
    stream;  otherwise the supplied control element is used as given.  In
    either case the data stream is passed through a queue before it
    reaches the gate.  attack_length and hold_length are handed to the
    gate negated, and the gate is created with invert_control = True.
    Any additional keyword arguments are forwarded to pipeparts.mkgate().

    NOTE:  the queues constructed by this code assume the attack and
    hold lengths combined are less than 1 second in duration.

    **Gstreamer Graph**

    .. graphviz::

       digraph G {
         compound=true;
         node [shape=record fontsize=10 fontname="Verdana"];
         rankdir=LR;
         tee ;
         inputqueue ;
         lal_gate ;
         in [label="<src>"];
         out [label="<return>"];
         in -> tee -> inputqueue -> lal_gate -> out;
         tee -> lal_gate;
       }

    """
    # FIXME someday explore a good bandpass filter
    # src = pipeparts.mkaudiochebband(pipeline, src, low_frequency, high_frequency)
    if control is None:
        # no external control stream:  the input gates itself
        tee = pipeparts.mktee(pipeline, src)
        control = tee
        src = tee

    # one second of buffering on the data path
    queued = pipeparts.mkqueue(pipeline, src, max_size_time = Gst.SECOND, max_size_bytes = 0, max_size_buffers = 0)

    gate = pipeparts.mkgate(
        pipeline,
        queued,
        control = control,
        threshold = threshold,
        attack_length = -attack_length,
        hold_length = -hold_length,
        invert_control = True,
        **kwargs
    )
    return gate
968 
# Unit tests:  when executed as a script, run the doctests embedded in this
# module's docstrings.
if __name__ == "__main__":
    from doctest import testmod
    testmod()
channel_dict
A dictionary of the requested channels, e.g., {"H1":"LDAS-STRAIN", "L1":"LDAS-STRAIN"}.
Definition: datasource.py:444
data_source
Data source, one of python.datasource.GWDataSourceInfo.data_sources.
Definition: datasource.py:521
frame_segments
Frame segments from a user defined file.
Definition: datasource.py:488
block_size
block size in bytes to read data from disk
Definition: datasource.py:519
framexmit_addr
A dictionary of framexmit addresses.
Definition: datasource.py:456
shm_part_dict
A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}.
Definition: datasource.py:447
def __init__(self, options)
See datasource.append_options()
Definition: datasource.py:419
state_channel_dict
DQ and state vector channel dictionary, e.g., { "H1": "LLD-DQ_VECTOR", "H2": "LLD-DQ_VECTOR","L1": "LLD-DQ_VECTOR", "V1": "LLD-DQ_VECTOR" }.
Definition: datasource.py:499
data_sources
A list of possible, valid data sources ("frames", "framexmit", "lvshm", "nds", "white", "silence", "AdvVirgo", "LIGO", "AdvLIGO")
Definition: datasource.py:425
state_vector_on_off_bits
Dictionary of state vector on, off bits like {"H1" : [0x7, 0x160], "H2" : [0x7, 0x160], "L1" : [0x7, 0x160], "V1" : [0x67, 0x100]}.
Definition: datasource.py:513
nds_channel_type
Store the ndssrc specific options: channel_type.
Definition: datasource.py:531
nds_port
Store the ndssrc specific options: port.
Definition: datasource.py:529
injection_filename
Injection file name.
Definition: datasource.py:523
shm_assumed_duration
options for shared memory
Definition: datasource.py:452
nds_host
Store the ndssrc specific options: host.
Definition: datasource.py:527