results.webkit.org: Abstract archive storage solutions
author: jbedard@apple.com <jbedard@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 10 Feb 2020 16:10:22 +0000 (16:10 +0000)
committer: jbedard@apple.com <jbedard@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 10 Feb 2020 16:10:22 +0000 (16:10 +0000)
https://bugs.webkit.org/show_bug.cgi?id=207277

Rubber-stamped by Aakash Jain.

* resultsdbpy/resultsdbpy/model/archive_context.py:
(ArchiveContext):
(ArchiveContext.ArchiveMetaDataByCommit.unpack):
(ArchiveContext.__init__): Add archiver member.
(ArchiveContext.__enter__): archiver is a context manager.
(ArchiveContext.__exit__): Ditto.
(ArchiveContext.register): The archiver member owns saving an archive.
(ArchiveContext.find_archive): The archiver member owns retrieving an archive.
(ArchiveContext.ArchiveChunks): Move to CassandraArchiver.
* resultsdbpy/resultsdbpy/model/archiver.py: Added.
(Archiver): Base class for all Archiving systems.
(Archiver.archive_digest): Given a file-like archive, calculate its digest.
(Archiver.archive_size): Given a file-like archive, calculate its size.
(Archiver.save):
(Archiver.retrieve):
* resultsdbpy/resultsdbpy/model/cassandra_archiver.py: Added.
(CassandraArchiver):
(CassandraArchiver.ArchiveChunks): Moved from ArchiveContext.
(CassandraArchiver.__init__):
(CassandraArchiver.__enter__):
(CassandraArchiver.__exit__):
(CassandraArchiver.save): Split the provided archive into chunks for saving, moved
from ArchiveContext.
(CassandraArchiver.retrieve): Retrieve an archive with the provided hash, moved
from ArchiveContext.

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@256172 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Tools/ChangeLog
Tools/resultsdbpy/resultsdbpy/model/archive_context.py
Tools/resultsdbpy/resultsdbpy/model/archiver.py [new file with mode: 0644]
Tools/resultsdbpy/resultsdbpy/model/cassandra_archiver.py [new file with mode: 0644]

index f908e53..e1de693 100644 (file)
@@ -1,3 +1,36 @@
+2020-02-10  Jonathan Bedard  <jbedard@apple.com>
+
+        results.webkit.org: Abstract archive storage solutions
+        https://bugs.webkit.org/show_bug.cgi?id=207277
+
+        Rubber-stamped by Aakash Jain.
+
+        * resultsdbpy/resultsdbpy/model/archive_context.py:
+        (ArchiveContext):
+        (ArchiveContext.ArchiveMetaDataByCommit.unpack):
+        (ArchiveContext.__init__): Add archiver member.
+        (ArchiveContext.__enter__): archiver is a context manager.
+        (ArchiveContext.__exit__): Ditto.
+        (ArchiveContext.register): The archiver member owns saving an archive.
+        (ArchiveContext.find_archive): The archiver member owns retrieving an archive.
+        (ArchiveContext.ArchiveChunks): Move to CassandraArchiver.
+        * resultsdbpy/resultsdbpy/model/archiver.py: Added.
+        (Archiver): Base class for all Archiving systems.
+        (Archiver.archive_digest): Given a file-like archive, calculate its digest.
+        (Archiver.archive_size): Given a file-like archive, calculate its size.
+        (Archiver.save):
+        (Archiver.retrieve):
+        * resultsdbpy/resultsdbpy/model/cassandra_archiver.py: Added.
+        (CassandraArchiver):
+        (CassandraArchiver.ArchiveChunks): Moved from ArchiveContext.
+        (CassandraArchiver.__init__):
+        (CassandraArchiver.__enter__):
+        (CassandraArchiver.__exit__):
+        (CassandraArchiver.save): Split the provided archive into chunks for saving, moved
+        from ArchiveContext.
+        (CassandraArchiver.retrieve): Retrieve an archive with the provided hash, moved
+        from ArchiveContext.
+
 2020-02-09  Lauro Moura  <lmoura@igalia.com>
 
         [GTK][WPE] Expose allowTopNavigationToDataURL
index 48b3a4d..5b2cb9e 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 Apple Inc. All rights reserved.
+# Copyright (C) 2019-2020 Apple Inc. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions
@@ -21,7 +21,6 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import calendar
-import hashlib
 import io
 import json
 import time
