2010-12-02 Dirk Pranke <dpranke@chromium.org>
author dpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Fri, 3 Dec 2010 03:41:43 +0000 (03:41 +0000)
committer dpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Fri, 3 Dec 2010 03:41:43 +0000 (03:41 +0000)
        Reviewed by Tony Chang.

        nrwt multiprocessing - move logic back into run_webkit_tests

        This change moves a bunch of logic that I had put into
        message_broker back into run_webkit_tests, in a slightly
        different format. WorkerMessageBroker needed to become less aware of
        the logic the TestRunner class uses, and more generic.
        Eventually the MessageBroker will only do generic messaging and
        thread/process-pooling, and (almost) all of the
        run-webkit-tests-specific logic will be moved to
        run_webkit_tests.py and dump_render_tree_thread.py.

        The biggest changes are that the Broker can now start a single
        worker, but the responsibility for starting all of them is pushed
        back to the TestRunner (Manager), and the logic for checking if
        the threads are done or wedged is moved back to TestRunner. We
        also remove WorkerMessageBroker.cleanup (not needed) and
        cancel_workers (they have to be cancelled individually).

        The message_broker is now encapsulated inside
        TestRunner._run_tests(); it only needs to exist while actually
        running the tests.

        Also, delete a bunch of tests in message_broker_unittest that no
        longer make much sense.

        This patch depends on bug 50372.

        https://bugs.webkit.org/show_bug.cgi?id=50374

        * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:

git-svn-id: http://svn.webkit.org/repository/webkit/trunk@73231 268f45cc-cd09-0410-ab3c-d52691b4dbfc

WebKitTools/ChangeLog
WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

index d675077c06c3788381463061f46d3dd5278fc7b4..2de59e7ad0190ed7b34c8df5d64e30ebf71f1649 100644 (file)
@@ -1,3 +1,42 @@
+2010-12-02  Dirk Pranke  <dpranke@chromium.org>
+
+        Reviewed by Tony Chang.
+
+        nrwt multiprocessing - move logic back into run_webkit_tests
+
+        This change moves a bunch of logic that I had put into
+        message_broker back into run_webkit_tests, in a slightly
+        different format. WorkerMessageBroker needed to become less aware of
+        the logic the TestRunner class uses, and more generic.
+        Eventually the MessageBroker will only do generic messaging and
+        thread/process-pooling, and (almost) all of the
+        run-webkit-tests-specific logic will be moved to
+        run_webkit_tests.py and dump_render_tree_thread.py.
+        
+        The biggest changes are that the Broker can now start a single
+        worker, but the responsibility for starting all of them is pushed
+        back to the TestRunner (Manager), and the logic for checking if
+        the threads are done or wedged is moved back to TestRunner. We
+        also remove WorkerMessageBroker.cleanup (not needed) and
+        cancel_workers (they have to be cancelled individually).
+        
+        The  message_broker is now encapsulated inside
+        TestRunner._run_tests(); it only needs to exist while actually
+        running the tests.
+
+        Also, delete a bunch of tests in message_broker_unittest that no
+        longer make much sense.
+
+        This patch depends on bug 50372.
+
+        https://bugs.webkit.org/show_bug.cgi?id=50374
+
+        * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
 2010-12-02  Hayato Ito  <hayato@chromium.org>
 
         Reviewed by Eric Seidel.
index 285b2e7b1fd49029f238ae85d54a250875dedd1f..bc3d336f085c3030dffe1799bbeb0eebc2335f50 100644 (file)
@@ -64,114 +64,88 @@ class _WorkerMessageBroker(object):
     def __init__(self, port, options):
         self._port = port
         self._options = options
-        self._num_workers = int(self._options.child_processes)
 
-        # This maps worker names to their TestShellThread objects.
+        # This maps worker_names to TestShellThreads
         self._threads = {}
 
-    def start_workers(self, test_runner):
-        """Starts up the pool of workers for running the tests.
-
-        Args:
-            test_runner: a handle to the manager/TestRunner object
-        """
-        self._test_runner = test_runner
-        for worker_number in xrange(self._num_workers):
-            thread = self.start_worker(worker_number)
-            self._threads[thread.name()] = thread
-        return self._threads.values()
-
-    def start_worker(self, worker_number):
-        # FIXME: Replace with something that isn't a thread.
+    def start_worker(self, test_runner, worker_number):
+        """Start a worker with the given index number.
+
+        Returns the actual TestShellThread object."""
+        # FIXME: Remove dependencies on test_runner.
+        # FIXME: Replace with something that isn't a thread, and return
+        # the name of the worker, not the thread itself. We need to return
+        # the thread itself for now to allow TestRunner to access the object
+        # directly to read shared state.
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, test_runner._current_filename_queue,
+            test_runner._result_queue)
+        self._threads[thread.name()] = thread
         # Note: Don't start() the thread! If we did, it would actually
         # create another thread and start executing it, and we'd no longer
         # be single-threaded.
