22 from glue
import iterutils
23 from ligo
import segments
24 from lal
import LIGOTimeGPS
27 default_timestamp_fuzz = 1e-9
28 default_sample_fuzz = 1e-15
37 COMPARE_FLAGS_EXACT_GAPS = 1
39 COMPARE_FLAGS_ZERO_IS_GAP = 2
41 COMPARE_FLAGS_ALLOW_STARTSTOP_MISALIGN = 4
44 COMPARE_FLAGS_DEFAULT = 0
52 def load_file(fobj, transients = (0.0, 0.0)):
53 stream = (line.strip()
for line
in fobj)
54 stream = (line.split()
for line
in stream
if line
and not line.startswith(
"#"))
55 lines = [(LIGOTimeGPS(line[0]),) + tuple(map(float, line[1:]))
for line
in stream]
56 assert lines,
"no data" 57 channel_count_plus_1 = len(lines[0])
58 assert all(len(line) == channel_count_plus_1
for line
in lines),
"not all lines have the same channel count" 59 for t1, t2
in itertools.izip((line[0]
for line
in lines), (line[0]
for line
in lines[1:])):
60 assert t2 > t1,
"timestamps not in order @ t = %s s" % str(t2)
61 start = lines[0][0] + transients[0]
62 stop = lines[-1][0] - transients[-1]
63 iterutils.inplace_filter(
lambda line: start <= line[0] <= stop, lines)
64 assert lines,
"transients remove all data" 68 def max_abs_sample(lines):
70 return max(max(abs(x)
for x
in line[1:])
for line
in lines)
73 def identify_gaps(lines, timestamp_fuzz = default_timestamp_fuzz, sample_fuzz = default_sample_fuzz, flags = COMPARE_FLAGS_DEFAULT):
77 dt = min(float(line1[0] - line0[0])
for line0, line1
in itertools.izip(lines, lines[1:]))
78 dt = 1.0 / round(1.0 / dt)
82 if flags & COMPARE_FLAGS_ZERO_IS_GAP:
83 sample_fuzz *= max_abs_sample(lines)
85 gaps = segments.segmentlist()
86 for i, line
in enumerate(lines):
87 if i
and (line[0] - lines[i - 1][0]) - dt > timestamp_fuzz * 2:
90 gaps.append(segments.segment((lines[i - 1][0] + dt, line[0])))
91 if flags & COMPARE_FLAGS_ZERO_IS_GAP
and all(abs(x) <= sample_fuzz
for x
in line[1:]):
93 gaps.append(segments.segment((line[0], lines[i + 1][0]
if i + 1 < len(lines)
else line[0] + dt)))
94 return gaps.protract(timestamp_fuzz).coalesce()
97 def compare_fobjs(fobj1, fobj2, transients = (0.0, 0.0), timestamp_fuzz = default_timestamp_fuzz, sample_fuzz = default_sample_fuzz, flags = COMPARE_FLAGS_DEFAULT):
98 timestamp_fuzz = LIGOTimeGPS(timestamp_fuzz)
101 lines1 = load_file(fobj1, transients = transients)
102 lines2 = load_file(fobj2, transients = transients)
103 assert len(lines1[0]) == len(lines2[0]),
"files do not have same channel count" 106 if flags & COMPARE_FLAGS_ALLOW_STARTSTOP_MISALIGN:
107 lines1 = [line
for line
in lines1
if lines2[0][0] <= line[0] <= lines2[-1][0]]
108 assert lines1,
"time intervals do not overlap" 109 lines2 = [line
for line
in lines2
if lines1[0][0] <= line[0] <= lines1[-1][0]]
110 assert lines2,
"time intervals do not overlap" 113 gaps1 = identify_gaps(lines1, timestamp_fuzz = timestamp_fuzz, sample_fuzz = sample_fuzz, flags = flags)
114 gaps2 = identify_gaps(lines2, timestamp_fuzz = timestamp_fuzz, sample_fuzz = sample_fuzz, flags = flags)
115 if flags & COMPARE_FLAGS_EXACT_GAPS:
116 difference = gaps1 ^ gaps2
117 iterutils.inplace_filter(
lambda seg: abs(seg) > timestamp_fuzz, difference)
118 assert not difference,
"gap discrepancy: 1 ^ 2 = %s" % str(difference)
121 sample_fuzz *= max_abs_sample(itertools.chain(lines1, lines2))
123 lines1 = iter(lines1)
124 lines2 = iter(lines2)
126 line1 = lines1.next()
127 line2 = lines2.next()
130 if abs(line1[0] - line2[0]) <= timestamp_fuzz:
131 for val1, val2
in zip(line1[1:], line2[1:]):
132 assert abs(val1 - val2) <= sample_fuzz,
"values disagree @ t = %s s" % str(line1[0])
133 line1 = lines1.next()
134 line2 = lines2.next()
135 elif line1[0] < line2[0]
and line1[0]
in gaps2:
136 line1 = lines1.next()
137 elif line2[0] < line1[0]
and line2[0]
in gaps1:
138 line2 = lines2.next()
140 raise AssertionError(
"timestamp misalignment @ %s s and %s s" % (str(line1[0]), str(line2[0])))
141 except StopIteration:
146 def compare(filename1, filename2, *args, **kwargs):
148 compare_fobjs(open(filename1), open(filename2), *args, **kwargs)
149 except AssertionError
as e:
150 raise AssertionError(
"%s <--> %s: %s" % (filename1, filename2, str(e)))
158 if __name__ ==
"__main__":
159 from optparse
import OptionParser
160 parser = OptionParser()
161 parser.add_option(
"--compare-exact-gaps", action =
"store_const", const = COMPARE_FLAGS_EXACT_GAPS, default = 0)
162 parser.add_option(
"--compare-zero-is-gap", action =
"store_const", const = COMPARE_FLAGS_ZERO_IS_GAP, default = 0)
163 parser.add_option(
"--compare-allow-startstop-misalign", action =
"store_const", const = COMPARE_FLAGS_ALLOW_STARTSTOP_MISALIGN, default = 0)
164 parser.add_option(
"--timestamp-fuzz", metavar =
"seconds", type =
"float", default = default_timestamp_fuzz)
165 parser.add_option(
"--sample-fuzz", metavar =
"fraction", type =
"float", default = default_sample_fuzz)
166 options, (filename1, filename2) = parser.parse_args()
167 compare(filename1, filename2, timestamp_fuzz = options.timestamp_fuzz, sample_fuzz = options.sample_fuzz, flags = options.compare_exact_gaps | options.compare_zero_is_gap | options.compare_allow_startstop_misalign)