57eb370d69b0d3bcb316d178798b3a7873eac7e4
[WebKit-https.git] / Source / WebKit / Platform / IPC / Connection.cpp
1 /*
2  * Copyright (C) 2010-2016 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include "Logging.h"
30 #include <memory>
31 #include <wtf/CurrentTime.h>
32 #include <wtf/HashSet.h>
33 #include <wtf/NeverDestroyed.h>
34 #include <wtf/RunLoop.h>
35 #include <wtf/text/WTFString.h>
36 #include <wtf/threads/BinarySemaphore.h>
37
38 #if PLATFORM(COCOA)
39 #include "MachMessage.h"
40 #endif
41
42 #if USE(UNIX_DOMAIN_SOCKETS)
43 #include "UnixMessage.h"
44 #endif
45
46 namespace IPC {
47
48 struct Connection::ReplyHandler {
49     RefPtr<FunctionDispatcher> dispatcher;
50     Function<void (std::unique_ptr<Decoder>)> handler;
51 };
52
53 struct Connection::WaitForMessageState {
54     WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions)
55         : messageReceiverName(messageReceiverName)
56         , messageName(messageName)
57         , destinationID(destinationID)
58         , waitForOptions(waitForOptions)
59     {
60     }
61
62     StringReference messageReceiverName;
63     StringReference messageName;
64     uint64_t destinationID;
65
66     OptionSet<WaitForOption> waitForOptions;
67     bool messageWaitingInterrupted = false;
68
69     std::unique_ptr<Decoder> decoder;
70 };
71
72 class Connection::SyncMessageState {
73 public:
74     static SyncMessageState& singleton();
75
76     SyncMessageState();
77     ~SyncMessageState() = delete;
78
79     void wakeUpClientRunLoop()
80     {
81         m_waitForSyncReplySemaphore.signal();
82     }
83
84     bool wait(TimeWithDynamicClockType absoluteTime)
85     {
86         return m_waitForSyncReplySemaphore.wait(absoluteTime);
87     }
88
89     // Returns true if this message will be handled on a client thread that is currently
90     // waiting for a reply to a synchronous message.
91     bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
92
93     // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages
94     // from that connection and put the other messages back in the queue.
95     void dispatchMessages(Connection* allowedConnection);
96
97 private:
98     void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection&);
99
100     BinarySemaphore m_waitForSyncReplySemaphore;
101
102     // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
103     Lock m_mutex;
104
105     // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
106     HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
107
108     struct ConnectionAndIncomingMessage {
109         Ref<Connection> connection;
110         std::unique_ptr<Decoder> message;
111     };
112     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
113 };
114
115 Connection::SyncMessageState& Connection::SyncMessageState::singleton()
116 {
117     static std::once_flag onceFlag;
118     static LazyNeverDestroyed<SyncMessageState> syncMessageState;
119
120     std::call_once(onceFlag, [] {
121         syncMessageState.construct();
122     });
123
124     return syncMessageState;
125 }
126
127 Connection::SyncMessageState::SyncMessageState()
128 {
129 }
130
131 bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message)
132 {
133     if (!message->shouldDispatchMessageWhenWaitingForSyncReply())
134         return false;
135
136     ConnectionAndIncomingMessage connectionAndIncomingMessage { connection, WTFMove(message) };
137
138     {
139         std::lock_guard<Lock> lock(m_mutex);
140         
141         if (m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry) {
142             RunLoop::main().dispatch([this, protectedConnection = Ref<Connection>(connection)]() mutable {
143                 dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(protectedConnection);
144             });
145         }
146
147         m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(connectionAndIncomingMessage));
148     }
149
150     wakeUpClientRunLoop();
151
152     return true;
153 }
154
155 void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection)
156 {
157     ASSERT(RunLoop::isMain());
158
159     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
160
161     {
162         std::lock_guard<Lock> lock(m_mutex);
163         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
164     }
165
166     Vector<ConnectionAndIncomingMessage> messagesToPutBack;
167
168     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
169         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
170
171         if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection.ptr()) {
172             // This incoming message belongs to another connection and we don't want to dispatch it now
173             // so mark it to be put back in the message queue.
174             messagesToPutBack.append(WTFMove(connectionAndIncomingMessage));
175             continue;
176         }
177
178         connectionAndIncomingMessage.connection->dispatchMessage(WTFMove(connectionAndIncomingMessage.message));
179     }
180
181     if (!messagesToPutBack.isEmpty()) {
182         std::lock_guard<Lock> lock(m_mutex);
183
184         for (auto& message : messagesToPutBack)
185             m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(message));
186     }
187 }
188
189 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection& connection)
190 {
191     {
192         std::lock_guard<Lock> lock(m_mutex);
193         ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection));
194         m_didScheduleDispatchMessagesWorkSet.remove(&connection);
195     }
196
197     dispatchMessages(&connection);
198 }
199
200 // Represents a sync request for which we're waiting on a reply.
201 struct Connection::PendingSyncReply {
202     // The request ID.
203     uint64_t syncRequestID { 0 };
204
205     // The reply decoder, will be null if there was an error processing the sync
206     // message on the other side.
207     std::unique_ptr<Decoder> replyDecoder;
208
209     // Will be set to true once a reply has been received.
210     bool didReceiveReply { false };
211
212     PendingSyncReply() = default;
213
214     explicit PendingSyncReply(uint64_t syncRequestID)
215         : syncRequestID(syncRequestID)
216     {
217     }
218 };
219
220 Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client)
221 {
222     return adoptRef(*new Connection(identifier, true, client));
223 }
224
225 Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client)
226 {
227     return adoptRef(*new Connection(identifier, false, client));
228 }
229
230 Connection::Connection(Identifier identifier, bool isServer, Client& client)
231     : m_client(client)
232     , m_isServer(isServer)
233     , m_syncRequestID(0)
234     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
235     , m_shouldExitOnSyncMessageSendFailure(false)
236     , m_didCloseOnConnectionWorkQueueCallback(0)
237     , m_isConnected(false)
238     , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
239     , m_inSendSyncCount(0)
240     , m_inDispatchMessageCount(0)
241     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
242     , m_didReceiveInvalidMessage(false)
243     , m_waitingForMessage(nullptr)
244     , m_shouldWaitForSyncReplies(true)
245 {
246     ASSERT(RunLoop::isMain());
247
248     platformInitialize(identifier);
249
250 #if HAVE(QOS_CLASSES)
251     ASSERT(pthread_main_np());
252     m_mainThread = pthread_self();
253 #endif
254 }
255
256 Connection::~Connection()
257 {
258     ASSERT(!isValid());
259 }
260
261 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
262 {
263     ASSERT(!m_isConnected);
264
265     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
266 }
267
268 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
269 {
270     ASSERT(!m_isConnected);
271
272     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
273 }
274
275 void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
276 {
277     ASSERT(RunLoop::isMain());
278
279     m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), workQueue = &workQueue, workQueueMessageReceiver]() mutable {
280         ASSERT(!protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
281
282         protectedThis->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver));
283     });
284 }
285
286 void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
287 {
288     ASSERT(RunLoop::isMain());
289
290     m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName)]() mutable {
291         ASSERT(protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
292         protectedThis->m_workQueueMessageReceivers.remove(messageReceiverName);
293     });
294 }
295
296 void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder)
297 {
298     if (!decoder.isSyncMessage()) {
299         workQueueMessageReceiver.didReceiveMessage(*this, decoder);
300         return;
301     }
302
303     uint64_t syncRequestID = 0;
304     if (!decoder.decode(syncRequestID) || !syncRequestID) {
305         // We received an invalid sync message.
306         // FIXME: Handle this.
307         decoder.markInvalid();
308         return;
309     }
310
311     auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
312
313     // Hand off both the decoder and encoder to the work queue message receiver.
314     workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
315
316     // FIXME: If the message was invalid, we should send back a SyncMessageError.
317     ASSERT(!decoder.isInvalid());
318
319     if (replyEncoder)
320         sendSyncReply(WTFMove(replyEncoder));
321 }
322
323 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
324 {
325     ASSERT(!m_isConnected);
326
327     m_didCloseOnConnectionWorkQueueCallback = callback;    
328 }
329
330 void Connection::invalidate()
331 {
332     ASSERT(RunLoop::isMain());
333
334     if (!isValid()) {
335         // Someone already called invalidate().
336         return;
337     }
338     
339     m_isValid = false;
340
341     {
342         std::lock_guard<Lock> lock(m_replyHandlersLock);
343         for (auto& replyHandler : m_replyHandlers.values()) {
344             replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
345                 handler(nullptr);
346             });
347         }
348
349         m_replyHandlers.clear();
350     }
351
352     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
353         protectedThis->platformInvalidate();
354     });
355 }
356
357 void Connection::markCurrentlyDispatchedMessageAsInvalid()
358 {
359     // This should only be called while processing a message.
360     ASSERT(m_inDispatchMessageCount > 0);
361
362     m_didReceiveInvalidMessage = true;
363 }
364
365 std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
366 {
367     auto encoder = std::make_unique<Encoder>(messageReceiverName, messageName, destinationID);
368     encoder->setIsSyncMessage(true);
369
370     // Encode the sync request ID.
371     syncRequestID = ++m_syncRequestID;
372     *encoder << syncRequestID;
373
374     return encoder;
375 }
376
377 bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions)
378 {
379     if (!isValid())
380         return false;
381
382     if (isMainThread() && m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC")) {
383         uint64_t syncRequestID;
384         auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID);
385         wrappedMessage->setFullySynchronousModeForTesting();
386         wrappedMessage->wrapForTesting(WTFMove(encoder));
387         return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { }));
388     }
389
390     if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply)
391         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
392             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
393         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
394
395     {
396         std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
397         m_outgoingMessages.append(WTFMove(encoder));
398     }
399     
400     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
401     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
402         protectedThis->sendOutgoingMessages();
403     });
404     return true;
405 }
406
407 void Connection::sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder> encoder, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler)
408 {
409     {
410         std::lock_guard<Lock> lock(m_replyHandlersLock);
411
412         if (!isValid()) {
413             replyDispatcher.dispatch([replyHandler = WTFMove(replyHandler)] {
414                 replyHandler(nullptr);
415             });
416             return;
417         }
418
419         ASSERT(!m_replyHandlers.contains(requestID));
420         m_replyHandlers.set(requestID, ReplyHandler { &replyDispatcher, WTFMove(replyHandler) });
421     }
422
423     sendMessage(WTFMove(encoder), { });
424 }
425
426 bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder)
427 {
428     return sendMessage(WTFMove(encoder), { });
429 }
430
431 Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const
432 {
433     return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout;
434 }
435
436 std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
437 {
438     ASSERT(RunLoop::isMain());
439
440     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
441
442     bool hasIncomingSynchronousMessage = false;
443
444     // First, check if this message is already in the incoming messages queue.
445     {
446         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
447
448         for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
449             std::unique_ptr<Decoder>& message = *it;
450
451             if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
452                 std::unique_ptr<Decoder> returnedMessage = WTFMove(message);
453
454                 m_incomingMessages.remove(it);
455                 return returnedMessage;
456             }
457
458             if (message->isSyncMessage())
459                 hasIncomingSynchronousMessage = true;
460         }
461     }
462
463     // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue.
464     if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) {
465         m_waitingForMessage = nullptr;
466         return nullptr;
467     }
468
469     WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions);
470
471     {
472         std::lock_guard<Lock> lock(m_waitForMessageMutex);
473
474         // We don't support having multiple clients waiting for messages.
475         ASSERT(!m_waitingForMessage);
476
477         m_waitingForMessage = &waitingForMessage;
478     }
479
480     MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout;
481
482     // Now wait for it to be set.
483     while (true) {
484         std::unique_lock<Lock> lock(m_waitForMessageMutex);
485
486         if (m_waitingForMessage->decoder) {
487             auto decoder = WTFMove(m_waitingForMessage->decoder);
488             m_waitingForMessage = nullptr;
489             return decoder;
490         }
491
492         // Now we wait.
493         bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout);
494         // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
495         if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
496             m_waitingForMessage = nullptr;
497             break;
498         }
499     }
500
501     return nullptr;
502 }
503
504 std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
505 {
506     ASSERT(RunLoop::isMain());
507
508     if (!isValid()) {
509         didFailToSendSyncMessage();
510         return nullptr;
511     }
512
513     // Push the pending sync reply information on our stack.
514     {
515         LockHolder locker(m_syncReplyStateMutex);
516         if (!m_shouldWaitForSyncReplies) {
517             didFailToSendSyncMessage();
518             return nullptr;
519         }
520
521         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
522     }
523
524     ++m_inSendSyncCount;
525
526     // If the caller has set the DoNotProcessIncomingMessagesWhenWaitingForSyncReply then we need to make sure the destination process
527     // dispatches this message even when waiting for a sync reply. It could cause deadlocks otherwise.
528     if (sendSyncOptions.contains(SendSyncOption::DoNotProcessIncomingMessagesWhenWaitingForSyncReply))
529         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
530
531     // First send the message.
532     sendMessage(WTFMove(encoder), IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply);
533
534     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
535     // keep an extra reference to the connection here in case it's invalidated.
536     Ref<Connection> protect(*this);
537     std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, timeout, sendSyncOptions);
538
539     --m_inSendSyncCount;
540
541     // Finally, pop the pending sync reply information.
542     {
543         LockHolder locker(m_syncReplyStateMutex);
544         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
545         m_pendingSyncReplies.removeLast();
546     }
547
548     if (!reply)
549         didFailToSendSyncMessage();
550
551     return reply;
552 }
553
554 std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
555 {
556     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
557     WallTime absoluteTime = WallTime::now() + timeout;
558
559     willSendSyncMessage(sendSyncOptions);
560     
561     bool timedOut = false;
562     while (!timedOut) {
563         // First, check if we have any messages that we need to process.
564         if (!sendSyncOptions.contains(SendSyncOption::DoNotProcessIncomingMessagesWhenWaitingForSyncReply))
565             SyncMessageState::singleton().dispatchMessages(nullptr);
566         
567         {
568             LockHolder locker(m_syncReplyStateMutex);
569
570             // Second, check if there is a sync reply at the top of the stack.
571             ASSERT(!m_pendingSyncReplies.isEmpty());
572             
573             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
574             ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
575             
576             // We found the sync reply, or the connection was closed.
577             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) {
578                 didReceiveSyncReply(sendSyncOptions);
579                 return WTFMove(pendingSyncReply.replyDecoder);
580             }
581         }
582
583         // Processing a sync message could cause the connection to be invalidated.
584         // (If the handler ends up calling Connection::invalidate).
585         // If that happens, we need to stop waiting, or we'll hang since we won't get
586         // any more incoming messages.
587         if (!isValid()) {
588             RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID);
589             didReceiveSyncReply(sendSyncOptions);
590             return nullptr;
591         }
592
593         // We didn't find a sync reply yet, keep waiting.
594         // This allows the WebProcess to still serve clients while waiting for the message to return.
595         // Notably, it can continue to process accessibility requests, which are on the main thread.
596         timedOut = !SyncMessageState::singleton().wait(absoluteTime);
597     }
598
599     RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply, id = %" PRIu64, syncRequestID);
600     didReceiveSyncReply(sendSyncOptions);
601
602     return nullptr;
603 }
604
605 void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
606 {
607     {
608         LockHolder locker(m_syncReplyStateMutex);
609
610         // Go through the stack of sync requests that have pending replies and see which one
611         // this reply is for.
612         for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
613             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
614
615             if (pendingSyncReply.syncRequestID != decoder->destinationID())
616                 continue;
617
618             ASSERT(!pendingSyncReply.replyDecoder);
619
620             pendingSyncReply.replyDecoder = WTFMove(decoder);
621             pendingSyncReply.didReceiveReply = true;
622
623             // We got a reply to the last send message, wake up the client run loop so it can be processed.
624             if (i == m_pendingSyncReplies.size())
625                 SyncMessageState::singleton().wakeUpClientRunLoop();
626
627             return;
628         }
629     }
630
631     {
632         LockHolder locker(m_replyHandlersLock);
633
634         auto replyHandler = m_replyHandlers.take(decoder->destinationID());
635         if (replyHandler.dispatcher) {
636             replyHandler.dispatcher->dispatch([protectedThis = makeRef(*this), handler = WTFMove(replyHandler.handler), decoder = WTFMove(decoder)] () mutable {
637                 if (!protectedThis->isValid()) {
638                     handler(nullptr);
639                     return;
640                 }
641
642                 handler(WTFMove(decoder));
643             });
644         }
645     }
646
647     // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
648     // This can happen if the send timed out, so it's fine to ignore.
649 }
650
651 void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
652 {
653     ASSERT(!message->messageReceiverName().isEmpty());
654     ASSERT(!message->messageName().isEmpty());
655
656     if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
657         processIncomingSyncReply(WTFMove(message));
658         return;
659     }
660
661     if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) {
662         RefPtr<Connection> protectedThis(this);
663         StringReference messageReceiverNameReference = message->messageReceiverName();
664         String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
665         StringReference messageNameReference = message->messageName();
666         String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size()));
667
668         RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable {
669             protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8());
670         });
671         return;
672     }
673
674     auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
675     if (it != m_workQueueMessageReceivers.end()) {
676         it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable {
677             protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder);
678         });
679         return;
680     }
681
682 #if HAVE(QOS_CLASSES)
683     if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
684         pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
685         message->setQOSClassOverride(override);
686     }
687 #endif
688
689     if (message->isSyncMessage()) {
690         std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
691
692         for (auto& callback : m_incomingSyncMessageCallbacks.values())
693             m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
694
695         m_incomingSyncMessageCallbacks.clear();
696     }
697
698     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
699     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
700     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
701     if (SyncMessageState::singleton().processIncomingMessage(*this, message))
702         return;
703
704     // Check if we're waiting for this message.
705     {
706         std::lock_guard<Lock> lock(m_waitForMessageMutex);
707
708         if (m_waitingForMessage && !m_waitingForMessage->decoder) {
709             if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
710                 m_waitingForMessage->decoder = WTFMove(message);
711                 ASSERT(m_waitingForMessage->decoder);
712                 m_waitForMessageCondition.notifyOne();
713                 return;
714             }
715
716             if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
717                 m_waitingForMessage->messageWaitingInterrupted = true;
718                 m_waitForMessageCondition.notifyOne();
719             }
720         }
721     }
722
723     enqueueIncomingMessage(WTFMove(message));
724 }
725
726 uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&& callback)
727 {
728     std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
729
730     m_nextIncomingSyncMessageCallbackID++;
731
732     if (!m_incomingSyncMessageCallbackQueue)
733         m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue");
734
735     m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, WTFMove(callback));
736
737     return m_nextIncomingSyncMessageCallbackID;
738 }
739
740 void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
741 {
742     std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
743     m_incomingSyncMessageCallbacks.remove(callbackID);
744 }
745
746 bool Connection::hasIncomingSyncMessage()
747 {
748     std::lock_guard<Lock> lock(m_incomingMessagesMutex);
749
750     for (auto& message : m_incomingMessages) {
751         if (message->isSyncMessage())
752             return true;
753     }
754     
755     return false;
756 }
757
758 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
759 {
760     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
761         protectedThis->connectionDidClose();
762     });
763 }
764
765 void Connection::connectionDidClose()
766 {
767     // The connection is now invalid.
768     platformInvalidate();
769
770     {
771         LockHolder locker(m_replyHandlersLock);
772         for (auto& replyHandler : m_replyHandlers.values()) {
773             replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
774                 handler(nullptr);
775             });
776         }
777
778         m_replyHandlers.clear();
779     }
780
781     {
782         LockHolder locker(m_syncReplyStateMutex);
783
784         ASSERT(m_shouldWaitForSyncReplies);
785         m_shouldWaitForSyncReplies = false;
786
787         if (!m_pendingSyncReplies.isEmpty())
788             SyncMessageState::singleton().wakeUpClientRunLoop();
789     }
790
791     {
792         std::lock_guard<Lock> lock(m_waitForMessageMutex);
793         if (m_waitingForMessage)
794             m_waitingForMessage->messageWaitingInterrupted = true;
795     }
796     m_waitForMessageCondition.notifyAll();
797
798     if (m_didCloseOnConnectionWorkQueueCallback)
799         m_didCloseOnConnectionWorkQueueCallback(this);
800
801     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
802         // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
803         // then the connection will be invalid here.
804         if (!protectedThis->isValid())
805             return;
806
807         // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message
808         // to the connection and will then wait indefinitely for a reply.
809         protectedThis->m_isValid = false;
810
811         protectedThis->m_client.didClose(protectedThis.get());
812     });
813 }
814
815 bool Connection::canSendOutgoingMessages() const
816 {
817     return m_isConnected && platformCanSendOutgoingMessages();
818 }
819
820 void Connection::sendOutgoingMessages()
821 {
822     if (!canSendOutgoingMessages())
823         return;
824
825     while (true) {
826         std::unique_ptr<Encoder> message;
827
828         {
829             std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
830             if (m_outgoingMessages.isEmpty())
831                 break;
832             message = m_outgoingMessages.takeFirst();
833         }
834
835         if (!sendOutgoingMessage(WTFMove(message)))
836             break;
837     }
838 }
839
840 void Connection::dispatchSyncMessage(Decoder& decoder)
841 {
842     ASSERT(decoder.isSyncMessage());
843
844     uint64_t syncRequestID = 0;
845     if (!decoder.decode(syncRequestID) || !syncRequestID) {
846         // We received an invalid sync message.
847         decoder.markInvalid();
848         return;
849     }
850
851     auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
852
853     if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") {
854         if (!m_fullySynchronousModeIsAllowedForTesting) {
855             decoder.markInvalid();
856             return;
857         }
858         std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder);
859         RELEASE_ASSERT(unwrappedDecoder);
860         processIncomingMessage(WTFMove(unwrappedDecoder));
861
862         SyncMessageState::singleton().dispatchMessages(nullptr);
863     } else {
864         // Hand off both the decoder and encoder to the client.
865         m_client.didReceiveSyncMessage(*this, decoder, replyEncoder);
866     }
867
868     // FIXME: If the message was invalid, we should send back a SyncMessageError.
869     ASSERT(!decoder.isInvalid());
870
871     if (replyEncoder)
872         sendSyncReply(WTFMove(replyEncoder));
873 }
874
875 void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
876 {
877     ASSERT(RunLoop::isMain());
878
879     if (!isValid())
880         return;
881
882     m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
883 }
884
885 void Connection::didFailToSendSyncMessage()
886 {
887     if (!m_shouldExitOnSyncMessageSendFailure)
888         return;
889
890     exit(0);
891 }
892
893 void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
894 {
895     {
896         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
897         m_incomingMessages.append(WTFMove(incomingMessage));
898     }
899
900     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
901         protectedThis->dispatchOneMessage();
902     });
903 }
904
905 void Connection::dispatchMessage(Decoder& decoder)
906 {
907     m_client.didReceiveMessage(*this, decoder);
908 }
909
910 void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
911 {
912     if (!isValid())
913         return;
914
915     if (message->shouldUseFullySynchronousModeForTesting()) {
916         if (!m_fullySynchronousModeIsAllowedForTesting) {
917             m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
918             return;
919         }
920         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++;
921     }
922
923     m_inDispatchMessageCount++;
924
925     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
926         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
927
928     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
929     m_didReceiveInvalidMessage = false;
930
931     if (message->isSyncMessage())
932         dispatchSyncMessage(*message);
933     else
934         dispatchMessage(*message);
935
936     m_didReceiveInvalidMessage |= message->isInvalid();
937     m_inDispatchMessageCount--;
938
939     // FIXME: For Delayed synchronous messages, we should not decrement the counter until we send a response.
940     // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
941     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
942         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
943
944     if (message->shouldUseFullySynchronousModeForTesting())
945         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--;
946
947     if (m_didReceiveInvalidMessage && isValid())
948         m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
949
950     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
951 }
952
953 void Connection::dispatchOneMessage()
954 {
955     std::unique_ptr<Decoder> message;
956
957     {
958         std::lock_guard<Lock> lock(m_incomingMessagesMutex);
959         if (m_incomingMessages.isEmpty())
960             return;
961
962         message = m_incomingMessages.takeFirst();
963     }
964
965     dispatchMessage(WTFMove(message));
966 }
967
968 void Connection::wakeUpRunLoop()
969 {
970     RunLoop::main().wakeUp();
971 }
972
973 } // namespace IPC