import Queue
import sys
import thread
+import threading
import time
import unittest
from webkitpy.common import array_stream
from webkitpy.common.system import outputcapture
+from webkitpy.tool import mocktool
from webkitpy.layout_tests import run_webkit_tests
-import dump_render_tree_thread
import message_broker
-class TestThread(dump_render_tree_thread.WatchableThread):
+class TestThread(threading.Thread):
def __init__(self, started_queue, stopping_queue):
- dump_render_tree_thread.WatchableThread.__init__(self)
+ threading.Thread.__init__(self)
+ self._thread_id = None
self._started_queue = started_queue
self._stopping_queue = stopping_queue
self._timeout = False
self._timeout_queue = Queue.Queue()
+ self._exception_info = None
+
+ def id(self):
+ return self._thread_id
+
+ def getName(self):
+ return "worker-0"
def run(self):
self._covered_run()
except:
self._exception_info = sys.exc_info()
+ def exception_info(self):
+ return self._exception_info
+
def next_timeout(self):
if self._timeout:
self._timeout_queue.put('done')
return time.time() - 10
return time.time()
+ def clear_next_timeout(self):
+ self._next_timeout = None
class TestHandler(logging.Handler):
def __init__(self, astream):
self._stream.write(self.format(record))
-class WaitForThreadsToFinishTest(unittest.TestCase):
- class MockTestRunner(run_webkit_tests.TestRunner):
+class MultiThreadedBrokerTest(unittest.TestCase):
+ class MockTestRunner(object):
def __init__(self):
pass
def __del__(self):
pass
- def update_summary(self, result_summary):
+ def update(self):
pass
def run_one_thread(self, msg):
runner = self.MockTestRunner()
+ port = None
+ options = mocktool.MockOptions(child_processes='1')
starting_queue = Queue.Queue()
stopping_queue = Queue.Queue()
+ broker = message_broker.MultiThreadedBroker(port, options)
+ broker._test_runner = runner
child_thread = TestThread(starting_queue, stopping_queue)
+ broker._workers['worker-0'] = message_broker._WorkerState('worker-0')
+ broker._workers['worker-0'].thread = child_thread
child_thread.start()
started_msg = starting_queue.get()
stopping_queue.put(msg)
- threads = [child_thread]
- return runner._wait_for_threads_to_finish(threads, None)
+ return broker.run_message_loop()
def test_basic(self):
interrupted = self.run_one_thread('')
self.assertFalse(interrupted)
def test_interrupt(self):
- interrupted = self.run_one_thread('KeyboardInterrupt')
- self.assertTrue(interrupted)
+ self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
def test_timeout(self):
oc = outputcapture.OutputCapture()
found_stack = message_broker._find_thread_stack(0)
self.assertEqual(found_stack, None)
- def test_log_wedged_thread(self):
+ def test_log_wedged_worker(self):
oc = outputcapture.OutputCapture()
oc.capture_output()
logger = message_broker._log
child_thread.start()
msg = starting_queue.get()
- message_broker.log_wedged_thread(child_thread.id())
+ message_broker.log_wedged_worker(child_thread.getName(),
+ child_thread.id())
stopping_queue.put('')
child_thread.join(timeout=1.0)