-        return dump_render_tree_thread.TestShellThread(self._port,
-            self._options, worker_number,
-            self._test_runner._current_filename_queue,
-            self._test_runner._result_queue)
+        return thread
 
-    def run_message_loop(self):
-        """Loop processing messages until done."""
-        raise NotImplementedError
+    def cancel_worker(self, worker_name):
+        """Attempt to cancel a worker (best-effort). The worker may still be
+        running after this call returns."""
+        self._threads[worker_name].cancel()
 
-    def cancel_workers(self):
-        """Cancel/interrupt any workers that are still alive."""
-        pass
+    def log_wedged_worker(self, worker_name):
+        """Log information about the given worker's state."""
+        raise NotImplementedError
 
-    def cleanup(self):
-        """Perform any necessary cleanup on shutdown."""
-        pass
+    def run_message_loop(self, test_runner):
+        """Loop processing messages until done."""
+        # FIXME: eventually we'll need a message loop that the workers
+        # can also call.
+        raise NotImplementedError
 
 
 class _InlineBroker(_WorkerMessageBroker):
-    def run_message_loop(self):
+    def run_message_loop(self, test_runner):
         thread = self._threads.values()[0]
-        thread.run_in_main_thread(self._test_runner,
-                                  self._test_runner._current_result_summary)
-        self._test_runner.update()
+        thread.run_in_main_thread(test_runner,
+                                  test_runner._current_result_summary)
+
+    def log_wedged_worker(self, worker_name):
+        raise AssertionError('_InlineBroker.log_wedged_worker() called')
 
 
 class _MultiThreadedBroker(_WorkerMessageBroker):
-    def start_worker(self, worker_number):
-        thread = _WorkerMessageBroker.start_worker(self, worker_number)
+    def start_worker(self, test_runner, worker_number):
+        thread = _WorkerMessageBroker.start_worker(self, test_runner,
+                                                   worker_number)
+        # Unlike the base implementation, here we actually want to start
+        # the thread.
         thread.start()
         return thread
 
-    def run_message_loop(self):
-        # Loop through all the threads waiting for them to finish.
-        some_thread_is_alive = True
-        while some_thread_is_alive:
-            some_thread_is_alive = False
-            t = time.time()
-            for thread in self._threads.values():
-                exception_info = thread.exception_info()
-                if exception_info is not None:
-                    # Re-raise the thread's exception here to make it
-                    # clear that testing was aborted. Otherwise,
-                    # the tests that did not run would be assumed
-                    # to have passed.
-                    raise exception_info[0], exception_info[1], exception_info[2]
-
-                if thread.isAlive():
-                    some_thread_is_alive = True
-                    next_timeout = thread.next_timeout()
-                    if next_timeout and t > next_timeout:
-                        log_wedged_worker(thread.name(), thread.id())
-                        thread.clear_next_timeout()
-
-            self._test_runner.update()
-
-            if some_thread_is_alive:
-                time.sleep(0.01)
-
-    def cancel_workers(self):
-        for thread in self._threads.values():
-            thread.cancel()
-
-
-def log_wedged_worker(name, id):
-    """Log information about the given worker state."""
-    stack = _find_thread_stack(id)
-    assert(stack is not None)
-    _log.error("")
-    _log.error("%s (tid %d) is wedged" % (name, id))
-    _log_stack(stack)
-    _log.error("")
-
-
-def _find_thread_stack(id):
-    """Returns a stack object that can be used to dump a stack trace for
-    the given thread id (or None if the id is not found)."""
-    for thread_id, stack in sys._current_frames().items():
-        if thread_id == id:
-            return stack
-    return None
-
-
-def _log_stack(stack):
-    """Log a stack trace to log.error()."""
-    for filename, lineno, name, line in traceback.extract_stack(stack):
-        _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
-        if line:
-            _log.error('  %s' % line.strip())
+    def run_message_loop(self, test_runner):
+        # FIXME: Remove the dependencies on test_runner. Checking on workers
+        # should be done via a timer firing.
+        test_runner._check_on_workers()
+
+    def log_wedged_worker(self, worker_name):
+        thread = self._threads[worker_name]
+        stack = self._find_thread_stack(thread.id())
+        assert(stack is not None)
+        _log.error("")
+        _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
+        self._log_stack(stack)
+        _log.error("")
+
+    def _find_thread_stack(self, id):
+        """Returns a stack object that can be used to dump a stack trace for
+        the given thread id (or None if the id is not found)."""
+        for thread_id, stack in sys._current_frames().items():
+            if thread_id == id:
+                return stack
+        return None
+
+    def _log_stack(self, stack):
+        """Log a stack trace to log.error()."""
+        for filename, lineno, name, line in traceback.extract_stack(stack):
+            _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
+            if line:
+                _log.error('  %s' % line.strip())
index c006471e4a9fb25fe1c68094576f32ce5181f5d9..d46df4c57f15496de346da9a8469368d5a8fd804 100644 (file)
@@ -42,139 +42,60 @@ from webkitpy.layout_tests import run_webkit_tests
 
 import message_broker
 
