Make NetworkCache::traverse faster
[WebKit-https.git] / Source / WebKit2 / NetworkProcess / cache / NetworkCacheIOChannelSoup.cpp
1 /*
2  * Copyright (C) 2015 Igalia S.L.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "NetworkCacheIOChannel.h"
28
29 #if ENABLE(NETWORK_CACHE)
30
31 #include "NetworkCacheFileSystem.h"
32 #include <wtf/MainThread.h>
33 #include <wtf/glib/GMainLoopSource.h>
34 #include <wtf/glib/GMutexLocker.h>
35 #include <wtf/glib/GUniquePtr.h>
36
37 namespace WebKit {
38 namespace NetworkCache {
39
// Largest scratch buffer, in bytes, allocated for one asynchronous read request; bigger reads are
// served by refilling this buffer chunk after chunk.
static const size_t gDefaultReadBufferSize = 4 * 1024;
41
42 IOChannel::IOChannel(const String& filePath, Type type)
43     : m_path(filePath)
44     , m_type(type)
45 {
46     auto path = WebCore::fileSystemRepresentation(filePath);
47     GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
48     switch (m_type) {
49     case Type::Create: {
50         g_file_delete(file.get(), nullptr, nullptr);
51         m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
52 #if !HAVE(STAT_BIRTHTIME)
53         GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())));
54         g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
55 #endif
56         break;
57     }
58     case Type::Write: {
59         m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
60         break;
61     }
62     case Type::Read:
63         m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
64         break;
65     }
66 }
67
68 Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
69 {
70     return adoptRef(*new IOChannel(filePath, type));
71 }
72
73 static inline void runTaskInQueue(std::function<void ()> task, WorkQueue* queue)
74 {
75     if (queue) {
76         queue->dispatch(task);
77         return;
78     }
79
80     // Using nullptr as queue submits the result to the main context.
81     GMainLoopSource::scheduleAndDeleteOnDestroy("[WebKit] IOChannel task", task);
82 }
83
84 static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
85 {
86     GRefPtr<SoupBuffer> buffer;
87     if (size != readBuffer->length) {
88         // The subbuffer does not copy the data.
89         buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));
90     } else
91         buffer = readBuffer;
92
93     if (data.isNull()) {
94         // First chunk, we need to force the data to be copied.
95         data = { reinterpret_cast<const uint8_t*>(buffer->data), size };
96     } else {
97         Data dataRead(WTF::move(buffer));
98         // Concatenate will copy the data.
99         data = concatenate(data, dataRead);
100     }
101 }
102
103 struct ReadAsyncData {
104     RefPtr<IOChannel> channel;
105     GRefPtr<SoupBuffer> buffer;
106     RefPtr<WorkQueue> queue;
107     size_t bytesToRead;
108     std::function<void (Data&, int error)> completionHandler;
109     Data data;
110 };
111
112 static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
113 {
114     std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData));
115     gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
116     if (bytesRead == -1) {
117         WorkQueue* queue = asyncData->queue.get();
118         auto* asyncDataPtr = asyncData.release();
119         runTaskInQueue([asyncDataPtr] {
120             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
121             asyncData->completionHandler(asyncData->data, -1);
122         }, queue);
123         return;
124     }
125
126     if (!bytesRead) {
127         WorkQueue* queue = asyncData->queue.get();
128         auto* asyncDataPtr = asyncData.release();
129         runTaskInQueue([asyncDataPtr] {
130             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
131             asyncData->completionHandler(asyncData->data, 0);
132         }, queue);
133         return;
134     }
135
136     ASSERT(bytesRead > 0);
137     fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data);
138
139     size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size();
140     if (!pendingBytesToRead) {
141         WorkQueue* queue = asyncData->queue.get();
142         auto* asyncDataPtr = asyncData.release();
143         runTaskInQueue([asyncDataPtr] {
144             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
145             asyncData->completionHandler(asyncData->data, 0);
146         }, queue);
147         return;
148     }
149
150     size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length);
151     // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released.
152     auto data = const_cast<char*>(asyncData->buffer->data);
153     g_input_stream_read_async(stream, data, bytesToRead, G_PRIORITY_DEFAULT, nullptr,
154         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release());
155 }
156
157 void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
158 {
159     RefPtr<IOChannel> channel(this);
160     if (!m_inputStream) {
161         runTaskInQueue([channel, completionHandler] {
162             Data data;
163             completionHandler(data, -1);
164         }, queue);
165         return;
166     }
167
168     size_t bufferSize = std::min(size, gDefaultReadBufferSize);
169     uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
170     GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
171     ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, completionHandler, { } };
172
173     // FIXME: implement offset.
174     g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, G_PRIORITY_DEFAULT, nullptr,
175         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
176 }
177
178 struct WriteAsyncData {
179     RefPtr<IOChannel> channel;
180     GRefPtr<SoupBuffer> buffer;
181     RefPtr<WorkQueue> queue;
182     std::function<void (int error)> completionHandler;
183 };
184
185 static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
186 {
187     std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData));
188     gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
189     if (bytesWritten == -1) {
190         WorkQueue* queue = asyncData->queue.get();
191         auto* asyncDataPtr = asyncData.release();
192         runTaskInQueue([asyncDataPtr] {
193             std::unique_ptr<WriteAsyncData> asyncData(asyncDataPtr);
194             asyncData->completionHandler(-1);
195         }, queue);
196         return;
197     }
198
199     gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten;
200     if (!pendingBytesToWrite) {
201         WorkQueue* queue = asyncData->queue.get();
202         auto* asyncDataPtr = asyncData.release();
203         runTaskInQueue([asyncDataPtr] {
204             std::unique_ptr<WriteAsyncData> asyncData(asyncDataPtr);
205             asyncData->completionHandler(0);
206         }, queue);
207         return;
208     }
209
210     asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite));
211     // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released.
212     auto data = asyncData->buffer->data;
213     g_output_stream_write_async(stream, data, pendingBytesToWrite, G_PRIORITY_DEFAULT_IDLE, nullptr,
214         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release());
215 }
216
217 void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler)
218 {
219     RefPtr<IOChannel> channel(this);
220     if (!m_outputStream && !m_ioStream) {
221         runTaskInQueue([channel, completionHandler] {
222             completionHandler(-1);
223         }, queue);
224         return;
225     }
226
227     GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
228     if (!stream) {
229         runTaskInQueue([channel, completionHandler] {
230             completionHandler(-1);
231         }, queue);
232         return;
233     }
234
235     WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, completionHandler };
236     // FIXME: implement offset.
237     g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), G_PRIORITY_DEFAULT_IDLE, nullptr,
238         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
239 }
240
241 } // namespace NetworkCache
242 } // namespace WebKit
243
244 #endif