2010-12-02 Sam Weinig <sam@webkit.org>
[WebKit.git] / WebKitTools / Scripts / webkitpy / layout_tests / layout_package / message_broker.py
index e520a9caa6b7682ebfdd2a98bb217eb85d7251bc..285b2e7b1fd49029f238ae85d54a250875dedd1f 100644 (file)
@@ -26,7 +26,7 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-"""Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
+"""Module for handling messages and concurrency for run-webkit-tests.
 
 Testing is accomplished by having a manager (TestRunner) gather all of the
 tests to be run, and sending messages to a pool of workers (TestShellThreads)
 
 Testing is accomplished by having a manager (TestRunner) gather all of the
 tests to be run, and sending messages to a pool of workers (TestShellThreads)
@@ -54,29 +54,20 @@ def get(port, options):
     """Return an instance of a WorkerMessageBroker."""
     worker_model = options.worker_model
     if worker_model == 'inline':
     """Return an instance of a WorkerMessageBroker."""
     worker_model = options.worker_model
     if worker_model == 'inline':
-        return InlineBroker(port, options)
+        return _InlineBroker(port, options)
     if worker_model == 'threads':
     if worker_model == 'threads':
-        return MultiThreadedBroker(port, options)
+        return _MultiThreadedBroker(port, options)
     raise ValueError('unsupported value for --worker-model: %s' % worker_model)
 
 
     raise ValueError('unsupported value for --worker-model: %s' % worker_model)
 
 
-class _WorkerState(object):
-    def __init__(self, name):
-        self.name = name
-        self.thread = None
-
-
-class WorkerMessageBroker(object):
+class _WorkerMessageBroker(object):
     def __init__(self, port, options):
         self._port = port
         self._options = options
         self._num_workers = int(self._options.child_processes)
 
     def __init__(self, port, options):
         self._port = port
         self._options = options
         self._num_workers = int(self._options.child_processes)
 
-        # This maps worker names to their _WorkerState values.
-        self._workers = {}
-
-    def _threads(self):
-        return tuple([w.thread for w in self._workers.values()])
+        # This maps worker names to their TestShellThread objects.
+        self._threads = {}
 
     def start_workers(self, test_runner):
         """Starts up the pool of workers for running the tests.
 
     def start_workers(self, test_runner):
         """Starts up the pool of workers for running the tests.
@@ -86,13 +77,19 @@ class WorkerMessageBroker(object):
         """
         self._test_runner = test_runner
         for worker_number in xrange(self._num_workers):
         """
         self._test_runner = test_runner
         for worker_number in xrange(self._num_workers):
-            worker = _WorkerState('worker-%d' % worker_number)
-            worker.thread = self._start_worker(worker_number, worker.name)
-            self._workers[worker.name] = worker
-        return self._threads()
+            thread = self.start_worker(worker_number)
+            self._threads[thread.name()] = thread
+        return self._threads.values()
 
 
-    def _start_worker(self, worker_number, worker_name):
-        raise NotImplementedError
+    def start_worker(self, worker_number):
+        # FIXME: Replace with something that isn't a thread.
+        # Note: Don't start() the thread! If we did, it would actually
+        # create another thread and start executing it, and we'd no longer
+        # be single-threaded.
+        return dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number,
+            self._test_runner._current_filename_queue,
+            self._test_runner._result_queue)
 
     def run_message_loop(self):
         """Loop processing messages until done."""
 
     def run_message_loop(self):
         """Loop processing messages until done."""
@@ -107,43 +104,27 @@ class WorkerMessageBroker(object):
         pass
 
 
         pass
 
 
-class InlineBroker(WorkerMessageBroker):
-    def _start_worker(self, worker_number, worker_name):
-        # FIXME: Replace with something that isn't a thread.
-        thread = dump_render_tree_thread.TestShellThread(self._port,
-            self._options, worker_number, worker_name,
-            self._test_runner._current_filename_queue,
-            self._test_runner._result_queue)
-        # Note: Don't start() the thread! If we did, it would actually
-        # create another thread and start executing it, and we'd no longer
-        # be single-threaded.
-        return thread
-
+class _InlineBroker(_WorkerMessageBroker):
     def run_message_loop(self):
     def run_message_loop(self):
-        thread = self._threads()[0]
+        thread = self._threads.values()[0]
         thread.run_in_main_thread(self._test_runner,
                                   self._test_runner._current_result_summary)
         self._test_runner.update()
 
 
         thread.run_in_main_thread(self._test_runner,
                                   self._test_runner._current_result_summary)
         self._test_runner.update()
 
 
-class MultiThreadedBroker(WorkerMessageBroker):
-    def _start_worker(self, worker_number, worker_name):
-        thread = dump_render_tree_thread.TestShellThread(self._port,
-            self._options, worker_number, worker_name,
-            self._test_runner._current_filename_queue,
-            self._test_runner._result_queue)
+class _MultiThreadedBroker(_WorkerMessageBroker):
+    def start_worker(self, worker_number):
+        thread = _WorkerMessageBroker.start_worker(self, worker_number)
         thread.start()
         return thread
 
     def run_message_loop(self):
         thread.start()
         return thread
 
     def run_message_loop(self):
-        threads = self._threads()
-
         # Loop through all the threads waiting for them to finish.
         some_thread_is_alive = True
         while some_thread_is_alive:
             some_thread_is_alive = False
             t = time.time()
         # Loop through all the threads waiting for them to finish.
         some_thread_is_alive = True
         while some_thread_is_alive:
             some_thread_is_alive = False
             t = time.time()
-            for thread in threads:
+            for thread in self._threads.values():
                 exception_info = thread.exception_info()
                 if exception_info is not None:
                     # Re-raise the thread's exception here to make it
                 exception_info = thread.exception_info()
                 if exception_info is not None:
                     # Re-raise the thread's exception here to make it
@@ -156,7 +137,7 @@ class MultiThreadedBroker(WorkerMessageBroker):
                     some_thread_is_alive = True
                     next_timeout = thread.next_timeout()
                     if next_timeout and t > next_timeout:
                     some_thread_is_alive = True
                     next_timeout = thread.next_timeout()
                     if next_timeout and t > next_timeout:
-                        log_wedged_worker(thread.getName(), thread.id())
+                        log_wedged_worker(thread.name(), thread.id())
                         thread.clear_next_timeout()
 
             self._test_runner.update()
                         thread.clear_next_timeout()
 
             self._test_runner.update()
@@ -165,8 +146,7 @@ class MultiThreadedBroker(WorkerMessageBroker):
                 time.sleep(0.01)
 
     def cancel_workers(self):
                 time.sleep(0.01)
 
     def cancel_workers(self):
-        threads = self._threads()
-        for thread in threads:
+        for thread in self._threads.values():
             thread.cancel()
 
 
             thread.cancel()