+2010-08-13 Dirk Pranke <dpranke@chromium.org>
+
+ Reviewed by Eric Seidel.
+
+ Rewrite new-run-webkit-test's wait_for_threads_to_finish loop to
+ check for exceptions on all threads, not just the first thread.
+
+ This change also changes the logging behavior for wedged threads
+ to only dump the stacks of threads that are actually wedged.
+
+ Refactor the thread classes in the dump_render_tree_thread module
+ to make the contract between TestRunner and TestShellThread clearer.
+
+ Added a bunch of unit tests.
+ https://bugs.webkit.org/show_bug.cgi?id=38561
+
+ * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
2010-08-13 Kenichi Ishibashi <bashi@google.com>
Reviewed by Shinichiro Hamaji.
"dump_render_tree_thread")
-def process_output(port, test_info, test_types, test_args, configuration,
- output_dir, crash, timeout, test_run_time, actual_checksum,
- output, error):
+def _process_output(port, test_info, test_types, test_args, configuration,
+ output_dir, crash, timeout, test_run_time, actual_checksum,
+ output, error):
"""Receives the output from a DumpRenderTree process, subjects it to a
number of tests, and returns a list of failure types the test produced.
total_time_for_all_diffs, time_for_diffs)
+def _pad_timeout(timeout):
+ """Returns a safe multiple of the per-test timeout value to use
+ to detect hung test threads.
+
+ """
+ # When we're running one test per DumpRenderTree process, we can
+ # enforce a hard timeout. The DumpRenderTree watchdog uses 2.5x
+ # the timeout; we want to be larger than that.
+ return timeout * 3
+
+
+def _milliseconds_to_seconds(msecs):
+ return float(msecs) / 1000.0
+
+
class TestResult(object):
def __init__(self, filename, failures, test_run_time,
driver.run_test(test_info.uri.strip(), test_info.timeout,
test_info.image_hash())
end = time.time()
- self._test_result = process_output(self._port,
+ self._test_result = _process_output(self._port,
test_info, self._test_types, self._test_args,
self._configuration, self._output_dir, crash, timeout, end - start,
actual_checksum, output, error)
return self._test_result
-class TestShellThread(threading.Thread):
+class WatchableThread(threading.Thread):
+ """This class abstracts an interface used by
+ run_webkit_tests.TestRunner._wait_for_threads_to_finish for thread
+ management."""
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self._canceled = False
+ self._exception_info = None
+ self._next_timeout = None
+ self._thread_id = None
+
+ def cancel(self):
+ """Set a flag telling this thread to quit."""
+ self._canceled = True
+
+ def clear_next_timeout(self):
+ """Mark a flag telling this thread to stop setting timeouts."""
+ self._timeout = 0
+
+ def exception_info(self):
+ """If run() terminated on an uncaught exception, return it here
+ ((type, value, traceback) tuple).
+ Returns None if run() terminated normally. Meant to be called after
+ joining this thread."""
+ return self._exception_info
+
+ def id(self):
+ """Return a thread identifier."""
+ return self._thread_id
+
+ def next_timeout(self):
+ """Return the time the test is supposed to finish by."""
+ return self._next_timeout
+
+class TestShellThread(WatchableThread):
def __init__(self, port, filename_list_queue, result_queue,
test_types, test_args, image_path, shell_args, options):
"""Initialize all the local state for this DumpRenderTree thread.
command-line options should match those expected by
run_webkit_tests; they are typically passed via the
run_webkit_tests.TestRunner class."""
- threading.Thread.__init__(self)
+ WatchableThread.__init__(self)
self._port = port
self._filename_list_queue = filename_list_queue
self._result_queue = result_queue
self._image_path = image_path
self._shell_args = shell_args
self._options = options
- self._canceled = False
- self._exception_info = None
self._directory_timing_stats = {}
self._test_results = []
self._num_tests = 0
"""
return self._test_results
- def cancel(self):
- """Set a flag telling this thread to quit."""
- self._canceled = True
-
- def get_exception_info(self):
- """If run() terminated on an uncaught exception, return it here
- ((type, value, traceback) tuple).
- Returns None if run() terminated normally. Meant to be called after
- joining this thread."""
- return self._exception_info
-
def get_total_time(self):
return max(self._stop_time - self._start_time, 0.0)
def run(self):
"""Delegate main work to a helper method and watch for uncaught
exceptions."""
+ self._thread_id = thread.get_ident()
self._start_time = time.time()
self._num_tests = 0
try:
worker.start()
- # When we're running one test per DumpRenderTree process, we can
- # enforce a hard timeout. The DumpRenderTree watchdog uses 2.5x
- # the timeout; we want to be larger than that.
- worker.join(int(test_info.timeout) * 3.0 / 1000.0)
+ thread_timeout = _milliseconds_to_seconds(
+ _pad_timeout(test_info.timeout))
+ thread._next_timeout = time.time() + thread_timeout
+ worker.join(thread_timeout)
if worker.isAlive():
# If join() returned with the thread still running, the
# DumpRenderTree is completely hung and there's nothing
not self._options.pixel_tests)):
image_hash = ""
start = time.time()
+
+ thread_timeout = _milliseconds_to_seconds(
+ _pad_timeout(test_info.timeout))
+ self._next_timeout = start + thread_timeout
+
crash, timeout, actual_checksum, output, error = \
self._driver.run_test(test_info.uri, test_info.timeout, image_hash)
end = time.time()
- result = process_output(self._port, test_info, self._test_types,
+ result = _process_output(self._port, test_info, self._test_types,
self._test_args, self._options.configuration,
self._options.results_directory, crash,
timeout, end - start, actual_checksum,
import math
import optparse
import os
+import pdb
import platform
import Queue
import random
"""Returns whether we should run all the tests in the main thread."""
return int(self._options.child_processes) == 1
- def _dump_thread_states(self):
- for thread_id, stack in sys._current_frames().items():
- # FIXME: Python 2.6 has thread.ident which we could
- # use to map from thread_id back to thread.name
- print "\n# Thread: %d" % thread_id
- for filename, lineno, name, line in traceback.extract_stack(stack):
- print 'File: "%s", line %d, in %s' % (filename, lineno, name)
- if line:
- print " %s" % (line.strip())
-
- def _dump_thread_states_if_necessary(self):
- # HACK: Dump thread states every minute to figure out what's
- # hanging on the bots.
- if not self._options.verbose:
- return
- dump_threads_every = 60 # Dump every minute
- if not self._last_thread_dump:
- self._last_thread_dump = time.time()
- time_since_last_dump = time.time() - self._last_thread_dump
- if time_since_last_dump > dump_threads_every:
- self._dump_thread_states()
- self._last_thread_dump = time.time()
-
def _run_tests(self, file_list, result_summary):
"""Runs the tests in the file_list.
- Return: A tuple (failures, thread_timings, test_timings,
+ Return: A tuple (keyboard_interrupted, thread_timings, test_timings,
individual_test_timings)
- failures is a map from test to list of failure types
+ keyboard_interrupted is whether someone typed Ctrl^C
thread_timings is a list of dicts with the total runtime
of each thread with 'name', 'num_tests', 'total_time' properties
test_timings is a list of timings for each sharded subdirectory
result_summary)
self._printer.print_update("Starting testing ...")
- # Wait for the threads to finish and collect test failures.
- failures = {}
- test_timings = {}
- individual_test_timings = []
- thread_timings = []
+ keyboard_interrupted = self._wait_for_threads_to_finish(threads,
+ result_summary)
+ (thread_timings, test_timings, individual_test_timings) = \
+ self._collect_timing_info(threads)
+
+ return (keyboard_interrupted, thread_timings, test_timings,
+ individual_test_timings)
+
+ def _wait_for_threads_to_finish(self, threads, result_summary):
keyboard_interrupted = False
try:
# Loop through all the threads waiting for them to finish.
- for thread in threads:
- # FIXME: We'll end up waiting on the first thread the whole
- # time. That means we won't notice exceptions on other
- # threads until the first one exits.
- # We should instead while True: in the outer loop
- # and then loop through threads joining and checking
- # isAlive and get_exception_info. Exiting on any exception.
- while thread.isAlive():
- # Wake the main thread every 0.1 seconds so we
- # can call update_summary in a timely fashion.
- thread.join(0.1)
- # HACK: Used for debugging threads on the bots.
- self._dump_thread_states_if_necessary()
- self.update_summary(result_summary)
+ some_thread_is_alive = True
+ while some_thread_is_alive:
+ some_thread_is_alive = False
+ t = time.time()
+ for thread in threads:
+ exception_info = thread.exception_info()
+ if exception_info is not None:
+ # Re-raise the thread's exception here to make it
+ # clear that testing was aborted. Otherwise,
+ # the tests that did not run would be assumed
+ # to have passed.
+ raise (exception_info[0], exception_info[1],
+ exception_info[2])
+
+ if thread.isAlive():
+ some_thread_is_alive = True
+ next_timeout = thread.next_timeout()
+ if (next_timeout and t > next_timeout):
+ _log_wedged_thread(thread)
+ thread.clear_next_timeout()
+
+ self.update_summary(result_summary)
+
+ if some_thread_is_alive:
+ time.sleep(0.1)
except KeyboardInterrupt:
keyboard_interrupted = True
for thread in threads:
thread.cancel()
- if not keyboard_interrupted:
- for thread in threads:
- # Check whether a thread died before normal completion.
- exception_info = thread.get_exception_info()
- if exception_info is not None:
- # Re-raise the thread's exception here to make it clear
- # something went wrong. Otherwise, the tests that did not
- # run would be assumed to have passed.
- raise (exception_info[0], exception_info[1],
- exception_info[2])
+ return keyboard_interrupted
+
+ def _collect_timing_info(self, threads):
+ test_timings = {}
+ individual_test_timings = []
+ thread_timings = []
for thread in threads:
thread_timings.append({'name': thread.getName(),
'total_time': thread.get_total_time()})
test_timings.update(thread.get_directory_timing_stats())
individual_test_timings.extend(thread.get_test_results())
- return (keyboard_interrupted, thread_timings, test_timings,
- individual_test_timings)
+
+ return (thread_timings, test_timings, individual_test_timings)
def needs_http(self):
"""Returns whether the test runner needs an HTTP server."""
return options, args
+def _find_thread_stack(id):
+ """Returns a stack object that can be used to dump a stack trace for
+ the given thread id (or None if the id is not found)."""
+ for thread_id, stack in sys._current_frames().items():
+ if thread_id == id:
+ return stack
+ return None
+
+
+def _log_stack(stack):
+ """Log a stack trace to log.error()."""
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
+ if line:
+ _log.error(' %s' % line.strip())
+
+
+def _log_wedged_thread(thread):
+ """Log information about the given thread state."""
+ id = thread.id()
+ stack = _find_thread_stack(id)
+ assert(stack is not None)
+ _log.error("")
+ _log.error("thread %s (%d) is wedged" % (thread.getName(), id))
+ _log_stack(stack)
+ _log.error("")
+
+
def main():
options, args = parse_args()
port_obj = port.get(options.platform, options)
"""Unit tests for run_webkit_tests."""
import codecs
+import logging
import os
+import pdb
+import Queue
import sys
+import thread
+import time
+import threading
import unittest
from webkitpy.common import array_stream
from webkitpy.layout_tests import port
from webkitpy.layout_tests import run_webkit_tests
+from webkitpy.layout_tests.layout_package import dump_render_tree_thread
from webkitpy.thirdparty.mock import Mock
self.assertEqual(buildbot_output.get(), [])
+
def _mocked_open(original_open, file_list):
def _wrapper(name, mode, encoding):
if name.find("-expected.") != -1 and mode == "w":
'fast/html']))
+class TestThread(dump_render_tree_thread.WatchableThread):
+ def __init__(self, started_queue, stopping_queue):
+ dump_render_tree_thread.WatchableThread.__init__(self)
+ self._started_queue = started_queue
+ self._stopping_queue = stopping_queue
+ self._timeout = False
+ self._timeout_queue = Queue.Queue()
+
+ def run(self):
+ self._thread_id = thread.get_ident()
+ try:
+ self._started_queue.put('')
+ msg = self._stopping_queue.get()
+ if msg == 'KeyboardInterrupt':
+ raise KeyboardInterrupt
+ elif msg == 'Exception':
+ raise ValueError()
+ elif msg == 'Timeout':
+ self._timeout = True
+ self._timeout_queue.get()
+ except:
+ self._exception_info = sys.exc_info()
+
+ def next_timeout(self):
+ if self._timeout:
+ self._timeout_queue.put('done')
+ return time.time() - 10
+ return time.time()
+
+
+class TestHandler(logging.Handler):
+ def __init__(self, astream):
+ logging.Handler.__init__(self)
+ self._stream = astream
+
+ def emit(self, record):
+ self._stream.write(self.format(record))
+
+
+class WaitForThreadsToFinishTest(unittest.TestCase):
+ class MockTestRunner(run_webkit_tests.TestRunner):
+ def __init__(self):
+ pass
+
+ def __del__(self):
+ pass
+
+ def update_summary(self, result_summary):
+ pass
+
+ def run_one_thread(self, msg):
+ runner = self.MockTestRunner()
+ starting_queue = Queue.Queue()
+ stopping_queue = Queue.Queue()
+ child_thread = TestThread(starting_queue, stopping_queue)
+ child_thread.start()
+ started_msg = starting_queue.get()
+ stopping_queue.put(msg)
+ threads = [child_thread]
+ return runner._wait_for_threads_to_finish(threads, None)
+
+ def test_basic(self):
+ interrupted = self.run_one_thread('')
+ self.assertFalse(interrupted)
+
+ def test_interrupt(self):
+ interrupted = self.run_one_thread('KeyboardInterrupt')
+ self.assertTrue(interrupted)
+
+ def test_timeout(self):
+ interrupted = self.run_one_thread('Timeout')
+ self.assertFalse(interrupted)
+
+ def test_exception(self):
+ self.assertRaises(ValueError, self.run_one_thread, 'Exception')
+
+
+class StandaloneFunctionsTest(unittest.TestCase):
+ def test_log_wedged_thread(self):
+ logger = run_webkit_tests._log
+ astream = array_stream.ArrayStream()
+ handler = TestHandler(astream)
+ logger.addHandler(handler)
+
+ starting_queue = Queue.Queue()
+ stopping_queue = Queue.Queue()
+ child_thread = TestThread(starting_queue, stopping_queue)
+ child_thread.start()
+ msg = starting_queue.get()
+
+ run_webkit_tests._log_wedged_thread(child_thread)
+ stopping_queue.put('')
+ child_thread.join(timeout=1.0)
+
+ self.assertFalse(astream.empty())
+ self.assertFalse(child_thread.isAlive())
+
+ def test_find_thread_stack(self):
+ id, stack = sys._current_frames().items()[0]
+ found_stack = run_webkit_tests._find_thread_stack(id)
+ self.assertNotEqual(found_stack, None)
+
+ found_stack = run_webkit_tests._find_thread_stack(0)
+ self.assertEqual(found_stack, None)
+
if __name__ == '__main__':
unittest.main()