+# FIXME: Boy do we need a lot more tests here ...
 
-class TestThread(threading.Thread):
-    def __init__(self, started_queue, stopping_queue):
-        threading.Thread.__init__(self)
-        self._id = None
-        self._started_queue = started_queue
-        self._stopping_queue = stopping_queue
-        self._timeout = False
-        self._timeout_queue = Queue.Queue()
-        self._exception_info = None
-
-    def id(self):
-        return self._id
-
-    def name(self):
-        return 'worker/0'
-
-    def run(self):
-        self._covered_run()
-
-    def _covered_run(self):
-        # FIXME: this is a separate routine to work around a bug
-        # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
-        self._id = thread.get_ident()
-        try:
-            self._started_queue.put('')
-            msg = self._stopping_queue.get()
-            if msg == 'KeyboardInterrupt':
-                raise KeyboardInterrupt
-            elif msg == 'Exception':
-                raise ValueError()
-            elif msg == 'Timeout':
-                self._timeout = True
-                self._timeout_queue.get()
-        except:
-            self._exception_info = sys.exc_info()
-
-    def exception_info(self):
-        return self._exception_info
-
-    def next_timeout(self):
-        if self._timeout:
-            self._timeout_queue.put('done')
-            return time.time() - 10
-        return time.time()
-
-    def clear_next_timeout(self):
-        self._next_timeout = None
-
-class TestHandler(logging.Handler):
-    def __init__(self, astream):
-        logging.Handler.__init__(self)
-        self._stream = astream
-
-    def emit(self, record):
-        self._stream.write(self.format(record))
-
-
-class MultiThreadedBrokerTest(unittest.TestCase):
-    class MockTestRunner(object):
-        def __init__(self):
-            pass
-
-        def __del__(self):
-            pass
-
-        def update(self):
-            pass
-
-    def run_one_thread(self, msg):
-        runner = self.MockTestRunner()
-        port = None
-        options = mocktool.MockOptions(child_processes='1')
-        starting_queue = Queue.Queue()
-        stopping_queue = Queue.Queue()
-        broker = message_broker._MultiThreadedBroker(port, options)
-        broker._test_runner = runner
-        child_thread = TestThread(starting_queue, stopping_queue)
-        name = child_thread.name()
-        broker._threads[name] = child_thread
-        child_thread.start()
-        started_msg = starting_queue.get()
-        stopping_queue.put(msg)
-        return broker.run_message_loop()
 
-    def test_basic(self):
-        interrupted = self.run_one_thread('')
-        self.assertFalse(interrupted)
+class TestThreadStacks(unittest.TestCase):
+    class Thread(threading.Thread):
+        def __init__(self, started_queue, stopping_queue):
+            threading.Thread.__init__(self)
+            self._id = None
+            self._started_queue = started_queue
+            self._stopping_queue = stopping_queue
 
-    def test_interrupt(self):
-        self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
+        def id(self):
+            return self._id
 
-    def test_timeout(self):
-        oc = outputcapture.OutputCapture()
-        oc.capture_output()
-        interrupted = self.run_one_thread('Timeout')
-        self.assertFalse(interrupted)
-        oc.restore_output()
+        def name(self):
+            return 'worker/0'
 
