27 from gstlal
import pipeparts
28 from lal
import LIGOTimeGPS
47 def framesrc_test_01a(pipeline, name):
52 location =
"/home/kipp/scratch_local/874100000-20000/cache/874000000-20000.cache" 54 channel_name =
"LSC-STRAIN" 60 head = pipeparts.mkframecppchanneldemux(pipeline, pipeparts.mkcachesrc(pipeline, location = location, cache_src_regex =
"%s.*" % instrument[0]))
61 elem = pipeparts.mkqueue(pipeline,
None, max_size_time = 8 * Gst.SECOND)
62 pipeparts.src_deferred_link(head,
"%s:%s" % (instrument, channel_name), elem.get_static_pad(
"sink"))
65 head = pipeparts.mkprogressreport(pipeline, head,
"src")
66 head = pipeparts.mkchecktimestamps(pipeline, head)
67 pipeparts.mkfakesink(pipeline, head)
85 test_common.build_and_run(framesrc_test_01a,
"framesrc_test_01a", segment = (LIGOTimeGPS(874000016), LIGOTimeGPS(874000600)))
def build_and_run(pipelinefunc, name, segment=None, pipelinefunc_kwargs)