Make NetworkCache::traverse faster
author antti@apple.com <antti@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 27 Jun 2015 08:52:58 +0000 (08:52 +0000)
committer antti@apple.com <antti@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 27 Jun 2015 08:52:58 +0000 (08:52 +0000)
https://bugs.webkit.org/show_bug.cgi?id=146354

Reviewed by Anders Carlsson.

* NetworkProcess/cache/NetworkCacheIOChannel.h:
(WebKit::NetworkCache::IOChannel::path):
(WebKit::NetworkCache::IOChannel::type):
* NetworkProcess/cache/NetworkCacheIOChannelCocoa.mm:
(WebKit::NetworkCache::IOChannel::open):
(WebKit::NetworkCache::IOChannel::read):
(WebKit::NetworkCache::IOChannel::write):
(WebKit::NetworkCache::IOChannel::readSync): Deleted.

    Not needed anymore.

* NetworkProcess/cache/NetworkCacheStorage.cpp:
(WebKit::NetworkCache::Storage::WriteOperation::WriteOperation):
(WebKit::NetworkCache::Storage::TraverseOperation::TraverseOperation):

    Add TraverseOperation, similar to Read/Write.

(WebKit::NetworkCache::Storage::open):
(WebKit::NetworkCache::Storage::traverse):

    Use async I/O.
    Use condition variable to allow maximum 5 parallel file reads.

* NetworkProcess/cache/NetworkCacheStorage.h:

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@186025 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebKit2/ChangeLog
Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannel.h
Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelCocoa.mm
Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp
Source/WebKit2/NetworkProcess/cache/NetworkCacheStorage.cpp
Source/WebKit2/NetworkProcess/cache/NetworkCacheStorage.h

index d3891b9..312ffd4 100644 (file)
@@ -1,3 +1,35 @@
+2015-06-26  Antti Koivisto  <antti@apple.com>
+
+        Make NetworkCache::traverse faster
+        https://bugs.webkit.org/show_bug.cgi?id=146354
+
+        Reviewed by Anders Carlsson.
+
+        * NetworkProcess/cache/NetworkCacheIOChannel.h:
+        (WebKit::NetworkCache::IOChannel::path):
+        (WebKit::NetworkCache::IOChannel::type):
+        * NetworkProcess/cache/NetworkCacheIOChannelCocoa.mm:
+        (WebKit::NetworkCache::IOChannel::open):
+        (WebKit::NetworkCache::IOChannel::read):
+        (WebKit::NetworkCache::IOChannel::write):
+        (WebKit::NetworkCache::IOChannel::readSync): Deleted.
+
+            Not needed anymore.
+
+        * NetworkProcess/cache/NetworkCacheStorage.cpp:
+        (WebKit::NetworkCache::Storage::WriteOperation::WriteOperation):
+        (WebKit::NetworkCache::Storage::TraverseOperation::TraverseOperation):
+
+            Add TraverseOperation, similar to Read/Write.
+
+        (WebKit::NetworkCache::Storage::open):
+        (WebKit::NetworkCache::Storage::traverse):
+
+            Use async I/O.
+            Use condition variable to allow maximum 5 parallel file reads.
+
+        * NetworkProcess/cache/NetworkCacheStorage.h:
+
 2015-06-27  Carlos Garcia Campos  <cgarcia@igalia.com>
 
         [SOUP] NetworkCache: Make NetworkProcess::clearDiskCache actually clear the the resources depending on the date
index 29e84b2..414390a 100644 (file)
@@ -49,7 +49,6 @@ public:
     // Using nullptr as queue submits the result to the main queue.
     // FIXME: We should add WorkQueue::main() instead.
     void read(size_t offset, size_t, WorkQueue*, std::function<void (Data&, int error)>);
-    void readSync(size_t offset, size_t, WorkQueue*, std::function<void (Data&, int error)>);
     void write(size_t offset, const Data&, WorkQueue*, std::function<void (int error)>);
 
     const String& path() const { return m_path; }
index 4d143f4..ed0e4ae 100644 (file)
@@ -104,17 +104,6 @@ void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function
     });
 }
 
-// FIXME: It would be better to do without this.
-void IOChannel::readSync(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
-{
-    auto semaphore = adoptDispatch(dispatch_semaphore_create(0));
-    read(offset, size, queue, [semaphore, &completionHandler](Data& data, int error) {
-        completionHandler(data, error);
-        dispatch_semaphore_signal(semaphore.get());
-    });
-    dispatch_semaphore_wait(semaphore.get(), DISPATCH_TIME_FOREVER);
-}
-
 void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler)
 {
     RefPtr<IOChannel> channel(this);
index aa90a3d..0a4af25 100644 (file)
@@ -175,57 +175,6 @@ void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function
         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
 }
 
