from webkitpy.common.system import user
from webkitpy.thirdparty import simplejson
+from webkitpy.tool import grammar
import port
# in DumpRenderTree.
DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
- def __init__(self, port, options, printer):
+ def __init__(self, port, options, printer, message_broker):
"""Initialize test runner data structures.
Args:
port: an object implementing port-specific
options: a dictionary of command line options
printer: a Printer object to record updates to.
+ message_broker: object used to communicate with workers.
"""
self._port = port
self._options = options
self._printer = printer
+ self._message_broker = message_broker
# disable wss server. need to install pyOpenSSL on buildbots.
# self._websocket_secure_server = websocket_server.PyWebSocket(
return True
return False
- def _instantiate_dump_render_tree_threads(self, test_files,
- result_summary):
- """Instantitates and starts the TestShellThread(s).
-
- Return:
- The list of threads.
- """
- num_workers = self._num_workers()
- test_lists = self._shard_tests(test_files,
- num_workers > 1 and not self._options.experimental_fully_parallel)
- filename_queue = Queue.Queue()
- for item in test_lists:
- filename_queue.put(item)
-
- # Instantiate TestShellThreads and start them.
- threads = []
- for worker_number in xrange(num_workers):
- thread = dump_render_tree_thread.TestShellThread(self._port,
- self._options, worker_number,
- filename_queue, self._result_queue)
- if num_workers > 1:
- thread.start()
- else:
- thread.run_in_main_thread(self, result_summary)
- threads.append(thread)
-
- return threads
-
def _num_workers(self):
return int(self._options.child_processes)
in the form {filename:filename, test_run_time:test_run_time}
result_summary: summary object to populate with the results
"""
- plural = ""
- if self._num_workers() > 1:
- plural = "s"
- self._printer.print_update('Starting %s%s ...' %
- (self._port.driver_name(), plural))
- threads = self._instantiate_dump_render_tree_threads(file_list,
- result_summary)
- self._printer.print_update("Starting testing ...")
- keyboard_interrupted = self._wait_for_threads_to_finish(threads,
- result_summary)
- (thread_timings, test_timings, individual_test_timings) = \
- self._collect_timing_info(threads)
+ self._printer.print_update('Sharding tests ...')
+ num_workers = self._num_workers()
+ test_lists = self._shard_tests(file_list,
+ num_workers > 1 and not self._options.experimental_fully_parallel)
+ filename_queue = Queue.Queue()
+ for item in test_lists:
+ filename_queue.put(item)
- return (keyboard_interrupted, thread_timings, test_timings,
- individual_test_timings)
+ self._printer.print_update('Starting %s ...' %
+ grammar.pluralize('worker', num_workers))
+ message_broker = self._message_broker
+ self._current_filename_queue = filename_queue
+ self._current_result_summary = result_summary
+ threads = message_broker.start_workers(self)
- def _wait_for_threads_to_finish(self, threads, result_summary):
+ self._printer.print_update("Starting testing ...")
keyboard_interrupted = False
try:
- # Loop through all the threads waiting for them to finish.
- some_thread_is_alive = True
- while some_thread_is_alive:
- some_thread_is_alive = False
- t = time.time()
- for thread in threads:
- exception_info = thread.exception_info()
- if exception_info is not None:
- # Re-raise the thread's exception here to make it
- # clear that testing was aborted. Otherwise,
- # the tests that did not run would be assumed
- # to have passed.
- raise exception_info[0], exception_info[1], exception_info[2]
-
- if thread.isAlive():
- some_thread_is_alive = True
- next_timeout = thread.next_timeout()
- if (next_timeout and t > next_timeout):
- message_broker.log_wedged_thread(thread.id())
- thread.clear_next_timeout()
-
- self.update_summary(result_summary)
-
- if some_thread_is_alive:
- time.sleep(0.01)
-
+ message_broker.run_message_loop()
except KeyboardInterrupt:
+ _log.info("Interrupted, exiting")
+ message_broker.cancel_workers()
keyboard_interrupted = True
- for thread in threads:
- thread.cancel()
+ except:
+ # Unexpected exception; don't try to clean up workers.
+ _log.info("Exception raised, exiting")
+ raise
- return keyboard_interrupted
+ thread_timings, test_timings, individual_test_timings = \
+ self._collect_timing_info(threads)
+
+ return (keyboard_interrupted, thread_timings, test_timings,
+ individual_test_timings)
+
+ def update(self):
+ self.update_summary(self._current_result_summary)
def _collect_timing_info(self, threads):
test_timings = {}
printer.cleanup()
return 0
+ broker = message_broker.get(port, options)
+
# We wrap any parts of the run that are slow or likely to raise exceptions
# in a try/finally to ensure that we clean up the logging configuration.
num_unexpected_results = -1
try:
- test_runner = TestRunner(port, options, printer)
+ test_runner = TestRunner(port, options, printer, broker)
test_runner._print_config()
printer.print_update("Collecting tests ...")
_log.debug("Testing completed, Exit status: %d" %
num_unexpected_results)
finally:
+ broker.cleanup()
printer.cleanup()
return num_unexpected_results
help="Number of DumpRenderTrees to run in parallel."),
# FIXME: Display default number of child processes that will run.
optparse.make_option("--worker-model", action="store",
- default="threads",
- help="controls worker model. Valid values are "
- "'inline' and 'threads' (default)."),
+ default="threads", help=("controls worker model. Valid values are "
+ "'inline' and 'threads' (default).")),
optparse.make_option("--experimental-fully-parallel",
action="store_true", default=False,
help="run all tests in parallel"),
old_run_webkit_tests_compat)
option_parser = optparse.OptionParser(option_list=option_list)
- options, args = option_parser.parse_args(args)
-
- if options.worker_model not in ('inline', 'threads'):
- option_parser.error("unsupported value for --worker-model: %s" %
- options.worker_model)
-
- return options, args
+ return option_parser.parse_args(args)
def main():