gstlal  1.4.1
dagparts.py
Go to the documentation of this file.
1 # Copyright (C) 2010 Kipp Cannon (kipp.cannon@ligo.org)
2 # Copyright (C) 2010 Chad Hanna (chad.hanna@ligo.org)
3 #
4 # This program is free software; you can redistribute it and/or modify it under
5 # the terms of the GNU General Public License as published by the Free Software
6 # Foundation; either version 2 of the License, or (at your option) any later
7 # version.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12 # details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # this program; if not, write to the Free Software Foundation, Inc., 51
16 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 
19 
20 
21 
22 #
23 # =============================================================================
24 #
25 # Preamble
26 #
27 # =============================================================================
28 #
29 
30 
31 """
32 DAG construction tools.
33 """
34 
35 
36 import os
37 import sys
38 import socket
39 import subprocess
40 import tempfile
41 import math
42 
43 from ligo import segments
44 from glue import pipeline
45 
46 
# Module metadata.  The $Date$ / $Revision$ strings appear here as literal,
# unexpanded version-control keyword placeholders.
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>"
__version__ = "$Revision$"  # FIXME: placeholder, never filled in
__date__ = "$Date$"  # FIXME: placeholder, never filled in
50 
51 
52 #
53 # =============================================================================
54 #
55 # Environment utilities
56 #
57 # =============================================================================
58 #
59 
60 
def which(prog):
    """
    Locate an executable on $PATH using the external ``which`` command.

    prog: name of the program to look up.

    Returns the path printed by ``which``, with surrounding whitespace
    stripped, as a str.

    Raises ValueError (after writing an explanatory message to stderr)
    if ``which`` prints nothing for prog.
    """
    proc = subprocess.Popen(['which', prog], stdout=subprocess.PIPE)
    # communicate() drains the pipe and waits for the child, so no zombie
    # process or open file descriptor is left behind.
    out = proc.communicate()[0].strip()
    if not isinstance(out, str):
        # Python 3 hands back bytes; decode so callers always get a str.
        out = out.decode()
    if not out:
        # The original format string had one %s but was given two
        # arguments, which raised TypeError instead of reporting the error.
        msg = "ERROR: could not find %s in your path, have you built the proper software and sourced the proper environment scripts?" % (prog,)
        sys.stderr.write(msg + "\n")
        raise ValueError(msg)
    return out
68 
69 
def log_path():
    """
    Return the directory in which DAG log files should be created.

    $TMPDIR is used when it is set.  Otherwise a warning is printed to
    stdout and a per-user scratch directory is chosen from a table of
    known cluster host-name fragments, matched in order against
    socket.getfqdn(); the first match wins.

    Raises KeyError if $TMPDIR is unset and the host is not recognized,
    or if $USER is unset when a fallback directory is needed.
    """
    try:
        return os.environ['TMPDIR']
    except KeyError:
        pass

    sys.stdout.write("\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n\n")

    # (host-name fragments, scratch-directory prefix), checked in order.
    # FIXME add more hosts as you need them
    fallbacks = (
        (('cit', 'caltech.edu'), '/usr1/'),
        (('phys.uwm.edu',), '/localscratch/'),
        (('aei.uni-hannover.de',), '/local/user/'),
        (('phy.syr.edu',), '/usr1/'),
    )

    # Resolve the host name only when it is actually needed: getfqdn()
    # may perform a DNS lookup.
    host = socket.getfqdn()
    for fragments, prefix in fallbacks:
        if any(fragment in host for fragment in fragments):
            tmp = prefix + os.environ['USER']
            sys.stdout.write("falling back to %s\n" % tmp)
            return tmp

    raise KeyError("$TMPDIR is not set and I don't recognize this environment")
95 
96 
97 #
98 # =============================================================================
99 #
100 # Condor DAG utilities
101 #
102 # =============================================================================
103 #
104 
105 
class CondorDAG(pipeline.CondorDAG):
    """
    A glue.pipeline.CondorDAG that numbers its nodes and keeps a list of
    output cache entries.

    The DAG file is named after ``name``.  Its Condor log file is a newly
    created temporary file in ``logpath`` whose name starts with
    "<name>.dag.log.".
    """

    def __init__(self, name, logpath = None):
        """
        name: base name used for the DAG file, the log-file prefix and the
            cache file written by write_cache().
        logpath: directory in which to create the DAG log file.  Defaults
            to log_path(), evaluated when the DAG is constructed.
        """
        # The default used to be ``logpath = log_path()``.  That ran once
        # at import time, printing warnings or raising KeyError on
        # unrecognized hosts, and the argument was then ignored in favour
        # of a second log_path() call.  Resolve lazily and honour it.
        if logpath is None:
            logpath = log_path()
        self.basename = name
        fh, logfile = tempfile.mkstemp(dir = logpath, prefix = self.basename + '.dag.log.')
        # Only the unique file name is needed; release the descriptor.
        os.close(fh)
        pipeline.CondorDAG.__init__(self, logfile)
        self.set_dag_file(self.basename)
        self.jobsDict = {}
        self.node_id = 0
        self.output_cache = []

    def add_node(self, node, retry = 0):
        """
        Add node to the DAG with the given retry count.

        Before the node is added, it receives two macros:
        - "macroid": a 1-based counter, unique within this DAG.
        - "macronodename": the value of node.get_name().
        """
        node.set_retry(retry)
        self.node_id += 1
        node.add_macro("macroid", self.node_id)
        node.add_macro("macronodename", node.get_name())
        pipeline.CondorDAG.add_node(self, node)

    def write_cache(self):
        """
        Write str() of each entry in self.output_cache, one per line, to
        "<basename>.cache".
        """
        # "with" guarantees the file is closed even if a write fails.
        with open(self.basename + ".cache", "w") as f:
            for c in self.output_cache:
                f.write(str(c) + "\n")
