tests_run0.txt gets clobbered when re-running failing tests
[WebKit-https.git] / Tools / Scripts / webkitpy / layout_tests / controllers / manager_worker_broker.py
1 # Copyright (C) 2011 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages and concurrency for run-webkit-tests.
30
31 This module implements a message broker that connects the manager to the
32 workers: it provides a messaging abstraction and message loops (building on
33 top of message_broker), and handles starting workers by launching processes.
34
35 There are a lot of classes and objects involved in a fully connected system.
36 They interact more or less like:
37
38   Manager  -->  _InlineManager ---> _InlineWorker <-> Worker
39      ^                    \               /              ^
40      |                     v             v               |
41      \--------------------  MessageBroker   -------------/
42 """
43
44 import logging
45 import optparse
46 import Queue
47 import sys
48
49 # Handle Python < 2.6 where multiprocessing isn't available.
50 try:
51     import multiprocessing
52 except ImportError:
53     multiprocessing = None
54
55 from webkitpy.common.host import Host  # FIXME: This should not be needed!
56 from webkitpy.layout_tests.controllers import message_broker
57 from webkitpy.layout_tests.views import printing
58
59
60 _log = logging.getLogger(__name__)
61
62 #
63 # Topic names for Manager <-> Worker messaging
64 #
65 MANAGER_TOPIC = 'managers'
66 ANY_WORKER_TOPIC = 'workers'
67
68
69 def runtime_options():
70     """Return a list of optparse.Option objects for any runtime values used
71     by this module."""
72     options = [
73         optparse.make_option("--worker-model", action="store",
74             help=("controls worker model. Valid values are "
75             "'inline' and 'processes'.")),
76     ]
77     return options
78
79
80 def get(port, options, client, worker_class):
81     """Return a connection to a manager/worker message_broker
82
83     Args:
84         port - handle to layout_tests/port object for port-specific stuff
85         options - optparse argument for command-line options
86         client - message_broker.BrokerClient implementation to dispatch
87             replies to.
88         worker_class - type of workers to create. This class must implement
89             the methods in AbstractWorker.
90     Returns:
91         A handle to an object that will talk to a message broker configured
92         for the normal manager/worker communication.
93     """
94     worker_model = options.worker_model
95     if worker_model == 'inline':
96         queue_class = Queue.Queue
97         manager_class = _InlineManager
98     elif worker_model == 'processes' and multiprocessing:
99         queue_class = multiprocessing.Queue
100         manager_class = _MultiProcessManager
101     else:
102         raise ValueError("unsupported value for --worker-model: %s" % worker_model)
103
104     broker = message_broker.Broker(options, queue_class)
105     return manager_class(broker, port, options, client, worker_class)
106
107
108 class AbstractWorker(message_broker.BrokerClient):
109     def __init__(self, worker_connection, worker_number, results_directory, options):
110         """The constructor should be used to do any simple initialization
111         necessary, but should not do anything that creates data structures
112         that cannot be Pickled or sent across processes (like opening
113         files or sockets). Complex initialization should be done at the
114         start of the run() call.
115
116         Args:
117             worker_connection - handle to the BrokerConnection object creating
118                 the worker and that can be used for messaging.
119             worker_number - identifier for this particular worker
120             options - command-line argument object from optparse"""
121         message_broker.BrokerClient.__init__(self)
122         self._worker_connection = worker_connection
123         self._options = options
124         self._worker_number = worker_number
125         self._name = 'worker/%d' % worker_number
126         self._results_directory = results_directory
127
128     def run(self, port):
129         """Callback for the worker to start executing. Typically does any
130         remaining initialization and then calls broker_connection.run_message_loop()."""
131         raise NotImplementedError
132
133     def cancel(self):
134         """Called when possible to indicate to the worker to stop processing
135         messages and shut down. Note that workers may be stopped without this
136         method being called, so clients should not rely solely on this."""
137         raise NotImplementedError
138
139
140 class _ManagerConnection(message_broker.BrokerConnection):
141     def __init__(self, broker, options, client, worker_class):
142         """Base initialization for all Manager objects.
143
144         Args:
145             broker: handle to the message_broker object
146             options: command line options object
147             client: callback object (the caller)
148             worker_class: class object to use to create workers.
149         """
150         message_broker.BrokerConnection.__init__(self, broker, client,
151             MANAGER_TOPIC, ANY_WORKER_TOPIC)
152         self._options = options
153         self._worker_class = worker_class
154
155     def start_worker(self, worker_number, results_directory):
156         raise NotImplementedError
157
158
159 class _InlineManager(_ManagerConnection):
160     def __init__(self, broker, port, options, client, worker_class):
161         _ManagerConnection.__init__(self, broker, options, client, worker_class)
162         self._port = port
163         self._inline_worker = None
164
165     def start_worker(self, worker_number, results_directory):
166         self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
167             self._client, self._worker_class, worker_number, results_directory)
168         return self._inline_worker
169
170     def run_message_loop(self, delay_secs=None):
171         # Note that delay_secs is ignored in this case since we can't easily
172         # implement it.
173         self._inline_worker.run()
174         self._broker.run_all_pending(MANAGER_TOPIC, self._client)
175
176
177 class _MultiProcessManager(_ManagerConnection):
178     def __init__(self, broker, port, options, client, worker_class):
179         # Note that this class does not keep a handle to the actual port
180         # object, because it isn't Picklable. Instead it keeps the port
181         # name and recreates the port in the child process from the name
182         # and options.
183         _ManagerConnection.__init__(self, broker, options, client, worker_class)
184         self._platform_name = port.real_name()
185
186     def start_worker(self, worker_number, results_directory):
187         worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
188             self._worker_class, worker_number, results_directory, self._options)
189         worker_connection.start()
190         return worker_connection
191
192
193 class _WorkerConnection(message_broker.BrokerConnection):
194     def __init__(self, broker, worker_class, worker_number, results_directory, options):
195         self._client = worker_class(self, worker_number, results_directory, options)
196         self.name = self._client.name()
197         message_broker.BrokerConnection.__init__(self, broker, self._client,
198                                                  ANY_WORKER_TOPIC, MANAGER_TOPIC)
199
200     def cancel(self):
201         raise NotImplementedError
202
203     def is_alive(self):
204         raise NotImplementedError
205
206     def join(self, timeout):
207         raise NotImplementedError
208
209     def yield_to_broker(self):
210         pass
211
212
213 class _InlineWorkerConnection(_WorkerConnection):
214     def __init__(self, broker, port, manager_client, worker_class, worker_number, results_directory):
215         _WorkerConnection.__init__(self, broker, worker_class, worker_number, results_directory, port.options)
216         self._alive = False
217         self._port = port
218         self._manager_client = manager_client
219
220     def cancel(self):
221         self._client.cancel()
222
223     def is_alive(self):
224         return self._alive
225
226     def join(self, timeout):
227         assert not self._alive
228
229     def run(self):
230         self._alive = True
231         self._client.run(self._port)
232         self._alive = False
233
234     def yield_to_broker(self):
235         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
236
237     def raise_exception(self, exc_info):
238         # Since the worker is in the same process as the manager, we can
239         # raise the exception directly, rather than having to send it through
240         # the queue. This allows us to preserve the traceback.
241         raise exc_info[0], exc_info[1], exc_info[2]
242
243
244 if multiprocessing:
245
246     class _Process(multiprocessing.Process):
247         def __init__(self, worker_connection, platform_name, options, client):
248             multiprocessing.Process.__init__(self)
249             self._worker_connection = worker_connection
250             self._platform_name = platform_name
251             self._options = options
252             self._client = client
253
254         def run(self):
255             options = self._options
256             # FIXME: This should get the Host from the owner of this object
257             # so this function can be properly mocked!
258             host = Host()
259             host._initialize_scm()
260             port_obj = host.port_factory.get(self._platform_name, options)
261
262             # The unix multiprocessing implementation clones the
263             # log handler configuration into the child processes,
264             # but the win implementation doesn't.
265             configure_logging = (sys.platform == 'win32')
266
267             # FIXME: this won't work if the calling process is logging
268             # somewhere other than sys.stderr and sys.stdout, but I'm not sure
269             # if this will be an issue in practice.
270             printer = printing.Printer(port_obj, options, sys.stderr, sys.stdout, configure_logging)
271             self._client.run(port_obj)
272             printer.cleanup()
273
274
275 class _MultiProcessWorkerConnection(_WorkerConnection):
276     def __init__(self, broker, platform_name, worker_class, worker_number, results_directory, options):
277         _WorkerConnection.__init__(self, broker, worker_class, worker_number, results_directory, options)
278         self._proc = _Process(self, platform_name, options, self._client)
279
280     def cancel(self):
281         return self._proc.terminate()
282
283     def is_alive(self):
284         return self._proc.is_alive()
285
286     def join(self, timeout):
287         return self._proc.join(timeout)
288
289     def start(self):
290         self._proc.start()