Clean up Storage code
[WebKit-https.git] / Source / WebKit2 / Platform / IPC / Connection.cpp
1 /*
2  * Copyright (C) 2010-2016 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include "Logging.h"
30 #include <memory>
31 #include <wtf/CurrentTime.h>
32 #include <wtf/HashSet.h>
33 #include <wtf/NeverDestroyed.h>
34 #include <wtf/RunLoop.h>
35 #include <wtf/text/WTFString.h>
36 #include <wtf/threads/BinarySemaphore.h>
37
38 namespace IPC {
39
40 struct Connection::ReplyHandler {
41     RefPtr<FunctionDispatcher> dispatcher;
42     Function<void (std::unique_ptr<Decoder>)> handler;
43 };
44
45 struct Connection::WaitForMessageState {
46     WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions)
47         : messageReceiverName(messageReceiverName)
48         , messageName(messageName)
49         , destinationID(destinationID)
50         , waitForOptions(waitForOptions)
51     {
52     }
53
54     StringReference messageReceiverName;
55     StringReference messageName;
56     uint64_t destinationID;
57
58     OptionSet<WaitForOption> waitForOptions;
59     bool messageWaitingInterrupted = false;
60
61     std::unique_ptr<Decoder> decoder;
62 };
63
64 class Connection::SyncMessageState {
65 public:
66     static SyncMessageState& singleton();
67
68     SyncMessageState();
69     ~SyncMessageState() = delete;
70
71     void wakeUpClientRunLoop()
72     {
73         m_waitForSyncReplySemaphore.signal();
74     }
75
76     bool wait(TimeWithDynamicClockType absoluteTime)
77     {
78         return m_waitForSyncReplySemaphore.wait(absoluteTime);
79     }
80
81     // Returns true if this message will be handled on a client thread that is currently
82     // waiting for a reply to a synchronous message.
83     bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
84
85     // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages
86     // from that connection and put the other messages back in the queue.
87     void dispatchMessages(Connection* allowedConnection);
88
89 private:
90     void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection&);
91
92     BinarySemaphore m_waitForSyncReplySemaphore;
93
94     // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
95     Lock m_mutex;
96
97     // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
98     HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
99
100     struct ConnectionAndIncomingMessage {
101         Ref<Connection> connection;
102         std::unique_ptr<Decoder> message;
103     };
104     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
105 };
106
107 Connection::SyncMessageState& Connection::SyncMessageState::singleton()
108 {
109     static std::once_flag onceFlag;
110     static LazyNeverDestroyed<SyncMessageState> syncMessageState;
111
112     std::call_once(onceFlag, [] {
113         syncMessageState.construct();
114     });
115
116     return syncMessageState;
117 }
118
119 Connection::SyncMessageState::SyncMessageState()
120 {
121 }
122
123 bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message)
124 {
125     if (!message->shouldDispatchMessageWhenWaitingForSyncReply())
126         return false;
127
128     ConnectionAndIncomingMessage connectionAndIncomingMessage { connection, WTFMove(message) };
129
130     {
131         std::lock_guard<Lock> lock(m_mutex);
132         
133         if (m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry) {
134             RunLoop::main().dispatch([this, protectedConnection = Ref<Connection>(connection)]() mutable {
135                 dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(protectedConnection);
136             });
137         }
138
139         m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(connectionAndIncomingMessage));
140     }
141
142     wakeUpClientRunLoop();
143
144     return true;
145 }
146
147 void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection)
148 {
149     ASSERT(RunLoop::isMain());
150
151     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
152
153     {
154         std::lock_guard<Lock> lock(m_mutex);
155         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
156     }
157
158     Vector<ConnectionAndIncomingMessage> messagesToPutBack;
159
160     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
161         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
162
163         if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection.ptr()) {
164             // This incoming message belongs to another connection and we don't want to dispatch it now
165             // so mark it to be put back in the message queue.
166             messagesToPutBack.append(WTFMove(connectionAndIncomingMessage));
167             continue;
168         }
169
170         connectionAndIncomingMessage.connection->dispatchMessage(WTFMove(connectionAndIncomingMessage.message));
171     }
172
173     if (!messagesToPutBack.isEmpty()) {
174         std::lock_guard<Lock> lock(m_mutex);
175
176         for (auto& message : messagesToPutBack)
177             m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(message));
178     }
179 }
180
181 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection& connection)
182 {
183     {
184         std::lock_guard<Lock> lock(m_mutex);
185         ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection));
186         m_didScheduleDispatchMessagesWorkSet.remove(&connection);
187     }
188
189     dispatchMessages(&connection);
190 }
191
192 // Represents a sync request for which we're waiting on a reply.
193 struct Connection::PendingSyncReply {
194     // The request ID.
195     uint64_t syncRequestID { 0 };
196
197     // The reply decoder, will be null if there was an error processing the sync
198     // message on the other side.
199     std::unique_ptr<Decoder> replyDecoder;
200
201     // Will be set to true once a reply has been received.
202     bool didReceiveReply { false };
203
204     PendingSyncReply() = default;
205
206     explicit PendingSyncReply(uint64_t syncRequestID)
207         : syncRequestID(syncRequestID)
208     {
209     }
210 };
211
212 Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client)
213 {
214     return adoptRef(*new Connection(identifier, true, client));
215 }
216
217 Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client)
218 {
219     return adoptRef(*new Connection(identifier, false, client));
220 }
221
222 Connection::Connection(Identifier identifier, bool isServer, Client& client)
223     : m_client(client)
224     , m_isServer(isServer)
225     , m_syncRequestID(0)
226     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
227     , m_shouldExitOnSyncMessageSendFailure(false)
228     , m_didCloseOnConnectionWorkQueueCallback(0)
229     , m_isConnected(false)
230     , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
231     , m_inSendSyncCount(0)
232     , m_inDispatchMessageCount(0)
233     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
234     , m_didReceiveInvalidMessage(false)
235     , m_waitingForMessage(nullptr)
236     , m_shouldWaitForSyncReplies(true)
237 {
238     ASSERT(RunLoop::isMain());
239
240     platformInitialize(identifier);
241
242 #if HAVE(QOS_CLASSES)
243     ASSERT(pthread_main_np());
244     m_mainThread = pthread_self();
245 #endif
246 }
247
248 Connection::~Connection()
249 {
250     ASSERT(!isValid());
251 }
252
253 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
254 {
255     ASSERT(!m_isConnected);
256
257     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
258 }
259
260 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
261 {
262     ASSERT(!m_isConnected);
263
264     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
265 }
266
267 void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
268 {
269     ASSERT(RunLoop::isMain());
270
271     m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), workQueue = &workQueue, workQueueMessageReceiver]() mutable {
272         ASSERT(!protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
273
274         protectedThis->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver));
275     });
276 }
277
278 void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
279 {
280     ASSERT(RunLoop::isMain());
281
282     m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName)]() mutable {
283         ASSERT(protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
284         protectedThis->m_workQueueMessageReceivers.remove(messageReceiverName);
285     });
286 }
287
288 void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder)
289 {
290     if (!decoder.isSyncMessage()) {
291         workQueueMessageReceiver.didReceiveMessage(*this, decoder);
292         return;
293     }
294
295     uint64_t syncRequestID = 0;
296     if (!decoder.decode(syncRequestID) || !syncRequestID) {
297         // We received an invalid sync message.
298         // FIXME: Handle this.
299         decoder.markInvalid();
300         return;
301     }
302
303     auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
304
305     // Hand off both the decoder and encoder to the work queue message receiver.
306     workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
307
308     // FIXME: If the message was invalid, we should send back a SyncMessageError.
309     ASSERT(!decoder.isInvalid());
310
311     if (replyEncoder)
312         sendSyncReply(WTFMove(replyEncoder));
313 }
314
315 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
316 {
317     ASSERT(!m_isConnected);
318
319     m_didCloseOnConnectionWorkQueueCallback = callback;    
320 }
321
322 void Connection::invalidate()
323 {
324     ASSERT(RunLoop::isMain());
325
326     if (!isValid()) {
327         // Someone already called invalidate().
328         return;
329     }
330     
331     m_isValid = false;
332
333     {
334         std::lock_guard<Lock> lock(m_replyHandlersLock);
335         for (auto& replyHandler : m_replyHandlers.values()) {
336             replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
337                 handler(nullptr);
338             });
339         }
340
341         m_replyHandlers.clear();
342     }
343
344     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
345         protectedThis->platformInvalidate();
346     });
347 }
348
349 void Connection::markCurrentlyDispatchedMessageAsInvalid()
350 {
351     // This should only be called while processing a message.
352     ASSERT(m_inDispatchMessageCount > 0);
353
354     m_didReceiveInvalidMessage = true;
355 }
356
357 std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
358 {
359     auto encoder = std::make_unique<Encoder>(messageReceiverName, messageName, destinationID);
360     encoder->setIsSyncMessage(true);
361
362     // Encode the sync request ID.
363     syncRequestID = ++m_syncRequestID;
364     *encoder << syncRequestID;
365
366     return encoder;
367 }
368
369 bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions)
370 {
371     if (!isValid())
372         return false;
373
374     if (m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC")) {
375         uint64_t syncRequestID;
376         auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID);
377         wrappedMessage->setFullySynchronousModeForTesting();
378         wrappedMessage->wrapForTesting(WTFMove(encoder));
379         return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { }));
380     }
381
382     if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply)
383         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
384             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
385         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
386
387     {
388         std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
389         m_outgoingMessages.append(WTFMove(encoder));
390     }
391     
392     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
393     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
394         protectedThis->sendOutgoingMessages();
395     });
396     return true;
397 }
398
399 void Connection::sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder> encoder, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler)
400 {
401     {
402         std::lock_guard<Lock> lock(m_replyHandlersLock);
403
404         if (!isValid()) {
405             replyDispatcher.dispatch([replyHandler = WTFMove(replyHandler)] {
406                 replyHandler(nullptr);
407             });
408             return;
409         }
410
411         ASSERT(!m_replyHandlers.contains(requestID));
412         m_replyHandlers.set(requestID, ReplyHandler { &replyDispatcher, WTFMove(replyHandler) });
413     }
414
415     sendMessage(WTFMove(encoder), { });
416 }
417
418 bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder)
419 {
420     return sendMessage(WTFMove(encoder), { });
421 }
422
423 Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const
424 {
425     return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout;
426 }
427
428 std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
429 {
430     ASSERT(RunLoop::isMain());
431
432     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
433
434     bool hasIncomingSynchronousMessage = false;
435
436     // First, check if this message is already in the incoming messages queue.
437     {
438         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
439
440         for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
441             std::unique_ptr<Decoder>& message = *it;
442
443             if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
444                 std::unique_ptr<Decoder> returnedMessage = WTFMove(message);
445
446                 m_incomingMessages.remove(it);
447                 return returnedMessage;
448             }
449
450             if (message->isSyncMessage())
451                 hasIncomingSynchronousMessage = true;
452         }
453     }
454
455     // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue.
456     if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) {
457         m_waitingForMessage = nullptr;
458         return nullptr;
459     }
460
461     WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions);
462
463     {
464         std::lock_guard<Lock> lock(m_waitForMessageMutex);
465
466         // We don't support having multiple clients waiting for messages.
467         ASSERT(!m_waitingForMessage);
468
469         m_waitingForMessage = &waitingForMessage;
470     }
471
472     MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout;
473
474     // Now wait for it to be set.
475     while (true) {
476         std::unique_lock<Lock> lock(m_waitForMessageMutex);
477
478         if (m_waitingForMessage->decoder) {
479             auto decoder = WTFMove(m_waitingForMessage->decoder);
480             m_waitingForMessage = nullptr;
481             return decoder;
482         }
483
484         // Now we wait.
485         bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout);
486         // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
487         if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
488             m_waitingForMessage = nullptr;
489             break;
490         }
491     }
492
493     return nullptr;
494 }
495
496 std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
497 {
498     ASSERT(RunLoop::isMain());
499
500     if (!isValid()) {
501         didFailToSendSyncMessage();
502         return nullptr;
503     }
504
505     // Push the pending sync reply information on our stack.
506     {
507         LockHolder locker(m_syncReplyStateMutex);
508         if (!m_shouldWaitForSyncReplies) {
509             didFailToSendSyncMessage();
510             return nullptr;
511         }
512
513         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
514     }
515
516     ++m_inSendSyncCount;
517
518     // First send the message.
519     sendMessage(WTFMove(encoder), IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply);
520
521     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
522     // keep an extra reference to the connection here in case it's invalidated.
523     Ref<Connection> protect(*this);
524     std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, timeout, sendSyncOptions);
525
526     --m_inSendSyncCount;
527
528     // Finally, pop the pending sync reply information.
529     {
530         LockHolder locker(m_syncReplyStateMutex);
531         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
532         m_pendingSyncReplies.removeLast();
533     }
534
535     if (!reply)
536         didFailToSendSyncMessage();
537
538     return reply;
539 }
540
541 std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
542 {
543     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
544     WallTime absoluteTime = WallTime::now() + timeout;
545
546     willSendSyncMessage(sendSyncOptions);
547     
548     bool timedOut = false;
549     while (!timedOut) {
550         // First, check if we have any messages that we need to process.
551         SyncMessageState::singleton().dispatchMessages(nullptr);
552         
553         {
554             LockHolder locker(m_syncReplyStateMutex);
555
556             // Second, check if there is a sync reply at the top of the stack.
557             ASSERT(!m_pendingSyncReplies.isEmpty());
558             
559             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
560             ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
561             
562             // We found the sync reply, or the connection was closed.
563             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) {
564                 didReceiveSyncReply(sendSyncOptions);
565                 return WTFMove(pendingSyncReply.replyDecoder);
566             }
567         }
568
569         // Processing a sync message could cause the connection to be invalidated.
570         // (If the handler ends up calling Connection::invalidate).
571         // If that happens, we need to stop waiting, or we'll hang since we won't get
572         // any more incoming messages.
573         if (!isValid()) {
574             RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID);
575             didReceiveSyncReply(sendSyncOptions);
576             return nullptr;
577         }
578
579         // We didn't find a sync reply yet, keep waiting.
580         // This allows the WebProcess to still serve clients while waiting for the message to return.
581         // Notably, it can continue to process accessibility requests, which are on the main thread.
582         timedOut = !SyncMessageState::singleton().wait(absoluteTime);
583     }
584
585     RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply, id = %" PRIu64, syncRequestID);
586     didReceiveSyncReply(sendSyncOptions);
587
588     return nullptr;
589 }
590
591 void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
592 {
593     {
594         LockHolder locker(m_syncReplyStateMutex);
595
596         // Go through the stack of sync requests that have pending replies and see which one
597         // this reply is for.
598         for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
599             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
600
601             if (pendingSyncReply.syncRequestID != decoder->destinationID())
602                 continue;
603
604             ASSERT(!pendingSyncReply.replyDecoder);
605
606             pendingSyncReply.replyDecoder = WTFMove(decoder);
607             pendingSyncReply.didReceiveReply = true;
608
609             // We got a reply to the last send message, wake up the client run loop so it can be processed.
610             if (i == m_pendingSyncReplies.size())
611                 SyncMessageState::singleton().wakeUpClientRunLoop();
612
613             return;
614         }
615     }
616
617     {
618         LockHolder locker(m_replyHandlersLock);
619
620         auto replyHandler = m_replyHandlers.take(decoder->destinationID());
621         if (replyHandler.dispatcher) {
622             replyHandler.dispatcher->dispatch([protectedThis = makeRef(*this), handler = WTFMove(replyHandler.handler), decoder = WTFMove(decoder)] () mutable {
623                 if (!protectedThis->isValid()) {
624                     handler(nullptr);
625                     return;
626                 }
627
628                 handler(WTFMove(decoder));
629             });
630         }
631     }
632
633     // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
634     // This can happen if the send timed out, so it's fine to ignore.
635 }
636
637 void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
638 {
639     ASSERT(!message->messageReceiverName().isEmpty());
640     ASSERT(!message->messageName().isEmpty());
641
642     if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
643         processIncomingSyncReply(WTFMove(message));
644         return;
645     }
646
647     if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) {
648         RefPtr<Connection> protectedThis(this);
649         StringReference messageReceiverNameReference = message->messageReceiverName();
650         String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
651         StringReference messageNameReference = message->messageName();
652         String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size()));
653
654         RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable {
655             protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8());
656         });
657         return;
658     }
659
660     auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
661     if (it != m_workQueueMessageReceivers.end()) {
662         it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable {
663             protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder);
664         });
665         return;
666     }
667
668 #if HAVE(QOS_CLASSES)
669     if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
670         pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, QOS_CLASS_USER_INTERACTIVE, 0);
671         message->setQOSClassOverride(override);
672     }
673 #endif
674
675     if (message->isSyncMessage()) {
676         std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
677
678         for (auto& callback : m_incomingSyncMessageCallbacks.values())
679             m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
680
681         m_incomingSyncMessageCallbacks.clear();
682     }
683
684     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
685     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
686     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
687     if (SyncMessageState::singleton().processIncomingMessage(*this, message))
688         return;
689
690     // Check if we're waiting for this message.
691     {
692         std::lock_guard<Lock> lock(m_waitForMessageMutex);
693
694         if (m_waitingForMessage && !m_waitingForMessage->decoder) {
695             if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
696                 m_waitingForMessage->decoder = WTFMove(message);
697                 ASSERT(m_waitingForMessage->decoder);
698                 m_waitForMessageCondition.notifyOne();
699                 return;
700             }
701
702             if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
703                 m_waitingForMessage->messageWaitingInterrupted = true;
704                 m_waitForMessageCondition.notifyOne();
705             }
706         }
707     }
708
709     enqueueIncomingMessage(WTFMove(message));
710 }
711
712 uint64_t Connection::installIncomingSyncMessageCallback(std::function<void ()> callback)
713 {
714     std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
715
716     m_nextIncomingSyncMessageCallbackID++;
717
718     if (!m_incomingSyncMessageCallbackQueue)
719         m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue");
720
721     m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, callback);
722
723     return m_nextIncomingSyncMessageCallbackID;
724 }
725
726 void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
727 {
728     std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
729     m_incomingSyncMessageCallbacks.remove(callbackID);
730 }
731
732 bool Connection::hasIncomingSyncMessage()
733 {
734     std::lock_guard<Lock> lock(m_incomingMessagesMutex);
735
736     for (auto& message : m_incomingMessages) {
737         if (message->isSyncMessage())
738             return true;
739     }
740     
741     return false;
742 }
743
744 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
745 {
746     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
747         protectedThis->connectionDidClose();
748     });
749 }
750
751 void Connection::connectionDidClose()
752 {
753     // The connection is now invalid.
754     platformInvalidate();
755
756     {
757         LockHolder locker(m_replyHandlersLock);
758         for (auto& replyHandler : m_replyHandlers.values()) {
759             replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
760                 handler(nullptr);
761             });
762         }
763
764         m_replyHandlers.clear();
765     }
766
767     {
768         LockHolder locker(m_syncReplyStateMutex);
769
770         ASSERT(m_shouldWaitForSyncReplies);
771         m_shouldWaitForSyncReplies = false;
772
773         if (!m_pendingSyncReplies.isEmpty())
774             SyncMessageState::singleton().wakeUpClientRunLoop();
775     }
776
777     {
778         std::lock_guard<Lock> lock(m_waitForMessageMutex);
779         if (m_waitingForMessage)
780             m_waitingForMessage->messageWaitingInterrupted = true;
781     }
782     m_waitForMessageCondition.notifyAll();
783
784     if (m_didCloseOnConnectionWorkQueueCallback)
785         m_didCloseOnConnectionWorkQueueCallback(this);
786
787     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
788         // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
789         // then the the connection will be invalid here.
790         if (!protectedThis->isValid())
791             return;
792
793         // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message
794         // to the connection and will then wait indefinitely for a reply.
795         protectedThis->m_isValid = false;
796
797         protectedThis->m_client.didClose(protectedThis.get());
798     });
799 }
800
801 bool Connection::canSendOutgoingMessages() const
802 {
803     return m_isConnected && platformCanSendOutgoingMessages();
804 }
805
806 void Connection::sendOutgoingMessages()
807 {
808     if (!canSendOutgoingMessages())
809         return;
810
811     while (true) {
812         std::unique_ptr<Encoder> message;
813
814         {
815             std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
816             if (m_outgoingMessages.isEmpty())
817                 break;
818             message = m_outgoingMessages.takeFirst();
819         }
820
821         if (!sendOutgoingMessage(WTFMove(message)))
822             break;
823     }
824 }
825
826 void Connection::dispatchSyncMessage(Decoder& decoder)
827 {
828     ASSERT(decoder.isSyncMessage());
829
830     uint64_t syncRequestID = 0;
831     if (!decoder.decode(syncRequestID) || !syncRequestID) {
832         // We received an invalid sync message.
833         decoder.markInvalid();
834         return;
835     }
836
837     auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
838
839     if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") {
840         if (!m_fullySynchronousModeIsAllowedForTesting) {
841             decoder.markInvalid();
842             return;
843         }
844         std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder);
845         RELEASE_ASSERT(unwrappedDecoder);
846         processIncomingMessage(WTFMove(unwrappedDecoder));
847
848         SyncMessageState::singleton().dispatchMessages(nullptr);
849     } else {
850         // Hand off both the decoder and encoder to the client.
851         m_client.didReceiveSyncMessage(*this, decoder, replyEncoder);
852     }
853
854     // FIXME: If the message was invalid, we should send back a SyncMessageError.
855     ASSERT(!decoder.isInvalid());
856
857     if (replyEncoder)
858         sendSyncReply(WTFMove(replyEncoder));
859 }
860
861 void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
862 {
863     ASSERT(RunLoop::isMain());
864
865     if (!isValid())
866         return;
867
868     m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
869 }
870
871 void Connection::didFailToSendSyncMessage()
872 {
873     if (!m_shouldExitOnSyncMessageSendFailure)
874         return;
875
876     exit(0);
877 }
878
879 void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
880 {
881     {
882         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
883         m_incomingMessages.append(WTFMove(incomingMessage));
884     }
885
886     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
887         protectedThis->dispatchOneMessage();
888     });
889 }
890
891 void Connection::dispatchMessage(Decoder& decoder)
892 {
893     m_client.didReceiveMessage(*this, decoder);
894 }
895
896 void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
897 {
898     if (!isValid())
899         return;
900
901     if (message->shouldUseFullySynchronousModeForTesting()) {
902         if (!m_fullySynchronousModeIsAllowedForTesting) {
903             m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
904             return;
905         }
906         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++;
907     }
908
909     m_inDispatchMessageCount++;
910
911     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
912         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
913
914     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
915     m_didReceiveInvalidMessage = false;
916
917     if (message->isSyncMessage())
918         dispatchSyncMessage(*message);
919     else
920         dispatchMessage(*message);
921
922     m_didReceiveInvalidMessage |= message->isInvalid();
923     m_inDispatchMessageCount--;
924
925     // FIXME: For Delayed synchronous messages, we should not decrement the counter until we send a response.
926     // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
927     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
928         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
929
930     if (message->shouldUseFullySynchronousModeForTesting())
931         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--;
932
933     if (m_didReceiveInvalidMessage && isValid())
934         m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
935
936     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
937 }
938
939 void Connection::dispatchOneMessage()
940 {
941     std::unique_ptr<Decoder> message;
942
943     {
944         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
945         if (m_incomingMessages.isEmpty())
946             return;
947
948         message = m_incomingMessages.takeFirst();
949     }
950
951     dispatchMessage(WTFMove(message));
952 }
953
954 void Connection::wakeUpRunLoop()
955 {
956     RunLoop::main().wakeUp();
957 }
958
959 } // namespace IPC