28 A file that contains the datasource module code 32 +-------------------------------------------------+------------------------------------------+------------+ 33 | Names | Hash | Date | 34 +=================================================+==========================================+============+ 35 | Florent, Sathya, Duncan Me., Jolien, Kipp, Chad | b3ef077fe87b597578000f140e4aa780f3a227aa | 2014-05-01 | 36 +-------------------------------------------------+------------------------------------------+------------+ 46 gi.require_version(
'Gst',
'1.0')
47 from gi.repository
import GObject
48 from gi.repository
import Gst
49 GObject.threads_init()
52 from gstlal
import bottle
53 from gstlal
import pipeparts
54 from glue.ligolw
import utils
as ligolw_utils
55 from glue.ligolw.utils
import segments
as ligolw_segments
56 from ligo
import segments
58 from lal
import LIGOTimeGPS
def channel_dict_from_channel_list(channel_list):
    """
    Given a list of channels, produce a dictionary keyed by ifo of channel names.

    The list here typically comes from an option parser with options that
    specify the "append" action.

    Each entry has the form IFO=CHANNEL-NAME.  Only the first "=" separates
    the ifo from the channel name, so anything after it (including further
    "=" characters) is kept as part of the channel name.  An entry without
    any "=" makes dict() raise ValueError.

    Examples:

        >>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
        {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}
    """
    return dict(instrument_channel.split("=", 1) for instrument_channel in channel_list)
def channel_dict_from_channel_list_with_node_range(channel_list):
    """
    Given a list of channels with a range of mass bins, produce a dictionary
    keyed by bin number of dictionaries keyed by ifo of channel names.

    The list here typically comes from an option parser with options that
    specify the "append" action.

    Each entry has the form FIRST:LAST:IFO=CHANNEL[,IFO=CHANNEL...].  Every
    bin number in range(FIRST, LAST), i.e. LAST excluded, is zero-padded to
    four characters and mapped to its own dictionary of the listed channels.
    An entry with fewer than three ":"-separated fields raises IndexError.

    Examples:

        >>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"])
        {'0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}}
    """
    outdict = {}
    for instrument_channel_full in channel_list:
        # split at most twice so that the channel part is kept intact
        instrument_channel_split = instrument_channel_full.split(":", 2)
        first_node = int(instrument_channel_split[0])
        last_node = int(instrument_channel_split[1])
        for ii in range(first_node, last_node):
            # a fresh dictionary per bin so that bins never share state
            outdict[str(ii).zfill(4)] = dict(
                instrument_channel.split("=", 1)
                for instrument_channel in instrument_channel_split[2].split(",")
            )
    return outdict
def pipeline_channel_list_from_channel_dict(channel_dict, ifos = None, opt = "channel-name"):
    """
    Creates a string of channel names options from a dictionary keyed by ifos.

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as an argument
    that encodes the other instances of the option.

    - override --channel-name with a different option by setting opt.
    - restrict the ifo keys to a subset of the channel_dict by setting ifos.

    The first ifo is emitted as a bare "IFO=CHANNEL " value; every further
    ifo is emitted as "--OPT=IFO=CHANNEL ".  Every piece, including the
    last, ends with a single space.  An ifo listed in ifos but missing from
    channel_dict raises KeyError.

    Examples:

        >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'})
        'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '

        >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])
        'H1=LSC-STRAIN '

        >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string")
        'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
    """
    if ifos is None:
        ifos = channel_dict.keys()
    pieces = []
    for i, ifo in enumerate(ifos):
        if i == 0:
            # the first value follows the option already present on the command line
            pieces.append("%s=%s " % (ifo, channel_dict[ifo]))
        else:
            pieces.append("--%s=%s=%s " % (opt, ifo, channel_dict[ifo]))
    return "".join(pieces)
def pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node = 0, ifos = None, opt = "channel-name"):
    """
    Creates a string of channel names options from a dictionary keyed by
    bin number of dictionaries keyed by ifos.

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as an argument
    that encodes the other instances of the option.

    - select the bin with node; it is converted to a string and zero-padded
      to four characters before being used as the key into channel_dict.
    - override --channel-name with a different option by setting opt.
    - restrict the ifo keys to a subset of the channel_dict by setting ifos.

    The first ifo is emitted as a bare "IFO=CHANNEL " value; every further
    ifo is emitted as "--OPT=IFO=CHANNEL ".  A node or ifo that is missing
    from channel_dict raises KeyError.

    Examples:

        >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0)
        'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '

        >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"])
        'H1=LSC-STRAIN '

        >>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, opt="test-string")
        'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
    """
    node = str(node).zfill(4)
    node_channels = channel_dict[node]
    if ifos is None:
        ifos = node_channels.keys()
    pieces = []
    for i, ifo in enumerate(ifos):
        if i == 0:
            # the first value follows the option already present on the command line
            pieces.append("%s=%s " % (ifo, node_channels[ifo]))
        else:
            pieces.append("--%s=%s=%s " % (opt, ifo, node_channels[ifo]))
    return "".join(pieces)
