2011-02-11 Dirk Pranke <dpranke@chromium.org>
authordpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 12 Feb 2011 01:42:00 +0000 (01:42 +0000)
committerdpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 12 Feb 2011 01:42:00 +0000 (01:42 +0000)
        Reviewed by Tony Chang.

        This patch adds to NRWT most of the support needed to run the new
        message-based workers in separate threads or processes. The code
        isn't fully complete yet because we don't support cancel() or
        is_alive().

        https://bugs.webkit.org/show_bug.cgi?id=54070

        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
        * Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
        * Scripts/webkitpy/layout_tests/port/base.py:
        * Scripts/webkitpy/layout_tests/port/mock_drt.py:
        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@78398 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Tools/ChangeLog
Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py
Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
Tools/Scripts/webkitpy/layout_tests/port/base.py
Tools/Scripts/webkitpy/layout_tests/port/mock_drt.py
Tools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

index b44341c..8a53f42 100644 (file)
@@ -1,3 +1,21 @@
+2011-02-11  Dirk Pranke  <dpranke@chromium.org>
+
+        Reviewed by Tony Chang.
+
+        This patch adds to NRWT most of the support needed to run the new
+        message-based workers in separate threads or processes. The code
+        isn't fully complete yet because we don't support cancel() or
+        is_alive().
+
+        https://bugs.webkit.org/show_bug.cgi?id=54070
+
+        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
+        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
+        * Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
+        * Scripts/webkitpy/layout_tests/port/base.py:
+        * Scripts/webkitpy/layout_tests/port/mock_drt.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
 2011-02-11  Sailesh Agrawal  <sail@chromium.org>
 
         Reviewed by Kenneth Russell.
 2011-02-11  Sailesh Agrawal  <sail@chromium.org>
 
         Reviewed by Kenneth Russell.
index c47e9d7..a0f252c 100644 (file)
@@ -46,15 +46,24 @@ TestRunner2  --> _InlineManager ---> _InlineWorker <-> Worker
 import logging
 import optparse
 import Queue
 import logging
 import optparse
 import Queue
+import thread
 import threading
 import threading
+import time
+
 
 # Handle Python < 2.6 where multiprocessing isn't available.
 
 # Handle Python < 2.6 where multiprocessing isn't available.
+#
+# _Multiprocessing_Process is needed so that _MultiProcessWorker
+# can be defined with or without multiprocessing.
 try:
     import multiprocessing
 try:
     import multiprocessing
+    _Multiprocessing_Process = multiprocessing.Process
 except ImportError:
     multiprocessing = None
 except ImportError:
     multiprocessing = None
+    _Multiprocessing_Process = threading.Thread
 
 
 
 
+from webkitpy.layout_tests import port
 from webkitpy.layout_tests.layout_package import message_broker2
 
 
 from webkitpy.layout_tests.layout_package import message_broker2
 
 
@@ -164,8 +173,8 @@ class _InlineManager(_ManagerConnection):
         self._inline_worker = None
 
     def start_worker(self, worker_number):
         self._inline_worker = None
 
     def start_worker(self, worker_number):
-        self._inline_worker = _InlineWorker(self._broker, self._port, self._client,
-            self._worker_class, worker_number)
+        self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
+            self._client, self._worker_class, worker_number)
         return self._inline_worker
 
     def run_message_loop(self, delay_secs=None):
         return self._inline_worker
 
     def run_message_loop(self, delay_secs=None):
@@ -177,12 +186,30 @@ class _InlineManager(_ManagerConnection):
 
 class _ThreadedManager(_ManagerConnection):
     def __init__(self, broker, port, options, client, worker_class):
 
 class _ThreadedManager(_ManagerConnection):
     def __init__(self, broker, port, options, client, worker_class):
-        raise NotImplementedError
+        _ManagerConnection.__init__(self, broker, options, client, worker_class)
+        self._port = port
+
+    def start_worker(self, worker_number):
+        worker_connection = _ThreadedWorkerConnection(self._broker, self._port,
+            self._worker_class, worker_number)
+        worker_connection.start()
+        return worker_connection
 
 
 class _MultiProcessManager(_ManagerConnection):
     def __init__(self, broker, port, options, client, worker_class):
 
 
 class _MultiProcessManager(_ManagerConnection):
     def __init__(self, broker, port, options, client, worker_class):
