75ce40b25cd67da9e135732d0a2514dbb3bf57bf
[WebKit.git] / Tools / Scripts / webkitpy / layout_tests / layout_package / message_broker_unittest.py
1 # Copyright (C) 2010 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #     * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #     * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #     * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 import logging
30 import Queue
31 import sys
32 import thread
33 import threading
34 import time
35 import unittest
36
37 from webkitpy.common import array_stream
38 from webkitpy.common.system import outputcapture
39 from webkitpy.tool import mocktool
40
41 from webkitpy.layout_tests import run_webkit_tests
42
43 import message_broker
44
45
46 class TestThread(threading.Thread):
47     def __init__(self, started_queue, stopping_queue):
48         threading.Thread.__init__(self)
49         self._thread_id = None
50         self._started_queue = started_queue
51         self._stopping_queue = stopping_queue
52         self._timeout = False
53         self._timeout_queue = Queue.Queue()
54         self._exception_info = None
55
56     def id(self):
57         return self._thread_id
58
59     def getName(self):
60         return "worker-0"
61
62     def run(self):
63         self._covered_run()
64
65     def _covered_run(self):
66         # FIXME: this is a separate routine to work around a bug
67         # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
68         self._thread_id = thread.get_ident()
69         try:
70             self._started_queue.put('')
71             msg = self._stopping_queue.get()
72             if msg == 'KeyboardInterrupt':
73                 raise KeyboardInterrupt
74             elif msg == 'Exception':
75                 raise ValueError()
76             elif msg == 'Timeout':
77                 self._timeout = True
78                 self._timeout_queue.get()
79         except:
80             self._exception_info = sys.exc_info()
81
82     def exception_info(self):
83         return self._exception_info
84
85     def next_timeout(self):
86         if self._timeout:
87             self._timeout_queue.put('done')
88             return time.time() - 10
89         return time.time()
90
91     def clear_next_timeout(self):
92         self._next_timeout = None
93
94
95 class TestHandler(logging.Handler):
96     def __init__(self, astream):
97         logging.Handler.__init__(self)
98         self._stream = astream
99
100     def emit(self, record):
101         self._stream.write(self.format(record))
102
103
104 class MultiThreadedBrokerTest(unittest.TestCase):
105     class MockTestRunner(object):
106         def __init__(self):
107             pass
108
109         def __del__(self):
110             pass
111
112         def update(self):
113             pass
114
115     def run_one_thread(self, msg):
116         runner = self.MockTestRunner()
117         port = None
118         options = mocktool.MockOptions(child_processes='1')
119         starting_queue = Queue.Queue()
120         stopping_queue = Queue.Queue()
121         broker = message_broker.MultiThreadedBroker(port, options)
122         broker._test_runner = runner
123         child_thread = TestThread(starting_queue, stopping_queue)
124         broker._workers['worker-0'] = message_broker._WorkerState('worker-0')
125         broker._workers['worker-0'].thread = child_thread
126         child_thread.start()
127         started_msg = starting_queue.get()
128         stopping_queue.put(msg)
129         res = broker.run_message_loop()
130         if msg == 'Timeout':
131             child_thread._timeout_queue.put('done')
132         child_thread.join(1.0)
133         self.assertFalse(child_thread.isAlive())
134         return res
135
136     def test_basic(self):
137         interrupted = self.run_one_thread('')
138         self.assertFalse(interrupted)
139
140     def test_interrupt(self):
141         self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
142
143     def test_timeout(self):
144         # Because the timeout shows up as a wedged thread, this also tests
145         # log_wedged_worker().
146         oc = outputcapture.OutputCapture()
147         stdout, stderr = oc.capture_output()
148         logger = message_broker._log
149         astream = array_stream.ArrayStream()
150         handler = TestHandler(astream)
151         logger.addHandler(handler)
152         interrupted = self.run_one_thread('Timeout')
153         stdout, stderr = oc.restore_output()
154         self.assertFalse(interrupted)
155         logger.handlers.remove(handler)
156         self.assertTrue('All remaining threads are wedged, bailing out.' in astream.get())
157
158     def test_exception(self):
159         self.assertRaises(ValueError, self.run_one_thread, 'Exception')
160
161     def test_find_thread_stack_found(self):
162         id, stack = sys._current_frames().items()[0]
163         found_stack = message_broker._find_thread_stack(id)
164         self.assertNotEqual(found_stack, None)
165
166     def test_find_thread_stack_not_found(self):
167         found_stack = message_broker._find_thread_stack(0)
168         self.assertEqual(found_stack, None)
169
170
171 if __name__ == '__main__':
172     unittest.main()