def injection_dict_from_channel_list_with_node_range(injection_list):
    """
    Given a list of injection xml files with a range of mass bins, produce a
    dictionary keyed by bin number.

    The list here typically comes from an option parser with options that
    specify the "append" action.

    Each entry has the form FIRST:LAST:FILENAME.  Every bin number in
    range(FIRST, LAST), i.e. LAST excluded, is zero-padded to four
    characters and mapped to FILENAME.  Only the first two ":" are treated
    as separators, so a file name may itself contain ":".  An entry with
    fewer than three fields raises IndexError.

    Examples:

        >>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"])
        {'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}
    """
    outdict = {}
    for injection_name in injection_list:
        injection_name_split = injection_name.split(":", 2)
        first_node = int(injection_name_split[0])
        last_node = int(injection_name_split[1])
        for ii in range(first_node, last_node):
            outdict[str(ii).zfill(4)] = injection_name_split[2]
    return outdict
190 state_vector_on_off_dict = {
200 dq_vector_on_off_dict = {
def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict = state_vector_on_off_dict):
    """
    Produce a dictionary (keyed by detector) of [on, off] bit lists from the
    IFO=bits strings provided on the command line.

    Takes default values from module level datasource.state_vector_on_off_dict
    if state_vector_on_off_dict is not given.  The dictionary that is passed
    in (or the module level default) is not modified: the overrides are
    applied to a copy, and the copy is returned.

    Inputs must be given as base 10 or 16 integers; a value is read as base
    16 only when it starts with "0x".

    An ifo that appears in on_bit_list but not in the defaults is added with
    its off bits set to 0.  An ifo that appears only in off_bit_list and not
    in the defaults raises KeyError.

    Examples:

        >>> on_bit_list = ["V1=7", "H1=7", "L1=7"]
        >>> off_bit_list = ["V1=256", "H1=352", "L1=352"]
        >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list)
        {'H2': [7, 352], 'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}

        >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
        {'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}

        >>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]
        >>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]
        >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
        {'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
    """
    def parse_bits(text):
        # "0x"-prefixed values are hexadecimal, everything else is base 10
        return int(text, 16) if text.startswith("0x") else int(text)

    # Work on a copy (including the per-ifo lists) so that overrides from
    # one call never leak into the shared defaults used by later calls.
    bit_dict = dict((ifo, list(bits)) for ifo, bits in state_vector_on_off_dict.items())

    for ifo, bits in [line.strip().split("=", 1) for line in on_bit_list]:
        bits = parse_bits(bits)
        try:
            bit_dict[ifo][0] = bits
        except KeyError:
            # ifo without defaults: start with no off bits
            bit_dict[ifo] = [bits, 0]

    for ifo, bits in [line.strip().split("=", 1) for line in off_bit_list]:
        # every ifo is expected to exist by now; a KeyError here means an
        # off-bit override was given for an unknown detector
        bit_dict[ifo][1] = parse_bits(bits)

    return bit_dict
def state_vector_on_off_list_from_bits_dict(bit_dict):
    """
    Produce a tuple of useful command lines from a dictionary of on / off state
    vector bits keyed by detector.

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as an argument
    that encodes the other instances of the option.

    Returns the pair (onstr, offstr).  In each string the first detector is
    emitted as a bare "IFO=bits " value and every further detector as
    "--state-vector-on-bits=IFO=bits " (respectively
    "--state-vector-off-bits=IFO=bits ").  Bits are formatted with %s, so
    integers appear in base 10.

    Examples:

        >>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]}
        >>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)
        ('H2=7 --state-vector-on-bits=V1=103 --state-vector-on-bits=H1=7 --state-vector-on-bits=L1=7 ', 'H2=352 --state-vector-off-bits=V1=256 --state-vector-off-bits=H1=352 --state-vector-off-bits=L1=352 ')
    """
    on_pieces = []
    off_pieces = []
    for i, ifo in enumerate(bit_dict):
        on_bits, off_bits = bit_dict[ifo][0], bit_dict[ifo][1]
        if i == 0:
            # the first value follows the option already present on the command line
            on_pieces.append("%s=%s " % (ifo, on_bits))
            off_pieces.append("%s=%s " % (ifo, off_bits))
        else:
            on_pieces.append("--state-vector-on-bits=%s=%s " % (ifo, on_bits))
            off_pieces.append("--state-vector-off-bits=%s=%s " % (ifo, off_bits))
    return "".join(on_pieces), "".join(off_pieces)
288 "H1": (
"224.3.2.1", 7096),
289 "L1": (
"224.3.2.2", 7097),
290 "V1": (
"224.3.2.3", 7098),
def framexmit_dict_from_framexmit_list(framexmit_list):
    """
    Given a list of framexmit addresses with ports, produce a dictionary keyed by ifo.

    The list here typically comes from an option parser with options that
    specify the "append" action.

    Each entry has the form IFO=ADDRESS:PORT and is mapped to the tuple
    (ADDRESS, int(PORT)).  An entry that does not contain exactly one "="
    and exactly one ":" after it, or whose port is not an integer, raises
    ValueError.

    Examples:

        >>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"])
        {'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)}
    """
    out = {}
    for instrument_addr in framexmit_list:
        ifo, addr_port = instrument_addr.split("=")
        addr, port = addr_port.split(":")
        out[ifo] = (addr, int(port))
    return out
def framexmit_list_from_framexmit_dict(framexmit_dict, ifos = None, opt = "framexmit-addr"):
    """
    Creates a string of framexmit address options from a dictionary keyed by ifos.

    - override --framexmit-addr with a different option by setting opt.
    - restrict the ifo keys to a subset of the framexmit_dict by setting ifos.

    Each value of framexmit_dict is indexed as (address, port).  The first
    ifo is emitted as a bare "IFO=ADDRESS:PORT " value and every further ifo
    as "--OPT=IFO=ADDRESS:PORT ".  An ifo listed in ifos but missing from
    framexmit_dict raises KeyError.

    Examples:

        >>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)})
        'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '
    """
    if ifos is None:
        ifos = framexmit_dict.keys()
    pieces = []
    for i, ifo in enumerate(ifos):
        addr, port = framexmit_dict[ifo][0], framexmit_dict[ifo][1]
        if i == 0:
            # the first value follows the option already present on the command line
            pieces.append("%s=%s:%s " % (ifo, addr, port))
        else:
            pieces.append("--%s=%s=%s:%s " % (opt, ifo, addr, port))
    return "".join(pieces)
def pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags = Gst.SeekFlags.FLUSH):
    """
    Seek every source element of the pipeline to the interval given by
    gps_start_time and gps_end_time, with optional flags.

    @param pipeline the pipeline whose source elements (pipeline.iterate_sources()) are seeked; it must be in the READY state
    @param gps_start_time start time as LIGOTimeGPS, double or float; None or -1 leaves the start position unset
    @param gps_end_time end time as LIGOTimeGPS, double or float; None or -1 leaves the stop position unset
    @param flags seek flags passed on to each element's seek() call

    Raises ValueError if the pipeline is not in the READY state.

    NOTE(review): this block was reconstructed from a listing in which the
    lines between the seek-argument computation and the READY-state check
    were not visible -- confirm against the repository copy that nothing
    belongs there.
    """
    def seek_args_for_gps(gps_time):
        """
        Convenience routine to convert a GPS time to a seek type and a
        GStreamer timestamp.
        """
        if gps_time is None or gps_time == -1:
            return (Gst.SeekType.NONE, -1)
        elif hasattr(gps_time, 'ns'):
            # objects exposing ns() (e.g. LIGOTimeGPS) provide the integer count directly
            return (Gst.SeekType.SET, gps_time.ns())
        else:
            # int() instead of long(): identical result on Python 2
            # (automatic promotion) and also valid on Python 3
            return (Gst.SeekType.SET, int(float(gps_time) * Gst.SECOND))

    start_type, start_time = seek_args_for_gps(gps_start_time)
    stop_type, stop_time = seek_args_for_gps(gps_end_time)

    if pipeline.current_state != Gst.State.READY:
        raise ValueError("pipeline must be in READY state")

    for elem in pipeline.iterate_sources():
        elem.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)
