# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
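
"""Runs layout tests by farming shards of TestInputs out to Worker objects in a
message pool and collecting per-test results.

The main classes are LayoutTestRunner (drives a run from the manager side),
Worker (runs individual tests, normally in a child process, via
single_test_runner), and Sharder (groups tests into shards, by default one
shard per directory)."""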

import atexit
import logging
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason

    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
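    """Runs the tests: shards the TestInputs, hands the shards to a pool of
    Worker objects (see message_pool), and aggregates the results the workers
    report back."""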
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._needs_http = needs_http
        self._needs_websockets = needs_websockets
        self._needs_web_platform_test_server = needs_web_platform_test_server

        self._sharder = Sharder(self._port.split_test)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        self._current_run_results = None
        self._did_start_http_server = False
        self._did_start_websocket_server = False
        self._did_start_wpt_server = False

        if ((self._needs_http and self._options.http) or self._needs_web_platform_test_server) and self._port.get_option("start_http_servers_if_needed"):
            self.start_servers()
            atexit.register(lambda: self.stop_servers())

    def get_worker_count(self, test_inputs, child_process_count):
        all_shards = self._sharder.shard_tests(test_inputs, child_process_count, self._options.fully_parallel)
        return min(child_process_count, len(all_shards))

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
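        """Runs one pass over the given tests: skipped tests are recorded up
        front, the remaining tests are sharded and dispatched to the worker
        pool, and the collected results are returned as a TestRunResults."""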
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_started = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        self._printer.print_workers_and_shards(num_workers, len(all_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize(num_workers, "worker"))

        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _handle_did_spawn_worker(self, worker_number):
        self._port.did_spawn_worker(worker_number)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        if result.type == test_expectations.SKIP:
            exp_str = got_str = 'SKIP'
            expected = True
        else:
            expectations = self._expectations.filtered_expectations_for_test(result.test_name, self._options.pixel_tests or bool(result.reftest_type), self._options.world_leaks)
            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, expectations)
            exp_str = self._expectations.model().expectations_to_string(expectations)
            got_str = self._expectations.model().expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def _annotate_results_with_additional_failures(self, run_results, results):
        for new_result in results:
            existing_result = run_results.results_by_name.get(new_result.test_name)
            # When running a chunk (--run-chunk), results_by_name contains all the tests, but (confusingly)
            # all_tests only contains those in the chunk that was run, and we don't want to modify the
            # results of a test that didn't run. existing_result.test_number is only non-None for tests that ran.
            if existing_result and existing_result.test_number is not None:
                expectations = self._expectations.filtered_expectations_for_test(new_result.test_name, self._options.pixel_tests or bool(new_result.reftest_type), self._options.world_leaks)
                was_expected = self._expectations.matches_an_expected_result(new_result.test_name, existing_result.type, expectations)
                now_expected = self._expectations.matches_an_expected_result(new_result.test_name, new_result.type, expectations)
                run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)

    def start_servers(self):
        if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
            self._printer.write_update('Starting HTTP server ...')
            self._port.start_http_server()
            self._did_start_http_server = True
        if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
            self._printer.write_update('Starting WebSocket server ...')
            self._port.start_websocket_server()
            self._did_start_websocket_server = True
        if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
            self._printer.write_update('Starting Web Platform Test server ...')
            self._port.start_web_platform_test_server()
            self._did_start_wpt_server = True

    def stop_servers(self):
        if self._did_start_http_server:
            self._printer.write_update('Stopping HTTP server ...')
            self._port.stop_http_server()
            self._did_start_http_server = False
        if self._did_start_websocket_server:
            self._printer.write_update('Stopping WebSocket server ...')
            self._port.stop_websocket_server()
            self._did_start_websocket_server = False
        if self._did_start_wpt_server:
            self._printer.write_update('Stopping Web Platform Test server ...')
            self._port.stop_web_platform_test_server()
            self._did_start_wpt_server = False

    def handle(self, name, source, *args):
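        """Callback invoked by the message pool: a message named 'foo' is
        dispatched to self._handle_foo(source, *args)."""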
        method = getattr(self, '_handle_' + name)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test(self, worker_name, result, log_messages=[]):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_finished_test_group(self, worker_name, overlay_results, log_messages=[]):
        self._annotate_results_with_additional_failures(self._current_run_results, overlay_results)


class Worker(object):
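    """Runs the tests for a single shard, usually inside a child process
    created by the message pool, posting 'started_test', 'finished_test' and
    'finished_test_group' messages back to the caller."""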
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)

        self._finished_test_group(test_inputs)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
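        """Runs a single test and posts the result back to the caller. After
        every self._batch_size tests, the test is run with stop_when_done=True,
        which lets the driver be restarted before the next batch."""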
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def _do_post_tests_work(self, driver):
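        """Asks the driver for any post-run diagnostics (currently world-leak
        information) and converts them into additional TestResults."""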
        additional_results = []
        if not driver:
            return additional_results

        post_test_output = driver.do_post_tests_work()
        if post_test_output:
            for test_name, doc_list in post_test_output.world_leaks_dict.iteritems():
                additional_results.append(test_results.TestResult(test_name, [test_failures.FailureDocumentLeak(doc_list)]))
        return additional_results

    def _finished_test_group(self, test_inputs):
        _log.debug("%s finished test group" % self._name)

        if self._driver and self._driver.has_crashed():
            self._kill_driver()

        additional_results = []
        if not self._options.run_singly:
            additional_results = self._do_post_tests_work(self._driver)

        self._caller.post('finished_test_group', additional_results)

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill DumpRenderTree if we need to.
            if any([f.driver_needs_restart() for f in result.failures]):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
            test_input: Object containing the test filename and timeout
            thread_timeout_sec: time to wait before killing the driver process.
        Returns:
            A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number, self._options.no_timeout)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.is_alive():
            # If join() returned with the thread still running, the
            # DumpRenderTree is completely hung and there's nothing
            # more we can do with it. We have to kill all the
            # DumpRenderTrees to free it up. If we're running more than
            # one DumpRenderTree thread, we'll end up killing the other
            # DumpRenderTrees too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # worker's results.
            _log.error('Test thread hung: killing all DumpRenderTrees')
            failures = [test_failures.FailureTimeout()]

        if result:
            failure_results = self._do_post_tests_work(driver)
            for failure_result in failure_results:
                if failure_result.test_name == result.test_name:
                    result.convert_to_failure(failure_result)

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared DumpRenderTree process.

        Args:
            test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.needs_servers = test_inputs[0].needs_servers

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, needs_servers=%s)" % (self.name, self.test_inputs, self.needs_servers)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
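    """Splits a list of TestInputs into TestShards: by default one shard per
    directory, or one shard per test when running fully parallel."""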
    def __init__(self, test_split_fn):
        self._split = test_split_fn

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together as most cross-test dependencies tend to
        occur within the same directory.
        Return:
            A list of TestShards.
        """
        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return [TestShard('all_tests', test_inputs)]
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_every_file(self, test_inputs):
        """Returns a list of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            shards.append(TestShard('.', [test_input]))

        return shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns a list of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            shards.append(shard)

        # Sort the shards by directory name.
        shards.sort(key=lambda shard: shard.name)

        return shards