-    def test_exception(self):
-        self.assertRaises(ValueError, self.run_one_thread, 'Exception')
+        def run(self):
+            self._id = thread.get_ident()
+            self._started_queue.put('')
+            msg = self._stopping_queue.get()
 
+    def make_broker(self):
+        options = mocktool.MockOptions()
+        return message_broker._MultiThreadedBroker(port=None,
+                                                     options=options)
 
-class Test(unittest.TestCase):
     def test_find_thread_stack_found(self):
+        broker = self.make_broker()
         id, stack = sys._current_frames().items()[0]
-        found_stack = message_broker._find_thread_stack(id)
+        found_stack = broker._find_thread_stack(id)
         self.assertNotEqual(found_stack, None)
 
     def test_find_thread_stack_not_found(self):
-        found_stack = message_broker._find_thread_stack(0)
+        broker = self.make_broker()
+        found_stack = broker._find_thread_stack(0)
         self.assertEqual(found_stack, None)
 
     def test_log_wedged_worker(self):
+        broker = self.make_broker()
         oc = outputcapture.OutputCapture()
         oc.capture_output()
-        logger = message_broker._log
-        astream = array_stream.ArrayStream()
-        handler = TestHandler(astream)
-        logger.addHandler(handler)
 
         starting_queue = Queue.Queue()
         stopping_queue = Queue.Queue()
-        child_thread = TestThread(starting_queue, stopping_queue)
+        child_thread = TestThreadStacks.Thread(starting_queue, stopping_queue)
         child_thread.start()
+        broker._threads[child_thread.name()] = child_thread
         msg = starting_queue.get()
 
-        message_broker.log_wedged_worker(child_thread.name(),
-                                         child_thread.id())
+        broker.log_wedged_worker(child_thread.name())
         stopping_queue.put('')
         child_thread.join(timeout=1.0)
 
-        self.assertFalse(astream.empty())
         self.assertFalse(child_thread.isAlive())
         oc.restore_output()
 
