2010-12-02 Dirk Pranke <dpranke@chromium.org>
[WebKit.git] / WebKitTools / Scripts / webkitpy / layout_tests / layout_package / message_broker.py
1 # Copyright (C) 2010 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages and concurrency for run-webkit-tests.
30
31 Testing is accomplished by having a manager (TestRunner) gather all of the
32 tests to be run, and sending messages to a pool of workers (TestShellThreads)
33 to run each test. Each worker communicates with one driver (usually
34 DumpRenderTree) to run one test at a time and then compare the output against
35 what we expected to get.
36
37 This modules provides a message broker that connects the manager to the
38 workers: it provides a messaging abstraction and message loops, and
39 handles launching threads and/or processes depending on the
40 requested configuration.
41 """
42
43 import logging
44 import sys
45 import time
46 import traceback
47
48 import dump_render_tree_thread
49
50 _log = logging.getLogger(__name__)
51
52
53 def get(port, options):
54     """Return an instance of a WorkerMessageBroker."""
55     worker_model = options.worker_model
56     if worker_model == 'inline':
57         return _InlineBroker(port, options)
58     if worker_model == 'threads':
59         return _MultiThreadedBroker(port, options)
60     raise ValueError('unsupported value for --worker-model: %s' % worker_model)
61
62
63 class _WorkerMessageBroker(object):
64     def __init__(self, port, options):
65         self._port = port
66         self._options = options
67
68         # This maps worker_names to TestShellThreads
69         self._threads = {}
70
71     def start_worker(self, test_runner, worker_number):
72         """Start a worker with the given index number.
73
74         Returns the actual TestShellThread object."""
75         # FIXME: Remove dependencies on test_runner.
76         # FIXME: Replace with something that isn't a thread, and return
77         # the name of the worker, not the thread itself. We need to return
78         # the thread itself for now to allow TestRunner to access the object
79         # directly to read shared state.
80         thread = dump_render_tree_thread.TestShellThread(self._port,
81             self._options, worker_number, test_runner._current_filename_queue,
82             test_runner._result_queue)
83         self._threads[thread.name()] = thread
84         # Note: Don't start() the thread! If we did, it would actually
85         # create another thread and start executing it, and we'd no longer
86         # be single-threaded.
87         return thread
88
89     def cancel_worker(self, worker_name):
90         """Attempt to cancel a worker (best-effort). The worker may still be
91         running after this call returns."""
92         self._threads[worker_name].cancel()
93
94     def log_wedged_worker(self, worker_name):
95         """Log information about the given worker's state."""
96         raise NotImplementedError
97
98     def run_message_loop(self, test_runner):
99         """Loop processing messages until done."""
100         # FIXME: eventually we'll need a message loop that the workers
101         # can also call.
102         raise NotImplementedError
103
104
105 class _InlineBroker(_WorkerMessageBroker):
106     def run_message_loop(self, test_runner):
107         thread = self._threads.values()[0]
108         thread.run_in_main_thread(test_runner,
109                                   test_runner._current_result_summary)
110
111     def log_wedged_worker(self, worker_name):
112         raise AssertionError('_InlineBroker.log_wedged_worker() called')
113
114
115 class _MultiThreadedBroker(_WorkerMessageBroker):
116     def start_worker(self, test_runner, worker_number):
117         thread = _WorkerMessageBroker.start_worker(self, test_runner,
118                                                    worker_number)
119         # Unlike the base implementation, here we actually want to start
120         # the thread.
121         thread.start()
122         return thread
123
124     def run_message_loop(self, test_runner):
125         # FIXME: Remove the dependencies on test_runner. Checking on workers
126         # should be done via a timer firing.
127         test_runner._check_on_workers()
128
129     def log_wedged_worker(self, worker_name):
130         thread = self._threads[worker_name]
131         stack = self._find_thread_stack(thread.id())
132         assert(stack is not None)
133         _log.error("")
134         _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
135         self._log_stack(stack)
136         _log.error("")
137
138     def _find_thread_stack(self, id):
139         """Returns a stack object that can be used to dump a stack trace for
140         the given thread id (or None if the id is not found)."""
141         for thread_id, stack in sys._current_frames().items():
142             if thread_id == id:
143                 return stack
144         return None
145
146     def _log_stack(self, stack):
147         """Log a stack trace to log.error()."""
148         for filename, lineno, name, line in traceback.extract_stack(stack):
149             _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
150             if line:
151                 _log.error('  %s' % line.strip())