-        raise NotImplementedError
+        # Note that this class does not keep a handle to the actual port
+        # object, because it isn't Picklable. Instead it keeps the port
+        # name and recreates the port in the child process from the name
+        # and options.
+        _ManagerConnection.__init__(self, broker, options, client, worker_class)
+        self._platform_name = port.real_name()
+
+    def start_worker(self, worker_number):
+        worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
+            self._worker_class, worker_number, self._options)
+        worker_connection.start()
+        return worker_connection
 
 
 class _WorkerConnection(message_broker2.BrokerConnection):
 
 
 class _WorkerConnection(message_broker2.BrokerConnection):
@@ -192,14 +219,11 @@ class _WorkerConnection(message_broker2.BrokerConnection):
         message_broker2.BrokerConnection.__init__(self, broker, self._client,
                                                   ANY_WORKER_TOPIC, MANAGER_TOPIC)
 
         message_broker2.BrokerConnection.__init__(self, broker, self._client,
                                                   ANY_WORKER_TOPIC, MANAGER_TOPIC)
 
-    def run(self):
-        raise NotImplementedError
-
     def yield_to_broker(self):
         pass
 
 
     def yield_to_broker(self):
         pass
 
 
-class _InlineWorker(_WorkerConnection):
+class _InlineWorkerConnection(_WorkerConnection):
     def __init__(self, broker, port, manager_client, worker_class, worker_number):
         _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
         self._port = port
     def __init__(self, broker, port, manager_client, worker_class, worker_number):
         _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
         self._port = port
@@ -210,3 +234,49 @@ class _InlineWorker(_WorkerConnection):
 
     def yield_to_broker(self):
         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
 
     def yield_to_broker(self):
         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
+
+
+class _Thread(threading.Thread):
+    def __init__(self, worker_connection, port, client):
+        threading.Thread.__init__(self)
+        self._worker_connection = worker_connection
+        self._port = port
+        self._client = client
+
+    def run(self):
+        # FIXME: We can remove this once everyone is on 2.6.
+        if not hasattr(self, 'ident'):
+            self.ident = thread.get_ident()
+        self._client.run(self._port)
+
+
+class _ThreadedWorkerConnection(_WorkerConnection):
+    def __init__(self, broker, port, worker_class, worker_number):
+        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
+        self._thread = _Thread(self, port, self._client)
+
+    def start(self):
+        self._thread.start()
+
+
+class _Process(_Multiprocessing_Process):
+    def __init__(self, worker_connection, platform_name, options, client):
+        _Multiprocessing_Process.__init__(self)
+        self._worker_connection = worker_connection
+        self._platform_name = platform_name
+        self._options = options
+        self._client = client
+
+    def run(self):
+        logging.basicConfig()
+        port_obj = port.get(self._platform_name, self._options)
+        self._client.run(port_obj)
+
+
+class _MultiProcessWorkerConnection(_WorkerConnection):
+    def __init__(self, broker, platform_name, worker_class, worker_number, options):
+        _WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
+        self._proc = _Process(self, platform_name, options, self._client)
+
+    def start(self):
+        self._proc.start()
index 1a432d6..ffbe081 100644 (file)
@@ -27,6 +27,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import optparse
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import optparse
+import Queue
 import sys
 import unittest
 
 import sys
 import unittest
 
@@ -43,8 +44,38 @@ from webkitpy.layout_tests.layout_package import manager_worker_broker
 from webkitpy.layout_tests.layout_package import message_broker2
 
 
 from webkitpy.layout_tests.layout_package import message_broker2
 
 
-class TestWorker(object):
-    pass
+class TestWorker(manager_worker_broker.AbstractWorker):
+    def __init__(self, broker_connection, worker_number, options):
+        self._broker_connection = broker_connection
+        self._options = options
+        self._worker_number = worker_number
+        self._name = 'TestWorker/%d' % worker_number
+        self._stopped = False
+
+    def handle_stop(self, src):
+        self._stopped = True
+
+    def handle_test(self, src, an_int, a_str):
+        assert an_int == 1
+        assert a_str == "hello, world"
+        self._broker_connection.post_message('test', 2, 'hi, everybody')
+
+    def is_done(self):
+        return self._stopped
+
+    def name(self):
+        return self._name
+
+    def start(self):
+        pass
+
+    def run(self, port):
+        try:
+            self._broker_connection.run_message_loop()
+            self._broker_connection.yield_to_broker()
+            self._broker_connection.post_message('done')
+        except Exception, e:
+            self._broker_connection.post_message('exception', (type(e), str(e), None))
 
 
 def get_options(worker_model):
 
 
 def get_options(worker_model):
