'--pixel-tests']))
-class TestThread(dump_render_tree_thread.WatchableThread):
- def __init__(self, started_queue, stopping_queue):
- dump_render_tree_thread.WatchableThread.__init__(self)
- self._started_queue = started_queue
- self._stopping_queue = stopping_queue
- self._timeout = False
- self._timeout_queue = Queue.Queue()
-
- def run(self):
- self._covered_run()
-
- def _covered_run(self):
- # FIXME: this is a separate routine to work around a bug
- # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
- self._thread_id = thread.get_ident()
- try:
- self._started_queue.put('')
- msg = self._stopping_queue.get()
- if msg == 'KeyboardInterrupt':
- raise KeyboardInterrupt
- elif msg == 'Exception':
- raise ValueError()
- elif msg == 'Timeout':
- self._timeout = True
- self._timeout_queue.get()
- except:
- self._exception_info = sys.exc_info()
-
- def next_timeout(self):
- if self._timeout:
- self._timeout_queue.put('done')
- return time.time() - 10
- return time.time()
-
-
-class TestHandler(logging.Handler):
- def __init__(self, astream):
- logging.Handler.__init__(self)
- self._stream = astream
-
- def emit(self, record):
- self._stream.write(self.format(record))
-
-
-class WaitForThreadsToFinishTest(unittest.TestCase):
- class MockTestRunner(run_webkit_tests.TestRunner):
- def __init__(self):
- pass
-
- def __del__(self):
- pass
-
- def update_summary(self, result_summary):
- pass
-
- def run_one_thread(self, msg):
- runner = self.MockTestRunner()
- starting_queue = Queue.Queue()
- stopping_queue = Queue.Queue()
- child_thread = TestThread(starting_queue, stopping_queue)
- child_thread.start()
- started_msg = starting_queue.get()
- stopping_queue.put(msg)
- threads = [child_thread]
- return runner._wait_for_threads_to_finish(threads, None)
-
- def test_basic(self):
- interrupted = self.run_one_thread('')
- self.assertFalse(interrupted)
-
- def test_interrupt(self):
- interrupted = self.run_one_thread('KeyboardInterrupt')
- self.assertTrue(interrupted)
-
- def test_timeout(self):
- oc = outputcapture.OutputCapture()
- oc.capture_output()
- interrupted = self.run_one_thread('Timeout')
- self.assertFalse(interrupted)
- oc.restore_output()
-
- def test_exception(self):
- self.assertRaises(ValueError, self.run_one_thread, 'Exception')
-
-
-class StandaloneFunctionsTest(unittest.TestCase):
- def test_log_wedged_thread(self):
- oc = outputcapture.OutputCapture()
- oc.capture_output()
- logger = run_webkit_tests._log
- astream = array_stream.ArrayStream()
- handler = TestHandler(astream)
- logger.addHandler(handler)
-
- starting_queue = Queue.Queue()
- stopping_queue = Queue.Queue()
- child_thread = TestThread(starting_queue, stopping_queue)
- child_thread.start()
- msg = starting_queue.get()
-
- run_webkit_tests._log_wedged_thread(child_thread)
- stopping_queue.put('')
- child_thread.join(timeout=1.0)
-
- self.assertFalse(astream.empty())
- self.assertFalse(child_thread.isAlive())
- oc.restore_output()
-
-
if __name__ == '__main__':
unittest.main()