webkitpy: Notify parent process when a worker is spawned
[WebKit-https.git] / Tools / Scripts / webkitpy / test / runner.py
index 9c95207..3c25a16 100644 (file)
 
 """code to actually run a list of python tests."""
 
-import logging
+import re
 import time
 import unittest
 
+from webkitpy.common import message_pool
 
-_log = logging.getLogger(__name__)
+_test_description = re.compile("(\w+) \(([\w.]+)\)")
+
+
+def unit_test_name(test):
+    m = _test_description.match(str(test))
+    return "%s.%s" % (m.group(2), m.group(1))
 
 
 class Runner(object):
-    def __init__(self, printer, options, loader):
-        self.options = options
+    def __init__(self, printer, loader):
         self.printer = printer
         self.loader = loader
+        self.tests_run = 0
+        self.errors = []
+        self.failures = []
+        self.worker_factory = lambda caller: _Worker(caller, self.loader)
+
+    def run(self, test_names, num_workers):
+        if not test_names:
+            return
+        num_workers = min(num_workers, len(test_names))
+        with message_pool.get(self, self.worker_factory, num_workers) as pool:
+            pool.run(('test', test_name) for test_name in test_names)
+
+    def handle(self, message_name, source, test_name=None, delay=None, failures=None, errors=None):
+        if message_name == 'did_spawn_worker':
+            return
+
+        if message_name == 'started_test':
+            self.printer.print_started_test(source, test_name)
+            return
 
-    def all_test_names(self, suite):
-        names = []
-        if hasattr(suite, '_tests'):
-            for t in suite._tests:
-                names.extend(self.all_test_names(t))
-        else:
-            names.append(self.printer.test_name(suite))
-        return names
-
-    def run(self, suite):
-        run_start_time = time.time()
-        all_test_names = self.all_test_names(suite)
+        self.tests_run += 1
+        if failures:
+            self.failures.append((test_name, failures))
+        if errors:
+            self.errors.append((test_name, errors))
+        self.printer.print_finished_test(source, test_name, delay, failures, errors)
+
+
+class _Worker(object):
+    def __init__(self, caller, loader):
+        self._caller = caller
+        self._loader = loader
+
+    def handle(self, message_name, source, test_name):
+        assert message_name == 'test'
         result = unittest.TestResult()
-        stop = run_start_time
-        for test_name in all_test_names:
-            self.printer.print_started_test(test_name)
-            num_failures = len(result.failures)
-            num_errors = len(result.errors)
-
-            start = time.time()
-            # FIXME: it's kinda lame that we re-load the test suites for each
-            # test, and this may slow things down, but this makes implementing
-            # the logging easy and will also allow us to parallelize nicely.
-            self.loader.loadTestsFromName(test_name, None).run(result)
-            stop = time.time()
-
-            err = None
-            failure = None
-            if len(result.failures) > num_failures:
-                failure = result.failures[num_failures][1]
-            elif len(result.errors) > num_errors:
-                err = result.errors[num_errors][1]
-            self.printer.print_finished_test(result, test_name, stop - start, failure, err)
-
-        self.printer.print_result(result, stop - run_start_time)
-
-        return result
+        start = time.time()
+        self._caller.post('started_test', test_name)
+
+        # We will need to rework this if a test_name results in multiple tests.
+        self._loader.loadTestsFromName(test_name, None).run(result)
+        self._caller.post('finished_test', test_name, time.time() - start,
+            [failure[1] for failure in result.failures], [error[1] for error in result.errors])