c47e9d7dc6eb3d3a556b588bc0538eed6fa7169d
[WebKit-https.git] / Tools / Scripts / webkitpy / layout_tests / layout_package / manager_worker_broker.py
1 # Copyright (C) 2011 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 """Module for handling messages and concurrency for run-webkit-tests.
30
31 This module implements a message broker that connects the manager
32 (TestRunner2) to the workers: it provides a messaging abstraction and
33 message loops (building on top of message_broker2), and handles starting
34 workers by launching threads and/or processes depending on the
35 requested configuration.
36
37 There are a lot of classes and objects involved in a fully connected system.
38 They interact more or less like:
39
40 TestRunner2  --> _InlineManager ---> _InlineWorker <-> Worker
41      ^                    \               /              ^
42      |                     v             v               |
43      \--------------------  MessageBroker   -------------/
44 """
45
46 import logging
47 import optparse
48 import Queue
49 import threading
50
51 # Handle Python < 2.6 where multiprocessing isn't available.
52 try:
53     import multiprocessing
54 except ImportError:
55     multiprocessing = None
56
57
58 from webkitpy.layout_tests.layout_package import message_broker2
59
60
61 _log = logging.getLogger(__name__)
62
63 #
64 # Topic names for Manager <-> Worker messaging
65 #
66 MANAGER_TOPIC = 'managers'
67 ANY_WORKER_TOPIC = 'workers'
68
69
70 def runtime_options():
71     """Return a list of optparse.Option objects for any runtime values used
72     by this module."""
73     options = [
74         optparse.make_option("--worker-model", action="store",
75             help=("controls worker model. Valid values are "
76             "'inline', 'threads', and 'processes'.")),
77     ]
78     return options
79
80
81 def get(port, options, client, worker_class):
82     """Return a connection to a manager/worker message_broker
83
84     Args:
85         port - handle to layout_tests/port object for port-specific stuff
86         options - optparse argument for command-line options
87         client - message_broker2.BrokerClient implementation to dispatch
88             replies to.
89         worker_class - type of workers to create. This class must implement
90             the methods in AbstractWorker.
91     Returns:
92         A handle to an object that will talk to a message broker configured
93         for the normal manager/worker communication.
94     """
95     worker_model = options.worker_model
96     if worker_model == 'inline':
97         queue_class = Queue.Queue
98         manager_class = _InlineManager
99     elif worker_model == 'threads':
100         queue_class = Queue.Queue
101         manager_class = _ThreadedManager
102     elif worker_model == 'processes' and multiprocessing:
103         queue_class = multiprocessing.Queue
104         manager_class = _MultiProcessManager
105     else:
106         raise ValueError("unsupported value for --worker-model: %s" %
107                          worker_model)
108
109     broker = message_broker2.Broker(options, queue_class)
110     return manager_class(broker, port, options, client, worker_class)
111
112
113 class AbstractWorker(message_broker2.BrokerClient):
114     def __init__(self, broker_connection, worker_number, options):
115         """The constructor should be used to do any simple initialization
116         necessary, but should not do anything that creates data structures
117         that cannot be Pickled or sent across processes (like opening
118         files or sockets). Complex initialization should be done at the
119         start of the run() call.
120
121         Args:
122             broker_connection - handle to the BrokerConnection object creating
123                 the worker and that can be used for messaging.
124             worker_number - identifier for this particular worker
125             options - command-line argument object from optparse"""
126
127         raise NotImplementedError
128
129     def run(self, port):
130         """Callback for the worker to start executing. Typically does any
131         remaining initialization and then calls broker_connection.run_message_loop()."""
132         raise NotImplementedError
133
134     def cancel(self):
135         """Called when possible to indicate to the worker to stop processing
136         messages and shut down. Note that workers may be stopped without this
137         method being called, so clients should not rely solely on this."""
138         raise NotImplementedError
139
140
141 class _ManagerConnection(message_broker2.BrokerConnection):
142     def __init__(self, broker, options, client, worker_class):
143         """Base initialization for all Manager objects.
144
145         Args:
146             broker: handle to the message_broker2 object
147             options: command line options object
148             client: callback object (the caller)
149             worker_class: class object to use to create workers.
150         """
151         message_broker2.BrokerConnection.__init__(self, broker, client,
152             MANAGER_TOPIC, ANY_WORKER_TOPIC)
153         self._options = options
154         self._worker_class = worker_class
155
156     def start_worker(self, worker_number):
157         raise NotImplementedError
158
159
160 class _InlineManager(_ManagerConnection):
161     def __init__(self, broker, port, options, client, worker_class):
162         _ManagerConnection.__init__(self, broker, options, client, worker_class)
163         self._port = port
164         self._inline_worker = None
165
166     def start_worker(self, worker_number):
167         self._inline_worker = _InlineWorker(self._broker, self._port, self._client,
168             self._worker_class, worker_number)
169         return self._inline_worker
170
171     def run_message_loop(self, delay_secs=None):
172         # Note that delay_secs is ignored in this case since we can't easily
173         # implement it.
174         self._inline_worker.run()
175         self._broker.run_all_pending(MANAGER_TOPIC, self._client)
176
177
178 class _ThreadedManager(_ManagerConnection):
179     def __init__(self, broker, port, options, client, worker_class):
180         raise NotImplementedError
181
182
183 class _MultiProcessManager(_ManagerConnection):
184     def __init__(self, broker, port, options, client, worker_class):
185         raise NotImplementedError
186
187
188 class _WorkerConnection(message_broker2.BrokerConnection):
189     def __init__(self, broker, worker_class, worker_number, options):
190         self._client = worker_class(self, worker_number, options)
191         self.name = self._client.name()
192         message_broker2.BrokerConnection.__init__(self, broker, self._client,
193                                                   ANY_WORKER_TOPIC, MANAGER_TOPIC)
194
195     def run(self):
196         raise NotImplementedError
197
198     def yield_to_broker(self):
199         pass
200
201
202 class _InlineWorker(_WorkerConnection):
203     def __init__(self, broker, port, manager_client, worker_class, worker_number):
204         _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
205         self._port = port
206         self._manager_client = manager_client
207
208     def run(self):
209         self._client.run(self._port)
210
211     def yield_to_broker(self):
212         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)