2011-02-11 Dirk Pranke <dpranke@chromium.org>
[WebKit-https.git] / Tools / Scripts / webkitpy / layout_tests / layout_package / manager_worker_broker.py
1 # Copyright (C) 2011 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages and concurrency for run-webkit-tests.
30
31 This module implements a message broker that connects the manager
32 (TestRunner2) to the workers: it provides a messaging abstraction and
33 message loops (building on top of message_broker2), and handles starting
34 workers by launching threads and/or processes depending on the
35 requested configuration.
36
37 There are a lot of classes and objects involved in a fully connected system.
38 They interact more or less like:
39
40 TestRunner2  --> _InlineManager ---> _InlineWorker <-> Worker
41      ^                    \               /              ^
42      |                     v             v               |
43      \--------------------  MessageBroker   -------------/
44 """
45
46 import logging
47 import optparse
48 import Queue
49 import thread
50 import threading
51 import time
52
53
54 # Handle Python < 2.6 where multiprocessing isn't available.
55 #
56 # _Multiprocessing_Process is needed so that _MultiProcessWorker
57 # can be defined with or without multiprocessing.
58 try:
59     import multiprocessing
60     _Multiprocessing_Process = multiprocessing.Process
61 except ImportError:
62     multiprocessing = None
63     _Multiprocessing_Process = threading.Thread
64
65
66 from webkitpy.layout_tests import port
67 from webkitpy.layout_tests.layout_package import message_broker2
68
69
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

#
# Topic names for Manager <-> Worker messaging
#
# Manager connections pass MANAGER_TOPIC first and ANY_WORKER_TOPIC second
# to BrokerConnection.__init__; worker connections pass them in the
# opposite order.
# NOTE(review): presumably the first topic is the one a connection listens
# on and the second the one it posts to - confirm against message_broker2.
MANAGER_TOPIC = 'managers'
ANY_WORKER_TOPIC = 'workers'
77
78
def runtime_options():
    """Return the optparse options contributed by this module.

    Returns:
        A one-element list containing the optparse.Option for
        --worker-model (action "store").
    """
    worker_model_help = ("controls worker model. Valid values are "
                         "'inline', 'threads', and 'processes'.")
    worker_model_option = optparse.make_option(
        "--worker-model", action="store", help=worker_model_help)
    return [worker_model_option]
88
89
def get(port, options, client, worker_class):
    """Build the manager-side connection for the requested worker model.

    The value of options.worker_model selects both the queue class handed
    to message_broker2.Broker and the manager connection class that is
    instantiated on top of that broker.

    Args:
        port - handle to layout_tests/port object for port-specific stuff
        options - optparse options object; options.worker_model must be
            'inline', 'threads' or 'processes'
        client - message_broker2.BrokerClient implementation that replies
            are dispatched to
        worker_class - class used to create workers; it must implement
            the methods in AbstractWorker
    Returns:
        An _InlineManager, _ThreadedManager or _MultiProcessManager wired
        to a freshly created message_broker2.Broker.
    Raises:
        ValueError - options.worker_model is not a recognized value, or it
            is 'processes' while the multiprocessing module is unavailable.
    """
    model = options.worker_model

    if model == 'inline':
        queue_class, manager_class = Queue.Queue, _InlineManager
    elif model == 'threads':
        queue_class, manager_class = Queue.Queue, _ThreadedManager
    elif model == 'processes' and multiprocessing:
        # Only reachable when the module-level multiprocessing import
        # succeeded (it is set to None otherwise).
        queue_class, manager_class = (multiprocessing.Queue,
                                      _MultiProcessManager)
    else:
        message = "unsupported value for --worker-model: %s" % model
        raise ValueError(message)

    shared_broker = message_broker2.Broker(options, queue_class)
    return manager_class(shared_broker, port, options, client, worker_class)
120
121
class AbstractWorker(message_broker2.BrokerClient):
    """Interface expected of the worker_class passed to get().

    Every method here raises NotImplementedError; concrete workers are
    expected to override all of them.

    NOTE(review): _WorkerConnection also calls name() on the worker right
    after constructing it, but name() is not declared here - presumably it
    comes from BrokerClient or from the concrete subclass; confirm.
    """

    def __init__(self, broker_connection, worker_number, options):
        """Perform simple, picklable-only initialization.

        Nothing created here may be un-Picklable or impossible to send
        across processes (open files, sockets, ...). Anything more complex
        belongs at the start of run().

        Args:
            broker_connection - the BrokerConnection object that is
                creating this worker; usable for messaging.
            worker_number - identifier for this particular worker
            options - command-line argument object from optparse
        """
        raise NotImplementedError()

    def run(self, port):
        """Start executing; called back once the worker should begin.

        Typically finishes any remaining initialization and then calls
        broker_connection.run_message_loop().

        Args:
            port - port object the worker should use
        """
        raise NotImplementedError()

    def cancel(self):
        """Ask the worker to stop processing messages and shut down.

        This is called when possible, but workers may also be stopped
        without it ever being called, so clients should not rely solely
        on this hook.
        """
        raise NotImplementedError()
148
149
class _ManagerConnection(message_broker2.BrokerConnection):
    """Common base for the manager-side connection of every worker model."""

    def __init__(self, broker, options, client, worker_class):
        """Set up the state shared by all Manager objects.

        The BrokerConnection base is initialized with MANAGER_TOPIC and
        ANY_WORKER_TOPIC, in that order.

        Args:
            broker: handle to the message_broker2 object
            options: command line options object
            client: callback object (the caller)
            worker_class: class object to use to create workers.
        """
        base_init = message_broker2.BrokerConnection.__init__
        base_init(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
        self._options = options
        self._worker_class = worker_class

    def start_worker(self, worker_number):
        """Create and start worker number worker_number.

        Must be overridden by each concrete manager.
        """
        raise NotImplementedError()
167
168
class _InlineManager(_ManagerConnection):
    """Manager for the 'inline' worker model.

    The worker is run directly from run_message_loop() rather than being
    started on its own thread or process.
    """

    def __init__(self, broker, port, options, client, worker_class):
        """Remember the port; no worker exists until start_worker()."""
        _ManagerConnection.__init__(self, broker, options, client,
                                    worker_class)
        self._port = port
        self._inline_worker = None

    def start_worker(self, worker_number):
        """Create (but do not run) the inline worker connection.

        The connection is remembered so run_message_loop() can run it.
        Calling this again replaces the remembered connection.

        Returns:
            The new _InlineWorkerConnection.
        """
        connection = _InlineWorkerConnection(
            self._broker, self._port, self._client, self._worker_class,
            worker_number)
        self._inline_worker = connection
        return connection

    def run_message_loop(self, delay_secs=None):
        """Run the inline worker, then dispatch pending manager messages.

        Args:
            delay_secs - ignored here, since it can't easily be
                implemented for the inline model.
        """
        inline_worker = self._inline_worker
        inline_worker.run()
        # Dispatch whatever is still queued on MANAGER_TOPIC to the client.
        self._broker.run_all_pending(MANAGER_TOPIC, self._client)
185
186
class _ThreadedManager(_ManagerConnection):
    """Manager for the 'threads' worker model."""

    def __init__(self, broker, port, options, client, worker_class):
        """Keep a handle to the port, which is passed on to each worker."""
        _ManagerConnection.__init__(self, broker, options, client,
                                    worker_class)
        self._port = port

    def start_worker(self, worker_number):
        """Create a threaded worker connection and start it.

        Returns:
            The started _ThreadedWorkerConnection.
        """
        connection = _ThreadedWorkerConnection(
            self._broker, self._port, self._worker_class, worker_number)
        connection.start()
        return connection
197
198
class _MultiProcessManager(_ManagerConnection):
    """Manager for the 'processes' worker model."""

    def __init__(self, broker, port, options, client, worker_class):
        """Record the port's real name instead of the port object.

        The port object itself isn't Picklable, so no handle to it is
        kept. Only its name is stored; the port is recreated in the child
        process from that name and the options.
        """
        _ManagerConnection.__init__(self, broker, options, client,
                                    worker_class)
        self._platform_name = port.real_name()

    def start_worker(self, worker_number):
        """Create a multi-process worker connection and start it.

        Returns:
            The started _MultiProcessWorkerConnection.
        """
        connection = _MultiProcessWorkerConnection(
            self._broker, self._platform_name, self._worker_class,
            worker_number, self._options)
        connection.start()
        return connection
213
214
class _WorkerConnection(message_broker2.BrokerConnection):
    """Common base for the worker-side connection of every worker model."""

    def __init__(self, broker, worker_class, worker_number, options):
        """Create the worker and attach it to the broker.

        The worker object is built first, with this connection passed as
        its broker connection, and its name() is recorded before the
        BrokerConnection base is initialized with ANY_WORKER_TOPIC and
        MANAGER_TOPIC, in that order.

        Args:
            broker: handle to the message_broker2 object
            worker_class: class object used to create the worker
            worker_number: identifier for this particular worker
            options: command line options object
        """
        worker = worker_class(self, worker_number, options)
        self._client = worker
        self.name = worker.name()
        base_init = message_broker2.BrokerConnection.__init__
        base_init(self, broker, worker, ANY_WORKER_TOPIC, MANAGER_TOPIC)

    def yield_to_broker(self):
        """Do nothing by default; _InlineWorkerConnection overrides this."""
        return None
224
225
class _InlineWorkerConnection(_WorkerConnection):
    """Worker connection for the 'inline' model."""

    def __init__(self, broker, port, manager_client, worker_class,
                 worker_number):
        """Create the worker using the options stored on the port.

        Args:
            broker: handle to the message_broker2 object
            port: port object; its _options attribute supplies the options
            manager_client: the manager's client, used by yield_to_broker()
            worker_class: class object used to create the worker
            worker_number: identifier for this particular worker
        """
        options = port._options
        _WorkerConnection.__init__(self, broker, worker_class,
                                   worker_number, options)
        self._port = port
        self._manager_client = manager_client

    def run(self):
        """Run the worker with the stored port."""
        worker = self._client
        worker.run(self._port)

    def yield_to_broker(self):
        """Dispatch all pending MANAGER_TOPIC messages to the manager."""
        broker = self._broker
        broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
237
238
class _Thread(threading.Thread):
    """Thread whose body runs a worker client against a port."""

    def __init__(self, worker_connection, port, client):
        """Store what run() needs.

        Args:
            worker_connection: the connection that owns this thread
            port: port object handed to client.run()
            client: worker object whose run(port) is the thread body
        """
        threading.Thread.__init__(self)
        self._worker_connection = worker_connection
        self._port = port
        self._client = client

    def run(self):
        """Run the worker client with the stored port."""
        # FIXME: We can remove this once everyone is on 2.6.
        # Thread objects lacking an 'ident' attribute get one filled in
        # by hand from the current thread's identifier.
        ident_missing = not hasattr(self, 'ident')
        if ident_missing:
            self.ident = thread.get_ident()
        worker = self._client
        worker.run(self._port)
251
252
class _ThreadedWorkerConnection(_WorkerConnection):
    """Worker connection for the 'threads' model."""

    def __init__(self, broker, port, worker_class, worker_number):
        """Create the worker and the (not yet started) thread to run it.

        Args:
            broker: handle to the message_broker2 object
            port: port object; its _options attribute supplies the options
            worker_class: class object used to create the worker
            worker_number: identifier for this particular worker
        """
        options = port._options
        _WorkerConnection.__init__(self, broker, worker_class,
                                   worker_number, options)
        worker = self._client
        self._thread = _Thread(self, port, worker)

    def start(self):
        """Start the worker's thread."""
        worker_thread = self._thread
        worker_thread.start()
