1 # Copyright (C) 2010 Google Inc. All rights reserved.
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
13 # * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 from webkitpy.common import array_stream
38 from webkitpy.common.system import outputcapture
39 from webkitpy.tool import mocktool
41 from webkitpy.layout_tests import run_webkit_tests
46 class TestThread(threading.Thread):
47 def __init__(self, started_queue, stopping_queue):
48 threading.Thread.__init__(self)
50 self._started_queue = started_queue
51 self._stopping_queue = stopping_queue
53 self._timeout_queue = Queue.Queue()
54 self._exception_info = None
65 def _covered_run(self):
66 # FIXME: this is a separate routine to work around a bug
67 # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
68 self._id = thread.get_ident()
70 self._started_queue.put('')
71 msg = self._stopping_queue.get()
72 if msg == 'KeyboardInterrupt':
73 raise KeyboardInterrupt
74 elif msg == 'Exception':
76 elif msg == 'Timeout':
78 self._timeout_queue.get()
80 self._exception_info = sys.exc_info()
82 def exception_info(self):
83 return self._exception_info
85 def next_timeout(self):
87 self._timeout_queue.put('done')
88 return time.time() - 10
91 def clear_next_timeout(self):
92 self._next_timeout = None
94 class TestHandler(logging.Handler):
95 def __init__(self, astream):
96 logging.Handler.__init__(self)
97 self._stream = astream
99 def emit(self, record):
100 self._stream.write(self.format(record))
103 class MultiThreadedBrokerTest(unittest.TestCase):
104 class MockTestRunner(object):
114 def run_one_thread(self, msg):
115 runner = self.MockTestRunner()
117 options = mocktool.MockOptions(child_processes='1')
118 starting_queue = Queue.Queue()
119 stopping_queue = Queue.Queue()
120 broker = message_broker._MultiThreadedBroker(port, options)
121 broker._test_runner = runner
122 child_thread = TestThread(starting_queue, stopping_queue)
123 name = child_thread.name()
124 broker._threads[name] = child_thread
126 started_msg = starting_queue.get()
127 stopping_queue.put(msg)
128 return broker.run_message_loop()
130 def test_basic(self):
131 interrupted = self.run_one_thread('')
132 self.assertFalse(interrupted)
134 def test_interrupt(self):
135 self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
137 def test_timeout(self):
138 oc = outputcapture.OutputCapture()
140 interrupted = self.run_one_thread('Timeout')
141 self.assertFalse(interrupted)
144 def test_exception(self):
145 self.assertRaises(ValueError, self.run_one_thread, 'Exception')
148 class Test(unittest.TestCase):
149 def test_find_thread_stack_found(self):
150 id, stack = sys._current_frames().items()[0]
151 found_stack = message_broker._find_thread_stack(id)
152 self.assertNotEqual(found_stack, None)
154 def test_find_thread_stack_not_found(self):
155 found_stack = message_broker._find_thread_stack(0)
156 self.assertEqual(found_stack, None)
158 def test_log_wedged_worker(self):
159 oc = outputcapture.OutputCapture()
161 logger = message_broker._log
162 astream = array_stream.ArrayStream()
163 handler = TestHandler(astream)
164 logger.addHandler(handler)
166 starting_queue = Queue.Queue()
167 stopping_queue = Queue.Queue()
168 child_thread = TestThread(starting_queue, stopping_queue)
170 msg = starting_queue.get()
172 message_broker.log_wedged_worker(child_thread.name(),
174 stopping_queue.put('')
175 child_thread.join(timeout=1.0)
177 self.assertFalse(astream.empty())
178 self.assertFalse(child_thread.isAlive())
182 if __name__ == '__main__':