# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import atexit
import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

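    # Note: defining __reduce__ is what keeps this exception picklable (the
    # custom __init__ signature would otherwise trip up default pickling), so
    # it can be re-raised across the worker message-pool boundary with its
    # reason intact.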
    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._needs_http = needs_http
        self._needs_websockets = needs_websockets
        self._needs_web_platform_test_server = needs_web_platform_test_server

        self._sharder = Sharder(self._port.split_test)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        self._current_run_results = None
        self._did_start_http_server = False
        self._did_start_websocket_server = False
        self._did_start_wpt_server = False

        if ((self._needs_http and self._options.http) or self._needs_web_platform_test_server) and self._port.get_option("start_http_servers_if_needed"):
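            # Start any servers we will need up front, and make sure they are
            # shut down again at interpreter exit even if the run is interrupted.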
            self.start_servers()
            atexit.register(lambda: self.stop_servers())

    def get_worker_count(self, test_inputs, child_process_count):
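        # Never spin up more workers than there are shards to hand out.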
        all_shards = self._sharder.shard_tests(test_inputs, child_process_count, self._options.fully_parallel)
        return min(child_process_count, len(all_shards))

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs

        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_started = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        self._printer.print_workers_and_shards(num_workers, len(all_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize(num_workers, "worker"))

        try:
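            # Each shard becomes a single 'test_list' message; a worker picks
            # the message up and runs that shard's tests in order (see
            # Worker.handle below).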
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _handle_did_spawn_worker(self, worker_number):
        self._port.did_spawn_worker(worker_number)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        if result.type == test_expectations.SKIP:
            exp_str = got_str = 'SKIP'
            expected = True
        else:
            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type, self._options.world_leaks)
            exp_str = self._expectations.model().get_expectations_string(result.test_name)
            got_str = self._expectations.model().expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def _annotate_results_with_additional_failures(self, run_results, results):
        for new_result in results:
            existing_result = run_results.results_by_name.get(new_result.test_name)
            # When running a chunk (--run-chunk), results_by_name contains all the tests, but (confusingly) all_tests only contains those in the chunk that was run,
            # and we don't want to modify the results of a test that didn't run. existing_result.test_number is only non-None for tests that ran.
            if existing_result and existing_result.test_number is not None:
                was_expected = self._expectations.matches_an_expected_result(new_result.test_name, existing_result.type, self._options.pixel_tests or existing_result.reftest_type, self._options.world_leaks)
                now_expected = self._expectations.matches_an_expected_result(new_result.test_name, new_result.type, self._options.pixel_tests or new_result.reftest_type, self._options.world_leaks)
                run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)

    def start_servers(self):
        if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
            self._printer.write_update('Starting HTTP server ...')
            self._port.start_http_server()
            self._did_start_http_server = True
        if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
            self._printer.write_update('Starting WebSocket server ...')
            self._port.start_websocket_server()
            self._did_start_websocket_server = True
        if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
            self._printer.write_update('Starting Web Platform Test server ...')
            self._port.start_web_platform_test_server()
            self._did_start_wpt_server = True

    def stop_servers(self):
        if self._did_start_http_server:
            self._printer.write_update('Stopping HTTP server ...')
            self._port.stop_http_server()
            self._did_start_http_server = False
        if self._did_start_websocket_server:
            self._printer.write_update('Stopping WebSocket server ...')
            self._port.stop_websocket_server()
            self._did_start_websocket_server = False
        if self._did_start_wpt_server:
            self._printer.write_update('Stopping Web Platform Test server ...')
            self._port.stop_web_platform_test_server()
            self._did_start_wpt_server = False

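    # Messages posted by workers (self._caller.post(...) in Worker) arrive here
    # via the message pool and are dispatched to the matching _handle_<name>
    # method below.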
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test(self, worker_name, result, log_messages=[]):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_finished_test_group(self, worker_name, overlay_results, log_messages=[]):
        self._annotate_results_with_additional_failures(self._current_run_results, overlay_results)


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

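        # A batch_size of 0 means "no limit": the same driver instance is reused
        # for every test this worker runs, unless a crash or hang forces a restart.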
        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)

        self._finished_test_group(test_inputs)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

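        # When --batch-size is in effect, ask the driver to shut down after this
        # test once the batch fills up, so that long runs periodically get a
        # fresh driver process.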
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def _do_post_tests_work(self, driver):
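        # Collect any work the driver does after its tests have finished
        # (currently world-leak checking) and turn each leaked-document list
        # into a per-test FailureDocumentLeak result.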
        additional_results = []
        if not driver:
            return additional_results

        post_test_output = driver.do_post_tests_work()
        if post_test_output:
            for test_name, doc_list in post_test_output.world_leaks_dict.iteritems():
                additional_results.append(test_results.TestResult(test_name, [test_failures.FailureDocumentLeak(doc_list)]))
        return additional_results

    def _finished_test_group(self, test_inputs):
        _log.debug("%s finished test group" % self._name)

        if self._driver and self._driver.has_crashed():
            self._kill_driver()

        additional_results = []
        if not self._options.run_singly:
            additional_results = self._do_post_tests_work(self._driver)

        self._caller.post('finished_test_group', additional_results)

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
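        # For example, a test_input.timeout of "30000" (30 seconds) becomes a
        # 90.0-second driver timeout, plus one extra second of padding when
        # run_singly is set and the test runs in its own thread.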
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill DumpRenderTree if we need to.
            if any([f.driver_needs_restart() for f in result.failures]):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
          test_input: Object containing the test filename and timeout
          thread_timeout_sec: time to wait before killing the driver process.
        Returns:
          A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number, self._options.no_timeout)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.isAlive():
            # If join() returned with the thread still running, the
            # DumpRenderTree is completely hung and there's nothing
            # more we can do with it.  We have to kill all the
            # DumpRenderTrees to free it up. If we're running more than
            # one DumpRenderTree thread, we'll end up killing the other
            # DumpRenderTrees too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all DumpRenderTrees')
            failures = [test_failures.FailureTimeout()]
        else:
            failure_results = self._do_post_tests_work(driver)
            for failure_result in failure_results:
                if failure_result.test_name == result.test_name:
                    result.convert_to_failure(failure_result)

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared DumpRenderTree process.

        Args:
          test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.needs_servers = test_inputs[0].needs_servers

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, needs_servers=%s)" % (self.name, self.test_inputs, self.needs_servers)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn):
        self._split = test_split_fn

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.
        Return:
            A list of TestShards.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return [TestShard('all_tests', test_inputs)]
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_every_file(self, test_inputs):
        """Returns a list of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            shards.append(TestShard('.', [test_input]))

        return shards

    def _shard_by_directory(self, test_inputs, num_workers):
519         """Returns a lists of shards, each shard containing all the files in a directory.
520
521         This is the default mode, and gets as much parallelism as we can while
522         minimizing flakiness caused by inter-test dependencies."""
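        # Illustrative example (assuming the port's split_test splits on the
        # last path separator): fast/forms/button.html and fast/forms/input.html
        # both land in a 'fast/forms' shard and therefore run on the same worker.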
        shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            shards.append(shard)

        # Sort the shards by directory name.
        shards.sort(key=lambda shard: shard.name)

        return shards