@@ -65,24 +96,131 @@ class FunctionTests(unittest.TestCase):
         self.assertTrue(make_broker(self, 'inline') is not None)
 
     def test_get__threads(self):
         self.assertTrue(make_broker(self, 'inline') is not None)
 
     def test_get__threads(self):
-        self.assertRaises(NotImplementedError, make_broker, self, 'threads')
+        self.assertTrue(make_broker(self, 'threads') is not None)
 
     def test_get__processes(self):
         if multiprocessing:
 
     def test_get__processes(self):
         if multiprocessing:
-            self.assertRaises(NotImplementedError, make_broker, self, 'processes')
+            self.assertTrue(make_broker(self, 'processes') is not None)
         else:
             self.assertRaises(ValueError, make_broker, self, 'processes')
 
     def test_get__unknown(self):
         self.assertRaises(ValueError, make_broker, self, 'unknown')
 
         else:
             self.assertRaises(ValueError, make_broker, self, 'processes')
 
     def test_get__unknown(self):
         self.assertRaises(ValueError, make_broker, self, 'unknown')
 
+
+class _TestsMixin(object):
+    """Mixin class that implements a series of tests to enforce the
+    contract all implementations must follow."""
+
+    #
+    # Methods to implement the Manager side of the ClientInterface
+    #
+    def name(self):
+        return 'Tester'
+
+    def is_done(self):
+        return self._done
+
+    #
+    # Handlers for the messages the TestWorker may send.
+    #
+    def handle_done(self, src):
+        self._done = True
+
+    def handle_test(self, src, an_int, a_str):
+        self._an_int = an_int
+        self._a_str = a_str
+
+    def handle_exception(self, src, exc_info):
+        self._exception = exc_info
+        self._done = True
+
+    #
+    # Testing helper methods
+    #
+    def setUp(self):
+        self._an_int = None
+        self._a_str = None
+        self._broker = None
+        self._done = False
+        self._exception = None
+        self._worker_model = None
+
+    def make_broker(self):
+        self._broker = make_broker(self, self._worker_model)
+
+    #
+    # Actual unit tests
+    #
+    def test_done(self):
+        if not self._worker_model:
+            return
+        self.make_broker()
+        worker = self._broker.start_worker(0)
+        self._broker.post_message('test', 1, 'hello, world')
+        self._broker.post_message('stop')
+        self._broker.run_message_loop()
+        self.assertTrue(self.is_done())
+        self.assertEqual(self._an_int, 2)
+        self.assertEqual(self._a_str, 'hi, everybody')
+
+    def test_unknown_message(self):
+        if not self._worker_model:
+            return
+        self.make_broker()
+        worker = self._broker.start_worker(0)
+        self._broker.post_message('unknown')
+        self._broker.run_message_loop()
+
+        self.assertTrue(self.is_done())
+        self.assertEquals(self._exception[0], ValueError)
+        self.assertEquals(self._exception[1],
+            "TestWorker/0: received message 'unknown' it couldn't handle")
+
+
+class InlineBrokerTests(_TestsMixin, unittest.TestCase):
+    def setUp(self):
+        _TestsMixin.setUp(self)
+        self._worker_model = 'inline'
+
+
+class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
+    def setUp(self):
+        _TestsMixin.setUp(self)
+        if multiprocessing:
+            self._worker_model = 'processes'
+        else:
+            self._worker_model = None
+
+    def queue(self):
+        return multiprocessing.Queue()
+
+
+class ThreadedBrokerTests(_TestsMixin, unittest.TestCase):
+    def setUp(self):
+        _TestsMixin.setUp(self)
+        self._worker_model = 'threads'
+
+
+class FunctionsTest(unittest.TestCase):
     def test_runtime_options(self):
         option_list = manager_worker_broker.runtime_options()
         parser = optparse.OptionParser(option_list=option_list)
         options, args = parser.parse_args([])
         self.assertTrue(options)
 
     def test_runtime_options(self):
         option_list = manager_worker_broker.runtime_options()
         parser = optparse.OptionParser(option_list=option_list)
         options, args = parser.parse_args([])
         self.assertTrue(options)
 
