Make NetworkCache::traverse faster
[WebKit-https.git] / Source / WebKit2 / NetworkProcess / cache / NetworkCacheStorage.cpp
index 4db11a2..5f0b784 100644 (file)
@@ -32,6 +32,7 @@
 #include "NetworkCacheCoders.h"
 #include "NetworkCacheFileSystem.h"
 #include "NetworkCacheIOChannel.h"
+#include <condition_variable>
 #include <wtf/RandomNumber.h>
 #include <wtf/RunLoop.h>
 #include <wtf/text/CString.h>
@@ -68,11 +69,26 @@ struct Storage::WriteOperation {
         , mappedBodyHandler(mappedBodyHandler)
     { }
     
-    Record record;
-    MappedBodyHandler mappedBodyHandler;
+    const Record record;
+    const MappedBodyHandler mappedBodyHandler;
+
     std::atomic<unsigned> activeCount { 0 };
 };
 
+struct Storage::TraverseOperation {
+    TraverseOperation(TraverseFlags flags, const TraverseHandler& handler)
+        : flags(flags)
+        , handler(handler)
+    { }
+
+    const TraverseFlags flags;
+    const TraverseHandler handler;
+
+    std::mutex activeMutex;
+    std::condition_variable activeCondition;
+    unsigned activeCount { 0 };
+};
+
 std::unique_ptr<Storage> Storage::open(const String& cachePath)
 {
     ASSERT(RunLoop::isMain());
@@ -653,7 +669,8 @@ void Storage::retrieve(const Key& key, unsigned priority, RetrieveCompletionHand
     if (retrieveFromMemory(m_activeWriteOperations, key, completionHandler))
         return;
 
-    m_pendingReadOperationsByPriority[priority].prepend(new ReadOperation { key, WTF::move(completionHandler) } );
+    auto readOperation = std::make_unique<ReadOperation>(key, WTF::move(completionHandler));
+    m_pendingReadOperationsByPriority[priority].prepend(WTF::move(readOperation));
     dispatchPendingReadOperations();
 }
 
@@ -665,7 +682,8 @@ void Storage::store(const Record& record, MappedBodyHandler&& mappedBodyHandler)
     if (!m_capacity)
         return;
 
-    m_pendingWriteOperations.prepend(new WriteOperation { record, WTF::move(mappedBodyHandler) });
+    auto writeOperation = std::make_unique<WriteOperation>(record, WTF::move(mappedBodyHandler));
+    m_pendingWriteOperations.prepend(WTF::move(writeOperation));
 
     // Add key to the filter already here as we do lookups from the pending operations too.
     addToRecordFilter(record.key);
@@ -685,37 +703,65 @@ void Storage::traverse(TraverseFlags flags, TraverseHandler&& traverseHandler)
     ASSERT(RunLoop::isMain());
     ASSERT(traverseHandler);
     // Avoid non-thread safe std::function copies.
-    auto* traverseHandlerPtr = new TraverseHandler(WTF::move(traverseHandler));
-    
-    ioQueue().dispatch([this, flags, traverseHandlerPtr] {
-        auto& traverseHandler = *traverseHandlerPtr;
-        traverseRecordsFiles(recordsPath(), [this, flags, &traverseHandler](const String& fileName, const String& partitionPath) {
+
+    auto traverseOperationPtr = std::make_unique<TraverseOperation>(flags, WTF::move(traverseHandler));
+    auto& traverseOperation = *traverseOperationPtr;
+    m_activeTraverseOperations.add(WTF::move(traverseOperationPtr));
+
+    ioQueue().dispatch([this, &traverseOperation] {
+        traverseRecordsFiles(recordsPath(), [this, &traverseOperation](const String& fileName, const String& partitionPath) {
             if (fileName.length() != Key::hashStringLength())
                 return;
             auto recordPath = WebCore::pathByAppendingComponent(partitionPath, fileName);
 
-            RecordInfo info;
-            if (flags & TraverseFlag::ComputeWorth)
-                info.worth = computeRecordWorth(fileTimes(recordPath));
-            if (flags & TraverseFlag::ShareCount)
-                info.bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+            double worth = -1;
+            if (traverseOperation.flags & TraverseFlag::ComputeWorth)
+                worth = computeRecordWorth(fileTimes(recordPath));
+            unsigned bodyShareCount = 0;
+            if (traverseOperation.flags & TraverseFlag::ShareCount)
+                bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+
+            std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+            ++traverseOperation.activeCount;
 
             auto channel = IOChannel::open(recordPath, IOChannel::Type::Read);
-            // FIXME: Traversal is slower than it should be due to lack of parallelism.
-            channel->readSync(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseHandler, &info](Data& fileData, int) {
+            channel->read(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseOperation, worth, bodyShareCount](Data& fileData, int) {
                 RecordMetaData metaData;
                 Data headerData;
                 if (decodeRecordHeader(fileData, metaData, headerData)) {
-                    Record record { metaData.key, std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp), headerData, { } };
-                    info.bodySize = metaData.bodySize;
-                    info.bodyHash = String::fromUTF8(SHA1::hexDigest(metaData.bodyHash));
-                    traverseHandler(&record, info);
+                    Record record {
+                        metaData.key,
+                        std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp),
+                        headerData,
+                        { }
+                    };
+                    RecordInfo info {
+                        static_cast<size_t>(metaData.bodySize),
+                        worth,
+                        bodyShareCount,
+                        String::fromUTF8(SHA1::hexDigest(metaData.bodyHash))
+                    };
+                    traverseOperation.handler(&record, info);
                 }
+
+                std::lock_guard<std::mutex> lock(traverseOperation.activeMutex);
+                --traverseOperation.activeCount;
+                traverseOperation.activeCondition.notify_one();
             });
+
+            const unsigned maximumParallelReadCount = 5;
+            traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+                return traverseOperation.activeCount <= maximumParallelReadCount;
+            });
+        });
+        // Wait for all reads to finish.
+        std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+        traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+            return !traverseOperation.activeCount;
         });
-        RunLoop::main().dispatch([this, traverseHandlerPtr] {
-            (*traverseHandlerPtr)(nullptr, { });
-            delete traverseHandlerPtr;
+        RunLoop::main().dispatch([this, &traverseOperation] {
+            traverseOperation.handler(nullptr, { });
+            m_activeTraverseOperations.remove(&traverseOperation);
         });
     });
 }