2010-11-24 Dirk Pranke <dpranke@chromium.org>
authordpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Wed, 24 Nov 2010 21:57:09 +0000 (21:57 +0000)
committerdpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Wed, 24 Nov 2010 21:57:09 +0000 (21:57 +0000)
        Reviewed by Tony Chang.

        This patch implements the first part of the manager side of the
        Broker objects - it handles creating threads, waiting for them
        to complete, and running a single-threaded loop as well.

        https://bugs.webkit.org/show_bug.cgi?id=49779

        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:

git-svn-id: http://svn.webkit.org/repository/webkit/trunk@72698 268f45cc-cd09-0410-ab3c-d52691b4dbfc

WebKitTools/ChangeLog
WebKitTools/Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py
WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

index bafefa494000ecc4a59a4e866c1f1c60708b5c8b..7a89f33c99067d2ffd81fc48e259886601bb3ad0 100644 (file)
@@ -1,3 +1,18 @@
+2010-11-24  Dirk Pranke  <dpranke@chromium.org>
+
+        Reviewed by Tony Chang.
+
+        This patch implements the first part of the manager side of the
+        Broker objects - it handles creating threads, waiting for them
+        to complete, and running a single-threaded loop as well.
+
+        https://bugs.webkit.org/show_bug.cgi?id=49779
+
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
 2010-11-24  Mihai Parparita  <mihaip@chromium.org>
 
         Reviewed by David Levin.
index 1bac7ef18e3e2ea7a6dcf5fa37402678e7ae2a2b..38cf6c352377b396c0c9591e7a5b5b9745d7a87a 100644 (file)
@@ -244,7 +244,7 @@ class WatchableThread(threading.Thread):
 
 
 class TestShellThread(WatchableThread):
