# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
+"""Module for handling messages and concurrency for run-webkit-tests.
Testing is accomplished by having a manager (TestRunner) gather all of the
tests to be run, and sending messages to a pool of workers (TestShellThreads)
"""Return an instance of a WorkerMessageBroker."""
worker_model = options.worker_model
if worker_model == 'inline':
- return InlineBroker(port, options)
+ return _InlineBroker(port, options)
if worker_model == 'threads':
- return MultiThreadedBroker(port, options)
+ return _MultiThreadedBroker(port, options)
raise ValueError('unsupported value for --worker-model: %s' % worker_model)
-class _WorkerState(object):
- def __init__(self, name):
- self.name = name
- self.thread = None
-
-
-class WorkerMessageBroker(object):
+class _WorkerMessageBroker(object):
def __init__(self, port, options):
self._port = port
self._options = options
self._num_workers = int(self._options.child_processes)
- # This maps worker names to their _WorkerState values.
- self._workers = {}
-
- def _threads(self):
- return tuple([w.thread for w in self._workers.values()])
+ # This maps worker names to their TestShellThread objects.
+ self._threads = {}
def start_workers(self, test_runner):
"""Starts up the pool of workers for running the tests.
"""
self._test_runner = test_runner
for worker_number in xrange(self._num_workers):
- worker = _WorkerState('worker-%d' % worker_number)
- worker.thread = self._start_worker(worker_number, worker.name)
- self._workers[worker.name] = worker
- return self._threads()
+ thread = self.start_worker(worker_number)
+ self._threads[thread.name()] = thread
+ return self._threads.values()
- def _start_worker(self, worker_number, worker_name):
- raise NotImplementedError
+ def start_worker(self, worker_number):
+ # FIXME: Replace with something that isn't a thread.
+ # Note: Don't start() the thread! If we did, it would actually
+ # create another thread and start executing it, and we'd no longer
+ # be single-threaded.
+ return dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number,
+ self._test_runner._current_filename_queue,
+ self._test_runner._result_queue)
def run_message_loop(self):
"""Loop processing messages until done."""
pass
-class InlineBroker(WorkerMessageBroker):
- def _start_worker(self, worker_number, worker_name):
- # FIXME: Replace with something that isn't a thread.
- thread = dump_render_tree_thread.TestShellThread(self._port,
- self._options, worker_number, worker_name,
- self._test_runner._current_filename_queue,
- self._test_runner._result_queue)
- # Note: Don't start() the thread! If we did, it would actually
- # create another thread and start executing it, and we'd no longer
- # be single-threaded.
- return thread
-
+class _InlineBroker(_WorkerMessageBroker):
def run_message_loop(self):
- thread = self._threads()[0]
+ thread = self._threads.values()[0]
thread.run_in_main_thread(self._test_runner,
self._test_runner._current_result_summary)
self._test_runner.update()
-class MultiThreadedBroker(WorkerMessageBroker):
- def _start_worker(self, worker_number, worker_name):
- thread = dump_render_tree_thread.TestShellThread(self._port,
- self._options, worker_number, worker_name,
- self._test_runner._current_filename_queue,
- self._test_runner._result_queue)
+class _MultiThreadedBroker(_WorkerMessageBroker):
+ def start_worker(self, worker_number):
+ thread = _WorkerMessageBroker.start_worker(self, worker_number)
thread.start()
return thread
def run_message_loop(self):
- threads = self._threads()
-
# Loop through all the threads waiting for them to finish.
some_thread_is_alive = True
while some_thread_is_alive:
some_thread_is_alive = False
t = time.time()
- for thread in threads:
+ for thread in self._threads.values():
exception_info = thread.exception_info()
if exception_info is not None:
# Re-raise the thread's exception here to make it
some_thread_is_alive = True
next_timeout = thread.next_timeout()
if next_timeout and t > next_timeout:
- log_wedged_worker(thread.getName(), thread.id())
+ log_wedged_worker(thread.name(), thread.id())
thread.clear_next_timeout()
self._test_runner.update()
time.sleep(0.01)
def cancel_workers(self):
- threads = self._threads()
- for thread in threads:
+ for thread in self._threads.values():
thread.cancel()