10f3586f143368b230415c2643a6e6a1ae195dc8
[WebKit-https.git] / WebDriverTests / imported / w3c / tools / wptrunner / wptrunner / testloader.py
1 import hashlib
2 import json
3 import os
4 import urlparse
5 from abc import ABCMeta, abstractmethod
6 from Queue import Empty
7 from collections import defaultdict, OrderedDict, deque
8 from multiprocessing import Queue
9
10 import manifestinclude
11 import manifestexpected
12 import wpttest
13 from mozlog import structured
14
# Module-level placeholders for the ``manifest`` package modules.  They stay
# None until do_delayed_imports() rebinds them.
manifest = None
manifest_update = None

def do_delayed_imports():
    """Bind the module-level ``manifest`` and ``manifest_update`` names.

    Imports ``manifest`` and ``update`` from the ``manifest`` package and
    stores them in the globals of this module, replacing the None
    placeholders.
    """
    # This relies on an already loaded module having set the sys.path correctly :(
    global manifest, manifest_update
    from manifest import manifest
    from manifest import update as manifest_update
23
class TestChunker(object):
    """Base class for callables that pick one chunk out of a manifest.

    Stores the chunk count, the 1-based number of the chunk to run and the
    default structured logger.  Subclasses provide ``__call__``.
    """

    def __init__(self, total_chunks, chunk_number):
        self.total_chunks, self.chunk_number = total_chunks, chunk_number
        # A chunk number past the chunk count can never select anything.
        assert self.chunk_number <= self.total_chunks
        default_logger = structured.get_default_logger()
        self.logger = default_logger
        assert self.logger

    def __call__(self, manifest):
        """Yield the manifest items in this chunk; implemented by subclasses."""
        raise NotImplementedError
34
35
class Unchunked(TestChunker):
    """Chunker for a run that is not split: every item is passed through."""

    def __init__(self, *args, **kwargs):
        super(Unchunked, self).__init__(*args, **kwargs)
        # Not chunking only makes sense when there is exactly one chunk.
        assert self.total_chunks == 1

    def __call__(self, manifest):
        """Yield each manifest item unchanged, in order."""
        for entry in manifest:
            yield entry
44
45
class HashChunker(TestChunker):
    """Assign each manifest item to a chunk from the MD5 hash of its path."""

    def __call__(self, manifest):
        """Yield the (test_type, test_path, tests) items hashed to this chunk.

        :param manifest: iterable of (test_type, test_path, tests) tuples
        """
        chunk_index = self.chunk_number - 1
        for test_type, test_path, tests in manifest:
            # md5 only accepts bytes.  Text paths are encoded as UTF-8 so that
            # non-ASCII names hash instead of raising; byte strings are hashed
            # unchanged, so existing chunk assignments are preserved.
            key = test_path
            if not isinstance(key, bytes):
                key = key.encode("utf-8")
            h = int(hashlib.md5(key).hexdigest(), 16)
            if h % self.total_chunks == chunk_index:
                yield test_type, test_path, tests
53
54
class DirectoryHashChunker(TestChunker):
    """Like HashChunker except the directory is hashed.

    This ensures that all tests in the same directory end up in the same
    chunk.
    """
    def __call__(self, manifest):
        """Yield the (test_type, test_path, tests) items whose directory hashes
        to this chunk.

        :param manifest: iterable of (test_type, test_path, tests) tuples
        """
        chunk_index = self.chunk_number - 1
        for test_type, test_path, tests in manifest:
            # md5 only accepts bytes.  Text directory names are encoded as
            # UTF-8 so that non-ASCII names hash instead of raising; byte
            # strings are hashed unchanged.
            key = os.path.dirname(test_path)
            if not isinstance(key, bytes):
                key = key.encode("utf-8")
            h = int(hashlib.md5(key).hexdigest(), 16)
            if h % self.total_chunks == chunk_index:
                yield test_type, test_path, tests