-    def __init__(self, port, options, worker_number,
+    def __init__(self, port, options, worker_number, worker_name,
                  filename_list_queue, result_queue):
         """Initialize all the local state for this DumpRenderTree thread.
 
@@ -252,6 +252,7 @@ class TestShellThread(WatchableThread):
           port: interface to port-specific hooks
           options: command line options argument from optparse
           worker_number: identifier for a particular worker thread.
+          worker_name: for logging.
           filename_list_queue: A thread safe Queue class that contains lists
               of tuples of (filename, uri) pairs.
           result_queue: A thread safe Queue class that will contain
@@ -261,7 +262,7 @@ class TestShellThread(WatchableThread):
         self._port = port
         self._options = options
         self._worker_number = worker_number
-        self._name = 'worker-%d' % worker_number
+        self._name = worker_name
         self._filename_list_queue = filename_list_queue
         self._result_queue = result_queue
         self._filename_list = []
index 13951c02ef3d172a39e7369ceb2197f1cbcb9b98..e520a9caa6b7682ebfdd2a98bb217eb85d7251bc 100644 (file)
 
 """Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
 
-The model we use is that of a message broker - it provides a messaging
-abstraction and message loops, and handles launching threads and/or processes
-depending on the requested configuration.
+Testing is accomplished by having a manager (TestRunner) gather all of the
+tests to be run, and sending messages to a pool of workers (TestShellThreads)
+to run each test. Each worker communicates with one driver (usually
+DumpRenderTree) to run one test at a time and then compare the output against
+what we expected to get.
+
+This modules provides a message broker that connects the manager to the
+workers: it provides a messaging abstraction and message loops, and
+handles launching threads and/or processes depending on the
+requested configuration.
 """
 
 import logging
 import sys
+import time
 import traceback
 
+import dump_render_tree_thread
 
 _log = logging.getLogger(__name__)
 
 
-def log_wedged_thread(id):
-    """Log information about the given thread state."""
+def get(port, options):
+    """Return an instance of a WorkerMessageBroker."""
+    worker_model = options.worker_model
+    if worker_model == 'inline':
+        return InlineBroker(port, options)
+    if worker_model == 'threads':
+        return MultiThreadedBroker(port, options)
+    raise ValueError('unsupported value for --worker-model: %s' % worker_model)
+
+
+class _WorkerState(object):
+    def __init__(self, name):
+        self.name = name
+        self.thread = None
+
+
+class WorkerMessageBroker(object):
+    def __init__(self, port, options):
+        self._port = port
+        self._options = options
+        self._num_workers = int(self._options.child_processes)
+
+        # This maps worker names to their _WorkerState values.
+        self._workers = {}
+
+    def _threads(self):
+        return tuple([w.thread for w in self._workers.values()])
+
+    def start_workers(self, test_runner):
+        """Starts up the pool of workers for running the tests.
+
+        Args:
+            test_runner: a handle to the manager/TestRunner object
+        """
+        self._test_runner = test_runner
+        for worker_number in xrange(self._num_workers):
+            worker = _WorkerState('worker-%d' % worker_number)
+            worker.thread = self._start_worker(worker_number, worker.name)
+            self._workers[worker.name] = worker
+        return self._threads()
+
+    def _start_worker(self, worker_number, worker_name):
+        raise NotImplementedError
+
+    def run_message_loop(self):
+        """Loop processing messages until done."""
+        raise NotImplementedError
+
+    def cancel_workers(self):
+        """Cancel/interrupt any workers that are still alive."""
+        pass
+
+    def cleanup(self):
+        """Perform any necessary cleanup on shutdown."""
+        pass
+
+
+class InlineBroker(WorkerMessageBroker):
+    def _start_worker(self, worker_number, worker_name):
+        # FIXME: Replace with something that isn't a thread.
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, worker_name,
+            self._test_runner._current_filename_queue,
+            self._test_runner._result_queue)
+        # Note: Don't start() the thread! If we did, it would actually
+        # create another thread and start executing it, and we'd no longer
+        # be single-threaded.
+        return thread
+
+    def run_message_loop(self):
+        thread = self._threads()[0]
+        thread.run_in_main_thread(self._test_runner,
+                                  self._test_runner._current_result_summary)
+        self._test_runner.update()
+
+
+class MultiThreadedBroker(WorkerMessageBroker):
+    def _start_worker(self, worker_number, worker_name):
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, worker_name,
+            self._test_runner._current_filename_queue,
+            self._test_runner._result_queue)
+        thread.start()
+        return thread
+
+    def run_message_loop(self):
+        threads = self._threads()
+
+        # Loop through all the threads waiting for them to finish.
+        some_thread_is_alive = True
+        while some_thread_is_alive:
+            some_thread_is_alive = False
+            t = time.time()
+            for thread in threads:
+                exception_info = thread.exception_info()
+                if exception_info is not None:
+                    # Re-raise the thread's exception here to make it
+                    # clear that testing was aborted. Otherwise,
+                    # the tests that did not run would be assumed
+                    # to have passed.
+                    raise exception_info[0], exception_info[1], exception_info[2]
+
+                if thread.isAlive():
+                    some_thread_is_alive = True
+                    next_timeout = thread.next_timeout()
+                    if next_timeout and t > next_timeout:
+                        log_wedged_worker(thread.getName(), thread.id())
+                        thread.clear_next_timeout()
+
+            self._test_runner.update()
+
+            if some_thread_is_alive:
+                time.sleep(0.01)
+
+    def cancel_workers(self):
+        threads = self._threads()
+        for thread in threads:
+            thread.cancel()
+
+
+def log_wedged_worker(name, id):
+    """Log information about the given worker state."""
     stack = _find_thread_stack(id)
     assert(stack is not None)
     _log.error("")
-    _log.error("Thread %d is wedged" % id)
+    _log.error("%s (tid %d) is wedged" % (name, id))
     _log_stack(stack)
     _log.error("")
 
index d7087455b3cc50e82c7c318cb387a2db98f21017..6f04fd35518a04ebd6919ff8a97613cf4c5be1f4 100644 (file)
@@ -30,25 +30,34 @@ import logging
 import Queue
 import sys
 import thread
+import threading
 import time
 import unittest
 
 from webkitpy.common import array_stream
 from webkitpy.common.system import outputcapture
+from webkitpy.tool import mocktool
 
 from webkitpy.layout_tests import run_webkit_tests
 
-import dump_render_tree_thread
 import message_broker
 
 
-class TestThread(dump_render_tree_thread.WatchableThread):
+class TestThread(threading.Thread):
     def __init__(self, started_queue, stopping_queue):
-        dump_render_tree_thread.WatchableThread.__init__(self)
+        threading.Thread.__init__(self)
+        self._thread_id = None
         self._started_queue = started_queue
         self._stopping_queue = stopping_queue
         self._timeout = False
         self._timeout_queue = Queue.Queue()
+        self._exception_info = None
+
+    def id(self):
+        return self._thread_id
+
+    def getName(self):
+        return "worker-0"
 
     def run(self):
         self._covered_run()
@@ -70,12 +79,17 @@ class TestThread(dump_render_tree_thread.WatchableThread):
         except:
             self._exception_info = sys.exc_info()
 
+    def exception_info(self):
+        return self._exception_info
+
     def next_timeout(self):
         if self._timeout:
             self._timeout_queue.put('done')
             return time.time() - 10
         return time.time()
 
+    def clear_next_timeout(self):
+        self._next_timeout = None
 
 class TestHandler(logging.Handler):
     def __init__(self, astream):
@@ -86,35 +100,39 @@ class TestHandler(logging.Handler):
         self._stream.write(self.format(record))
 
 
-class WaitForThreadsToFinishTest(unittest.TestCase):
-    class MockTestRunner(run_webkit_tests.TestRunner):
+class MultiThreadedBrokerTest(unittest.TestCase):
+    class MockTestRunner(object):
         def __init__(self):
             pass
 
         def __del__(self):
             pass
 
-        def update_summary(self, result_summary):
+        def update(self):
             pass
 
     def run_one_thread(self, msg):
         runner = self.MockTestRunner()
+        port = None
+        options = mocktool.MockOptions(child_processes='1')
         starting_queue = Queue.Queue()
         stopping_queue = Queue.Queue()
+        broker = message_broker.MultiThreadedBroker(port, options)
+        broker._test_runner = runner
         child_thread = TestThread(starting_queue, stopping_queue)
+        broker._workers['worker-0'] = message_broker._WorkerState('worker-0')
+        broker._workers['worker-0'].thread = child_thread
         child_thread.start()
         started_msg = starting_queue.get()
         stopping_queue.put(msg)
-        threads = [child_thread]
-        return runner._wait_for_threads_to_finish(threads, None)
+        return broker.run_message_loop()
 
     def test_basic(self):
         interrupted = self.run_one_thread('')
         self.assertFalse(interrupted)
 
     def test_interrupt(self):
-        interrupted = self.run_one_thread('KeyboardInterrupt')
-        self.assertTrue(interrupted)
+        self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
 
     def test_timeout(self):
         oc = outputcapture.OutputCapture()
@@ -137,7 +155,7 @@ class Test(unittest.TestCase):
         found_stack = message_broker._find_thread_stack(0)
         self.assertEqual(found_stack, None)
 
-    def test_log_wedged_thread(self):
+    def test_log_wedged_worker(self):
         oc = outputcapture.OutputCapture()
         oc.capture_output()
         logger = message_broker._log
@@ -151,7 +169,8 @@ class Test(unittest.TestCase):
         child_thread.start()
         msg = starting_queue.get()
 
-        message_broker.log_wedged_thread(child_thread.id())
+        message_broker.log_wedged_worker(child_thread.getName(),
+                                         child_thread.id())
         stopping_queue.put('')
         child_thread.join(timeout=1.0)
 
index 1ceaee92367c382d47c71fb9cee95e47abce3fdd..d78e45299fd4ed2f13c15c4ee09c3ae234d739fe 100755 (executable)
@@ -75,6 +75,7 @@ from layout_package import test_results_uploader
 
 from webkitpy.common.system import user
 from webkitpy.thirdparty import simplejson
+from webkitpy.tool import grammar
 
 import port
 
@@ -239,17 +240,19 @@ class TestRunner:
     # in DumpRenderTree.
     DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
 
-    def __init__(self, port, options, printer):
+    def __init__(self, port, options, printer, message_broker):
         """Initialize test runner data structures.
 
         Args:
           port: an object implementing port-specific
           options: a dictionary of command line options
           printer: a Printer object to record updates to.
+          message_broker: object used to communicate with workers.
         """
         self._port = port
         self._options = options
         self._printer = printer
+        self._message_broker = message_broker
 
         # disable wss server. need to install pyOpenSSL on buildbots.
         # self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -565,34 +568,6 @@ class TestRunner:
                 return True
         return False
 
-    def _instantiate_dump_render_tree_threads(self, test_files,
-                                              result_summary):
-        """Instantitates and starts the TestShellThread(s).
-
-        Return:
-          The list of threads.
-        """
-        num_workers = self._num_workers()
-        test_lists = self._shard_tests(test_files,
-            num_workers > 1 and not self._options.experimental_fully_parallel)
-        filename_queue = Queue.Queue()
-        for item in test_lists:
-            filename_queue.put(item)
-
-        # Instantiate TestShellThreads and start them.
-        threads = []
-        for worker_number in xrange(num_workers):
-            thread = dump_render_tree_thread.TestShellThread(self._port,
-                self._options, worker_number,
-                filename_queue, self._result_queue)
-            if num_workers > 1:
-                thread.start()
-            else:
-                thread.run_in_main_thread(self, result_summary)
-            threads.append(thread)
-
-        return threads
-
     def _num_workers(self):
         return int(self._options.child_processes)
 
@@ -610,58 +585,43 @@ class TestRunner:
               in the form {filename:filename, test_run_time:test_run_time}
             result_summary: summary object to populate with the results
         """
-        plural = ""
-        if self._num_workers() > 1:
-            plural = "s"
-        self._printer.print_update('Starting %s%s ...' %
-                                   (self._port.driver_name(), plural))
-        threads = self._instantiate_dump_render_tree_threads(file_list,
-                                                             result_summary)
-        self._printer.print_update("Starting testing ...")
 
-        keyboard_interrupted = self._wait_for_threads_to_finish(threads,
-                                                                result_summary)
-        (thread_timings, test_timings, individual_test_timings) = \
-            self._collect_timing_info(threads)
+        self._printer.print_update('Sharding tests ...')
+        num_workers = self._num_workers()
+        test_lists = self._shard_tests(file_list,
+            num_workers > 1 and not self._options.experimental_fully_parallel)
+        filename_queue = Queue.Queue()
+        for item in test_lists:
+            filename_queue.put(item)
 
-        return (keyboard_interrupted, thread_timings, test_timings,
-                individual_test_timings)
+        self._printer.print_update('Starting %s ...' %
+                                   grammar.pluralize('worker', num_workers))
+        message_broker = self._message_broker
+        self._current_filename_queue = filename_queue
+        self._current_result_summary = result_summary
+        threads = message_broker.start_workers(self)
 
-    def _wait_for_threads_to_finish(self, threads, result_summary):
+        self._printer.print_update("Starting testing ...")
         keyboard_interrupted = False
         try:
-            # Loop through all the threads waiting for them to finish.
-            some_thread_is_alive = True
-            while some_thread_is_alive:
-                some_thread_is_alive = False
-                t = time.time()
-                for thread in threads:
-                    exception_info = thread.exception_info()
-                    if exception_info is not None:
-                        # Re-raise the thread's exception here to make it
-                        # clear that testing was aborted. Otherwise,
-                        # the tests that did not run would be assumed
-                        # to have passed.
-                        raise exception_info[0], exception_info[1], exception_info[2]
-
-                    if thread.isAlive():
-                        some_thread_is_alive = True
-                        next_timeout = thread.next_timeout()
-                        if (next_timeout and t > next_timeout):
-                            message_broker.log_wedged_thread(thread.id())
-                            thread.clear_next_timeout()
-
-                self.update_summary(result_summary)
-
-                if some_thread_is_alive:
-                    time.sleep(0.01)
-
+            message_broker.run_message_loop()
         except KeyboardInterrupt:
+            _log.info("Interrupted, exiting")
+            message_broker.cancel_workers()
             keyboard_interrupted = True
-            for thread in threads:
-                thread.cancel()
+        except:
+            # Unexpected exception; don't try to clean up workers.
+            _log.info("Exception raised, exiting")
+            raise
 
-        return keyboard_interrupted
+        thread_timings, test_timings, individual_test_timings = \
+            self._collect_timing_info(threads)
+
+        return (keyboard_interrupted, thread_timings, test_timings,
+                individual_test_timings)
+
+    def update(self):
+        self.update_summary(self._current_result_summary)
 
     def _collect_timing_info(self, threads):
         test_timings = {}
@@ -1326,11 +1286,13 @@ def run(port, options, args, regular_output=sys.stderr,
         printer.cleanup()
         return 0
 
+    broker = message_broker.get(port, options)
+
     # We wrap any parts of the run that are slow or likely to raise exceptions
     # in a try/finally to ensure that we clean up the logging configuration.
     num_unexpected_results = -1
     try:
-        test_runner = TestRunner(port, options, printer)
+        test_runner = TestRunner(port, options, printer, broker)
         test_runner._print_config()
 
         printer.print_update("Collecting tests ...")
@@ -1359,6 +1321,7 @@ def run(port, options, args, regular_output=sys.stderr,
             _log.debug("Testing completed, Exit status: %d" %
                        num_unexpected_results)
     finally:
+        broker.cleanup()
         printer.cleanup()
 
     return num_unexpected_results
@@ -1595,9 +1558,8 @@ def parse_args(args=None):
             help="Number of DumpRenderTrees to run in parallel."),
         # FIXME: Display default number of child processes that will run.
         optparse.make_option("--worker-model", action="store",
-                             default="threads",
-                             help="controls worker model. Valid values are "
-                             "'inline' and 'threads' (default)."),
+            default="threads", help=("controls worker model. Valid values are "
+            "'inline' and 'threads' (default).")),
         optparse.make_option("--experimental-fully-parallel",
             action="store_true", default=False,
             help="run all tests in parallel"),
@@ -1653,13 +1615,7 @@ def parse_args(args=None):
                    old_run_webkit_tests_compat)
     option_parser = optparse.OptionParser(option_list=option_list)
 
