2010-11-24 Dirk Pranke <dpranke@chromium.org>
[WebKit.git] / WebKitTools / Scripts / webkitpy / layout_tests / run_webkit_tests.py
index 1ceaee92367c382d47c71fb9cee95e47abce3fdd..d78e45299fd4ed2f13c15c4ee09c3ae234d739fe 100755 (executable)
@@ -75,6 +75,7 @@ from layout_package import test_results_uploader
 
 from webkitpy.common.system import user
 from webkitpy.thirdparty import simplejson
+from webkitpy.tool import grammar
 
 import port
 
@@ -239,17 +240,19 @@ class TestRunner:
     # in DumpRenderTree.
     DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
 
-    def __init__(self, port, options, printer):
+    def __init__(self, port, options, printer, message_broker):
         """Initialize test runner data structures.
 
         Args:
           port: an object implementing port-specific
           options: a dictionary of command line options
           printer: a Printer object to record updates to.
+          message_broker: object used to communicate with workers.
         """
         self._port = port
         self._options = options
         self._printer = printer
+        self._message_broker = message_broker
 
         # disable wss server. need to install pyOpenSSL on buildbots.
         # self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -565,34 +568,6 @@ class TestRunner:
                 return True
         return False
 
-    def _instantiate_dump_render_tree_threads(self, test_files,
-                                              result_summary):
-        """Instantitates and starts the TestShellThread(s).
-
-        Return:
-          The list of threads.
-        """
-        num_workers = self._num_workers()
-        test_lists = self._shard_tests(test_files,
-            num_workers > 1 and not self._options.experimental_fully_parallel)
-        filename_queue = Queue.Queue()
-        for item in test_lists:
-            filename_queue.put(item)
-
-        # Instantiate TestShellThreads and start them.
-        threads = []
-        for worker_number in xrange(num_workers):
-            thread = dump_render_tree_thread.TestShellThread(self._port,
-                self._options, worker_number,
-                filename_queue, self._result_queue)
-            if num_workers > 1:
-                thread.start()
-            else:
-                thread.run_in_main_thread(self, result_summary)
-            threads.append(thread)
-
-        return threads
-
     def _num_workers(self):
         return int(self._options.child_processes)
 
@@ -610,58 +585,43 @@ class TestRunner:
               in the form {filename:filename, test_run_time:test_run_time}
             result_summary: summary object to populate with the results
         """
-        plural = ""
-        if self._num_workers() > 1:
-            plural = "s"
-        self._printer.print_update('Starting %s%s ...' %
-                                   (self._port.driver_name(), plural))
-        threads = self._instantiate_dump_render_tree_threads(file_list,
-                                                             result_summary)
-        self._printer.print_update("Starting testing ...")
 
-        keyboard_interrupted = self._wait_for_threads_to_finish(threads,
-                                                                result_summary)
-        (thread_timings, test_timings, individual_test_timings) = \
-            self._collect_timing_info(threads)
+        self._printer.print_update('Sharding tests ...')
+        num_workers = self._num_workers()
+        test_lists = self._shard_tests(file_list,
+            num_workers > 1 and not self._options.experimental_fully_parallel)
+        filename_queue = Queue.Queue()
+        for item in test_lists:
+            filename_queue.put(item)
 
-        return (keyboard_interrupted, thread_timings, test_timings,
-                individual_test_timings)
+        self._printer.print_update('Starting %s ...' %
+                                   grammar.pluralize('worker', num_workers))
+        message_broker = self._message_broker
+        self._current_filename_queue = filename_queue
+        self._current_result_summary = result_summary
+        threads = message_broker.start_workers(self)
 
-    def _wait_for_threads_to_finish(self, threads, result_summary):
+        self._printer.print_update("Starting testing ...")
         keyboard_interrupted = False
         try:
-            # Loop through all the threads waiting for them to finish.
-            some_thread_is_alive = True
-            while some_thread_is_alive:
-                some_thread_is_alive = False
-                t = time.time()
-                for thread in threads:
-                    exception_info = thread.exception_info()
-                    if exception_info is not None:
-                        # Re-raise the thread's exception here to make it
-                        # clear that testing was aborted. Otherwise,
-                        # the tests that did not run would be assumed
-                        # to have passed.
-                        raise exception_info[0], exception_info[1], exception_info[2]
-
-                    if thread.isAlive():
-                        some_thread_is_alive = True
-                        next_timeout = thread.next_timeout()
-                        if (next_timeout and t > next_timeout):
-                            message_broker.log_wedged_thread(thread.id())
-                            thread.clear_next_timeout()
-
-                self.update_summary(result_summary)
-
-                if some_thread_is_alive:
-                    time.sleep(0.01)
-
+            message_broker.run_message_loop()
         except KeyboardInterrupt:
+            _log.info("Interrupted, exiting")
+            message_broker.cancel_workers()
             keyboard_interrupted = True
-            for thread in threads:
-                thread.cancel()
+        except:
+            # Unexpected exception; don't try to clean up workers.
+            _log.info("Exception raised, exiting")
+            raise
 
-        return keyboard_interrupted
+        thread_timings, test_timings, individual_test_timings = \
+            self._collect_timing_info(threads)
+
+        return (keyboard_interrupted, thread_timings, test_timings,
+                individual_test_timings)
+
+    def update(self):
+        self.update_summary(self._current_result_summary)
 
     def _collect_timing_info(self, threads):
         test_timings = {}
@@ -1326,11 +1286,13 @@ def run(port, options, args, regular_output=sys.stderr,
         printer.cleanup()
         return 0
 
+    broker = message_broker.get(port, options)
+
     # We wrap any parts of the run that are slow or likely to raise exceptions
     # in a try/finally to ensure that we clean up the logging configuration.
     num_unexpected_results = -1
     try:
-        test_runner = TestRunner(port, options, printer)
+        test_runner = TestRunner(port, options, printer, broker)
         test_runner._print_config()
 
         printer.print_update("Collecting tests ...")
@@ -1359,6 +1321,7 @@ def run(port, options, args, regular_output=sys.stderr,
             _log.debug("Testing completed, Exit status: %d" %
                        num_unexpected_results)
     finally:
+        broker.cleanup()
         printer.cleanup()
 
     return num_unexpected_results
@@ -1595,9 +1558,8 @@ def parse_args(args=None):
             help="Number of DumpRenderTrees to run in parallel."),
         # FIXME: Display default number of child processes that will run.
         optparse.make_option("--worker-model", action="store",
-                             default="threads",
-                             help="controls worker model. Valid values are "
-                             "'inline' and 'threads' (default)."),
+            default="threads", help=("controls worker model. Valid values are "
+            "'inline' and 'threads' (default).")),
         optparse.make_option("--experimental-fully-parallel",
             action="store_true", default=False,
             help="run all tests in parallel"),
@@ -1653,13 +1615,7 @@ def parse_args(args=None):
                    old_run_webkit_tests_compat)
     option_parser = optparse.OptionParser(option_list=option_list)
 
-    options, args = option_parser.parse_args(args)
-
-    if options.worker_model not in ('inline', 'threads'):
-        option_parser.error("unsupported value for --worker-model: %s" %
-                            options.worker_model)
-
-    return options, args
+    return option_parser.parse_args(args)
 
 
 def main():