3 # Copyright (C) 2011 Chad Hanna
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 # Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# An HTCondor DAG generator to recolor frame data

"""
This program makes a dag to recolor frames.

It reads analysis segments from a LIGO light-weight XML segment file and, for
every sufficiently long segment of every requested instrument, schedules a
gstlal_fake_frames job that whitens the input frame data and recolors it with
the PSD given by --color-psd.  The whitening PSD can be measured per segment
(--whiten-type=psdperseg), taken as the median of those per-segment
measurements (--whiten-type=medianofpsdperseg), or read from a PSD XML file
(--whiten-type=FILE).
"""

__author__ = 'Chad Hanna <chad.hanna@ligo.org>'
29 ##############################################################################
30 # import standard modules
31 import sys, os, copy, math
32 from optparse import OptionParser
33 import subprocess, socket, tempfile
35 ##############################################################################
36 # import the modules we need to build the pipeline
37 from glue import pipeline
38 from ligo import segments
39 import glue.ligolw.utils as ligolw_utils
40 import glue.ligolw.utils.segments as ligolw_segments
41 from gstlal import datasource
42 from gstlal import dagparts
43 from lal import series as lalseries
44 from lal.utils import CacheEntry
47 # Classes for generating reference psds
class gstlal_reference_psd_job(pipeline.CondorDAGJob):
    """
    A gstlal_reference_psd job: a vanilla-universe HTCondor job that runs
    gstlal_reference_psd.
    """
    def __init__(self, group, user, executable=None, tag_base='gstlal_reference_psd'):
        """
        Build the submit description for gstlal_reference_psd.

        group -- HTCondor accounting group, or None to omit the directive.
        user -- HTCondor accounting group user, or None to omit the directive.
        executable -- path to the program; None (the default) resolves it
            with dagparts.which() when the job is constructed.
        tag_base -- prefix for the .sub file and the log file names.
        """
        if executable is None:
            # Resolved here instead of in the default argument, which would
            # run the PATH lookup once, at class-definition time.
            executable = dagparts.which('gstlal_reference_psd')
        self.__prog__ = 'gstlal_reference_psd'
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.add_condor_cmd('requirements', 'Memory > 1999') # FIXME is this enough?
        self.tag_base = tag_base
        # Request serial (single-threaded) Intel MKL / OpenMP runtimes.
        self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
        self.set_sub_file(tag_base + '.sub')
        # Only emit accounting directives that were actually supplied.
        if group is not None:
            self.add_condor_cmd('accounting_group', group)
        if user is not None:
            self.add_condor_cmd('accounting_group_user', user)
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
class gstlal_median_psd_job(pipeline.CondorDAGJob):
    """
    A gstlal_median_psd job: a vanilla-universe HTCondor job that runs
    gstlal_median_of_psds.
    """
    def __init__(self, group, user, executable=None, tag_base='gstlal_median_of_psds'):
        """
        Build the submit description for gstlal_median_of_psds.

        group -- HTCondor accounting group, or None to omit the directive.
        user -- HTCondor accounting group user, or None to omit the directive.
        executable -- path to the program; None (the default) resolves it
            with dagparts.which() when the job is constructed.
        tag_base -- prefix for the .sub file and the log file names.
        """
        if executable is None:
            # Resolved here instead of in the default argument, which would
            # run the PATH lookup once, at class-definition time.
            executable = dagparts.which('gstlal_median_of_psds')
        self.__prog__ = 'gstlal_median_of_psds'
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.tag_base = tag_base
        # Request serial (single-threaded) Intel MKL / OpenMP runtimes.
        self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
        self.set_sub_file(tag_base + '.sub')
        # Only emit accounting directives that were actually supplied.
        if group is not None:
            self.add_condor_cmd('accounting_group', group)
        if user is not None:
            self.add_condor_cmd('accounting_group_user', user)
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
class gstlal_smooth_reference_psd_job(pipeline.CondorDAGJob):
    """
    A gstlal_smooth_reference_psd job: a vanilla-universe HTCondor job that
    runs gstlal_psd_polyfit.
    """
    def __init__(self, group, user, executable=None, tag_base='gstlal_psd_polyfit'):
        """
        Build the submit description for gstlal_psd_polyfit.

        group -- HTCondor accounting group, or None to omit the directive.
        user -- HTCondor accounting group user, or None to omit the directive.
        executable -- path to the program; None (the default) resolves it
            with dagparts.which() when the job is constructed.
        tag_base -- prefix for the .sub file and the log file names.
        """
        if executable is None:
            # Resolved here instead of in the default argument, which would
            # run the PATH lookup once, at class-definition time.
            executable = dagparts.which('gstlal_psd_polyfit')
        self.__prog__ = 'gstlal_psd_polyfit'
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.tag_base = tag_base
        # Request serial (single-threaded) Intel MKL / OpenMP runtimes.
        self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
        self.set_sub_file(tag_base + '.sub')
        # Only emit accounting directives that were actually supplied.
        if group is not None:
            self.add_condor_cmd('accounting_group', group)
        if user is not None:
            self.add_condor_cmd('accounting_group_user', user)
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
class gstlal_reference_psd_node(pipeline.CondorDAGNode):
    """
    A gstlal_reference_psd node: measures one instrument's PSD from frame
    data between gps_start_time and gps_end_time and writes it to an XML file.
    """
    def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, instrument, channel, injections=None, p_node=None):
        """
        Configure the node, record its output in dag.output_cache and add it
        to dag.

        job -- the gstlal_reference_psd_job to run.
        dag -- the DAG this node is added to.
        frame_cache -- LAL cache listing the input frame files.
        gps_start_time, gps_end_time -- integer GPS bounds of the data.
        instrument, channel -- passed as --channel-name=INSTRUMENT=CHANNEL.
        injections -- optional injection XML file; omitted when None.
        p_node -- iterable of parent nodes this node must wait for.

        The output PSD file name is stored in self.output_name.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        self.add_var_opt("frame-cache", frame_cache)
        self.add_var_opt("gps-start-time", gps_start_time)
        self.add_var_opt("gps-end-time", gps_end_time)
        self.add_var_opt("data-source", "frames")
        self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
        if injections is not None:
            self.add_var_opt("injections", injections)
        # Use an absolute path so downstream jobs and the output cache can
        # locate the PSD regardless of their working directory.
        path = os.getcwd()
        output_name = self.output_name = '%s/%s-%d-%d-reference_psd.xml.gz' % (path, instrument, gps_start_time, gps_end_time)
        self.add_var_opt("write-psd", output_name)
        # output_name is absolute (starts with "/"), so no extra slash after
        # "localhost"; adding one would give a "//..." URL path.
        dag.output_cache.append(CacheEntry(instrument, "-", segments.segment(gps_start_time, gps_end_time), "file://localhost%s" % (output_name,)))
        for parent in (p_node or []):
            self.add_parent(parent)
        dag.add_node(self)
class gstlal_smooth_reference_psd_node(pipeline.CondorDAGNode):
    """
    A gstlal_smooth_reference_psd node: smooths a measured reference PSD with
    gstlal_psd_polyfit and writes the result alongside the input file.
    """
    def __init__(self, job, dag, instrument, input_psd, p_node=None):
        """
        Configure the node and add it to dag.

        job -- the gstlal_smooth_reference_psd_job to run.
        dag -- the DAG this node is added to.
        instrument -- accepted for interface compatibility; not used here.
        input_psd -- path of the PSD XML file to smooth.
        p_node -- iterable of parent nodes this node must wait for.

        The smoothed PSD file name is stored in self.output_name.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        # FIXME shouldn't be hardcoding stuff like this
        # Rename only the file name, never the directory: replacing in the
        # full path would also rewrite any directory containing the token.
        directory, basename = os.path.split(input_psd)
        if 'reference_psd' in basename:
            basename = basename.replace('reference_psd', 'smoothed_reference_psd')
        else:
            # Guarantee the output can never overwrite the input PSD.
            basename = 'smoothed_' + basename
        output_name = self.output_name = os.path.join(directory, basename)
        self.add_var_arg(input_psd)
        self.add_var_opt("output", output_name)
        self.add_var_opt("low-fit-freq", 10)
        for parent in (p_node or []):
            self.add_parent(parent)
        dag.add_node(self)
