cfd1b61657cb7571911596367636bb74c4faf532
[WebKit-https.git] / Tools / Scripts / webkitpy / webdriver_tests / pytest_runner.py
1 # Copyright (C) 2017 Igalia S.L.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions
5 # are met:
6 # 1.  Redistributions of source code must retain the above copyright
7 #     notice, this list of conditions and the following disclaimer.
8 # 2.  Redistributions in binary form must reproduce the above copyright
9 #     notice, this list of conditions and the following disclaimer in the
10 #     documentation and/or other materials provided with the distribution.
11 #
12 # THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND ANY
13 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
14 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
15 # DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY
16 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
17 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
18 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
19 # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
20 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
21 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
23 import errno
24 import json
25 import os
26 import shutil
27 import sys
28 import tempfile
29
30 import webkitpy.thirdparty.autoinstalled.pytest
31 import webkitpy.thirdparty.autoinstalled.pytest_timeout
32 import pytest
33 from _pytest.main import EXIT_INTERNALERROR
34
35
36 class TemporaryDirectory(object):
37
38     def __enter__(self):
39         self.path = tempfile.mkdtemp(prefix="pytest-")
40         return self.path
41
42     def __exit__(self, *args):
43         try:
44             shutil.rmtree(self.path)
45         except OSError as e:
46             # no such file or directory
47             if e.errno != errno.ENOENT:
48                 raise
49
50
51 class CollectRecorder(object):
52
53     def __init__(self):
54         self.tests = []
55
56     def pytest_collectreport(self, report):
57         if report.nodeid:
58             self.tests.append(report.nodeid)
59
60
61 class HarnessResultRecorder(object):
62
63     def __init__(self):
64         self.outcome = ('OK', None)
65
66     def pytest_collectreport(self, report):
67         if report.outcome == 'failed':
68             self.outcome = ('ERROR', None)
69         elif report.outcome == 'skipped':
70             self.outcome = ('SKIP', None)
71
72
73 class SubtestResultRecorder(object):
74
75     def __init__(self):
76         self.results = []
77
78     def pytest_runtest_logreport(self, report):
79         if report.passed and report.when == 'call':
80             self.record_pass(report)
81         elif report.failed:
82             if report.when != 'call':
83                 self.record_error(report)
84             else:
85                 self.record_fail(report)
86         elif report.skipped:
87             self.record_skip(report)
88
89     def _was_timeout(self, report):
90         return hasattr(report.longrepr, 'reprcrash') and report.longrepr.reprcrash.message.startswith('Failed: Timeout >')
91
92     def record_pass(self, report):
93         if hasattr(report, 'wasxfail'):
94             if report.wasxfail == 'Timeout':
95                 self.record(report.nodeid, 'XPASS_TIMEOUT')
96             else:
97                 self.record(report.nodeid, 'XPASS')
98         else:
99             self.record(report.nodeid, 'PASS')
100
101     def record_fail(self, report):
102         if self._was_timeout(report):
103             self.record(report.nodeid, 'TIMEOUT', stack=report.longrepr)
104         else:
105             self.record(report.nodeid, 'FAIL', stack=report.longrepr)
106
107     def record_error(self, report):
108         # error in setup/teardown
109         if report.when != 'call':
110             message = '%s error' % report.when
111         self.record(report.nodeid, 'ERROR', message, report.longrepr)
112
113     def record_skip(self, report):
114         if hasattr(report, 'wasxfail'):
115             if self._was_timeout(report) and report.wasxfail != 'Timeout':
116                 self.record(report.nodeid, 'TIMEOUT', stack=report.longrepr)
117             else:
118                 self.record(report.nodeid, 'XFAIL')
119         else:
120             self.record(report.nodeid, 'SKIP')
121
122     def record(self, test, status, message=None, stack=None):
123         if stack is not None:
124             stack = str(stack)
125         new_result = (test, status, message, stack)
126         self.results.append(new_result)
127
128
129 def collect(directory, args):
130     collect_recorder = CollectRecorder()
131     stdout = sys.stdout
132     with open(os.devnull, 'wb') as devnull:
133         sys.stdout = devnull
134         with TemporaryDirectory() as cache_directory:
135             cmd = ['--collect-only',
136                    '--basetemp=%s' % cache_directory]
137             cmd.extend(args)
138             cmd.append(directory)
139             pytest.main(cmd, plugins=[collect_recorder])
140     sys.stdout = stdout
141     return collect_recorder.tests
142
143
144 def run(path, args, timeout, env={}):
145     harness_recorder = HarnessResultRecorder()
146     subtests_recorder = SubtestResultRecorder()
147     _environ = dict(os.environ)
148     os.environ.clear()
149     os.environ.update(env)
150
151     with TemporaryDirectory() as cache_directory:
152         try:
153             cmd = ['--verbose',
154                    '--capture=no',
155                    '--basetemp=%s' % cache_directory,
156                    '--showlocals',
157                    '--timeout=%s' % timeout,
158                    '-p', 'no:cacheprovider',
159                    '-p', 'pytest_timeout']
160             cmd.extend(args)
161             cmd.append(path)
162             result = pytest.main(cmd, plugins=[harness_recorder, subtests_recorder])
163             if result == EXIT_INTERNALERROR:
164                 harness_recorder.outcome = ('ERROR', None)
165         except Exception as e:
166             harness_recorder.outcome = ('ERROR', str(e))
167
168     os.environ.clear()
169     os.environ.update(_environ)
170
171     return harness_recorder.outcome, subtests_recorder.results