clean up NRWT logging/metering, be less verbose
[WebKit-https.git] / Tools / Scripts / webkitpy / layout_tests / controllers / manager_worker_broker.py
1 # Copyright (C) 2011 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages and concurrency for run-webkit-tests.
30
31 This module implements a message broker that connects the manager to the
32 workers: it provides a messaging abstraction and message loops (building on
33 top of message_broker), and handles starting workers by launching processes.
34
35 There are a lot of classes and objects involved in a fully connected system.
36 They interact more or less like:
37
38   Manager  -->  _InlineManager ---> _InlineWorker <-> Worker
39      ^                    \               /              ^
40      |                     v             v               |
41      \-----------------------  Broker   ----------------/
42
43 The broker simply distributes messages onto topics (named queues); the actual
44 queues themselves are provided by the caller, as the queue's implementation
requirements vary depending on the desired concurrency model
46 (none/threads/processes).
47
In order for shared-nothing messaging between processes to be possible,
messages must be picklable.
50
The messaging core of the module is one interface (BrokerClient) and two
classes (_BrokerConnection and _Broker). Callers of this package must
implement the BrokerClient interface; get() creates the _Broker and the
manager's _BrokerConnection for them.
54
55 The classes relate to each other as:
56
57     BrokerClient   ------>    _BrokerConnection
58          ^                         |
59          |                         v
60          \----------------      _Broker
61
(The BrokerClient never calls the _Broker directly after it is created, only
its _BrokerConnection. _BrokerConnection passes a reference to the BrokerClient
to _Broker, and _Broker only invokes that reference, never talking directly to
_BrokerConnection).
66 """
67
68 import cPickle
69 import logging
70 import multiprocessing
71 import optparse
72 import Queue
73 import sys
74 import traceback
75
76
77 from webkitpy.common.system import stack_utils
78
79
# Module-wide logger, named after this module.
_log = logging.getLogger(__name__)


# Names of the two topics (queues) used for Manager <-> Worker messaging:
# manager connections consume MANAGER_TOPIC, and every worker connection
# consumes the shared ANY_WORKER_TOPIC.
MANAGER_TOPIC, ANY_WORKER_TOPIC = 'managers', 'workers'
88
89
def get(worker_model, client, worker_class):
    """Return a manager-side connection to a freshly created message broker.

    Args:
        worker_model - concurrency model to use: 'inline' runs the worker in
            the calling process with Queue.Queue topics; 'processes' runs each
            worker in its own child process with multiprocessing.Queue topics.
        client - BrokerClient implementation that messages addressed to the
            manager are dispatched to.
        worker_class - type of workers to create. This class should override
            the methods in AbstractWorker.
    Returns:
        An _InlineManager or _MultiProcessManager wired to a new _Broker.
    Raises:
        ValueError if worker_model is not one of the supported values."""
    if worker_model == 'inline':
        return _InlineManager(_Broker(Queue.Queue), client, worker_class)
    if worker_model == 'processes':
        return _MultiProcessManager(_Broker(multiprocessing.Queue),
                                    client, worker_class)
    raise ValueError("unsupported value for --worker-model: %s" % worker_model)
113
114
class BrokerClient(object):
    """Interface that every message broker client must implement.

    Besides the methods below, clients conventionally define one handler per
    message they accept, with the signature

        handle_MESSAGE_NAME(self, src, ...):

    where MESSAGE_NAME is the string given to post_message() and src is the
    name of the sender. Any values carried in the message body are passed as
    the remaining positional parameters."""

    def name(self):
        """Return the name identifying this client; it becomes the src of
        every message the client posts."""
        raise NotImplementedError

    def is_done(self):
        """Return True when the message loop should stop. It is checked
        before each message is fetched."""
        raise NotImplementedError
133
134
class _Broker(object):
    """A set of named topics (queues) that clients exchange messages over.

    Clients can post a message to any topic with post_message(), and can
    consume the messages of one topic at a time with run_message_loop() or
    run_all_pending()."""

    def __init__(self, queue_maker):
        """Args:
            queue_maker: a factory method, called with no arguments once per
                topic, that returns an object implementing the Queue
                interface (put()/get()).
        """
        self._queue_maker = queue_maker
        self._topics = {}

    def add_topic(self, topic_name):
        """Create the queue for topic_name; a no-op if it already exists."""
        if topic_name not in self._topics:
            self._topics[topic_name] = self._queue_maker()

    def _get_queue_for_topic(self, topic_name):
        # Raises KeyError if add_topic() was never called for topic_name.
        return self._topics[topic_name]

    def post_message(self, client, topic_name, message_name, *message_args):
        """Post a message to the named topic.

        Messages have a name and a tuple of optional arguments. Both must be
        picklable, because the message is pickled before it is queued."""
        message = _Message(client.name(), topic_name, message_name, message_args)
        queue = self._get_queue_for_topic(topic_name)
        queue.put(_Message.dumps(message))

    def run_message_loop(self, topic_name, client, delay_secs=None):
        """Process messages until client.is_done(), or until no message
        arrives within delay_secs seconds.

        delay_secs bounds each individual wait for a message, not the total
        running time of the loop. To block indefinitely, set delay_secs to
        None."""
        assert delay_secs is None or delay_secs > 0
        self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)

    def run_all_pending(self, topic_name, client):
        """Process messages until client.is_done() or the caller would block."""
        self._run_loop(topic_name, client, block=False, delay_secs=None)

    def _run_loop(self, topic_name, client, block, delay_secs):
        queue = self._get_queue_for_topic(topic_name)
        while not client.is_done():
            try:
                s = queue.get(block, delay_secs)
            except Queue.Empty:
                # Timed out (block=True) or nothing queued (block=False).
                return
            self._dispatch_message(_Message.loads(s), client)

    def _dispatch_message(self, message, client):
        """Call client.handle_<message.name>(message.src, *message.args).

        Raises:
            ValueError if the client has no handler for the message."""
        # A single lookup; the default distinguishes "no such handler".
        message_handler = getattr(client, 'handle_' + message.name, None)
        if message_handler is None:
            raise ValueError(
               "%s: received message '%s' it couldn't handle" %
               (client.name(), message.name))
        message_handler(message.src, *message.args)
192
193
class _Message(object):
    """One broker message: sender name, destination topic, message name and a
    tuple of arguments. Messages travel through the queues in pickled form
    (see dumps() and loads())."""

    def __init__(self, src, topic_name, message_name, message_args):
        self.src = src
        self.topic_name = topic_name
        self.name = message_name
        self.args = message_args

    def dumps(self):
        """Return this message serialized with cPickle."""
        return cPickle.dumps(self)

    @staticmethod
    def loads(string_value):
        """Turn a string produced by dumps() back into a _Message.

        Unpickling can execute arbitrary code, so only data read from the
        broker's own queues should be passed here."""
        message = cPickle.loads(string_value)
        assert isinstance(message, _Message)
        return message

    def __repr__(self):
        fields = (self.src, self.topic_name, self.name)
        return "_Message(from='%s', topic_name='%s', message_name='%s')" % fields