67
68
class EqualTimeChunker(TestChunker):
    """Chunker that tries to give every chunk the same estimated runtime.

    Manifest items are grouped by directory, the groups are cut into
    ``total_chunks`` contiguous runs, and groups are then moved between
    neighbouring chunks for as long as a move lowers the summed squared
    deviation from the ideal per-chunk time.
    """

    def _group_by_directory(self, manifest_items):
        """Split the list of manifest items into a ordered dict that groups tests in
        so that anything in the same subdirectory beyond a depth of 3 is in the same
        group. So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together
        and separate to tests in a/b/f

        :param manifest_items: iterable of (test_type, test_path, tests) tuples

        Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime)
        """

        class PathData(object):
            """The manifest items of one directory group and their summed time."""
            def __init__(self, path):
                self.path = path
                self.time = 0
                self.tests = []

        by_dir = OrderedDict()
        total_time = 0

        for test_type, test_path, tests in manifest_items:
            test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3])

            if test_dir not in by_dir:
                by_dir[test_dir] = PathData(test_dir)

            data = by_dir[test_dir]
            # The timeout of each test serves as the estimate of its runtime.
            time = sum(test.default_timeout if test.timeout !=
                       "long" else test.long_timeout for test in tests)
            data.time += time
            total_time += time
            data.tests.append((test_type, test_path, tests))

        return by_dir, total_time

    def _maybe_remove(self, chunks, i, direction):
        """Trial removing a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try removing from
        :param direction: either "next" if we are going to move from the end to
                          the subsequent chunk, or "prev" if we are going to move
                          from the start into the previous chunk.

        :returns bool: Did a chunk get moved?
        :raises ValueError: if direction is neither "next" nor "prev"."""
        source_chunk = chunks[i]
        if direction == "next":
            target_chunk = chunks[i+1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        elif direction == "prev":
            target_chunk = chunks[i-1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_add(self, chunks, i, direction):
        """Trial adding a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try adding to
        :param direction: either "next" if we are going to remove from the
                          the subsequent chunk, or "prev" if we are going to remove
                          from the the previous chunk.

        :returns bool: Did a chunk get moved?
        :raises ValueError: if direction is neither "next" nor "prev"."""
        target_chunk = chunks[i]
        if direction == "next":
            source_chunk = chunks[i+1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        elif direction == "prev":
            source_chunk = chunks[i-1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_move(self, source_chunk, target_chunk, path_index, move_func):
        """Move from one chunk to another, assess the change in badness,
        and keep the move iff it decreases the badness score.

        :param source_chunk: chunk to move from
        :param target_chunk: chunk to move to
        :param path_index: 0 if we are moving from the start or -1 if we are moving from the
                           end
        :param move_func: Function that actually moves between chunks
        :returns bool: True if move_func was called, False otherwise"""
        # Never empty a chunk: its last remaining path stays where it is.
        if len(source_chunk.paths) <= 1:
            return False

        move_time = source_chunk.paths[path_index].time

        new_source_badness = self._badness(source_chunk.time - move_time)
        new_target_badness = self._badness(target_chunk.time + move_time)

        delta_badness = ((new_source_badness + new_target_badness) -
                         (source_chunk.badness + target_chunk.badness))
        # Strict inequality, so a move that changes nothing is not repeated.
        if delta_badness < 0:
            move_func()
            return True

        return False

    def _badness(self, time):
        """Metric of badness for a specific chunk

        :param time: the time for a specific chunk
        :returns: squared difference between time and self.expected_time"""
        return (time - self.expected_time)**2

    def _get_chunk(self, manifest_items):
        """Balance the chunks and return the manifest items of our chunk.

        :param manifest_items: list of (test_type, test_path, tests) tuples
        :returns list: the items belonging to chunk ``self.chunk_number``
        :raises ValueError: if there are fewer directory groups than chunks
        """
        by_dir, total_time = self._group_by_directory(manifest_items)

        if len(by_dir) < self.total_chunks:
            raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % (
                self.total_chunks, len(by_dir)))

        self.expected_time = float(total_time) / self.total_chunks

        chunks = self._create_initial_chunks(by_dir)

        while True:
            # Move a test from one chunk to the next until doing so no longer
            # reduces the badness
            got_improvement = self._update_chunks(chunks)
            if not got_improvement:
                break

        self.logger.debug(self.expected_time)
        for i, chunk in chunks.items():
            self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness))

        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return self._get_tests(chunks)

    @staticmethod
    def _all_tests(by_dir):
        """Return the set of (test_type, test_path) pairs in a grouping by
        directory"""
        # Each entry of item.tests is (test_type, test_path, tests); the path
        # is part of the key so that the sanity checks compare actual tests
        # rather than only the test types.
        return set((x[0], x[1]) for item in by_dir.values()
                   for x in item.tests)

    @staticmethod
    def _chunked_tests(chunks):
        """Return the set of (test_type, test_path) pairs in the chunk list"""
        return set((x[0], x[1]) for chunk in chunks.values()
                   for path in chunk.paths
                   for x in path.tests)


    def _create_initial_chunks(self, by_dir):
        """Create an initial unbalanced list of chunks.

        :param by_dir: All tests in the manifest grouped by subdirectory
        :returns: An ordered dict mapping chunk index to Chunk object"""

        class Chunk(object):
            def __init__(self, paths, index):
                """List of PathData objects that together form a single chunk of
                tests"""
                self.paths = deque(paths)
                self.time = sum(item.time for item in paths)
                self.index = index

            def appendleft(self, path):
                """Add a PathData object to the start of the chunk"""
                self.paths.appendleft(path)
                self.time += path.time

            def append(self, path):
                """Add a PathData object to the end of the chunk"""
                self.paths.append(path)
                self.time += path.time

            def pop(self):
                """Remove PathData object from the end of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[-1].time
                return self.paths.pop()

            def popleft(self):
                """Remove PathData object from the start of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[0].time
                return self.paths.popleft()

            @property
            def badness(self_):
                """Badness metric for this chunk"""
                # ``self`` is the enclosing EqualTimeChunker, ``self_`` the chunk.
                return self._badness(self_.time)

        # Floor division: every chunk but the last starts with the same number
        # of directory groups and the last one takes the remainder.
        initial_size = len(by_dir) // self.total_chunks
        chunk_boundaries = [initial_size * i
                            for i in range(self.total_chunks)] + [len(by_dir)]

        # Materialise the groups once so they can be sliced per chunk.
        all_paths = list(by_dir.values())

        chunks = OrderedDict()
        for i, lower in enumerate(chunk_boundaries[:-1]):
            upper = chunk_boundaries[i + 1]
            chunks[i] = Chunk(all_paths[lower:upper], i)

        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return chunks

    def _update_chunks(self, chunks):
        """Run a single iteration of the chunk update algorithm.

        :param chunks: - Ordered dict of chunk index to chunk
        :returns bool: True if a path was moved between two chunks
        """
        # A single chunk has no neighbour to exchange paths with; trying the
        # "next" direction would look up a chunk that does not exist.
        if len(chunks) < 2:
            return False

        #TODO: consider replacing this with a heap
        sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness)
        got_improvement = False
        for chunk in sorted_chunks:
            if chunk.time < self.expected_time:
                f = self._maybe_add
            else:
                f = self._maybe_remove

            if chunk.index == 0:
                order = ["next"]
            elif chunk.index == self.total_chunks - 1:
                order = ["prev"]
            else:
                if chunk.time < self.expected_time:
                    # First try to add a test from the neighboring chunk with the
                    # greatest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["next", "prev"]
                    else:
                        order = ["prev", "next"]
                else:
                    # First try to remove a test and add to the neighboring chunk with the
                    # lowest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["prev", "next"]
                    else:
                        order = ["next", "prev"]

            for direction in order:
                if f(chunks, chunk.index, direction):
                    got_improvement = True
                    break

            # Stop after the first successful move; the caller calls again
            # and the chunks are re-sorted by badness.
            if got_improvement:
                break

        return got_improvement

    def _get_tests(self, chunks):
        """Return the list of tests corresponding to the chunk number we are running.

        :param chunks: Ordered dict of chunk index to chunk"""
        tests = []
        for path in chunks[self.chunk_number - 1].paths:
            tests.extend(path.tests)

        return tests

    def __call__(self, manifest_iter):
        """Yield the manifest items assigned to this chunk.

        :param manifest_iter: iterable of (test_type, test_path, tests) tuples;
                              it is consumed in full before anything is yielded
        """
        manifest = list(manifest_iter)
        tests = self._get_chunk(manifest)
        for item in tests:
            yield item