class gstlal_median_psd_node(pipeline.CondorDAGNode):
    """
    A gstlal_median_psd node: runs gstlal_median_of_psds over a set of PSD
    files and writes the combined PSD to a single output file.
    """
    def __init__(self, job, dag, input_psds, output, p_node=None):
        """
        Configure the node and add it to dag.

        job -- the gstlal_median_psd_job to run.
        dag -- the DAG this node is added to.
        input_psds -- iterable of PSD file names, passed as file arguments.
        output -- name of the output PSD file (also self.output_name).
        p_node -- iterable of parent nodes this node must wait for.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        output_name = self.output_name = output
        self.add_var_opt("output-name", output_name)
        for psd in input_psds:
            self.add_file_arg(psd)
        for parent in (p_node or []):
            self.add_parent(parent)
        dag.add_node(self)
178 # classes for generating recolored frames
class gstlal_fake_frames_job(pipeline.CondorDAGJob):
    """
    A gstlal_fake_frames job: a vanilla-universe HTCondor job that runs
    gstlal_fake_frames.
    """
    def __init__(self, group, user, executable=None, tag_base='gstlal_fake_frames'):
        """
        Build the submit description for gstlal_fake_frames.

        group -- HTCondor accounting group, or None to omit the directive.
        user -- HTCondor accounting group user, or None to omit the directive.
        executable -- path to the program; None (the default) resolves it
            with dagparts.which() when the job is constructed.
        tag_base -- prefix for the .sub file and the log file names.
        """
        if executable is None:
            # Resolved here instead of in the default argument, which would
            # run the PATH lookup once, at class-definition time.
            executable = dagparts.which('gstlal_fake_frames')
        self.__prog__ = 'gstlal_fake_frames'
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.add_condor_cmd('requirements', 'Memory > 1999') # FIXME is this enough?
        self.tag_base = tag_base
        # Request serial (single-threaded) Intel MKL / OpenMP runtimes.
        self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
        self.set_sub_file(tag_base + '.sub')
        # Only emit accounting directives that were actually supplied.
        if group is not None:
            self.add_condor_cmd('accounting_group', group)
        if user is not None:
            self.add_condor_cmd('accounting_group_user', user)
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
class gstlal_fake_frames_node(pipeline.CondorDAGNode):
    """
    A gstlal_fake_frames node: whitens one instrument's frame data for one
    GPS interval and recolors it, writing new frame files.
    """
    def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, channel, reference_psd, color_psd, sample_rate, injections=None, output_channel_name = None, duration = 4096, output_path = None, frame_type = None, shift = None, whiten_track_psd = False, frames_per_file = 1, p_node=None, instrument=None):
        """
        Configure the node and add it to dag.

        job -- the gstlal_fake_frames_job to run.
        dag -- the DAG this node is added to.
        frame_cache -- LAL cache listing the input frame files.
        gps_start_time, gps_end_time -- GPS bounds of the data to process.
        channel -- input channel name, passed as --channel-name=INSTRUMENT=CHANNEL.
        reference_psd -- PSD file used to whiten; omitted when None.
        color_psd -- PSD file used to recolor; omitted when None.
        sample_rate -- output sample rate in Hz.
        injections, output_channel_name, output_path, frame_type, shift --
            optional settings; each option is omitted when its value is None.
        duration -- frame duration in seconds (--frame-duration).
        whiten_track_psd -- if true, pass the --whiten-track-psd flag.
        frames_per_file -- number of frames per output file.
        p_node -- iterable of parent nodes this node must wait for.
        instrument -- instrument prefix for --channel-name.  When None, the
            module-level name ``instrument`` is used (legacy behaviour).

        Raises ValueError if no instrument can be determined.
        """
        if instrument is None:
            # Backward compatibility: this constructor historically read the
            # driver's module-level loop variable ``instrument`` implicitly.
            instrument = globals().get("instrument")
            if instrument is None:
                raise ValueError("gstlal_fake_frames_node: no instrument given")
        pipeline.CondorDAGNode.__init__(self, job)
        self.add_var_opt("frame-cache", frame_cache)
        self.add_var_opt("gps-start-time", gps_start_time)
        self.add_var_opt("gps-end-time", gps_end_time)
        self.add_var_opt("data-source", "frames")
        self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
        # Optional values are only forwarded when set; otherwise the job
        # would receive the literal string "None".
        if reference_psd is not None:
            self.add_var_opt("whiten-reference-psd", reference_psd)
        if color_psd is not None:
            self.add_var_opt("color-psd", color_psd)
        self.add_var_opt("sample-rate", sample_rate)
        if injections is not None:
            self.add_var_opt("injections", injections)
        if output_channel_name is not None:
            self.add_var_opt("output-channel-name", output_channel_name)
        self.add_var_opt("frame-duration", duration)
        if output_path is not None:
            self.add_var_opt("output-path", output_path)
        if frame_type is not None:
            self.add_var_opt("frame-type", frame_type)
        if whiten_track_psd:
            # --whiten-track-psd is a boolean flag; it takes no value.
            self.add_var_arg("--whiten-track-psd")
        if shift is not None:
            self.add_var_opt("shift", shift)
        self.add_var_opt("frames-per-file", frames_per_file)
        for parent in (p_node or []):
            self.add_parent(parent)
        dag.add_node(self)
def choosesegs(seglists, min_segment_length):
    """
    Keep only segments strictly longer than min_segment_length, in place.

    seglists -- mapping of instrument -> segmentlist.  Each instrument's list
        is replaced by a new segments.segmentlist holding copies of the
        segments whose length (abs(seg)) exceeds min_segment_length.
    min_segment_length -- minimum length in seconds (exclusive).

    Returns None; seglists is modified in place.
    """
    # Iterate over a snapshot of the items so that rebinding values is safe
    # and the code runs under both Python 2 and Python 3 (no iteritems()).
    for instrument, seglist in list(seglists.items()):
        seglists[instrument] = segments.segmentlist(
            segments.segment(seg) for seg in seglist if abs(seg) > min_segment_length
        )
def parse_command_line():
    """
    Parse and validate the command line.

    Returns (options, inchannels, outchannels, outpaths, frametypes,
    filenames), where the four middle values are instrument-keyed dicts
    built from the IFO=VALUE style options.  outchannels defaults to the
    input channel names and outpaths to an empty dict when the
    corresponding options are not given.

    Raises ValueError when a required option is missing or when
    --frame-type, --channel-name and --output-channel-name do not name the
    same instruments.
    """
    parser = OptionParser(description = __doc__)

    parser.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional)")
    parser.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
    parser.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
    parser.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments. Required")
    parser.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables. Required")
    parser.add_option("--min-segment-length", metavar = "SECONDS", help = "Set the minimum segment length to process (required)", type="float")
    parser.add_option("--shift", metavar = "NANOSECONDS", help = "Number of nanoseconds to delay (negative) or advance (positive) the time stream", type = "int")
    parser.add_option("--sample-rate", metavar = "HZ", default = 16384, type = "int", help = "Sample rate at which to generate the data, should be less than or equal to the sample rate of the measured psds provided, default = 16384 Hz")
    parser.add_option("--whiten-type", metavar="name", help = "Whiten whatever data is coming out of datasource either from the data or from a fixed reference psd if a file is given. Options are psdperseg|medianofpsdperseg|FILE")
    parser.add_option("--whiten-track-psd", action = "store_true", help = "Calculate PSD from input data and track with time.")
    parser.add_option("--color-psd", metavar = "FILE", help = "Set the name of psd xml file to color the data with")
    parser.add_option("--output-path", metavar = "name", action = "append", help = "Set the instrument dependent output path for frames, defaults to current working directory. eg H1=/path/to/H1/frames. Can be given more than once.")
    parser.add_option("--output-channel-name", metavar = "name", action="append", help = "The name of the channel in the output frames, e.g., --output-channel-name=IFO=CHANNEL-NAME. The default is the same as the channel name. Can be given more than once.")
    parser.add_option("--frame-type", metavar = "name", action = "append", help = "Set the instrument dependent frame type, H1=TYPE. Can be given more than once and is required for each instrument processed.")
    parser.add_option("--frame-duration", metavar = "SECONDS", default = 16, type = "int", help = "Set the duration of the output frames. The duration of the frame file will be multiplied by --frames-per-file. Default: 16s")
    parser.add_option("--frames-per-file", metavar = "INT", default = 256, type = "int", help = "Set the number of frames per file. Default: 256")
    parser.add_option("--accounting-group", metavar = "str", help = "Set the accounting group name, e.g., ligo.dev.o3.cw.directedbinary.production")
    parser.add_option("--accounting-group-user", metavar = "str", help = "Set the accounting group user, e.g., chad.hanna")
    parser.add_option("--verbose", action = "store_true", help = "Be verbose")

    options, filenames = parser.parse_args()

    # Collect every missing required option so the user sees them all at once.
    fail = ""
    for option in ("min_segment_length", "frame_type", "frame_segments_file", "frame_segments_name", "channel_name"):
        if getattr(options, option) is None:
            fail += "must provide option %s\n" % (option)
    if fail:
        raise ValueError(fail)

    inchannels = datasource.channel_dict_from_channel_list(options.channel_name)
    if options.output_channel_name:
        outchannels = datasource.channel_dict_from_channel_list(options.output_channel_name)
    else:
        # Documented default: output channels keep the input channel names.
        outchannels = dict(inchannels)
    frametypes = datasource.channel_dict_from_channel_list(options.frame_type)
    # --output-path is optional; instruments without one are simply absent.
    if options.output_path:
        outpaths = datasource.channel_dict_from_channel_list(options.output_path)
    else:
        outpaths = {}

    if not (set(frametypes) == set(inchannels) == set(outchannels)):
        raise ValueError('--frame-type, --channel-name and --output-channel-name must contain same instruments')

    return options, inchannels, outchannels, outpaths, frametypes, filenames
options, inchannels, outchannels, outpaths, frametypes, filenames = parse_command_line()

# Every job writes its stdout/stderr under logs/, so make sure it exists.
if not os.path.isdir("logs"):
    os.mkdir("logs")

dag = dagparts.CondorDAG("gstlal_fake_frames_pipe")

seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, verbose = options.verbose, contenthandler = ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce()
choosesegs(seglists, options.min_segment_length)

psdJob = gstlal_reference_psd_job(options.accounting_group, options.accounting_group_user)
smoothJob = gstlal_smooth_reference_psd_job(options.accounting_group, options.accounting_group_user)
medianJob = gstlal_median_psd_job(options.accounting_group, options.accounting_group_user)
colorJob = gstlal_fake_frames_job(options.accounting_group, options.accounting_group_user)

smoothnode = {}
mediannode = {}
# Parent nodes that each instrument's gstlal_fake_frames jobs must wait for.
p_node = dict((i, []) for i in seglists)

if options.whiten_type in ("psdperseg", "medianofpsdperseg"):
    # Measure and smooth a PSD for every segment, then take their median.
    psd = {}
    for instrument, seglist in seglists.items():
        smoothnode[instrument] = {}
        psd[instrument] = {}
        for seg in seglist:
            # FIXME if there are segments without frame caches this will barf
            psdnode = gstlal_reference_psd_node(psdJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), instrument, inchannels[instrument], injections=None, p_node=[])
            smoothnode[instrument][seg] = gstlal_smooth_reference_psd_node(smoothJob, dag, instrument, psdnode.output_name, p_node=[psdnode])
            if options.whiten_type == "psdperseg":
                psd[instrument][seg] = smoothnode[instrument][seg].output_name
        if not smoothnode[instrument]:
            # No segment survived choosesegs(): nothing to take a median of,
            # and no recoloring jobs will be made for this instrument.
            continue
        mediannode[instrument] = gstlal_median_psd_node(medianJob, dag, [v.output_name for v in smoothnode[instrument].values()], "%s_median_psd.xml.gz" % instrument, p_node=list(smoothnode[instrument].values()))
        p_node[instrument] = [mediannode[instrument]]
        if options.whiten_type == "medianofpsdperseg":
            psd[instrument] = mediannode[instrument].output_name
elif options.whiten_type is not None:
    # Any other value names a PSD XML file used for every segment.  Load it
    # once here so a missing instrument fails now rather than on the
    # cluster; the jobs themselves are handed the file name.
    reference_psds = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.whiten_type, verbose = options.verbose, contenthandler = lalseries.PSDContentHandler))
    missing = sorted(set(seglists) - set(reference_psds))
    if missing:
        raise ValueError("%s does not contain a PSD for %s" % (options.whiten_type, ", ".join(missing)))
    psd = dict((i, options.whiten_type) for i in seglists)
else:
    psd = dict((i, None) for i in seglists)

for instrument, seglist in seglists.items():
    # Instruments without an --output-path entry use the job's default
    # (the current working directory, per the --output-path help text).
    output_path = outpaths.get(instrument)
    for seg in seglist:
        if options.whiten_type == "psdperseg":
            reference_psd = psd[instrument][seg]
        else:
            reference_psd = psd[instrument]
        gstlal_fake_frames_node(colorJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), inchannels[instrument], reference_psd, color_psd=options.color_psd, sample_rate = options.sample_rate, injections=options.injections, output_channel_name = outchannels[instrument], output_path = output_path, duration = options.frame_duration, frame_type = frametypes[instrument], shift = options.shift, whiten_track_psd = options.whiten_track_psd, frames_per_file = options.frames_per_file, p_node=p_node[instrument])

# NOTE(review): writing the .dag file itself (e.g. dag.write_dag()) is not
# visible here; confirm it follows this call.
dag.write_sub_files()