37a536ad9b8fd31b1c4069af7c4e52d6a4bfe378
[WebKit-https.git] / Source / WebKit2 / NetworkProcess / cache / NetworkCacheIOChannelSoup.cpp
1 /*
2  * Copyright (C) 2015 Igalia S.L.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "NetworkCacheIOChannel.h"
28
29 #if ENABLE(NETWORK_CACHE)
30
31 #include "NetworkCacheFileSystem.h"
32 #include <wtf/MainThread.h>
33 #include <wtf/RunLoop.h>
34 #include <wtf/glib/GUniquePtr.h>
35
36 namespace WebKit {
37 namespace NetworkCache {
38
// Upper bound, in bytes, of the scratch buffer used for each individual stream read.
static const size_t gDefaultReadBufferSize = 4 * 1024;
40
41 IOChannel::IOChannel(const String& filePath, Type type)
42     : m_path(filePath)
43     , m_type(type)
44 {
45     auto path = WebCore::fileSystemRepresentation(filePath);
46     GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
47     switch (m_type) {
48     case Type::Create: {
49         g_file_delete(file.get(), nullptr, nullptr);
50         m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
51 #if !HAVE(STAT_BIRTHTIME)
52         GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())));
53         g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
54 #endif
55         break;
56     }
57     case Type::Write: {
58         m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
59         break;
60     }
61     case Type::Read:
62         m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
63         break;
64     }
65 }
66
67 Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
68 {
69     return adoptRef(*new IOChannel(filePath, type));
70 }
71
72 static inline void runTaskInQueue(std::function<void ()> task, WorkQueue* queue)
73 {
74     if (queue) {
75         queue->dispatch(task);
76         return;
77     }
78
79     // Using nullptr as queue submits the result to the main context.
80     RunLoop::main().dispatch(WTF::move(task));
81 }
82
83 static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
84 {
85     GRefPtr<SoupBuffer> buffer;
86     if (size != readBuffer->length) {
87         // The subbuffer does not copy the data.
88         buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));
89     } else
90         buffer = readBuffer;
91
92     if (data.isNull()) {
93         // First chunk, we need to force the data to be copied.
94         data = { reinterpret_cast<const uint8_t*>(buffer->data), size };
95     } else {
96         Data dataRead(WTF::move(buffer));
97         // Concatenate will copy the data.
98         data = concatenate(data, dataRead);
99     }
100 }
101
102 struct ReadAsyncData {
103     RefPtr<IOChannel> channel;
104     GRefPtr<SoupBuffer> buffer;
105     RefPtr<WorkQueue> queue;
106     size_t bytesToRead;
107     std::function<void (Data&, int error)> completionHandler;
108     Data data;
109 };
110
111 static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
112 {
113     std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData));
114     gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
115     if (bytesRead == -1) {
116         WorkQueue* queue = asyncData->queue.get();
117         auto* asyncDataPtr = asyncData.release();
118         runTaskInQueue([asyncDataPtr] {
119             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
120             asyncData->completionHandler(asyncData->data, -1);
121         }, queue);
122         return;
123     }
124
125     if (!bytesRead) {
126         WorkQueue* queue = asyncData->queue.get();
127         auto* asyncDataPtr = asyncData.release();
128         runTaskInQueue([asyncDataPtr] {
129             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
130             asyncData->completionHandler(asyncData->data, 0);
131         }, queue);
132         return;
133     }
134
135     ASSERT(bytesRead > 0);
136     fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data);
137
138     size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size();
139     if (!pendingBytesToRead) {
140         WorkQueue* queue = asyncData->queue.get();
141         auto* asyncDataPtr = asyncData.release();
142         runTaskInQueue([asyncDataPtr] {
143             std::unique_ptr<ReadAsyncData> asyncData(asyncDataPtr);
144             asyncData->completionHandler(asyncData->data, 0);
145         }, queue);
146         return;
147     }
148
149     size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length);
150     // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released.
151     auto data = const_cast<char*>(asyncData->buffer->data);
152     g_input_stream_read_async(stream, data, bytesToRead, G_PRIORITY_DEFAULT, nullptr,
153         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release());
154 }
155
156 void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
157 {
158     RefPtr<IOChannel> channel(this);
159     if (!m_inputStream) {
160         runTaskInQueue([channel, completionHandler] {
161             Data data;
162             completionHandler(data, -1);
163         }, queue);
164         return;
165     }
166
167     if (!isMainThread()) {
168         readSyncInThread(offset, size, queue, completionHandler);
169         return;
170     }
171
172     size_t bufferSize = std::min(size, gDefaultReadBufferSize);
173     uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
174     GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
175     ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, completionHandler, { } };
176
177     // FIXME: implement offset.
178     g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, G_PRIORITY_DEFAULT, nullptr,
179         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
180 }
181
182 void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
183 {
184     ASSERT(!isMainThread());
185
186     RefPtr<IOChannel> channel(this);
187     createThread("IOChannel::readSync", [channel, size, queue, completionHandler] {
188         size_t bufferSize = std::min(size, gDefaultReadBufferSize);
189         uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
190         GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
191         Data data;
192         size_t pendingBytesToRead = size;
193         size_t bytesToRead = bufferSize;
194         do {
195             // FIXME: implement offset.
196             gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
197             if (bytesRead == -1) {
198                 runTaskInQueue([channel, completionHandler] {
199                     Data data;
200                     completionHandler(data, -1);
201                 }, queue);
202                 return;
203             }
204
205             if (!bytesRead)
206                 break;
207
208             ASSERT(bytesRead > 0);
209             fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
210
211             pendingBytesToRead = size - data.size();
212             bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
213         } while (pendingBytesToRead);
214
215         GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer();
216         runTaskInQueue([channel, bufferCapture, completionHandler] {
217             GRefPtr<SoupBuffer> buffer = bufferCapture;
218             Data data = { WTF::move(buffer) };
219             completionHandler(data, 0);
220         }, queue);
221     });
222 }
223
224 struct WriteAsyncData {
225     RefPtr<IOChannel> channel;
226     GRefPtr<SoupBuffer> buffer;
227     RefPtr<WorkQueue> queue;
228     std::function<void (int error)> completionHandler;
229 };
230
231 static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
232 {
233     std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData));
234     gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
235     if (bytesWritten == -1) {
236         WorkQueue* queue = asyncData->queue.get();
237         auto* asyncDataPtr = asyncData.release();
238         runTaskInQueue([asyncDataPtr] {
239             std::unique_ptr<WriteAsyncData> asyncData(asyncDataPtr);
240             asyncData->completionHandler(-1);
241         }, queue);
242         return;
243     }
244
245     gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten;
246     if (!pendingBytesToWrite) {
247         WorkQueue* queue = asyncData->queue.get();
248         auto* asyncDataPtr = asyncData.release();
249         runTaskInQueue([asyncDataPtr] {
250             std::unique_ptr<WriteAsyncData> asyncData(asyncDataPtr);
251             asyncData->completionHandler(0);
252         }, queue);
253         return;
254     }
255
256     asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite));
257     // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released.
258     auto data = asyncData->buffer->data;
259     g_output_stream_write_async(stream, data, pendingBytesToWrite, G_PRIORITY_DEFAULT_IDLE, nullptr,
260         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release());
261 }
262
263 void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler)
264 {
265     RefPtr<IOChannel> channel(this);
266     if (!m_outputStream && !m_ioStream) {
267         runTaskInQueue([channel, completionHandler] {
268             completionHandler(-1);
269         }, queue);
270         return;
271     }
272
273     GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
274     if (!stream) {
275         runTaskInQueue([channel, completionHandler] {
276             completionHandler(-1);
277         }, queue);
278         return;
279     }
280
281     WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, completionHandler };
282     // FIXME: implement offset.
283     g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), G_PRIORITY_DEFAULT_IDLE, nullptr,
284         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
285 }
286
287 } // namespace NetworkCache
288 } // namespace WebKit
289
290 #endif