@@ -33,6 +32,8 @@ from collections import OrderedDict
 from datetime import datetime
 from resultsdbpy.controller.commit import Commit
 from resultsdbpy.controller.configuration import Configuration
+from resultsdbpy.model.archiver import Archiver
+from resultsdbpy.model.cassandra_archiver import CassandraArchiver
 from resultsdbpy.model.commit_context import CommitContext
 from resultsdbpy.model.configuration_context import ClusteredByConfiguration
 from resultsdbpy.model.upload_context import UploadContext
@@ -48,7 +49,6 @@ def _get_time(input_time):
 
 class ArchiveContext(object):
     DEFAULT_LIMIT = 10
-    CHUNK_SIZE = 10 * 1024 * 1024  # Cassandra doesn't do well with data blobs of more than 10 MB
     MEMORY_LIMIT = 2 * 1024 * 1024 * 1024  # Don't allow more than 2 gigs of archives in memory at one time
 
     class ArchiveMetaDataByCommit(ClusteredByConfiguration):
@@ -69,14 +69,6 @@ class ArchiveContext(object):
                 size=self.size,
             )
 
-    # According to https://cwiki.apache.org/confluence/display/CASSANDRA2/CassandraLimitations, we should shard
-    # large data blobs.
-    class ArchiveChunks(Model):
-        __table_name__ = 'archive_chunks_02'
-        digest = columns.Text(partition_key=True, required=True)
-        index = columns.Integer(primary_key=True, required=True)
-        chunk = columns.Blob(required=True)
-
     @classmethod
     def assert_zipfile(cls, archive):
         if not isinstance(archive, io.BytesIO):
@@ -93,20 +85,22 @@ class ArchiveContext(object):
         self.configuration_context = configuration_context
         self.commit_context = commit_context
         self.cassandra = self.configuration_context.cassandra
+        self.archiver = CassandraArchiver(self.cassandra)
         self.ttl_seconds = ttl_seconds
 
         with self:
             self.cassandra.create_table(self.ArchiveMetaDataByCommit)
-            self.cassandra.create_table(self.ArchiveChunks)
             self.cassandra.create_table(UploadContext.SuitesByConfiguration)
 
     def __enter__(self):
         self.configuration_context.__enter__()
         self.commit_context.__enter__()
+        self.archiver.__enter__()
 
     def __exit__(self, *args, **kwargs):
         self.commit_context.__exit__(*args, **kwargs)
         self.configuration_context.__exit__(*args, **kwargs)
+        self.archiver.__exit__(*args, **kwargs)
 
     def register(self, archive, configuration, commits, suite, timestamp=None):
         self.assert_zipfile(archive)
@@ -122,27 +116,14 @@ class ArchiveContext(object):
                 self.configuration_context.insert_row_with_configuration(
                     UploadContext.SuitesByConfiguration.__table_name__, configuration, suite=suite, branch=branch, ttl=ttl,
                 )
