Rename WebKitTools to Tools
[WebKit-https.git] / Tools / Scripts / webkitpy / layout_tests / layout_package / message_broker.py
1 # Copyright (C) 2010 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
30
31 Testing is accomplished by having a manager (TestRunner) gather all of the
32 tests to be run, and sending messages to a pool of workers (TestShellThreads)
33 to run each test. Each worker communicates with one driver (usually
34 DumpRenderTree) to run one test at a time and then compare the output against
35 what we expected to get.
36
37 This modules provides a message broker that connects the manager to the
38 workers: it provides a messaging abstraction and message loops, and
39 handles launching threads and/or processes depending on the
40 requested configuration.
41 """
42
43 import logging
44 import sys
45 import time
46 import traceback
47
48 import dump_render_tree_thread
49
50 _log = logging.getLogger(__name__)
51
52
53 def get(port, options):
54     """Return an instance of a WorkerMessageBroker."""
55     worker_model = options.worker_model
56     if worker_model == 'inline':
57         return InlineBroker(port, options)
58     if worker_model == 'threads':
59         return MultiThreadedBroker(port, options)
60     raise ValueError('unsupported value for --worker-model: %s' % worker_model)
61
62
63 class _WorkerState(object):
64     def __init__(self, name):
65         self.name = name
66         self.thread = None
67
68
69 class WorkerMessageBroker(object):
70     def __init__(self, port, options):
71         self._port = port
72         self._options = options
73         self._num_workers = int(self._options.child_processes)
74
75         # This maps worker names to their _WorkerState values.
76         self._workers = {}
77
78     def _threads(self):
79         return tuple([w.thread for w in self._workers.values()])
80
81     def start_workers(self, test_runner):
82         """Starts up the pool of workers for running the tests.
83
84         Args:
85             test_runner: a handle to the manager/TestRunner object
86         """
87         self._test_runner = test_runner
88         for worker_number in xrange(self._num_workers):
89             worker = _WorkerState('worker-%d' % worker_number)
90             worker.thread = self._start_worker(worker_number, worker.name)
91             self._workers[worker.name] = worker
92         return self._threads()
93
94     def _start_worker(self, worker_number, worker_name):
95         raise NotImplementedError
96
97     def run_message_loop(self):
98         """Loop processing messages until done."""
99         raise NotImplementedError
100
101     def cancel_workers(self):
102         """Cancel/interrupt any workers that are still alive."""
103         pass
104
105     def cleanup(self):
106         """Perform any necessary cleanup on shutdown."""
107         pass
108
109
110 class InlineBroker(WorkerMessageBroker):
111     def _start_worker(self, worker_number, worker_name):
112         # FIXME: Replace with something that isn't a thread.
113         thread = dump_render_tree_thread.TestShellThread(self._port,
114             self._options, worker_number, worker_name,
115             self._test_runner._current_filename_queue,
116             self._test_runner._result_queue)
117         # Note: Don't start() the thread! If we did, it would actually
118         # create another thread and start executing it, and we'd no longer
119         # be single-threaded.
120         return thread
121
122     def run_message_loop(self):
123         thread = self._threads()[0]
124         thread.run_in_main_thread(self._test_runner,
125                                   self._test_runner._current_result_summary)
126         self._test_runner.update()
127
128
129 class MultiThreadedBroker(WorkerMessageBroker):
130     def _start_worker(self, worker_number, worker_name):
131         thread = dump_render_tree_thread.TestShellThread(self._port,
132             self._options, worker_number, worker_name,
133             self._test_runner._current_filename_queue,
134             self._test_runner._result_queue)
135         thread.start()
136         return thread
137
138     def run_message_loop(self):
139         threads = self._threads()
140
141         # Loop through all the threads waiting for them to finish.
142         some_thread_is_alive = True
143         while some_thread_is_alive:
144             some_thread_is_alive = False
145             t = time.time()
146             for thread in threads:
147                 exception_info = thread.exception_info()
148                 if exception_info is not None:
149                     # Re-raise the thread's exception here to make it
150                     # clear that testing was aborted. Otherwise,
151                     # the tests that did not run would be assumed
152                     # to have passed.
153                     raise exception_info[0], exception_info[1], exception_info[2]
154
155                 if thread.isAlive():
156                     some_thread_is_alive = True
157                     next_timeout = thread.next_timeout()
158                     if next_timeout and t > next_timeout:
159                         log_wedged_worker(thread.getName(), thread.id())
160                         thread.clear_next_timeout()
161
162             self._test_runner.update()
163
164             if some_thread_is_alive:
165                 time.sleep(0.01)
166
167     def cancel_workers(self):
168         threads = self._threads()
169         for thread in threads:
170             thread.cancel()
171
172
173 def log_wedged_worker(name, id):
174     """Log information about the given worker state."""
175     stack = _find_thread_stack(id)
176     assert(stack is not None)
177     _log.error("")
178     _log.error("%s (tid %d) is wedged" % (name, id))
179     _log_stack(stack)
180     _log.error("")
181
182
183 def _find_thread_stack(id):
184     """Returns a stack object that can be used to dump a stack trace for
185     the given thread id (or None if the id is not found)."""
186     for thread_id, stack in sys._current_frames().items():
187         if thread_id == id:
188             return stack
189     return None
190
191
192 def _log_stack(stack):
193     """Log a stack trace to log.error()."""
194     for filename, lineno, name, line in traceback.extract_stack(stack):
195         _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
196         if line:
197             _log.error('  %s' % line.strip())