260
261
class _Process(_Multiprocessing_Process):
    """Process whose body recreates the port and runs a worker client.

    The base class is multiprocessing.Process when that module could be
    imported, and threading.Thread otherwise.
    """

    def __init__(self, worker_connection, platform_name, options, client):
        """Store what run() needs.

        Args:
            worker_connection: the connection that owns this process
            platform_name: port name used to recreate the port in run()
            options: command line options object
            client: worker object whose run(port) is the process body
        """
        _Multiprocessing_Process.__init__(self)
        self._worker_connection = worker_connection
        self._platform_name = platform_name
        self._options = options
        self._client = client

    def run(self):
        """Configure logging, rebuild the port, and run the worker."""
        logging.basicConfig()
        # The port object is rebuilt here from its name and the options.
        rebuilt_port = port.get(self._platform_name, self._options)
        worker = self._client
        worker.run(rebuilt_port)
274
275
class _MultiProcessWorkerConnection(_WorkerConnection):
    """Worker connection for the 'processes' model."""

    def __init__(self, broker, platform_name, worker_class, worker_number,
                 options):
        """Create the worker and the (not yet started) process to run it.

        Args:
            broker: handle to the message_broker2 object
            platform_name: port name passed on to the _Process
            worker_class: class object used to create the worker
            worker_number: identifier for this particular worker
            options: command line options object
        """
        _WorkerConnection.__init__(self, broker, worker_class,
                                   worker_number, options)
        worker = self._client
        self._proc = _Process(self, platform_name, options, worker)

    def start(self):
        """Start the worker's process."""
        worker_process = self._proc
        worker_process.start()