131 
132 
class CondorDAGJob(pipeline.CondorDAGJob):
    """
    A generic vanilla-universe job class for gstlal programs.

    executable: path of the program the job runs.
    tag_base: short tag naming the submit file ("<tag_base>.sub") and
        prefixing the per-node stdout/stderr files written under logs/.
    """
    def __init__(self, executable, tag_base):
        # $(macroid) and $(macronodename) are per-node macros (CondorDAG in
        # this module adds them); $(cluster) and $(process) are standard
        # HTCondor submit macros.
        log_stem = 'logs/' + tag_base + '-$(macroid)-$(macronodename)-$(cluster)-$(process)'

        self.__prog__ = tag_base
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)

        self.add_condor_cmd('getenv', 'True')
        self.tag_base = tag_base
        self.set_sub_file(tag_base + '.sub')
        self.set_stdout_file(log_stem + '.out')
        self.set_stderr_file(log_stem + '.err')
        self.number = 1
148 
149 
class CondorDAGNode(pipeline.CondorDAGNode):
    """
    A generic node class for gstlal programs.

    Constructing a node registers it immediately: each node in p_node is
    added as a parent, and then the node is added to dag.
    """
    def __init__(self, job, dag, p_node = ()):
        """
        job: the job this node runs.
        dag: the DAG to add this node to (its add_node() is called).
        p_node: iterable of parent nodes; None means no parents.
        """
        # An immutable default replaces the former ``p_node=[]``, avoiding
        # the shared-mutable-default pitfall.
        if p_node is None:
            p_node = ()
        pipeline.CondorDAGNode.__init__(self, job)
        for parent in p_node:
            self.add_parent(parent)
        dag.add_node(self)
159 
160 
161 #
162 # =============================================================================
163 #
164 # Segment utilities
165 #
166 # =============================================================================
167 #
168 
169 
def breakupseg(seg, maxextent, overlap):
    """
    Split seg into a segmentlist of consecutive, overlapping pieces.

    seg: the segment to split.
    maxextent: nominal maximum duration of a piece; must be positive.  It
        is first adjusted so that the pieces come out roughly equal.
    overlap: each piece except the last is (adjusted) maxextent + overlap
        long, and every later piece starts ``overlap`` before the end of
        the previous one.

    If abs(seg) < maxextent, seg is returned unsplit in a one-element
    segmentlist.  The last piece always ends at seg[1].

    Raises ValueError if maxextent is not positive.
    """
    if maxextent <= 0:
        raise ValueError("maxextent must be positive, not %s" % repr(maxextent))

    # Simple case of only one segment
    if abs(seg) < maxextent:
        return segments.segmentlist([seg])

    # adjust maxextent so that segments are divided roughly equally
    # NOTE(review): "/" below is integer division on Python 2 for integer
    # durations but true division on Python 3, so the adjusted value can
    # differ between interpreters.  Also, for 0 < maxextent < 1,
    # int(maxextent) is 0 and the "//" raises ZeroDivisionError.  Both are
    # left unchanged to preserve existing job layouts.
    maxextent = max(int(abs(seg) / (int(abs(seg)) // int(maxextent) + 1)), overlap)
    maxextent = int(math.ceil(abs(seg) / math.ceil(abs(seg) / maxextent)))
    end = seg[1]

    seglist = segments.segmentlist()

    while abs(seg):
        if (seg[0] + maxextent + overlap) < end:
            # Emit a full-length piece, then restart "overlap" before its end.
            seglist.append(segments.segment(seg[0], seg[0] + maxextent + overlap))
            seg = segments.segment(seglist[-1][1] - overlap, seg[1])
        else:
            # The remainder fits: the final piece runs to the original end.
            seglist.append(segments.segment(seg[0], end))
            break

    return seglist
195 
196 
def breakupsegs(seglist, maxextent, overlap):
    """
    Break every segment of seglist with breakupseg() and return all of the
    resulting pieces, in order, in one new segmentlist.  Pieces are not
    coalesced, so overlapping pieces stay distinct.
    """
    pieces = [piece for bigseg in seglist for piece in breakupseg(bigseg, maxextent, overlap)]
    return segments.segmentlist(pieces)
202 
203 
def breakupseglists(seglists, maxextent, overlap):
    """
    Split every segmentlist in the mapping seglists, in place.

    Each value (keyed by instrument, going by the loop variable name) is
    replaced by breakupsegs(value, maxextent, overlap).  Returns None.
    """
    # items() instead of the Python-2-only iteritems().  Take a snapshot
    # with list() because values are reassigned while looping.
    for instrument, seglist in list(seglists.items()):
        seglists[instrument] = breakupsegs(seglist, maxextent, overlap)