213
214
class _BrokerConnection(object):
    """_BrokerConnection provides a connection-oriented facade on top of a
    _Broker. It remembers the client, the topic the client consumes
    (run_topic) and the topic it posts to (post_topic), so callers don't
    have to pass the same topic names over and over."""

    def __init__(self, broker, client, run_topic, post_topic):
        """Create a _BrokerConnection on top of a _Broker. Note that the _Broker
        is passed in rather than created so that a single _Broker can be used
        by multiple _BrokerConnections. Both topics are added to the broker
        (add_topic() is a no-op for topics that already exist)."""
        self._broker = broker
        self._client = client
        self._post_topic = post_topic
        self._run_topic = run_topic
        broker.add_topic(run_topic)
        broker.add_topic(post_topic)

    def run_message_loop(self, delay_secs=None):
        """Dispatch messages from run_topic to the client. delay_secs is
        forwarded to the broker; None means wait indefinitely."""
        self._broker.run_message_loop(self._run_topic, self._client, delay_secs)

    def post_message(self, message_name, *message_args):
        """Post message_name with message_args to post_topic, sent by the client."""
        self._broker.post_message(self._client, self._post_topic,
                                  message_name, *message_args)

    def raise_exception(self, exc_info):
        """Report an exception to the other end of the connection.

        Posts an 'exception' message carrying (type, value, extracted stack).

        Args:
            exc_info: a (type, value, traceback) triple as returned by
                sys.exc_info(). If it is None or empty, sys.exc_info() is read
                instead, which is only meaningful inside an except block.
        """
        exception_type, exception_value, exception_traceback = exc_info or sys.exc_info()
        stack_utils.log_traceback(_log.debug, exception_traceback)
        # Since tracebacks aren't picklable, send the extracted stack instead.
        stack = traceback.extract_tb(exception_traceback)
        self._broker.post_message(self._client, self._post_topic, 'exception',
                                  exception_type, exception_value, stack)