-    options, args = option_parser.parse_args(args)
-
-    if options.worker_model not in ('inline', 'threads'):
-        option_parser.error("unsupported value for --worker-model: %s" %
-                            options.worker_model)
-
-    return options, args
+    return option_parser.parse_args(args)
 
 
 def main():
index 09a0c71933a04c84057538ce889cb90a37e73184..b3c886103c8ab715b37695bdb65061f9da9d6d48 100644 (file)
@@ -103,8 +103,8 @@ def logging_run(extra_args=None, port_obj=None, tests_included=False):
                      'websocket/tests',
                      'failures/expected/*'])
 
+    oc = outputcapture.OutputCapture()
     try:
-        oc = outputcapture.OutputCapture()
         oc.capture_output()
         options, parsed_args = run_webkit_tests.parse_args(args)
         user = MockUser()
@@ -226,7 +226,7 @@ class MainTest(unittest.TestCase):
     def test_keyboard_interrupt(self):
         # Note that this also tests running a test marked as SKIP if
         # you specify it explicitly.
-        self.assertRaises(KeyboardInterrupt, passing_run,
+        self.assertRaises(KeyboardInterrupt, logging_run,
             ['failures/expected/keyboard.html'], tests_included=True)
 
     def test_last_results(self):
@@ -378,11 +378,11 @@ class MainTest(unittest.TestCase):
         self.assertTrue(passing_run(['--worker-model', 'threads']))
 
     def test_worker_model__processes(self):
-        self.assertRaises(SystemExit, logging_run,
+        self.assertRaises(ValueError, logging_run,
                           ['--worker-model', 'processes'])
 
     def test_worker_model__unknown(self):
-        self.assertRaises(SystemExit, logging_run,
+        self.assertRaises(ValueError, logging_run,
                           ['--worker-model', 'unknown'])
 
 MainTest = skip_if(MainTest, sys.platform == 'cygwin' and compare_version(sys, '2.6')[0] < 0, 'new-run-webkit-tests tests hang on Cygwin Python 2.5.2')
@@ -466,7 +466,8 @@ class TestRunnerTest(unittest.TestCase):
         mock_port.relative_test_filename = lambda name: name
         mock_port.filename_to_uri = lambda name: name
 
-        runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(), printer=Mock())
+        runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
+            printer=Mock(), message_broker=Mock())
         expected_html = u"""<html>
   <head>
     <title>Layout Test Results (time)</title>
@@ -483,7 +484,8 @@ class TestRunnerTest(unittest.TestCase):
     def test_shard_tests(self):
         # Test that _shard_tests in run_webkit_tests.TestRunner really
         # put the http tests first in the queue.
-        runner = TestRunnerWrapper(port=Mock(), options=Mock(), printer=Mock())
+        runner = TestRunnerWrapper(port=Mock(), options=Mock(),
+            printer=Mock(), message_broker=Mock())
 
         test_list = [
           "LayoutTests/websocket/tests/unicode.htm",