# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import atexit
import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        return self.__class__, (self.reason,)
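
# Why __reduce__ above: workers report this exception back to the manager
# through a pickling message pool, and Exception subclasses whose __init__
# takes a required extra argument do not round-trip through pickle by
# default (unpickling would call TestRunInterruptedException() with no
# arguments). A minimal sketch (illustrative only, not part of this API):
#
#   import pickle
#   e = TestRunInterruptedException('hit the failure limit')
#   assert pickle.loads(pickle.dumps(e)).reason == 'hit the failure limit'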


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._needs_http = needs_http
        self._needs_websockets = needs_websockets
        self._needs_web_platform_test_server = needs_web_platform_test_server

        self._sharder = Sharder(self._port.split_test)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        self._current_run_results = None
        self._did_start_http_server = False
        self._did_start_websocket_server = False
        self._did_start_wpt_server = False

        if ((self._needs_http and self._options.http) or self._needs_web_platform_test_server) and self._port.get_option("start_http_servers_if_needed"):
            self.start_servers()
            atexit.register(lambda: self.stop_servers())

    def get_worker_count(self, test_inputs, child_process_count):
        all_shards = self._sharder.shard_tests(test_inputs, child_process_count, self._options.fully_parallel)
        return min(child_process_count, len(all_shards))
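    # For example (illustrative numbers): with 8 child processes but only 3
    # directory shards, get_worker_count() returns 3, since extra workers
    # would have nothing to run.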

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs

        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_started = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        self._printer.print_workers_and_shards(num_workers, len(all_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize(num_workers, "worker"))

        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _handle_did_spawn_worker(self, worker_number):
        self._port.did_spawn_worker(worker_number)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))
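    # For example (assuming the run-webkit-tests flags that back these
    # options): with --exit-after-n-failures=100, the 100th unexpected
    # failure raises TestRunInterruptedException, the not-yet-run tests are
    # recorded as skipped via _mark_interrupted_tests_as_skipped(), and
    # run_tests() above returns with run_results.interrupted set to True.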

    def _update_summary_with_result(self, run_results, result):
        if result.type == test_expectations.SKIP:
            exp_str = got_str = 'SKIP'
            expected = True
        else:
            expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
            exp_str = self._expectations.model().get_expectations_string(result.test_name)
            got_str = self._expectations.model().expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def start_servers(self):
        if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
            self._printer.write_update('Starting HTTP server ...')
            self._port.start_http_server()
            self._did_start_http_server = True
        if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
            self._printer.write_update('Starting WebSocket server ...')
            self._port.start_websocket_server()
            self._did_start_websocket_server = True
        if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
            self._printer.write_update('Starting Web Platform Test server ...')
            self._port.start_web_platform_test_server()
            self._did_start_wpt_server = True

    def stop_servers(self):
        if self._did_start_http_server:
            self._printer.write_update('Stopping HTTP server ...')
            self._port.stop_http_server()
            self._did_start_http_server = False
        if self._did_start_websocket_server:
            self._printer.write_update('Stopping WebSocket server ...')
            self._port.stop_websocket_server()
            self._did_start_websocket_server = False
        if self._did_start_wpt_server:
            self._printer.write_update('Stopping Web Platform Test server ...')
            self._port.stop_web_platform_test_server()
            self._did_start_wpt_server = False

    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """Called once the object is about to be used; at this point it is safe
        to create state that does not need to be pickled (usually this means
        start() runs in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)
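    # For example (illustrative numbers, assuming the --batch-size flag that
    # backs options.batch_size): with a batch size of 3, stop_when_done is
    # passed as True on every third test, so the driver is stopped after
    # tests 3, 6, 9, ... and relaunched lazily for the next test. With the
    # default of 0, the driver is reused for the whole shard.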

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec
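    # A worked example: a test timeout of "30000" (milliseconds, as a string)
    # yields a 90.0 second driver timeout; with --run-singly the watchdog
    # thread waits 91.0 seconds before declaring the driver hung.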

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill DumpRenderTree if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
          test_input: Object containing the test filename and timeout
          thread_timeout_sec: time to wait before killing the driver process.
        Returns:
          A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number, self._options.no_timeout)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.is_alive():
            # If join() returned with the thread still running, the
            # DumpRenderTree is completely hung and there's nothing
            # more we can do with it.  We have to kill all the
            # DumpRenderTrees to free it up. If we're running more than
            # one DumpRenderTree thread, we'll end up killing the other
            # DumpRenderTrees too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all DumpRenderTrees')
            failures = [test_failures.FailureTimeout()]

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result
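    # The watchdog pattern above in isolation (a minimal, self-contained
    # sketch; join(timeout) returns whether or not the thread finished, so
    # is_alive() is the only way to distinguish a hang from completion):
    #
    #   import threading, time
    #   t = threading.Thread(target=lambda: time.sleep(60))
    #   t.start()
    #   t.join(1.0)
    #   assert t.is_alive()  # join() timed out; the worker is still running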

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared DumpRenderTree process.

        Args:
          test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.needs_servers = test_inputs[0].needs_servers

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, needs_servers=%s)" % (self.name, self.test_inputs, self.needs_servers)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn):
        self._split = test_split_fn

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (a.k.a. bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Returns:
            A list of TestShards.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return [TestShard('all_tests', test_inputs)]
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_every_file(self, test_inputs):
        """Returns a list of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            shards.append(TestShard('.', [test_input]))

        return shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns a list of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.items():
            shard = TestShard(directory, test_inputs)
            shards.append(shard)

        # Sort the shards by directory name.
        shards.sort(key=lambda shard: shard.name)

        return shards
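

# A minimal usage sketch for Sharder (hypothetical inputs; the real split
# function is Port.split_test, which divides a test name into its directory
# and basename much like the lambda below):
#
#   sharder = Sharder(lambda name: name.rsplit('/', 1))
#   shards = sharder.shard_tests(test_inputs, num_workers=4, fully_parallel=False)
#   for shard in shards:
#       print('%s: %d tests' % (shard.name, len(shard.test_inputs)))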