-// FIXME: It would be better to do without this.
-void IOChannel::readSync(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
-{
-    ASSERT(!isMainThread());
-
-    static GMutex mutex;
-    static GCond condition;
-    WTF::GMutexLocker<GMutex> lock(mutex);
-    RefPtr<IOChannel> channel(this);
-
-    if (!m_inputStream) {
-        runTaskInQueue([channel, completionHandler] {
-            Data data;
-            completionHandler(data, -1);
-            g_cond_signal(&condition);
-        }, queue);
-        g_cond_wait(&condition, &mutex);
-        return;
-    }
-
-    size_t bufferSize = std::min(size, gDefaultReadBufferSize);
-    uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
-    GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
-    Data data;
-    size_t pendingBytesToRead = size;
-    size_t bytesToRead = bufferSize;
-    do {
-        // FIXME: implement offset.
-        gssize bytesRead = g_input_stream_read(m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
-        if (bytesRead == -1) {
-            completionHandler(data, -1);
-            return;
-        }
-
-        if (!bytesRead)
-            break;
-
-        ASSERT(bytesRead > 0);
-        fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
-
-        pendingBytesToRead = size - data.size();
-        bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
-    } while (pendingBytesToRead);
-
-    runTaskInQueue([channel, &data, completionHandler] {
-        completionHandler(data, 0);
-        g_cond_signal(&condition);
-    }, queue);
-    g_cond_wait(&condition, &mutex);
-}
-
 struct WriteAsyncData {
     RefPtr<IOChannel> channel;
     GRefPtr<SoupBuffer> buffer;
index 4db11a2..5f0b784 100644 (file)
@@ -32,6 +32,7 @@
 #include "NetworkCacheCoders.h"
 #include "NetworkCacheFileSystem.h"
 #include "NetworkCacheIOChannel.h"
+#include <condition_variable>
 #include <wtf/RandomNumber.h>
 #include <wtf/RunLoop.h>
 #include <wtf/text/CString.h>
@@ -68,11 +69,26 @@ struct Storage::WriteOperation {
         , mappedBodyHandler(mappedBodyHandler)
     { }
     
-    Record record;
-    MappedBodyHandler mappedBodyHandler;
+    const Record record;
+    const MappedBodyHandler mappedBodyHandler;
+
     std::atomic<unsigned> activeCount { 0 };
 };
 
+struct Storage::TraverseOperation {
+    TraverseOperation(TraverseFlags flags, const TraverseHandler& handler)
+        : flags(flags)
+        , handler(handler)
+    { }
+
+    const TraverseFlags flags;
+    const TraverseHandler handler;
+
+    std::mutex activeMutex;
+    std::condition_variable activeCondition;
+    unsigned activeCount { 0 };
+};
+
 std::unique_ptr<Storage> Storage::open(const String& cachePath)
 {
     ASSERT(RunLoop::isMain());
@@ -653,7 +669,8 @@ void Storage::retrieve(const Key& key, unsigned priority, RetrieveCompletionHand
     if (retrieveFromMemory(m_activeWriteOperations, key, completionHandler))
         return;
 
-    m_pendingReadOperationsByPriority[priority].prepend(new ReadOperation { key, WTF::move(completionHandler) } );
+    auto readOperation = std::make_unique<ReadOperation>(key, WTF::move(completionHandler));
+    m_pendingReadOperationsByPriority[priority].prepend(WTF::move(readOperation));
     dispatchPendingReadOperations();
 }
 
@@ -665,7 +682,8 @@ void Storage::store(const Record& record, MappedBodyHandler&& mappedBodyHandler)
     if (!m_capacity)
         return;
 
-    m_pendingWriteOperations.prepend(new WriteOperation { record, WTF::move(mappedBodyHandler) });
+    auto writeOperation = std::make_unique<WriteOperation>(record, WTF::move(mappedBodyHandler));
+    m_pendingWriteOperations.prepend(WTF::move(writeOperation));
 
     // Add key to the filter already here as we do lookups from the pending operations too.
     addToRecordFilter(record.key);
@@ -685,37 +703,65 @@ void Storage::traverse(TraverseFlags flags, TraverseHandler&& traverseHandler)
     ASSERT(RunLoop::isMain());
     ASSERT(traverseHandler);
     // Avoid non-thread safe std::function copies.
-    auto* traverseHandlerPtr = new TraverseHandler(WTF::move(traverseHandler));
-    
-    ioQueue().dispatch([this, flags, traverseHandlerPtr] {
-        auto& traverseHandler = *traverseHandlerPtr;
-        traverseRecordsFiles(recordsPath(), [this, flags, &traverseHandler](const String& fileName, const String& partitionPath) {
+
+    auto traverseOperationPtr = std::make_unique<TraverseOperation>(flags, WTF::move(traverseHandler));
+    auto& traverseOperation = *traverseOperationPtr;
+    m_activeTraverseOperations.add(WTF::move(traverseOperationPtr));
+
+    ioQueue().dispatch([this, &traverseOperation] {
+        traverseRecordsFiles(recordsPath(), [this, &traverseOperation](const String& fileName, const String& partitionPath) {
             if (fileName.length() != Key::hashStringLength())
                 return;
             auto recordPath = WebCore::pathByAppendingComponent(partitionPath, fileName);
 
-            RecordInfo info;
-            if (flags & TraverseFlag::ComputeWorth)
-                info.worth = computeRecordWorth(fileTimes(recordPath));
-            if (flags & TraverseFlag::ShareCount)
-                info.bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+            double worth = -1;
+            if (traverseOperation.flags & TraverseFlag::ComputeWorth)
+                worth = computeRecordWorth(fileTimes(recordPath));
+            unsigned bodyShareCount = 0;
+            if (traverseOperation.flags & TraverseFlag::ShareCount)
+                bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+
+            std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+            ++traverseOperation.activeCount;
 
             auto channel = IOChannel::open(recordPath, IOChannel::Type::Read);
-            // FIXME: Traversal is slower than it should be due to lack of parallelism.
-            channel->readSync(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseHandler, &info](Data& fileData, int) {
+            channel->read(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseOperation, worth, bodyShareCount](Data& fileData, int) {
                 RecordMetaData metaData;
                 Data headerData;
                 if (decodeRecordHeader(fileData, metaData, headerData)) {
-                    Record record { metaData.key, std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp), headerData, { } };
-                    info.bodySize = metaData.bodySize;
-                    info.bodyHash = String::fromUTF8(SHA1::hexDigest(metaData.bodyHash));
-                    traverseHandler(&record, info);
+                    Record record {
+                        metaData.key,
+                        std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp),
+                        headerData,
+                        { }
+                    };
+                    RecordInfo info {
+                        static_cast<size_t>(metaData.bodySize),
+                        worth,
+                        bodyShareCount,
+                        String::fromUTF8(SHA1::hexDigest(metaData.bodyHash))
+                    };
+                    traverseOperation.handler(&record, info);
                 }
+
+                std::lock_guard<std::mutex> lock(traverseOperation.activeMutex);
+                --traverseOperation.activeCount;
+                traverseOperation.activeCondition.notify_one();
             });
+
+            const unsigned maximumParallelReadCount = 5;
+            traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+                return traverseOperation.activeCount <= maximumParallelReadCount;
+            });
+        });
+        // Wait for all reads to finish.
+        std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+        traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+            return !traverseOperation.activeCount;
         });
-        RunLoop::main().dispatch([this, traverseHandlerPtr] {
-            (*traverseHandlerPtr)(nullptr, { });
-            delete traverseHandlerPtr;
+        RunLoop::main().dispatch([this, &traverseOperation] {
+            traverseOperation.handler(nullptr, { });
+            m_activeTraverseOperations.remove(&traverseOperation);
         });
     });
 }
index 85ce407..6eb7f71 100644 (file)
@@ -66,9 +66,9 @@ public:
     void clear(std::chrono::system_clock::time_point modifiedSinceTime, std::function<void ()>&& completionHandler);
 
     struct RecordInfo {
-        size_t bodySize { 0 };
-        double worth { -1 }; // 0-1 where 1 is the most valuable.
-        unsigned bodyShareCount { 0 };
+        size_t bodySize;
+        double worth; // 0-1 where 1 is the most valuable.
+        unsigned bodyShareCount;
         String bodyHash;
     };
     enum TraverseFlag {
@@ -155,6 +155,9 @@ private:
     HashSet<std::unique_ptr<WriteOperation>> m_activeWriteOperations;
     WebCore::Timer m_writeOperationDispatchTimer;
 
+    struct TraverseOperation;
+    HashSet<std::unique_ptr<TraverseOperation>> m_activeTraverseOperations;
+
     Ref<WorkQueue> m_ioQueue;
     Ref<WorkQueue> m_backgroundIOQueue;
     Ref<WorkQueue> m_serialBackgroundIOQueue;