416 Hold the data associated with data source command lines. 421 Initialize a GWDataSourceInfo class instance from command line options specified by append_options() 425 self.
data_sources = set((
"frames",
"framexmit",
"lvshm",
"nds",
"white",
"silence",
"AdvVirgo",
"LIGO",
"AdvLIGO"))
431 raise ValueError(
"--data-source must be one of %s" %
", ".join(self.
data_sources))
432 if options.data_source ==
"frames" and options.frame_cache
is None:
433 raise ValueError(
"--frame-cache must be specified when using --data-source=frames")
434 if not options.channel_name:
435 raise ValueError(
"must specify at least one channel in the form --channel-name=IFO=CHANNEL-NAME")
436 if options.frame_segments_file
is not None and options.data_source !=
"frames":
437 raise ValueError(
"can only give --frame-segments-file if --data-source=frames")
438 if options.frame_segments_name
is not None and options.frame_segments_file
is None:
439 raise ValueError(
"can only specify --frame-segments-name if --frame-segments-file is given")
440 if options.data_source ==
"nds" and (options.nds_host
is None or options.nds_port
is None):
441 raise ValueError(
"must specify --nds-host and --nds-port when using --data-source=nds")
444 self.
channel_dict = channel_dict_from_channel_list(options.channel_name)
447 self.
shm_part_dict = {
"H1":
"LHO_Data",
"L1":
"LLO_Data",
"V1":
"VIRGO_Data"}
448 if options.shared_memory_partition
is not None:
449 self.
shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) )
457 if options.framexmit_addr
is not None:
458 self.
framexmit_addr.update( framexmit_dict_from_framexmit_list(options.framexmit_addr) )
464 if options.gps_start_time
is not None:
465 if options.gps_end_time
is None:
466 raise ValueError(
"must provide both --gps-start-time and --gps-end-time")
468 raise ValueError(
"cannot set --gps-start-time or --gps-end-time with %s" %
" or ".join(
"--data-source=%s" % src
for src
in sorted(self.
live_sources)))
470 start = LIGOTimeGPS(options.gps_start_time)
472 raise ValueError(
"invalid --gps-start-time '%s'" % options.gps_start_time)
474 end = LIGOTimeGPS(options.gps_end_time)
476 raise ValueError(
"invalid --gps-end-time '%s'" % options.gps_end_time)
478 raise ValueError(
"--gps-start-time must be < --gps-end-time: %s < %s" % (options.gps_start_time, options.gps_end_time))
480 self.
seg = segments.segment(LIGOTimeGPS(options.gps_start_time), LIGOTimeGPS(options.gps_end_time))
481 elif options.gps_end_time
is not None:
482 raise ValueError(
"must provide both --gps-start-time and --gps-end-time")
484 raise ValueError(
"--gps-start-time and --gps-end-time must be specified when --data-source not one of %s" %
", ".join(sorted(self.
live_sources)))
486 if options.frame_segments_file
is not None:
488 self.
frame_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, contenthandler=ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce()
489 if self.
seg is not None:
493 self.
frame_segments = segments.segmentlistdict((instrument, seglist & segments.segmentlist([self.
seg]))
for instrument, seglist
in self.
frame_segments.items())
499 self.
state_channel_dict = {
"H1":
"LLD-DQ_VECTOR",
"H2":
"LLD-DQ_VECTOR",
"L1":
"LLD-DQ_VECTOR",
"V1":
"LLD-DQ_VECTOR" }
500 self.
dq_channel_dict = {
"H1":
"DMT-DQ_VECTOR",
"H2":
"DMT-DQ_VECTOR",
"L1":
"DMT-DQ_VECTOR",
"V1":
"DMT-DQ_VECTOR" }
502 if options.state_channel_name
is not None:
503 state_channel_dict_from_options = channel_dict_from_channel_list( options.state_channel_name )
504 instrument = state_channel_dict_from_options.keys()[0]
507 if options.dq_channel_name
is not None:
508 dq_channel_dict_from_options = channel_dict_from_channel_list( options.dq_channel_name )
509 instrument = dq_channel_dict_from_options.keys()[0]
513 self.
state_vector_on_off_bits = state_vector_on_off_dict_from_bit_lists(options.state_vector_on_bits, options.state_vector_off_bits, state_vector_on_off_dict)
514 self.
dq_vector_on_off_bits = state_vector_on_off_dict_from_bit_lists(options.dq_vector_on_bits, options.dq_vector_off_bits, dq_vector_on_off_dict)
525 if options.data_source ==
"nds":
def append_options(parser):
    """
    Append generic data source options to an OptionParser object in order
    to have consistent and unified command lines and parsing throughout the project
    for applications that read GW data.

    The options are added to parser inside an option group titled
    "Data source options"; nothing is returned.

    -	--data-source [string]
            Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO].

    -	--block-size [int] (bytes)
            Data block size to read in bytes. Default 16384 * 8 * 512 which is 512 seconds of double
            precision data at 16384 Hz. This parameter is only used if --data-source is one of
            white, silence, AdvVirgo, LIGO, AdvLIGO, nds.

    -	--frame-cache [filename]
            Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
            This is required iff --data-source=frames

    -	--gps-start-time [int] (seconds)
            Set the start time of the segment to analyze in GPS seconds.
            Required unless --data-source is lvshm or framexmit

    -	--gps-end-time [int] (seconds)
            Set the end time of the segment to analyze in GPS seconds.
            Required unless --data-source in lvshm,framexmit

    -	--injections [filename]
            Set the name of the LIGO light-weight XML file from which to load injections (optional).

    -	--channel-name [string]
            Set the name of the channels to process.
            Can be given multiple times as --channel-name=IFO=CHANNEL-NAME

    -	--nds-host [hostname]
            Set the remote host or IP address that serves nds data.
            This is required iff --data-source is nds

    -	--nds-port [portnumber]
            Set the port of the remote host that serves nds data, default = 31200.
            This is required iff --data-source is nds

    -	--nds-channel-type [string] type
            Set the nds channel type, default = online.
            FIXME please document

    -	--framexmit-addr [string]
            Set the address of the framexmit service.  Can be given
            multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port

    -	--framexmit-iface [string]
            Set the address of the framexmit interface.

    -	--state-channel-name [string]
            Set the name of the state vector channel.
            This channel will be used to control the flow of data via the on/off bits.
            Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME

    -	--dq-channel-name [string]
            Set the name of the data quality channel.
            This channel will be used to control the flow of data via the on/off bits.
            Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME

    -	--shared-memory-partition [string]
            Set the name of the shared memory partition for a given instrument.
            Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME

    -	--shared-memory-assumed-duration [int]
            Set the assumed span of files in seconds. Default = 4 seconds.

    -	--shared-memory-block-size [int]
            Set the byte size to read per buffer. Default = 4096 bytes.

    -	--frame-segments-file [filename]
            Set the name of the LIGO light-weight XML file from which to load frame segments.
            Optional iff --data-source is frames

    -	--frame-segments-name [string]
            Set the name of the segments to extract from the segment tables.
            Required iff --frame-segments-file is given

    -	--state-vector-on-bits [hex]
            Set the state vector on bits to process (optional).
            The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -	--state-vector-off-bits [hex]
            Set the state vector off bits to process (optional).
            The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -	--dq-vector-on-bits [hex]
            Set the DQ vector on bits to process (optional).
            The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    -	--dq-vector-off-bits [hex]
            Set the DQ vector off bits to process (optional).
            The default is 0x0 for all detectors. Override with IFO=bits can be given multiple times.
            Only currently has meaning for online (lvshm, framexmit) data

    **Typical usage case examples**

    1. Reading data from frames::

            --data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \\
            --channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \\
            --frame-segments-name=datasegments

    2. Reading data from a fake LIGO source::

            --data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \\
            --channel-name=H1=FAIKE-STRAIN

    3. Reading online data via framexmit::

            --data-source=framexmit --channel-name=H1=FAIKE-STRAIN

    4. Many other combinations possible, please add some!
    """
    group = optparse.OptionGroup(parser, "Data source options", "Use these options to set up the appropriate data source")

    # which source to read from, and how much to read at a time
    group.add_option("--data-source", metavar = "source", help = "Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO]. Required.")
    group.add_option("--block-size", type = "int", metavar = "bytes", default = 16384 * 8 * 512, help = "Data block size to read in bytes. Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz). This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
    group.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional). This is required iff --data-source=frames")

    # analysis segment
    group.add_option("--gps-start-time", metavar = "seconds", help = "Set the start time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")
    group.add_option("--gps-end-time", metavar = "seconds", help = "Set the end time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")

    group.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
    group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")

    # nds specific options
    group.add_option("--nds-host", metavar = "hostname", help = "Set the remote host or IP address that serves nds data. This is required iff --data-source=nds")
    group.add_option("--nds-port", metavar = "portnumber", type = "int", default = 31200, help = "Set the port of the remote host that serves nds data. This is required iff --data-source=nds")
    # the help text previously repeated the --nds-port description
    group.add_option("--nds-channel-type", metavar = "type", default = "online", help = "Set the nds channel type. This is required only if --data-source=nds. default==online")

    # framexmit specific options
    group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
    group.add_option("--framexmit-iface", metavar = "name", help = "Set the multicast interface address of the framexmit service.")

    # state / data quality channels; the usage examples previously named --channel-name
    group.add_option("--state-channel-name", metavar = "name", action = "append", help = "Set the name of the state vector channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME")
    group.add_option("--dq-channel-name", metavar = "name", action = "append", help = "Set the name of the data quality channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME")

    # shared memory specific options
    group.add_option("--shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
    group.add_option("--shared-memory-assumed-duration", type = "int", default = 4, help = "Set the assumed span of files in seconds. Default = 4.")
    group.add_option("--shared-memory-block-size", type = "int", default = 4096, help = "Set the byte size to read per buffer. Default = 4096.")

    # frame segments
    group.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source=frames")
    group.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given")

    # on / off bit overrides; these default to empty lists so that they can
    # be iterated without a None check
    group.add_option("--state-vector-on-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
    group.add_option("--state-vector-off-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
    group.add_option("--dq-vector-on-bits", metavar = "bits", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
    # NOTE(review): this help text states a default of 0x160 while the
    # docstring above states 0x0 -- confirm against the module level
    # dq_vector_on_off_dict and correct whichever is wrong.
    group.add_option("--dq-vector-off-bits", metavar = "bits", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")

    parser.add_option_group(group)
def mksegmentsrcgate(pipeline, src, segment_list, invert_output = False, rate = 1, **kwargs):
    """
    Takes a segment list and produces a gate driven by it. Hook up your own input and output.

    @param pipeline the pipeline to which the new elements are added
    @param src the element whose output is passed through the gate
    @param segment_list the segments handed to pipeparts.mksegmentsrc()
    @param invert_output passed through to pipeparts.mksegmentsrc()
    @param rate sample rate written into the caps ("audio/x-raw, rate=%d") applied to the segment source output
    @param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate's name.

    Returns the value of pipeparts.mkgate().

    Gstreamer graph describing this function::

        in -> lal_gate -> out (<return value>)
        lal_segmentsrc -> lal_gate
    """
    # the control stream: segment source restricted to the requested rate
    segment_src = pipeparts.mksegmentsrc(pipeline, segment_list, invert_output = invert_output)
    control = pipeparts.mkcapsfilter(pipeline, segment_src, caps = "audio/x-raw, rate=%d" % rate)
    return pipeparts.mkgate(pipeline, src, threshold = 1, control = control, **kwargs)
705 def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
707 All the conditionals and stupid pet tricks for reading real or 708 simulated h(t) data in one place. 710 Consult the append_options() function and the GWDataSourceInfo class 712 This src in general supports only one instrument although 713 GWDataSourceInfo contains dictionaries of multi-instrument things. By 714 specifying the instrument when calling this function you will get ony a single 715 instrument source. A code wishing to have multiple basicsrcs will need to call 716 this function for each instrument. 724 node [shape=record fontsize=10 fontname="Verdana"]; 725 subgraph clusterfakesrc { 726 fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO"]; 728 label="Possible path #1"; 730 subgraph clusterframes { 732 frames_0 [label="lalcachesrc: frames"]; 733 frames_1 [label ="framecppchanneldemux"]; 734 frames_2 [label ="queue"]; 735 frames_3 [label ="gate (if user provides segments)", style=filled, color=lightgrey]; 736 frames_4 [label ="audiorate"]; 737 frames_0 -> frames_1 -> frames_2 -> frames_3 ->frames_4; 738 label="Possible path #2"; 740 subgraph clusteronline { 742 online_0 [label="lvshmsrc|framexmit"]; 743 online_1 [label ="framecppchanneldemux"]; 744 online_2a [label ="strain queue"]; 745 online_2b [label ="statevector queue"]; 746 online_3 [label ="statevector"]; 747 online_4 [label ="gate"]; 748 online_5 [label ="audiorate"]; 749 online_6 [label ="queue"]; 750 online_0 -> online_1; 751 online_1 -> online_2a; 752 online_1 -> online_2b; 753 online_2b -> online_3; 754 online_2a -> online_4; 755 online_3 -> online_4 -> online_5 -> online_6; 756 label="Possible path #3"; 758 subgraph clusternds { 759 nds_0 [label="ndssrc"]; 761 label="Possible path #4"; 763 audioconv [label="audioconvert"]; 764 progress [label="progressreport (if verbose)", style=filled, color=lightgrey]; 765 sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey]; 766 queue [label="queue (if injections requested)", style=filled, color=lightgrey]; 
769 fake_0 -> audioconv [ltail=clusterfakesrc]; 770 frames_4 -> audioconv [ltail=clusterframes]; 771 online_6 -> audioconv [ltail=clusteronline]; 772 nds_0 -> audioconv [ltail=clusternds]; 773 audioconv -> progress -> sim -> queue -> "?"; 777 statevector = dqvector =
None 782 if gw_data_source_info.data_source ==
"white":
783 src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, volume = 1.0, timestamp_offset = int(gw_data_source_info.seg[0]) * Gst.SECOND)
784 elif gw_data_source_info.data_source ==
"silence":
785 src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, wave = 4, timestamp_offset = int(gw_data_source_info.seg[0]) * Gst.SECOND)
786 elif gw_data_source_info.data_source ==
"LIGO":
787 src = pipeparts.mkfakeLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
788 elif gw_data_source_info.data_source ==
"AdvLIGO":
789 src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
790 elif gw_data_source_info.data_source ==
"AdvVirgo":
791 src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
792 elif gw_data_source_info.data_source ==
"frames":
793 if instrument ==
"V1":
797 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex =
"V")
799 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
800 demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum =
False, channel_list = map(
"%s:%s".__mod__, gw_data_source_info.channel_dict.items()))
804 src = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * Gst.SECOND)
805 pipeparts.src_deferred_link(demux,
"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_static_pad(
"sink"))
806 if gw_data_source_info.frame_segments[instrument]
is not None:
811 src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]), name =
"%s_frame_segments_gate" % instrument)
812 pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_static_pad(
"src"), gw_data_source_info.frame_segments[instrument])
814 src = pipeparts.mkaudiorate(pipeline, src, skip_to_first =
True, silent =
False)
815 elif gw_data_source_info.data_source
in (
"framexmit",
"lvshm"):
817 state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
818 dq_vector_on_bits, dq_vector_off_bits = gw_data_source_info.dq_vector_on_off_bits[instrument]
820 if gw_data_source_info.data_source ==
"lvshm":
823 src = pipeparts.mklvshmsrc(pipeline, shm_name = gw_data_source_info.shm_part_dict[instrument], assumed_duration = gw_data_source_info.shm_assumed_duration, blocksize = gw_data_source_info.shm_block_size, wait_time = 120)
824 elif gw_data_source_info.data_source ==
"framexmit":
825 src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = gw_data_source_info.framexmit_iface, multicast_group = gw_data_source_info.framexmit_addr[instrument][0], port = gw_data_source_info.framexmit_addr[instrument][1], wait_time = 120)
828 raise ValueError(gw_data_source_info.data_source)
831 src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 10)
832 src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum =
False, skip_bad_files =
True)
836 statevector = pipeparts.mkstatevector(pipeline,
None, required_on = state_vector_on_bits, required_off = state_vector_off_bits, name =
"%s_state_vector" % instrument)
837 dqvector = pipeparts.mkstatevector(pipeline,
None, required_on = dq_vector_on_bits, required_off = dq_vector_off_bits, name =
"%s_dq_vector" % instrument)
838 pipeparts.src_deferred_link(src,
"%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), statevector.get_static_pad(
"sink"))
839 pipeparts.src_deferred_link(src,
"%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), dqvector.get_static_pad(
"sink"))
840 @bottle.route(
"/%s/statevector_on.txt" % instrument)
841 def state_vector_state(elem = statevector):
842 t = float(lal.UTCToGPS(time.gmtime()))
843 on = elem.get_property(
"on-samples")
844 return "%.9f %d" % (t, on)
845 @bottle.route(
"/%s/statevector_off.txt" % instrument)
846 def state_vector_state(elem = statevector):
847 t = float(lal.UTCToGPS(time.gmtime()))
848 off = elem.get_property(
"off-samples")
849 return "%.9f %d" % (t, off)
850 @bottle.route(
"/%s/statevector_gap.txt" % instrument)
851 def state_vector_state(elem = statevector):
852 t = float(lal.UTCToGPS(time.gmtime()))
853 gap = elem.get_property(
"gap-samples")
854 return "%.9f %d" % (t, gap)
855 @bottle.route(
"/%s/dqvector_on.txt" % instrument)
856 def dq_vector_state(elem = dqvector):
857 t = float(lal.UTCToGPS(time.gmtime()))
858 on = elem.get_property(
"on-samples")
859 return "%.9f %d" % (t, on)
860 @bottle.route(
"/%s/dqvector_off.txt" % instrument)
861 def dq_vector_state(elem = dqvector):
862 t = float(lal.UTCToGPS(time.gmtime()))
863 off = elem.get_property(
"off-samples")
864 return "%.9f %d" % (t, off)
865 @bottle.route(
"/%s/dqvector_gap.txt" % instrument)
866 def dq_vector_state(elem = dqvector):
867 t = float(lal.UTCToGPS(time.gmtime()))
868 gap = elem.get_property(
"gap-samples")
869 return "%.9f %d" % (t, gap)
872 strain = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
873 pipeparts.src_deferred_link(src,
"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), strain.get_static_pad(
"sink"))
877 statevector = pipeparts.mkaudiorate(pipeline, statevector, skip_to_first =
True, silent =
False)
878 dqvector = pipeparts.mkaudiorate(pipeline, dqvector, skip_to_first =
True, silent =
False)
879 src = pipeparts.mkaudiorate(pipeline, strain, skip_to_first =
True, silent =
False)
880 @bottle.route(
"/%s/strain_add_drop.txt" % instrument)
882 def strain_add(elem = src, rate = 16384):
883 t = float(lal.UTCToGPS(time.gmtime()))
884 add = elem.get_property(
"add")
885 drop = elem.get_property(
"drop")
886 return "%.9f %d %d" % (t, add // rate, drop // rate)
895 statevector = pipeparts.mktee(pipeline, statevector)
896 dqvector = pipeparts.mktee(pipeline, dqvector)
897 src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state =
False, name =
"%s_state_vector_gate" % instrument)
898 src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state =
False, name =
"%s_dq_vector_gate" % instrument)
899 elif gw_data_source_info.data_source ==
"nds":
900 src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type, blocksize = gw_data_source_info.block_size, port = gw_data_source_info.nds_port)
902 raise ValueError(
"invalid data_source: %s" % gw_data_source_info.data_source)
909 src = pipeparts.mkaudioconvert(pipeline, src)
916 src = pipeparts.mkprogressreport(pipeline, src,
"progress_src_%s" % instrument)
922 if gw_data_source_info.injection_filename
is not None:
923 src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
926 src = pipeparts.mkqueue(pipeline, src, max_size_bytes = 0, max_size_buffers = 0, max_size_time = Gst.SECOND * 64)
932 return src, statevector, dqvector
def mkhtgate(pipeline, src, control = None, threshold = 8.0, attack_length = 128, hold_length = 128, **kwargs):
    """
    A convenience function to provide thresholds on input data.  This can
    be used to remove large spikes / glitches etc.  Of course you can use it
    for other stuff by plugging whatever you want as input and output.

    NOTE:  the queues constructed by this code assume the attack and
    hold lengths combined are less than 1 second in duration.

    Arguments:

    pipeline -- the pipeline handed to the pipeparts helpers that create
        the elements.
    src -- the element whose output is to be gated.
    control -- the element supplying the gate's control stream.  If None
        (the default), src is teed and serves as its own control.
    threshold -- forwarded unchanged to pipeparts.mkgate().
    attack_length, hold_length -- forwarded to pipeparts.mkgate() with
        their signs flipped.
    **kwargs -- any further keyword arguments are forwarded to
        pipeparts.mkgate().

    Returns whatever pipeparts.mkgate() returns.

    **Gstreamer Graph** (shown for the default control = None case)

    .. graphviz::

       digraph G {
          node [shape=record fontsize=10 fontname="Verdana"];
          out [label="<return>"];
          in -> tee -> inputqueue -> lal_gate -> out;
          tee -> lal_gate;
       }
    """
    # Only fall back to self-gating when the caller did not provide a
    # control stream; a caller-supplied control must not be overwritten.
    if control is None:
        # The tee lets the same stream feed both the gate's data input
        # (through the queue below) and its control input.
        control = src = pipeparts.mktee(pipeline, src)

    # Data branch: bounded only by time (one second), not by bytes or
    # buffer count -- see the NOTE in the docstring.
    src = pipeparts.mkqueue(pipeline, src, max_size_time = Gst.SECOND, max_size_bytes = 0, max_size_buffers = 0)

    # NOTE(review): the attack and hold lengths are deliberately negated
    # and the control is inverted; confirm the exact meaning of negative
    # lengths against the lal_gate element documentation.
    return pipeparts.mkgate(
        pipeline,
        src,
        control = control,
        threshold = threshold,
        attack_length = -attack_length,
        hold_length = -hold_length,
        invert_control = True,
        **kwargs
    )
