Unreviewed. Update W3C WebDriver imported tests.
[WebKit-https.git] / WebDriverTests / imported / w3c / tools / wptrunner / wptrunner / testrunner.py
1 from __future__ import unicode_literals
2
3 import multiprocessing
4 import sys
5 import threading
6 import traceback
7 from Queue import Empty
8 from collections import namedtuple
9 from multiprocessing import Process, current_process, Queue
10
11 from mozlog import structuredlog
12
13 # Special value used as a sentinal in various commands
14 Stop = object()
15
16
17 class MessageLogger(object):
18     def __init__(self, message_func):
19         self.send_message = message_func
20
21     def _log_data(self, action, **kwargs):
22         self.send_message("log", action, kwargs)
23
24     def process_output(self, process, data, command):
25         self._log_data("process_output", process=process, data=data, command=command)
26
27
28 def _log_func(level_name):
29     def log(self, message):
30         self._log_data(level_name.lower(), message=message)
31     log.__doc__ = """Log a message with level %s
32
33 :param message: The string message to log
34 """ % level_name
35     log.__name__ = str(level_name).lower()
36     return log
37
38 # Create all the methods on StructuredLog for debug levels
39 for level_name in structuredlog.log_levels:
40     setattr(MessageLogger, level_name.lower(), _log_func(level_name))
41
42
43 class TestRunner(object):
44     def __init__(self, command_queue, result_queue, executor):
45         """Class implementing the main loop for running tests.
46
47         This class delegates the job of actually running a test to the executor
48         that is passed in.
49
50         :param command_queue: subprocess.Queue used to send commands to the
51                               process
52         :param result_queue: subprocess.Queue used to send results to the
53                              parent TestManager process
54         :param executor: TestExecutor object that will actually run a test.
55         """
56         self.command_queue = command_queue
57         self.result_queue = result_queue
58
59         self.executor = executor
60         self.name = current_process().name
61         self.logger = MessageLogger(self.send_message)
62
63     def __enter__(self):
64         return self
65
66     def __exit__(self, exc_type, exc_value, traceback):
67         self.teardown()
68
69     def setup(self):
70         self.logger.debug("Executor setup")
71         self.executor.setup(self)
72         self.logger.debug("Executor setup done")
73
74     def teardown(self):
75         self.executor.teardown()
76         self.send_message("runner_teardown")
77         self.result_queue = None
78         self.command_queue = None
79         self.browser = None
80
81     def run(self):
82         """Main loop accepting commands over the pipe and triggering
83         the associated methods"""
84         self.setup()
85         commands = {"run_test": self.run_test,
86                     "stop": self.stop,
87                     "wait": self.wait}
88         while True:
89             command, args = self.command_queue.get()
90             try:
91                 rv = commands[command](*args)
92             except Exception:
93                 self.send_message("error",
94                                   "Error running command %s with arguments %r:\n%s" %
95                                   (command, args, traceback.format_exc()))
96             else:
97                 if rv is Stop:
98                     break
99
100     def stop(self):
101         return Stop
102
103     def run_test(self, test):
104         try:
105             return self.executor.run_test(test)
106         except Exception:
107             self.logger.critical(traceback.format_exc())
108             raise
109
110     def wait(self):
111         self.executor.protocol.wait()
112         self.send_message("wait_finished")
113
114     def send_message(self, command, *args):
115         self.result_queue.put((command, args))
116
117
118 def start_runner(runner_command_queue, runner_result_queue,
119                  executor_cls, executor_kwargs,
120                  executor_browser_cls, executor_browser_kwargs,
121                  stop_flag):
122     """Launch a TestRunner in a new process"""
123     try:
124         browser = executor_browser_cls(**executor_browser_kwargs)
125         executor = executor_cls(browser, **executor_kwargs)
126         with TestRunner(runner_command_queue, runner_result_queue, executor) as runner:
127             try:
128                 runner.run()
129             except KeyboardInterrupt:
130                 stop_flag.set()
131     except Exception:
132         runner_result_queue.put(("log", ("critical", {"message": traceback.format_exc()})))
133         print >> sys.stderr, traceback.format_exc()
134         stop_flag.set()
135     finally:
136         runner_command_queue = None
137         runner_result_queue = None
138
139
140 manager_count = 0
141
142
143 def next_manager_number():
144     global manager_count
145     local = manager_count = manager_count + 1
146     return local
147
148
149 class BrowserManager(object):
150     def __init__(self, logger, browser, command_queue, no_timeout=False):
151         self.logger = logger
152         self.browser = browser
153         self.no_timeout = no_timeout
154         self.browser_settings = None
155         self.last_test = None
156
157         self.started = False
158
159         self.init_timer = None
160         self.command_queue = command_queue
161
162     def update_settings(self, test):
163         browser_settings = self.browser.settings(test)
164         restart_required = ((self.browser_settings is not None and
165                              self.browser_settings != browser_settings) or
166                             (self.last_test != test and test.expected() == "CRASH"))
167         self.browser_settings = browser_settings
168         self.last_test = test
169         return restart_required
170
171     def init(self):
172         """Launch the browser that is being tested,
173         and the TestRunner process that will run the tests."""
174         # It seems that this lock is helpful to prevent some race that otherwise
175         # sometimes stops the spawned processes initialising correctly, and
176         # leaves this thread hung
177         if self.init_timer is not None:
178             self.init_timer.cancel()
179
180         self.logger.debug("Init called, starting browser and runner")
181
182         if not self.no_timeout:
183             self.init_timer = threading.Timer(self.browser.init_timeout,
184                                               self.init_timeout)
185         try:
186             if self.init_timer is not None:
187                 self.init_timer.start()
188             self.logger.debug("Starting browser with settings %r" % self.browser_settings)
189             self.browser.start(**self.browser_settings)
190             self.browser_pid = self.browser.pid()
191         except:
192             self.logger.warning("Failure during init %s" % traceback.format_exc())
193             if self.init_timer is not None:
194                 self.init_timer.cancel()
195             self.logger.error(traceback.format_exc())
196             succeeded = False
197         else:
198             succeeded = True
199             self.started = True
200
201         return succeeded
202
203     def send_message(self, command, *args):
204         self.command_queue.put((command, args))
205
206     def init_timeout(self):
207         # This is called from a seperate thread, so we send a message to the
208         # main loop so we get back onto the manager thread
209         self.logger.debug("init_failed called from timer")
210         self.send_message("init_failed")
211
212     def after_init(self):
213         """Callback when we have started the browser, started the remote
214         control connection, and we are ready to start testing."""
215         if self.init_timer is not None:
216             self.init_timer.cancel()
217
218     def stop(self, force=False):
219         self.browser.stop(force=force)
220         self.started = False
221
222     def cleanup(self):
223         if self.init_timer is not None:
224             self.init_timer.cancel()
225         self.browser.cleanup()
226
227     def check_for_crashes(self):
228         self.browser.check_for_crashes()
229
230     def log_crash(self, test_id):
231         self.browser.log_crash(process=self.browser_pid, test=test_id)
232
233     def is_alive(self):
234         return self.browser.is_alive()
235
236
237 class _RunnerManagerState(object):
238     before_init = namedtuple("before_init", [])
239     initializing = namedtuple("initializing_browser",
240                               ["test", "test_group", "group_metadata", "failure_count"])
241     running = namedtuple("running", ["test", "test_group", "group_metadata"])
242     restarting = namedtuple("restarting", ["test", "test_group", "group_metadata"])
243     error = namedtuple("error", [])
244     stop = namedtuple("stop", [])
245
246
247 RunnerManagerState = _RunnerManagerState()
248
249
250 class TestRunnerManager(threading.Thread):
251     def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
252                  executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
253                  pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
254         """Thread that owns a single TestRunner process and any processes required
255         by the TestRunner (e.g. the Firefox binary).
256
257         TestRunnerManagers are responsible for launching the browser process and the
258         runner process, and for logging the test progress. The actual test running
259         is done by the TestRunner. In particular they:
260
261         * Start the binary of the program under test
262         * Start the TestRunner
263         * Tell the TestRunner to start a test, if any
264         * Log that the test started
265         * Log the test results
266         * Take any remedial action required e.g. restart crashed or hung
267           processes
268         """
269         self.suite_name = suite_name
270
271         self.test_source = test_source_cls(test_queue)
272
273         self.browser_cls = browser_cls
274         self.browser_kwargs = browser_kwargs
275
276         self.executor_cls = executor_cls
277         self.executor_kwargs = executor_kwargs
278
279         # Flags used to shut down this thread if we get a sigint
280         self.parent_stop_flag = stop_flag
281         self.child_stop_flag = multiprocessing.Event()
282
283         self.rerun = rerun
284         self.run_count = 0
285         self.pause_after_test = pause_after_test
286         self.pause_on_unexpected = pause_on_unexpected
287         self.restart_on_unexpected = restart_on_unexpected
288         self.debug_info = debug_info
289
290         self.manager_number = next_manager_number()
291
292         self.command_queue = Queue()
293         self.remote_queue = Queue()
294
295         self.test_runner_proc = None
296
297         threading.Thread.__init__(self, name="Thread-TestrunnerManager-%i" % self.manager_number)
298         # This is started in the actual new thread
299         self.logger = None
300
301         self.unexpected_count = 0
302
303         # This may not really be what we want
304         self.daemon = True
305
306         self.max_restarts = 5
307
308         self.browser = None
309
310     def run(self):
311         """Main loop for the TestManager.
312
313         TestManagers generally receive commands from their
314         TestRunner updating them on the status of a test. They
315         may also have a stop flag set by the main thread indicating
316         that the manager should shut down the next time the event loop
317         spins."""
318         self.logger = structuredlog.StructuredLogger(self.suite_name)
319         with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
320             self.browser = BrowserManager(self.logger,
321                                           browser,
322                                           self.command_queue,
323                                           no_timeout=self.debug_info is not None)
324             dispatch = {
325                 RunnerManagerState.before_init: self.start_init,
326                 RunnerManagerState.initializing: self.init,
327                 RunnerManagerState.running: self.run_test,
328                 RunnerManagerState.restarting: self.restart_runner
329             }
330
331             self.state = RunnerManagerState.before_init()
332             end_states = (RunnerManagerState.stop,
333                           RunnerManagerState.error)
334
335             try:
336                 while not isinstance(self.state, end_states):
337                     f = dispatch.get(self.state.__class__)
338                     while f:
339                         self.logger.debug("Dispatch %s" % f.__name__)
340                         if self.should_stop():
341                             return
342                         new_state = f()
343                         if new_state is None:
344                             break
345                         self.state = new_state
346                         self.logger.debug("new state: %s" % self.state.__class__.__name__)
347                         if isinstance(self.state, end_states):
348                             return
349                         f = dispatch.get(self.state.__class__)
350
351                     new_state = None
352                     while new_state is None:
353                         new_state = self.wait_event()
354                         if self.should_stop():
355                             return
356                     self.state = new_state
357                     self.logger.debug("new state: %s" % self.state.__class__.__name__)
358             except Exception as e:
359                 self.logger.error(traceback.format_exc(e))
360                 raise
361             finally:
362                 self.logger.debug("TestRunnerManager main loop terminating, starting cleanup")
363                 clean = isinstance(self.state, RunnerManagerState.stop)
364                 self.stop_runner(force=not clean)
365                 self.teardown()
366         self.logger.debug("TestRunnerManager main loop terminated")
367
368     def wait_event(self):
369         dispatch = {
370             RunnerManagerState.before_init: {},
371             RunnerManagerState.initializing:
372             {
373                 "init_succeeded": self.init_succeeded,
374                 "init_failed": self.init_failed,
375             },
376             RunnerManagerState.running:
377             {
378                 "test_ended": self.test_ended,
379                 "wait_finished": self.wait_finished,
380             },
381             RunnerManagerState.restarting: {},
382             RunnerManagerState.error: {},
383             RunnerManagerState.stop: {},
384             None: {
385                 "runner_teardown": self.runner_teardown,
386                 "log": self.log,
387                 "error": self.error
388             }
389         }
390         try:
391             command, data = self.command_queue.get(True, 1)
392         except IOError:
393             self.logger.error("Got IOError from poll")
394             return RunnerManagerState.restarting(0)
395         except Empty:
396             if (self.debug_info and self.debug_info.interactive and
397                 self.browser.started and not self.browser.is_alive()):
398                 self.logger.debug("Debugger exited")
399                 return RunnerManagerState.stop()
400
401             if (isinstance(self.state, RunnerManagerState.running) and
402                 not self.test_runner_proc.is_alive()):
403                 if not self.command_queue.empty():
404                     # We got a new message so process that
405                     return
406
407                 # If we got to here the runner presumably shut down
408                 # unexpectedly
409                 self.logger.info("Test runner process shut down")
410
411                 if self.state.test is not None:
412                     # This could happen if the test runner crashed for some other
413                     # reason
414                     # Need to consider the unlikely case where one test causes the
415                     # runner process to repeatedly die
416                     self.logger.critical("Last test did not complete")
417                     return RunnerManagerState.error()
418                 self.logger.warning("More tests found, but runner process died, restarting")
419                 return RunnerManagerState.restarting(0)
420         else:
421             f = (dispatch.get(self.state.__class__, {}).get(command) or
422                  dispatch.get(None, {}).get(command))
423             if not f:
424                 self.logger.warning("Got command %s in state %s" %
425                                     (command, self.state.__class__.__name__))
426                 return
427             return f(*data)
428
429     def should_stop(self):
430         return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()
431
432     def start_init(self):
433         test, test_group, group_metadata = self.get_next_test()
434         if test is None:
435             return RunnerManagerState.stop()
436         else:
437             return RunnerManagerState.initializing(test, test_group, group_metadata, 0)
438
439     def init(self):
440         assert isinstance(self.state, RunnerManagerState.initializing)
441         if self.state.failure_count > self.max_restarts:
442             self.logger.error("Max restarts exceeded")
443             return RunnerManagerState.error()
444
445         self.browser.update_settings(self.state.test)
446
447         result = self.browser.init()
448         if result is Stop:
449             return RunnerManagerState.error()
450         elif not result:
451             return RunnerManagerState.initializing(self.state.test,
452                                                    self.state.test_group,
453                                                    self.state.group_metadata,
454                                                    self.state.failure_count + 1)
455         else:
456             self.executor_kwargs["group_metadata"] = self.state.group_metadata
457             self.start_test_runner()
458
459     def start_test_runner(self):
460         # Note that we need to be careful to start the browser before the
461         # test runner to ensure that any state set when the browser is started
462         # can be passed in to the test runner.
463         assert isinstance(self.state, RunnerManagerState.initializing)
464         assert self.command_queue is not None
465         assert self.remote_queue is not None
466         self.logger.info("Starting runner")
467         executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()
468
469         args = (self.remote_queue,
470                 self.command_queue,
471                 self.executor_cls,
472                 self.executor_kwargs,
473                 executor_browser_cls,
474                 executor_browser_kwargs,
475                 self.child_stop_flag)
476         self.test_runner_proc = Process(target=start_runner,
477                                         args=args,
478                                         name="Thread-TestRunner-%i" % self.manager_number)
479         self.test_runner_proc.start()
480         self.logger.debug("Test runner started")
481         # Now we wait for either an init_succeeded event or an init_failed event
482
483     def init_succeeded(self):
484         assert isinstance(self.state, RunnerManagerState.initializing)
485         self.browser.after_init()
486         return RunnerManagerState.running(self.state.test,
487                                           self.state.test_group,
488                                           self.state.group_metadata)
489
490     def init_failed(self):
491         assert isinstance(self.state, RunnerManagerState.initializing)
492         self.browser.after_init()
493         self.stop_runner(force=True)
494         return RunnerManagerState.initializing(self.state.test,
495                                                self.state.test_group,
496                                                self.state.group_metadata,
497                                                self.state.failure_count + 1)
498
499     def get_next_test(self, test_group=None):
500         test = None
501         while test is None:
502             while test_group is None or len(test_group) == 0:
503                 test_group, group_metadata = self.test_source.group()
504                 if test_group is None:
505                     self.logger.info("No more tests")
506                     return None, None, None
507             test = test_group.popleft()
508         self.run_count = 0
509         return test, test_group, group_metadata
510
511     def run_test(self):
512         assert isinstance(self.state, RunnerManagerState.running)
513         assert self.state.test is not None
514
515         if self.browser.update_settings(self.state.test):
516             self.logger.info("Restarting browser for new test environment")
517             return RunnerManagerState.restarting(self.state.test,
518                                                  self.state.test_group,
519                                                  self.state.group_metadata)
520
521         self.logger.test_start(self.state.test.id)
522         if self.rerun > 1:
523             self.logger.info("Run %d/%d" % (self.run_count, self.rerun))
524         self.run_count += 1
525         self.send_message("run_test", self.state.test)
526
527     def test_ended(self, test, results):
528         """Handle the end of a test.
529
530         Output the result of each subtest, and the result of the overall
531         harness to the logs.
532         """
533         assert isinstance(self.state, RunnerManagerState.running)
534         assert test == self.state.test
535         # Write the result of each subtest
536         file_result, test_results = results
537         subtest_unexpected = False
538         for result in test_results:
539             if test.disabled(result.name):
540                 continue
541             expected = test.expected(result.name)
542             is_unexpected = expected != result.status
543
544             if is_unexpected:
545                 self.unexpected_count += 1
546                 self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)
547                 subtest_unexpected = True
548             self.logger.test_status(test.id,
549                                     result.name,
550                                     result.status,
551                                     message=result.message,
552                                     expected=expected,
553                                     stack=result.stack)
554
555         # TODO: consider changing result if there is a crash dump file
556
557         # Write the result of the test harness
558         expected = test.expected()
559         status = file_result.status if file_result.status != "EXTERNAL-TIMEOUT" else "TIMEOUT"
560
561         if file_result.status in  ("TIMEOUT", "EXTERNAL-TIMEOUT"):
562             if self.browser.check_for_crashes():
563                 status = "CRASH"
564
565         is_unexpected = expected != status
566         if is_unexpected:
567             self.unexpected_count += 1
568             self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)
569         if status == "CRASH":
570             self.browser.log_crash(test.id)
571
572         self.logger.test_end(test.id,
573                              status,
574                              message=file_result.message,
575                              expected=expected,
576                              extra=file_result.extra)
577
578         restart_before_next = (test.restart_after or
579                                file_result.status in ("CRASH", "EXTERNAL-TIMEOUT") or
580                                ((subtest_unexpected or is_unexpected)
581                                 and self.restart_on_unexpected))
582
583         if (self.pause_after_test or
584             (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))):
585             self.logger.info("Pausing until the browser exits")
586             self.send_message("wait")
587         else:
588             return self.after_test_end(test, restart_before_next)
589
590     def wait_finished(self):
591         assert isinstance(self.state, RunnerManagerState.running)
592         # The browser should be stopped already, but this ensures we do any post-stop
593         # processing
594         self.logger.debug("Wait finished")
595
596         return self.after_test_end(self.state.test, True)
597
598     def after_test_end(self, test, restart):
599         assert isinstance(self.state, RunnerManagerState.running)
600         if self.run_count == self.rerun:
601             test, test_group, group_metadata = self.get_next_test()
602             if test is None:
603                 return RunnerManagerState.stop()
604             if test_group != self.state.test_group:
605                 # We are starting a new group of tests, so force a restart
606                 restart = True
607         else:
608             test = test
609             test_group = self.state.test_group
610             group_metadata = self.state.group_metadata
611         if restart:
612             return RunnerManagerState.restarting(test, test_group, group_metadata)
613         else:
614             return RunnerManagerState.running(test, test_group, group_metadata)
615
616     def restart_runner(self):
617         """Stop and restart the TestRunner"""
618         assert isinstance(self.state, RunnerManagerState.restarting)
619         self.stop_runner()
620         return RunnerManagerState.initializing(self.state.test, self.state.test_group, self.state.group_metadata, 0)
621
622     def log(self, action, kwargs):
623         getattr(self.logger, action)(**kwargs)
624
625     def error(self, message):
626         self.logger.error(message)
627         self.restart_runner()
628
629     def stop_runner(self, force=False):
630         """Stop the TestRunner and the browser binary."""
631         if self.test_runner_proc is None:
632             return
633
634         if self.test_runner_proc.is_alive():
635             self.send_message("stop")
636         try:
637             self.browser.stop(force=force)
638             self.ensure_runner_stopped()
639         finally:
640             self.cleanup()
641
642     def teardown(self):
643         self.logger.debug("teardown in testrunnermanager")
644         self.test_runner_proc = None
645         self.command_queue.close()
646         self.remote_queue.close()
647         self.command_queue = None
648         self.remote_queue = None
649
650     def ensure_runner_stopped(self):
651         self.logger.debug("ensure_runner_stopped")
652         if self.test_runner_proc is None:
653             return
654
655         self.logger.debug("waiting for runner process to end")
656         self.test_runner_proc.join(10)
657         self.logger.debug("After join")
658         if self.test_runner_proc.is_alive():
659             # This might leak a file handle from the queue
660             self.logger.warning("Forcibly terminating runner process")
661             self.test_runner_proc.terminate()
662             self.test_runner_proc.join(10)
663         else:
664             self.logger.debug("Testrunner exited with code %i" % self.test_runner_proc.exitcode)
665
666     def runner_teardown(self):
667         self.ensure_runner_stopped()
668         return RunnerManagerState.stop()
669
670     def send_message(self, command, *args):
671         self.remote_queue.put((command, args))
672
673     def cleanup(self):
674         self.logger.debug("TestManager cleanup")
675         if self.browser:
676             self.browser.cleanup()
677         while True:
678             try:
679                 self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
680             except Empty:
681                 break
682
683
684 def make_test_queue(tests, test_source_cls, **test_source_kwargs):
685     queue = test_source_cls.make_queue(tests, **test_source_kwargs)
686
687     # There is a race condition that means sometimes we continue
688     # before the tests have been written to the underlying pipe.
689     # Polling the pipe for data here avoids that
690     queue._reader.poll(10)
691     assert not queue.empty()
692     return queue
693
694
695 class ManagerGroup(object):
696     def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
697                  browser_cls, browser_kwargs,
698                  executor_cls, executor_kwargs,
699                  rerun=1,
700                  pause_after_test=False,
701                  pause_on_unexpected=False,
702                  restart_on_unexpected=True,
703                  debug_info=None):
704         """Main thread object that owns all the TestManager threads."""
705         self.suite_name = suite_name
706         self.size = size
707         self.test_source_cls = test_source_cls
708         self.test_source_kwargs = test_source_kwargs
709         self.browser_cls = browser_cls
710         self.browser_kwargs = browser_kwargs
711         self.executor_cls = executor_cls
712         self.executor_kwargs = executor_kwargs
713         self.pause_after_test = pause_after_test
714         self.pause_on_unexpected = pause_on_unexpected
715         self.restart_on_unexpected = restart_on_unexpected
716         self.debug_info = debug_info
717         self.rerun = rerun
718
719         self.pool = set()
720         # Event that is polled by threads so that they can gracefully exit in the face
721         # of sigint
722         self.stop_flag = threading.Event()
723         self.logger = structuredlog.StructuredLogger(suite_name)
724
725     def __enter__(self):
726         return self
727
728     def __exit__(self, exc_type, exc_val, exc_tb):
729         self.stop()
730
731     def run(self, test_type, tests):
732         """Start all managers in the group"""
733         self.logger.debug("Using %i processes" % self.size)
734         type_tests = tests[test_type]
735         if not type_tests:
736             self.logger.info("No %s tests to run" % test_type)
737             return
738
739         test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)
740
741         for _ in range(self.size):
742             manager = TestRunnerManager(self.suite_name,
743                                         test_queue,
744                                         self.test_source_cls,
745                                         self.browser_cls,
746                                         self.browser_kwargs,
747                                         self.executor_cls,
748                                         self.executor_kwargs,
749                                         self.stop_flag,
750                                         self.rerun,
751                                         self.pause_after_test,
752                                         self.pause_on_unexpected,
753                                         self.restart_on_unexpected,
754                                         self.debug_info)
755             manager.start()
756             self.pool.add(manager)
757         self.wait()
758
759     def is_alive(self):
760         """Boolean indicating whether any manager in the group is still alive"""
761         return any(manager.is_alive() for manager in self.pool)
762
763     def wait(self):
764         """Wait for all the managers in the group to finish"""
765         for item in self.pool:
766             item.join()
767
768     def stop(self):
769         """Set the stop flag so that all managers in the group stop as soon
770         as possible"""
771         self.stop_flag.set()
772         self.logger.debug("Stop flag set in ManagerGroup")
773
774     def unexpected_count(self):
775         return sum(item.unexpected_count for item in self.pool)