336
337
class TestFilter(object):
    """Manifest filter that keeps the tests accepted by an include manifest.

    The include manifest is read from ``manifest_path`` when a path is given
    and no explicit ``include`` list is supplied; otherwise a fresh default
    manifest is created.  ``include`` and ``exclude`` entries are then applied
    on top of it.
    """

    def __init__(self, test_manifests, include=None, exclude=None, manifest_path=None):
        read_from_file = manifest_path is not None and include is None
        if read_from_file:
            self.manifest = manifestinclude.get_manifest(manifest_path)
        else:
            self.manifest = manifestinclude.IncludeManifest.create()
            self.manifest.set_defaults()

        if include:
            # Skip everything by default, then include the named items.
            self.manifest.set("skip", "true")
            for included in include:
                self.manifest.add_include(test_manifests, included)

        for excluded in (exclude or ()):
            self.manifest.add_exclude(test_manifests, excluded)

    def __call__(self, manifest_iter):
        """Yield (test_type, test_path, tests) with ``tests`` narrowed to the
        set of included tests; items left with no tests are dropped."""
        for test_type, test_path, tests in manifest_iter:
            include_tests = set(test for test in tests
                                if self.manifest.include(test))
            if not include_tests:
                continue
            yield test_type, test_path, include_tests
364
class TagFilter(object):
    """Filter that passes only tests sharing at least one tag with ``tags``."""

    def __init__(self, tags):
        self.tags = set(tags)

    def __call__(self, test_iter):
        """Yield each test from test_iter whose ``tags`` overlap self.tags."""
        for candidate in test_iter:
            shared_tags = candidate.tags & self.tags
            if not shared_tags:
                continue
            yield candidate