-# FIXME: Add in unit tests for the managers, coverage tests for the interfaces.
+
+class InterfaceTest(unittest.TestCase):
+    # These tests mostly exist to pacify coverage.
+
+    # FIXME: There must be a better way to do this and also verify
+    # that classes do implement every abstract method in an interface.
+    def test_managerconnection_is_abstract(self):
+        # Test that all the base class methods are abstract and have the
+        # signature we expect.
+        broker = make_broker(self, 'inline')
+        obj = manager_worker_broker._ManagerConnection(broker._broker, None, self, None)
+        self.assertRaises(NotImplementedError, obj.start_worker, 0)
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':
index 75b9dda..f097b83 100644 (file)
@@ -45,9 +45,6 @@ _log = logging.getLogger(__name__)
 
 class TestRunner2(test_runner.TestRunner):
     def __init__(self, port, options, printer):
 
 class TestRunner2(test_runner.TestRunner):
     def __init__(self, port, options, printer):
-        if options.worker_model in ('threads', 'processes'):
-            raise ValueError('--worker-model=%s not supported yet' % options.worker_model)
-
         test_runner.TestRunner.__init__(self, port, options, printer)
         self._all_results = []
         self._group_stats = {}
         test_runner.TestRunner.__init__(self, port, options, printer)
         self._all_results = []
         self._group_stats = {}
index 8f3c840..5ff4bff 100644 (file)
@@ -467,6 +467,10 @@ class Port(object):
         ('cpu')."""
         return 'cpu'
 
         ('cpu')."""
         return 'cpu'
 
+    def real_name(self):
+        """Returns the actual name of the port, not the delegate's."""
+        return self.name()
+
     def get_option(self, name, default_value=None):
         # FIXME: Eventually we should not have to do a test for
         # hasattr(), and we should be able to just do
     def get_option(self, name, default_value=None):
         # FIXME: Eventually we should not have to do a test for
         # hasattr(), and we should be able to just do
index 370327f..1147846 100644 (file)
@@ -53,6 +53,10 @@ class MockDRTPort(object):
         if 'port_name' in kwargs:
             kwargs['port_name'] = kwargs['port_name'][len(prefix):]
         self.__delegate = factory.get(**kwargs)
         if 'port_name' in kwargs:
             kwargs['port_name'] = kwargs['port_name'][len(prefix):]
         self.__delegate = factory.get(**kwargs)
+        self.__real_name = prefix + self.__delegate.name()
+
+    def real_name(self):
+        return self.__real_name
 
     def __getattr__(self, name):
         return getattr(self.__delegate, name)
 
     def __getattr__(self, name):
         return getattr(self.__delegate, name)
@@ -271,7 +275,6 @@ class MockChromiumDRT(MockDRT):
         self._stdout.flush()
 
 
         self._stdout.flush()
 
 
-
 if __name__ == '__main__':
     fs = filesystem.FileSystem()
     sys.exit(main(sys.argv[1:], fs, sys.stdin, sys.stdout, sys.stderr))
 if __name__ == '__main__':
     fs = filesystem.FileSystem()
     sys.exit(main(sys.argv[1:], fs, sys.stdin, sys.stdout, sys.stderr))
index 891797d..84f5718 100644 (file)
@@ -467,10 +467,11 @@ class MainTest(unittest.TestCase):
         self.assertTrue(passing_run(['--worker-model', 'old-threads']))
 
     def test_worker_model__processes(self):
         self.assertTrue(passing_run(['--worker-model', 'old-threads']))
 
     def test_worker_model__processes(self):
-        self.assertRaises(ValueError, logging_run, ['--worker-model', 'processes'])
+        if compare_version(sys, '2.6')[0] >= 0:
+            self.assertTrue(passing_run(['--worker-model', 'processes']))
 
     def test_worker_model__threads(self):
 
     def test_worker_model__threads(self):
-        self.assertRaises(ValueError, logging_run, ['--worker-model', 'threads'])
+        self.assertTrue(passing_run(['--worker-model', 'threads']))
 
     def test_worker_model__unknown(self):
         self.assertRaises(ValueError, logging_run,
 
     def test_worker_model__unknown(self):
         self.assertRaises(ValueError, logging_run,