1 # Copyright (C) 2010 Google Inc. All rights reserved.
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
13 # * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 from webkitpy.common import array_stream
37 from webkitpy.common.system import outputcapture
39 from webkitpy.layout_tests import run_webkit_tests
41 import dump_render_tree_thread
45 class TestThread(dump_render_tree_thread.WatchableThread):
46 def __init__(self, started_queue, stopping_queue):
47 dump_render_tree_thread.WatchableThread.__init__(self)
48 self._started_queue = started_queue
49 self._stopping_queue = stopping_queue
51 self._timeout_queue = Queue.Queue()
56 def _covered_run(self):
57 # FIXME: this is a separate routine to work around a bug
58 # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
59 self._thread_id = thread.get_ident()
61 self._started_queue.put('')
62 msg = self._stopping_queue.get()
63 if msg == 'KeyboardInterrupt':
64 raise KeyboardInterrupt
65 elif msg == 'Exception':
67 elif msg == 'Timeout':
69 self._timeout_queue.get()
71 self._exception_info = sys.exc_info()
73 def next_timeout(self):
75 self._timeout_queue.put('done')
76 return time.time() - 10
80 class TestHandler(logging.Handler):
81 def __init__(self, astream):
82 logging.Handler.__init__(self)
83 self._stream = astream
85 def emit(self, record):
86 self._stream.write(self.format(record))
89 class WaitForThreadsToFinishTest(unittest.TestCase):
90 class MockTestRunner(run_webkit_tests.TestRunner):
97 def update_summary(self, result_summary):
100 def run_one_thread(self, msg):
101 runner = self.MockTestRunner()
102 starting_queue = Queue.Queue()
103 stopping_queue = Queue.Queue()
104 child_thread = TestThread(starting_queue, stopping_queue)
106 started_msg = starting_queue.get()
107 stopping_queue.put(msg)
108 threads = [child_thread]
109 return runner._wait_for_threads_to_finish(threads, None)
111 def test_basic(self):
112 interrupted = self.run_one_thread('')
113 self.assertFalse(interrupted)
115 def test_interrupt(self):
116 interrupted = self.run_one_thread('KeyboardInterrupt')
117 self.assertTrue(interrupted)
119 def test_timeout(self):
120 oc = outputcapture.OutputCapture()
122 interrupted = self.run_one_thread('Timeout')
123 self.assertFalse(interrupted)
126 def test_exception(self):
127 self.assertRaises(ValueError, self.run_one_thread, 'Exception')
130 class Test(unittest.TestCase):
131 def test_find_thread_stack_found(self):
132 id, stack = sys._current_frames().items()[0]
133 found_stack = message_broker._find_thread_stack(id)
134 self.assertNotEqual(found_stack, None)
136 def test_find_thread_stack_not_found(self):
137 found_stack = message_broker._find_thread_stack(0)
138 self.assertEqual(found_stack, None)
140 def test_log_wedged_thread(self):
141 oc = outputcapture.OutputCapture()
143 logger = message_broker._log
144 astream = array_stream.ArrayStream()
145 handler = TestHandler(astream)
146 logger.addHandler(handler)
148 starting_queue = Queue.Queue()
149 stopping_queue = Queue.Queue()
150 child_thread = TestThread(starting_queue, stopping_queue)
152 msg = starting_queue.get()
154 message_broker.log_wedged_thread(child_thread.id())
155 stopping_queue.put('')
156 child_thread.join(timeout=1.0)
158 self.assertFalse(astream.empty())
159 self.assertFalse(child_thread.isAlive())
163 if __name__ == '__main__':