244
245
class AbstractWorker(BrokerClient):
    """Base class for the worker objects that a _WorkerConnection creates.

    Subclasses add handle_<message> methods for the messages they accept;
    run() dispatches messages until the worker is done or canceled."""

    def __init__(self, worker_connection, worker_arguments=None):
        """The constructor should be used to do any simple initialization
        necessary, but should not do anything that creates data structures
        that cannot be pickled or sent across processes (like opening files
        or sockets). Complex initialization should be done at the start of
        the run() call.

        Args:
            worker_connection - handle to the _BrokerConnection object that
                created the worker and that can be used for messaging.
            worker_arguments - (optional, picklable) object passed to the
                worker from the manager. This base class does not store it;
                subclasses that need it must keep it themselves."""
        BrokerClient.__init__(self)
        self._name = 'worker'
        self._worker_connection = worker_connection
        self._canceled = False
        self._done = False

    def name(self):
        return self._name

    def is_done(self):
        # The message loop ends once the worker has finished or was canceled.
        return self._done or self._canceled

    def stop_handling_messages(self):
        """Make the message loop exit before it fetches another message."""
        self._done = True

    def run(self):
        """Callback for the worker to start executing. Typically does any
        remaining initialization and then calls broker_connection.run_message_loop().

        Any exception, including KeyboardInterrupt, is handed to the
        connection's raise_exception(). Running out of messages before the
        worker is done is reported as an AssertionError."""
        outcome = ""
        try:
            self._worker_connection.run_message_loop()
            if self.is_done():
                return
            raise AssertionError("%s: ran out of messages in worker queue."
                                 % self._name)
        except KeyboardInterrupt:
            outcome = ", interrupted"
            self._worker_connection.raise_exception(sys.exc_info())
        except:
            # Deliberately catch everything so any failure gets reported.
            outcome = ", exception raised"
            self._worker_connection.raise_exception(sys.exc_info())
        finally:
            _log.debug("%s done with message loop%s" % (self._name, outcome))

    def cancel(self):
        """Ask the worker to stop processing messages and shut down. Workers
        may be stopped without this being called, so clients should not rely
        solely on it."""
        self._canceled = True
297
298
class _ManagerConnection(_BrokerConnection):
    """Manager-side connection: consumes MANAGER_TOPIC and posts to
    ANY_WORKER_TOPIC. Subclasses decide how workers are started."""

    def __init__(self, broker, client, worker_class):
        """Base initialization for all Manager objects.

        Args:
            broker: handle to the message_broker object
            client: callback object (the caller)
            worker_class: class object to use to create workers.
        """
        _BrokerConnection.__init__(self, broker, client,
                                   run_topic=MANAGER_TOPIC,
                                   post_topic=ANY_WORKER_TOPIC)
        self._worker_class = worker_class

    def start_worker(self, worker_arguments=None):
        """Start a new worker; implemented by subclasses.

        Args:
            worker_arguments - an optional picklable object that is passed to
                the worker constructor
        """
        raise NotImplementedError