970 if __name__ ==
"__main__":
channel_dict
A dictionary of the requested channels, e.g., {"H1":"LDAS-STRAIN", "L1":"LDAS-STRAIN"}.
data_source
Data source, one of python.datasource.GWDataSourceInfo.data_sources.
frame_segments
Frame segments from a user defined file.
block_size
Block size, in bytes, used when reading data from disk.
framexmit_addr
A dictionary of framexmit addresses.
shm_part_dict
A dictionary of shared-memory partition names keyed by instrument, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}.
def __init__(self, options)
See datasource.append_options()
state_channel_dict
DQ and state vector channel dictionary, e.g., { "H1": "LLD-DQ_VECTOR", "H2": "LLD-DQ_VECTOR","L1": "LLD-DQ_VECTOR", "V1": "LLD-DQ_VECTOR" }.
data_sources
A list of possible, valid data sources ("frames", "framexmit", "lvshm", "nds", "white", "silence", "AdvVirgo", "LIGO", "AdvLIGO")
frame_cache
Frame cache file.
state_vector_on_off_bits
Dictionary of state vector on, off bits like {"H1" : [0x7, 0x160], "H2" : [0x7, 0x160], "L1" : [0x7, 0x160], "V1" : [0x67, 0x100]}.
nds_channel_type
Store the ndssrc specific options: channel_type.
A class that manages the task of watching for and connecting to new source pads by name...
nds_port
Store the ndssrc specific options: port.
injection_filename
Injection file name.
shm_assumed_duration
Shared-memory option: the assumed duration passed to the shared-memory source (lvshmsrc) as assumed_duration.
nds_host
Store the ndssrc specific options: host.