# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import atexit
import logging
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar

_log = logging.getLogger(__name__)

TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason
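
    # Implementing __reduce__ keeps the exception picklable, so it can be sent
    # back across the worker message pool's process boundary intact.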
    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._needs_http = needs_http
        self._needs_websockets = needs_websockets
        self._needs_web_platform_test_server = needs_web_platform_test_server

        self._sharder = Sharder(self._port.split_test)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        self._current_run_results = None
        self._did_start_http_server = False
        self._did_start_websocket_server = False
        self._did_start_wpt_server = False

        if ((self._needs_http and self._options.http) or self._needs_web_platform_test_server) and self._port.get_option("start_http_servers_if_needed"):
            self.start_servers()
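            # Make sure the servers get torn down at interpreter exit even if the
            # run ends abnormally; stop_servers() is a no-op if nothing was started.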
            atexit.register(lambda: self.stop_servers())

    def get_worker_count(self, test_inputs, child_process_count):
        all_shards = self._sharder.shard_tests(test_inputs, child_process_count, self._options.fully_parallel)
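        # There is no point in spawning more workers than there are shards to hand out.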
        return min(child_process_count, len(all_shards))

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_started = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        self._printer.print_workers_and_shards(num_workers, len(all_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize(num_workers, "worker"))

        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
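                # Each queued message becomes a Worker.handle('test_list',
                # shard_name, test_inputs) call on one of the worker processes.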
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
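        # On a retry pass, route output into a 'retries' subdirectory so the
        # results of the initial run are not overwritten.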
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _handle_did_spawn_worker(self, worker_number):
        self._port.did_spawn_worker(worker_number)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        if result.type == test_expectations.SKIP:
            exp_str = got_str = 'SKIP'
            expected = True
        else:
            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type, self._options.world_leaks)
            exp_str = self._expectations.model().get_expectations_string(result.test_name)
            got_str = self._expectations.model().expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def _annotate_results_with_additional_failures(self, run_results, results):
        for new_result in results:
            existing_result = run_results.results_by_name.get(new_result.test_name)
            # When running a chunk (--run-chunk), results_by_name contains all the tests,
            # but (confusingly) all_tests only contains those in the chunk that was run,
            # and we don't want to modify the results of a test that didn't run.
            # existing_result.test_number is only non-None for tests that ran.
            if existing_result and existing_result.test_number is not None:
                was_expected = self._expectations.matches_an_expected_result(new_result.test_name, existing_result.type, self._options.pixel_tests or existing_result.reftest_type, self._options.world_leaks)
                now_expected = self._expectations.matches_an_expected_result(new_result.test_name, new_result.type, self._options.pixel_tests or new_result.reftest_type, self._options.world_leaks)
                run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)

    def start_servers(self):
        if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
            self._printer.write_update('Starting HTTP server ...')
            self._port.start_http_server()
            self._did_start_http_server = True
        if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
            self._printer.write_update('Starting WebSocket server ...')
            self._port.start_websocket_server()
            self._did_start_websocket_server = True
        if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
            self._printer.write_update('Starting Web Platform Test server ...')
            self._port.start_web_platform_test_server()
            self._did_start_wpt_server = True

    def stop_servers(self):
        if self._did_start_http_server:
            self._printer.write_update('Stopping HTTP server ...')
            self._port.stop_http_server()
            self._did_start_http_server = False
        if self._did_start_websocket_server:
            self._printer.write_update('Stopping WebSocket server ...')
            self._port.stop_websocket_server()
            self._did_start_websocket_server = False
        if self._did_start_wpt_server:
            self._printer.write_update('Stopping Web Platform Test server ...')
            self._port.stop_web_platform_test_server()
            self._did_start_wpt_server = False

    def handle(self, name, source, *args):
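        # The message pool dispatches each message to a _handle_<name> method.
        # Note that getattr() without a default raises AttributeError for an
        # unknown name before the AssertionError below can ever fire.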
        method = getattr(self, '_handle_' + name)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test(self, worker_name, result, log_messages=[]):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_finished_test_group(self, worker_name, overlay_results, log_messages=[]):
        self._annotate_results_with_additional_failures(self._current_run_results, overlay_results)


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0
270 """This method is called when the object is starting to be used and it is safe
271 for the object to create state that does not need to be pickled (usually this means
272 it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)

        self._finished_test_group(test_inputs)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
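            # Reftests are compared as rendered images, so the pixel test must run.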
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1
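
        # When the batch_size option is set, ask the driver to restart after this
        # test so that no driver instance runs more than batch_size tests in a row.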
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def _do_post_tests_work(self, driver):
        additional_results = []
        if not driver:
            return additional_results

        post_test_output = driver.do_post_tests_work()
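        # When present, post_test_output carries a world_leaks_dict mapping test
        # names to the documents they leaked; each entry becomes a leak failure.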
        if post_test_output:
            for test_name, doc_list in post_test_output.world_leaks_dict.iteritems():
                additional_results.append(test_results.TestResult(test_name, [test_failures.FailureDocumentLeak(doc_list)]))
        return additional_results

    def _finished_test_group(self, test_inputs):
        _log.debug("%s finished test group" % self._name)

        if self._driver and self._driver.has_crashed():
            self._kill_driver()

        additional_results = []
        if not self._options.run_singly:
            additional_results = self._do_post_tests_work(self._driver)

        self._caller.post('finished_test_group', additional_results)
344 _log.debug("%s cleaning up" % self._name)

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
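        # For example, a per-test timeout of 30000 (ms) yields a 90.0s driver timeout.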
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill DumpRenderTree if we need to.
            if any([f.driver_needs_restart() for f in result.failures]):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
          test_input: Object containing the test filename and timeout
          thread_timeout_sec: time to wait before killing the driver process.
        Returns:
          A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number, self._options.no_timeout)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.isAlive():
            # If join() returned with the thread still running, the
            # DumpRenderTree is completely hung and there's nothing
            # more we can do with it. We have to kill all the
            # DumpRenderTrees to free it up. If we're running more than
            # one DumpRenderTree thread, we'll end up killing the other
            # DumpRenderTrees too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # worker's results.
            _log.error('Test thread hung: killing all DumpRenderTrees')
            failures = [test_failures.FailureTimeout()]
        else:
            failure_results = self._do_post_tests_work(driver)
            for failure_result in failure_results:
                if failure_result.test_name == result.test_name:
                    result.convert_to_failure(failure_result)

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared DumpRenderTree process.

        Args:
          test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.needs_servers = test_inputs[0].needs_servers

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, needs_servers=%s)" % (self.name, self.test_inputs, self.needs_servers)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn):
        self._split = test_split_fn

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.
        Return:
            A list of TestShards.
        """
        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
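        # With a single worker there is nothing to gain from sharding.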
        if num_workers == 1:
            return [TestShard('all_tests', test_inputs)]
        if fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_every_file(self, test_inputs):
        """Returns a list of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            shards.append(TestShard('.', [test_input]))

        return shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns a list of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            shards.append(shard)

        # Sort the shards by directory name.
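        # This keeps shard order, and hence the work assignment, deterministic across runs.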
        shards.sort(key=lambda shard: shard.name)

        return shards