373
class ManifestLoader(object):
    """Load the MANIFEST.json of each test root, creating or updating it on
    disk when required."""

    def __init__(self, test_paths, force_manifest_update=False):
        """
        :param test_paths: mapping of url_base to a dict of the remaining
                           keyword arguments of load_manifest (tests_path and
                           metadata_path)
        :param force_manifest_update: update each manifest before loading it
                                      even if the file already exists
        """
        do_delayed_imports()
        self.test_paths = test_paths
        self.force_manifest_update = force_manifest_update
        self.logger = structured.get_default_logger()
        if self.logger is None:
            self.logger = structured.structuredlog.StructuredLogger("ManifestLoader")

    def load(self):
        """Load the manifest for every entry in self.test_paths.

        :returns dict: maps each loaded manifest object to a dict holding its
                       ``url_base`` plus the path settings it was loaded with
        """
        rv = {}
        # items() rather than iteritems() so this runs on Python 2 and 3.
        for url_base, paths in self.test_paths.items():
            manifest_file = self.load_manifest(url_base=url_base,
                                               **paths)
            path_data = {"url_base": url_base}
            path_data.update(paths)
            rv[manifest_file] = path_data
        return rv

    def create_manifest(self, manifest_path, tests_path, url_base="/"):
        """Write a manifest built from scratch, ignoring any existing file."""
        self.update_manifest(manifest_path, tests_path, url_base, recreate=True)

    def update_manifest(self, manifest_path, tests_path, url_base="/",
                        recreate=False):
        """Update the manifest at manifest_path from the tests in tests_path
        and write it back.

        :param manifest_path: path of the manifest JSON file
        :param tests_path: directory passed to the manifest update
        :param url_base: url_base used when a new manifest has to be created
        :param recreate: if true, do not read the existing file at all
        """
        self.logger.info("Updating test manifest %s" % manifest_path)

        json_data = None
        if not recreate:
            try:
                with open(manifest_path) as f:
                    json_data = json.load(f)
            except IOError:
                #If the existing file doesn't exist just create one from scratch
                pass

        if not json_data:
            manifest_file = manifest.Manifest(url_base)
        else:
            try:
                manifest_file = manifest.Manifest.from_json(tests_path, json_data)
            except manifest.ManifestVersionMismatch:
                # Stored data has a different manifest version: start afresh.
                manifest_file = manifest.Manifest(url_base)

        manifest_update.update(tests_path, manifest_file, True)

        manifest.write(manifest_file, manifest_path)

    def load_manifest(self, tests_path, metadata_path, url_base="/"):
        """Return the manifest stored as MANIFEST.json under metadata_path.

        The file is (re)generated first if it is missing or if
        self.force_manifest_update is set.  If the loaded manifest has a
        different url_base it is changed to ``url_base`` and written back.

        :returns: the loaded manifest object
        """
        manifest_path = os.path.join(metadata_path, "MANIFEST.json")
        if (not os.path.exists(manifest_path) or
            self.force_manifest_update):
            self.update_manifest(manifest_path, tests_path, url_base)
        manifest_file = manifest.load(tests_path, manifest_path)
        if manifest_file.url_base != url_base:
            self.logger.info("Updating url_base in manifest from %s to %s" % (manifest_file.url_base,
                                                                              url_base))
            manifest_file.url_base = url_base
            manifest.write(manifest_file, manifest_path)

        return manifest_file