index 0b11d9b88b23ce19fe5af01a66984ac942436049..dd847886171fbec318dd139a699962c3436e9e14 100755 (executable)
@@ -228,6 +228,15 @@ def summarize_unexpected_results(port_obj, expectations, result_summary,
     return results
 
 
+class WorkerState(object):
+    """A class for the TestRunner/manager to use to track the current state
+    of the workers."""
+    def __init__(self, name, number, thread):
+        self.name = name
+        self.number = number
+        self.thread = thread
+
+
 class TestRunner:
     """A class for managing running a series of tests on a series of layout
     test files."""
@@ -240,19 +249,20 @@ class TestRunner:
     # in DumpRenderTree.
     DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
 
-    def __init__(self, port, options, printer, message_broker):
+    def __init__(self, port, options, printer):
         """Initialize test runner data structures.
 
         Args:
           port: an object implementing port-specific
           options: a dictionary of command line options
           printer: a Printer object to record updates to.
-          message_broker: object used to communicate with workers.
         """
         self._port = port
         self._options = options
         self._printer = printer
-        self._message_broker = message_broker
+
+        # This maps worker names to the state we are tracking for each of them.
+        self._workers = {}
 
         # disable wss server. need to install pyOpenSSL on buildbots.
         # self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -586,33 +596,39 @@ class TestRunner:
             result_summary: summary object to populate with the results
         """
 
+        self._workers = {}
+
         self._printer.print_update('Sharding tests ...')
         num_workers = self._num_workers()
         test_lists = self._shard_tests(file_list,
             num_workers > 1 and not self._options.experimental_fully_parallel)
+
+        broker = message_broker.get(self._port, self._options)
+        self._message_broker = broker
+
         filename_queue = Queue.Queue()
         for item in test_lists:
             filename_queue.put(item)
 
         self._printer.print_update('Starting %s ...' %
                                    grammar.pluralize('worker', num_workers))
-        message_broker = self._message_broker
         self._current_filename_queue = filename_queue
         self._current_result_summary = result_summary
 
-        if not self._options.dry_run:
-            threads = message_broker.start_workers(self)
-        else:
-            threads = []
+        for worker_number in xrange(num_workers):
+            thread = broker.start_worker(self, worker_number)
+            w = WorkerState(thread.name(), worker_number, thread)
+            self._workers[thread.name()] = w
 
         self._printer.print_update("Starting testing ...")
         keyboard_interrupted = False
         if not self._options.dry_run:
             try:
-                message_broker.run_message_loop()
+                broker.run_message_loop(self)
             except KeyboardInterrupt:
                 _log.info("Interrupted, exiting")
-                message_broker.cancel_workers()
+                for worker_name in self._workers.keys():
+                    broker.cancel_worker(worker_name)
                 keyboard_interrupted = True
             except:
                 # Unexpected exception; don't try to clean up workers.
@@ -620,21 +636,50 @@ class TestRunner:
                 raise
 
         thread_timings, test_timings, individual_test_timings = \
-            self._collect_timing_info(threads)
+            self._collect_timing_info(self._workers)
+        self._message_broker = None
 
         return (keyboard_interrupted, thread_timings, test_timings,
                 individual_test_timings)
 
-    def update(self):
-        self.update_summary(self._current_result_summary)
-
-    def _collect_timing_info(self, threads):
+    def _check_on_workers(self):
+        """Returns True iff all the workers have either completed or wedged."""
+
+        # Loop through all the threads waiting for them to finish.
+        some_thread_is_alive = True
+        while some_thread_is_alive:
+            some_thread_is_alive = False
+            t = time.time()
+            for worker in self._workers.values():
+                thread = worker.thread
+                exception_info = thread.exception_info()
+                if exception_info is not None:
+                    # Re-raise the thread's exception here to make it
+                    # clear that testing was aborted. Otherwise,
+                    # the tests that did not run would be assumed
+                    # to have passed.
+                    raise exception_info[0], exception_info[1], exception_info[2]
+
+                if thread.isAlive():
+                    some_thread_is_alive = True
+                    next_timeout = thread.next_timeout()
+                    if next_timeout and t > next_timeout:
+                        self._message_broker.log_wedged_worker(worker.name)
+                        thread.clear_next_timeout()
+
+            self.update_summary(self._current_result_summary)
+
+            if some_thread_is_alive:
+                time.sleep(0.01)
+
+    def _collect_timing_info(self, workers):
         test_timings = {}
         individual_test_timings = []
         thread_timings = []
 
-        for thread in threads:
-            thread_timings.append({'name': thread.getName(),
+        for w in workers.values():
+            thread = w.thread
+            thread_timings.append({'name': thread.name(),
                                    'num_tests': thread.get_num_tests(),
                                    'total_time': thread.get_total_time()})
             test_timings.update(thread.get_test_group_timing_stats())
@@ -1006,8 +1051,7 @@ class TestRunner:
                                   result_summary):
         """Prints the run times for slow, timeout and crash tests.
         Args:
-          individual_test_timings: List of dump_render_tree_thread.TestStats
-              for all tests.
+          individual_test_timings: List of TestStats for all tests.
           result_summary: summary object for test run
         """
         # Reverse-sort by the time spent in DumpRenderTree.
@@ -1295,13 +1339,11 @@ def run(port, options, args, regular_output=sys.stderr,
         printer.cleanup()
         return 0
 
-    broker = message_broker.get(port, options)
-
     # We wrap any parts of the run that are slow or likely to raise exceptions
     # in a try/finally to ensure that we clean up the logging configuration.
     num_unexpected_results = -1
     try:
-        test_runner = TestRunner(port, options, printer, broker)
+        test_runner = TestRunner(port, options, printer)
         test_runner._print_config()
 
         printer.print_update("Collecting tests ...")
@@ -1330,7 +1372,6 @@ def run(port, options, args, regular_output=sys.stderr,
             _log.debug("Testing completed, Exit status: %d" %
                        num_unexpected_results)
     finally:
-        broker.cleanup()
         printer.cleanup()
 
     return num_unexpected_results
index 0ae9a09c7880e29620d02763438fc13a28413cd9..d325a1c95559d59153162feaade2d80e92a8a625 100644 (file)
@@ -489,7 +489,7 @@ class TestRunnerTest(unittest.TestCase):
         mock_port.filename_to_uri = lambda name: name
 
         runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
-            printer=Mock(), message_broker=Mock())
+                                             printer=Mock())
         expected_html = u"""<html>
   <head>
     <title>Layout Test Results (time)</title>
@@ -507,7 +507,7 @@ class TestRunnerTest(unittest.TestCase):
         # Test that _shard_tests in run_webkit_tests.TestRunner really
         # put the http tests first in the queue.
         runner = TestRunnerWrapper(port=Mock(), options=Mock(),
-            printer=Mock(), message_broker=Mock())
+                                   printer=Mock())
 
         test_list = [
           "LayoutTests/websocket/tests/unicode.htm",