318
319
class _InlineManager(_ManagerConnection):
    """Manager connection for the 'inline' worker model: a single worker runs
    inside the manager's process, on the thread that calls run_message_loop()."""

    def __init__(self, broker, client, worker_class):
        _ManagerConnection.__init__(self, broker, client, worker_class)
        self._inline_worker = None

    def start_worker(self, worker_arguments=None):
        """Create (but do not run) the inline worker connection and return it.

        Only the most recently started worker is kept."""
        worker = _InlineWorkerConnection(self._broker, self._client,
                                         self._worker_class, worker_arguments)
        self._inline_worker = worker
        return worker

    def set_inline_arguments(self, arguments=None):
        # Deliberately defined only here and not on all ManagerConnections:
        # calling it on a _MultiProcessManager is meant to fail at runtime.
        self._inline_worker.set_inline_arguments(arguments)

    def run_message_loop(self, delay_secs=None):
        # delay_secs is accepted for interface compatibility but ignored: this
        # blocks until the worker's run() returns, then dispatches everything
        # the worker posted to the manager.
        self._inline_worker.run()
        self._broker.run_all_pending(MANAGER_TOPIC, self._client)
341
342
class _MultiProcessManager(_ManagerConnection):
    """Manager connection for the 'processes' worker model: every worker runs
    in its own child process."""

    def start_worker(self, worker_arguments=None):
        """Create a worker connection, launch its process, and return it."""
        connection = _MultiProcessWorkerConnection(
            self._broker, self._worker_class, worker_arguments)
        connection.start()
        return connection
349
350
class _WorkerConnection(_BrokerConnection):
    """Worker-side connection: owns the worker object (its client), consumes
    ANY_WORKER_TOPIC and posts to MANAGER_TOPIC. Subclasses provide the
    lifecycle methods cancel(), is_alive() and join()."""

    def __init__(self, broker, worker_class, worker_arguments=None):
        # The worker is built first, with this connection as its messaging
        # handle; it then becomes the client this connection dispatches to.
        self._client = worker_class(self, worker_arguments)
        _BrokerConnection.__init__(self, broker, self._client,
                                   run_topic=ANY_WORKER_TOPIC,
                                   post_topic=MANAGER_TOPIC)

    def name(self):
        return self._client.name()

    def cancel(self):
        raise NotImplementedError

    def is_alive(self):
        raise NotImplementedError

    def join(self, timeout):
        raise NotImplementedError

    def yield_to_broker(self):
        # No-op by default; the inline connection overrides it to let the
        # manager handle its pending messages.
        pass
370
371
372 class _InlineWorkerConnection(_WorkerConnection):
373     def __init__(self, broker, manager_client, worker_class, worker_arguments):
374         _WorkerConnection.__init__(self, broker, worker_class, worker_arguments)
375         self._alive = False
376         self._manager_client = manager_client
377
378     def cancel(self):
379         self._client.cancel()
380
381     def is_alive(self):
382         return self._alive
383
384     def join(self, timeout):
385         assert not self._alive
386
387     def set_inline_arguments(self, arguments):
388         self._client.set_inline_arguments(arguments)
389
390     def run(self):
391         self._alive = True
392         try:
393             self._client.run()
394         finally:
395             self._alive = False
396
397     def yield_to_broker(self):
398         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
399
400     def raise_exception(self, exc_info):
401         # Since the worker is in the same process as the manager, we can
402         # raise the exception directly, rather than having to send it through
403         # the queue. This allows us to preserve the traceback.
404         raise exc_info[0], exc_info[1], exc_info[2]
405
406
class _Process(multiprocessing.Process):
    """Child process whose body is the worker client's run() method."""

    def __init__(self, worker_connection, client):
        super(_Process, self).__init__()
        self._client = client
        self._worker_connection = worker_connection

    def run(self):
        self._client.run()
415
416
class _MultiProcessWorkerConnection(_WorkerConnection):
    """Worker connection whose worker runs in a separate child process."""

    def __init__(self, broker, worker_class, worker_arguments):
        _WorkerConnection.__init__(self, broker, worker_class, worker_arguments)
        # The process object is created here but only launched by start().
        self._proc = _Process(self, self._client)

    def start(self):
        self._proc.start()

    def cancel(self):
        # Forcibly stops the child via multiprocessing.Process.terminate().
        return self._proc.terminate()

    def is_alive(self):
        return self._proc.is_alive()

    def join(self, timeout):
        return self._proc.join(timeout)
431     def start(self):
432         self._proc.start()