gstlal  1.4.1
framesrc_test_01.py
1 #!/usr/bin/env python
2 # Copyright (C) 2010 Kipp Cannon
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; either version 2 of the License, or (at your
7 # option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
12 # Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 from gstlal import pipeparts
28 from lal import LIGOTimeGPS
29 import test_common
30 
31 
32 #
33 # =============================================================================
34 #
35 # Pipelines
36 #
37 # =============================================================================
38 #
39 
40 
41 #
42 # check consistency of buffer metadata generated by framesrc, and confirm
43 # that seeking and EOS work as expected
44 #
45 
46 
def framesrc_test_01a(pipeline, name):
    """
    Add a frame-reading test chain to pipeline and return it.

    The chain built here is:

        cachesrc -> framecppchanneldemux -> queue -> progressreport
            -> checktimestamps -> fakesink

    pipeline is the pipeline object the elements are added to;  the same
    object is returned.  name is accepted for compatibility with the
    caller's calling convention but is not used by this function.
    """
    #
    # Gst.SECOND is needed for the queue size below, but nothing at
    # module scope binds the name Gst, so import it here.  Without this
    # the function fails with NameError as soon as it is called.
    #

    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst

    #
    # change these as needed to make test run on your machine
    #

    location = "/home/kipp/scratch_local/874100000-20000/cache/874000000-20000.cache"
    instrument = "H1"
    channel_name = "LSC-STRAIN"

    #
    # build pipeline
    #

    # the cache regex is built from the first character of the
    # instrument name only ("H" for "H1")
    src = pipeparts.mkcachesrc(pipeline, location = location, cache_src_regex = "%s.*" % instrument[0])
    demux = pipeparts.mkframecppchanneldemux(pipeline, src)

    # the queue is created unlinked (no upstream element) and is allowed
    # to hold up to 8 seconds of data
    queue = pipeparts.mkqueue(pipeline, None, max_size_time = 8 * Gst.SECOND)

    # connect the demuxer's "<instrument>:<channel>" pad to the queue;
    # NOTE(review):  presumably the link is made when the demuxer
    # creates that pad --- confirm against src_deferred_link()
    pipeparts.src_deferred_link(demux, "%s:%s" % (instrument, channel_name), queue.get_static_pad("sink"))

    head = pipeparts.mkprogressreport(pipeline, queue, "src")
    head = pipeparts.mkchecktimestamps(pipeline, head)
    pipeparts.mkfakesink(pipeline, head)

    #
    # done
    #

    return pipeline
74 
75 
76 #
77 # =============================================================================
78 #
79 # Main
80 #
81 # =============================================================================
82 #
83 
84 
# build the pipeline with framesrc_test_01a() and run it over the segment
# bounded by the two GPS times below
test_common.build_and_run(
    framesrc_test_01a,
    "framesrc_test_01a",
    segment = (
        LIGOTimeGPS(874000016),
        LIGOTimeGPS(874000600)
    )
)
86 
def build_and_run(pipelinefunc, name, segment=None, pipelinefunc_kwargs)
Definition: test_common.py:118