434
def iterfilter(filters, iter):
    """Chain ``filters`` over ``iter`` and yield what comes out the far end.

    Each filter is called with the iterable produced by the previous one,
    starting with ``iter`` itself.
    """
    stream = iter
    for apply_filter in filters:
        stream = apply_filter(stream)
    for element in stream:
        yield element
440
class TestLoader(object):
    """Read tests from a set of manifests, filter and chunk them, and sort
    them into enabled and disabled tests per test type.

    After construction ``tests`` and ``disabled_tests`` map each test type to
    a list of test objects.
    """

    def __init__(self,
                 test_manifests,
                 test_types,
                 run_info,
                 manifest_filters=None,
                 meta_filters=None,
                 chunk_type="none",
                 total_chunks=1,
                 chunk_number=1,
                 include_https=True):
        """
        :param test_manifests: dict mapping each manifest object to a dict of
                               its settings; ``metadata_path`` is read from it
        :param test_types: test types to load from the manifests
        :param run_info: passed through to the expectation-metadata loaders
        :param manifest_filters: callables applied to the stream of
                                 (test_type, test_path, tests) manifest items
        :param meta_filters: callables applied to the stream of test objects
        :param chunk_type: "none", "hash", "dir_hash" or "equal_time"
        :param total_chunks: number of chunks the run is split into
        :param chunk_number: 1-based number of the chunk to load
        :param include_https: if false, tests whose environment protocol is
                              "https" are recorded as disabled
        """

        self.test_types = test_types
        self.run_info = run_info

        self.manifest_filters = manifest_filters if manifest_filters is not None else []
        self.meta_filters = meta_filters if meta_filters is not None else []

        self.manifests = test_manifests
        self.tests = None
        self.disabled_tests = None
        self.include_https = include_https

        self.chunk_type = chunk_type
        self.total_chunks = total_chunks
        self.chunk_number = chunk_number

        self.chunker = {"none": Unchunked,
                        "hash": HashChunker,
                        "dir_hash": DirectoryHashChunker,
                        "equal_time": EqualTimeChunker}[chunk_type](total_chunks,
                                                                    chunk_number)

        self._test_ids = None

        # Cache of directory metadata manifests keyed by __dir__.ini path.
        self.directory_manifests = {}

        self._load_tests()

    @property
    def test_ids(self):
        """Ids of all loaded tests, disabled tests first, computed once."""
        if self._test_ids is None:
            self._test_ids = []
            for test_dict in [self.disabled_tests, self.tests]:
                for test_type in self.test_types:
                    self._test_ids += [item.id for item in test_dict[test_type]]
        return self._test_ids

    def get_test(self, manifest_test, inherit_metadata, test_metadata):
        """Create the test object for one manifest test.

        :param manifest_test: test entry from the manifest
        :param inherit_metadata: list of directory-level metadata
        :param test_metadata: metadata of the test file, or None
        :returns: the object built by wpttest.from_manifest
        """
        if test_metadata is not None:
            # Build a new list instead of appending in place: iter_wpttest
            # passes the same inherit_metadata list for every test of a path,
            # so appending would add the file metadata once per test.
            inherit_metadata = list(inherit_metadata) + [test_metadata]
            test_metadata = test_metadata.get_test(manifest_test.id)

        return wpttest.from_manifest(manifest_test, inherit_metadata, test_metadata)

    def load_dir_metadata(self, test_manifest, metadata_path, test_path):
        """Return the directory metadata applying to test_path.

        One ``__dir__.ini`` is looked up per ancestor directory of the test,
        outermost first; lookups are cached in self.directory_manifests and
        directories for which the loader returns None are left out.
        """
        rv = []
        path_parts = os.path.dirname(test_path).split(os.path.sep)
        for i in range(1, len(path_parts) + 1):
            path = os.path.join(metadata_path, os.path.sep.join(path_parts[:i]), "__dir__.ini")
            if path not in self.directory_manifests:
                self.directory_manifests[path] = manifestexpected.get_dir_manifest(path,
                                                                                   self.run_info)
            manifest = self.directory_manifests[path]
            if manifest is not None:
                rv.append(manifest)
        return rv

    def load_metadata(self, test_manifest, metadata_path, test_path):
        """Return (directory metadata list, file metadata) for test_path."""
        inherit_metadata = self.load_dir_metadata(test_manifest, metadata_path, test_path)
        test_metadata = manifestexpected.get_manifest(
            metadata_path, test_path, test_manifest.url_base, self.run_info)
        return inherit_metadata, test_metadata

    def iter_tests(self):
        """Yield (test_path, test_type, test) for every test that passes the
        manifest filters, the chunker and the metadata filters."""
        manifest_items = []

        # Sorting by url_base makes the item order independent of dict order.
        for manifest in sorted(self.manifests, key=lambda x:x.url_base):
            manifest_iter = iterfilter(self.manifest_filters,
                                       manifest.itertypes(*self.test_types))
            manifest_items.extend(manifest_iter)

        if self.chunker is not None:
            manifest_items = self.chunker(manifest_items)

        for test_type, test_path, tests in manifest_items:
            # Take the owning manifest from any one of the tests of the item.
            manifest_file = next(iter(tests)).manifest
            metadata_path = self.manifests[manifest_file]["metadata_path"]
            inherit_metadata, test_metadata = self.load_metadata(manifest_file, metadata_path, test_path)

            for test in iterfilter(self.meta_filters,
                                   self.iter_wpttest(inherit_metadata, test_metadata, tests)):
                yield test_path, test_type, test

    def iter_wpttest(self, inherit_metadata, test_metadata, tests):
        """Yield the test object for each manifest test in tests."""
        for manifest_test in tests:
            yield self.get_test(manifest_test, inherit_metadata, test_metadata)

    def _load_tests(self):
        """Read in the tests from the manifest files and store them by type in
        self.tests (enabled) and self.disabled_tests (disabled)"""
        tests = {"enabled":defaultdict(list),
                 "disabled":defaultdict(list)}

        for test_path, test_type, test in self.iter_tests():
            enabled = not test.disabled()
            if not self.include_https and test.environment["protocol"] == "https":
                enabled = False
            key = "enabled" if enabled else "disabled"
            tests[key][test_type].append(test)

        self.tests = tests["enabled"]
        self.disabled_tests = tests["disabled"]

    def groups(self, test_types, chunk_type="none", total_chunks=1, chunk_number=1):
        """Return the set of first URL path components of the enabled tests
        of the given types.

        The chunking arguments are accepted but not used.
        """
        groups = set()

        for test_type in test_types:
            for test in self.tests[test_type]:
                group = test.url.split("/")[1]
                groups.add(group)

        return groups
