Use WTF::Function instead of std::function in NetworkProcess code
[WebKit-https.git] / Source / WebKit2 / NetworkProcess / cache / NetworkCacheIOChannelSoup.cpp
1 /*
2  * Copyright (C) 2015 Igalia S.L.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "NetworkCacheIOChannel.h"
28
29 #if ENABLE(NETWORK_CACHE)
30
31 #include "NetworkCacheFileSystem.h"
32 #include <wtf/MainThread.h>
33 #include <wtf/RunLoop.h>
34 #include <wtf/glib/GUniquePtr.h>
35 #include <wtf/glib/RunLoopSourcePriority.h>
36
37 namespace WebKit {
38 namespace NetworkCache {
39
// Upper bound, in bytes, for the scratch buffer handed to each individual stream read.
static constexpr size_t gDefaultReadBufferSize = 4096;
41
42 IOChannel::IOChannel(const String& filePath, Type type)
43     : m_path(filePath)
44     , m_type(type)
45 {
46     auto path = WebCore::fileSystemRepresentation(filePath);
47     GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
48     switch (m_type) {
49     case Type::Create: {
50         g_file_delete(file.get(), nullptr, nullptr);
51         m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
52 #if !HAVE(STAT_BIRTHTIME)
53         GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())));
54         g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
55 #endif
56         break;
57     }
58     case Type::Write: {
59         m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
60         break;
61     }
62     case Type::Read:
63         m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
64         break;
65     }
66 }
67
68 IOChannel::~IOChannel()
69 {
70     RELEASE_ASSERT(!m_wasDeleted.exchange(true));
71 }
72
73 Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
74 {
75     return adoptRef(*new IOChannel(filePath, type));
76 }
77
78 static inline void runTaskInQueue(Function<void ()>&& task, WorkQueue* queue)
79 {
80     if (queue) {
81         queue->dispatch(WTFMove(task));
82         return;
83     }
84
85     // Using nullptr as queue submits the result to the main context.
86     RunLoop::main().dispatch(WTFMove(task));
87 }
88
89 static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
90 {
91     GRefPtr<SoupBuffer> buffer;
92     if (size != readBuffer->length) {
93         // The subbuffer does not copy the data.
94         buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));
95     } else
96         buffer = readBuffer;
97
98     if (data.isNull()) {
99         // First chunk, we need to force the data to be copied.
100         data = { reinterpret_cast<const uint8_t*>(buffer->data), size };
101     } else {
102         Data dataRead(WTFMove(buffer));
103         // Concatenate will copy the data.
104         data = concatenate(data, dataRead);
105     }
106 }
107
108 struct ReadAsyncData {
109     RefPtr<IOChannel> channel;
110     GRefPtr<SoupBuffer> buffer;
111     RefPtr<WorkQueue> queue;
112     size_t bytesToRead;
113     Function<void (Data&, int error)> completionHandler;
114     Data data;
115 };
116
117 static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
118 {
119     std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData));
120     gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
121     if (bytesRead == -1) {
122         WorkQueue* queue = asyncData->queue.get();
123         runTaskInQueue([asyncData = WTFMove(asyncData)] {
124             asyncData->completionHandler(asyncData->data, -1);
125         }, queue);
126         return;
127     }
128
129     if (!bytesRead) {
130         WorkQueue* queue = asyncData->queue.get();
131         runTaskInQueue([asyncData = WTFMove(asyncData)] {
132             asyncData->completionHandler(asyncData->data, 0);
133         }, queue);
134         return;
135     }
136
137     ASSERT(bytesRead > 0);
138     fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data);
139
140     size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size();
141     if (!pendingBytesToRead) {
142         WorkQueue* queue = asyncData->queue.get();
143         runTaskInQueue([asyncData = WTFMove(asyncData)] {
144             asyncData->completionHandler(asyncData->data, 0);
145         }, queue);
146         return;
147     }
148
149     size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length);
150     // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released.
151     auto data = const_cast<char*>(asyncData->buffer->data);
152     g_input_stream_read_async(stream, data, bytesToRead, RunLoopSourcePriority::DiskCacheRead, nullptr,
153         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release());
154 }
155
156 void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, Function<void (Data&, int error)>&& completionHandler)
157 {
158     RefPtr<IOChannel> channel(this);
159     if (!m_inputStream) {
160         runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
161             Data data;
162             completionHandler(data, -1);
163         }, queue);
164         return;
165     }
166
167     if (!isMainThread()) {
168         readSyncInThread(offset, size, queue, WTFMove(completionHandler));
169         return;
170     }
171
172     size_t bufferSize = std::min(size, gDefaultReadBufferSize);
173     uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
174     GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
175     ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, WTFMove(completionHandler), { } };
176
177     // FIXME: implement offset.
178     g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, RunLoopSourcePriority::DiskCacheRead, nullptr,
179         reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
180 }
181
182 void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, Function<void (Data&, int error)>&& completionHandler)
183 {
184     ASSERT(!isMainThread());
185
186     RefPtr<IOChannel> channel(this);
187     Thread::create("IOChannel::readSync", [channel, size, queue, completionHandler = WTFMove(completionHandler)] () mutable {
188         size_t bufferSize = std::min(size, gDefaultReadBufferSize);
189         uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
190         GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
191         Data data;
192         size_t pendingBytesToRead = size;
193         size_t bytesToRead = bufferSize;
194         do {
195             // FIXME: implement offset.
196             gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
197             if (bytesRead == -1) {
198                 runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
199                     Data data;
200                     completionHandler(data, -1);
201                 }, queue);
202                 return;
203             }
204
205             if (!bytesRead)
206                 break;
207
208             ASSERT(bytesRead > 0);
209             fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
210
211             pendingBytesToRead = size - data.size();
212             bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
213         } while (pendingBytesToRead);
214
215         GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer();
216         runTaskInQueue([channel, bufferCapture, completionHandler = WTFMove(completionHandler)] {
217             GRefPtr<SoupBuffer> buffer = bufferCapture;
218             Data data = { WTFMove(buffer) };
219             completionHandler(data, 0);
220         }, queue);
221     })->detach();
222 }
223
224 struct WriteAsyncData {
225     RefPtr<IOChannel> channel;
226     GRefPtr<SoupBuffer> buffer;
227     RefPtr<WorkQueue> queue;
228     Function<void (int error)> completionHandler;
229 };
230
231 static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
232 {
233     std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData));
234     gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
235     if (bytesWritten == -1) {
236         WorkQueue* queue = asyncData->queue.get();
237         runTaskInQueue([asyncData = WTFMove(asyncData)] {
238             asyncData->completionHandler(-1);
239         }, queue);
240         return;
241     }
242
243     gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten;
244     if (!pendingBytesToWrite) {
245         WorkQueue* queue = asyncData->queue.get();
246         runTaskInQueue([asyncData = WTFMove(asyncData)] {
247             asyncData->completionHandler(0);
248         }, queue);
249         return;
250     }
251
252     asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite));
253     // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released.
254     auto data = asyncData->buffer->data;
255     g_output_stream_write_async(stream, data, pendingBytesToWrite, RunLoopSourcePriority::DiskCacheWrite, nullptr,
256         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release());
257 }
258
259 void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, Function<void (int error)>&& completionHandler)
260 {
261     RefPtr<IOChannel> channel(this);
262     if (!m_outputStream && !m_ioStream) {
263         runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
264             completionHandler(-1);
265         }, queue);
266         return;
267     }
268
269     GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
270     if (!stream) {
271         runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
272             completionHandler(-1);
273         }, queue);
274         return;
275     }
276
277     WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, WTFMove(completionHandler) };
278     // FIXME: implement offset.
279     g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), RunLoopSourcePriority::DiskCacheWrite, nullptr,
280         reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
281 }
282
283 } // namespace NetworkCache
284 } // namespace WebKit
285
286 #endif