gstlal  1.4.1
gstlal_fake_frames_pipe
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2011 Chad Hanna
4 #
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 
19 ## @file
20 # An HTCondor DAG generator to recolor frame data
21 
22 """
23 This program makes a dag to recolor frames
24 """
25 
26 __author__ = 'Chad Hanna <chad.hanna@ligo.org>'
27 
28 
29 ##############################################################################
30 # import standard modules
31 import sys, os, copy, math
32 from optparse import OptionParser
33 import subprocess, socket, tempfile
34 
35 ##############################################################################
36 # import the modules we need to build the pipeline
37 from glue import pipeline
38 from ligo import segments
39 import glue.ligolw.utils as ligolw_utils
40 import glue.ligolw.utils.segments as ligolw_segments
41 from gstlal import datasource
42 from gstlal import dagparts
43 from lal import series as lalseries
44 from lal.utils import CacheEntry
45 
46 #
47 # Classes for generating reference psds
48 #
49 
50 class gstlal_reference_psd_job(pipeline.CondorDAGJob):
51  """
52  A gstlal_reference_psd job
53  """
54  def __init__(self, group, user, executable=dagparts.which('gstlal_reference_psd'), tag_base='gstlal_reference_psd'):
55  """
56  """
57  self.__prog__ = 'gstlal_reference_psd'
58  self.__executable = executable
59  self.__universe = 'vanilla'
60  pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
61  self.add_condor_cmd('getenv','True')
62  self.add_condor_cmd('requirements', 'Memory > 1999') #FIXME is this enough?
63  self.tag_base = tag_base
64  self.add_condor_cmd('environment',"KMP_LIBRARY=serial;MKL_SERIAL=yes")
65  self.set_sub_file(tag_base+'.sub')
66  if group is not None:
67  self.add_condor_cmd('accounting_group', group)
68  if user is not None:
69  self.add_condor_cmd('accounting_group_user', user)
70  self.set_stdout_file('logs/'+tag_base+'-$(macroid)-$(process).out')
71  self.set_stderr_file('logs/'+tag_base+'-$(macroid)-$(process).err')
72 
73 
74 class gstlal_median_psd_job(pipeline.CondorDAGJob):
75  """
76  A gstlal_median_psd job
77  """
78  def __init__(self, group, user, executable=dagparts.which('gstlal_median_of_psds'), tag_base='gstlal_median_of_psds'):
79  """
80  """
81  self.__prog__ = 'gstlal_median_of_psds'
82  self.__executable = executable
83  self.__universe = 'vanilla'
84  pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
85  self.add_condor_cmd('getenv','True')
86  self.tag_base = tag_base
87  self.add_condor_cmd('environment',"KMP_LIBRARY=serial;MKL_SERIAL=yes")
88  self.set_sub_file(tag_base+'.sub')
89  if group is not None:
90  self.add_condor_cmd('accounting_group', group)
91  if user is not None:
92  self.add_condor_cmd('accounting_group_user', user)
93  self.set_stdout_file('logs/'+tag_base+'-$(macroid)-$(process).out')
94  self.set_stderr_file('logs/'+tag_base+'-$(macroid)-$(process).err')
95 
96 
97 class gstlal_smooth_reference_psd_job(pipeline.CondorDAGJob):
98  """
99  A gstlal_smooth_reference_psd job
100  """
101  def __init__(self, group, user, executable=dagparts.which('gstlal_psd_polyfit'), tag_base='gstlal_psd_polyfit'):
102  """
103  """
104  self.__prog__ = 'gstlal_psd_polyfit'
105  self.__executable = executable
106  self.__universe = 'vanilla'
107  pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
108  self.add_condor_cmd('getenv','True')
109  self.tag_base = tag_base
110  self.add_condor_cmd('environment',"KMP_LIBRARY=serial;MKL_SERIAL=yes")
111  self.set_sub_file(tag_base+'.sub')
112  if group is not None:
113  self.add_condor_cmd('accounting_group', group)
114  if user is not None:
115  self.add_condor_cmd('accounting_group_user', user)
116  self.set_stdout_file('logs/'+tag_base+'-$(macroid)-$(process).out')
117  self.set_stderr_file('logs/'+tag_base+'-$(macroid)-$(process).err')
118 
119 
120 class gstlal_reference_psd_node(pipeline.CondorDAGNode):
121  """
122  A gstlal_reference_psd node
123  """
124  def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, instrument, channel, injections=None, p_node=[]):
125 
126  pipeline.CondorDAGNode.__init__(self,job)
127  self.add_var_opt("frame-cache", frame_cache)
128  self.add_var_opt("gps-start-time", gps_start_time)
129  self.add_var_opt("gps-end-time", gps_end_time)
130  self.add_var_opt("data-source", "frames")
131  self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
132  if injections:
133  self.add_var_opt("injections", injections)
134  path = os.getcwd()
135  output_name = self.output_name = '%s/%s-%d-%d-reference_psd.xml.gz' % (path, instrument, gps_start_time, gps_end_time)
136  self.add_var_opt("write-psd",output_name)
137  dag.output_cache.append(CacheEntry(instrument, "-", segments.segment(gps_start_time, gps_end_time), "file://localhost/%s" % (output_name,)))
138  for p in p_node:
139  self.add_parent(p)
140  dag.add_node(self)
141 
142 
143 class gstlal_smooth_reference_psd_node(pipeline.CondorDAGNode):
144  """
145  A gstlal_smooth_reference_psd node
146  """
147  def __init__(self, job, dag, instrument, input_psd, p_node=[]):
148  pipeline.CondorDAGNode.__init__(self,job)
149  path = os.getcwd()
150  #FIXME shouldn't be hardcoding stuff like this
151  output_name = self.output_name = input_psd.replace('reference_psd', 'smoothed_reference_psd')
152  self.add_var_arg(input_psd)
153  self.add_var_opt("output", output_name)
154  self.add_var_opt("low-fit-freq", 10)
155  for p in p_node:
156  self.add_parent(p)
157  dag.add_node(self)
158 
159 
160 class gstlal_median_psd_node(pipeline.CondorDAGNode):
161  """
162  A gstlal_median_psd node
163  """
164  def __init__(self, job, dag, input_psds, output, p_node=[]):
165  pipeline.CondorDAGNode.__init__(self,job)
166  path = os.getcwd()
167  #FIXME shouldn't be hardcoding stuff like this
168  output_name = self.output_name = output
169  self.add_var_opt("output-name", output_name)
170  for psd in input_psds:
171  self.add_file_arg(psd)
172  for p in p_node:
173  self.add_parent(p)
174  dag.add_node(self)
175 
176 
177 #
178 # classes for generating recolored frames
179 #
180 
181 class gstlal_fake_frames_job(pipeline.CondorDAGJob):
182  """
183  A gstlal_fake_frames job
184  """
185  def __init__(self, group, user, executable=dagparts.which('gstlal_fake_frames'), tag_base='gstlal_fake_frames'):
186  """
187  """
188  self.__prog__ = 'gstlal_fake_frames'
189  self.__executable = executable
190  self.__universe = 'vanilla'
191  pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
192  self.add_condor_cmd('getenv','True')
193  self.add_condor_cmd('requirements', 'Memory > 1999') #FIXME is this enough?
194  self.tag_base = tag_base
195  self.add_condor_cmd('environment',"KMP_LIBRARY=serial;MKL_SERIAL=yes")
196  self.set_sub_file(tag_base+'.sub')
197  if group is not None:
198  self.add_condor_cmd('accounting_group', group)
199  if user is not None:
200  self.add_condor_cmd('accounting_group_user', user)
201  self.set_stdout_file('logs/'+tag_base+'-$(macroid)-$(process).out')
202  self.set_stderr_file('logs/'+tag_base+'-$(macroid)-$(process).err')
203 
204 
205 class gstlal_fake_frames_node(pipeline.CondorDAGNode):
206  """
207  A gstlal_fake_frames node
208  """
209  def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, channel, reference_psd, color_psd, sample_rate, injections=None, output_channel_name = None, duration = 4096, output_path = None, frame_type = None, shift = None, whiten_track_psd = False, frames_per_file = 1, p_node=[]):
210 
211  pipeline.CondorDAGNode.__init__(self,job)
212  self.add_var_opt("frame-cache", frame_cache)
213  self.add_var_opt("gps-start-time",gps_start_time)
214  self.add_var_opt("gps-end-time",gps_end_time)
215  self.add_var_opt("data-source", "frames")
216  self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
217  self.add_var_opt("whiten-reference-psd",reference_psd)
218  self.add_var_opt("color-psd", color_psd)
219  self.add_var_opt("sample-rate", sample_rate)
220  if injections is not None:
221  self.add_var_opt("injections", injections)
222  self.add_var_opt("output-channel-name", output_channel_name)
223  self.add_var_opt("frame-duration", duration)
224  if output_path is not None:
225  self.add_var_opt("output-path", output_path)
226  self.add_var_opt("frame-type", frame_type)
227  if whiten_track_psd:
228  self.add_var_opt("whiten-track-psd",reference_psd)
229  if shift:
230  self.add_var_opt("shift", shift)
231  self.add_var_opt("frames-per-file", frames_per_file)
232  for p in p_node:
233  self.add_parent(p)
234  dag.add_node(self)
235 
236 
237 def choosesegs(seglists, min_segment_length):
238  for instrument, seglist in seglists.iteritems():
239  newseglist = segments.segmentlist()
240  for seg in seglist:
241  if abs(seg) > min_segment_length:
242  newseglist.append(segments.segment(seg))
243  seglists[instrument] = newseglist
244 
245 
246 def parse_command_line():
247  parser = OptionParser(description = __doc__)
248 
249  parser.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional)")
250  parser.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
251  parser.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
252  parser.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments. Required")
253  parser.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables. Required")
254 
255  parser.add_option("--min-segment-length", metavar = "SECONDS", help = "Set the minimum segment length to process (required)", type="float")
256  parser.add_option("--shift", metavar = "NANOSECONDS", help = "Number of nanoseconds to delay (negative) or advance (positive) the time stream", type = "int")
257  parser.add_option("--sample-rate", metavar = "HZ", default = 16384, type = "int", help = "Sample rate at which to generate the data, should be less than or equal to the sample rate of the measured psds provided, default = 16384 Hz")
258  parser.add_option("--whiten-type", metavar="name", help = "Whiten whatever data is coming out of datasource either from the data or from a fixed reference psd if a file is given. Options are psdperseg|medianofpsdperseg|FILE")
259  parser.add_option("--whiten-track-psd", action = "store_true", help = "Calculate PSD from input data and track with time.")
260  parser.add_option("--color-psd", metavar = "FILE", help = "Set the name of psd xml file to color the data with")
261  parser.add_option("--output-path", metavar = "name", action = "append", help = "Set the instrument dependent output path for frames, defaults to current working directory. eg H1=/path/to/H1/frames. Can be given more than once.")
262  parser.add_option("--output-channel-name", metavar = "name", action="append", help = "The name of the channel in the output frames, e.g., --output-channel-name=IFO=CHANNEL-NAME. The default is the same as the channel name. Can be given more than once. Required ")
263  parser.add_option("--frame-type", metavar = "name", action = "append", help = "Set the instrument dependent frame type, H1=TYPE. Can be given more than once and is required for each instrument processed.")
264  parser.add_option("--frame-duration", metavar = "SECONDS", default = 16, type = "int", help = "Set the duration of the output frames. The duration of the frame file will be multiplied by --frames-per-file. Default: 16s")
265  parser.add_option("--frames-per-file", metavar = "INT", default = 256, type = "int", help = "Set the number of frames per file. Default: 256")
266  parser.add_option("--accounting-group", metavar = "str", help = "Set the accounting group name, e.g., ligo.dev.o3.cw.directedbinary.production")
267  parser.add_option("--accounting-group-user", metavar = "str", help = "Set the accounting group user, e.g., chad.hanna")
268  parser.add_option("--verbose", action = "store_true", help = "Be verbose")
269 
270  options, filenames = parser.parse_args()
271 
272  fail = ""
273  for option in ("min_segment_length", "frame_type", "frame_segments_file", "frame_segments_name"):
274  if getattr(options, option) is None:
275  fail += "must provide option %s\n" % (option)
276  if fail:
277  raise ValueError(fail)
278 
279  inchannels = datasource.channel_dict_from_channel_list(options.channel_name)
280  outchannels = datasource.channel_dict_from_channel_list(options.output_channel_name)
281  frametypes = datasource.channel_dict_from_channel_list(options.frame_type)
282  outpaths = datasource.channel_dict_from_channel_list(options.output_path)
283 
284  if not (set(frametypes) == set(inchannels) == set(outchannels)):
285  raise ValueError('--frame-type, --channel-name and --output-channel-name must contain same instruments')
286 
287  return options, inchannels, outchannels, outpaths, frametypes, filenames
288 
289 
290 options, inchannels, outchannels, outpaths, frametypes, filenames = parse_command_line()
291 
292 try:
293  os.mkdir("logs")
294 except:
295  pass
296 
297 dag = dagparts.CondorDAG("gstlal_fake_frames_pipe")
298 
299 seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, verbose = options.verbose, contenthandler = ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce()
300 choosesegs(seglists, options.min_segment_length)
301 
302 psdJob = gstlal_reference_psd_job(options.accounting_group, options.accounting_group_user)
303 smoothJob = gstlal_smooth_reference_psd_job(options.accounting_group, options.accounting_group_user)
304 medianJob = gstlal_median_psd_job(options.accounting_group, options.accounting_group_user)
305 colorJob = gstlal_fake_frames_job(options.accounting_group, options.accounting_group_user)
306 
307 smoothnode = {}
308 mediannode = {}
309 p_node = dict([(i, []) for i in seglists])
310 
311 if options.whiten_type in ("psdperseg", "medianofpsdperseg"):
312  psd = {}
313  for instrument, seglist in seglists.iteritems():
314  mediannode[instrument] = {}
315  smoothnode[instrument] = {}
316  psd[instrument] = {}
317  for seg in seglist:
318  #FIXME if there are sements without frame caches this will barf
319  psdnode = gstlal_reference_psd_node(psdJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), instrument, inchannels[instrument], injections=None, p_node=[])
320  smoothnode[instrument][seg] = gstlal_smooth_reference_psd_node(smoothJob, dag, instrument, psdnode.output_name, p_node=[psdnode])
321  if options.whiten_type == "psdperseg":
322  psd[instrument][seg] = smoothnode[instrument][seg].output_name
323 
324  mediannode[instrument] = gstlal_median_psd_node(medianJob, dag, [v.output_name for v in smoothnode[instrument].values()], "%s_median_psd.xml.gz" % instrument, p_node=smoothnode[instrument].values())
325  p_node[instrument] = [mediannode[instrument]]
326  if options.whiten_type == "medianofpsdperseg":
327  psd[instrument] = mediannode[instrument].output_name
328 
329 elif options.whiten_type is not None:
330  psd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.whiten_reference_psd, verbose = options.verbose, contenthandler = lalseries.PSDContentHandler))
331 else:
332  psd = dict([(i, None) for i in seglists])
333 
334 for instrument, seglist in seglists.iteritems():
335  try:
336  output_path = outpaths[instrument]
337  except KeyError:
338  output_path = None
339  for seg in seglist:
340  try:
341  reference_psd = psd[instrument][seg]
342  except TypeError:
343  reference_psd = psd[instrument]
344  gstlal_fake_frames_node(colorJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), inchannels[instrument], reference_psd, color_psd=options.color_psd, sample_rate = options.sample_rate, injections=options.injections, output_channel_name = outchannels[instrument], output_path = output_path, duration = options.frame_duration, frame_type = frametypes[instrument], shift = options.shift, whiten_track_psd = options.whiten_track_psd, frames_per_file = options.frames_per_file, p_node=p_node[instrument])
345 
346 dag.write_sub_files()
347 dag.write_dag()
348 dag.write_script()
349 dag.write_cache()