-
-                # Breaking up the archive into chunks
-                index = 0
-                size = len(archive.getvalue())
-                digest = hashlib.md5(archive.getvalue()).hexdigest()
-                archive.seek(0)
-                while size > index * self.CHUNK_SIZE:
-                    self.cassandra.insert_row(
-                        self.ArchiveChunks.__table_name__,
-                        digest=digest, index=index,
-                        chunk=archive.read(self.CHUNK_SIZE),
-                        ttl=ttl,
-                    )
-                    index += 1
+                digest = self.archiver.save(archive, retain_for=ttl)
 
                 self.configuration_context.insert_row_with_configuration(
                     self.ArchiveMetaDataByCommit.__table_name__, configuration=configuration, suite=suite,
                     branch=branch, uuid=uuid, ttl=ttl,
                     sdk=configuration.sdk or '?', start_time=timestamp,
                     digest=digest,
-                    size=size,
+                    size=Archiver.archive_size(archive),
                 )
 
     def find_archive(
@@ -186,32 +167,16 @@ class ArchiveContext(object):
                         continue
 
                     if not archive_by_digest.get(value.get('digest')):
-                        rows = self.cassandra.select_from_table(
-                            self.ArchiveChunks.__table_name__,
-                            digest=value.get('digest'),
-                            limit=1 + int(value.get('size', 0) / self.CHUNK_SIZE),
-                        )
-                        if len(rows) == 0:
+                        archive = self.archiver.retrieve(value.get('digest'), value.get('size', None))
+                        if not archive:
                             continue
-
-                        digest = hashlib.md5()
-                        archive = io.BytesIO()
-                        archive_size = 0
-                        for row in rows:
-                            archive_size += len(row.chunk)
-                            digest.update(row.chunk)
-                            archive.write(row.chunk)
-
-                        if archive_size != value.get('size', 0) or value.get('digest', '') != digest.hexdigest():
-                            raise RuntimeError('Failed to reconstruct archive from chunks')
-
                         archive_by_digest[value.get('digest')] = archive
 
                     archive_by_digest.get(value.get('digest')).seek(0)
                     result.setdefault(config, [])
                     result[config].append(dict(
                         archive=archive_by_digest.get(value.get('digest')),
-                        digest=digest.hexdigest(),
+                        digest=value.get('digest'),
                         uuid=value['uuid'],
                         start_time=value['start_time'],
                     ))
diff --git a/Tools/resultsdbpy/resultsdbpy/model/archiver.py b/Tools/resultsdbpy/resultsdbpy/model/archiver.py
new file mode 100644 (file)
index 0000000..621b391
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (C) 2020 Apple Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1.  Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+# 2.  Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in the
+#     documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import hashlib
+
+
+class Archiver(object):
+    @classmethod
+    def archive_digest(cls, archive):
+        digest = hashlib.md5(archive.getvalue()).hexdigest()
+        archive.seek(0)
+        return digest
+
+    @classmethod
+    def archive_size(cls, archive):
+        archive.seek(0, os.SEEK_END)
+        size = archive.tell()
+        archive.seek(0)
+        return size
+
+    def save(self, archive, retain_for=None):
+        raise NotImplementedError
+
+    def retrieve(self, digest, size=None):
+        raise NotImplementedError
diff --git a/Tools/resultsdbpy/resultsdbpy/model/cassandra_archiver.py b/Tools/resultsdbpy/resultsdbpy/model/cassandra_archiver.py
new file mode 100644 (file)
index 0000000..54f289c
--- /dev/null
@@ -0,0 +1,85 @@
+# Copyright (C) 2020 Apple Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1.  Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+# 2.  Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in the
+#     documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import io
+
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from resultsdbpy.model.archiver import Archiver
+
+
+class CassandraArchiver(Archiver):
+    MAX_ARCHIVE = 500 * 1024 * 1024  # Archives should be smaller than 500 MB
+    CHUNK_SIZE = 10 * 1024 * 1024    # Cassandra doesn't do well with data blobs of more than 10 MB
+
+    # According to https://cwiki.apache.org/confluence/display/CASSANDRA2/CassandraLimitations, we should shard
+    # large data blobs.
+    class ArchiveChunks(Model):
+        __table_name__ = 'archive_chunks_02'
+        digest = columns.Text(partition_key=True, required=True)
+        index = columns.Integer(primary_key=True, required=True)
+        chunk = columns.Blob(required=True)
+
+    def __init__(self, cassandra):
+        self.cassandra = cassandra
+        with self:
+            self.cassandra.create_table(self.ArchiveChunks)
+
+    def __enter__(self):
+        self.cassandra.__enter__()
+
+    def __exit__(self, *args, **kwargs):
+        self.cassandra.__exit__(*args, **kwargs)
+
+    def save(self, archive, retain_for=None):
+        index = 0
+        size = self.archive_size(archive)
+        if size > self.MAX_ARCHIVE:
+            raise ValueError('Archive larger than 500 MB')
+        digest = self.archive_digest(archive)
+        while size > index * self.CHUNK_SIZE:
+            self.cassandra.insert_row(
+                self.ArchiveChunks.__table_name__,
+                digest=digest, index=index,
+                chunk=archive.read(self.CHUNK_SIZE),
+                ttl=retain_for,
+            )
+            index += 1
+        return digest
+
+    def retrieve(self, digest, size=None):
+        rows = self.cassandra.select_from_table(
+            self.ArchiveChunks.__table_name__,
+            digest=digest,
+            limit=1 + int((size or self.MAX_ARCHIVE) / self.CHUNK_SIZE),
+        )
+        if len(rows) == 0:
+            return None
+
+        archive = io.BytesIO()
+        for row in rows:
+            archive.write(row.chunk)
+
+        if (size and self.archive_size(archive) != size) or digest != self.archive_digest(archive):
+            raise RuntimeError('Failed to reconstruct archive from chunks')
+
+        return archive