Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py
# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import atexit
import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

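    # __reduce__ keeps the exception picklable despite its custom __init__
    # signature, so it can be reconstructed with its reason intact if it ever
    # has to cross the message pool's process boundary.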
    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._needs_http = needs_http
        self._needs_websockets = needs_websockets
        self._needs_web_platform_test_server = needs_web_platform_test_server

        self._sharder = Sharder(self._port.split_test)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        self._current_run_results = None
        self._did_start_http_server = False
        self._did_start_websocket_server = False
        self._did_start_wpt_server = False

        if ((self._needs_http and self._options.http) or self._needs_web_platform_test_server) and self._port.get_option("start_http_servers_if_needed"):
            self.start_servers()
            atexit.register(lambda: self.stop_servers())

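    # The worker count is capped at the number of shards; extra workers would
    # have no shards to run.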
    def get_worker_count(self, test_inputs, child_process_count):
        all_shards = self._sharder.shard_tests(test_inputs, child_process_count, self._options.fully_parallel)
        return min(child_process_count, len(all_shards))

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs

        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_started = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)

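        # Skipped tests are recorded up front as expected results, so they show
        # up in the summary without ever being handed to a worker.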
        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        self._printer.print_workers_and_shards(num_workers, len(all_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize(num_workers, "worker"))

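        # Each shard is posted to the pool as a 'test_list' message; workers
        # answer with 'started_test', 'finished_test', and 'finished_test_group'
        # messages, which are dispatched to the _handle_* methods below.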
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _handle_did_spawn_worker(self, worker_number):
        self._port.did_spawn_worker(worker_number)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

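    # Each finished test is compared against its expectations; the verdict is
    # reported to the printer and feeds the early-exit (failure-limit) checks.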
    def _update_summary_with_result(self, run_results, result):
        if result.type == test_expectations.SKIP:
            exp_str = got_str = 'SKIP'
            expected = True
        else:
            expectations = self._expectations.filtered_expectations_for_test(result.test_name, self._options.pixel_tests or bool(result.reftest_type), self._options.world_leaks)
            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, expectations)
            exp_str = self._expectations.model().expectations_to_string(expectations)
            got_str = self._expectations.model().expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

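    # Results reported after a test group finishes (currently world-leak checks)
    # are overlaid onto the per-test results that were recorded earlier.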
    def _annotate_results_with_additional_failures(self, run_results, results):
        for new_result in results:
            existing_result = run_results.results_by_name.get(new_result.test_name)
            # When running a chunk (--run-chunk), results_by_name contains all the tests,
            # but (confusingly) all_tests only contains those in the chunk that was run,
            # and we don't want to modify the results of a test that didn't run.
            # existing_result.test_number is only non-None for tests that ran.
            if existing_result and existing_result.test_number is not None:
                expectations = self._expectations.filtered_expectations_for_test(new_result.test_name, self._options.pixel_tests or bool(new_result.reftest_type), self._options.world_leaks)
                was_expected = self._expectations.matches_an_expected_result(new_result.test_name, existing_result.type, expectations)
                now_expected = self._expectations.matches_an_expected_result(new_result.test_name, new_result.type, expectations)
                run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)

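    # The _did_start_* flags track which servers this runner launched, so
    # stop_servers() only shuts down servers it started and leaves externally
    # managed ones running.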
    def start_servers(self):
        if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
            self._printer.write_update('Starting HTTP server ...')
            self._port.start_http_server()
            self._did_start_http_server = True
        if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
            self._printer.write_update('Starting WebSocket server ...')
            self._port.start_websocket_server()
            self._did_start_websocket_server = True
        if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
            self._printer.write_update('Starting Web Platform Test server ...')
            self._port.start_web_platform_test_server()
            self._did_start_wpt_server = True

    def stop_servers(self):
        if self._did_start_http_server:
            self._printer.write_update('Stopping HTTP server ...')
            self._port.stop_http_server()
            self._did_start_http_server = False
        if self._did_start_websocket_server:
            self._printer.write_update('Stopping WebSocket server ...')
            self._port.stop_websocket_server()
            self._did_start_websocket_server = False
        if self._did_start_wpt_server:
            self._printer.write_update('Stopping Web Platform Test server ...')
            self._port.stop_web_platform_test_server()
            self._did_start_wpt_server = False

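    # Message-pool callback: dispatches messages posted by workers (e.g.
    # 'started_test', 'finished_test', 'finished_test_group') to the
    # corresponding _handle_* method.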
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test(self, worker_name, result, log_messages=[]):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_finished_test_group(self, worker_name, overlay_results, log_messages=[]):
        self._annotate_results_with_additional_failures(self._current_run_results, overlay_results)


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

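    # Counterpart to LayoutTestRunner.handle(): each worker receives one
    # 'test_list' message per shard and runs the shard's tests in order.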
    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)

        self._finished_test_group(test_inputs)

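    # Reference (reftest) files are resolved lazily here; any test that has
    # references is run as a pixel test, since reftests compare rendered output.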
    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

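        # When a batch size is configured (--batch-size), ask the driver to exit
        # after this test so the next batch starts with a fresh driver process.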
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def _do_post_tests_work(self, driver):
        additional_results = []
        if not driver:
            return additional_results

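        # do_post_tests_work() reports world leaks detected after the group ran;
        # each leaking test gets a TestResult carrying a FailureDocumentLeak.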
        post_test_output = driver.do_post_tests_work()
        if post_test_output:
            for test_name, doc_list in post_test_output.world_leaks_dict.iteritems():
                additional_results.append(test_results.TestResult(test_name, [test_failures.FailureDocumentLeak(doc_list)]))
        return additional_results

    def _finished_test_group(self, test_inputs):
        _log.debug("%s finished test group" % self._name)

        if self._driver and self._driver.has_crashed():
            self._kill_driver()

        additional_results = []
        if not self._options.run_singly:
            additional_results = self._do_post_tests_work(self._driver)

        self._caller.post('finished_test_group', additional_results)

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill DumpRenderTree if we need to.
            if any([f.driver_needs_restart() for f in result.failures]):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
          test_input: Object containing the test filename and timeout
          thread_timeout_sec: time to wait before killing the driver process.
        Returns:
          A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number, self._options.no_timeout)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.isAlive():
            # If join() returned with the thread still running, the
            # DumpRenderTree is completely hung and there's nothing
            # more we can do with it.  We have to kill all the
            # DumpRenderTrees to free it up. If we're running more than
            # one DumpRenderTree thread, we'll end up killing the other
            # DumpRenderTrees too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all DumpRenderTrees')
            failures = [test_failures.FailureTimeout()]
        else:
            failure_results = self._do_post_tests_work(driver)
            for failure_result in failure_results:
                if failure_result.test_name == result.test_name:
                    result.convert_to_failure(failure_result)

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared DumpRenderTree process.

        Args:
          test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.needs_servers = test_inputs[0].needs_servers

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, needs_servers=%s)" % (self.name, self.test_inputs, self.needs_servers)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn):
        self._split = test_split_fn

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.
        Returns:
            A list of TestShards.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return [TestShard('all_tests', test_inputs)]
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_every_file(self, test_inputs):
        """Returns a list of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            shards.append(TestShard('.', [test_input]))

        return shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns a list of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
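        # For example, 'fast/css/a.html' and 'fast/css/b.html' would typically
        # land in the same shard, while 'http/tests/misc/c.html' lands in a
        # different one; the exact grouping depends on the port's split_test().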
        shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            shards.append(shard)

        # Sort the shards by directory name.
        shards.sort(key=lambda shard: shard.name)

        return shards