#include "NetworkCacheCoders.h"
#include "NetworkCacheFileSystem.h"
#include "NetworkCacheIOChannel.h"
+#include <condition_variable>
#include <wtf/RandomNumber.h>
#include <wtf/RunLoop.h>
#include <wtf/text/CString.h>
, mappedBodyHandler(mappedBodyHandler)
{ }
- Record record;
- MappedBodyHandler mappedBodyHandler;
+ const Record record;
+ const MappedBodyHandler mappedBodyHandler;
+
std::atomic<unsigned> activeCount { 0 };
};
+struct Storage::TraverseOperation {
+ TraverseOperation(TraverseFlags flags, const TraverseHandler& handler)
+ : flags(flags)
+ , handler(handler)
+ { }
+
+ const TraverseFlags flags;
+ const TraverseHandler handler;
+
+ std::mutex activeMutex;
+ std::condition_variable activeCondition;
+ unsigned activeCount { 0 };
+};
+
std::unique_ptr<Storage> Storage::open(const String& cachePath)
{
ASSERT(RunLoop::isMain());
if (retrieveFromMemory(m_activeWriteOperations, key, completionHandler))
return;
- m_pendingReadOperationsByPriority[priority].prepend(new ReadOperation { key, WTF::move(completionHandler) } );
+ auto readOperation = std::make_unique<ReadOperation>(key, WTF::move(completionHandler));
+ m_pendingReadOperationsByPriority[priority].prepend(WTF::move(readOperation));
dispatchPendingReadOperations();
}
if (!m_capacity)
return;
- m_pendingWriteOperations.prepend(new WriteOperation { record, WTF::move(mappedBodyHandler) });
+ auto writeOperation = std::make_unique<WriteOperation>(record, WTF::move(mappedBodyHandler));
+ m_pendingWriteOperations.prepend(WTF::move(writeOperation));
// Add key to the filter already here as we do lookups from the pending operations too.
addToRecordFilter(record.key);
ASSERT(RunLoop::isMain());
ASSERT(traverseHandler);
// Avoid non-thread safe std::function copies.
- auto* traverseHandlerPtr = new TraverseHandler(WTF::move(traverseHandler));
-
- ioQueue().dispatch([this, flags, traverseHandlerPtr] {
- auto& traverseHandler = *traverseHandlerPtr;
- traverseRecordsFiles(recordsPath(), [this, flags, &traverseHandler](const String& fileName, const String& partitionPath) {
+
+ auto traverseOperationPtr = std::make_unique<TraverseOperation>(flags, WTF::move(traverseHandler));
+ auto& traverseOperation = *traverseOperationPtr;
+ m_activeTraverseOperations.add(WTF::move(traverseOperationPtr));
+
+ ioQueue().dispatch([this, &traverseOperation] {
+ traverseRecordsFiles(recordsPath(), [this, &traverseOperation](const String& fileName, const String& partitionPath) {
if (fileName.length() != Key::hashStringLength())
return;
auto recordPath = WebCore::pathByAppendingComponent(partitionPath, fileName);
- RecordInfo info;
- if (flags & TraverseFlag::ComputeWorth)
- info.worth = computeRecordWorth(fileTimes(recordPath));
- if (flags & TraverseFlag::ShareCount)
- info.bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+ double worth = -1;
+ if (traverseOperation.flags & TraverseFlag::ComputeWorth)
+ worth = computeRecordWorth(fileTimes(recordPath));
+ unsigned bodyShareCount = 0;
+ if (traverseOperation.flags & TraverseFlag::ShareCount)
+ bodyShareCount = m_blobStorage.shareCount(bodyPathForRecordPath(recordPath));
+
+ std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+ ++traverseOperation.activeCount;
auto channel = IOChannel::open(recordPath, IOChannel::Type::Read);
- // FIXME: Traversal is slower than it should be due to lack of parallelism.
- channel->readSync(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseHandler, &info](Data& fileData, int) {
+ channel->read(0, std::numeric_limits<size_t>::max(), nullptr, [this, &traverseOperation, worth, bodyShareCount](Data& fileData, int) {
RecordMetaData metaData;
Data headerData;
if (decodeRecordHeader(fileData, metaData, headerData)) {
- Record record { metaData.key, std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp), headerData, { } };
- info.bodySize = metaData.bodySize;
- info.bodyHash = String::fromUTF8(SHA1::hexDigest(metaData.bodyHash));
- traverseHandler(&record, info);
+ Record record {
+ metaData.key,
+ std::chrono::system_clock::time_point(metaData.epochRelativeTimeStamp),
+ headerData,
+ { }
+ };
+ RecordInfo info {
+ static_cast<size_t>(metaData.bodySize),
+ worth,
+ bodyShareCount,
+ String::fromUTF8(SHA1::hexDigest(metaData.bodyHash))
+ };
+ traverseOperation.handler(&record, info);
}
+
+ std::lock_guard<std::mutex> lock(traverseOperation.activeMutex);
+ --traverseOperation.activeCount;
+ traverseOperation.activeCondition.notify_one();
});
+
+ const unsigned maximumParallelReadCount = 5;
+ traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+ return traverseOperation.activeCount <= maximumParallelReadCount;
+ });
+ });
+ // Wait for all reads to finish.
+ std::unique_lock<std::mutex> lock(traverseOperation.activeMutex);
+ traverseOperation.activeCondition.wait(lock, [&traverseOperation] {
+ return !traverseOperation.activeCount;
});
- RunLoop::main().dispatch([this, traverseHandlerPtr] {
- (*traverseHandlerPtr)(nullptr, { });
- delete traverseHandlerPtr;
+ RunLoop::main().dispatch([this, &traverseOperation] {
+ traverseOperation.handler(nullptr, { });
+ m_activeTraverseOperations.remove(&traverseOperation);
});
});
}