2010-11-24 Dirk Pranke <dpranke@chromium.org>
[WebKit.git] / WebKitTools / Scripts / webkitpy / layout_tests / layout_package / message_broker.py
index 13951c02ef3d172a39e7369ceb2197f1cbcb9b98..e520a9caa6b7682ebfdd2a98bb217eb85d7251bc 100644 (file)
 
 """Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
 
-The model we use is that of a message broker - it provides a messaging
-abstraction and message loops, and handles launching threads and/or processes
-depending on the requested configuration.
+Testing is accomplished by having a manager (TestRunner) gather all of the
+tests to be run, and sending messages to a pool of workers (TestShellThreads)
+to run each test. Each worker communicates with one driver (usually
+DumpRenderTree) to run one test at a time and then compare the output against
+what we expected to get.
+
+This modules provides a message broker that connects the manager to the
+workers: it provides a messaging abstraction and message loops, and
+handles launching threads and/or processes depending on the
+requested configuration.
 """
 
 import logging
 import sys
+import time
 import traceback
 
+import dump_render_tree_thread
 
 _log = logging.getLogger(__name__)
 
 
-def log_wedged_thread(id):
-    """Log information about the given thread state."""
+def get(port, options):
+    """Return an instance of a WorkerMessageBroker."""
+    worker_model = options.worker_model
+    if worker_model == 'inline':
+        return InlineBroker(port, options)
+    if worker_model == 'threads':
+        return MultiThreadedBroker(port, options)
+    raise ValueError('unsupported value for --worker-model: %s' % worker_model)
+
+
+class _WorkerState(object):
+    def __init__(self, name):
+        self.name = name
+        self.thread = None
+
+
+class WorkerMessageBroker(object):
+    def __init__(self, port, options):
+        self._port = port
+        self._options = options
+        self._num_workers = int(self._options.child_processes)
+
+        # This maps worker names to their _WorkerState values.
+        self._workers = {}
+
+    def _threads(self):
+        return tuple([w.thread for w in self._workers.values()])
+
+    def start_workers(self, test_runner):
+        """Starts up the pool of workers for running the tests.
+
+        Args:
+            test_runner: a handle to the manager/TestRunner object
+        """
+        self._test_runner = test_runner
+        for worker_number in xrange(self._num_workers):
+            worker = _WorkerState('worker-%d' % worker_number)
+            worker.thread = self._start_worker(worker_number, worker.name)
+            self._workers[worker.name] = worker
+        return self._threads()
+
+    def _start_worker(self, worker_number, worker_name):
+        raise NotImplementedError
+
+    def run_message_loop(self):
+        """Loop processing messages until done."""
+        raise NotImplementedError
+
+    def cancel_workers(self):
+        """Cancel/interrupt any workers that are still alive."""
+        pass
+
+    def cleanup(self):
+        """Perform any necessary cleanup on shutdown."""
+        pass
+
+
+class InlineBroker(WorkerMessageBroker):
+    def _start_worker(self, worker_number, worker_name):
+        # FIXME: Replace with something that isn't a thread.
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, worker_name,
+            self._test_runner._current_filename_queue,
+            self._test_runner._result_queue)
+        # Note: Don't start() the thread! If we did, it would actually
+        # create another thread and start executing it, and we'd no longer
+        # be single-threaded.
+        return thread
+
+    def run_message_loop(self):
+        thread = self._threads()[0]
+        thread.run_in_main_thread(self._test_runner,
+                                  self._test_runner._current_result_summary)
+        self._test_runner.update()
+
+
+class MultiThreadedBroker(WorkerMessageBroker):
+    def _start_worker(self, worker_number, worker_name):
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, worker_name,
+            self._test_runner._current_filename_queue,
+            self._test_runner._result_queue)
+        thread.start()
+        return thread
+
+    def run_message_loop(self):
+        threads = self._threads()
+
+        # Loop through all the threads waiting for them to finish.
+        some_thread_is_alive = True
+        while some_thread_is_alive:
+            some_thread_is_alive = False
+            t = time.time()
+            for thread in threads:
+                exception_info = thread.exception_info()
+                if exception_info is not None:
+                    # Re-raise the thread's exception here to make it
+                    # clear that testing was aborted. Otherwise,
+                    # the tests that did not run would be assumed
+                    # to have passed.
+                    raise exception_info[0], exception_info[1], exception_info[2]
+
+                if thread.isAlive():
+                    some_thread_is_alive = True
+                    next_timeout = thread.next_timeout()
+                    if next_timeout and t > next_timeout:
+                        log_wedged_worker(thread.getName(), thread.id())
+                        thread.clear_next_timeout()
+
+            self._test_runner.update()
+
+            if some_thread_is_alive:
+                time.sleep(0.01)
+
+    def cancel_workers(self):
+        threads = self._threads()
+        for thread in threads:
+            thread.cancel()
+
+
+def log_wedged_worker(name, id):
+    """Log information about the given worker state."""
     stack = _find_thread_stack(id)
     assert(stack is not None)
     _log.error("")
-    _log.error("Thread %d is wedged" % id)
+    _log.error("%s (tid %d) is wedged" % (name, id))
     _log_stack(stack)
     _log.error("")