563
564
class TestSource(object):
    """Abstract source of test groups read from a queue.

    Subclasses supply ``make_queue``; ``group`` hands out the current group
    and fetches the next one from the queue once the current one is used up.
    """
    __metaclass__ = ABCMeta

    def __init__(self, test_queue):
        self.current_group = None
        self.current_metadata = None
        self.test_queue = test_queue

    @abstractmethod
    #@classmethod (doesn't compose with @abstractmethod)
    def make_queue(cls, tests, **kwargs):
        pass

    def group(self):
        """Return (group, metadata) for the group being worked on, or
        (None, None) when a new group is needed and the queue is empty."""
        group_has_tests = self.current_group and len(self.current_group) != 0
        if not group_has_tests:
            try:
                next_item = self.test_queue.get(block=False)
            except Empty:
                return None, None
            self.current_group, self.current_metadata = next_item
        return self.current_group, self.current_metadata
585
586
class GroupedSource(TestSource):
    """Test source that queues consecutive tests as groups.

    Subclasses decide where one group ends and the next begins by
    implementing ``new_group``.
    """

    @classmethod
    def new_group(cls, state, test, **kwargs):
        """Return true if ``test`` starts a new group; subclasses implement."""
        raise NotImplementedError

    @classmethod
    def make_queue(cls, tests, **kwargs):
        """Return a queue holding one (deque of tests, metadata dict) item per
        group."""
        test_queue = Queue()
        # Scratch dict handed to every new_group call.
        state = {}
        groups = []

        for test in tests:
            starts_group = cls.new_group(state, test, **kwargs)
            if starts_group:
                groups.append((deque(), {}))

            # The test always joins the most recently opened group.
            current_tests, current_metadata = groups[-1]
            current_tests.append(test)
            test.update_metadata(current_metadata)

        for entry in groups:
            test_queue.put(entry)
        return test_queue
610
611
class SingleTestSource(TestSource):
    """Test source that spreads tests over one group per process."""

    @classmethod
    def make_queue(cls, tests, **kwargs):
        """Return a queue with one (deque of tests, metadata dict) item per
        process.

        :param tests: iterable of tests; each is placed in the group selected
                      by the hash of its id modulo the process count
        :param kwargs: must contain ``processes``, the number of groups
        """
        test_queue = Queue()
        processes = kwargs["processes"]
        # range() rather than xrange() so this runs on Python 2 and 3.
        queues = [deque([]) for _ in range(processes)]
        metadatas = [{} for _ in range(processes)]
        for test in tests:
            idx = hash(test.id) % processes
            group = queues[idx]
            metadata = metadatas[idx]
            group.append(test)
            test.update_metadata(metadata)

        for item in zip(queues, metadatas):
            test_queue.put(item)

        return test_queue
630
631
class PathGroupedSource(GroupedSource):
    """Grouped source that starts a new group when the URL directory changes."""

    @classmethod
    def new_group(cls, state, test, **kwargs):
        """Return true if the directory part of test.url, cut to ``depth``
        components, differs from that of the previous test.

        A ``depth`` of True means no limit.  The directory of this test is
        stored in ``state["prev_path"]`` for the next call.
        """
        limit = kwargs.get("depth")
        limit = None if limit is True else limit
        url_path = urlparse.urlsplit(test.url).path
        # Drop the empty leading component and the trailing file name.
        directories = url_path.split("/")[1:-1]
        current = directories[:limit]
        previous = state.get("prev_path")
        state["prev_path"] = current
        return current != previous